Mercurial > hg > Database > Alice
changeset 528:6ebddfac7ff6 dispose
delete RecieveData.setCompressFlag
author | kazz <kazz@cr.ie.u-ryukyu.ac.jp> |
---|---|
date | Thu, 30 Apr 2015 20:52:21 +0900 |
parents | bfec2c3ff1b8 |
children | cb7c31848d16 |
files | src/main/java/alice/codesegment/SingletonMessage.java src/main/java/alice/daemon/IncomingTcpConnection.java src/main/java/alice/datasegment/Command.java src/main/java/alice/datasegment/ReceiveData.java |
diffstat | 4 files changed, 22 insertions(+), 170 deletions(-) [+] |
line wrap: on
line diff
--- a/src/main/java/alice/codesegment/SingletonMessage.java Thu Apr 30 18:14:02 2015 +0900 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,13 +0,0 @@ -package alice.codesegment; - -import org.msgpack.MessagePack; - -public class SingletonMessage { - - private static final MessagePack instance = new MessagePack(); - - public static MessagePack getInstance(){ - return instance; - } - -}
--- a/src/main/java/alice/daemon/IncomingTcpConnection.java Thu Apr 30 18:14:02 2015 +0900 +++ b/src/main/java/alice/daemon/IncomingTcpConnection.java Thu Apr 30 20:52:21 2015 +0900 @@ -5,6 +5,7 @@ import java.nio.channels.ClosedChannelException; import alice.datasegment.*; +import org.msgpack.MessagePack; import org.msgpack.unpacker.Unpacker; import alice.codesegment.SingletonMessage; @@ -16,7 +17,7 @@ protected DataSegmentManager manager; protected String reverseKey; private LocalDataSegmentManager lmanager = DataSegment.getLocal(); - private LocalDataSegmentManager compressedlmanager = DataSegment.getCompressedLocal(); + private static final MessagePack packer = new MessagePack(); public IncomingTcpConnection(DataSegmentManager manager) { this.manager = manager; @@ -32,17 +33,13 @@ return lmanager; } - public LocalDataSegmentManager getCompressedLocalDataSegmentManager(){//追加 - return compressedlmanager; - } - /** * pipeline thread for receiving */ public void run() { Unpacker unpacker = null; try { - unpacker = SingletonMessage.getInstance().createUnpacker(connection.socket.getInputStream()); + unpacker = packer.createUnpacker(connection.socket.getInputStream()); } catch (IOException e) { e.printStackTrace(); } @@ -59,8 +56,7 @@ case UPDATE: case PUT: System.out.println("in TCP PUT"); - rData = new ReceiveData(unpacker.getSerializedByteArray(unpacker.readInt())); - rData.setCompressFlag(msg.compressed); + rData = new ReceiveData(unpacker.getSerializedByteArray(unpacker.readInt()), msg.compressed); if (msg.setTime) { rData.setTime = true; @@ -70,25 +66,16 @@ cmd = new Command(type, null, null, rData, 0, 0, null, null, reverseKey); - if (msg.compressed){ - compressedlmanager.getDataSegmentKey(msg.key).runCommand(cmd); - } else { - lmanager.getDataSegmentKey(msg.key).runCommand(cmd); - } + lmanager.getDataSegmentKey(msg.key).runCommand(cmd); + break; case PEEK: case TAKE: System.out.println("in TCP TAKE"); cmd = new Command(type, null, null, null, msg.index, msg.seq, null, null, connection); - cmd.setQuickFlag(msg.quickFlag); - cmd.setCompressFlag(msg.compressed); - if (msg.compressed) { - compressedlmanager.getDataSegmentKey(msg.key).runCommand(cmd); - } else { - lmanager.getDataSegmentKey(msg.key).runCommand(cmd); - } + lmanager.getDataSegmentKey(msg.key).runCommand(cmd); break; case REMOVE: @@ -100,7 +87,6 @@ cmd = manager.getAndRemoveCmd(msg.seq); rData = new ReceiveData(unpacker.getSerializedByteArray(unpacker.readInt()));//ここのコンストラクタでx - rData.setCompressFlag(msg.compressed); Command rCmd = new Command(type, null, null, rData, msg.index, msg.seq, null, null, ""); cmd.cs.ids.reply(cmd.receiver, rCmd);
--- a/src/main/java/alice/datasegment/Command.java Thu Apr 30 18:14:02 2015 +0900 +++ b/src/main/java/alice/datasegment/Command.java Thu Apr 30 20:52:21 2015 +0900 @@ -88,24 +88,19 @@ case UPDATE: System.out.println("update compressFlag:" + compressFlag); - rData.setCompressFlag(compressFlag); break; case PUT: System.out.println("put compressFlag:" + compressFlag); - rData.setCompressFlag(compressFlag); break; case REPLY://ReceiveDataからREPLYするDSを取得 System.out.println("in REPLY"); - System.out.println("reply compressFlag:" + compressFlag); - buf = rData.setMPHeader(new CommandMessage(type.id, index, seq, key, false, rData.serialized(), rData.compressed()), type); - rData.setCompressFlag(compressFlag); + System.out.println("reply compressFlag:" + compressFlag + ", " + type.id+ ", " + index+ ", " + seq + ", " + key+ ", " + false+ ", " + rData.serialized()+ ", " + rData.compressed()); + break; default://PEEK, TAKE, RemoveならCommandMessageとそのサイズのみセット ReceiveData rData2 = new ReceiveData("hoge"); System.out.println("default compressFlag:" + compressFlag); - buf = rData2.setMPHeader(new CommandMessage(type.id, index, seq, key, quickFlag, false, compressFlag), type); - rData2.setCompressFlag(compressFlag); break; }
--- a/src/main/java/alice/datasegment/ReceiveData.java Thu Apr 30 18:14:02 2015 +0900 +++ b/src/main/java/alice/datasegment/ReceiveData.java Thu Apr 30 20:52:21 2015 +0900 @@ -30,10 +30,8 @@ public boolean setTime = false; public int depth = 1; - private Deflater deflater = new Deflater(); - private Inflater inflater = new Inflater(); private ByteBuffer buf = null; - private MessagePack msg = SingletonMessage.getInstance(); + private static final MessagePack packer = new MessagePack(); /** * コンストラクタ。Object型のDSと圧縮のメタ情報を受け取る。 @@ -50,14 +48,10 @@ * * @param messagePack DS本体(byteArray) */ - public ReceiveData(byte[] messagePack) { - clazz = messagePack.getClass(); - - if (messagePack[0] == 0xc1){ - System.out.println("ReceiveData is zMessagePack"); + public ReceiveData(byte[] messagePack, boolean compressed) { + if (compressed){ this.zMessagePack = messagePack; } else { - System.out.println("ReceiveData is MessagePack"); this.messagePack = messagePack; } } @@ -83,11 +77,11 @@ return asClass(String.class); } - public int asInteger(){ + public int asInteger() { return asClass(Integer.class); } - public Float asFloat(){ + public Float asFloat() { return asClass(Float.class); } @@ -96,7 +90,7 @@ return asClass(Value.class); } else { try { - return SingletonMessage.getInstance().unconvert(val);///convert to Value type by MassagePack + return packer.unconvert(val);///convert to Value type by MassagePack } catch (IOException e) { e.printStackTrace(); } @@ -121,10 +115,8 @@ return (T) val; } - if (zMessagePack != null && messagePack == null) {//ToDo:fix + if (zMessagePack != null && messagePack == null) { messagePack = unzip(zMessagePack); - System.out.println("unzip messagePack:" + messagePack); - //zMessagePack = null;? } return SingletonMessage.getInstance().read(messagePack, clazz); @@ -135,122 +127,12 @@ } } - public void setCompressFlag(boolean cFlag) {///compress - LinkedList<ByteBuffer> input = new LinkedList<ByteBuffer>(); - LinkedList<ByteBuffer> output = new LinkedList<ByteBuffer>(); - - if (cFlag){ - System.out.println("in setCompressFlag val:" + val + ", MP:" + messagePack + ", zMP:" + zMessagePack); - //messagePack = asByteArray();///ToDo:fix - if (val != null){ - try { - messagePack = msg.write(val); - } catch (IOException e) { - e.printStackTrace(); - } - } else { - messagePack = unzip(zMessagePack); - } - System.out.println("no zip messagePack:" + messagePack); - System.out.print("no zip messagePack: "); - for (int i = 0; i < messagePack.length; i++) { - System.out.print(Integer.toHexString(messagePack[i] & 0xff)); - } - System.out.print("\n"); - System.out.println("no zip messagePack length:" + messagePack.length); - - try { - //System.out.println("in zip"); - input.add(ByteBuffer.wrap(messagePack)); - int len = zip(input, 0, output); - - byte[] ziped = new byte[len + 8]; - ziped[0] = (byte) 0xc1;///set compressedFlag to header - ziped[1] = ziped[2] = ziped[3] = (byte) 0x00; - System.arraycopy(intToByteArray(messagePack.length), 0, ziped, 4, 4);///set data length to header - - System.out.println("zipedlen: " + len); - //System.out.println("limit: " + output.get(0).limit()); - //System.out.println("remaining: " + output.get(0).remaining()); - int tmp = 0; - for (int i = 0; i < output.size(); i++){///Is this copy OK??? - System.arraycopy(output.get(i).array(), 0, ziped, 8 + tmp, output.get(i).limit());//limit? remaining? - tmp += output.get(i).limit(); - } - - System.out.print("ziped: "); - for (int i = 0; i < ziped.length; i++) { - System.out.print(Integer.toHexString(ziped[i] & 0xff)); - } - System.out.print("\n"); - - zMessagePack = ziped; - val = null; - messagePack = null; - } catch (IOException e) { - e.printStackTrace(); - } - } - } - public ByteBuffer setMPHeader(CommandMessage cm, CommandType type){ - - System.out.println("in setMPHeader val:" + val + ", MP:" + messagePack + ", zMP:" + zMessagePack); - - try { - byte[] header = null;//DSのメタデータ用byteArray - byte[] data = null;//DS本体用byteArray - byte[] dataSize = null;//DSのサイズ - - if (type == CommandType.REPLY){ - if (val != null) {//純粋なオブジェクトの場合シリアライズ - data = msg.write(val); - System.out.print("header MP data: "); - for (int i = 0; i < data.length; i++) { - System.out.print(Integer.toHexString(data[i] & 0xff)); - } - System.out.print("\n"); - } else { // rData is RAW ByteArray or already serialized - data = messagePack; - } - - if (setTime) {//AliceVNCの計測用(消してもいい) - cm.setTime = true; - cm.time = time; - cm.depth = depth + 1; - } + public int zip() throws IOException { + LinkedList<ByteBuffer> inputs = new LinkedList<ByteBuffer>(messagePack); + int inputIndex + LinkedList<ByteBuffer> outputs; - //MessagePackでDSを作成(ヘッダー・データ本体のサイズ・データ本体) - header = msg.write(cm); - dataSize = msg.write(data.length); - buf = ByteBuffer.allocate(header.length+dataSize.length+data.length); - buf.put(header); - buf.put(dataSize); - buf.put(data); - } else { - header = msg.write(cm); - buf = ByteBuffer.allocate(header.length); - buf.put(header); - } - - buf.flip(); - } catch (IOException e) { - e.printStackTrace(); - } - - messagePack = buf.array(); - - System.out.print("MP with header: "); - for (int i = 0; i < messagePack.length; i++) { - System.out.print(Integer.toHexString(messagePack[i] & 0xff)); - } - System.out.print("\n"); - - return buf; - } - - - public int zip(LinkedList<ByteBuffer> inputs, int inputIndex, LinkedList<ByteBuffer> outputs) throws IOException { int len = 0; int INFLATE_BUFSIZE = 1024 * 100; ByteBuffer c1 = allocate(INFLATE_BUFSIZE);//for output @@ -287,6 +169,8 @@ c1.flip(); outputs.addLast(c1); } + + zMessagePack = outputs deflater.reset(); return len;///return length of ziped data }