changeset 13:bcd4f2c19185

don't work MessagePack unconvert for remote put
author Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
date Wed, 10 Jan 2018 20:37:47 +0900
parents b49a926cbdd9
children f2b30b47ef67
files src/main/java/christie/codegear/CodeGear.java src/main/java/christie/codegear/CodeGearManager.java src/main/java/christie/codegear/Command.java src/main/java/christie/codegear/CommandType.java src/main/java/christie/codegear/StartCodeGear.java src/main/java/christie/daemon/AcceptThread.java src/main/java/christie/daemon/ChristieDaemon.java src/main/java/christie/daemon/Config.java src/main/java/christie/daemon/IncomingTcpConnection.java src/main/java/christie/daemon/OutboundTcpConnection.java src/main/java/christie/daemon/RemoteMessage.java src/main/java/christie/datagear/DataGear.java src/main/java/christie/datagear/RemoteDataGearManager.java src/main/java/christie/test/Remote/RemotePutTest.java src/main/java/christie/test/Remote/StartRemoteTest.java src/main/java/christie/test/Remote/StartTest.java src/main/java/christie/test/TestLocal/StartTest.java
diffstat 17 files changed, 204 insertions(+), 45 deletions(-) [+]
line wrap: on
line diff
--- a/src/main/java/christie/codegear/CodeGear.java	Tue Jan 09 17:37:43 2018 +0900
+++ b/src/main/java/christie/codegear/CodeGear.java	Wed Jan 10 20:37:47 2018 +0900
@@ -2,6 +2,7 @@
 
 import christie.annotation.Peek;
 import christie.annotation.Take;
+import christie.daemon.ChristieDaemon;
 import christie.datagear.DataGear;
 import christie.datagear.DataGearManager;
 
--- a/src/main/java/christie/codegear/CodeGearManager.java	Tue Jan 09 17:37:43 2018 +0900
+++ b/src/main/java/christie/codegear/CodeGearManager.java	Wed Jan 10 20:37:47 2018 +0900
@@ -1,5 +1,8 @@
 package christie.codegear;
 
+import christie.daemon.ChristieDaemon;
+import christie.daemon.Config;
+import christie.daemon.IncomingTcpConnection;
 import christie.datagear.DataGearManager;
 import christie.datagear.LocalDataGearManager;
 import christie.datagear.RemoteDataGearManager;
@@ -15,15 +18,20 @@
     private ConcurrentHashMap<String, DataGearManager>  dataGearManagers = new ConcurrentHashMap<String, DataGearManager>();
     private ConcurrentHashMap<String, CodeGearManager> cgms;
     private ThreadPoolExecutor threadPoolExecutor;
+    private LocalDataGearManager localDGM = new LocalDataGearManager();
+    private ConcurrentHashMap<String, IncomingTcpConnection> acceptHash = new ConcurrentHashMap<String, IncomingTcpConnection>();
+    public Config conf;
 
-    public CodeGearManager(ThreadPoolExecutor exe, ConcurrentHashMap<String, CodeGearManager> cgms) {
-        dataGearManagers.put("local", new LocalDataGearManager());
+    public CodeGearManager(ThreadPoolExecutor exe, ConcurrentHashMap<String, CodeGearManager> cgms, Config conf) {
+        dataGearManagers.put("local", localDGM);
         this.cgms = cgms;
         threadPoolExecutor = exe;
+        this.conf = conf;
+        new ChristieDaemon(conf, this).listen();//run only first time
     }
 
-    public DataGearManager getDGM(){
-        return dataGearManagers.get("local");
+    public LocalDataGearManager getDGM(){
+        return localDGM;
     }
 
     public DataGearManager getDGM(String dsmName){
@@ -34,6 +42,10 @@
         dataGearManagers.put(dsmName, new RemoteDataGearManager(dsmName, address, port, this));
     }
 
+    public void createRemoteDGM(String dsmName){
+        dataGearManagers.put(dsmName, new RemoteDataGearManager(dsmName, conf.address, conf.connectPort, this));
+    }
+
     public void submit(CodeGear cg){
         threadPoolExecutor.execute(cg.cge);
     }
@@ -46,4 +58,7 @@
         return cgms;
     }
 
+    public void setAccept(String key, IncomingTcpConnection in) {
+        acceptHash.put(key, in);
+    }
 }
--- a/src/main/java/christie/codegear/Command.java	Tue Jan 09 17:37:43 2018 +0900
+++ b/src/main/java/christie/codegear/Command.java	Wed Jan 10 20:37:47 2018 +0900
@@ -14,15 +14,17 @@
     public String dsmName = "local";
     public String key;
     public CommandType type;
