changeset 23:695705dba324

Increased commands by type
author Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
date Thu, 25 Jan 2018 17:46:55 +0900
parents 77583ea56656
children 0930b0554299
files src/main/java/christie/annotation/RemotePeek.java src/main/java/christie/codegear/CodeGear.java src/main/java/christie/codegear/InputDataGear.java src/main/java/christie/daemon/Connection.java src/main/java/christie/daemon/IncomingTcpConnection.java src/main/java/christie/daemon/OutboundTcpConnection.java src/main/java/christie/daemon/RemoteMessage.java src/main/java/christie/datagear/Command.java src/main/java/christie/datagear/Command/Command.java src/main/java/christie/datagear/Command/CommandType.java src/main/java/christie/datagear/Command/PeekCommand.java src/main/java/christie/datagear/Command/PutCommand.java src/main/java/christie/datagear/Command/RemotePeekCommand.java src/main/java/christie/datagear/Command/RemoteTakeCommand.java src/main/java/christie/datagear/Command/ReplyCommand.java src/main/java/christie/datagear/Command/TakeCommand.java src/main/java/christie/datagear/CommandType.java src/main/java/christie/datagear/DataGearManager.java src/main/java/christie/datagear/DataGears.java src/main/java/christie/datagear/LocalDataGearManager.java src/main/java/christie/datagear/RemoteDataGearManager.java src/main/java/christie/datagear/RemoteMessage.java src/main/java/christie/datagear/WaitList.java src/main/java/christie/test/Remote/CreateRemotePutTest.java src/main/java/christie/test/Remote/RemotePutTest.java src/main/java/christie/test/Remote/StartRemotePut.java src/main/java/christie/test/RemoteTake/CreateRemoteTakeTest.java src/main/java/christie/test/RemoteTake/RemoteTakeTest.java src/main/java/christie/test/RemoteTake/StartRemoteTake.java
diffstat 29 files changed, 344 insertions(+), 234 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/christie/annotation/RemotePeek.java	Thu Jan 25 17:46:55 2018 +0900
@@ -0,0 +1,13 @@
+package christie.annotation;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@Target(ElementType.FIELD)
+@Retention(RetentionPolicy.RUNTIME)
+public @interface RemotePeek {
+    String dsmName();
+    String key();
+}
--- a/src/main/java/christie/codegear/CodeGear.java	Tue Jan 23 22:02:54 2018 +0900
+++ b/src/main/java/christie/codegear/CodeGear.java	Thu Jan 25 17:46:55 2018 +0900
@@ -3,10 +3,9 @@
 import christie.annotation.Peek;
 import christie.annotation.RemoteTake;
 import christie.annotation.Take;
-import christie.datagear.Command;
-import christie.datagear.CommandType;
-import christie.datagear.DataGear;
-import christie.datagear.DataGearManager;
+import christie.datagear.*;
+import christie.datagear.Command.Command;
+import christie.datagear.Command.TakeCommand;
 
 import java.lang.reflect.Field;
 import java.util.ArrayList;
@@ -34,20 +33,21 @@
         for (Field field : this.getClass().getDeclaredFields()) {//AnnotationからInputDataGearをセット
             if (field.isAnnotationPresent(Take.class)) {
                 Take ano = field.getAnnotation(Take.class);
-                checkAndSetCommand(field, "local", ano.value());
+                createCommand(field, "local", ano.value());
             } else if (field.isAnnotationPresent(Peek.class)) {
                 Peek ano = field.getAnnotation(Peek.class);
-                checkAndSetCommand(field, "local", ano.value());
+                createCommand(field, "local", ano.value());
             } else if (field.isAnnotationPresent(RemoteTake.class)) {
                 RemoteTake ano = field.getAnnotation(RemoteTake.class);
-                checkAndSetCommand(field, ano.dsmName(), ano.key());
+                createCommand(field, ano.dsmName(), ano.key());
             }
+            //ToDo:add peek
         }
 
         idg.finishInput(cgm, commandList);
     }
 
