changeset 123:d92f0bbad1eb

fix msgpack downgrade
author akahori
date Sat, 15 Dec 2018 17:49:57 +0900
parents 3a0f8a93c84a
children cd4b67334b17
files build.gradle src/main/java/christie/daemon/Connection.java src/main/java/christie/daemon/IncomingTcpConnection.java src/main/java/christie/datagear/RemoteMessage.java src/main/java/christie/datagear/command/PutCommand.java src/main/java/christie/datagear/dg/MessagePackDataGear.java src/main/java/christie/test/topology/localTestTopology/StartLocalTestTopology.java src/main/java/christie/topology/HostMessage.java src/main/java/christie/topology/node/ConfigurationFinish.java
diffstat 9 files changed, 47 insertions(+), 59 deletions(-) [+]
line wrap: on
line diff
--- a/build.gradle	Tue Dec 11 20:51:29 2018 +0900
+++ b/build.gradle	Sat Dec 15 17:49:57 2018 +0900
@@ -16,8 +16,10 @@
 dependencies {
     compile fileTree(dir: 'lib', include: '*.jar')
     testCompile('org.junit.jupiter:junit-jupiter-api:5.2.0')
-    compile group: 'org.msgpack', name: 'msgpack-core', version: '0.8.16'
-    compile group: 'org.msgpack', name: 'jackson-dataformat-msgpack', version: '0.8.16'
+    //compile group: 'org.msgpack', name: 'msgpack-core', version: '0.8.16'
+    //compile group: 'org.msgpack', name: 'jackson-dataformat-msgpack', version: '0.8.16'
+    compile group: 'org.msgpack', name: 'msgpack', version: '0.6.12'
+
 
 }
 
--- a/src/main/java/christie/daemon/Connection.java	Tue Dec 11 20:51:29 2018 +0900
+++ b/src/main/java/christie/daemon/Connection.java	Sat Dec 15 17:49:57 2018 +0900
@@ -64,7 +64,7 @@
             while (buffer.hasRemaining()) {
                 socket.getChannel().write(buffer);
             }
