Mercurial > hg > Database > Christie
changeset 21:5baccb8f7fbd
add RemoteTake, but it isn't working
line wrap: on
line diff
--- a/src/main/java/christie/codegear/CodeGear.java Mon Jan 22 15:34:38 2018 +0900 +++ b/src/main/java/christie/codegear/CodeGear.java Mon Jan 22 23:22:09 2018 +0900 @@ -1,14 +1,15 @@ package christie.codegear; import christie.annotation.Peek; +import christie.annotation.RemoteTake; import christie.annotation.Take; -import christie.daemon.ChristieDaemon; +import christie.datagear.Command; +import christie.datagear.CommandType; import christie.datagear.DataGear; import christie.datagear.DataGearManager; import java.lang.reflect.Field; import java.util.ArrayList; -import java.util.HashMap; /** * Created by e125769 on 12/7/17. @@ -33,19 +34,22 @@ for (Field field : this.getClass().getDeclaredFields()) {//AnnotationからInputDataGearをセット if (field.isAnnotationPresent(Take.class)) { Take ano = field.getAnnotation(Take.class); - checkAndSetCommand(field, ano.value()); + checkAndSetCommand(field, "local", ano.value()); } else if (field.isAnnotationPresent(Peek.class)) { Peek ano = field.getAnnotation(Peek.class); - checkAndSetCommand(field, ano.value()); + checkAndSetCommand(field, "local", ano.value()); + } else if (field.isAnnotationPresent(RemoteTake.class)) { + RemoteTake ano = field.getAnnotation(RemoteTake.class); + checkAndSetCommand(field, ano.dsmName(), ano.key()); } } idg.finishInput(cgm, commandList); } - public void checkAndSetCommand(Field field, String name){ + public void checkAndSetCommand(Field field, String dsmName, String key){ - if (!field.getName().equals(name)){ + if (!field.getName().equals(key)){ throw new IllegalArgumentException("key and DataGearName do not match"); } @@ -60,7 +64,7 @@ throw new NullPointerException("please initialize DataGear"); } - commandList.add(new Command(this, dg, cgm.cgmID,"local", name, CommandType.TAKE)); + commandList.add(new Command(CommandType.TAKE, this, cgm.cgmID, dsmName, key, dg)); } public DataGearManager getLocalDGM() {
--- a/src/main/java/christie/codegear/Command.java Mon Jan 22 15:34:38 2018 +0900 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,76 +0,0 @@ -package christie.codegear; - -import christie.daemon.RemoteMessage; -import christie.datagear.DataGear; -import christie.datagear.MessagePackDataGear; -import org.msgpack.MessagePack; - -import java.io.IOException; -import java.nio.ByteBuffer; - -public class Command { - public CodeGear cg = null; - public DataGear dg; - public int cgmID = 1; - public String dgmName = "local"; - public String key; - public CommandType type; - public Class clazz = null; - - //for put - public Command(DataGear dg, int cgmID, String dgmName, String key, CommandType type, Class clazz){ - this.dg = dg; - this.cgmID = cgmID; - this.dgmName = dgmName; - this.key = key; - this.type = type; - this.clazz = clazz; - } - - //for take - public Command(CodeGear cg, DataGear dg, int cgmID, String dgmName, String key, CommandType type){ - this.cg = cg; - this.dg = dg; - this.cgmID = cgmID; - this.dgmName = dgmName; - this.key = key; - this.type = type; - } - - public ByteBuffer convert() { - ByteBuffer buf = null; - - try { - byte[] command = null; - byte[] data = null; - byte[] dataSize = null; - MessagePack packer = new MessagePack(); - - switch (type) { - case PUT: - case REPLY: - RemoteMessage mes = new RemoteMessage(type.id, cgmID, key, clazz.getName()); - - data = new MessagePackDataGear(dg.getData(), dg.getClazz()).getMessagePack(); - command = packer.write(mes); - dataSize = packer.write(data.length); - - buf = ByteBuffer.allocate(command.length+dataSize.length+data.length); - buf.put(command); - buf.put(dataSize); - buf.put(data); - break; - default: - command = packer.write(new RemoteMessage(type.id, cgmID, key, clazz.getName())); - buf = ByteBuffer.allocate(command.length); - buf.put(command); - break; - } - - buf.flip(); - } catch (IOException e) { - e.printStackTrace(); - } - return buf; - } -}
--- a/src/main/java/christie/codegear/CommandType.java Mon Jan 22 15:34:38 2018 +0900 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,34 +0,0 @@ -package christie.codegear; - -import java.util.HashMap; - -public enum CommandType { - PUT, - TAKE, - PEEK, - REPLY, - CLOSE, - FINISH; - - public int id;//コマンドのid - public static HashMap<Integer, CommandType> hash = new HashMap<Integer, CommandType>();//コマンド対応表 - private static int lastId = 0;//コマンドの総数 - - private CommandType() { - this.id = incrementLastId(); - }//for init - - private int incrementLastId() { - return ++lastId; - } - - public static CommandType getCommandTypeFromId(int id) { - return hash.get(id); - } - - static { - for (CommandType type : CommandType.values()) { - hash.put(type.id, type); - } - } -}
--- a/src/main/java/christie/codegear/InputDataGear.java Mon Jan 22 15:34:38 2018 +0900 +++ b/src/main/java/christie/codegear/InputDataGear.java Mon Jan 22 23:22:09 2018 +0900 @@ -2,13 +2,12 @@ import christie.annotation.Peek; import christie.annotation.Take; +import christie.datagear.Command; import christie.datagear.DataGear; import java.lang.reflect.Field; import java.util.ArrayList; -import java.util.HashMap; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; /**
--- a/src/main/java/christie/daemon/Connection.java Mon Jan 22 15:34:38 2018 +0900 +++ b/src/main/java/christie/daemon/Connection.java Mon Jan 22 23:22:09 2018 +0900 @@ -4,7 +4,7 @@ import java.nio.ByteBuffer; import java.util.concurrent.LinkedBlockingQueue; -import christie.codegear.Command; +import christie.datagear.Command; public class Connection {
--- a/src/main/java/christie/daemon/IncomingTcpConnection.java Mon Jan 22 15:34:38 2018 +0900 +++ b/src/main/java/christie/daemon/IncomingTcpConnection.java Mon Jan 22 23:22:09 2018 +0900 @@ -2,9 +2,8 @@ import christie.codegear.CodeGearManager; -import christie.codegear.Command; -import christie.codegear.CommandType; -import christie.datagear.DataGear; +import christie.datagear.Command; +import christie.datagear.CommandType; import christie.datagear.MessagePackDataGear; import christie.datagear.RemoteDataGearManager; @@ -17,13 +16,18 @@ public class IncomingTcpConnection extends Thread { + RemoteDataGearManager manager; ConcurrentHashMap<Integer, CodeGearManager> cgms; Connection connection; private MessagePack packer = new MessagePack(); public IncomingTcpConnection(Connection connection, CodeGearManager cgm) { + this.connection = connection; this.cgms = cgm.getCgms(); - this.connection = connection; + } + + public void setManager(RemoteDataGearManager manager){ + this.manager = manager; } public void run() { @@ -60,20 +64,31 @@ } break; - /* + case PEEK: case TAKE: - cmd = new Command(null, dg, null, null, null); + //cmd = new Command(null, dg, null, null, null, connection); + + if (cgms.containsKey(msg.cgmID)){ + cgms.get(msg.cgmID).getLocalDGM().take(cmd); + } else { + throw new IllegalArgumentException("DGM_ID:" + msg.cgmID + "is not found"); + } + + //connection.write(cmd); break; - case REPLY: + case REPLY://待っていたwaitListに渡してcsにセット connection.socket.getInputStream().read(data); - dg = new DataGear(data, data.getClass()); + try { + MessagePackDataGear dg = new MessagePackDataGear(data, Class.forName(msg.clazz)); + //waitListからcommandをとりだし値をセット + //resolveWait(cmd); + } catch (ClassNotFoundException e) { + e.printStackTrace(); + } - Command rCmd = new Command(null, dg, null, null, null); - //cmd.cg.idg.reply(cmd.dg, rCmd); - - break;*/ + break; default: break; }
--- a/src/main/java/christie/daemon/OutboundTcpConnection.java Mon Jan 22 15:34:38 2018 +0900 +++ b/src/main/java/christie/daemon/OutboundTcpConnection.java Mon Jan 22 23:22:09 2018 +0900 @@ -1,6 +1,6 @@ package christie.daemon; -import christie.codegear.Command; +import christie.datagear.Command; public class OutboundTcpConnection extends Thread {
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/christie/datagear/Command.java Mon Jan 22 23:22:09 2018 +0900 @@ -0,0 +1,82 @@ +package christie.datagear; + +import christie.codegear.CodeGear; +import christie.daemon.Connection; +import christie.daemon.RemoteMessage; +import org.msgpack.MessagePack; + +import java.io.IOException; +import java.nio.ByteBuffer; + +public class Command { + public CodeGear cg = null; + public DataGear dg; + public int cgmID = 1; + public String dgmName = "local"; + public String key; + public CommandType type; + public Class clazz = null; + public Connection connection = null; + + //for put/reply + public Command(CommandType type, int cgmID, String dgmName, String key, DataGear dg, Class clazz){ + this.dg = dg; + this.cgmID = cgmID; + this.dgmName = dgmName; + this.key = key; + this.type = type; + this.clazz = clazz; + } + + //for take + public Command(CommandType type, CodeGear cg, int cgmID, String dgmName, String key, DataGear dg){ + this.cg = cg; + this.dg = dg; + this.cgmID = cgmID; + this.dgmName = dgmName; + this.key = key; + this.type = type; + this.connection = connection; + } + + public void setConnection(Connection connection) { + this.connection = connection; + } + + public ByteBuffer convert() { + ByteBuffer buf = null; + + try { + byte[] command = null; + byte[] data = null; + byte[] dataSize = null; + MessagePack packer = new MessagePack(); + + switch (type) { + case PUT: + case REPLY: + RemoteMessage mes = new RemoteMessage(type.id, cgmID, key, clazz.getName()); + + data = new MessagePackDataGear(dg.getData(), dg.getClazz()).getMessagePack(); + command = packer.write(mes); + dataSize = packer.write(data.length); + + buf = ByteBuffer.allocate(command.length+dataSize.length+data.length); + buf.put(command); + buf.put(dataSize); + buf.put(data); + break; + default: + command = packer.write(new RemoteMessage(type.id, cgmID, key, clazz.getName())); + buf = ByteBuffer.allocate(command.length); + buf.put(command); + break; + } + + buf.flip(); + } catch (IOException e) { + e.printStackTrace(); + } + return buf; + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/christie/datagear/CommandType.java Mon Jan 22 23:22:09 2018 +0900 @@ -0,0 +1,34 @@ +package christie.datagear; + +import java.util.HashMap; + +public enum CommandType { + PUT, + TAKE, + PEEK, + REPLY, + CLOSE, + FINISH; + + public int id;//コマンドのid + public static HashMap<Integer, CommandType> hash = new HashMap<Integer, CommandType>();//コマンド対応表 + private static int lastId = 0;//コマンドの総数 + + private CommandType() { + this.id = incrementLastId(); + }//for init + + private int incrementLastId() { + return ++lastId; + } + + public static CommandType getCommandTypeFromId(int id) { + return hash.get(id); + } + + static { + for (CommandType type : CommandType.values()) { + hash.put(type.id, type); + } + } +}
--- a/src/main/java/christie/datagear/DataGearManager.java Mon Jan 22 15:34:38 2018 +0900 +++ b/src/main/java/christie/datagear/DataGearManager.java Mon Jan 22 23:22:09 2018 +0900 @@ -1,12 +1,5 @@ package christie.datagear; -import christie.codegear.CodeGear; -import christie.codegear.Command; -import christie.datagear.DataGear; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Queue; import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue;
--- a/src/main/java/christie/datagear/LocalDataGearManager.java Mon Jan 22 15:34:38 2018 +0900 +++ b/src/main/java/christie/datagear/LocalDataGearManager.java Mon Jan 22 23:22:09 2018 +0900 @@ -1,11 +1,5 @@ package christie.datagear; -import christie.codegear.Command; -import christie.codegear.CommandType; - -import java.util.Map; -import java.util.TreeMap; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.LinkedBlockingQueue; /** @@ -32,13 +26,13 @@ } public void put(String key, DataGear dg){ - runCommand(new Command(dg, 1, "local", key, CommandType.PUT, dg.getClazz())); + runCommand(new Command(CommandType.PUT, 1, "local", key, dg, dg.getClazz())); } public synchronized void runCommand(Command cm){ - switch (cm.type){ + switch (cm.type) { case PUT: - if(dataGears.containsKey(cm.key)){ + if (dataGears.containsKey(cm.key)) { dataGears.get(cm.key).add(cm.dg); } else { LinkedBlockingQueue<DataGear> queue = new LinkedBlockingQueue<DataGear>(); @@ -46,27 +40,29 @@ dataGears.put(cm.key, queue); } - if (waitList.containsKey(cm.key)){ - runCommand(waitList.get(cm.key).poll());//待ちコマンドの先頭をとる - if (waitList.get(cm.key).isEmpty()){ - waitList.remove(cm.key); - } - } + resolveWait(cm.key); + break; case TAKE: - dataGears.get(cm.key).peek(); cm.dg.setData(dataGears.get(cm.key).poll().getData()); - if (dataGears.get(cm.key).isEmpty()){ + if (dataGears.get(cm.key).isEmpty()) { dataGears.remove(cm.key); } - cm.cg.getIdg().setInputs(cm.key, cm.dg); + if (cm.connection == null){//localからならcgにデータをセット + cm.cg.getIdg().setInputs(cm.key, cm.dg); + } else {//remoteからならREPLY + //runCommand(new Command(CommandType.REPLY, cm.dg, cm.connection)); + } + break; case PEEK: cm.dg.setData(dataGears.get(cm.key).peek().getData()); cm.cg.getIdg().setInputs(cm.key, cm.dg); break; - + case REPLY: + //cm.connection.write(cm); + break; } } @@ -81,9 +77,12 @@ } } - private void deleteElement(Map<String, LinkedBlockingQueue<Object>> map, String key){ - if (map.get(key).isEmpty()){ - map.remove(key); + private void resolveWait(String key){ + if (waitList.containsKey(key)){ + runCommand(waitList.get(key).poll());//待ちコマンドの先頭をとる + if (waitList.get(key).isEmpty()){ + waitList.remove(key); + } } } }
--- a/src/main/java/christie/datagear/RemoteDataGearManager.java Mon Jan 22 15:34:38 2018 +0900 +++ b/src/main/java/christie/datagear/RemoteDataGearManager.java Mon Jan 22 23:22:09 2018 +0900 @@ -1,9 +1,6 @@ package christie.datagear; -import christie.codegear.CodeGear; import christie.codegear.CodeGearManager; -import christie.codegear.Command; -import christie.codegear.CommandType; import christie.daemon.Connection; import christie.daemon.IncomingTcpConnection; import christie.daemon.OutboundTcpConnection; @@ -11,14 +8,19 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.SocketChannel; +import java.util.concurrent.LinkedBlockingQueue; public class RemoteDataGearManager extends DataGearManager{ - Connection connection; + public Connection connection; + public CodeGearManager cgm; public RemoteDataGearManager(final String connectionName, final String address, final int port, CodeGearManager cgm) { connection = new Connection(); connection.name = connectionName; + this.cgm = cgm; + RemoteDataGearManager manager = this; + new Thread("Connect-" + connectionName) { public void run() { boolean connect = true; @@ -37,6 +39,7 @@ } } while (connect); IncomingTcpConnection in = new IncomingTcpConnection(connection, cgm); + in.setManager(manager); in.setName(connectionName+"-IncomingTcp"); in.setPriority(MAX_PRIORITY); in.start(); @@ -50,12 +53,13 @@ @Override public void take(Command cm) { - + cm.setConnection(connection); + connection.write(cm); } public void metaPut(int cgmID, String key, Object data){//meta - Command cmd = new Command(new DataGear(data, data.getClass()), cgmID,"local", key, CommandType.PUT, data.getClass()); - connection.write(cmd); + Command cm = new Command(CommandType.PUT, cgmID,"local", key, new DataGear(data, data.getClass()), data.getClass()); + connection.write(cm); } @Override @@ -69,7 +73,13 @@ } @Override - public void addWaitList(Command command) { - + public void addWaitList(Command cm) { + if(waitList.containsKey(cm.key)){ + waitList.get(cm.key).add(cm); + } else { + LinkedBlockingQueue<Command> queue = new LinkedBlockingQueue<>(); + queue.add(cm); + waitList.put(cm.key, queue); + } } }
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/christie/test/RemoteTake/CreateRemoteTakeTest.java Mon Jan 22 23:22:09 2018 +0900 @@ -0,0 +1,20 @@ +package christie.test.RemoteTake; + +import christie.codegear.CodeGear; +import christie.codegear.CodeGearManager; +import christie.test.Remote.RemotePutTest; + +public class CreateRemoteTakeTest extends CodeGear{ + + int cgmID; + + public CreateRemoteTakeTest(int cgmID) { + this.cgmID = cgmID; + } + + @Override + protected void run(CodeGearManager cgm) { + cgm.setup(new RemoteTakeTest(cgmID)); + getDGM("remote").metaPut(cgmID,"hoge", 1); + } +}
--- a/src/main/java/christie/test/RemoteTake/CreateRemoteTakeTeste.java Mon Jan 22 15:34:38 2018 +0900 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,4 +0,0 @@ -package christie.test.RemoteTake; - -public class CreateRemoteTakeTeste { -}
--- a/src/main/java/christie/test/RemoteTake/RemoteTakeTest.java Mon Jan 22 15:34:38 2018 +0900 +++ b/src/main/java/christie/test/RemoteTake/RemoteTakeTest.java Mon Jan 22 23:22:09 2018 +0900 @@ -9,11 +9,18 @@ @RemoteTake(dsmName = "remote",key = "hoge") public DataGear<Integer> hoge = new DataGear<>(); + int cgmID; + + public RemoteTakeTest(int cgmID) { + this.cgmID = cgmID; + } + + @Override protected void run(CodeGearManager cgm) { if (hoge.getData() != 10){ System.out.println(hoge.getData()); - cgm.setup(new RemoteTakeTest()); + cgm.setup(new RemoteTakeTest(cgmID)); getLocalDGM().put("hoge", hoge.getData() + 1); } }
--- a/src/main/java/christie/test/RemoteTake/StartRemoteTake.java Mon Jan 22 15:34:38 2018 +0900 +++ b/src/main/java/christie/test/RemoteTake/StartRemoteTake.java Mon Jan 22 23:22:09 2018 +0900 @@ -1,4 +1,23 @@ package christie.test.RemoteTake; -public class StartRemoteTake { +import christie.codegear.CodeGearManager; +import christie.codegear.StartCodeGear; +import christie.test.Remote.StartRemotePut; + +public class StartRemoteTake extends StartCodeGear{ + public StartRemoteTake(CodeGearManager cgm) { + super(cgm); + } + + public static void main(String args[]){ + CodeGearManager cgm = createCGM(10000); + new StartRemotePut(cgm); + + cgm.createRemoteDGM("remote", "localhost", 10001); + cgm.setup(new CreateRemoteTakeTest(2));//この時点でcgm"second"は作られていない→notifyAllで対処? + + CodeGearManager cgm2 = createCGM(10001); + cgm2.createRemoteDGM("remote", "localhost", 10000); + cgm2.setup(new CreateRemoteTakeTest(1)); + } }