changeset 22:77583ea56656

add WaitList and implement RemoteTake but not work
author Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
date Tue, 23 Jan 2018 22:02:54 +0900
parents 5baccb8f7fbd
children 695705dba324
files src/main/java/christie/codegear/CodeGear.java src/main/java/christie/daemon/Connection.java src/main/java/christie/daemon/IncomingTcpConnection.java src/main/java/christie/daemon/RemoteMessage.java src/main/java/christie/datagear/Command.java src/main/java/christie/datagear/DataGearManager.java src/main/java/christie/datagear/LocalDataGearManager.java src/main/java/christie/datagear/RemoteDataGearManager.java src/main/java/christie/datagear/WaitList.java src/main/java/christie/test/RemoteTake/CreateRemoteTakeTest.java src/main/java/christie/test/RemoteTake/StartRemoteTake.java
diffstat 11 files changed, 103 insertions(+), 79 deletions(-) [+]
line wrap: on
line diff
--- a/src/main/java/christie/codegear/CodeGear.java	Mon Jan 22 23:22:09 2018 +0900
+++ b/src/main/java/christie/codegear/CodeGear.java	Tue Jan 23 22:02:54 2018 +0900
@@ -64,7 +64,7 @@
             throw new NullPointerException("please initialize DataGear");
         }
 
-        commandList.add(new Command(CommandType.TAKE, this, cgm.cgmID, dsmName, key, dg));
+        commandList.add(new Command(CommandType.TAKE, this, cgm.cgmID, dsmName, key, dg, dg.getClazz(), null));
     }
 
     public DataGearManager getLocalDGM() {
--- a/src/main/java/christie/daemon/Connection.java	Mon Jan 22 23:22:09 2018 +0900
+++ b/src/main/java/christie/daemon/Connection.java	Tue Jan 23 22:02:54 2018 +0900
@@ -39,8 +39,6 @@
             socket.shutdownInput();
             socket.close();
         } catch (Exception e) { }