+    public Class clazz = null;
     private static final MessagePack packer = new MessagePack();
 
     //for put
-    public Command(DataGear dg, String cgmName, String dsmName, String key, CommandType type){
+    public Command(DataGear dg, String cgmName, String dsmName, String key, CommandType type, Class clazz){
         this.dg = dg;
         this.cgmName = cgmName;
         this.dsmName = dsmName;
         this.key = key;
         this.type = type;
+        this.clazz = clazz;
     }
 
     //for take
@@ -46,20 +48,19 @@
             switch (type) {
                 case PUT:
                 case REPLY:
-
-                    RemoteMessage cm = new RemoteMessage();
+                    RemoteMessage mes = new RemoteMessage(type.id, cgmName, key, clazz.getName());
 
                     data = dg.getMessagePack();
+                    command = packer.write(mes);
+                    dataSize = packer.write(data.length);
 
-                    command = packer.write(cm);
-                    dataSize = packer.write(data.length);
                     buf = ByteBuffer.allocate(command.length+dataSize.length+data.length);
                     buf.put(command);
                     buf.put(dataSize);
                     buf.put(data);
                     break;
                 default:
-                    command = packer.write(new RemoteMessage());
+                    command = packer.write(new RemoteMessage(type.id, cgmName, key, clazz.getName()));
                     buf = ByteBuffer.allocate(command.length);
                     buf.put(command);
                     break;
--- a/src/main/java/christie/codegear/CommandType.java	Tue Jan 09 17:37:43 2018 +0900
+++ b/src/main/java/christie/codegear/CommandType.java	Wed Jan 10 20:37:47 2018 +0900
@@ -12,7 +12,15 @@
 
     public int id;//コマンドのid
     public static HashMap<Integer, CommandType> hash = new HashMap<Integer, CommandType>();//コマンド対応表
+    private static int lastId = 0;//コマンドの総数
 
+    private CommandType() {
+        this.id = incrementLastId();
+    }
+
+    private int incrementLastId() {
+        return ++lastId;
+    }
 
     public static CommandType getCommandTypeFromId(int id) {
         return hash.get(id);
--- a/src/main/java/christie/codegear/StartCodeGear.java	Tue Jan 09 17:37:43 2018 +0900
+++ b/src/main/java/christie/codegear/StartCodeGear.java	Wed Jan 10 20:37:47 2018 +0900
@@ -1,5 +1,8 @@
 package christie.codegear;
 
+import christie.daemon.ChristieDaemon;
+import christie.daemon.Config;
+
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -13,9 +16,14 @@
             Integer.MAX_VALUE, // keepAliveTime
             TimeUnit.SECONDS,
             taskQueue);
+    Config conf;
+
+    public StartCodeGear(Config conf){
+        this.conf = conf;
+    }
 
     public CodeGearManager createCGM(String name){
-        CodeGearManager cgm = new CodeGearManager(threadPoolExecutor, cgms);
+        CodeGearManager cgm = new CodeGearManager(threadPoolExecutor, cgms, conf);
         cgms.put(name, cgm);
         return cgm;
     }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/christie/daemon/AcceptThread.java	Wed Jan 10 20:37:47 2018 +0900
@@ -0,0 +1,45 @@
+package christie.daemon;
+
+import christie.codegear.CodeGearManager;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.net.Socket;
+
+public class AcceptThread extends Thread {
+
+    private ServerSocket ss;
+    public int counter = 0;
+    public CodeGearManager cgm;
+
+    public AcceptThread(ServerSocket ss, String name, CodeGearManager cgm) {
+        super(name);
+        this.ss = ss;
+        this.cgm = cgm;
+    }
+
+    @Override
+    public void run() {
+        while (true) {
+            try {
+                Socket socket = ss.accept();
+                socket.setTcpNoDelay(true);
+                System.out.println("Accept " + socket.getInetAddress().getHostName() + ":" + socket.getPort());
+                Connection connection = new Connection(socket);
+                String key = "accept" + counter;
+                IncomingTcpConnection in =
+                        new IncomingTcpConnection(connection, cgm);
+                in.setName(connection.getInfoString()+"-IncomingTcp");
+                in.start();
+                cgm.setAccept(key, in);
+                OutboundTcpConnection out = new OutboundTcpConnection(connection);
+                out.setName(connection.getInfoString()+"-OutboundTcp");
+                out.start();
+                counter++;
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/christie/daemon/ChristieDaemon.java	Wed Jan 10 20:37:47 2018 +0900
@@ -0,0 +1,40 @@
+package christie.daemon;
+
+import christie.codegear.CodeGear;
+import christie.codegear.CodeGearManager;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.nio.channels.ServerSocketChannel;
+
+public class ChristieDaemon {
+
+    private Config conf;
+    private AcceptThread acceptThread;
+    public CodeGearManager cgm;
+
+    public ChristieDaemon(Config conf, CodeGearManager cgm) {
+        this.conf = conf;
+        this.cgm = cgm;
+    }
+
+    public void listen() {
+        try {
+            ServerSocketChannel ssChannel = ServerSocketChannel.open();
+            ServerSocket ss = ssChannel.socket();
+            ss.setReuseAddress(true);
+
+            // listen on any address ipv4/ipv6
+            InetSocketAddress a = new InetSocketAddress("::", conf.localPort);
+
+            System.out.println("AliceDaemon.listen: bind to " + a);
+            ss.bind(a);
+            acceptThread = new AcceptThread(ss, "ACCEPT" + conf.localPort, cgm);
+            acceptThread.start();
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+
+    }
+}
\ No newline at end of file
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/christie/daemon/Config.java	Wed Jan 10 20:37:47 2018 +0900
@@ -0,0 +1,21 @@
+package christie.daemon;
+
+public class Config {
+
+    public int localPort = 10000;
+    public String address = "127.0.0.1";
+    public int connectPort = 10001;
+
+    public Config(String[] args) {
+        for (int i = 0; i< args.length; i++) {
+            if ("-localPort".equals(args[i])) {
+                localPort = Integer.parseInt(args[++i]);
+            }if ("-address".equals(args[i])) {
+                address = args[++i];
+            } else if ("-connectPort".equals(args[i])) {
+                connectPort = Integer.parseInt(args[++i]);
+            }
+        }
+    }
+
+}
--- a/src/main/java/christie/daemon/IncomingTcpConnection.java	Tue Jan 09 17:37:43 2018 +0900
+++ b/src/main/java/christie/daemon/IncomingTcpConnection.java	Wed Jan 10 20:37:47 2018 +0900
@@ -46,8 +46,11 @@
                 switch (type) {
                     case PUT:
                         connection.socket.getInputStream().read(data);
-
-                        cgms.get(msg.cgmName).getDGM().put(msg.key, data);
+                        try {
+                            cgms.get(msg.cgmName).getDGM().put(msg.key, new DataGear(data, Class.forName(msg.clazz)));
+                        } catch (ClassNotFoundException e) {
+                            e.printStackTrace();
+                        }
 
                         break;
                         /*
--- a/src/main/java/christie/daemon/OutboundTcpConnection.java	Tue Jan 09 17:37:43 2018 +0900
+++ b/src/main/java/christie/daemon/OutboundTcpConnection.java	Wed Jan 10 20:37:47 2018 +0900
@@ -24,7 +24,7 @@
                     default:
                         break;
                 }
-                connection.write(cmd);//ここでconvert()がよばれてる
+                connection.write(cmd);
             } catch (InterruptedException e) {
                 e.printStackTrace();
             }
--- a/src/main/java/christie/daemon/RemoteMessage.java	Tue Jan 09 17:37:43 2018 +0900
+++ b/src/main/java/christie/daemon/RemoteMessage.java	Wed Jan 10 20:37:47 2018 +0900
@@ -1,15 +1,20 @@
 package christie.daemon;
 
+import org.msgpack.annotation.Message;
+
+@Message
 public class RemoteMessage {
     public int type;//PUT, PEEKなどのコマンドタイプ
     public String cgmName;
     public String key;//DS key
+    public String clazz;
 
-    public RemoteMessage() {}
+    public RemoteMessage(){}
 
-    public RemoteMessage(int type, String cgmName, String key) {
+    public RemoteMessage(int type, String cgmName, String key, String clazz) {
         this.type = type;
         this.cgmName = cgmName;
         this.key = key;
+        this.clazz = clazz;
     }
 }
--- a/src/main/java/christie/datagear/DataGear.java	Tue Jan 09 17:37:43 2018 +0900
+++ b/src/main/java/christie/datagear/DataGear.java	Wed Jan 10 20:37:47 2018 +0900
@@ -11,20 +11,20 @@
     private int dataSize;
     private MessagePack packer = new MessagePack();
 
-    public DataGear(T data, Class clazz) {
+    public DataGear(T data, Class clazz) {//for normal
         setClazz(clazz);
         setData(data);
     }
 
-    public DataGear(T... dg) {
+    public DataGear(T... dg) {//for input DataGear init
         @SuppressWarnings("unchecked")
         Class<T> type = (Class<T>) dg.getClass().getComponentType();
         this.clazz = type;
     }
 
-    public DataGear(byte[] messagePack){
+    public DataGear(byte[] messagePack, Class clazz){//for remote
         this.data = null;
-        this.clazz = null;
+        this.clazz = clazz;
         this.messagePack = messagePack;
     }
 
@@ -60,7 +60,7 @@
     public T getData(){
         if (data == null){
             try {
-                data = (T) packer.unconvert(messagePack);
+                setData((T) packer.unconvert(messagePack));//ここで展開できてない
             } catch (IOException e) {
                 e.printStackTrace();
             }
--- a/src/main/java/christie/datagear/RemoteDataGearManager.java	Tue Jan 09 17:37:43 2018 +0900
+++ b/src/main/java/christie/datagear/RemoteDataGearManager.java	Wed Jan 10 20:37:47 2018 +0900
@@ -56,7 +56,7 @@
     @Override
     public void put(String key, Object data) {
         //cgmNameを受け取れるようにする
-        Command cmd = new Command(new DataGear(data, data.getClass()), "first","local", key, CommandType.PUT);
+        Command cmd = new Command(new DataGear(data, data.getClass()), "first","local", key, CommandType.PUT, data.getClass());
         connection.write(cmd);
     }
 
--- a/src/main/java/christie/test/Remote/RemotePutTest.java	Tue Jan 09 17:37:43 2018 +0900
+++ b/src/main/java/christie/test/Remote/RemotePutTest.java	Wed Jan 10 20:37:47 2018 +0900
@@ -9,13 +9,13 @@
 public class RemotePutTest extends CodeGear {
 
     @Take("hoge")
-    DataGear<Integer> hoge = new DataGear<>();
+    public DataGear<Integer> hoge = new DataGear<>();
 
     @Override
     protected void run(CodeGearManager cgm) {
         System.out.println(hoge.getData());
 
-        if (hoge.getData()!= 10){
+        if (hoge.getData() != 10){
             cgm.setup(new TestCodeGear());
             dgm("remote").put("hoge", hoge.getData() + 1);
         }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/christie/test/Remote/StartRemoteTest.java	Wed Jan 10 20:37:47 2018 +0900
@@ -0,0 +1,26 @@
+package christie.test.Remote;
+
+import christie.codegear.CodeGearManager;
+import christie.codegear.StartCodeGear;
+import christie.daemon.Config;
+
+public class StartRemoteTest extends StartCodeGear {
+
+    public StartRemoteTest(Config conf) {
+        super(conf);
+    }
+
+    public static void main(String args[]){
+        StartRemoteTest start = new StartRemoteTest(new Config(args));
+        CodeGearManager cgm = start.createCGM("first");
+        cgm.setup(start);
+        //もう一つスタートする
+    }
+
+    @Override
+    protected void run(CodeGearManager cgm) {
+        cgm.setup(new RemotePutTest());
+        cgm.createRemoteDGM("remote");
+        dgm("remote").put("hoge", 1);
+    }
+}
--- a/src/main/java/christie/test/Remote/StartTest.java	Tue Jan 09 17:37:43 2018 +0900
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,20 +0,0 @@
-package christie.test.Remote;
-
-import christie.codegear.CodeGearManager;
-import christie.codegear.StartCodeGear;
-
-public class StartTest extends StartCodeGear {
-
-    public static void main(String args[]){
-        christie.test.TestLocal.StartTest start = new christie.test.TestLocal.StartTest();
-        CodeGearManager cgm = start.createCGM("first");
-        cgm.setup(start);
-    }
-
-    @Override
-    protected void run(CodeGearManager cgm) {
-        cgm.setup(new RemotePutTest());
-        cgm.createRemoteDGM("remote","localhost", 1000);
-        dgm("remote").put("test", 1);
-    }
-}
--- a/src/main/java/christie/test/TestLocal/StartTest.java	Tue Jan 09 17:37:43 2018 +0900
+++ b/src/main/java/christie/test/TestLocal/StartTest.java	Wed Jan 10 20:37:47 2018 +0900
@@ -2,12 +2,18 @@
 
 import christie.codegear.CodeGearManager;
 import christie.codegear.StartCodeGear;
+import christie.daemon.Config;
+import christie.daemon.RemoteConfig;
 
 public class StartTest extends StartCodeGear{
 
 
+    public StartTest(Config conf) {
+        super(conf);
+    }
+
     public static void main(String args[]){
-        StartTest start = new StartTest();
+        StartTest start = new StartTest(new Config(args));
         CodeGearManager cgm = start.createCGM("first");
         cgm.setup(start);
     }