Mercurial > hg > Database > Christie
changeset 22:77583ea56656
add WaitList and implement RemoteTake but not work
author | Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp> |
---|---|
date | Tue, 23 Jan 2018 22:02:54 +0900 |
parents | 5baccb8f7fbd |
children | 695705dba324 |
files | src/main/java/christie/codegear/CodeGear.java src/main/java/christie/daemon/Connection.java src/main/java/christie/daemon/IncomingTcpConnection.java src/main/java/christie/daemon/RemoteMessage.java src/main/java/christie/datagear/Command.java src/main/java/christie/datagear/DataGearManager.java src/main/java/christie/datagear/LocalDataGearManager.java src/main/java/christie/datagear/RemoteDataGearManager.java src/main/java/christie/datagear/WaitList.java src/main/java/christie/test/RemoteTake/CreateRemoteTakeTest.java src/main/java/christie/test/RemoteTake/StartRemoteTake.java |
diffstat | 11 files changed, 103 insertions(+), 79 deletions(-) [+] |
line wrap: on
line diff
--- a/src/main/java/christie/codegear/CodeGear.java Mon Jan 22 23:22:09 2018 +0900 +++ b/src/main/java/christie/codegear/CodeGear.java Tue Jan 23 22:02:54 2018 +0900 @@ -64,7 +64,7 @@ throw new NullPointerException("please initialize DataGear"); } - commandList.add(new Command(CommandType.TAKE, this, cgm.cgmID, dsmName, key, dg)); + commandList.add(new Command(CommandType.TAKE, this, cgm.cgmID, dsmName, key, dg, dg.getClazz(), null)); } public DataGearManager getLocalDGM() {
--- a/src/main/java/christie/daemon/Connection.java Mon Jan 22 23:22:09 2018 +0900 +++ b/src/main/java/christie/daemon/Connection.java Tue Jan 23 22:02:54 2018 +0900 @@ -39,8 +39,6 @@ socket.shutdownInput(); socket.close(); } catch (Exception e) { } - //putConnectionInfo(); - } public synchronized void write(Command cmd) {
--- a/src/main/java/christie/daemon/IncomingTcpConnection.java Mon Jan 22 23:22:09 2018 +0900 +++ b/src/main/java/christie/daemon/IncomingTcpConnection.java Tue Jan 23 22:02:54 2018 +0900 @@ -2,10 +2,7 @@ import christie.codegear.CodeGearManager; -import christie.datagear.Command; -import christie.datagear.CommandType; -import christie.datagear.MessagePackDataGear; -import christie.datagear.RemoteDataGearManager; +import christie.datagear.*; import org.msgpack.MessagePack; import org.msgpack.unpacker.Unpacker; @@ -67,7 +64,7 @@ case PEEK: case TAKE: - //cmd = new Command(null, dg, null, null, null, connection); + cmd = new Command(type, null, msg.cgmID, msg.dgmName, msg.key, null, null, connection); if (cgms.containsKey(msg.cgmID)){ cgms.get(msg.cgmID).getLocalDGM().take(cmd); @@ -75,15 +72,17 @@ throw new IllegalArgumentException("DGM_ID:" + msg.cgmID + "is not found"); } - //connection.write(cmd); - break; case REPLY://待っていたwaitListに渡してcsにセット connection.socket.getInputStream().read(data); try { MessagePackDataGear dg = new MessagePackDataGear(data, Class.forName(msg.clazz)); - //waitListからcommandをとりだし値をセット - //resolveWait(cmd); + + DataGearManager dgm = cgms.get(msg.cgmID).getDGM(msg.dgmName); + Command cm = dgm.waitList.getAndRemoveCommand(msg.key); + cm.dg = dg; + dgm.runCommand(cm); + } catch (ClassNotFoundException e) { e.printStackTrace(); }
--- a/src/main/java/christie/daemon/RemoteMessage.java Mon Jan 22 23:22:09 2018 +0900 +++ b/src/main/java/christie/daemon/RemoteMessage.java Tue Jan 23 22:02:54 2018 +0900 @@ -5,15 +5,17 @@ @Message public class RemoteMessage { public int type;//PUT, PEEKなどのコマンドタイプ - public int cgmID; - public String key;//DS key + public int cgmID;//Localが複数上がっている場合どのLocalか + public String dgmName;//送り元のdsmName。REPLYのときに使用。 + public String key; public String clazz; - public RemoteMessage(){} + public RemoteMessage(){}//for messagePack - public RemoteMessage(int type, int cgmID, String key, String clazz) { + public RemoteMessage(int type, int cgmID, String dgmName, String key, String clazz) { this.type = type; this.cgmID = cgmID; + this.dgmName = dgmName; this.key = key; this.clazz = clazz; }
--- a/src/main/java/christie/datagear/Command.java Mon Jan 22 23:22:09 2018 +0900 +++ b/src/main/java/christie/datagear/Command.java Tue Jan 23 22:02:54 2018 +0900 @@ -12,34 +12,36 @@ public CodeGear cg = null; public DataGear dg; public int cgmID = 1; - public String dgmName = "local"; + public String dgmName; public String key; public CommandType type; public Class clazz = null; public Connection connection = null; - //for put/reply + //ToDo:merge put/reply + //for put public Command(CommandType type, int cgmID, String dgmName, String key, DataGear dg, Class clazz){ - this.dg = dg; + this.type = type; this.cgmID = cgmID; this.dgmName = dgmName; this.key = key; - this.type = type; + this.dg = dg; this.clazz = clazz; } - //for take - public Command(CommandType type, CodeGear cg, int cgmID, String dgmName, String key, DataGear dg){ - this.cg = cg; + //for take/reply + public Command(CommandType type, CodeGear cg, int cgmID, String dgmName, String key, DataGear dg, Class clazz, Connection cn){ + this.type = type; + this.cg = cg;//who wait + this.cgmID = cgmID;//where from + this.dgmName = dgmName;//where from + this.key = key; this.dg = dg; - this.cgmID = cgmID; - this.dgmName = dgmName; - this.key = key; - this.type = type; - this.connection = connection; + this.clazz = clazz; + this.connection = cn; } - public void setConnection(Connection connection) { + public void setConnection(Connection connection) {//for remote take this.connection = connection; } @@ -50,12 +52,13 @@ byte[] command = null; byte[] data = null; byte[] dataSize = null; + RemoteMessage mes; MessagePack packer = new MessagePack(); switch (type) { case PUT: case REPLY: - RemoteMessage mes = new RemoteMessage(type.id, cgmID, key, clazz.getName()); + mes = new RemoteMessage(type.id, cgmID, dgmName, key, clazz.getName()); data = new MessagePackDataGear(dg.getData(), dg.getClazz()).getMessagePack(); command = packer.write(mes); @@ -67,7 +70,8 @@ buf.put(data); break; default: - command = packer.write(new RemoteMessage(type.id, cgmID, key, clazz.getName())); + mes = new RemoteMessage(type.id, cgmID, dgmName, key, clazz.getName()); + command = packer.write(mes); buf = ByteBuffer.allocate(command.length); buf.put(command); break;
--- a/src/main/java/christie/datagear/DataGearManager.java Mon Jan 22 23:22:09 2018 +0900 +++ b/src/main/java/christie/datagear/DataGearManager.java Tue Jan 23 22:02:54 2018 +0900 @@ -10,10 +10,10 @@ */ public abstract class DataGearManager { protected TreeMap<String, LinkedBlockingQueue<DataGear>> dataGears = new TreeMap<String, LinkedBlockingQueue<DataGear>>(); - protected ConcurrentHashMap<String, LinkedBlockingQueue<Command>> waitList = new ConcurrentHashMap<>(); + public WaitList waitList = new WaitList(); public abstract void take(Command cm); public abstract void put(String key, Object data); public abstract void metaPut(int cgmID, String key, Object data); public abstract void runCommand(Command cm); - public abstract void addWaitList(Command command); + }
--- a/src/main/java/christie/datagear/LocalDataGearManager.java Mon Jan 22 23:22:09 2018 +0900 +++ b/src/main/java/christie/datagear/LocalDataGearManager.java Tue Jan 23 22:02:54 2018 +0900 @@ -11,7 +11,7 @@ if (dataGears.containsKey(cm.key)) { runCommand(cm); } else { - addWaitList(cm); + waitList.add(cm); } } @@ -40,7 +40,9 @@ dataGears.put(cm.key, queue); } - resolveWait(cm.key); + if (waitList.containsKey(cm.key)){ + runCommand(waitList.getAndRemoveCommand(cm.key)); + } break; case TAKE: @@ -51,38 +53,24 @@ if (cm.connection == null){//localからならcgにデータをセット cm.cg.getIdg().setInputs(cm.key, cm.dg); - } else {//remoteからならREPLY - //runCommand(new Command(CommandType.REPLY, cm.dg, cm.connection)); + } else {//remoteからならREPLY...ここはRemoteDGMに書くべき? + runCommand(new Command(CommandType.REPLY, null, cm.cgmID, cm.dgmName, cm.key, cm.dg, cm.dg.getClazz(), cm.connection)); } break; case PEEK: cm.dg.setData(dataGears.get(cm.key).peek().getData()); - cm.cg.getIdg().setInputs(cm.key, cm.dg); + + if (cm.connection == null) { + cm.cg.getIdg().setInputs(cm.key, cm.dg); + } else { + //ToDo:implement + } + break; case REPLY: - //cm.connection.write(cm); + cm.connection.write(cm); break; } } - - @Override - public void addWaitList(Command cm) { - if(waitList.containsKey(cm.key)){ - waitList.get(cm.key).add(cm); - } else { - LinkedBlockingQueue<Command> queue = new LinkedBlockingQueue<>(); - queue.add(cm); - waitList.put(cm.key, queue); - } - } - - private void resolveWait(String key){ - if (waitList.containsKey(key)){ - runCommand(waitList.get(key).poll());//待ちコマンドの先頭をとる - if (waitList.get(key).isEmpty()){ - waitList.remove(key); - } - } - } }
--- a/src/main/java/christie/datagear/RemoteDataGearManager.java Mon Jan 22 23:22:09 2018 +0900 +++ b/src/main/java/christie/datagear/RemoteDataGearManager.java Tue Jan 23 22:02:54 2018 +0900 @@ -15,13 +15,13 @@ public Connection connection; public CodeGearManager cgm; - public RemoteDataGearManager(final String connectionName, final String address, final int port, CodeGearManager cgm) { + public RemoteDataGearManager(final String dgmName, final String address, final int port, CodeGearManager cgm) { connection = new Connection(); - connection.name = connectionName; + connection.name = dgmName; this.cgm = cgm; RemoteDataGearManager manager = this; - new Thread("Connect-" + connectionName) { + new Thread("Connect-" + dgmName) { public void run() { boolean connect = true; do { @@ -40,11 +40,12 @@ } while (connect); IncomingTcpConnection in = new IncomingTcpConnection(connection, cgm); in.setManager(manager); - in.setName(connectionName+"-IncomingTcp"); + in.setName(dgmName+"-IncomingTcp"); + in.setPriority(MAX_PRIORITY); in.setPriority(MAX_PRIORITY); in.start(); OutboundTcpConnection out = new OutboundTcpConnection(connection); - out.setName(connectionName + "-OutboundTcp"); + out.setName(dgmName + "-OutboundTcp"); out.setPriority(MAX_PRIORITY); out.start(); } @@ -53,7 +54,9 @@ @Override public void take(Command cm) { + cm.dgmName = connection.name;//送信元 cm.setConnection(connection); + waitList.add(cm); connection.write(cm); } @@ -69,17 +72,13 @@ @Override public void runCommand(Command cm) { - + switch (cm.type) { + case PUT: + break; + case TAKE: + cm.cg.getIdg().setInputs(cm.key, cm.dg); + break; + } } - @Override - public void addWaitList(Command cm) { - if(waitList.containsKey(cm.key)){ - waitList.get(cm.key).add(cm); - } else { - LinkedBlockingQueue<Command> queue = new LinkedBlockingQueue<>(); - queue.add(cm); - waitList.put(cm.key, queue); - } - } }
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/christie/datagear/WaitList.java Tue Jan 23 22:02:54 2018 +0900 @@ -0,0 +1,34 @@ +package christie.datagear; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; + +public class WaitList { + + private ConcurrentHashMap<String, LinkedBlockingQueue<Command>> waitList = new ConcurrentHashMap<>(); + + public void add(Command cm) { + if(waitList.containsKey(cm.key)){ + waitList.get(cm.key).add(cm); + } else { + LinkedBlockingQueue<Command> queue = new LinkedBlockingQueue<>(); + queue.add(cm); + waitList.put(cm.key, queue); + } + } + + public Command getAndRemoveCommand(String key) { + Command cm = null; + if (waitList.containsKey(key)){ + cm = waitList.get(key).poll(); + if (waitList.get(key).isEmpty()){ + waitList.remove(key); + } + } + return cm; + } + + public boolean containsKey(String key){ + return waitList.containsKey(key); + } +}
--- a/src/main/java/christie/test/RemoteTake/CreateRemoteTakeTest.java Mon Jan 22 23:22:09 2018 +0900 +++ b/src/main/java/christie/test/RemoteTake/CreateRemoteTakeTest.java Tue Jan 23 22:02:54 2018 +0900 @@ -15,6 +15,6 @@ @Override protected void run(CodeGearManager cgm) { cgm.setup(new RemoteTakeTest(cgmID)); - getDGM("remote").metaPut(cgmID,"hoge", 1); + getLocalDGM().metaPut(cgmID,"hoge", 1); } }
--- a/src/main/java/christie/test/RemoteTake/StartRemoteTake.java Mon Jan 22 23:22:09 2018 +0900 +++ b/src/main/java/christie/test/RemoteTake/StartRemoteTake.java Tue Jan 23 22:02:54 2018 +0900 @@ -14,10 +14,10 @@ new StartRemotePut(cgm); cgm.createRemoteDGM("remote", "localhost", 10001); - cgm.setup(new CreateRemoteTakeTest(2));//この時点でcgm"second"は作られていない→notifyAllで対処? + cgm.setup(new CreateRemoteTakeTest(1)); - CodeGearManager cgm2 = createCGM(10001); + /*CodeGearManager cgm2 = createCGM(10001); cgm2.createRemoteDGM("remote", "localhost", 10000); - cgm2.setup(new CreateRemoteTakeTest(1)); + cgm2.setup(new CreateRemoteTakeTest(1));*/ } }