changeset 21:5baccb8f7fbd

add RemoteTake, but it isn't working
author Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
date Mon, 22 Jan 2018 23:22:09 +0900
parents 7b45ccc0f70e
children 77583ea56656
files src/main/java/christie/codegear/CodeGear.java src/main/java/christie/codegear/Command.java src/main/java/christie/codegear/CommandType.java src/main/java/christie/codegear/InputDataGear.java src/main/java/christie/daemon/Connection.java src/main/java/christie/daemon/IncomingTcpConnection.java src/main/java/christie/daemon/OutboundTcpConnection.java src/main/java/christie/datagear/Command.java src/main/java/christie/datagear/CommandType.java src/main/java/christie/datagear/DataGearManager.java src/main/java/christie/datagear/LocalDataGearManager.java src/main/java/christie/datagear/RemoteDataGearManager.java src/main/java/christie/test/RemoteTake/CreateRemoteTakeTest.java src/main/java/christie/test/RemoteTake/CreateRemoteTakeTeste.java src/main/java/christie/test/RemoteTake/RemoteTakeTest.java src/main/java/christie/test/RemoteTake/StartRemoteTake.java
diffstat 16 files changed, 245 insertions(+), 177 deletions(-) [+]
line wrap: on
line diff
--- a/src/main/java/christie/codegear/CodeGear.java	Mon Jan 22 15:34:38 2018 +0900
+++ b/src/main/java/christie/codegear/CodeGear.java	Mon Jan 22 23:22:09 2018 +0900
@@ -1,14 +1,15 @@
 package christie.codegear;
 
 import christie.annotation.Peek;
+import christie.annotation.RemoteTake;
 import christie.annotation.Take;
-import christie.daemon.ChristieDaemon;
+import christie.datagear.Command;
+import christie.datagear.CommandType;
 import christie.datagear.DataGear;
 import christie.datagear.DataGearManager;
 
 import java.lang.reflect.Field;
 import java.util.ArrayList;