-    public void checkAndSetCommand(Field field, String dsmName, String key){
+    public void createCommand(Field field, String toDsmName, String key){
 
         if (!field.getName().equals(key)){
             throw new IllegalArgumentException("key and DataGearName do not match");
@@ -64,7 +64,7 @@
             throw new NullPointerException("please initialize DataGear");
         }
 
-        commandList.add(new Command(CommandType.TAKE, this, cgm.cgmID, dsmName, key, dg, dg.getClazz(), null));
+        commandList.add(new TakeCommand(this, cgm.cgmID, toDsmName, key, dg));
     }
 
     public DataGearManager getLocalDGM() {
--- a/src/main/java/christie/codegear/InputDataGear.java	Tue Jan 23 22:02:54 2018 +0900
+++ b/src/main/java/christie/codegear/InputDataGear.java	Thu Jan 25 17:46:55 2018 +0900
@@ -2,7 +2,7 @@
 
 import christie.annotation.Peek;
 import christie.annotation.Take;
-import christie.datagear.Command;
+import christie.datagear.Command.Command;
 import christie.datagear.DataGear;
 
 import java.lang.reflect.Field;
@@ -33,7 +33,8 @@
         }
 
         for(Command cm : commandList){
-            cgm.getDGM(cm.dgmName).take(cm);
+            cgm.getDGM(cm.toDgmName).take(cm);
+            //ToDo:add peek
         }
     }
 
--- a/src/main/java/christie/daemon/Connection.java	Tue Jan 23 22:02:54 2018 +0900
+++ b/src/main/java/christie/daemon/Connection.java	Thu Jan 25 17:46:55 2018 +0900
@@ -4,7 +4,7 @@
 import java.nio.ByteBuffer;
 import java.util.concurrent.LinkedBlockingQueue;
 
