changeset 574:ea21af9a4762 dispose

delete serializeFlag, fix MessagePack pack&unpack
author Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
date Tue, 15 Dec 2015 11:49:07 +0900
parents b7cb1062828e
children fe55be1ce12d
files src/main/java/alice/daemon/CommandMessage.java src/main/java/alice/daemon/IncomingTcpConnection.java src/main/java/alice/datasegment/Command.java src/main/java/alice/datasegment/CompressedLocalDataSegmentManager.java src/main/java/alice/datasegment/DataSegmentManager.java src/main/java/alice/datasegment/RemoteDataSegmentManager.java src/main/java/alice/test/codesegment/remote/RemoteIncrement.java src/main/java/alice/test/codesegment/remote/RemoteStartCodeSegment.java
diffstat 8 files changed, 21 insertions(+), 47 deletions(-) [+]
line wrap: on
line diff
--- a/src/main/java/alice/daemon/CommandMessage.java	Thu Nov 26 23:36:53 2015 +0900
+++ b/src/main/java/alice/daemon/CommandMessage.java	Tue Dec 15 11:49:07 2015 +0900
@@ -12,7 +12,6 @@
     public int seq;//DSの待ち合わせを行っているCSを表すunique number
     public String key;//DS key
     public boolean quickFlag = false;//SEDAを挟まずに処理を行うかどうか
-    public boolean serialized = false;//シリアライズされているかどうか
     public boolean compressed = false;//圧縮されているかどうか
     public int dataSize = 0;//圧縮前のサイズ
 
@@ -26,13 +25,12 @@
     public CommandMessage() {}
 
     public CommandMessage(int type, int index, int seq, String key
-            , boolean qFlag, boolean sFlag, boolean cFlag, int datasize) {
+            , boolean qFlag, boolean cFlag, int datasize) {
         this.type = type;
         this.index = index;
         this.seq = seq;
         this.key = key;
         this.quickFlag = qFlag;
-        this.serialized = sFlag;
         this.compressed = cFlag;
         this.dataSize = datasize;
     }
--- a/src/main/java/alice/daemon/IncomingTcpConnection.java	Thu Nov 26 23:36:53 2015 +0900
+++ b/src/main/java/alice/daemon/IncomingTcpConnection.java	Tue Dec 15 11:49:07 2015 +0900
@@ -60,11 +60,7 @@
                 case UPDATE:
                 case PUT:
                     int dataSize = unpacker.readInt();