-import java.util.HashMap;
 
 /**
  * Created by e125769 on 12/7/17.
@@ -33,19 +34,22 @@
         for (Field field : this.getClass().getDeclaredFields()) {//AnnotationからInputDataGearをセット
             if (field.isAnnotationPresent(Take.class)) {
                 Take ano = field.getAnnotation(Take.class);
-                checkAndSetCommand(field, ano.value());
+                checkAndSetCommand(field, "local", ano.value());
             } else if (field.isAnnotationPresent(Peek.class)) {
                 Peek ano = field.getAnnotation(Peek.class);
-                checkAndSetCommand(field, ano.value());
+                checkAndSetCommand(field, "local", ano.value());
+            } else if (field.isAnnotationPresent(RemoteTake.class)) {
+                RemoteTake ano = field.getAnnotation(RemoteTake.class);
+                checkAndSetCommand(field, ano.dsmName(), ano.key());
             }
         }
 
         idg.finishInput(cgm, commandList);
     }
 
-    public void checkAndSetCommand(Field field, String name){
+    public void checkAndSetCommand(Field field, String dsmName, String key){
 
-        if (!field.getName().equals(name)){
+        if (!field.getName().equals(key)){
             throw new IllegalArgumentException("key and DataGearName do not match");
         }
 
@@ -60,7 +64,7 @@
             throw new NullPointerException("please initialize DataGear");
         }
 
-        commandList.add(new Command(this, dg, cgm.cgmID,"local", name, CommandType.TAKE));
+        commandList.add(new Command(CommandType.TAKE, this, cgm.cgmID, dsmName, key, dg));
     }
 
     public DataGearManager getLocalDGM() {
--- a/src/main/java/christie/codegear/Command.java	Mon Jan 22 15:34:38 2018 +0900
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,76 +0,0 @@
-package christie.codegear;
-
-import christie.daemon.RemoteMessage;
-import christie.datagear.DataGear;
-import christie.datagear.MessagePackDataGear;
-import org.msgpack.MessagePack;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-public class Command {
-    public CodeGear cg = null;
-    public DataGear dg;
-    public int cgmID = 1;
-    public String dgmName = "local";
-    public String key;
-    public CommandType type;
-    public Class clazz = null;
-
-    //for put
-    public Command(DataGear dg, int cgmID, String dgmName, String key, CommandType type, Class clazz){
-        this.dg = dg;
-        this.cgmID = cgmID;
-        this.dgmName = dgmName;
-        this.key = key;
-        this.type = type;
-        this.clazz = clazz;
-    }
-
-    //for take
-    public Command(CodeGear cg, DataGear dg, int cgmID, String dgmName, String key, CommandType type){
-        this.cg = cg;
-        this.dg = dg;
-        this.cgmID = cgmID;
-        this.dgmName = dgmName;
-        this.key = key;
-        this.type = type;
-    }
-
-    public ByteBuffer convert() {
-        ByteBuffer buf = null;
-
-        try {
-            byte[] command = null;
-            byte[] data = null;
-            byte[] dataSize = null;
-            MessagePack packer = new MessagePack();
-
-            switch (type) {
-                case PUT:
-                case REPLY:
-                    RemoteMessage mes = new RemoteMessage(type.id, cgmID, key, clazz.getName());
-
-                    data = new MessagePackDataGear(dg.getData(), dg.getClazz()).getMessagePack();
-                    command = packer.write(mes);
-                    dataSize = packer.write(data.length);
-
-                    buf = ByteBuffer.allocate(command.length+dataSize.length+data.length);
-                    buf.put(command);
-                    buf.put(dataSize);
-                    buf.put(data);
-                    break;
-                default:
-                    command = packer.write(new RemoteMessage(type.id, cgmID, key, clazz.getName()));
-                    buf = ByteBuffer.allocate(command.length);
-                    buf.put(command);
-                    break;
-            }
-
-            buf.flip();
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
-        return buf;
-    }
-}
--- a/src/main/java/christie/codegear/CommandType.java	Mon Jan 22 15:34:38 2018 +0900
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,34 +0,0 @@
-package christie.codegear;
-
-import java.util.HashMap;
-
-public enum CommandType {
-    PUT,
-    TAKE,
-    PEEK,
-    REPLY,
-    CLOSE,
-    FINISH;
-
-    public int id;//コマンドのid
-    public static HashMap<Integer, CommandType> hash = new HashMap<Integer, CommandType>();//コマンド対応表
-    private static int lastId = 0;//コマンドの総数
-
-    private CommandType() {
-        this.id = incrementLastId();
-    }//for init
-
-    private int incrementLastId() {
-        return ++lastId;
-    }
-
-    public static CommandType getCommandTypeFromId(int id) {
-        return hash.get(id);
-    }
-
-    static {
-        for (CommandType type : CommandType.values()) {
-            hash.put(type.id, type);
-        }
-    }
-}
--- a/src/main/java/christie/codegear/InputDataGear.java	Mon Jan 22 15:34:38 2018 +0900
+++ b/src/main/java/christie/codegear/InputDataGear.java	Mon Jan 22 23:22:09 2018 +0900
@@ -2,13 +2,12 @@
 
 import christie.annotation.Peek;
 import christie.annotation.Take;
+import christie.datagear.Command;
 import christie.datagear.DataGear;
 
 import java.lang.reflect.Field;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /**
--- a/src/main/java/christie/daemon/Connection.java	Mon Jan 22 15:34:38 2018 +0900
+++ b/src/main/java/christie/daemon/Connection.java	Mon Jan 22 23:22:09 2018 +0900
@@ -4,7 +4,7 @@
 import java.nio.ByteBuffer;
 import java.util.concurrent.LinkedBlockingQueue;
 
-import christie.codegear.Command;
+import christie.datagear.Command;
 
 public class Connection {
 
--- a/src/main/java/christie/daemon/IncomingTcpConnection.java	Mon Jan 22 15:34:38 2018 +0900
+++ b/src/main/java/christie/daemon/IncomingTcpConnection.java	Mon Jan 22 23:22:09 2018 +0900
@@ -2,9 +2,8 @@
 
 
 import christie.codegear.CodeGearManager;
-import christie.codegear.Command;
-import christie.codegear.CommandType;
-import christie.datagear.DataGear;
+import christie.datagear.Command;
+import christie.datagear.CommandType;
 import christie.datagear.MessagePackDataGear;
 import christie.datagear.RemoteDataGearManager;
 
@@ -17,13 +16,18 @@
 
 public class IncomingTcpConnection extends Thread {
 
+    RemoteDataGearManager manager;
     ConcurrentHashMap<Integer, CodeGearManager> cgms;
     Connection connection;
     private MessagePack packer = new MessagePack();
 
     public IncomingTcpConnection(Connection connection, CodeGearManager cgm) {
+        this.connection = connection;
         this.cgms = cgm.getCgms();
-        this.connection = connection;
+    }
+
+    public void setManager(RemoteDataGearManager manager){
+        this.manager = manager;
     }
 
     public void run() {
@@ -60,20 +64,31 @@
                         }
 
                         break;
-                        /*
+
                     case PEEK:
                     case TAKE:
-                        cmd = new Command(null, dg, null, null, null);
+                        //cmd = new Command(null, dg, null, null, null, connection);
+
+                        if (cgms.containsKey(msg.cgmID)){
+                            cgms.get(msg.cgmID).getLocalDGM().take(cmd);
+                        } else {
+                            throw new IllegalArgumentException("DGM_ID:" + msg.cgmID + "is not found");
+                        }
+
+                        //connection.write(cmd);
 
                         break;
-                    case REPLY:
+                    case REPLY://待っていたwaitListに渡してcsにセット
                         connection.socket.getInputStream().read(data);
-                        dg = new DataGear(data, data.getClass());
+                        try {
+                            MessagePackDataGear dg = new MessagePackDataGear(data, Class.forName(msg.clazz));
+                            //waitListからcommandをとりだし値をセット
+                            //resolveWait(cmd);
+                        } catch (ClassNotFoundException e) {
+                            e.printStackTrace();
+                        }
 
-                        Command rCmd = new Command(null, dg, null, null, null);
-                        //cmd.cg.idg.reply(cmd.dg, rCmd);
-
-                        break;*/
+                        break;
                     default:
                         break;
                 }