-            System.out.println("write : " + cmd.key);
+            //System.out.println("write : " + cmd.key);
 
         } catch (Exception e) {
             e.printStackTrace();
--- a/src/main/java/christie/daemon/IncomingTcpConnection.java	Tue Dec 11 20:51:29 2018 +0900
+++ b/src/main/java/christie/daemon/IncomingTcpConnection.java	Sat Dec 15 17:49:57 2018 +0900
@@ -8,19 +8,12 @@
 import christie.datagear.RemoteMessage;
 import christie.datagear.command.RemotePeekCommand;
 import christie.datagear.command.RemoteTakeCommand;
-import christie.datagear.dg.DataGear;
 import christie.datagear.dg.MessagePackDataGear;
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.msgpack.jackson.dataformat.MessagePackFactory;
-
-//import org.msgpack.MessagePack;
-//import org.msgpack.unpacker.Unpacker;
-
+import org.msgpack.MessagePack;
+import org.msgpack.unpacker.Unpacker;
 
 import java.io.EOFException;
 import java.io.IOException;
-import java.io.InputStream;
 import java.nio.channels.ClosedChannelException;
 
 public class IncomingTcpConnection extends Thread {
@@ -28,48 +21,42 @@
     RemoteDataGearManager manager;
     CodeGearManager cgm;
     Connection connection;
-    ObjectMapper objectMapper = new ObjectMapper(new MessagePackFactory());
+    private MessagePack packer = new MessagePack();
 
     public IncomingTcpConnection(Connection connection, CodeGearManager cgm) {
         this.connection = connection;
         this.cgm = cgm;
     }
 
-    public void setManager(RemoteDataGearManager manager) {
+    public void setManager(RemoteDataGearManager manager){
         this.manager = manager;
     }
 
     public void run() {
-
-        objectMapper.configure(JsonParser.Feature.AUTO_CLOSE_SOURCE, false);
-
-        InputStream in = null;
+        Unpacker unpacker = null;
         try {
-            in = connection.socket.getInputStream();
+            unpacker = packer.createUnpacker(connection.socket.getInputStream());
         } catch (IOException e) {
             e.printStackTrace();
         }
-        if (in == null) {
-            System.out.println("return");
+        if (unpacker == null) {
             return;
         }
-
         while (true) {
             try {
-
-                RemoteMessage msg = objectMapper.readValue(in, RemoteMessage.class);
-
+                RemoteMessage msg = unpacker.read(RemoteMessage.class);
                 CommandType type = CommandType.getCommandTypeFromId(msg.type);
-                System.out.println("read : " + msg.key);
+                //System.out.println("read " + msg.key);
+                byte[] data;
 
                 switch (type) {
                     case PUT:
-                        byte[] msgpackdg = objectMapper.readValue(in, byte[].class);
-
+                        data = new byte[unpacker.readInt()];
+                        connection.socket.getInputStream().read(data);
                         try {
-                            MessagePackDataGear dg = new MessagePackDataGear(msgpackdg, Class.forName(msg.clazz));
+                            MessagePackDataGear dg = new MessagePackDataGear(data, Class.forName(msg.clazz));
                             cgm.getLocalDGM().put(msg.key, dg);
-                        } catch (Exception e) {
+                        } catch (ClassNotFoundException e) {
                             e.printStackTrace();
                         }
 
@@ -83,10 +70,11 @@
 
                         break;
                     case REPLY://待っていたwaitListに渡してcsにセット
-                        //data = new byte[objectMapper.readValue(convertedCommand, Integer.class)];
-                        //connection.socket.getInputStream().read(data);
+                        data = new byte[unpacker.readInt()];
+                        connection.socket.getInputStream().read(data);
+
                         try {
-                            MessagePackDataGear dg = new MessagePackDataGear(null, Class.forName(msg.clazz));
+                            MessagePackDataGear dg = new MessagePackDataGear(data, Class.forName(msg.clazz));
                             cgm.getDGM(msg.fromDgmName).resolveWaitCommand(msg.key, dg);
                         } catch (ClassNotFoundException e) {
                             e.printStackTrace();
@@ -96,12 +84,10 @@
                     default:
                         break;
                 }
-
             } catch (ClosedChannelException e) {
-                e.printStackTrace();
                 return;
-            } catch (EOFException e) {
-                e.printStackTrace();
+            }catch (EOFException e) {
+                return;
             } catch (IOException e) {
                 e.printStackTrace();
             }
--- a/src/main/java/christie/datagear/RemoteMessage.java	Tue Dec 11 20:51:29 2018 +0900
+++ b/src/main/java/christie/datagear/RemoteMessage.java	Sat Dec 15 17:49:57 2018 +0900
@@ -1,5 +1,8 @@
 package christie.datagear;
 
+import org.msgpack.annotation.Message;
+
+@Message
 public class RemoteMessage {
     public int type;//PUT, PEEKなどのコマンドタイプ
     public String fromDgmName;//送り元のdsmName。REPLYのときに使用。
--- a/src/main/java/christie/datagear/command/PutCommand.java	Tue Dec 11 20:51:29 2018 +0900
+++ b/src/main/java/christie/datagear/command/PutCommand.java	Sat Dec 15 17:49:57 2018 +0900
@@ -4,14 +4,12 @@
 import christie.datagear.command.CommandType;
 import christie.datagear.dg.DataGear;
 import christie.datagear.dg.MessagePackDataGear;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.msgpack.jackson.dataformat.MessagePackFactory;
+import org.msgpack.MessagePack;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
 public class PutCommand extends Command {
-    ObjectMapper objectMapper = new ObjectMapper(new MessagePackFactory());
 
     public PutCommand(int cgmID, String toDgmName, String key, DataGear dg){
         this.type = CommandType.PUT;
@@ -25,20 +23,23 @@
     @Override
     public ByteBuffer convert() {
         ByteBuffer buf = null;
+        MessagePack packer = new MessagePack();
+
         try {
-            byte[] command = objectMapper.writeValueAsBytes(createRemoteMessage());
-            byte[] data = objectMapper.writeValueAsBytes(new MessagePackDataGear(dg.getData()).getMessagePack());
+            byte[] command = packer.write(createRemoteMessage());
+            byte[] data = new MessagePackDataGear(dg.getData()).getMessagePack();
+            byte[] dataSize = packer.write(data.length);
 
-            buf = ByteBuffer.allocate(command.length+data.length);
+            buf = ByteBuffer.allocate(command.length+dataSize.length+data.length);
             buf.put(command);
+            buf.put(dataSize);
             buf.put(data);
+
             buf.flip();
-
         } catch (IOException e) {
             e.printStackTrace();
         }
 
         return buf;
     }
-
 }
--- a/src/main/java/christie/datagear/dg/MessagePackDataGear.java	Tue Dec 11 20:51:29 2018 +0900
+++ b/src/main/java/christie/datagear/dg/MessagePackDataGear.java	Sat Dec 15 17:49:57 2018 +0900
@@ -1,18 +1,13 @@
 package christie.datagear.dg;
 
-//import org.msgpack.MessagePack;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.msgpack.jackson.dataformat.MessagePackFactory;
-
+import org.msgpack.MessagePack;
 
 import java.io.IOException;
 
 public class MessagePackDataGear<T> extends DataGear {//必ずmessagePack形式を持つDataGear
     private byte[] messagePack = null;
     private int dataSize;
-    //private MessagePack packer = new MessagePack();
-    ObjectMapper objectMapper = new ObjectMapper(new MessagePackFactory());
-
+    private MessagePack packer = new MessagePack();
 
     public MessagePackDataGear(T data){
         super(data);
@@ -33,7 +28,7 @@
             return messagePack;
         } else {
             try {
-                messagePack = objectMapper.writeValueAsBytes(data);
+                messagePack = packer.write(data);
                 setDataSize(messagePack.length);
             } catch (IOException e) {
                 e.printStackTrace();
@@ -46,7 +41,7 @@
     public synchronized T getData(){
         if (data == null){
             try {
-                setData(objectMapper.readValue(messagePack, clazz));
+                setData(packer.read(messagePack, clazz));
             } catch (IOException e) {
                 e.printStackTrace();
             }
--- a/src/main/java/christie/test/topology/localTestTopology/StartLocalTestTopology.java	Tue Dec 11 20:51:29 2018 +0900
+++ b/src/main/java/christie/test/topology/localTestTopology/StartLocalTestTopology.java	Sat Dec 15 17:49:57 2018 +0900
@@ -22,17 +22,17 @@
         String[] managerArg = {"--localPort", String.valueOf(managerPort), "--confFile", "scripts/ring.dot"};
         TopologyManagerConfig topologyManagerConfig = new TopologyManagerConfig(managerArg);
         new StartTopologyManager(topologyManagerConfig);
-
+        CodeGearManager nodeCGM = null;
         for (int i = 1; i<=nodeNum; i++) {
-            CodeGearManager nodeCGM = createCGM(managerPort + i);
+            nodeCGM = createCGM(managerPort + i);
             String[] nodeArg = {"--managerPort", String.valueOf(managerPort),
                                 "--managerHost", "localhost"};
             TopologyNodeConfig cs  = new TopologyNodeConfig(nodeArg);
 
             new StartTopologyNode(nodeCGM, cs, new LTRemoteIncrement());
-            nodeCGM.getLocalDGM().put("num", 0);
 
         }
+        nodeCGM.getLocalDGM().put("num", 0);
 
 
     }
--- a/src/main/java/christie/topology/HostMessage.java	Tue Dec 11 20:51:29 2018 +0900
+++ b/src/main/java/christie/topology/HostMessage.java	Sat Dec 15 17:49:57 2018 +0900
@@ -1,9 +1,11 @@
 package christie.topology;
 
+import org.msgpack.annotation.Message;
+
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 
-
+@Message
 public class HostMessage {
     private String hostName;
     private int port;
--- a/src/main/java/christie/topology/node/ConfigurationFinish.java	Tue Dec 11 20:51:29 2018 +0900
+++ b/src/main/java/christie/topology/node/ConfigurationFinish.java	Sat Dec 15 17:49:57 2018 +0900
@@ -25,7 +25,6 @@
     @Override
     protected void run(CodeGearManager cgm) {
         reverseCount++;
-        System.out.println(reverseCount + " " + connectNodeNum);
         if (reverseCount == connectNodeNum) {
             getDGM(topologyNodeConfig.getManagerKey()).put("nodePrepareDone", "done");
             cgm.setup(new Start());