-                    if (msg.compressed) {
-                        rData = new ReceiveData(packer.read(unpacker.getSerializedByteArray(dataSize), byte[].class), true, msg.dataSize);
-                    } else {
-                        rData = new ReceiveData(unpacker.getSerializedByteArray(dataSize), false, msg.dataSize);
-                    }
+                    rData = new ReceiveData(unpacker.getSerializedByteArray(dataSize), msg.compressed, msg.dataSize);
 
                     if (msg.setTime) {
                         rData.setTimes(msg.time, true, msg.depth);
@@ -104,11 +100,7 @@
                 case REPLY:
                     cmd = manager.getAndRemoveCmd(msg.seq);
 
-                    if (msg.compressed) {
-                        rData = new ReceiveData(packer.read(unpacker.getSerializedByteArray(unpacker.readInt()), byte[].class), true, msg.dataSize);
-                    } else {
-                        rData = new ReceiveData(unpacker.getSerializedByteArray(unpacker.readInt()), false, msg.dataSize);
-                    }
+                    rData = new ReceiveData(unpacker.getSerializedByteArray(unpacker.readInt()), msg.compressed, msg.dataSize);
 
                     Command rCmd = new Command(type, null, null, rData, msg.index, msg.seq, null, null, "");
                     cmd.setCompressFlag(msg.compressed);
--- a/src/main/java/alice/datasegment/Command.java	Thu Nov 26 23:36:53 2015 +0900
+++ b/src/main/java/alice/datasegment/Command.java	Tue Dec 15 11:49:07 2015 +0900
@@ -80,8 +80,6 @@
             byte[] header = null;
             byte[] data = null;
             byte[] dataSize = null;
-            boolean serialized = false;
-            boolean compressed = false;
             switch (type) {
         /*
          * UPDATE, PUT, REPLY need send DataSegment to RemoteDataSegment
@@ -95,15 +93,12 @@
                 case PUT:
                 case REPLY:
                     if(compressFlag){
-                        // ToDo: Do not pack again
-                        data = packer.write(rData.getZMessagePack());
-                        compressed = true;
+                        data = rData.getZMessagePack();
                     } else {
                         data = rData.getMessagePack();
-                        serialized = true;
                     }
 
-                    CommandMessage cm = new CommandMessage(type.id, index, seq, key, false, serialized, compressed, rData.getDataSize());
+                    CommandMessage cm = new CommandMessage(type.id, index, seq, key, false, compressFlag, rData.getDataSize());
                     if (rData.getSetTime()) {
                         cm.setTime = true;
                         cm.time = rData.getTime();
@@ -123,7 +118,7 @@
                     buf.put(data);
                     break;
                 default:
-                    header = packer.write(new CommandMessage(type.id, index, seq, key, quickFlag, false, compressFlag, 0));
+                    header = packer.write(new CommandMessage(type.id, index, seq, key, quickFlag, compressFlag, 0));
                     buf = ByteBuffer.allocate(header.length);
                     buf.put(header);
                     break;
--- a/src/main/java/alice/datasegment/CompressedLocalDataSegmentManager.java	Thu Nov 26 23:36:53 2015 +0900
+++ b/src/main/java/alice/datasegment/CompressedLocalDataSegmentManager.java	Tue Dec 15 11:49:07 2015 +0900
@@ -17,30 +17,12 @@
 
     public CompressedLocalDataSegmentManager(LocalDataSegmentManager manager) {
         this.manager = manager;
-        new Thread(replyThread, "CompressedLocalDataSegmentManager-replyCommand").start();
     }
 
     public void setReverseKey(String s){
         reverseKey = s;
     }
 
-    private class RunCommand implements Runnable {
-
-        DataSegmentKey key;
-        Command cmd;
-
-        public RunCommand(DataSegmentKey key, Command cmd) {
-            this.key = key;
-            this.cmd = cmd;
-        }
-
-        @Override
-        public void run() {
-            key.runCommand(cmd);
-        }
-
-    }
-
     public void submitCommand(DataSegmentKey key, Command cmd) {
         manager.submitCommand(key, cmd);
     }
--- a/src/main/java/alice/datasegment/DataSegmentManager.java	Thu Nov 26 23:36:53 2015 +0900
+++ b/src/main/java/alice/datasegment/DataSegmentManager.java	Tue Dec 15 11:49:07 2015 +0900
@@ -39,7 +39,7 @@
     };
 
     public Command getAndRemoveCmd(int index){
-        System.err.println("DSM getAndRemoveCmd seq : " + index);
+        //System.err.println("DSM getAndRemoveCmd seq : " + index);
         return seqHash.remove(index);
     }
 
--- a/src/main/java/alice/datasegment/RemoteDataSegmentManager.java	Thu Nov 26 23:36:53 2015 +0900
+++ b/src/main/java/alice/datasegment/RemoteDataSegmentManager.java	Tue Dec 15 11:49:07 2015 +0900
@@ -92,7 +92,7 @@
 
     public void take1(boolean quickFlag, Command cmd) {
         int seq = this.seq.getAndIncrement();
-        System.err.println("DataSegment take seq :" + seq);
+        //System.err.println("DataSegment take seq :" + seq);
         cmd.setSeq(seq);
         seqHash.put(seq, cmd);
         cmd.setQuickFlag(quickFlag);
--- a/src/main/java/alice/test/codesegment/remote/RemoteIncrement.java	Thu Nov 26 23:36:53 2015 +0900
+++ b/src/main/java/alice/test/codesegment/remote/RemoteIncrement.java	Tue Dec 15 11:49:07 2015 +0900
@@ -13,14 +13,20 @@
      */
     @Override
     public void run() {
+        String z = "";
+        if (num.getReceiveData().compressed()){
+            z = "zMP";
+        }
         int num = this.num.asInteger();
-        System.out.println("[CodeSegment] " + num++);
-        if (num == 10) System.exit(0);
+        System.out.println("[CodeSegment" + z + "] " + num++);
+        if (num == 5) System.exit(0);
 
         RemoteIncrement cs = new RemoteIncrement();
-        cs.num.setKey("compressedremote", "num");
 
-        ods.put("compressedlocal", "num", num);
+        ods.put("compressedremote", "num", num);
+        ods.put("remote", "num", num);
+
+        cs.num.setKey("compressedlocal", "num");
     }
 
 }
\ No newline at end of file
--- a/src/main/java/alice/test/codesegment/remote/RemoteStartCodeSegment.java	Thu Nov 26 23:36:53 2015 +0900
+++ b/src/main/java/alice/test/codesegment/remote/RemoteStartCodeSegment.java	Tue Dec 15 11:49:07 2015 +0900
@@ -7,8 +7,9 @@
     @Override
     public void run() {
         RemoteIncrement cs = new RemoteIncrement();
-        cs.num.setKey("compressedremote", "num");
 
-        ods.put("compressedlocal", "num", 0);
+        ods.put("compressedremote", "num", 0);
+
+        cs.num.setKey("compressedlocal", "num");
     }
 }
\ No newline at end of file