changeset 12:b49a926cbdd9

add RemotePutTest and that is working
author Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
date Tue, 09 Jan 2018 17:37:43 +0900
parents 4e5f6db22033
children bcd4f2c19185
files build.gradle src/main/java/christie/codegear/CodeGear.java src/main/java/christie/codegear/CodeGearManager.java src/main/java/christie/codegear/Command.java src/main/java/christie/codegear/CommandType.java src/main/java/christie/codegear/InputDataGear.java src/main/java/christie/codegear/OutputDataGear.java src/main/java/christie/daemon/Connection.java src/main/java/christie/daemon/IncomingTcpConnection.java src/main/java/christie/daemon/OutboundTcpConnection.java src/main/java/christie/daemon/RemoteMessage.java src/main/java/christie/datagear/DataGear.java src/main/java/christie/datagear/LocalDataGearManager.java src/main/java/christie/datagear/RemoteDataGearManager.java src/main/java/christie/test/Remote/RemotePutTest.java src/main/java/christie/test/Remote/StartTest.java src/main/java/christie/test/StartTest.java src/main/java/christie/test/TestCodeGear.java src/main/java/christie/test/TestLocal/StartTest.java src/main/java/christie/test/TestLocal/TestCodeGear.java
diffstat 20 files changed, 459 insertions(+), 75 deletions(-) [+]
line wrap: on
line diff
--- a/build.gradle	Sat Dec 30 20:11:36 2017 +0900
+++ b/build.gradle	Tue Jan 09 17:37:43 2018 +0900
@@ -14,7 +14,9 @@
 }
 
 dependencies {
+    compile fileTree(dir: 'lib', include: '*.jar')
     testCompile group: 'junit', name: 'junit', version: '4.21'
+    compile group: 'org.msgpack', name: 'msgpack', version: '0.6.12'
 }
 
 jar {
--- a/src/main/java/christie/codegear/CodeGear.java	Sat Dec 30 20:11:36 2017 +0900
+++ b/src/main/java/christie/codegear/CodeGear.java	Tue Jan 09 17:37:43 2018 +0900
@@ -42,8 +42,8 @@
         idg.finishInput(cgm, commandList);
     }
 
-    public DataGearManager dgm(String dest) {
-        return cgm.getDGM(dest);
+    public DataGearManager dgm(String dsmName) {
+        return cgm.getDGM(dsmName);
     }
 
     public void checkAndSetCommand(Field field, String name){
@@ -63,6 +63,7 @@
             throw new NullPointerException("please initialize DataGear");
         }
 
-        commandList.add(new Command(this, dg, "local", name, CommandType.TAKE));
+        //TODO:cgmName
+        commandList.add(new Command(this, dg, "first","local", name, CommandType.TAKE));
     }
 }
--- a/src/main/java/christie/codegear/CodeGearManager.java	Sat Dec 30 20:11:36 2017 +0900
+++ b/src/main/java/christie/codegear/CodeGearManager.java	Tue Jan 09 17:37:43 2018 +0900
@@ -26,12 +26,12 @@
         return dataGearManagers.get("local");
     }
 
