Mercurial > hg > Database > Christie
changeset 25:76fac42a840e
work RemoteTakeTest
line wrap: on
line diff
--- a/src/main/java/christie/codegear/CodeGear.java Thu Jan 25 23:02:02 2018 +0900 +++ b/src/main/java/christie/codegear/CodeGear.java Fri Jan 26 18:03:36 2018 +0900 @@ -5,6 +5,7 @@ import christie.annotation.Take; import christie.datagear.*; import christie.datagear.Command.Command; +import christie.datagear.Command.PeekCommand; import christie.datagear.Command.TakeCommand; import java.lang.reflect.Field; @@ -33,22 +34,21 @@ for (Field field : this.getClass().getDeclaredFields()) {//AnnotationからInputDataGearをセット if (field.isAnnotationPresent(Take.class)) { Take ano = field.getAnnotation(Take.class); - createCommand(field, "local", ano.value()); + setTakeCommand("local", ano.value(), initDataGear(field, ano.value())); } else if (field.isAnnotationPresent(Peek.class)) { Peek ano = field.getAnnotation(Peek.class); - createCommand(field, "local", ano.value()); + setPeekCommand("local", ano.value(), initDataGear(field, ano.value())); } else if (field.isAnnotationPresent(RemoteTake.class)) { RemoteTake ano = field.getAnnotation(RemoteTake.class); - createCommand(field, ano.dsmName(), ano.key()); + setTakeCommand("local", ano.key(), initDataGear(field, ano.key())); } - //ToDo:add peek + //ToDo:add remote peek } idg.finishInput(cgm, commandList); } - public void createCommand(Field field, String toDsmName, String key){ - + public DataGear initDataGear(Field field, String key){ if (!field.getName().equals(key)){ throw new IllegalArgumentException("key and DataGearName do not match"); } @@ -64,7 +64,15 @@ throw new NullPointerException("please initialize DataGear"); } - commandList.add(new TakeCommand(this, cgm.cgmID, toDsmName, key, dg)); + return dg; + } + + public void setTakeCommand(String toDgmName, String key, DataGear dg){ + commandList.add(new TakeCommand(this, cgm.cgmID, toDgmName, key, dg)); + } + + public void setPeekCommand(String toDgmName, String key, DataGear dg){ + commandList.add(new PeekCommand(this, cgm.cgmID, toDgmName, key, dg)); } public DataGearManager getLocalDGM() {
--- a/src/main/java/christie/codegear/InputDataGear.java Thu Jan 25 23:02:02 2018 +0900 +++ b/src/main/java/christie/codegear/InputDataGear.java Fri Jan 26 18:03:36 2018 +0900 @@ -1,6 +1,7 @@ package christie.codegear; import christie.annotation.Peek; +import christie.annotation.RemoteTake; import christie.annotation.Take; import christie.datagear.Command.Command; import christie.datagear.DataGear; @@ -38,8 +39,8 @@ } } - public void setInputs(String key, DataGear value){ - inputValue.put(key, value); + public void setInputs(String key, DataGear dg){ + inputValue.put(key, dg); count(); } @@ -50,24 +51,31 @@ } } + void submitCG(){ + cgm.submit(cg); + } + public void setInputValue(){//Annotationから揃ったInputDataGearの値をキャスト for (Field field : cg.getClass().getDeclaredFields()) { field.setAccessible(true); if (field.isAnnotationPresent(Take.class)){ Take ano = field.getAnnotation(Take.class); - try { - field.set(cg, typeCheck(ano.value())); - } catch (IllegalAccessException e) { - e.printStackTrace(); - } + setField(field, ano.value()); } else if (field.isAnnotationPresent(Peek.class)){ Peek ano = field.getAnnotation(Peek.class); - try { - field.set(cg, typeCheck(ano.value())); - } catch (IllegalAccessException e) { - e.printStackTrace(); - } - } + setField(field, ano.value()); + } else if (field.isAnnotationPresent(RemoteTake.class)){ + RemoteTake ano = field.getAnnotation(RemoteTake.class); + setField(field, ano.key()); + }//ToDo:implements RemotePut + } + } + + public void setField(Field field, String key){ + try { + field.set(cg, typeCheck(key)); + } catch (IllegalAccessException e) { + e.printStackTrace(); } } @@ -79,8 +87,4 @@ } } - void submitCG(){ - cgm.submit(cg); - } - }
--- a/src/main/java/christie/codegear/StartCodeGear.java Thu Jan 25 23:02:02 2018 +0900 +++ b/src/main/java/christie/codegear/StartCodeGear.java Fri Jan 26 18:03:36 2018 +0900 @@ -1,8 +1,5 @@ package christie.codegear; -import christie.daemon.ChristieDaemon; -import christie.daemon.Config; - import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor;
--- a/src/main/java/christie/daemon/IncomingTcpConnection.java Thu Jan 25 23:02:02 2018 +0900 +++ b/src/main/java/christie/daemon/IncomingTcpConnection.java Fri Jan 26 18:03:36 2018 +0900 @@ -44,10 +44,11 @@ try { RemoteMessage msg = unpacker.read(RemoteMessage.class); CommandType type = CommandType.getCommandTypeFromId(msg.type); - int dataSize = unpacker.readInt();//ここでとまる - byte[] data = new byte[dataSize]; + byte[] data; + switch (type) { case PUT: + data = new byte[unpacker.readInt()]; connection.socket.getInputStream().read(data); try { MessagePackDataGear dg = new MessagePackDataGear(data, Class.forName(msg.clazz)); @@ -60,6 +61,7 @@ case REMOTEPEEK: case REMOTETAKE: + RemoteTakeCommand cmd = null; try { cmd = new RemoteTakeCommand(msg.fromDgmName, msg.key, Class.forName(msg.clazz), connection); @@ -71,6 +73,7 @@ break; case REPLY://待っていたwaitListに渡してcsにセット + data = new byte[unpacker.readInt()]; connection.socket.getInputStream().read(data); try { MessagePackDataGear dg = new MessagePackDataGear(data, Class.forName(msg.clazz));
--- a/src/main/java/christie/datagear/Command/RemoteTakeCommand.java Thu Jan 25 23:02:02 2018 +0900 +++ b/src/main/java/christie/datagear/Command/RemoteTakeCommand.java Fri Jan 26 18:03:36 2018 +0900 @@ -1,6 +1,8 @@ package christie.datagear.Command; import christie.daemon.Connection; +import christie.datagear.DataGear; +import christie.datagear.MessagePackDataGear; import org.msgpack.MessagePack; import java.io.IOException; @@ -12,6 +14,7 @@ this.type = CommandType.REMOTETAKE; this.fromDgmName = fromDgmName; this.key = key; + this.dg = new MessagePackDataGear(clazz); this.clazz = clazz; this.connection = cn; }
--- a/src/main/java/christie/datagear/Command/ReplyCommand.java Thu Jan 25 23:02:02 2018 +0900 +++ b/src/main/java/christie/datagear/Command/ReplyCommand.java Fri Jan 26 18:03:36 2018 +0900 @@ -12,4 +12,8 @@ this.connection = cn; } + public void setData(Object data){ + this.dg.setData(data); + } + }
--- a/src/main/java/christie/datagear/DataGearManager.java Thu Jan 25 23:02:02 2018 +0900 +++ b/src/main/java/christie/datagear/DataGearManager.java Fri Jan 26 18:03:36 2018 +0900 @@ -14,7 +14,7 @@ public WaitList waitList = new WaitList(); public abstract void take(Command cm); public abstract void put(String key, Object data); - public abstract void metaPut(int cgmID, String key, Object data); + public abstract void metaPut(int cgmID, String key, Object data);//this is used from local only public abstract void runCommand(Command cm); }
--- a/src/main/java/christie/datagear/LocalDataGearManager.java Thu Jan 25 23:02:02 2018 +0900 +++ b/src/main/java/christie/datagear/LocalDataGearManager.java Fri Jan 26 18:03:36 2018 +0900 @@ -43,12 +43,12 @@ break; case TAKE: - setTakeData(cm); + cm.dg.setData(dataGears.take(cm.key)); cm.cg.getIdg().setInputs(cm.key, cm.dg); break; case REMOTETAKE: - setTakeData(cm); + cm.dg.setData(dataGears.take(cm.key)); runCommand(new ReplyCommand(cm.fromDgmName, cm.key, cm.dg, cm.connection)); break; @@ -57,7 +57,7 @@ cm.cg.getIdg().setInputs(cm.key, cm.dg); break; - case REMOTEPEEK: + case REMOTEPEEK://ToDo:implement //cm.dg.setData(dataGears.get(cm.key).peek().getData()); //runCommand(new ReplyCommand(cm.cgmID, cm.fromDgmName, cm.key, cm.dg, cm.connection)); break; @@ -67,7 +67,4 @@ } } - private void setTakeData(Command cm){ - cm.dg.setData(dataGears.take(cm.key)); - } }
--- a/src/main/java/christie/datagear/MessagePackDataGear.java Thu Jan 25 23:02:02 2018 +0900 +++ b/src/main/java/christie/datagear/MessagePackDataGear.java Fri Jan 26 18:03:36 2018 +0900 @@ -15,6 +15,10 @@ getMessagePack(); } + public MessagePackDataGear(Class clazz){ + setClazz(clazz); + } + public MessagePackDataGear(byte[] messagePack, Class clazz){ setClazz(clazz); this.messagePack = messagePack;
--- a/src/main/java/christie/datagear/RemoteDataGearManager.java Thu Jan 25 23:02:02 2018 +0900 +++ b/src/main/java/christie/datagear/RemoteDataGearManager.java Fri Jan 26 18:03:36 2018 +0900 @@ -66,7 +66,7 @@ @Override public void put(String key, Object data) { - metaPut(1, key, data); + metaPut(1, key, data);//don't use cgmID } @Override
--- a/src/main/java/christie/test/Remote/CreateRemotePutTest.java Thu Jan 25 23:02:02 2018 +0900 +++ b/src/main/java/christie/test/Remote/CreateRemotePutTest.java Fri Jan 26 18:03:36 2018 +0900 @@ -9,5 +9,6 @@ protected void run(CodeGearManager cgm) {//できるだけ並列に走らせるためにStartCodeGearには書かない cgm.setup(new RemotePutTest()); getDGM("remote").put("hoge", 1); + getDGM("remote").put("cgmID", cgm.cgmID); } }
--- a/src/main/java/christie/test/Remote/RemotePutTest.java Thu Jan 25 23:02:02 2018 +0900 +++ b/src/main/java/christie/test/Remote/RemotePutTest.java Fri Jan 26 18:03:36 2018 +0900 @@ -11,12 +11,16 @@ @Take("hoge") public DataGear<Integer> hoge = new DataGear<>(); + @Take("cgmID") + public DataGear<Integer> cgmID = new DataGear<>(); + @Override protected void run(CodeGearManager cgm) { if (hoge.getData() != 10){ - System.out.println(hoge.getData()); + System.out.println(cgmID.getData() + " : " +hoge.getData()); cgm.setup(new RemotePutTest()); getDGM("remote").put("hoge", hoge.getData() + 1); + getDGM("remote").put("cgmID", cgm.cgmID); } }
--- a/src/main/java/christie/test/Remote/StartRemotePut.java Thu Jan 25 23:02:02 2018 +0900 +++ b/src/main/java/christie/test/Remote/StartRemotePut.java Fri Jan 26 18:03:36 2018 +0900 @@ -15,12 +15,12 @@ CodeGearManager cgm = createCGM(10000); new StartRemotePut(cgm); + cgm.setup(new CreateRemotePutTest()); cgm.createRemoteDGM("remote", "localhost", 10001); - cgm.setup(new CreateRemotePutTest());//この時点でcgm"second"は作られていない→notifyAllで対処? CodeGearManager cgm2 = createCGM(10001); + cgm2.setup(new RemotePutTest()); cgm2.createRemoteDGM("remote", "localhost", 10000); - cgm2.setup(new CreateRemotePutTest()); } }
--- a/src/main/java/christie/test/RemoteTake/CreateRemoteTakeTest.java Thu Jan 25 23:02:02 2018 +0900 +++ b/src/main/java/christie/test/RemoteTake/CreateRemoteTakeTest.java Fri Jan 26 18:03:36 2018 +0900 @@ -10,5 +10,6 @@ protected void run(CodeGearManager cgm) { cgm.setup(new RemoteTakeTest()); getLocalDGM().put("hoge", 1); + getLocalDGM().put("cgmID", cgm.cgmID); } }
--- a/src/main/java/christie/test/RemoteTake/RemoteTakeTest.java Thu Jan 25 23:02:02 2018 +0900 +++ b/src/main/java/christie/test/RemoteTake/RemoteTakeTest.java Fri Jan 26 18:03:36 2018 +0900 @@ -6,15 +6,20 @@ import christie.datagear.DataGear; public class RemoteTakeTest extends CodeGear{ + @RemoteTake(dsmName = "remote",key = "hoge") public DataGear<Integer> hoge = new DataGear<>(); + @RemoteTake(dsmName = "remote",key = "cgmID") + public DataGear<Integer> cgmID = new DataGear<>(); + @Override protected void run(CodeGearManager cgm) { if (hoge.getData() != 10){ - System.out.println(hoge.getData()); + System.out.println(cgmID.getData() + " : " +hoge.getData()); cgm.setup(new RemoteTakeTest()); getLocalDGM().put("hoge", hoge.getData() + 1); + getLocalDGM().put("cgmID", cgm.cgmID); } } }
--- a/src/main/java/christie/test/RemoteTake/StartRemoteTake.java Thu Jan 25 23:02:02 2018 +0900 +++ b/src/main/java/christie/test/RemoteTake/StartRemoteTake.java Fri Jan 26 18:03:36 2018 +0900 @@ -14,11 +14,11 @@ CodeGearManager cgm = createCGM(10000); new StartRemoteTake(cgm); - cgm.createRemoteDGM("remote", "localhost", 10000); + cgm.createRemoteDGM("remote", "localhost", 10001); cgm.setup(new CreateRemoteTakeTest()); - /*CodeGearManager cgm2 = createCGM(10001); + CodeGearManager cgm2 = createCGM(10001); cgm2.createRemoteDGM("remote", "localhost", 10000); - cgm2.setup(new CreateRemoteTakeTest());*/ + cgm2.setup(new RemoteTakeTest()); } }
--- a/src/main/java/christie/test/TestLocal/StartTest.java Thu Jan 25 23:02:02 2018 +0900 +++ b/src/main/java/christie/test/TestLocal/StartTest.java Fri Jan 26 18:03:36 2018 +0900 @@ -2,7 +2,6 @@ import christie.codegear.CodeGearManager; import christie.codegear.StartCodeGear; -import christie.daemon.Config; public class StartTest extends StartCodeGear{