changeset 528:6ebddfac7ff6 dispose

delete RecieveData.setCompressFlag
author kazz <kazz@cr.ie.u-ryukyu.ac.jp>
date Thu, 30 Apr 2015 20:52:21 +0900
parents bfec2c3ff1b8
children cb7c31848d16
files src/main/java/alice/codesegment/SingletonMessage.java src/main/java/alice/daemon/IncomingTcpConnection.java src/main/java/alice/datasegment/Command.java src/main/java/alice/datasegment/ReceiveData.java
diffstat 4 files changed, 22 insertions(+), 170 deletions(-) [+]
line wrap: on
line diff
--- a/src/main/java/alice/codesegment/SingletonMessage.java	Thu Apr 30 18:14:02 2015 +0900
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,13 +0,0 @@
-package alice.codesegment;
-
-import org.msgpack.MessagePack;
-
-public class SingletonMessage {
-
-    private static final MessagePack instance = new MessagePack(); 
-
-    public static MessagePack getInstance(){
-        return instance;
-    }
-
-}
--- a/src/main/java/alice/daemon/IncomingTcpConnection.java	Thu Apr 30 18:14:02 2015 +0900
+++ b/src/main/java/alice/daemon/IncomingTcpConnection.java	Thu Apr 30 20:52:21 2015 +0900
@@ -5,6 +5,7 @@
 import java.nio.channels.ClosedChannelException;
 
 import alice.datasegment.*;
+import org.msgpack.MessagePack;
 import org.msgpack.unpacker.Unpacker;
 
 import alice.codesegment.SingletonMessage;
@@ -16,7 +17,7 @@
     protected DataSegmentManager manager;
     protected String reverseKey;
     private LocalDataSegmentManager lmanager = DataSegment.getLocal();
-    private LocalDataSegmentManager compressedlmanager = DataSegment.getCompressedLocal();
+    private static final MessagePack packer = new MessagePack();
 
     public IncomingTcpConnection(DataSegmentManager manager) {
         this.manager = manager;
@@ -32,17 +33,13 @@
         return lmanager;
     }
 
