Mercurial > hg > Database > Alice
view src/main/java/alice/datasegment/ReceiveData.java @ 531:b6049fb123d8 dispose
resolve unzip, working TestRemoteAlice
author | Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp> |
---|---|
date | Sun, 03 May 2015 15:58:31 +0900 |
parents | 4aeebea0c9b5 |
children | 33f981dd91d2 |
line wrap: on
line source
package alice.datasegment; import java.io.*; import java.nio.ByteBuffer; import java.util.LinkedList; import java.util.zip.*; import org.msgpack.MessagePack; import org.msgpack.type.Value; /** * 送られてきたDSを一時的に取っておくクラス。inputでも使用。 */ public class ReceiveData { private Object val;//for Object DS private byte[] messagePack;//for byteArray(serialized) DS private byte[] zMessagePack;//for byteArray(compressed) DS private int dataSize; private Class<?> clazz; public long time;//測定用 public boolean setTime = false; public int depth = 1; private static final MessagePack packer = new MessagePack(); /** * コンストラクタ。Object型のDSと圧縮のメタ情報を受け取る。 * put/update/reply用? * @param obj DS本体(Object) */ public ReceiveData(Object obj) { clazz = obj.getClass(); val = obj; } /** * コンストラクタ。byteArray型のDSと圧縮のメタ情報を受け取り、byteArrayフラグを立てる。 * * @param messagePack DS本体(byteArray) */ public ReceiveData(byte[] messagePack, boolean compressed, int datasize) { this.dataSize = datasize; if (compressed){ this.zMessagePack = messagePack; } else { this.messagePack = messagePack; } } public boolean isByteArray(){ return messagePack != null | zMessagePack != null; } public boolean compressed(){ return zMessagePack != null; } public boolean serialized(){ return val == null; } public Object getObj(){ return asClass(Object.class); } public String asString(){ return asClass(String.class); } public int asInteger() { return asClass(Integer.class); } public Float asFloat() { return asClass(Float.class); } public Value getVal(){///get DS as Value type if (val == null){///val != null return asClass(Value.class); } else { try { return packer.unconvert(val);///convert to Value type by MassagePack } catch (IOException e) { e.printStackTrace(); } return null; } } /** * DSを任意の型で取得するメソッド。 * DSがbyteArrayでなければ指定された型に変換して返す。 * DSがbyteArrayなら解凍状態にして指定された型に変換して返す。 * * @param clazz * @param <T> * @return */ public <T> T asClass(Class<T> clazz) {///javasist System.out.println("in asClass val:" + val + ", MP:" + messagePack + ", zMP:" + zMessagePack); try { if (val != null) { return (T) val; } if (zMessagePack != null && messagePack == null) { messagePack = unzip(zMessagePack, dataSize);///ToDo:read header and set length } return packer.read(messagePack, clazz); } catch (IOException e) {// | DataFormatException e e.printStackTrace(); return null; } } public byte[] getMessagePack(){ if (messagePack != null){ return messagePack; } else { try { messagePack = packer.write(val); setDataSize(messagePack.length); } catch (IOException e) { e.printStackTrace(); } return messagePack; } } public byte[] getZMessagePack(){ if (zMessagePack != null){ System.out.println("have zMessagePack"); return zMessagePack; } else { try { zip(); } catch (IOException e) { e.printStackTrace(); } return zMessagePack; } } public void zip() throws IOException { System.out.println("in zip"); LinkedList<ByteBuffer> inputs = new LinkedList<ByteBuffer>(); int inputIndex = 0; LinkedList<ByteBuffer> outputs = new LinkedList<ByteBuffer>(); Deflater deflater = new Deflater(); inputs.add(ByteBuffer.wrap(getMessagePack())); int len = 0; int INFLATE_BUFSIZE = 1024 * 100;//ToDo:fix ByteBuffer c1 = allocate(INFLATE_BUFSIZE);//for output while (inputIndex < inputs.size()) { ByteBuffer b1 = inputs.get(inputIndex++); deflater.setInput(b1.array(), b1.position(), b1.remaining()); if (inputIndex == inputs.size()){ deflater.finish(); } int len1 = 0; do { len1 = deflater.deflate(c1.array(), c1.position(), c1.remaining()); if (len1 > 0) { len += len1; c1.position(c1.position() + len1); if (c1.remaining() == 0) { c1.flip(); outputs.addLast(c1); c1 = allocate(INFLATE_BUFSIZE); } } } while (len1 > 0 || !deflater.needsInput()); } if (c1.position() != 0) { c1.flip(); outputs.addLast(c1); } deflater.reset(); zMessagePack = new byte[len]; int tmp = 0; for (int i = 0; i < outputs.size(); i++){ System.arraycopy(outputs.get(i).array(), 0, zMessagePack, 0 + tmp, outputs.get(i).limit());//limit? remaining? tmp += outputs.get(i).limit(); } System.out.print("in make zMessagePack2: "); for (int i = 0; i < zMessagePack.length; i++) { System.out.print(Integer.toHexString(zMessagePack[i] & 0xff)); } System.out.print("\n"); } protected byte[] unzip(byte[] input, int zippedLength) {///read header & unzip int length = input.length; Inflater inflater = new Inflater(); System.out.print("unziped input: "); for (int i = 0; i < input.length; i++) { System.out.print(Integer.toHexString(input[i] & 0xff)); } System.out.print("\n"); byte [] output = new byte [zippedLength];///byteArray for unziped data inflater.setInput(input, 0, length);///set unzip data without header try { inflater.inflate(output, 0, zippedLength);///unzip } catch (DataFormatException e) { e.printStackTrace(); } inflater.reset(); System.out.print("unziped: "); for (int i = 0; i < output.length; i++) { System.out.print(Integer.toHexString(output[i] & 0xff)); } System.out.print("\n"); return output; } public ByteBuffer allocate(int size) { ByteBuffer b = null; while(true){ try { b = ByteBuffer.allocate(size); } catch (OutOfMemoryError e) { b = null; } if (b!=null) { break; } try { wait(); } catch (InterruptedException e) { } } return b; } public static int byteArrayToInt(byte[] b) { return b[3] & 0xFF | (b[2] & 0xFF) << 8 | (b[1] & 0xFF) << 16 | (b[0] & 0xFF) << 24; } public static byte[] intToByteArray(int a) { return new byte[] { (byte) ((a >> 24) & 0xFF), (byte) ((a >> 16) & 0xFF), (byte) ((a >> 8) & 0xFF), (byte) (a & 0xFF) }; } public int getDataSize(){ return this.dataSize; } public void setDataSize(int datasize){ this.dataSize = datasize; } }