Mercurial > hg > Database > Alice
changeset 574:ea21af9a4762 dispose
delete serializeFlag, fix MessagePack pack&unpack
author | Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp> |
---|---|
date | Tue, 15 Dec 2015 11:49:07 +0900 |
parents | b7cb1062828e |
children | fe55be1ce12d |
files | src/main/java/alice/daemon/CommandMessage.java src/main/java/alice/daemon/IncomingTcpConnection.java src/main/java/alice/datasegment/Command.java src/main/java/alice/datasegment/CompressedLocalDataSegmentManager.java src/main/java/alice/datasegment/DataSegmentManager.java src/main/java/alice/datasegment/RemoteDataSegmentManager.java src/main/java/alice/test/codesegment/remote/RemoteIncrement.java src/main/java/alice/test/codesegment/remote/RemoteStartCodeSegment.java |
diffstat | 8 files changed, 21 insertions(+), 47 deletions(-) [+] |
line wrap: on
line diff
--- a/src/main/java/alice/daemon/CommandMessage.java Thu Nov 26 23:36:53 2015 +0900 +++ b/src/main/java/alice/daemon/CommandMessage.java Tue Dec 15 11:49:07 2015 +0900 @@ -12,7 +12,6 @@ public int seq;//DSの待ち合わせを行っているCSを表すunique number public String key;//DS key public boolean quickFlag = false;//SEDAを挟まずに処理を行うかどうか - public boolean serialized = false;//シリアライズされているかどうか public boolean compressed = false;//圧縮されているかどうか public int dataSize = 0;//圧縮前のサイズ @@ -26,13 +25,12 @@ public CommandMessage() {} public CommandMessage(int type, int index, int seq, String key - , boolean qFlag, boolean sFlag, boolean cFlag, int datasize) { + , boolean qFlag, boolean cFlag, int datasize) { this.type = type; this.index = index; this.seq = seq; this.key = key; this.quickFlag = qFlag; - this.serialized = sFlag; this.compressed = cFlag; this.dataSize = datasize; }
--- a/src/main/java/alice/daemon/IncomingTcpConnection.java Thu Nov 26 23:36:53 2015 +0900 +++ b/src/main/java/alice/daemon/IncomingTcpConnection.java Tue Dec 15 11:49:07 2015 +0900 @@ -60,11 +60,7 @@ case UPDATE: case PUT: int dataSize = unpacker.readInt(); - if (msg.compressed) { - rData = new ReceiveData(packer.read(unpacker.getSerializedByteArray(dataSize), byte[].class), true, msg.dataSize); - } else { - rData = new ReceiveData(unpacker.getSerializedByteArray(dataSize), false, msg.dataSize); - } + rData = new ReceiveData(unpacker.getSerializedByteArray(dataSize), msg.compressed, msg.dataSize); if (msg.setTime) { rData.setTimes(msg.time, true, msg.depth); @@ -104,11 +100,7 @@ case REPLY: cmd = manager.getAndRemoveCmd(msg.seq); - if (msg.compressed) { - rData = new ReceiveData(packer.read(unpacker.getSerializedByteArray(unpacker.readInt()), byte[].class), true, msg.dataSize); - } else { - rData = new ReceiveData(unpacker.getSerializedByteArray(unpacker.readInt()), false, msg.dataSize); - } + rData = new ReceiveData(unpacker.getSerializedByteArray(unpacker.readInt()), msg.compressed, msg.dataSize); Command rCmd = new Command(type, null, null, rData, msg.index, msg.seq, null, null, ""); cmd.setCompressFlag(msg.compressed);
--- a/src/main/java/alice/datasegment/Command.java Thu Nov 26 23:36:53 2015 +0900 +++ b/src/main/java/alice/datasegment/Command.java Tue Dec 15 11:49:07 2015 +0900 @@ -80,8 +80,6 @@ byte[] header = null; byte[] data = null; byte[] dataSize = null; - boolean serialized = false; - boolean compressed = false; switch (type) { /* * UPDATE, PUT, REPLY need send DataSegment to RemoteDataSegment @@ -95,15 +93,12 @@ case PUT: case REPLY: if(compressFlag){ - // ToDo: Do not pack again - data = packer.write(rData.getZMessagePack()); - compressed = true; + data = rData.getZMessagePack(); } else { data = rData.getMessagePack(); - serialized = true; } - CommandMessage cm = new CommandMessage(type.id, index, seq, key, false, serialized, compressed, rData.getDataSize()); + CommandMessage cm = new CommandMessage(type.id, index, seq, key, false, compressFlag, rData.getDataSize()); if (rData.getSetTime()) { cm.setTime = true; cm.time = rData.getTime(); @@ -123,7 +118,7 @@ buf.put(data); break; default: - header = packer.write(new CommandMessage(type.id, index, seq, key, quickFlag, false, compressFlag, 0)); + header = packer.write(new CommandMessage(type.id, index, seq, key, quickFlag, compressFlag, 0)); buf = ByteBuffer.allocate(header.length); buf.put(header); break;
--- a/src/main/java/alice/datasegment/CompressedLocalDataSegmentManager.java Thu Nov 26 23:36:53 2015 +0900 +++ b/src/main/java/alice/datasegment/CompressedLocalDataSegmentManager.java Tue Dec 15 11:49:07 2015 +0900 @@ -17,30 +17,12 @@ public CompressedLocalDataSegmentManager(LocalDataSegmentManager manager) { this.manager = manager; - new Thread(replyThread, "CompressedLocalDataSegmentManager-replyCommand").start(); } public void setReverseKey(String s){ reverseKey = s; } - private class RunCommand implements Runnable { - - DataSegmentKey key; - Command cmd; - - public RunCommand(DataSegmentKey key, Command cmd) { - this.key = key; - this.cmd = cmd; - } - - @Override - public void run() { - key.runCommand(cmd); - } - - } - public void submitCommand(DataSegmentKey key, Command cmd) { manager.submitCommand(key, cmd); }
--- a/src/main/java/alice/datasegment/DataSegmentManager.java Thu Nov 26 23:36:53 2015 +0900 +++ b/src/main/java/alice/datasegment/DataSegmentManager.java Tue Dec 15 11:49:07 2015 +0900 @@ -39,7 +39,7 @@ }; public Command getAndRemoveCmd(int index){ - System.err.println("DSM getAndRemoveCmd seq : " + index); + //System.err.println("DSM getAndRemoveCmd seq : " + index); return seqHash.remove(index); }
--- a/src/main/java/alice/datasegment/RemoteDataSegmentManager.java Thu Nov 26 23:36:53 2015 +0900 +++ b/src/main/java/alice/datasegment/RemoteDataSegmentManager.java Tue Dec 15 11:49:07 2015 +0900 @@ -92,7 +92,7 @@ public void take1(boolean quickFlag, Command cmd) { int seq = this.seq.getAndIncrement(); - System.err.println("DataSegment take seq :" + seq); + //System.err.println("DataSegment take seq :" + seq); cmd.setSeq(seq); seqHash.put(seq, cmd); cmd.setQuickFlag(quickFlag);
--- a/src/main/java/alice/test/codesegment/remote/RemoteIncrement.java Thu Nov 26 23:36:53 2015 +0900 +++ b/src/main/java/alice/test/codesegment/remote/RemoteIncrement.java Tue Dec 15 11:49:07 2015 +0900 @@ -13,14 +13,20 @@ */ @Override public void run() { + String z = ""; + if (num.getReceiveData().compressed()){ + z = "zMP"; + } int num = this.num.asInteger(); - System.out.println("[CodeSegment] " + num++); - if (num == 10) System.exit(0); + System.out.println("[CodeSegment" + z + "] " + num++); + if (num == 5) System.exit(0); RemoteIncrement cs = new RemoteIncrement(); - cs.num.setKey("compressedremote", "num"); - ods.put("compressedlocal", "num", num); + ods.put("compressedremote", "num", num); + ods.put("remote", "num", num); + + cs.num.setKey("compressedlocal", "num"); } } \ No newline at end of file
--- a/src/main/java/alice/test/codesegment/remote/RemoteStartCodeSegment.java Thu Nov 26 23:36:53 2015 +0900 +++ b/src/main/java/alice/test/codesegment/remote/RemoteStartCodeSegment.java Tue Dec 15 11:49:07 2015 +0900 @@ -7,8 +7,9 @@ @Override public void run() { RemoteIncrement cs = new RemoteIncrement(); - cs.num.setKey("compressedremote", "num"); - ods.put("compressedlocal", "num", 0); + ods.put("compressedremote", "num", 0); + + cs.num.setKey("compressedlocal", "num"); } } \ No newline at end of file