-        //putConnectionInfo();
-
     }
 
     public synchronized void write(Command cmd) {
--- a/src/main/java/christie/daemon/IncomingTcpConnection.java	Mon Jan 22 23:22:09 2018 +0900
+++ b/src/main/java/christie/daemon/IncomingTcpConnection.java	Tue Jan 23 22:02:54 2018 +0900
@@ -2,10 +2,7 @@
 
 
 import christie.codegear.CodeGearManager;
-import christie.datagear.Command;
-import christie.datagear.CommandType;
-import christie.datagear.MessagePackDataGear;
-import christie.datagear.RemoteDataGearManager;
+import christie.datagear.*;
 
 import org.msgpack.MessagePack;
 import org.msgpack.unpacker.Unpacker;
@@ -67,7 +64,7 @@
 
                     case PEEK:
                     case TAKE:
-                        //cmd = new Command(null, dg, null, null, null, connection);
+                        cmd = new Command(type, null, msg.cgmID, msg.dgmName, msg.key, null, null, connection);
 
                         if (cgms.containsKey(msg.cgmID)){
                             cgms.get(msg.cgmID).getLocalDGM().take(cmd);
@@ -75,15 +72,17 @@
                             throw new IllegalArgumentException("DGM_ID:" + msg.cgmID + "is not found");
                         }
 
-                        //connection.write(cmd);
-
                         break;
                     case REPLY://待っていたwaitListに渡してcsにセット
                         connection.socket.getInputStream().read(data);
                         try {
                             MessagePackDataGear dg = new MessagePackDataGear(data, Class.forName(msg.clazz));
-                            //waitListからcommandをとりだし値をセット
-                            //resolveWait(cmd);
+
+                            DataGearManager dgm = cgms.get(msg.cgmID).getDGM(msg.dgmName);
+                            Command cm = dgm.waitList.getAndRemoveCommand(msg.key);
+                            cm.dg = dg;
+                            dgm.runCommand(cm);
+
                         } catch (ClassNotFoundException e) {
                             e.printStackTrace();
                         }
--- a/src/main/java/christie/daemon/RemoteMessage.java	Mon Jan 22 23:22:09 2018 +0900
+++ b/src/main/java/christie/daemon/RemoteMessage.java	Tue Jan 23 22:02:54 2018 +0900
@@ -5,15 +5,17 @@
 @Message
 public class RemoteMessage {
     public int type;//PUT, PEEKなどのコマンドタイプ
-    public int cgmID;
-    public String key;//DS key
+    public int cgmID;//Localが複数上がっている場合どのLocalか
+    public String dgmName;//送り元のdsmName。REPLYのときに使用。
+    public String key;
     public String clazz;
 
-    public RemoteMessage(){}
+    public RemoteMessage(){}//for messagePack
 
-    public RemoteMessage(int type, int cgmID, String key, String clazz) {
+    public RemoteMessage(int type, int cgmID, String dgmName, String key, String clazz) {
         this.type = type;
         this.cgmID = cgmID;
+        this.dgmName = dgmName;
         this.key = key;
         this.clazz = clazz;
     }
--- a/src/main/java/christie/datagear/Command.java	Mon Jan 22 23:22:09 2018 +0900
+++ b/src/main/java/christie/datagear/Command.java	Tue Jan 23 22:02:54 2018 +0900
@@ -12,34 +12,36 @@
     public CodeGear cg = null;
     public DataGear dg;
     public int cgmID = 1;
-    public String dgmName = "local";
+    public String dgmName;
     public String key;
     public CommandType type;
     public Class clazz = null;
     public Connection connection = null;
 
-    //for put/reply
+    //ToDo:merge put/reply
+    //for put
     public Command(CommandType type, int cgmID, String dgmName, String key, DataGear dg, Class clazz){
-        this.dg = dg;
+        this.type = type;
         this.cgmID = cgmID;
         this.dgmName = dgmName;
         this.key = key;
-        this.type = type;
+        this.dg = dg;
         this.clazz = clazz;
     }
 
-    //for take
-    public Command(CommandType type, CodeGear cg, int cgmID, String dgmName, String key, DataGear dg){
-        this.cg = cg;
+    //for take/reply
+    public Command(CommandType type, CodeGear cg, int cgmID, String dgmName, String key, DataGear dg,  Class clazz, Connection cn){
+        this.type = type;
+        this.cg = cg;//who wait
+        this.cgmID = cgmID;//where from
+        this.dgmName = dgmName;//where from
+        this.key = key;
         this.dg = dg;
-        this.cgmID = cgmID;
-        this.dgmName = dgmName;
-        this.key = key;
-        this.type = type;
-        this.connection = connection;
+        this.clazz = clazz;
+        this.connection = cn;
     }
 
-    public void setConnection(Connection connection) {
+    public void setConnection(Connection connection) {//for remote take
         this.connection = connection;
     }
 
@@ -50,12 +52,13 @@
             byte[] command = null;
             byte[] data = null;
             byte[] dataSize = null;
+            RemoteMessage mes;
             MessagePack packer = new MessagePack();
 
             switch (type) {
                 case PUT:
                 case REPLY:
-                    RemoteMessage mes = new RemoteMessage(type.id, cgmID, key, clazz.getName());
+                    mes = new RemoteMessage(type.id, cgmID, dgmName, key, clazz.getName());
 
                     data = new MessagePackDataGear(dg.getData(), dg.getClazz()).getMessagePack();
                     command = packer.write(mes);
@@ -67,7 +70,8 @@
                     buf.put(data);
                     break;
                 default:
-                    command = packer.write(new RemoteMessage(type.id, cgmID, key, clazz.getName()));
+                    mes = new RemoteMessage(type.id, cgmID, dgmName, key, clazz.getName());
+                    command = packer.write(mes);
                     buf = ByteBuffer.allocate(command.length);
                     buf.put(command);
                     break;
--- a/src/main/java/christie/datagear/DataGearManager.java	Mon Jan 22 23:22:09 2018 +0900
+++ b/src/main/java/christie/datagear/DataGearManager.java	Tue Jan 23 22:02:54 2018 +0900
@@ -10,10 +10,10 @@
  */
 public abstract class DataGearManager {
     protected TreeMap<String, LinkedBlockingQueue<DataGear>> dataGears = new TreeMap<String, LinkedBlockingQueue<DataGear>>();
-    protected ConcurrentHashMap<String, LinkedBlockingQueue<Command>> waitList = new ConcurrentHashMap<>();
+    public WaitList waitList = new WaitList();
     public abstract void take(Command cm);
     public abstract void put(String key, Object data);
     public abstract void metaPut(int cgmID, String key, Object data);
     public abstract void runCommand(Command cm);
-    public abstract void addWaitList(Command command);
+
 }
--- a/src/main/java/christie/datagear/LocalDataGearManager.java	Mon Jan 22 23:22:09 2018 +0900
+++ b/src/main/java/christie/datagear/LocalDataGearManager.java	Tue Jan 23 22:02:54 2018 +0900
@@ -11,7 +11,7 @@
         if (dataGears.containsKey(cm.key)) {
             runCommand(cm);
         } else {
-            addWaitList(cm);
+            waitList.add(cm);
         }
     }
 
@@ -40,7 +40,9 @@
                     dataGears.put(cm.key, queue);
                 }
 
-                resolveWait(cm.key);
+                if (waitList.containsKey(cm.key)){
+                    runCommand(waitList.getAndRemoveCommand(cm.key));
+                }
 
                 break;
             case TAKE:
@@ -51,38 +53,24 @@
 
                 if (cm.connection == null){//localからならcgにデータをセット
                     cm.cg.getIdg().setInputs(cm.key, cm.dg);
-                } else {//remoteからならREPLY
-                    //runCommand(new Command(CommandType.REPLY, cm.dg,  cm.connection));
+                } else {//remoteからならREPLY...ここはRemoteDGMに書くべき?
+                    runCommand(new Command(CommandType.REPLY, null, cm.cgmID, cm.dgmName, cm.key, cm.dg, cm.dg.getClazz(), cm.connection));
                 }
 
                 break;
             case PEEK:
                 cm.dg.setData(dataGears.get(cm.key).peek().getData());
-                cm.cg.getIdg().setInputs(cm.key, cm.dg);
+
+                if (cm.connection == null) {
+                    cm.cg.getIdg().setInputs(cm.key, cm.dg);
+                } else {
+                    //ToDo:implement
+                }
+
                 break;
             case REPLY:
-                //cm.connection.write(cm);
+                cm.connection.write(cm);
                 break;
         }
     }
-
-    @Override
-    public void addWaitList(Command cm) {
-        if(waitList.containsKey(cm.key)){
-            waitList.get(cm.key).add(cm);
-        } else {
-            LinkedBlockingQueue<Command> queue = new LinkedBlockingQueue<>();
-            queue.add(cm);
-            waitList.put(cm.key, queue);
-        }
-    }
-
-    private void resolveWait(String key){
-        if (waitList.containsKey(key)){
-            runCommand(waitList.get(key).poll());//待ちコマンドの先頭をとる
-            if (waitList.get(key).isEmpty()){
-                waitList.remove(key);
-            }
-        }
-    }
 }
--- a/src/main/java/christie/datagear/RemoteDataGearManager.java	Mon Jan 22 23:22:09 2018 +0900
+++ b/src/main/java/christie/datagear/RemoteDataGearManager.java	Tue Jan 23 22:02:54 2018 +0900
@@ -15,13 +15,13 @@
     public Connection connection;
     public CodeGearManager cgm;
 
-    public RemoteDataGearManager(final String connectionName, final String address, final int port, CodeGearManager cgm) {
+    public RemoteDataGearManager(final String dgmName, final String address, final int port, CodeGearManager cgm) {
         connection = new Connection();
-        connection.name = connectionName;
+        connection.name = dgmName;
         this.cgm = cgm;
         RemoteDataGearManager manager = this;
 
-        new Thread("Connect-" + connectionName) {
+        new Thread("Connect-" + dgmName) {
             public void run() {
                 boolean connect = true;
                 do {
@@ -40,11 +40,12 @@
                 } while (connect);
                 IncomingTcpConnection in = new IncomingTcpConnection(connection, cgm);
                 in.setManager(manager);
-                in.setName(connectionName+"-IncomingTcp");
+                in.setName(dgmName+"-IncomingTcp");
+                in.setPriority(MAX_PRIORITY);
                 in.setPriority(MAX_PRIORITY);
                 in.start();
                 OutboundTcpConnection out = new OutboundTcpConnection(connection);
-                out.setName(connectionName + "-OutboundTcp");
+                out.setName(dgmName + "-OutboundTcp");
                 out.setPriority(MAX_PRIORITY);
                 out.start();
             }
@@ -53,7 +54,9 @@
 
     @Override
     public void take(Command cm) {
+        cm.dgmName = connection.name;//送信元
         cm.setConnection(connection);
+        waitList.add(cm);
         connection.write(cm);
     }
 
@@ -69,17 +72,13 @@
 
     @Override
     public void runCommand(Command cm) {
-
+        switch (cm.type) {
+            case PUT:
+                break;
+            case TAKE:
+                cm.cg.getIdg().setInputs(cm.key, cm.dg);
+                break;
+        }
     }
 
-    @Override
-    public void addWaitList(Command cm) {
-        if(waitList.containsKey(cm.key)){
-            waitList.get(cm.key).add(cm);
-        } else {
-            LinkedBlockingQueue<Command> queue = new LinkedBlockingQueue<>();
-            queue.add(cm);
-            waitList.put(cm.key, queue);
-        }
-    }
 }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/christie/datagear/WaitList.java	Tue Jan 23 22:02:54 2018 +0900
@@ -0,0 +1,34 @@
+package christie.datagear;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+
+public class WaitList {
+
+    private ConcurrentHashMap<String, LinkedBlockingQueue<Command>> waitList = new ConcurrentHashMap<>();
+
+    public void add(Command cm) {
+        if(waitList.containsKey(cm.key)){
+            waitList.get(cm.key).add(cm);
+        } else {
+            LinkedBlockingQueue<Command> queue = new LinkedBlockingQueue<>();
+            queue.add(cm);
+            waitList.put(cm.key, queue);
+        }
+    }
+
+    public Command getAndRemoveCommand(String key) {
+        Command cm = null;
+        if (waitList.containsKey(key)){
+            cm = waitList.get(key).poll();
+            if (waitList.get(key).isEmpty()){
+                waitList.remove(key);
+            }
+        }
+        return cm;
+    }
+
+    public boolean containsKey(String key){
+        return waitList.containsKey(key);
+    }
+}
--- a/src/main/java/christie/test/RemoteTake/CreateRemoteTakeTest.java	Mon Jan 22 23:22:09 2018 +0900
+++ b/src/main/java/christie/test/RemoteTake/CreateRemoteTakeTest.java	Tue Jan 23 22:02:54 2018 +0900
@@ -15,6 +15,6 @@
     @Override
     protected void run(CodeGearManager cgm) {
         cgm.setup(new RemoteTakeTest(cgmID));
-        getDGM("remote").metaPut(cgmID,"hoge", 1);
+        getLocalDGM().metaPut(cgmID,"hoge", 1);
     }
 }
--- a/src/main/java/christie/test/RemoteTake/StartRemoteTake.java	Mon Jan 22 23:22:09 2018 +0900
+++ b/src/main/java/christie/test/RemoteTake/StartRemoteTake.java	Tue Jan 23 22:02:54 2018 +0900
@@ -14,10 +14,10 @@
         new StartRemotePut(cgm);
 
         cgm.createRemoteDGM("remote", "localhost", 10001);
-        cgm.setup(new CreateRemoteTakeTest(2));//この時点でcgm"second"は作られていない→notifyAllで対処?
+        cgm.setup(new CreateRemoteTakeTest(1));
 
-        CodeGearManager cgm2 = createCGM(10001);
+        /*CodeGearManager cgm2 = createCGM(10001);
         cgm2.createRemoteDGM("remote", "localhost", 10000);
-        cgm2.setup(new CreateRemoteTakeTest(1));
+        cgm2.setup(new CreateRemoteTakeTest(1));*/
     }
 }