-import christie.datagear.Command;
+import christie.datagear.Command.Command;
 
 public class Connection {
 
--- a/src/main/java/christie/daemon/IncomingTcpConnection.java	Tue Jan 23 22:02:54 2018 +0900
+++ b/src/main/java/christie/daemon/IncomingTcpConnection.java	Thu Jan 25 17:46:55 2018 +0900
@@ -4,23 +4,26 @@
 import christie.codegear.CodeGearManager;
 import christie.datagear.*;
 
+import christie.datagear.Command.Command;
+import christie.datagear.Command.CommandType;
+import christie.datagear.RemoteMessage;
+import christie.datagear.Command.RemoteTakeCommand;
 import org.msgpack.MessagePack;
 import org.msgpack.unpacker.Unpacker;
 
 import java.io.IOException;
 import java.nio.channels.ClosedChannelException;
-import java.util.concurrent.ConcurrentHashMap;
 
 public class IncomingTcpConnection extends Thread {
 
     RemoteDataGearManager manager;
-    ConcurrentHashMap<Integer, CodeGearManager> cgms;
+    CodeGearManager cgm;
     Connection connection;
     private MessagePack packer = new MessagePack();
 
     public IncomingTcpConnection(Connection connection, CodeGearManager cgm) {
         this.connection = connection;
-        this.cgms = cgm.getCgms();
+        this.cgm = cgm;
     }
 
     public void setManager(RemoteDataGearManager manager){
@@ -39,7 +42,6 @@
         }
         while (true) {
             try {
-                Command cmd = null;
                 RemoteMessage msg = unpacker.read(RemoteMessage.class);
                 CommandType type = CommandType.getCommandTypeFromId(msg.type);
                 int dataSize = unpacker.readInt();
@@ -49,28 +51,23 @@
                         connection.socket.getInputStream().read(data);
                         try {
                             MessagePackDataGear dg = new MessagePackDataGear(data, Class.forName(msg.clazz));
-
-                            if (cgms.containsKey(msg.cgmID)){
-                                cgms.get(msg.cgmID).getLocalDGM().put(msg.key, dg);
-                            } else {
-                                throw new IllegalArgumentException("DGM_ID:" + msg.cgmID + "is not found");
-                            }
-
+                            cgm.getLocalDGM().put(msg.key, dg);
                         } catch (ClassNotFoundException e) {
                             e.printStackTrace();
                         }
 
                         break;
 
-                    case PEEK:
-                    case TAKE:
-                        cmd = new Command(type, null, msg.cgmID, msg.dgmName, msg.key, null, null, connection);
+                    case REMOTEPEEK:
+                    case REMOTETAKE:
+                        RemoteTakeCommand cmd = null;
+                        try {
+                            cmd = new RemoteTakeCommand(msg.fromDgmName, msg.key, Class.forName(msg.clazz), connection);
+                        } catch (ClassNotFoundException e) {
+                            e.printStackTrace();
+                        }
 
-                        if (cgms.containsKey(msg.cgmID)){
-                            cgms.get(msg.cgmID).getLocalDGM().take(cmd);
-                        } else {
-                            throw new IllegalArgumentException("DGM_ID:" + msg.cgmID + "is not found");
-                        }
+                        cgm.getLocalDGM().take(cmd);
 
                         break;
                     case REPLY://待っていたwaitListに渡してcsにセット
@@ -78,7 +75,7 @@
                         try {
                             MessagePackDataGear dg = new MessagePackDataGear(data, Class.forName(msg.clazz));
 
-                            DataGearManager dgm = cgms.get(msg.cgmID).getDGM(msg.dgmName);
+                            DataGearManager dgm = cgm.getDGM(msg.fromDgmName);
                             Command cm = dgm.waitList.getAndRemoveCommand(msg.key);
                             cm.dg = dg;
                             dgm.runCommand(cm);
--- a/src/main/java/christie/daemon/OutboundTcpConnection.java	Tue Jan 23 22:02:54 2018 +0900
+++ b/src/main/java/christie/daemon/OutboundTcpConnection.java	Thu Jan 25 17:46:55 2018 +0900
@@ -1,6 +1,6 @@
 package christie.daemon;
 
-import christie.datagear.Command;
+import christie.datagear.Command.Command;
 
 public class OutboundTcpConnection extends Thread {
 
--- a/src/main/java/christie/daemon/RemoteMessage.java	Tue Jan 23 22:02:54 2018 +0900
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,22 +0,0 @@
-package christie.daemon;
-
-import org.msgpack.annotation.Message;
-
-@Message
-public class RemoteMessage {
-    public int type;//PUT, PEEKなどのコマンドタイプ
-    public int cgmID;//Localが複数上がっている場合どのLocalか
-    public String dgmName;//送り元のdsmName。REPLYのときに使用。
-    public String key;
-    public String clazz;
-
-    public RemoteMessage(){}//for messagePack
-
-    public RemoteMessage(int type, int cgmID, String dgmName, String key, String clazz) {
-        this.type = type;
-        this.cgmID = cgmID;
-        this.dgmName = dgmName;
-        this.key = key;
-        this.clazz = clazz;
-    }
-}
--- a/src/main/java/christie/datagear/Command.java	Tue Jan 23 22:02:54 2018 +0900
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,86 +0,0 @@
-package christie.datagear;
-
-import christie.codegear.CodeGear;
-import christie.daemon.Connection;
-import christie.daemon.RemoteMessage;
-import org.msgpack.MessagePack;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-public class Command {
-    public CodeGear cg = null;
-    public DataGear dg;
-    public int cgmID = 1;
-    public String dgmName;
-    public String key;
-    public CommandType type;
-    public Class clazz = null;
-    public Connection connection = null;
-
-    //ToDo:merge put/reply
-    //for put
-    public Command(CommandType type, int cgmID, String dgmName, String key, DataGear dg, Class clazz){
-        this.type = type;
-        this.cgmID = cgmID;
-        this.dgmName = dgmName;
-        this.key = key;
-        this.dg = dg;
-        this.clazz = clazz;
-    }
-
-    //for take/reply
-    public Command(CommandType type, CodeGear cg, int cgmID, String dgmName, String key, DataGear dg,  Class clazz, Connection cn){
-        this.type = type;
-        this.cg = cg;//who wait
-        this.cgmID = cgmID;//where from
-        this.dgmName = dgmName;//where from
-        this.key = key;
-        this.dg = dg;
-        this.clazz = clazz;
-        this.connection = cn;
-    }
-
-    public void setConnection(Connection connection) {//for remote take
-        this.connection = connection;
-    }
-
-    public ByteBuffer convert() {
-        ByteBuffer buf = null;
-
-        try {
-            byte[] command = null;
-            byte[] data = null;
-            byte[] dataSize = null;
-            RemoteMessage mes;
-            MessagePack packer = new MessagePack();
-
-            switch (type) {
-                case PUT:
-                case REPLY:
-                    mes = new RemoteMessage(type.id, cgmID, dgmName, key, clazz.getName());
-
-                    data = new MessagePackDataGear(dg.getData(), dg.getClazz()).getMessagePack();
-                    command = packer.write(mes);
-                    dataSize = packer.write(data.length);
-
-                    buf = ByteBuffer.allocate(command.length+dataSize.length+data.length);
-                    buf.put(command);
-                    buf.put(dataSize);
-                    buf.put(data);
-                    break;
-                default:
-                    mes = new RemoteMessage(type.id, cgmID, dgmName, key, clazz.getName());
-                    command = packer.write(mes);
-                    buf = ByteBuffer.allocate(command.length);
-                    buf.put(command);
-                    break;
-            }
-
-            buf.flip();
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
-        return buf;
-    }
-}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/christie/datagear/Command/Command.java	Thu Jan 25 17:46:55 2018 +0900
@@ -0,0 +1,28 @@
+package christie.datagear.Command;
+
+import christie.codegear.CodeGear;
+import christie.daemon.Connection;
+import christie.datagear.DataGear;
+import christie.datagear.RemoteMessage;
+
+import java.nio.ByteBuffer;
+
+public abstract class Command {
+    public CommandType type;
+    public String key;
+    public String toDgmName;// for take
+    public String fromDgmName = "local";//for remotetake/reply
+    public int cgmID = 0;// for local meta
+    public CodeGear cg = null;//for localtake
+    public DataGear dg = null;//for put/localtake/reply
+    public Class clazz = null;// for remote
+    public Connection connection = null;//for reply
+
+    //for remote
+    public abstract ByteBuffer convert();
+
+    //for remote
+    public RemoteMessage createRemoteMessage(){
+        return new RemoteMessage(type.id, fromDgmName, key, clazz.getName());
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/christie/datagear/Command/CommandType.java	Thu Jan 25 17:46:55 2018 +0900
@@ -0,0 +1,36 @@
+package christie.datagear.Command;
+
+import java.util.HashMap;
+
+public enum CommandType {
+    PUT,
+    TAKE,
+    PEEK,
+    REMOTETAKE,
+    REMOTEPEEK,
+    REPLY,
+    CLOSE,
+    FINISH;
+
+    public int id;//コマンドのid
+    public static HashMap<Integer, CommandType> hash = new HashMap<Integer, CommandType>();//コマンド対応表
+    private static int lastId = 0;//コマンドの総数
+
+    private CommandType() {
+        this.id = incrementLastId();
+    }//for init
+
+    private int incrementLastId() {
+        return ++lastId;
+    }
+
+    public static CommandType getCommandTypeFromId(int id) {
+        return hash.get(id);
+    }
+
+    static {
+        for (CommandType type : CommandType.values()) {
+            hash.put(type.id, type);
+        }
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/christie/datagear/Command/PeekCommand.java	Thu Jan 25 17:46:55 2018 +0900
@@ -0,0 +1,11 @@
+package christie.datagear.Command;
+
+import christie.codegear.CodeGear;
+import christie.datagear.DataGear;
+
+public class PeekCommand extends TakeCommand{
+    public PeekCommand(CodeGear cg, int cgmID, String toDgmName, String key, DataGear dg) {
+        super(cg, cgmID, toDgmName, key, dg);
+        this.type = CommandType.PEEK;
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/christie/datagear/Command/PutCommand.java	Thu Jan 25 17:46:55 2018 +0900
@@ -0,0 +1,45 @@
+package christie.datagear.Command;
+
+import christie.datagear.Command.Command;
+import christie.datagear.Command.CommandType;
+import christie.datagear.DataGear;
+import christie.datagear.MessagePackDataGear;
+import org.msgpack.MessagePack;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class PutCommand extends Command {
+
+    public PutCommand(int cgmID, String toDgmName, String key, DataGear dg){
+        this.type = CommandType.PUT;
+        this.cgmID = cgmID;
+        this.toDgmName = toDgmName;
+        this.key = key;
+        this.dg = dg;
+        this.clazz = dg.getClazz();
+    }
+
+    @Override
+    public ByteBuffer convert() {
+        ByteBuffer buf = null;
+        MessagePack packer = new MessagePack();
+
+        try {
+            byte[] command = packer.write(createRemoteMessage());
+            byte[] data = new MessagePackDataGear(dg.getData(), dg.getClazz()).getMessagePack();
+            byte[] dataSize = packer.write(data.length);
+
+            buf = ByteBuffer.allocate(command.length+dataSize.length+data.length);
+            buf.put(command);
+            buf.put(dataSize);
+            buf.put(data);
+
+            buf.flip();
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+
+        return buf;
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/christie/datagear/Command/RemotePeekCommand.java	Thu Jan 25 17:46:55 2018 +0900
@@ -0,0 +1,10 @@
+package christie.datagear.Command;
+
+import christie.daemon.Connection;
+
+public class RemotePeekCommand extends RemoteTakeCommand {
+    public RemotePeekCommand(String fromDgmName, String key, Class clazz, Connection cn) {
+        super(fromDgmName, key, clazz, cn);
+        this.type = CommandType.PEEK;
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/christie/datagear/Command/RemoteTakeCommand.java	Thu Jan 25 17:46:55 2018 +0900
@@ -0,0 +1,36 @@
+package christie.datagear.Command;
+
+import christie.daemon.Connection;
+import org.msgpack.MessagePack;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class RemoteTakeCommand extends Command {
+
+    public RemoteTakeCommand(String fromDgmName, String key, Class clazz, Connection cn) {
+        this.type = CommandType.TAKE;
+        this.fromDgmName = fromDgmName;
+        this.key = key;
+        this.clazz = clazz;
+        this.connection = cn;
+    }
+
+    @Override
+    public ByteBuffer convert() {
+        ByteBuffer buf = null;
+        MessagePack packer = new MessagePack();
+
+        try {
+            byte[] command = packer.write(createRemoteMessage());
+            buf = ByteBuffer.allocate(command.length);
+            buf.put(command);
+
+            buf.flip();
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+
+        return buf;
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/christie/datagear/Command/ReplyCommand.java	Thu Jan 25 17:46:55 2018 +0900
@@ -0,0 +1,15 @@
+package christie.datagear.Command;
+
+import christie.daemon.Connection;
+import christie.datagear.DataGear;
+
+public class ReplyCommand extends PutCommand {
+
+    public ReplyCommand(String fromDgmName, String key, DataGear dg, Connection cn){
+        super(0, null, key, dg);
+        this.fromDgmName = fromDgmName;
+        this.type = CommandType.REPLY;
+        this.connection = cn;
+    }
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/christie/datagear/Command/TakeCommand.java	Thu Jan 25 17:46:55 2018 +0900
@@ -0,0 +1,24 @@
+package christie.datagear.Command;
+
+import christie.codegear.CodeGear;
+import christie.datagear.DataGear;
+
+import java.nio.ByteBuffer;
+
+public class TakeCommand extends Command {
+
+    public TakeCommand(CodeGear cg, int cgmID, String toDgmName, String key, DataGear dg){
+        this.type = CommandType.TAKE;
+        this.cgmID =  cgmID;
+        this.toDgmName = toDgmName;
+        this.key = key;
+        this.dg = dg;
+        this.clazz = dg.getClazz();
+        this.cg = cg;
+    }
+
+    @Override
+    public ByteBuffer convert() {
+        return null;
+    }
+}
--- a/src/main/java/christie/datagear/CommandType.java	Tue Jan 23 22:02:54 2018 +0900
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,34 +0,0 @@
-package christie.datagear;
-
-import java.util.HashMap;
-
-public enum CommandType {
-    PUT,
-    TAKE,
-    PEEK,
-    REPLY,
-    CLOSE,
-    FINISH;
-
-    public int id;//コマンドのid
-    public static HashMap<Integer, CommandType> hash = new HashMap<Integer, CommandType>();//コマンド対応表
-    private static int lastId = 0;//コマンドの総数
-
-    private CommandType() {
-        this.id = incrementLastId();
-    }//for init
-
-    private int incrementLastId() {
-        return ++lastId;
-    }
-
-    public static CommandType getCommandTypeFromId(int id) {
-        return hash.get(id);
-    }
-
-    static {
-        for (CommandType type : CommandType.values()) {
-            hash.put(type.id, type);
-        }
-    }
-}
--- a/src/main/java/christie/datagear/DataGearManager.java	Tue Jan 23 22:02:54 2018 +0900
+++ b/src/main/java/christie/datagear/DataGearManager.java	Thu Jan 25 17:46:55 2018 +0900
@@ -1,7 +1,8 @@
 package christie.datagear;
 
+import christie.datagear.Command.Command;
+
 import java.util.TreeMap;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
 
 /**
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/christie/datagear/DataGears.java	Thu Jan 25 17:46:55 2018 +0900
@@ -0,0 +1,32 @@
+package christie.datagear;
+
+import java.util.TreeMap;
+import java.util.concurrent.LinkedBlockingQueue;
+
+public class DataGears {
+    protected TreeMap<String, LinkedBlockingQueue<DataGear>> dataGears = new TreeMap<String, LinkedBlockingQueue<DataGear>>();
+
+    public void add(String key, DataGear dg){
+        if (dataGears.containsKey(key)) {
+            dataGears.get(key).add(dg);
+        } else {
+            LinkedBlockingQueue<DataGear> queue = new LinkedBlockingQueue<DataGear>();
+            queue.add(dg);
+            dataGears.put(key, queue);
+        }
+    }
+
+    public Object take(String key){
+        Object data = dataGears.get(key).poll().getData();
+
+        if (dataGears.get(key).isEmpty()) {
+            dataGears.remove(key);
+        }
+
+        return data;
+    }
+
+    public Object peek(String key){
+        return dataGears.get(key).peek().getData();
+    }
+}
--- a/src/main/java/christie/datagear/LocalDataGearManager.java	Tue Jan 23 22:02:54 2018 +0900
+++ b/src/main/java/christie/datagear/LocalDataGearManager.java	Thu Jan 25 17:46:55 2018 +0900
@@ -1,5 +1,7 @@
 package christie.datagear;
 
+import christie.datagear.Command.*;
+
 import java.util.concurrent.LinkedBlockingQueue;
 
 /**
@@ -22,11 +24,12 @@
 
     @Override
     public void metaPut(int cgmID, String key, Object data) {
-        put(key, data);
+        DataGear dg = new DataGear(data, data.getClass());
+        runCommand(new PutCommand(cgmID, "local", key, dg));
     }
 
     public void put(String key, DataGear dg){
-        runCommand(new Command(CommandType.PUT, 1, "local", key, dg, dg.getClazz()));
+        runCommand(new PutCommand(1, "local", key, dg));
     }
 
     public synchronized void runCommand(Command cm){
@@ -46,31 +49,34 @@
 
                 break;
             case TAKE:
-                cm.dg.setData(dataGears.get(cm.key).poll().getData());
-                if (dataGears.get(cm.key).isEmpty()) {
-                    dataGears.remove(cm.key);
-                }
+                setTakeData(cm);
+                cm.cg.getIdg().setInputs(cm.key, cm.dg);
 
-                if (cm.connection == null){//localからならcgにデータをセット
-                    cm.cg.getIdg().setInputs(cm.key, cm.dg);
-                } else {//remoteからならREPLY...ここはRemoteDGMに書くべき?
-                    runCommand(new Command(CommandType.REPLY, null, cm.cgmID, cm.dgmName, cm.key, cm.dg, cm.dg.getClazz(), cm.connection));
-                }
+                break;
+            case REMOTETAKE:
+                setTakeData(cm);
+                runCommand(new ReplyCommand(cm.fromDgmName, cm.key, cm.dg, cm.connection));
 
                 break;
             case PEEK:
                 cm.dg.setData(dataGears.get(cm.key).peek().getData());
+                cm.cg.getIdg().setInputs(cm.key, cm.dg);
 
-                if (cm.connection == null) {
-                    cm.cg.getIdg().setInputs(cm.key, cm.dg);
-                } else {
-                    //ToDo:implement
-                }
-
+                break;
+            case REMOTEPEEK:
+                //cm.dg.setData(dataGears.get(cm.key).peek().getData());
+                //runCommand(new ReplyCommand(cm.cgmID, cm.fromDgmName, cm.key, cm.dg, cm.connection));
                 break;
             case REPLY:
                 cm.connection.write(cm);
                 break;
         }
     }
+
+    private void setTakeData(Command cm){
+        cm.dg.setData(dataGears.get(cm.key).poll().getData());
+        if (dataGears.get(cm.key).isEmpty()) {
+            dataGears.remove(cm.key);
+        }
+    }
 }
--- a/src/main/java/christie/datagear/RemoteDataGearManager.java	Tue Jan 23 22:02:54 2018 +0900
+++ b/src/main/java/christie/datagear/RemoteDataGearManager.java	Thu Jan 25 17:46:55 2018 +0900
@@ -4,11 +4,11 @@
 import christie.daemon.Connection;
 import christie.daemon.IncomingTcpConnection;
 import christie.daemon.OutboundTcpConnection;
+import christie.datagear.Command.*;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.channels.SocketChannel;
-import java.util.concurrent.LinkedBlockingQueue;
 
 public class RemoteDataGearManager extends DataGearManager{
 
@@ -54,14 +54,14 @@
 
     @Override
     public void take(Command cm) {
-        cm.dgmName = connection.name;//送信元
-        cm.setConnection(connection);
         waitList.add(cm);
+        //RemoteTakeCommand remoteCmd = new RemoteTakeCommand(cm.cgmID, connection.name, cm.key, cm.clazz, connection);
+        cm.fromDgmName = connection.name;
         connection.write(cm);
     }
 
     public void metaPut(int cgmID, String key, Object data){//meta
-        Command cm = new Command(CommandType.PUT,  cgmID,"local", key, new DataGear(data, data.getClass()), data.getClass());
+        Command cm = new PutCommand(0, null, key, new DataGear(data, data.getClass()));
         connection.write(cm);
     }
 
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/christie/datagear/RemoteMessage.java	Thu Jan 25 17:46:55 2018 +0900
@@ -0,0 +1,20 @@
+package christie.datagear;
+
+import org.msgpack.annotation.Message;
+
+@Message
+public class RemoteMessage {
+    public int type;//PUT, PEEKなどのコマンドタイプ
+    public String fromDgmName;//送り元のdsmName。REPLYのときに使用。
+    public String key;
+    public String clazz;
+
+    public RemoteMessage(){}//for messagePack
+
+    public RemoteMessage(int type, String fromDgmName, String key, String clazz) {
+        this.type = type;
+        this.fromDgmName = fromDgmName;
+        this.key = key;
+        this.clazz = clazz;
+    }
+}
--- a/src/main/java/christie/datagear/WaitList.java	Tue Jan 23 22:02:54 2018 +0900
+++ b/src/main/java/christie/datagear/WaitList.java	Thu Jan 25 17:46:55 2018 +0900
@@ -1,5 +1,7 @@
 package christie.datagear;
 
+import christie.datagear.Command.Command;
+
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
 
--- a/src/main/java/christie/test/Remote/CreateRemotePutTest.java	Tue Jan 23 22:02:54 2018 +0900
+++ b/src/main/java/christie/test/Remote/CreateRemotePutTest.java	Thu Jan 25 17:46:55 2018 +0900
@@ -5,15 +5,9 @@
 
 public class CreateRemotePutTest extends CodeGear {
 
-    int cgmID;
-
-    public CreateRemotePutTest(int cgmID) {
-        this.cgmID = cgmID;
-    }
-
     @Override
     protected void run(CodeGearManager cgm) {//できるだけ並列に走らせるためにStartCodeGearには書かない
-        cgm.setup(new RemotePutTest(cgmID));
-        getDGM("remote").metaPut(cgmID,"hoge", 1);
+        cgm.setup(new RemotePutTest());
+        getDGM("remote").put("hoge", 1);
     }
 }
--- a/src/main/java/christie/test/Remote/RemotePutTest.java	Tue Jan 23 22:02:54 2018 +0900
+++ b/src/main/java/christie/test/Remote/RemotePutTest.java	Thu Jan 25 17:46:55 2018 +0900
@@ -11,18 +11,12 @@
     @Take("hoge")
     public DataGear<Integer> hoge = new DataGear<>();
 
-    public int cgmID;
-
-    public RemotePutTest(int cgmID){
-        this.cgmID = cgmID;
-    }
-
     @Override
     protected void run(CodeGearManager cgm) {
         if (hoge.getData() != 10){
             System.out.println(hoge.getData());
-            cgm.setup(new RemotePutTest(cgmID));
-            getDGM("remote").metaPut(cgmID,"hoge", hoge.getData() + 1);
+            cgm.setup(new RemotePutTest());
+            getDGM("remote").put("hoge", hoge.getData() + 1);
         }
     }
 
--- a/src/main/java/christie/test/Remote/StartRemotePut.java	Tue Jan 23 22:02:54 2018 +0900
+++ b/src/main/java/christie/test/Remote/StartRemotePut.java	Thu Jan 25 17:46:55 2018 +0900
@@ -16,11 +16,11 @@
         new StartRemotePut(cgm);
 
         cgm.createRemoteDGM("remote", "localhost", 10001);
-        cgm.setup(new CreateRemotePutTest( 2));//この時点でcgm"second"は作られていない→notifyAllで対処?
+        cgm.setup(new CreateRemotePutTest( ));//この時点でcgm"second"は作られていない→notifyAllで対処?
 
         CodeGearManager cgm2 = createCGM(10001);
         cgm2.createRemoteDGM("remote", "localhost", 10000);
-        cgm2.setup(new CreateRemotePutTest(1));
+        cgm2.setup(new CreateRemotePutTest());
     }
 
 }
--- a/src/main/java/christie/test/RemoteTake/CreateRemoteTakeTest.java	Tue Jan 23 22:02:54 2018 +0900
+++ b/src/main/java/christie/test/RemoteTake/CreateRemoteTakeTest.java	Thu Jan 25 17:46:55 2018 +0900
@@ -6,15 +6,9 @@
 
 public class CreateRemoteTakeTest extends CodeGear{
 
-    int cgmID;
-
-    public CreateRemoteTakeTest(int cgmID) {
-        this.cgmID = cgmID;
-    }
-
     @Override
     protected void run(CodeGearManager cgm) {
-        cgm.setup(new RemoteTakeTest(cgmID));
-        getLocalDGM().metaPut(cgmID,"hoge", 1);
+        cgm.setup(new RemoteTakeTest());
+        getLocalDGM().put("hoge", 1);
     }
 }
--- a/src/main/java/christie/test/RemoteTake/RemoteTakeTest.java	Tue Jan 23 22:02:54 2018 +0900
+++ b/src/main/java/christie/test/RemoteTake/RemoteTakeTest.java	Thu Jan 25 17:46:55 2018 +0900
@@ -9,18 +9,11 @@
     @RemoteTake(dsmName = "remote",key = "hoge")
     public DataGear<Integer> hoge = new DataGear<>();
 
-    int cgmID;
-
-    public RemoteTakeTest(int cgmID) {
-        this.cgmID = cgmID;
-    }
-
-
     @Override
     protected void run(CodeGearManager cgm) {
         if (hoge.getData() != 10){
             System.out.println(hoge.getData());
-            cgm.setup(new RemoteTakeTest(cgmID));
+            cgm.setup(new RemoteTakeTest());
             getLocalDGM().put("hoge", hoge.getData() + 1);
         }
     }
--- a/src/main/java/christie/test/RemoteTake/StartRemoteTake.java	Tue Jan 23 22:02:54 2018 +0900
+++ b/src/main/java/christie/test/RemoteTake/StartRemoteTake.java	Thu Jan 25 17:46:55 2018 +0900
@@ -13,11 +13,11 @@
         CodeGearManager cgm = createCGM(10000);
         new StartRemotePut(cgm);
 
-        cgm.createRemoteDGM("remote", "localhost", 10001);
-        cgm.setup(new CreateRemoteTakeTest(1));
+        cgm.createRemoteDGM("remote", "localhost", 10000);
+        cgm.setup(new CreateRemoteTakeTest());
 
         /*CodeGearManager cgm2 = createCGM(10001);
         cgm2.createRemoteDGM("remote", "localhost", 10000);
-        cgm2.setup(new CreateRemoteTakeTest(1));*/
+        cgm2.setup(new CreateRemoteTakeTest());*/
     }
 }