-    public DataGearManager getDGM(String dest){
-        return dataGearManagers.get(dest);
+    public DataGearManager getDGM(String dsmName){
+        return dataGearManagers.get(dsmName);
     }
 
-    public void createRemoteDataGeareManager(String dist){
-        dataGearManagers.put(dist, new RemoteDataGearManager());
+    public void createRemoteDGM(String dsmName, String address, int port){
+        dataGearManagers.put(dsmName, new RemoteDataGearManager(dsmName, address, port, this));
     }
 
     public void submit(CodeGear cg){
--- a/src/main/java/christie/codegear/Command.java	Sat Dec 30 20:11:36 2017 +0900
+++ b/src/main/java/christie/codegear/Command.java	Tue Jan 09 17:37:43 2018 +0900
@@ -1,20 +1,74 @@
 package christie.codegear;
 
+import christie.daemon.RemoteMessage;
 import christie.datagear.DataGear;
+import org.msgpack.MessagePack;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
 
 public class Command {
-    public CodeGear cs;
+    public CodeGear cg = null;
     public DataGear dg;
-    public String dest;
+    public String cgmName = "first";
+    public String dsmName = "local";
     public String key;
     public CommandType type;
+    private static final MessagePack packer = new MessagePack();
 
-    public Command(CodeGear cs, DataGear dg, String dest, String key, CommandType type){
-        this.cs = cs;
+    //for put
+    public Command(DataGear dg, String cgmName, String dsmName, String key, CommandType type){
         this.dg = dg;
-        this.dest = dest;
+        this.cgmName = cgmName;
+        this.dsmName = dsmName;
+        this.key = key;
+        this.type = type;
+    }
+
+    //for take
+    public Command(CodeGear cg, DataGear dg, String cgmName, String dsmName, String key, CommandType type){
+        this.cg = cg;
+        this.dg = dg;
+        this.cgmName = cgmName;
+        this.dsmName = dsmName;
         this.key = key;
         this.type = type;
     }
 
+    public ByteBuffer convert() {
+        ByteBuffer buf = null;
+
+        try {
+            byte[] command = null;
+            byte[] data = null;
+            byte[] dataSize = null;
+
+            switch (type) {
+                case PUT:
+                case REPLY:
+
+                    RemoteMessage cm = new RemoteMessage();
+
+                    data = dg.getMessagePack();
+
+                    command = packer.write(cm);
+                    dataSize = packer.write(data.length);
+                    buf = ByteBuffer.allocate(command.length+dataSize.length+data.length);
+                    buf.put(command);
+                    buf.put(dataSize);
+                    buf.put(data);
+                    break;
+                default:
+                    command = packer.write(new RemoteMessage());
+                    buf = ByteBuffer.allocate(command.length);
+                    buf.put(command);
+                    break;
+            }
+
+            buf.flip();
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+        return buf;
+    }
 }
--- a/src/main/java/christie/codegear/CommandType.java	Sat Dec 30 20:11:36 2017 +0900
+++ b/src/main/java/christie/codegear/CommandType.java	Tue Jan 09 17:37:43 2018 +0900
@@ -5,5 +5,22 @@
 public enum CommandType {
     PUT,
     TAKE,
-    PEEK;
+    PEEK,
+    REPLY,
+    CLOSE,
+    FINISH;
+
+    public int id;//コマンドのid
+    public static HashMap<Integer, CommandType> hash = new HashMap<Integer, CommandType>();//コマンド対応表
+
+
+    public static CommandType getCommandTypeFromId(int id) {
+        return hash.get(id);
+    }
+
+    static {
+        for (CommandType type : CommandType.values()) {
+            hash.put(type.id, type);
+        }
+    }
 }
--- a/src/main/java/christie/codegear/InputDataGear.java	Sat Dec 30 20:11:36 2017 +0900
+++ b/src/main/java/christie/codegear/InputDataGear.java	Tue Jan 09 17:37:43 2018 +0900
@@ -31,7 +31,7 @@
         }
 
         for(Command cm : commandList){
-            cgm.getDGM(cm.dest).take(cm);
+            cgm.getDGM(cm.dsmName).take(cm);
         }
     }
 
--- a/src/main/java/christie/codegear/OutputDataGear.java	Sat Dec 30 20:11:36 2017 +0900
+++ b/src/main/java/christie/codegear/OutputDataGear.java	Tue Jan 09 17:37:43 2018 +0900
@@ -21,7 +21,7 @@
         cgm.getDGM("local").put(key, data);
     }
 
-    public void put(String dist, String key, Object data){
-        cgm.getDGM(dist).put(key, data);
+    public void put(String dsmName, String key, Object data){
+        cgm.getDGM(dsmName).put(key, data);
     }
 }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/christie/daemon/Connection.java	Tue Jan 09 17:37:43 2018 +0900
@@ -0,0 +1,56 @@
+package christie.daemon;
+
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import christie.codegear.Command;
+
+public class Connection {
+
+    public Socket socket;
+    public String name;
+    public LinkedBlockingQueue<Command> sendQueue = new LinkedBlockingQueue<Command>();
+    public boolean sendManager = true;
+
+    public Connection(Socket socket) {
+        this.socket = socket;
+    }
+
+    public Connection() {}
+
+    public void sendCommand(Command cmd) {
+        try {
+            sendQueue.put(cmd);
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+    }
+
+    public String getInfoString() {
+        return socket.getInetAddress().getHostName()
+                + ":" + socket.getPort();
+    }
+
+
+    public void close(){
+        try {
+            socket.shutdownOutput();
+            socket.shutdownInput();
+            socket.close();
+        } catch (Exception e) { }
+        //putConnectionInfo();
+
+    }
+
+    public synchronized void write(Command cmd) {
+        ByteBuffer buffer = cmd.convert();
+        try {
+            while (buffer.hasRemaining()) {
+                socket.getChannel().write(buffer);
+            }
+        } catch (Exception e) {
+        }
+    }
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/christie/daemon/IncomingTcpConnection.java	Tue Jan 09 17:37:43 2018 +0900
@@ -0,0 +1,79 @@
+package christie.daemon;
+
+
+import christie.codegear.CodeGearManager;
+import christie.codegear.Command;
+import christie.codegear.CommandType;
+import christie.datagear.DataGear;
+import christie.datagear.RemoteDataGearManager;
+
+import org.msgpack.MessagePack;
+import org.msgpack.unpacker.Unpacker;
+
+import java.io.IOException;
+import java.nio.channels.ClosedChannelException;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class IncomingTcpConnection extends Thread {
+
+    ConcurrentHashMap<String, CodeGearManager> cgms;
+    Connection connection;
+    private static final MessagePack packer = new MessagePack();
+
+    public IncomingTcpConnection(Connection connection, CodeGearManager cgm) {
+        this.cgms = cgm.getCgms();
+        this.connection = connection;
+    }
+
+    public void run() {
+        Unpacker unpacker = null;
+        try {
+            unpacker = packer.createUnpacker(connection.socket.getInputStream());
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+        if (unpacker == null) {
+            return;
+        }
+        while (true) {
+            try {
+                Command cmd = null;
+                DataGear dg = null;
+                RemoteMessage msg = unpacker.read(RemoteMessage.class);
+                CommandType type = CommandType.getCommandTypeFromId(msg.type);
+                int dataSize = unpacker.readInt();
+                byte[] data = new byte[dataSize];
+                switch (type) {
+                    case PUT:
+                        connection.socket.getInputStream().read(data);
+
+                        cgms.get(msg.cgmName).getDGM().put(msg.key, data);
+
+                        break;
+                        /*
+                    case PEEK:
+                    case TAKE:
+                        cmd = new Command(null, dg, null, null, null);
+
+                        break;
+                    case REPLY:
+                        connection.socket.getInputStream().read(data);
+                        dg = new DataGear(data, data.getClass());
+
+                        Command rCmd = new Command(null, dg, null, null, null);
+                        //cmd.cg.idg.reply(cmd.dg, rCmd);
+
+                        break;*/
+                    default:
+                        break;
+                }
+            } catch (ClosedChannelException e) {
+                return;
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/christie/daemon/OutboundTcpConnection.java	Tue Jan 09 17:37:43 2018 +0900
@@ -0,0 +1,33 @@
+package christie.daemon;
+
+import christie.codegear.Command;
+
+public class OutboundTcpConnection extends Thread {
+
+    Connection connection;
+
+    public OutboundTcpConnection(Connection connection) {
+        this.connection = connection;
+    }
+
+    public void run() {
+        while (true) {
+            try {
+                Command cmd = connection.sendQueue.take();
+                switch (cmd.type) {
+                    case CLOSE:
+                        connection.close();
+                        return;
+                    case FINISH:
+                        System.exit(0);
+                        return;
+                    default:
+                        break;
+                }
+                connection.write(cmd);//ここでconvert()がよばれてる
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+        }
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/christie/daemon/RemoteMessage.java	Tue Jan 09 17:37:43 2018 +0900
@@ -0,0 +1,15 @@
+package christie.daemon;
+
+public class RemoteMessage {
+    public int type;//PUT, PEEKなどのコマンドタイプ
+    public String cgmName;
+    public String key;//DS key
+
+    public RemoteMessage() {}
+
+    public RemoteMessage(int type, String cgmName, String key) {
+        this.type = type;
+        this.cgmName = cgmName;
+        this.key = key;
+    }
+}
--- a/src/main/java/christie/datagear/DataGear.java	Sat Dec 30 20:11:36 2017 +0900
+++ b/src/main/java/christie/datagear/DataGear.java	Tue Jan 09 17:37:43 2018 +0900
@@ -1,8 +1,15 @@
 package christie.datagear;
 
+import org.msgpack.MessagePack;
+
+import java.io.IOException;
+
 public class DataGear<T>{
     private T data;
     private Class<T> clazz;
+    private byte[] messagePack = null;
+    private int dataSize;
+    private MessagePack packer = new MessagePack();
 
     public DataGear(T data, Class clazz) {
         setClazz(clazz);
@@ -15,12 +22,33 @@
         this.clazz = type;
     }
 
+    public DataGear(byte[] messagePack){
+        this.data = null;
+        this.clazz = null;
+        this.messagePack = messagePack;
+    }
+
     public void setData(T data) {
         if (data.getClass() == this.clazz){
             this.data = data;
         }
     }
 
+    public byte[] getMessagePack(){
+        if (messagePack != null){
+            return messagePack;
+        } else {
+            try {
+                messagePack = packer.write(data);
+                setDataSize(messagePack.length);
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+
+            return messagePack;
+        }
+    }
+
     public void setClazz(Class clazz){
         this.clazz = clazz;
     };
@@ -30,7 +58,18 @@
     };
 
     public T getData(){
+        if (data == null){
+            try {
+                data = (T) packer.unconvert(messagePack);
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+        }
         return data;
     };
 
+    public void setDataSize(int dataSize) {
+        this.dataSize = dataSize;
+    }
+
 }
--- a/src/main/java/christie/datagear/LocalDataGearManager.java	Sat Dec 30 20:11:36 2017 +0900
+++ b/src/main/java/christie/datagear/LocalDataGearManager.java	Tue Jan 09 17:37:43 2018 +0900
@@ -20,6 +20,10 @@
 
     public void put(String key, Object data) {
         DataGear dg = new DataGear(data, data.getClass());
+        put(key, dg);
+    }
+
+    public void put(String key, DataGear dg){
         if(dataGears.containsKey(key)){
             dataGears.get(key).add(dg);
         } else {
@@ -29,7 +33,7 @@
         }
 
         if (waitList.containsKey(key)){
-            waitList.get(key).dg.setData(data);
+            waitList.get(key).dg.setData(dg.getData());
             runCommand(waitList.get(key));
         }
     }
@@ -38,14 +42,14 @@
     public void runCommand(Command cm){
         switch (cm.type){
             case TAKE:
-                    cm.cs.idg.setInputs(cm.key, cm.dg);
+                    cm.cg.idg.setInputs(cm.key, cm.dg);
                     dataGears.get(cm.key).poll();
                     if (dataGears.get(cm.key).isEmpty()){
                         dataGears.remove(cm.key);
                     }
                     break;
             case PEEK:
-                    cm.cs.idg.setInputs(cm.key, cm.dg);
+                    cm.cg.idg.setInputs(cm.key, cm.dg);
                     break;
         }
         waitList.remove(cm.key);
--- a/src/main/java/christie/datagear/RemoteDataGearManager.java	Sat Dec 30 20:11:36 2017 +0900
+++ b/src/main/java/christie/datagear/RemoteDataGearManager.java	Tue Jan 09 17:37:43 2018 +0900
@@ -1,9 +1,53 @@
 package christie.datagear;
 
+import christie.codegear.CodeGear;
+import christie.codegear.CodeGearManager;
 import christie.codegear.Command;
+import christie.codegear.CommandType;
+import christie.daemon.Connection;
+import christie.daemon.IncomingTcpConnection;
+import christie.daemon.OutboundTcpConnection;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.channels.SocketChannel;
 
 public class RemoteDataGearManager implements DataGearManager{
 
+    Connection connection;
+
+    public RemoteDataGearManager(final String connectionName, final String address, final int port, CodeGearManager cgm) {
+        connection = new Connection();
+        connection.name = connectionName;
+        new Thread("Connect-" + connectionName) {
+            public void run() {
+                boolean connect = true;
+                do {
+                    try {
+                        SocketChannel sc = SocketChannel.open(new InetSocketAddress(address, port));
+                        connection.socket = sc.socket();
+                        connection.socket.setTcpNoDelay(true);
+                        connect = false;
+                    } catch (IOException e) {
+                        try {
+                            Thread.sleep(50);
+                        } catch (InterruptedException e1) {
+                            e1.printStackTrace();
+                        }
+                    }
+                } while (connect);
+                IncomingTcpConnection in = new IncomingTcpConnection(connection, cgm);
+                in.setName(connectionName+"-IncomingTcp");
+                in.setPriority(MAX_PRIORITY);
+                in.start();
+                OutboundTcpConnection out = new OutboundTcpConnection(connection);
+                out.setName(connectionName + "-OutboundTcp");
+                out.setPriority(MAX_PRIORITY);
+                out.start();
+            }
+        }.start();
+    }
+
     @Override
     public void take(Command cm) {
 
@@ -11,7 +55,9 @@
 
     @Override
     public void put(String key, Object data) {
-
+        //cgmNameを受け取れるようにする
+        Command cmd = new Command(new DataGear(data, data.getClass()), "first","local", key, CommandType.PUT);
+        connection.write(cmd);
     }
 
     @Override
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/christie/test/Remote/RemotePutTest.java	Tue Jan 09 17:37:43 2018 +0900
@@ -0,0 +1,24 @@
+package christie.test.Remote;
+
+import christie.annotation.Take;
+import christie.codegear.CodeGear;
+import christie.codegear.CodeGearManager;
+import christie.datagear.DataGear;
+import christie.test.TestLocal.TestCodeGear;
+
+public class RemotePutTest extends CodeGear {
+
+    @Take("hoge")
+    DataGear<Integer> hoge = new DataGear<>();
+
+    @Override
+    protected void run(CodeGearManager cgm) {
+        System.out.println(hoge.getData());
+
+        if (hoge.getData()!= 10){
+            cgm.setup(new TestCodeGear());
+            dgm("remote").put("hoge", hoge.getData() + 1);
+        }
+    }
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/christie/test/Remote/StartTest.java	Tue Jan 09 17:37:43 2018 +0900
@@ -0,0 +1,20 @@
+package christie.test.Remote;
+
+import christie.codegear.CodeGearManager;
+import christie.codegear.StartCodeGear;
+
+public class StartTest extends StartCodeGear {
+
+    public static void main(String args[]){
+        christie.test.TestLocal.StartTest start = new christie.test.TestLocal.StartTest();
+        CodeGearManager cgm = start.createCGM("first");
+        cgm.setup(start);
+    }
+
+    @Override
+    protected void run(CodeGearManager cgm) {
+        cgm.setup(new RemotePutTest());
+        cgm.createRemoteDGM("remote","localhost", 1000);
+        dgm("remote").put("test", 1);
+    }
+}
--- a/src/main/java/christie/test/StartTest.java	Sat Dec 30 20:11:36 2017 +0900
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,25 +0,0 @@
-package christie.test;
-
-import christie.codegear.CodeGearManager;
-import christie.codegear.StartCodeGear;
-
-import java.util.HashMap;
-
-public class StartTest extends StartCodeGear{
-
-
-    public static void main(String args[]){
-        StartTest start = new StartTest();
-        CodeGearManager cgm = start.createCGM("first");
-        cgm.setup(start);
-    }
-
-    @Override
-    protected void run(CodeGearManager cgm) {
-        TestCodeGear cg = new TestCodeGear();
-        cgm.setup(cg);
-
-        localDGM.put("hoge", 1);
-
-    }
-}
--- a/src/main/java/christie/test/TestCodeGear.java	Sat Dec 30 20:11:36 2017 +0900
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,30 +0,0 @@
-package christie.test;
-
-import christie.annotation.Peek;
-import christie.annotation.Take;
-import christie.codegear.CodeGear;
-import christie.codegear.CodeGearManager;
-import christie.datagear.DataGear;
-
-import java.util.HashMap;
-
-
-/**
- * Created by e125769 on 12/7/17.
- */
-public class TestCodeGear extends CodeGear {
-
-    @Take("hoge")
-    public DataGear<Integer> hoge = new DataGear<>();
-
-    public void run(CodeGearManager cgm){
-        System.out.println(hoge.getData());
-
-        if (hoge.getData()!= 10){
-            cgm.setup(new TestCodeGear());
-        }
-
-        localDGM.put("hoge", hoge.getData() + 1);
-    }
-
-}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/christie/test/TestLocal/StartTest.java	Tue Jan 09 17:37:43 2018 +0900
@@ -0,0 +1,23 @@
+package christie.test.TestLocal;
+
+import christie.codegear.CodeGearManager;
+import christie.codegear.StartCodeGear;
+
+public class StartTest extends StartCodeGear{
+
+
+    public static void main(String args[]){
+        StartTest start = new StartTest();
+        CodeGearManager cgm = start.createCGM("first");
+        cgm.setup(start);
+    }
+
+    @Override
+    protected void run(CodeGearManager cgm) {
+        TestCodeGear cg = new TestCodeGear();
+        cgm.setup(cg);
+
+        localDGM.put("hoge", 1);
+
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/christie/test/TestLocal/TestCodeGear.java	Tue Jan 09 17:37:43 2018 +0900
@@ -0,0 +1,26 @@
+package christie.test.TestLocal;
+
+import christie.annotation.Take;
+import christie.codegear.CodeGear;
+import christie.codegear.CodeGearManager;
+import christie.datagear.DataGear;
+
+
+/**
+ * Created by e125769 on 12/7/17.
+ */
+public class TestCodeGear extends CodeGear {
+
+    @Take("hoge")
+    public DataGear<Integer> hoge = new DataGear<>();
+
+    public void run(CodeGearManager cgm){
+        System.out.println(hoge.getData());
+
+        if (hoge.getData()!= 10){
+            cgm.setup(new TestCodeGear());
+            localDGM.put("hoge", hoge.getData() + 1);
+        }
+    }
+
+}