Mercurial > hg > Database > Christie
changeset 12:b49a926cbdd9
add RemotePutTest and that is working
line wrap: on
line diff
--- a/build.gradle Sat Dec 30 20:11:36 2017 +0900 +++ b/build.gradle Tue Jan 09 17:37:43 2018 +0900 @@ -14,7 +14,9 @@ } dependencies { + compile fileTree(dir: 'lib', include: '*.jar') testCompile group: 'junit', name: 'junit', version: '4.21' + compile group: 'org.msgpack', name: 'msgpack', version: '0.6.12' } jar {
--- a/src/main/java/christie/codegear/CodeGear.java Sat Dec 30 20:11:36 2017 +0900 +++ b/src/main/java/christie/codegear/CodeGear.java Tue Jan 09 17:37:43 2018 +0900 @@ -42,8 +42,8 @@ idg.finishInput(cgm, commandList); } - public DataGearManager dgm(String dest) { - return cgm.getDGM(dest); + public DataGearManager dgm(String dsmName) { + return cgm.getDGM(dsmName); } public void checkAndSetCommand(Field field, String name){ @@ -63,6 +63,7 @@ throw new NullPointerException("please initialize DataGear"); } - commandList.add(new Command(this, dg, "local", name, CommandType.TAKE)); + //TODO:cgmName + commandList.add(new Command(this, dg, "first","local", name, CommandType.TAKE)); } }
--- a/src/main/java/christie/codegear/CodeGearManager.java Sat Dec 30 20:11:36 2017 +0900 +++ b/src/main/java/christie/codegear/CodeGearManager.java Tue Jan 09 17:37:43 2018 +0900 @@ -26,12 +26,12 @@ return dataGearManagers.get("local"); } - public DataGearManager getDGM(String dest){ - return dataGearManagers.get(dest); + public DataGearManager getDGM(String dsmName){ + return dataGearManagers.get(dsmName); } - public void createRemoteDataGeareManager(String dist){ - dataGearManagers.put(dist, new RemoteDataGearManager()); + public void createRemoteDGM(String dsmName, String address, int port){ + dataGearManagers.put(dsmName, new RemoteDataGearManager(dsmName, address, port, this)); } public void submit(CodeGear cg){
--- a/src/main/java/christie/codegear/Command.java Sat Dec 30 20:11:36 2017 +0900 +++ b/src/main/java/christie/codegear/Command.java Tue Jan 09 17:37:43 2018 +0900 @@ -1,20 +1,74 @@ package christie.codegear; +import christie.daemon.RemoteMessage; import christie.datagear.DataGear; +import org.msgpack.MessagePack; + +import java.io.IOException; +import java.nio.ByteBuffer; public class Command { - public CodeGear cs; + public CodeGear cg = null; public DataGear dg; - public String dest; + public String cgmName = "first"; + public String dsmName = "local"; public String key; public CommandType type; + private static final MessagePack packer = new MessagePack(); - public Command(CodeGear cs, DataGear dg, String dest, String key, CommandType type){ - this.cs = cs; + //for put + public Command(DataGear dg, String cgmName, String dsmName, String key, CommandType type){ this.dg = dg; - this.dest = dest; + this.cgmName = cgmName; + this.dsmName = dsmName; + this.key = key; + this.type = type; + } + + //for take + public Command(CodeGear cg, DataGear dg, String cgmName, String dsmName, String key, CommandType type){ + this.cg = cg; + this.dg = dg; + this.cgmName = cgmName; + this.dsmName = dsmName; this.key = key; this.type = type; } + public ByteBuffer convert() { + ByteBuffer buf = null; + + try { + byte[] command = null; + byte[] data = null; + byte[] dataSize = null; + + switch (type) { + case PUT: + case REPLY: + + RemoteMessage cm = new RemoteMessage(); + + data = dg.getMessagePack(); + + command = packer.write(cm); + dataSize = packer.write(data.length); + buf = ByteBuffer.allocate(command.length+dataSize.length+data.length); + buf.put(command); + buf.put(dataSize); + buf.put(data); + break; + default: + command = packer.write(new RemoteMessage()); + buf = ByteBuffer.allocate(command.length); + buf.put(command); + break; + } + + buf.flip(); + } catch (IOException e) { + e.printStackTrace(); + } + return buf; + } }
--- a/src/main/java/christie/codegear/CommandType.java Sat Dec 30 20:11:36 2017 +0900 +++ b/src/main/java/christie/codegear/CommandType.java Tue Jan 09 17:37:43 2018 +0900 @@ -5,5 +5,22 @@ public enum CommandType { PUT, TAKE, - PEEK; + PEEK, + REPLY, + CLOSE, + FINISH; + + public int id;//コマンドのid + public static HashMap<Integer, CommandType> hash = new HashMap<Integer, CommandType>();//コマンド対応表 + + + public static CommandType getCommandTypeFromId(int id) { + return hash.get(id); + } + + static { + for (CommandType type : CommandType.values()) { + hash.put(type.id, type); + } + } }
--- a/src/main/java/christie/codegear/InputDataGear.java Sat Dec 30 20:11:36 2017 +0900 +++ b/src/main/java/christie/codegear/InputDataGear.java Tue Jan 09 17:37:43 2018 +0900 @@ -31,7 +31,7 @@ } for(Command cm : commandList){ - cgm.getDGM(cm.dest).take(cm); + cgm.getDGM(cm.dsmName).take(cm); } }
--- a/src/main/java/christie/codegear/OutputDataGear.java Sat Dec 30 20:11:36 2017 +0900 +++ b/src/main/java/christie/codegear/OutputDataGear.java Tue Jan 09 17:37:43 2018 +0900 @@ -21,7 +21,7 @@ cgm.getDGM("local").put(key, data); } - public void put(String dist, String key, Object data){ - cgm.getDGM(dist).put(key, data); + public void put(String dsmName, String key, Object data){ + cgm.getDGM(dsmName).put(key, data); } }
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/christie/daemon/Connection.java Tue Jan 09 17:37:43 2018 +0900 @@ -0,0 +1,56 @@ +package christie.daemon; + +import java.net.Socket; +import java.nio.ByteBuffer; +import java.util.concurrent.LinkedBlockingQueue; + +import christie.codegear.Command; + +public class Connection { + + public Socket socket; + public String name; + public LinkedBlockingQueue<Command> sendQueue = new LinkedBlockingQueue<Command>(); + public boolean sendManager = true; + + public Connection(Socket socket) { + this.socket = socket; + } + + public Connection() {} + + public void sendCommand(Command cmd) { + try { + sendQueue.put(cmd); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + public String getInfoString() { + return socket.getInetAddress().getHostName() + + ":" + socket.getPort(); + } + + + public void close(){ + try { + socket.shutdownOutput(); + socket.shutdownInput(); + socket.close(); + } catch (Exception e) { } + //putConnectionInfo(); + + } + + public synchronized void write(Command cmd) { + ByteBuffer buffer = cmd.convert(); + try { + while (buffer.hasRemaining()) { + socket.getChannel().write(buffer); + } + } catch (Exception e) { + } + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/christie/daemon/IncomingTcpConnection.java Tue Jan 09 17:37:43 2018 +0900 @@ -0,0 +1,79 @@ +package christie.daemon; + + +import christie.codegear.CodeGearManager; +import christie.codegear.Command; +import christie.codegear.CommandType; +import christie.datagear.DataGear; +import christie.datagear.RemoteDataGearManager; + +import org.msgpack.MessagePack; +import org.msgpack.unpacker.Unpacker; + +import java.io.IOException; +import java.nio.channels.ClosedChannelException; +import java.util.concurrent.ConcurrentHashMap; + +public class IncomingTcpConnection extends Thread { + + ConcurrentHashMap<String, CodeGearManager> cgms; + Connection connection; + private static final MessagePack packer = new MessagePack(); + + public IncomingTcpConnection(Connection connection, CodeGearManager cgm) { + this.cgms = cgm.getCgms(); + this.connection = connection; + } + + public void run() { + Unpacker unpacker = null; + try { + unpacker = packer.createUnpacker(connection.socket.getInputStream()); + } catch (IOException e) { + e.printStackTrace(); + } + if (unpacker == null) { + return; + } + while (true) { + try { + Command cmd = null; + DataGear dg = null; + RemoteMessage msg = unpacker.read(RemoteMessage.class); + CommandType type = CommandType.getCommandTypeFromId(msg.type); + int dataSize = unpacker.readInt(); + byte[] data = new byte[dataSize]; + switch (type) { + case PUT: + connection.socket.getInputStream().read(data); + + cgms.get(msg.cgmName).getDGM().put(msg.key, data); + + break; + /* + case PEEK: + case TAKE: + cmd = new Command(null, dg, null, null, null); + + break; + case REPLY: + connection.socket.getInputStream().read(data); + dg = new DataGear(data, data.getClass()); + + Command rCmd = new Command(null, dg, null, null, null); + //cmd.cg.idg.reply(cmd.dg, rCmd); + + break;*/ + default: + break; + } + } catch (ClosedChannelException e) { + return; + } catch (IOException e) { + e.printStackTrace(); + } + } + } + + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/christie/daemon/OutboundTcpConnection.java Tue Jan 09 17:37:43 2018 +0900 @@ -0,0 +1,33 @@ +package christie.daemon; + +import christie.codegear.Command; + +public class OutboundTcpConnection extends Thread { + + Connection connection; + + public OutboundTcpConnection(Connection connection) { + this.connection = connection; + } + + public void run() { + while (true) { + try { + Command cmd = connection.sendQueue.take(); + switch (cmd.type) { + case CLOSE: + connection.close(); + return; + case FINISH: + System.exit(0); + return; + default: + break; + } + connection.write(cmd);//ここでconvert()がよばれてる + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/christie/daemon/RemoteMessage.java Tue Jan 09 17:37:43 2018 +0900 @@ -0,0 +1,15 @@ +package christie.daemon; + +public class RemoteMessage { + public int type;//PUT, PEEKなどのコマンドタイプ + public String cgmName; + public String key;//DS key + + public RemoteMessage() {} + + public RemoteMessage(int type, String cgmName, String key) { + this.type = type; + this.cgmName = cgmName; + this.key = key; + } +}
--- a/src/main/java/christie/datagear/DataGear.java Sat Dec 30 20:11:36 2017 +0900 +++ b/src/main/java/christie/datagear/DataGear.java Tue Jan 09 17:37:43 2018 +0900 @@ -1,8 +1,15 @@ package christie.datagear; +import org.msgpack.MessagePack; + +import java.io.IOException; + public class DataGear<T>{ private T data; private Class<T> clazz; + private byte[] messagePack = null; + private int dataSize; + private MessagePack packer = new MessagePack(); public DataGear(T data, Class clazz) { setClazz(clazz); @@ -15,12 +22,33 @@ this.clazz = type; } + public DataGear(byte[] messagePack){ + this.data = null; + this.clazz = null; + this.messagePack = messagePack; + } + public void setData(T data) { if (data.getClass() == this.clazz){ this.data = data; } } + public byte[] getMessagePack(){ + if (messagePack != null){ + return messagePack; + } else { + try { + messagePack = packer.write(data); + setDataSize(messagePack.length); + } catch (IOException e) { + e.printStackTrace(); + } + + return messagePack; + } + } + public void setClazz(Class clazz){ this.clazz = clazz; }; @@ -30,7 +58,18 @@ }; public T getData(){ + if (data == null){ + try { + data = (T) packer.unconvert(messagePack); + } catch (IOException e) { + e.printStackTrace(); + } + } return data; }; + public void setDataSize(int dataSize) { + this.dataSize = dataSize; + } + }
--- a/src/main/java/christie/datagear/LocalDataGearManager.java Sat Dec 30 20:11:36 2017 +0900 +++ b/src/main/java/christie/datagear/LocalDataGearManager.java Tue Jan 09 17:37:43 2018 +0900 @@ -20,6 +20,10 @@ public void put(String key, Object data) { DataGear dg = new DataGear(data, data.getClass()); + put(key, dg); + } + + public void put(String key, DataGear dg){ if(dataGears.containsKey(key)){ dataGears.get(key).add(dg); } else { @@ -29,7 +33,7 @@ } if (waitList.containsKey(key)){ - waitList.get(key).dg.setData(data); + waitList.get(key).dg.setData(dg.getData()); runCommand(waitList.get(key)); } } @@ -38,14 +42,14 @@ public void runCommand(Command cm){ switch (cm.type){ case TAKE: - cm.cs.idg.setInputs(cm.key, cm.dg); + cm.cg.idg.setInputs(cm.key, cm.dg); dataGears.get(cm.key).poll(); if (dataGears.get(cm.key).isEmpty()){ dataGears.remove(cm.key); } break; case PEEK: - cm.cs.idg.setInputs(cm.key, cm.dg); + cm.cg.idg.setInputs(cm.key, cm.dg); break; } waitList.remove(cm.key);
--- a/src/main/java/christie/datagear/RemoteDataGearManager.java Sat Dec 30 20:11:36 2017 +0900 +++ b/src/main/java/christie/datagear/RemoteDataGearManager.java Tue Jan 09 17:37:43 2018 +0900 @@ -1,9 +1,53 @@ package christie.datagear; +import christie.codegear.CodeGear; +import christie.codegear.CodeGearManager; import christie.codegear.Command; +import christie.codegear.CommandType; +import christie.daemon.Connection; +import christie.daemon.IncomingTcpConnection; +import christie.daemon.OutboundTcpConnection; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.channels.SocketChannel; public class RemoteDataGearManager implements DataGearManager{ + Connection connection; + + public RemoteDataGearManager(final String connectionName, final String address, final int port, CodeGearManager cgm) { + connection = new Connection(); + connection.name = connectionName; + new Thread("Connect-" + connectionName) { + public void run() { + boolean connect = true; + do { + try { + SocketChannel sc = SocketChannel.open(new InetSocketAddress(address, port)); + connection.socket = sc.socket(); + connection.socket.setTcpNoDelay(true); + connect = false; + } catch (IOException e) { + try { + Thread.sleep(50); + } catch (InterruptedException e1) { + e1.printStackTrace(); + } + } + } while (connect); + IncomingTcpConnection in = new IncomingTcpConnection(connection, cgm); + in.setName(connectionName+"-IncomingTcp"); + in.setPriority(MAX_PRIORITY); + in.start(); + OutboundTcpConnection out = new OutboundTcpConnection(connection); + out.setName(connectionName + "-OutboundTcp"); + out.setPriority(MAX_PRIORITY); + out.start(); + } + }.start(); + } + @Override public void take(Command cm) { @@ -11,7 +55,9 @@ @Override public void put(String key, Object data) { - + //cgmNameを受け取れるようにする + Command cmd = new Command(new DataGear(data, data.getClass()), "first","local", key, CommandType.PUT); + connection.write(cmd); } @Override
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/christie/test/Remote/RemotePutTest.java Tue Jan 09 17:37:43 2018 +0900 @@ -0,0 +1,24 @@ +package christie.test.Remote; + +import christie.annotation.Take; +import christie.codegear.CodeGear; +import christie.codegear.CodeGearManager; +import christie.datagear.DataGear; +import christie.test.TestLocal.TestCodeGear; + +public class RemotePutTest extends CodeGear { + + @Take("hoge") + DataGear<Integer> hoge = new DataGear<>(); + + @Override + protected void run(CodeGearManager cgm) { + System.out.println(hoge.getData()); + + if (hoge.getData()!= 10){ + cgm.setup(new TestCodeGear()); + dgm("remote").put("hoge", hoge.getData() + 1); + } + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/christie/test/Remote/StartTest.java Tue Jan 09 17:37:43 2018 +0900 @@ -0,0 +1,20 @@ +package christie.test.Remote; + +import christie.codegear.CodeGearManager; +import christie.codegear.StartCodeGear; + +public class StartTest extends StartCodeGear { + + public static void main(String args[]){ + christie.test.TestLocal.StartTest start = new christie.test.TestLocal.StartTest(); + CodeGearManager cgm = start.createCGM("first"); + cgm.setup(start); + } + + @Override + protected void run(CodeGearManager cgm) { + cgm.setup(new RemotePutTest()); + cgm.createRemoteDGM("remote","localhost", 1000); + dgm("remote").put("test", 1); + } +}
--- a/src/main/java/christie/test/StartTest.java Sat Dec 30 20:11:36 2017 +0900 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,25 +0,0 @@ -package christie.test; - -import christie.codegear.CodeGearManager; -import christie.codegear.StartCodeGear; - -import java.util.HashMap; - -public class StartTest extends StartCodeGear{ - - - public static void main(String args[]){ - StartTest start = new StartTest(); - CodeGearManager cgm = start.createCGM("first"); - cgm.setup(start); - } - - @Override - protected void run(CodeGearManager cgm) { - TestCodeGear cg = new TestCodeGear(); - cgm.setup(cg); - - localDGM.put("hoge", 1); - - } -}
--- a/src/main/java/christie/test/TestCodeGear.java Sat Dec 30 20:11:36 2017 +0900 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,30 +0,0 @@ -package christie.test; - -import christie.annotation.Peek; -import christie.annotation.Take; -import christie.codegear.CodeGear; -import christie.codegear.CodeGearManager; -import christie.datagear.DataGear; - -import java.util.HashMap; - - -/** - * Created by e125769 on 12/7/17. - */ -public class TestCodeGear extends CodeGear { - - @Take("hoge") - public DataGear<Integer> hoge = new DataGear<>(); - - public void run(CodeGearManager cgm){ - System.out.println(hoge.getData()); - - if (hoge.getData()!= 10){ - cgm.setup(new TestCodeGear()); - } - - localDGM.put("hoge", hoge.getData() + 1); - } - -}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/christie/test/TestLocal/StartTest.java Tue Jan 09 17:37:43 2018 +0900 @@ -0,0 +1,23 @@ +package christie.test.TestLocal; + +import christie.codegear.CodeGearManager; +import christie.codegear.StartCodeGear; + +public class StartTest extends StartCodeGear{ + + + public static void main(String args[]){ + StartTest start = new StartTest(); + CodeGearManager cgm = start.createCGM("first"); + cgm.setup(start); + } + + @Override + protected void run(CodeGearManager cgm) { + TestCodeGear cg = new TestCodeGear(); + cgm.setup(cg); + + localDGM.put("hoge", 1); + + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/christie/test/TestLocal/TestCodeGear.java Tue Jan 09 17:37:43 2018 +0900 @@ -0,0 +1,26 @@ +package christie.test.TestLocal; + +import christie.annotation.Take; +import christie.codegear.CodeGear; +import christie.codegear.CodeGearManager; +import christie.datagear.DataGear; + + +/** + * Created by e125769 on 12/7/17. + */ +public class TestCodeGear extends CodeGear { + + @Take("hoge") + public DataGear<Integer> hoge = new DataGear<>(); + + public void run(CodeGearManager cgm){ + System.out.println(hoge.getData()); + + if (hoge.getData()!= 10){ + cgm.setup(new TestCodeGear()); + localDGM.put("hoge", hoge.getData() + 1); + } + } + +}