changeset 116:55e11b053234

refactor msgpack
author akahori
date Thu, 06 Dec 2018 02:42:10 +0900
parents e1e919f12ed9
children 53e31b403815
files src/main/java/christie/daemon/AcceptThread.java src/main/java/christie/daemon/IncomingTcpConnection.java src/main/java/christie/datagear/RemoteDataGearManager.java src/main/java/christie/datagear/command/ConvertedCommand.java src/main/java/christie/datagear/command/PutCommand.java src/main/java/christie/datagear/command/RemoteTakeCommand.java
diffstat 6 files changed, 91 insertions(+), 151 deletions(-) [+]
line wrap: on
line diff
--- a/src/main/java/christie/daemon/AcceptThread.java	Tue Dec 04 14:20:26 2018 +0900
+++ b/src/main/java/christie/daemon/AcceptThread.java	Thu Dec 06 02:42:10 2018 +0900
@@ -24,8 +24,9 @@
             try {
                 Socket socket = ss.accept();
                 socket.setTcpNoDelay(true);
-                System.out.println("Accept " + socket.getInetAddress().getHostName() + ":" + socket.getPort());
+                //System.out.println("Accept " + socket.getInetAddress().getHostName() + ":" + socket.getPort());
                 Connection connection = new Connection(socket);
+                connection.name = getName();
                 String key = "accept" + counter;
                 IncomingTcpConnection in =
                         new IncomingTcpConnection(connection, cgm);
--- a/src/main/java/christie/daemon/IncomingTcpConnection.java	Tue Dec 04 14:20:26 2018 +0900
+++ b/src/main/java/christie/daemon/IncomingTcpConnection.java	Thu Dec 06 02:42:10 2018 +0900
@@ -6,10 +6,11 @@
 
 import christie.datagear.command.CommandType;
 import christie.datagear.RemoteMessage;
-import christie.datagear.command.ConvertedCommand;
 import christie.datagear.command.RemotePeekCommand;
 import christie.datagear.command.RemoteTakeCommand;
+import christie.datagear.dg.DataGear;
 import christie.datagear.dg.MessagePackDataGear;
+import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.msgpack.jackson.dataformat.MessagePackFactory;
 
@@ -17,13 +18,10 @@
 //import org.msgpack.unpacker.Unpacker;
 
 
-
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
-import java.nio.ByteBuffer;
 import java.nio.channels.ClosedChannelException;
-import java.nio.channels.SocketChannel;
 
 public class IncomingTcpConnection extends Thread {
 
@@ -31,91 +29,79 @@
     CodeGearManager cgm;
     Connection connection;
     ObjectMapper objectMapper = new ObjectMapper(new MessagePackFactory());
-    //private MessagePack packer = new MessagePack();
 
     public IncomingTcpConnection(Connection connection, CodeGearManager cgm) {
         this.connection = connection;
         this.cgm = cgm;
     }
 
-    public void setManager(RemoteDataGearManager manager){
+    public void setManager(RemoteDataGearManager manager) {
         this.manager = manager;
     }
 
     public void run() {
-        //Unpacker unpacker = null;
-        //MessageUnpacker unpacker = null;
+
+        objectMapper.configure(JsonParser.Feature.AUTO_CLOSE_SOURCE, false);
 
-        /*try {
-            //unpacker = packer.createUnpacker(connection.socket.getInputStream());
+        InputStream in = null;
+        try {
+            in = connection.socket.getInputStream();
         } catch (IOException e) {
             e.printStackTrace();
         }
-        if (unpacker == null) {
+        if (in == null) {
+            System.out.println("is null");
             return;
-        }*/
-
-        ByteBuffer buf = ByteBuffer.allocate(2048);
-        SocketChannel channel= connection.socket.getChannel();
-
+        }
 
         while (true) {
             try {
-                 //RemoteMessage msg = unpacker.read(RemoteMessage.class);
+
+                RemoteMessage msg = objectMapper.readValue(in, RemoteMessage.class);
 
-                if (channel.read(buf) > 0) {
-                    ConvertedCommand convertedCommand = objectMapper.readValue(buf.array(), ConvertedCommand.class);
-                    RemoteMessage msg = objectMapper.readValue(convertedCommand.getCommand(), RemoteMessage.class);
-
-                    CommandType type = CommandType.getCommandTypeFromId(msg.type);
-                    byte[] data;
-
+                CommandType type = CommandType.getCommandTypeFromId(msg.type);
+                System.out.println("msgkey " + msg.key);
 
-                    switch (type) {
-                        case PUT:
-                            //data = new byte[unpacker.readInt()];
-                            //data = new byte[objectMapper.readValue(connectionInputStream, Integer.class)];
-                            //connection.socket.getInputStream().read(data);
-
-                            data = convertedCommand.getData();
+                switch (type) {
+                    case PUT:
+                        byte[] msgpackdg = objectMapper.readValue(in, byte[].class);
 
-                            try {
-                                MessagePackDataGear dg = new MessagePackDataGear(data, Class.forName(msg.clazz));
-                                cgm.getLocalDGM().put(msg.key, dg);
-                            } catch (ClassNotFoundException e) {
-                                e.printStackTrace();
-                            }
+                        try {
+                            MessagePackDataGear dg = new MessagePackDataGear(msgpackdg, Class.forName(msg.clazz));
+                            cgm.getLocalDGM().put(msg.key, dg);
+                        } catch (Exception e) {
+                            e.printStackTrace();
+                        }
 
-                            break;
+                        break;
 
-                        case REMOTEPEEK:
-                            cgm.getLocalDGM().runCommand(new RemotePeekCommand(msg, connection));
-                            break;
-                        case REMOTETAKE:
-                            cgm.getLocalDGM().runCommand(new RemoteTakeCommand(msg, connection));
+                    case REMOTEPEEK:
+                        cgm.getLocalDGM().runCommand(new RemotePeekCommand(msg, connection));
+                        break;
+                    case REMOTETAKE:
+                        cgm.getLocalDGM().runCommand(new RemoteTakeCommand(msg, connection));
 
-                            break;
-                        case REPLY://待っていたwaitListに渡してcsにセット
-                            //data = new byte[objectMapper.readValue(convertedCommand, Integer.class)];
-                            //connection.socket.getInputStream().read(data);
-                            data = convertedCommand.getData();
-                            try {
-                                MessagePackDataGear dg = new MessagePackDataGear(data, Class.forName(msg.clazz));
-                                cgm.getDGM(msg.fromDgmName).resolveWaitCommand(msg.key, dg);
-                            } catch (ClassNotFoundException e) {
-                                e.printStackTrace();
-                            }
+                        break;
+                    case REPLY://待っていたwaitListに渡してcsにセット
+                        //data = new byte[objectMapper.readValue(convertedCommand, Integer.class)];
+                        //connection.socket.getInputStream().read(data);
+                        try {
+                            MessagePackDataGear dg = new MessagePackDataGear(null, Class.forName(msg.clazz));
+                            cgm.getDGM(msg.fromDgmName).resolveWaitCommand(msg.key, dg);
+                        } catch (ClassNotFoundException e) {
+                            e.printStackTrace();
+                        }
 
-                            break;
-                        default:
-                            break;
-                    }
-                    buf.clear();
+                        break;
+                    default:
+                        break;
                 }
+
             } catch (ClosedChannelException e) {
+                e.printStackTrace();
                 return;
-            }catch (EOFException e) {
-                return;
+            } catch (EOFException e) {
+                e.printStackTrace();
             } catch (IOException e) {
                 e.printStackTrace();
             }
--- a/src/main/java/christie/datagear/RemoteDataGearManager.java	Tue Dec 04 14:20:26 2018 +0900
+++ b/src/main/java/christie/datagear/RemoteDataGearManager.java	Thu Dec 06 02:42:10 2018 +0900
@@ -11,26 +11,31 @@
 import java.net.InetSocketAddress;
 import java.nio.channels.SocketChannel;
 
-public class RemoteDataGearManager extends DataGearManager{
+import static java.lang.Thread.MAX_PRIORITY;
 
-    public Connection connection;
-    public CodeGearManager cgm;
+public class RemoteDataGearManager extends DataGearManager{
+    private Connection connection;
+    private CodeGearManager cgm;
+    boolean connect = false;
+    Object lock = new Object();
 
     public RemoteDataGearManager(final String dgmName, final String address, final int port, CodeGearManager cgm) {
-        connection = new Connection();
-        connection.name = dgmName;
         this.cgm = cgm;
         RemoteDataGearManager manager = this;
-
         new Thread("Connect-" + dgmName) {
             public void run() {
-                boolean connect = true;
                 do {
                     try {
                         SocketChannel sc = SocketChannel.open(new InetSocketAddress(address, port));
-                        connection.socket = sc.socket();
+                        connection = new Connection(sc.socket());
+                        connection.name = dgmName;
                         connection.socket.setTcpNoDelay(true);
-                        connect = false;
+
+
+                        synchronized (lock){
+                            connect = true;
+                            lock.notify();
+                        }
                     } catch (IOException e) {
                         try {
                             Thread.sleep(50);
@@ -38,7 +43,7 @@
                             e1.printStackTrace();
                         }
                     }
-                } while (connect);
+                } while (!connect);
                 IncomingTcpConnection in = new IncomingTcpConnection(connection, cgm);
                 in.setManager(manager);
                 in.setName(dgmName+"-IncomingTcp");
@@ -50,11 +55,17 @@
                 out.start();
             }
         }.start();
+
     }
 
     @Override
     public void put(String key, Object data) {
+
         Command cm = new PutCommand(0, null, key, new DataGear(data));
+        // これ入れないと, connectionがnullの時があるのでしょうがなくwait.
+        // コンストラクタで呼び出されるThreadをやめて実効すればいんだけどね...
+        if(!connect) connectWait();
+
         connection.write(cm);
     }
 
@@ -93,5 +104,18 @@
         connection.sendCommand(cmd);
     }
 
+    //
+    public void connectWait(){
+        synchronized (lock){
+            while(!connect){
+                try {
+                    lock.wait();
+                } catch (InterruptedException e) {
+                }
+            }
+        }
+
+    }
+
 
 }
--- a/src/main/java/christie/datagear/command/ConvertedCommand.java	Tue Dec 04 14:20:26 2018 +0900
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,28 +0,0 @@
-package christie.datagear.command;
-
-
-public class ConvertedCommand {
-    byte[] command;
-    byte[] dataSize;
-    byte[] data;
-
-    public ConvertedCommand(){}
-
-    public ConvertedCommand(byte[] command, byte[] dataSize, byte[] data){
-        this.command = command;
-        this.data = data;
-        this.dataSize = dataSize;
-    }
-
-    public byte[] getCommand() {
-        return command;
-    }
-
-    public byte[] getData() {
-        return data;
-    }
-
-    public byte[] getDataSize() {
-        return dataSize;
-    }
-}
--- a/src/main/java/christie/datagear/command/PutCommand.java	Tue Dec 04 14:20:26 2018 +0900
+++ b/src/main/java/christie/datagear/command/PutCommand.java	Thu Dec 06 02:42:10 2018 +0900
@@ -11,6 +11,7 @@
 import java.nio.ByteBuffer;
 
 public class PutCommand extends Command {
+    ObjectMapper objectMapper = new ObjectMapper(new MessagePackFactory());
 
     public PutCommand(int cgmID, String toDgmName, String key, DataGear dg){
         this.type = CommandType.PUT;
@@ -21,26 +22,15 @@
         this.clazz = dg.getClazz();
     }
 
-    /*
     @Override
     public ByteBuffer convert() {
         ByteBuffer buf = null;
-        //MessagePack packer = new MessagePack();
-
-        ObjectMapper objectMapper = new ObjectMapper(new MessagePackFactory());
-
-
         try {
-            //byte[] command = packer.write(createRemoteMessage());
             byte[] command = objectMapper.writeValueAsBytes(createRemoteMessage());
-            byte[] data = new MessagePackDataGear(dg.getData()).getMessagePack();
-            //byte[] dataSize = packer.write(data.length);
-            byte[] dataSize = objectMapper.writeValueAsBytes(data.length);
+            byte[] data = objectMapper.writeValueAsBytes(new MessagePackDataGear(dg.getData()).getMessagePack());
 
-
-            buf = ByteBuffer.allocate(command.length+dataSize.length+data.length);
+            buf = ByteBuffer.allocate(command.length+data.length);
             buf.put(command);
-            buf.put(dataSize);
             buf.put(data);
             buf.flip();
 
@@ -49,32 +39,6 @@
         }
 
         return buf;
-    }*/
-    @Override
-    public ByteBuffer convert() {
-        ByteBuffer buf = null;
-        //MessagePack packer = new MessagePack();
-
-        ObjectMapper objectMapper = new ObjectMapper(new MessagePackFactory());
-        //objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
+    }
 
-        try {
-            //byte[] command = packer.write(createRemoteMessage());
-            byte[] command = objectMapper.writeValueAsBytes(createRemoteMessage());
-            byte[] data = new MessagePackDataGear(dg.getData()).getMessagePack();
-            //byte[] dataSize = packer.write(data.length);
-            byte[] dataSize = objectMapper.writeValueAsBytes(data.length);
-
-            ConvertedCommand convertedCommand = new ConvertedCommand(command, dataSize, data);
-            byte[] byteCommand = objectMapper.writeValueAsBytes(convertedCommand);
-            buf = ByteBuffer.allocate(byteCommand.length);
-            buf.put(byteCommand);
-            buf.flip();
-
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
-
-        return buf;
-    }
 }
--- a/src/main/java/christie/datagear/command/RemoteTakeCommand.java	Tue Dec 04 14:20:26 2018 +0900
+++ b/src/main/java/christie/datagear/command/RemoteTakeCommand.java	Thu Dec 06 02:42:10 2018 +0900
@@ -13,6 +13,8 @@
 
 public class RemoteTakeCommand extends Command {
 
+    ObjectMapper objectMapper = new ObjectMapper(new MessagePackFactory());
+
     public RemoteTakeCommand(RemoteMessage msg, Connection cn) {
         this.type = CommandType.REMOTETAKE;
         this.fromDgmName = msg.fromDgmName;
@@ -38,22 +40,13 @@
     @Override
     public ByteBuffer convert() {
         ByteBuffer buf = null;
-        ObjectMapper objectMapper = new ObjectMapper(new MessagePackFactory());
-
-        //MessagePack packer = new MessagePack();
 
         try {
-            //byte[] command = packer.write(createRemoteMessage());
 
             byte[] command = objectMapper.writeValueAsBytes(createRemoteMessage());
-            ConvertedCommand convertedCommand = new ConvertedCommand(command, null, null);
 
-            byte[] byteCommand = objectMapper.writeValueAsBytes(convertedCommand);
-
-            buf = ByteBuffer.allocate(byteCommand.length);
-            buf.put(byteCommand);
-            //buf = ByteBuffer.allocate(command.length);
-            //buf.put(command);
+            buf = ByteBuffer.allocate(command.length);
+            buf.put(command);
 
             buf.flip();
         } catch (IOException e) {