Mercurial > hg > Database > Christie
changeset 20:7b45ccc0f70e
add MessagePackDataGear
line wrap: on
line diff
--- a/src/main/java/christie/annotation/RemoteTake.java Sun Jan 21 23:27:21 2018 +0900 +++ b/src/main/java/christie/annotation/RemoteTake.java Mon Jan 22 15:34:38 2018 +0900 @@ -13,6 +13,5 @@ @Retention(RetentionPolicy.RUNTIME) public @interface RemoteTake { String dsmName(); - String cgmName(); String key(); } \ No newline at end of file
--- a/src/main/java/christie/codegear/CodeGear.java Sun Jan 21 23:27:21 2018 +0900 +++ b/src/main/java/christie/codegear/CodeGear.java Mon Jan 22 15:34:38 2018 +0900 @@ -67,7 +67,7 @@ return localDGM; } - public DataGearManager dgm(String dgmName) { + public DataGearManager getDGM(String dgmName) { return cgm.getDGM(dgmName); }
--- a/src/main/java/christie/codegear/Command.java Sun Jan 21 23:27:21 2018 +0900 +++ b/src/main/java/christie/codegear/Command.java Mon Jan 22 15:34:38 2018 +0900 @@ -2,6 +2,7 @@ import christie.daemon.RemoteMessage; import christie.datagear.DataGear; +import christie.datagear.MessagePackDataGear; import org.msgpack.MessagePack; import java.io.IOException; @@ -50,7 +51,7 @@ case REPLY: RemoteMessage mes = new RemoteMessage(type.id, cgmID, key, clazz.getName()); - data = dg.getMessagePack(); + data = new MessagePackDataGear(dg.getData(), dg.getClazz()).getMessagePack(); command = packer.write(mes); dataSize = packer.write(data.length);
--- a/src/main/java/christie/daemon/IncomingTcpConnection.java Sun Jan 21 23:27:21 2018 +0900 +++ b/src/main/java/christie/daemon/IncomingTcpConnection.java Mon Jan 22 15:34:38 2018 +0900 @@ -5,6 +5,7 @@ import christie.codegear.Command; import christie.codegear.CommandType; import christie.datagear.DataGear; +import christie.datagear.MessagePackDataGear; import christie.datagear.RemoteDataGearManager; import org.msgpack.MessagePack; @@ -46,8 +47,7 @@ case PUT: connection.socket.getInputStream().read(data); try { - DataGear dg = new DataGear("hoge", String.class); - dg.setMessagePack(data, Class.forName(msg.clazz)); + MessagePackDataGear dg = new MessagePackDataGear(data, Class.forName(msg.clazz)); if (cgms.containsKey(msg.cgmID)){ cgms.get(msg.cgmID).getLocalDGM().put(msg.key, dg);
--- a/src/main/java/christie/datagear/DataGear.java Sun Jan 21 23:27:21 2018 +0900 +++ b/src/main/java/christie/datagear/DataGear.java Mon Jan 22 15:34:38 2018 +0900 @@ -5,11 +5,8 @@ import java.io.IOException; public class DataGear<T>{ - private T data = null; - private Class<T> clazz = null; - private byte[] messagePack = null; - private int dataSize; - private MessagePack packer = new MessagePack(); + protected T data = null; + protected Class<T> clazz = null; public DataGear(T data, Class clazz) {//for normal setClazz(clazz); @@ -20,14 +17,6 @@ @SuppressWarnings("unchecked") Class<T> type = (Class<T>) dg.getClass().getComponentType(); this.clazz = type; - //System.out.println("create:"+this); - } - - public synchronized void setMessagePack(byte[] messagePack, Class clazz){ - this.data = null; - setClazz(clazz); - this.messagePack = messagePack; - //System.out.println("set:"+messagePack+" this:"+this); } public void setData(T data) { @@ -36,21 +25,6 @@ } } - public byte[] getMessagePack(){ - if (messagePack != null){ - return messagePack; - } else { - try { - messagePack = packer.write(data); - setDataSize(messagePack.length); - } catch (IOException e) { - e.printStackTrace(); - } - - return messagePack; - } - } - public void setClazz(Class clazz){ this.clazz = clazz; }; @@ -59,20 +33,9 @@ return clazz; }; - public synchronized T getData(){ - if (data == null){ - try { - //System.out.println("get:" +messagePack + " this:" + this); - setData(packer.read(messagePack, clazz)); - } catch (IOException e) { - e.printStackTrace(); - } - } + public T getData() { return data; - }; - - public void setDataSize(int dataSize) { - this.dataSize = dataSize; } + }
--- a/src/main/java/christie/datagear/DataGearManager.java Sun Jan 21 23:27:21 2018 +0900 +++ b/src/main/java/christie/datagear/DataGearManager.java Mon Jan 22 15:34:38 2018 +0900 @@ -20,7 +20,7 @@ protected ConcurrentHashMap<String, LinkedBlockingQueue<Command>> waitList = new ConcurrentHashMap<>(); public abstract void take(Command cm); public abstract void put(String key, Object data); - public abstract void put(int cgmID, String key, Object data); + public abstract void metaPut(int cgmID, String key, Object data); public abstract void runCommand(Command cm); public abstract void addWaitList(Command command); }
--- a/src/main/java/christie/datagear/LocalDataGearManager.java Sun Jan 21 23:27:21 2018 +0900 +++ b/src/main/java/christie/datagear/LocalDataGearManager.java Mon Jan 22 15:34:38 2018 +0900 @@ -27,7 +27,7 @@ } @Override - public void put(int cgmID, String key, Object data) { + public void metaPut(int cgmID, String key, Object data) { put(key, data); }
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/christie/datagear/MessagePackDataGear.java Mon Jan 22 15:34:38 2018 +0900 @@ -0,0 +1,52 @@ +package christie.datagear; + +import org.msgpack.MessagePack; + +import java.io.IOException; + +public class MessagePackDataGear<T> extends DataGear {//必ずmessagePack形式を持つDataGear + private byte[] messagePack = null; + private int dataSize; + private MessagePack packer = new MessagePack(); + + public MessagePackDataGear(T data, Class clazz){ + setClazz(clazz); + setData(data); + getMessagePack(); + } + + public MessagePackDataGear(byte[] messagePack, Class clazz){ + setClazz(clazz); + this.messagePack = messagePack; + } + + public byte[] getMessagePack(){ + if (messagePack != null){ + return messagePack; + } else { + try { + messagePack = packer.write(data); + setDataSize(messagePack.length); + } catch (IOException e) { + e.printStackTrace(); + } + + return messagePack; + } + } + + public synchronized T getData(){ + if (data == null){ + try { + setData(packer.read(messagePack, clazz)); + } catch (IOException e) { + e.printStackTrace(); + } + } + return (T) super.getData(); + } + + public void setDataSize(int dataSize) { + this.dataSize = dataSize; + } +}
--- a/src/main/java/christie/datagear/RemoteDataGearManager.java Sun Jan 21 23:27:21 2018 +0900 +++ b/src/main/java/christie/datagear/RemoteDataGearManager.java Mon Jan 22 15:34:38 2018 +0900 @@ -53,14 +53,14 @@ } - public void put(int cgmID, String key, Object data){//meta + public void metaPut(int cgmID, String key, Object data){//meta Command cmd = new Command(new DataGear(data, data.getClass()), cgmID,"local", key, CommandType.PUT, data.getClass()); connection.write(cmd); } @Override public void put(String key, Object data) { - put(1, key, data); + metaPut(1, key, data); } @Override
--- a/src/main/java/christie/test/Remote/CreateRemotePutTest.java Sun Jan 21 23:27:21 2018 +0900 +++ b/src/main/java/christie/test/Remote/CreateRemotePutTest.java Mon Jan 22 15:34:38 2018 +0900 @@ -14,6 +14,6 @@ @Override protected void run(CodeGearManager cgm) {//できるだけ並列に走らせるためにStartCodeGearには書かない cgm.setup(new RemotePutTest(cgmID)); - dgm("remote").put(cgmID,"hoge", 1); + getDGM("remote").metaPut(cgmID,"hoge", 1); } }
--- a/src/main/java/christie/test/Remote/RemotePutTest.java Sun Jan 21 23:27:21 2018 +0900 +++ b/src/main/java/christie/test/Remote/RemotePutTest.java Mon Jan 22 15:34:38 2018 +0900 @@ -22,7 +22,7 @@ if (hoge.getData() != 10){ System.out.println(hoge.getData()); cgm.setup(new RemotePutTest(cgmID)); - dgm("remote").put(cgmID,"hoge", hoge.getData() + 1); + getDGM("remote").metaPut(cgmID,"hoge", hoge.getData() + 1); } }
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/christie/test/Remote/StartRemotePut.java Mon Jan 22 15:34:38 2018 +0900 @@ -0,0 +1,26 @@ +package christie.test.Remote; + +import christie.codegear.CodeGearManager; +import christie.codegear.StartCodeGear; + +import static java.lang.Thread.sleep; + +public class StartRemotePut extends StartCodeGear{ + + public StartRemotePut(CodeGearManager cgm) { + super(cgm); + } + + public static void main(String args[]){ + CodeGearManager cgm = createCGM(10000); + new StartRemotePut(cgm); + + cgm.createRemoteDGM("remote", "localhost", 10001); + cgm.setup(new CreateRemotePutTest( 2));//この時点でcgm"second"は作られていない→notifyAllで対処? + + CodeGearManager cgm2 = createCGM(10001); + cgm2.createRemoteDGM("remote", "localhost", 10000); + cgm2.setup(new CreateRemotePutTest(1)); + } + +}
--- a/src/main/java/christie/test/Remote/StartRemoteTest.java Sun Jan 21 23:27:21 2018 +0900 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,26 +0,0 @@ -package christie.test.Remote; - -import christie.codegear.CodeGearManager; -import christie.codegear.StartCodeGear; - -import static java.lang.Thread.sleep; - -public class StartRemoteTest extends StartCodeGear{ - - public StartRemoteTest(CodeGearManager cgm) { - super(cgm); - } - - public static void main(String args[]){ - CodeGearManager cgm = createCGM(10000); - new StartRemoteTest(cgm); - - cgm.createRemoteDGM("remote", "localhost", 10001); - cgm.setup(new CreateRemotePutTest( 2));//この時点でcgm"second"は作られていない→notifyAllで対処? - - CodeGearManager cgm2 = createCGM(10001); - cgm2.createRemoteDGM("remote", "localhost", 10000); - cgm2.setup(new CreateRemotePutTest(1)); - } - -}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/christie/test/RemoteTake/CreateRemoteTakeTeste.java Mon Jan 22 15:34:38 2018 +0900 @@ -0,0 +1,4 @@ +package christie.test.RemoteTake; + +public class CreateRemoteTakeTeste { +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/christie/test/RemoteTake/RemoteTakeTest.java Mon Jan 22 15:34:38 2018 +0900 @@ -0,0 +1,20 @@ +package christie.test.RemoteTake; + +import christie.annotation.RemoteTake; +import christie.codegear.CodeGear; +import christie.codegear.CodeGearManager; +import christie.datagear.DataGear; + +public class RemoteTakeTest extends CodeGear{ + @RemoteTake(dsmName = "remote",key = "hoge") + public DataGear<Integer> hoge = new DataGear<>(); + + @Override + protected void run(CodeGearManager cgm) { + if (hoge.getData() != 10){ + System.out.println(hoge.getData()); + cgm.setup(new RemoteTakeTest()); + getLocalDGM().put("hoge", hoge.getData() + 1); + } + } +}