--- a/src/main/java/christie/daemon/OutboundTcpConnection.java	Mon Jan 22 15:34:38 2018 +0900
+++ b/src/main/java/christie/daemon/OutboundTcpConnection.java	Mon Jan 22 23:22:09 2018 +0900
@@ -1,6 +1,6 @@
 package christie.daemon;
 
-import christie.codegear.Command;
+import christie.datagear.Command;
 
 public class OutboundTcpConnection extends Thread {
 
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/christie/datagear/Command.java	Mon Jan 22 23:22:09 2018 +0900
@@ -0,0 +1,82 @@
+package christie.datagear;
+
+import christie.codegear.CodeGear;
+import christie.daemon.Connection;
+import christie.daemon.RemoteMessage;
+import org.msgpack.MessagePack;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class Command {
+    public CodeGear cg = null;
+    public DataGear dg;
+    public int cgmID = 1;
+    public String dgmName = "local";
+    public String key;
+    public CommandType type;
+    public Class clazz = null;
+    public Connection connection = null;
+
+    //for put/reply
+    public Command(CommandType type, int cgmID, String dgmName, String key, DataGear dg, Class clazz){
+        this.dg = dg;
+        this.cgmID = cgmID;
+        this.dgmName = dgmName;
+        this.key = key;
+        this.type = type;
+        this.clazz = clazz;
+    }
+
+    //for take
+    public Command(CommandType type, CodeGear cg, int cgmID, String dgmName, String key, DataGear dg){
+        this.cg = cg;
+        this.dg = dg;
+        this.cgmID = cgmID;
+        this.dgmName = dgmName;
+        this.key = key;
+        this.type = type;
+        this.connection = connection;
+    }
+
+    public void setConnection(Connection connection) {
+        this.connection = connection;
+    }
+
+    public ByteBuffer convert() {
+        ByteBuffer buf = null;
+
+        try {
+            byte[] command = null;
+            byte[] data = null;
+            byte[] dataSize = null;
+            MessagePack packer = new MessagePack();
+
+            switch (type) {
+                case PUT:
+                case REPLY:
+                    RemoteMessage mes = new RemoteMessage(type.id, cgmID, key, clazz.getName());
+
+                    data = new MessagePackDataGear(dg.getData(), dg.getClazz()).getMessagePack();
+                    command = packer.write(mes);
+                    dataSize = packer.write(data.length);
+
+                    buf = ByteBuffer.allocate(command.length+dataSize.length+data.length);
+                    buf.put(command);
+                    buf.put(dataSize);
+                    buf.put(data);
+                    break;
+                default:
+                    command = packer.write(new RemoteMessage(type.id, cgmID, key, clazz.getName()));
+                    buf = ByteBuffer.allocate(command.length);
+                    buf.put(command);
+                    break;
+            }
+
+            buf.flip();
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+        return buf;
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/christie/datagear/CommandType.java	Mon Jan 22 23:22:09 2018 +0900
@@ -0,0 +1,34 @@
+package christie.datagear;
+
+import java.util.HashMap;
+
+public enum CommandType {
+    PUT,
+    TAKE,
+    PEEK,
+    REPLY,
+    CLOSE,
+    FINISH;
+
+    public int id;//コマンドのid
+    public static HashMap<Integer, CommandType> hash = new HashMap<Integer, CommandType>();//コマンド対応表
+    private static int lastId = 0;//コマンドの総数
+
+    private CommandType() {
+        this.id = incrementLastId();
+    }//for init
+
+    private int incrementLastId() {
+        return ++lastId;
+    }
+
+    public static CommandType getCommandTypeFromId(int id) {
+        return hash.get(id);
+    }
+
+    static {
+        for (CommandType type : CommandType.values()) {
+            hash.put(type.id, type);
+        }
+    }
+}
--- a/src/main/java/christie/datagear/DataGearManager.java	Mon Jan 22 15:34:38 2018 +0900
+++ b/src/main/java/christie/datagear/DataGearManager.java	Mon Jan 22 23:22:09 2018 +0900
@@ -1,12 +1,5 @@
 package christie.datagear;
 
-import christie.codegear.CodeGear;
-import christie.codegear.Command;
-import christie.datagear.DataGear;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Queue;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
--- a/src/main/java/christie/datagear/LocalDataGearManager.java	Mon Jan 22 15:34:38 2018 +0900
+++ b/src/main/java/christie/datagear/LocalDataGearManager.java	Mon Jan 22 23:22:09 2018 +0900
@@ -1,11 +1,5 @@
 package christie.datagear;
 
-import christie.codegear.Command;
-import christie.codegear.CommandType;
-
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.LinkedBlockingQueue;
 
 /**
@@ -32,13 +26,13 @@
     }
 
     public void put(String key, DataGear dg){
-        runCommand(new Command(dg, 1, "local", key, CommandType.PUT, dg.getClazz()));
+        runCommand(new Command(CommandType.PUT, 1, "local", key, dg, dg.getClazz()));
     }
 
     public synchronized void runCommand(Command cm){
-        switch (cm.type){
+        switch (cm.type) {
             case PUT:
-                if(dataGears.containsKey(cm.key)){
+                if (dataGears.containsKey(cm.key)) {
                     dataGears.get(cm.key).add(cm.dg);
                 } else {
                     LinkedBlockingQueue<DataGear> queue = new LinkedBlockingQueue<DataGear>();
@@ -46,27 +40,29 @@
                     dataGears.put(cm.key, queue);
                 }
 
-                if (waitList.containsKey(cm.key)){
-                    runCommand(waitList.get(cm.key).poll());//待ちコマンドの先頭をとる
-                    if (waitList.get(cm.key).isEmpty()){
-                        waitList.remove(cm.key);
-                    }
-                }
+                resolveWait(cm.key);
+
                 break;
             case TAKE:
-                dataGears.get(cm.key).peek();
                 cm.dg.setData(dataGears.get(cm.key).poll().getData());
-                if (dataGears.get(cm.key).isEmpty()){
+                if (dataGears.get(cm.key).isEmpty()) {
                     dataGears.remove(cm.key);
                 }
 
-                cm.cg.getIdg().setInputs(cm.key, cm.dg);
+                if (cm.connection == null){//localからならcgにデータをセット
+                    cm.cg.getIdg().setInputs(cm.key, cm.dg);
+                } else {//remoteからならREPLY
+                    //runCommand(new Command(CommandType.REPLY, cm.dg,  cm.connection));
+                }
+
                 break;
             case PEEK:
                 cm.dg.setData(dataGears.get(cm.key).peek().getData());
                 cm.cg.getIdg().setInputs(cm.key, cm.dg);
                 break;
-
+            case REPLY:
+                //cm.connection.write(cm);
+                break;
         }
     }
 
@@ -81,9 +77,12 @@
         }
     }
 
-    private void deleteElement(Map<String, LinkedBlockingQueue<Object>> map, String key){
-        if (map.get(key).isEmpty()){
-            map.remove(key);
+    private void resolveWait(String key){
+        if (waitList.containsKey(key)){
+            runCommand(waitList.get(key).poll());//待ちコマンドの先頭をとる
+            if (waitList.get(key).isEmpty()){
+                waitList.remove(key);
+            }
         }
     }
 }
--- a/src/main/java/christie/datagear/RemoteDataGearManager.java	Mon Jan 22 15:34:38 2018 +0900
+++ b/src/main/java/christie/datagear/RemoteDataGearManager.java	Mon Jan 22 23:22:09 2018 +0900
@@ -1,9 +1,6 @@
 package christie.datagear;
 
-import christie.codegear.CodeGear;
 import christie.codegear.CodeGearManager;
-import christie.codegear.Command;
-import christie.codegear.CommandType;
 import christie.daemon.Connection;
 import christie.daemon.IncomingTcpConnection;
 import christie.daemon.OutboundTcpConnection;
@@ -11,14 +8,19 @@
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.channels.SocketChannel;
+import java.util.concurrent.LinkedBlockingQueue;
 
 public class RemoteDataGearManager extends DataGearManager{
 
-    Connection connection;
+    public Connection connection;
+    public CodeGearManager cgm;
 
     public RemoteDataGearManager(final String connectionName, final String address, final int port, CodeGearManager cgm) {
         connection = new Connection();
         connection.name = connectionName;
+        this.cgm = cgm;
+        RemoteDataGearManager manager = this;
+
         new Thread("Connect-" + connectionName) {
             public void run() {
                 boolean connect = true;
@@ -37,6 +39,7 @@
                     }
                 } while (connect);
                 IncomingTcpConnection in = new IncomingTcpConnection(connection, cgm);
+                in.setManager(manager);
                 in.setName(connectionName+"-IncomingTcp");
                 in.setPriority(MAX_PRIORITY);
                 in.start();
@@ -50,12 +53,13 @@
 
     @Override
     public void take(Command cm) {
-
+        cm.setConnection(connection);
+        connection.write(cm);
     }
 
     public void metaPut(int cgmID, String key, Object data){//meta
-        Command cmd = new Command(new DataGear(data, data.getClass()), cgmID,"local", key, CommandType.PUT, data.getClass());
-        connection.write(cmd);
+        Command cm = new Command(CommandType.PUT,  cgmID,"local", key, new DataGear(data, data.getClass()), data.getClass());
+        connection.write(cm);
     }
 
     @Override
@@ -69,7 +73,13 @@
     }
 
     @Override
-    public void addWaitList(Command command) {
-
+    public void addWaitList(Command cm) {
+        if(waitList.containsKey(cm.key)){
+            waitList.get(cm.key).add(cm);
+        } else {
+            LinkedBlockingQueue<Command> queue = new LinkedBlockingQueue<>();
+            queue.add(cm);
+            waitList.put(cm.key, queue);
+        }
     }
 }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/christie/test/RemoteTake/CreateRemoteTakeTest.java	Mon Jan 22 23:22:09 2018 +0900
@@ -0,0 +1,20 @@
+package christie.test.RemoteTake;
+
+import christie.codegear.CodeGear;
+import christie.codegear.CodeGearManager;
+import christie.test.Remote.RemotePutTest;
+
+public class CreateRemoteTakeTest extends CodeGear{
+
+    int cgmID;
+
+    public CreateRemoteTakeTest(int cgmID) {
+        this.cgmID = cgmID;
+    }
+
+    @Override
+    protected void run(CodeGearManager cgm) {
+        cgm.setup(new RemoteTakeTest(cgmID));
+        getDGM("remote").metaPut(cgmID,"hoge", 1);
+    }
+}
--- a/src/main/java/christie/test/RemoteTake/CreateRemoteTakeTeste.java	Mon Jan 22 15:34:38 2018 +0900
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,4 +0,0 @@
-package christie.test.RemoteTake;
-
-public class CreateRemoteTakeTeste {
-}
--- a/src/main/java/christie/test/RemoteTake/RemoteTakeTest.java	Mon Jan 22 15:34:38 2018 +0900
+++ b/src/main/java/christie/test/RemoteTake/RemoteTakeTest.java	Mon Jan 22 23:22:09 2018 +0900
@@ -9,11 +9,18 @@
     @RemoteTake(dsmName = "remote",key = "hoge")
     public DataGear<Integer> hoge = new DataGear<>();
 
+    int cgmID;
+
+    public RemoteTakeTest(int cgmID) {
+        this.cgmID = cgmID;
+    }
+
+
     @Override
     protected void run(CodeGearManager cgm) {
         if (hoge.getData() != 10){
             System.out.println(hoge.getData());
-            cgm.setup(new RemoteTakeTest());
+            cgm.setup(new RemoteTakeTest(cgmID));
             getLocalDGM().put("hoge", hoge.getData() + 1);
         }
     }
--- a/src/main/java/christie/test/RemoteTake/StartRemoteTake.java	Mon Jan 22 15:34:38 2018 +0900
+++ b/src/main/java/christie/test/RemoteTake/StartRemoteTake.java	Mon Jan 22 23:22:09 2018 +0900
@@ -1,4 +1,23 @@
 package christie.test.RemoteTake;
 
-public class StartRemoteTake {
+import christie.codegear.CodeGearManager;
+import christie.codegear.StartCodeGear;
+import christie.test.Remote.StartRemotePut;
+
+public class StartRemoteTake extends StartCodeGear{
+    public StartRemoteTake(CodeGearManager cgm) {
+        super(cgm);
+    }
+
+    public static void main(String args[]){
+        CodeGearManager cgm = createCGM(10000);
+        new StartRemotePut(cgm);
+
+        cgm.createRemoteDGM("remote", "localhost", 10001);
+        cgm.setup(new CreateRemoteTakeTest(2));//この時点でcgm"second"は作られていない→notifyAllで対処?
+
+        CodeGearManager cgm2 = createCGM(10001);
+        cgm2.createRemoteDGM("remote", "localhost", 10000);
+        cgm2.setup(new CreateRemoteTakeTest(1));
+    }
 }