-    public LocalDataSegmentManager getCompressedLocalDataSegmentManager(){//追加
-        return compressedlmanager;
-    }
-
     /**
      * pipeline thread for receiving
      */
     public void run() {
         Unpacker unpacker = null;
         try {
-            unpacker = SingletonMessage.getInstance().createUnpacker(connection.socket.getInputStream());
+            unpacker = packer.createUnpacker(connection.socket.getInputStream());
         } catch (IOException e) {
             e.printStackTrace();
         }
@@ -59,8 +56,7 @@
                 case UPDATE:
                 case PUT:
                     System.out.println("in TCP PUT");
-                    rData = new ReceiveData(unpacker.getSerializedByteArray(unpacker.readInt()));
-                    rData.setCompressFlag(msg.compressed);
+                    rData = new ReceiveData(unpacker.getSerializedByteArray(unpacker.readInt()), msg.compressed);
 
                     if (msg.setTime) {
                         rData.setTime = true;
@@ -70,25 +66,16 @@
 
                     cmd = new Command(type, null, null, rData, 0, 0, null, null, reverseKey);
 
-                    if (msg.compressed){
-                        compressedlmanager.getDataSegmentKey(msg.key).runCommand(cmd);
-                    } else {
-                        lmanager.getDataSegmentKey(msg.key).runCommand(cmd);
-                    }
+                    lmanager.getDataSegmentKey(msg.key).runCommand(cmd);
+
 
                     break;
                 case PEEK:
                 case TAKE:
                     System.out.println("in TCP TAKE");
                     cmd = new Command(type, null, null, null, msg.index, msg.seq, null, null, connection);
-                    cmd.setQuickFlag(msg.quickFlag);
-                    cmd.setCompressFlag(msg.compressed);
 
-                    if (msg.compressed) {
-                        compressedlmanager.getDataSegmentKey(msg.key).runCommand(cmd);
-                    } else {
-                        lmanager.getDataSegmentKey(msg.key).runCommand(cmd);
-                    }
+                    lmanager.getDataSegmentKey(msg.key).runCommand(cmd);
 
                     break;
                 case REMOVE:
@@ -100,7 +87,6 @@
                     cmd = manager.getAndRemoveCmd(msg.seq);
 
                     rData = new ReceiveData(unpacker.getSerializedByteArray(unpacker.readInt()));//ここのコンストラクタでx
-                    rData.setCompressFlag(msg.compressed);
 
                     Command rCmd = new Command(type, null, null, rData, msg.index, msg.seq, null, null, "");
                     cmd.cs.ids.reply(cmd.receiver, rCmd);
--- a/src/main/java/alice/datasegment/Command.java	Thu Apr 30 18:14:02 2015 +0900
+++ b/src/main/java/alice/datasegment/Command.java	Thu Apr 30 20:52:21 2015 +0900
@@ -88,24 +88,19 @@
 
         case UPDATE:
             System.out.println("update compressFlag:" + compressFlag);
-            rData.setCompressFlag(compressFlag);
             break;
         case PUT:
             System.out.println("put compressFlag:" + compressFlag);
-            rData.setCompressFlag(compressFlag);
             break;
         case REPLY://ReceiveDataからREPLYするDSを取得
             System.out.println("in REPLY");
-            System.out.println("reply compressFlag:" + compressFlag);
-            buf = rData.setMPHeader(new CommandMessage(type.id, index, seq, key, false, rData.serialized(), rData.compressed()), type);
-            rData.setCompressFlag(compressFlag);
+            System.out.println("reply compressFlag:" + compressFlag + ", " + type.id+ ", " +  index+ ", " + seq + ", " +  key+ ", " +  false+ ", " +  rData.serialized()+ ", " +  rData.compressed());
+
 
             break;
         default://PEEK, TAKE, RemoveならCommandMessageとそのサイズのみセット
             ReceiveData rData2 = new ReceiveData("hoge");
             System.out.println("default compressFlag:" + compressFlag);
-            buf = rData2.setMPHeader(new CommandMessage(type.id, index, seq, key, quickFlag, false, compressFlag), type);
-            rData2.setCompressFlag(compressFlag);
 
             break;
         }
--- a/src/main/java/alice/datasegment/ReceiveData.java	Thu Apr 30 18:14:02 2015 +0900
+++ b/src/main/java/alice/datasegment/ReceiveData.java	Thu Apr 30 20:52:21 2015 +0900
@@ -30,10 +30,8 @@
     public boolean setTime = false;
     public int depth = 1;
 
-    private Deflater deflater = new Deflater();
-    private Inflater inflater = new Inflater();
     private ByteBuffer buf = null;
-    private MessagePack msg = SingletonMessage.getInstance();
+    private static final MessagePack packer = new MessagePack();
 
     /**
      * コンストラクタ。Object型のDSと圧縮のメタ情報を受け取る。
@@ -50,14 +48,10 @@
      *
      * @param messagePack DS本体(byteArray)
      */
-    public ReceiveData(byte[] messagePack) {
-        clazz = messagePack.getClass();
-
-        if (messagePack[0] == 0xc1){
-            System.out.println("ReceiveData is zMessagePack");
+    public ReceiveData(byte[] messagePack, boolean compressed) {
+        if (compressed){
             this.zMessagePack = messagePack;
         } else {
-            System.out.println("ReceiveData is MessagePack");
             this.messagePack = messagePack;
         }
     }
@@ -83,11 +77,11 @@
         return asClass(String.class);
     }
 
-    public int asInteger(){
+    public int asInteger() {
         return asClass(Integer.class);
     }
 
-    public Float asFloat(){
+    public Float asFloat() {
         return asClass(Float.class);
     }
 
@@ -96,7 +90,7 @@
             return asClass(Value.class);
         } else {
             try {
-                return SingletonMessage.getInstance().unconvert(val);///convert to Value type by MassagePack
+                return packer.unconvert(val);///convert to Value type by MassagePack
             } catch (IOException e) {
                 e.printStackTrace();
             }
@@ -121,10 +115,8 @@
                 return (T) val;
             }
 
-            if (zMessagePack != null && messagePack == null) {//ToDo:fix
+            if (zMessagePack != null && messagePack == null) {
                 messagePack = unzip(zMessagePack);
-                System.out.println("unzip messagePack:" + messagePack);
-                //zMessagePack = null;?
             }
 
             return SingletonMessage.getInstance().read(messagePack, clazz);
@@ -135,122 +127,12 @@
         }
     }
 
-    public void setCompressFlag(boolean cFlag) {///compress
-        LinkedList<ByteBuffer> input = new LinkedList<ByteBuffer>();
-        LinkedList<ByteBuffer> output = new LinkedList<ByteBuffer>();
-
-        if (cFlag){
-            System.out.println("in setCompressFlag  val:" + val + ", MP:" + messagePack + ", zMP:" + zMessagePack);
-            //messagePack =  asByteArray();///ToDo:fix
-            if (val != null){
-                try {
-                    messagePack = msg.write(val);
-                } catch (IOException e) {
-                    e.printStackTrace();
-                }
-            } else {
-                messagePack = unzip(zMessagePack);
-            }
-            System.out.println("no zip messagePack:" + messagePack);
-            System.out.print("no zip messagePack: ");
-            for (int i = 0; i < messagePack.length; i++) {
-                System.out.print(Integer.toHexString(messagePack[i] & 0xff));
-            }
-            System.out.print("\n");
-            System.out.println("no zip messagePack length:" + messagePack.length);
-
-            try {
-                //System.out.println("in zip");
-                input.add(ByteBuffer.wrap(messagePack));
-                int len = zip(input, 0, output);
-
-                byte[] ziped = new byte[len + 8];
-                ziped[0] = (byte) 0xc1;///set compressedFlag to header
-                ziped[1] = ziped[2] = ziped[3] = (byte) 0x00;
-                System.arraycopy(intToByteArray(messagePack.length), 0, ziped, 4, 4);///set data length to header
-
-                System.out.println("zipedlen: " + len);
-                //System.out.println("limit: " + output.get(0).limit());
-                //System.out.println("remaining: " + output.get(0).remaining());
-                int tmp = 0;
-                for (int i = 0; i < output.size(); i++){///Is this copy OK???
-                    System.arraycopy(output.get(i).array(), 0, ziped, 8 + tmp, output.get(i).limit());//limit? remaining?
-                    tmp += output.get(i).limit();
-                }
-
-                System.out.print("ziped: ");
-                for (int i = 0; i < ziped.length; i++) {
-                    System.out.print(Integer.toHexString(ziped[i] & 0xff));
-                }
-                System.out.print("\n");
-
-                zMessagePack = ziped;
-                val = null;
-                messagePack = null;
-            } catch (IOException e) {
-                e.printStackTrace();
-            }
-        }
-    }
 
-    public ByteBuffer setMPHeader(CommandMessage cm, CommandType type){
-
-        System.out.println("in setMPHeader  val:" + val + ", MP:" + messagePack + ", zMP:" + zMessagePack);
-
-        try {
-            byte[] header = null;//DSのメタデータ用byteArray
-            byte[] data = null;//DS本体用byteArray
-            byte[] dataSize = null;//DSのサイズ
-
-            if (type == CommandType.REPLY){
-                if (val != null) {//純粋なオブジェクトの場合シリアライズ
-                    data = msg.write(val);
-                    System.out.print("header MP data: ");
-                    for (int i = 0; i < data.length; i++) {
-                        System.out.print(Integer.toHexString(data[i] & 0xff));
-                    }
-                    System.out.print("\n");
-                } else { // rData is RAW ByteArray or already serialized
-                    data = messagePack;
-                }
-
-                if (setTime) {//AliceVNCの計測用(消してもいい)
-                    cm.setTime = true;
-                    cm.time = time;
-                    cm.depth = depth + 1;
-                }
+    public int zip() throws IOException {
+        LinkedList<ByteBuffer> inputs = new LinkedList<ByteBuffer>(messagePack);
+        int inputIndex
+        LinkedList<ByteBuffer> outputs;
 
-                //MessagePackでDSを作成(ヘッダー・データ本体のサイズ・データ本体)
-                header = msg.write(cm);
-                dataSize = msg.write(data.length);
-                buf = ByteBuffer.allocate(header.length+dataSize.length+data.length);
-                buf.put(header);
-                buf.put(dataSize);
-                buf.put(data);
-            } else {
-                header = msg.write(cm);
-                buf = ByteBuffer.allocate(header.length);
-                buf.put(header);
-            }
-
-            buf.flip();
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
-
-        messagePack = buf.array();
-
-        System.out.print("MP with header: ");
-        for (int i = 0; i < messagePack.length; i++) {
-            System.out.print(Integer.toHexString(messagePack[i] & 0xff));
-        }
-        System.out.print("\n");
-
-        return buf;
-    }
-
-
-    public int zip(LinkedList<ByteBuffer> inputs, int inputIndex, LinkedList<ByteBuffer> outputs) throws IOException {
         int len = 0;
         int INFLATE_BUFSIZE = 1024 * 100;
         ByteBuffer c1 = allocate(INFLATE_BUFSIZE);//for output
@@ -287,6 +169,8 @@
             c1.flip();
             outputs.addLast(c1);
         }
+
+        zMessagePack = outputs
         deflater.reset();
         return len;///return length of ziped data
     }