Mercurial > hg > Database > Christie
changeset 13:bcd4f2c19185
don't work MessagePack unconvert for remote put
line wrap: on
line diff
--- a/src/main/java/christie/codegear/CodeGear.java Tue Jan 09 17:37:43 2018 +0900 +++ b/src/main/java/christie/codegear/CodeGear.java Wed Jan 10 20:37:47 2018 +0900 @@ -2,6 +2,7 @@ import christie.annotation.Peek; import christie.annotation.Take; +import christie.daemon.ChristieDaemon; import christie.datagear.DataGear; import christie.datagear.DataGearManager;
--- a/src/main/java/christie/codegear/CodeGearManager.java Tue Jan 09 17:37:43 2018 +0900 +++ b/src/main/java/christie/codegear/CodeGearManager.java Wed Jan 10 20:37:47 2018 +0900 @@ -1,5 +1,8 @@ package christie.codegear; +import christie.daemon.ChristieDaemon; +import christie.daemon.Config; +import christie.daemon.IncomingTcpConnection; import christie.datagear.DataGearManager; import christie.datagear.LocalDataGearManager; import christie.datagear.RemoteDataGearManager; @@ -15,15 +18,20 @@ private ConcurrentHashMap<String, DataGearManager> dataGearManagers = new ConcurrentHashMap<String, DataGearManager>(); private ConcurrentHashMap<String, CodeGearManager> cgms; private ThreadPoolExecutor threadPoolExecutor; + private LocalDataGearManager localDGM = new LocalDataGearManager(); + private ConcurrentHashMap<String, IncomingTcpConnection> acceptHash = new ConcurrentHashMap<String, IncomingTcpConnection>(); + public Config conf; - public CodeGearManager(ThreadPoolExecutor exe, ConcurrentHashMap<String, CodeGearManager> cgms) { - dataGearManagers.put("local", new LocalDataGearManager()); + public CodeGearManager(ThreadPoolExecutor exe, ConcurrentHashMap<String, CodeGearManager> cgms, Config conf) { + dataGearManagers.put("local", localDGM); this.cgms = cgms; threadPoolExecutor = exe; + this.conf = conf; + new ChristieDaemon(conf, this).listen();//run only first time } - public DataGearManager getDGM(){ - return dataGearManagers.get("local"); + public LocalDataGearManager getDGM(){ + return localDGM; } public DataGearManager getDGM(String dsmName){ @@ -34,6 +42,10 @@ dataGearManagers.put(dsmName, new RemoteDataGearManager(dsmName, address, port, this)); } + public void createRemoteDGM(String dsmName){ + dataGearManagers.put(dsmName, new RemoteDataGearManager(dsmName, conf.address, conf.connectPort, this)); + } + public void submit(CodeGear cg){ threadPoolExecutor.execute(cg.cge); } @@ -46,4 +58,7 @@ return cgms; } + public void setAccept(String key, IncomingTcpConnection in) { + acceptHash.put(key, in); + } }
--- a/src/main/java/christie/codegear/Command.java Tue Jan 09 17:37:43 2018 +0900 +++ b/src/main/java/christie/codegear/Command.java Wed Jan 10 20:37:47 2018 +0900 @@ -14,15 +14,17 @@ public String dsmName = "local"; public String key; public CommandType type; + public Class clazz = null; private static final MessagePack packer = new MessagePack(); //for put - public Command(DataGear dg, String cgmName, String dsmName, String key, CommandType type){ + public Command(DataGear dg, String cgmName, String dsmName, String key, CommandType type, Class clazz){ this.dg = dg; this.cgmName = cgmName; this.dsmName = dsmName; this.key = key; this.type = type; + this.clazz = clazz; } //for take @@ -46,20 +48,19 @@ switch (type) { case PUT: case REPLY: - - RemoteMessage cm = new RemoteMessage(); + RemoteMessage mes = new RemoteMessage(type.id, cgmName, key, clazz.getName()); data = dg.getMessagePack(); + command = packer.write(mes); + dataSize = packer.write(data.length); - command = packer.write(cm); - dataSize = packer.write(data.length); buf = ByteBuffer.allocate(command.length+dataSize.length+data.length); buf.put(command); buf.put(dataSize); buf.put(data); break; default: - command = packer.write(new RemoteMessage()); + command = packer.write(new RemoteMessage(type.id, cgmName, key, clazz.getName())); buf = ByteBuffer.allocate(command.length); buf.put(command); break;
--- a/src/main/java/christie/codegear/CommandType.java Tue Jan 09 17:37:43 2018 +0900 +++ b/src/main/java/christie/codegear/CommandType.java Wed Jan 10 20:37:47 2018 +0900 @@ -12,7 +12,15 @@ public int id;//コマンドのid public static HashMap<Integer, CommandType> hash = new HashMap<Integer, CommandType>();//コマンド対応表 + private static int lastId = 0;//コマンドの総数 + private CommandType() { + this.id = incrementLastId(); + } + + private int incrementLastId() { + return ++lastId; + } public static CommandType getCommandTypeFromId(int id) { return hash.get(id);
--- a/src/main/java/christie/codegear/StartCodeGear.java Tue Jan 09 17:37:43 2018 +0900 +++ b/src/main/java/christie/codegear/StartCodeGear.java Wed Jan 10 20:37:47 2018 +0900 @@ -1,5 +1,8 @@ package christie.codegear; +import christie.daemon.ChristieDaemon; +import christie.daemon.Config; + import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; @@ -13,9 +16,14 @@ Integer.MAX_VALUE, // keepAliveTime TimeUnit.SECONDS, taskQueue); + Config conf; + + public StartCodeGear(Config conf){ + this.conf = conf; + } public CodeGearManager createCGM(String name){ - CodeGearManager cgm = new CodeGearManager(threadPoolExecutor, cgms); + CodeGearManager cgm = new CodeGearManager(threadPoolExecutor, cgms, conf); cgms.put(name, cgm); return cgm; }
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/christie/daemon/AcceptThread.java Wed Jan 10 20:37:47 2018 +0900 @@ -0,0 +1,45 @@ +package christie.daemon; + +import christie.codegear.CodeGearManager; + +import java.io.IOException; +import java.net.ServerSocket; +import java.net.Socket; + +public class AcceptThread extends Thread { + + private ServerSocket ss; + public int counter = 0; + public CodeGearManager cgm; + + public AcceptThread(ServerSocket ss, String name, CodeGearManager cgm) { + super(name); + this.ss = ss; + this.cgm = cgm; + } + + @Override + public void run() { + while (true) { + try { + Socket socket = ss.accept(); + socket.setTcpNoDelay(true); + System.out.println("Accept " + socket.getInetAddress().getHostName() + ":" + socket.getPort()); + Connection connection = new Connection(socket); + String key = "accept" + counter; + IncomingTcpConnection in = + new IncomingTcpConnection(connection, cgm); + in.setName(connection.getInfoString()+"-IncomingTcp"); + in.start(); + cgm.setAccept(key, in); + OutboundTcpConnection out = new OutboundTcpConnection(connection); + out.setName(connection.getInfoString()+"-OutboundTcp"); + out.start(); + counter++; + } catch (IOException e) { + e.printStackTrace(); + } + } + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/christie/daemon/ChristieDaemon.java Wed Jan 10 20:37:47 2018 +0900 @@ -0,0 +1,40 @@ +package christie.daemon; + +import christie.codegear.CodeGear; +import christie.codegear.CodeGearManager; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.nio.channels.ServerSocketChannel; + +public class ChristieDaemon { + + private Config conf; + private AcceptThread acceptThread; + public CodeGearManager cgm; + + public ChristieDaemon(Config conf, CodeGearManager cgm) { + this.conf = conf; + this.cgm = cgm; + } + + public void listen() { + try { + ServerSocketChannel ssChannel = ServerSocketChannel.open(); + ServerSocket ss = ssChannel.socket(); + ss.setReuseAddress(true); + + // listen on any address ipv4/ipv6 + InetSocketAddress a = new InetSocketAddress("::", conf.localPort); + + System.out.println("AliceDaemon.listen: bind to " + a); + ss.bind(a); + acceptThread = new AcceptThread(ss, "ACCEPT" + conf.localPort, cgm); + acceptThread.start(); + } catch (IOException e) { + e.printStackTrace(); + } + + } +} \ No newline at end of file
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/christie/daemon/Config.java Wed Jan 10 20:37:47 2018 +0900 @@ -0,0 +1,21 @@ +package christie.daemon; + +public class Config { + + public int localPort = 10000; + public String address = "127.0.0.1"; + public int connectPort = 10001; + + public Config(String[] args) { + for (int i = 0; i< args.length; i++) { + if ("-localPort".equals(args[i])) { + localPort = Integer.parseInt(args[++i]); + }if ("-address".equals(args[i])) { + address = args[++i]; + } else if ("-connectPort".equals(args[i])) { + connectPort = Integer.parseInt(args[++i]); + } + } + } + +}
--- a/src/main/java/christie/daemon/IncomingTcpConnection.java Tue Jan 09 17:37:43 2018 +0900 +++ b/src/main/java/christie/daemon/IncomingTcpConnection.java Wed Jan 10 20:37:47 2018 +0900 @@ -46,8 +46,11 @@ switch (type) { case PUT: connection.socket.getInputStream().read(data); - - cgms.get(msg.cgmName).getDGM().put(msg.key, data); + try { + cgms.get(msg.cgmName).getDGM().put(msg.key, new DataGear(data, Class.forName(msg.clazz))); + } catch (ClassNotFoundException e) { + e.printStackTrace(); + } break; /*
--- a/src/main/java/christie/daemon/OutboundTcpConnection.java Tue Jan 09 17:37:43 2018 +0900 +++ b/src/main/java/christie/daemon/OutboundTcpConnection.java Wed Jan 10 20:37:47 2018 +0900 @@ -24,7 +24,7 @@ default: break; } - connection.write(cmd);//ここでconvert()がよばれてる + connection.write(cmd); } catch (InterruptedException e) { e.printStackTrace(); }
--- a/src/main/java/christie/daemon/RemoteMessage.java Tue Jan 09 17:37:43 2018 +0900 +++ b/src/main/java/christie/daemon/RemoteMessage.java Wed Jan 10 20:37:47 2018 +0900 @@ -1,15 +1,20 @@ package christie.daemon; +import org.msgpack.annotation.Message; + +@Message public class RemoteMessage { public int type;//PUT, PEEKなどのコマンドタイプ public String cgmName; public String key;//DS key + public String clazz; - public RemoteMessage() {} + public RemoteMessage(){} - public RemoteMessage(int type, String cgmName, String key) { + public RemoteMessage(int type, String cgmName, String key, String clazz) { this.type = type; this.cgmName = cgmName; this.key = key; + this.clazz = clazz; } }
--- a/src/main/java/christie/datagear/DataGear.java Tue Jan 09 17:37:43 2018 +0900 +++ b/src/main/java/christie/datagear/DataGear.java Wed Jan 10 20:37:47 2018 +0900 @@ -11,20 +11,20 @@ private int dataSize; private MessagePack packer = new MessagePack(); - public DataGear(T data, Class clazz) { + public DataGear(T data, Class clazz) {//for normal setClazz(clazz); setData(data); } - public DataGear(T... dg) { + public DataGear(T... dg) {//for input DataGear init @SuppressWarnings("unchecked") Class<T> type = (Class<T>) dg.getClass().getComponentType(); this.clazz = type; } - public DataGear(byte[] messagePack){ + public DataGear(byte[] messagePack, Class clazz){//for remote this.data = null; - this.clazz = null; + this.clazz = clazz; this.messagePack = messagePack; } @@ -60,7 +60,7 @@ public T getData(){ if (data == null){ try { - data = (T) packer.unconvert(messagePack); + setData((T) packer.unconvert(messagePack));//ここで展開できてない } catch (IOException e) { e.printStackTrace(); }
--- a/src/main/java/christie/datagear/RemoteDataGearManager.java Tue Jan 09 17:37:43 2018 +0900 +++ b/src/main/java/christie/datagear/RemoteDataGearManager.java Wed Jan 10 20:37:47 2018 +0900 @@ -56,7 +56,7 @@ @Override public void put(String key, Object data) { //cgmNameを受け取れるようにする - Command cmd = new Command(new DataGear(data, data.getClass()), "first","local", key, CommandType.PUT); + Command cmd = new Command(new DataGear(data, data.getClass()), "first","local", key, CommandType.PUT, data.getClass()); connection.write(cmd); }
--- a/src/main/java/christie/test/Remote/RemotePutTest.java Tue Jan 09 17:37:43 2018 +0900 +++ b/src/main/java/christie/test/Remote/RemotePutTest.java Wed Jan 10 20:37:47 2018 +0900 @@ -9,13 +9,13 @@ public class RemotePutTest extends CodeGear { @Take("hoge") - DataGear<Integer> hoge = new DataGear<>(); + public DataGear<Integer> hoge = new DataGear<>(); @Override protected void run(CodeGearManager cgm) { System.out.println(hoge.getData()); - if (hoge.getData()!= 10){ + if (hoge.getData() != 10){ cgm.setup(new TestCodeGear()); dgm("remote").put("hoge", hoge.getData() + 1); }
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/christie/test/Remote/StartRemoteTest.java Wed Jan 10 20:37:47 2018 +0900 @@ -0,0 +1,26 @@ +package christie.test.Remote; + +import christie.codegear.CodeGearManager; +import christie.codegear.StartCodeGear; +import christie.daemon.Config; + +public class StartRemoteTest extends StartCodeGear { + + public StartRemoteTest(Config conf) { + super(conf); + } + + public static void main(String args[]){ + StartRemoteTest start = new StartRemoteTest(new Config(args)); + CodeGearManager cgm = start.createCGM("first"); + cgm.setup(start); + //もう一つスタートする + } + + @Override + protected void run(CodeGearManager cgm) { + cgm.setup(new RemotePutTest()); + cgm.createRemoteDGM("remote"); + dgm("remote").put("hoge", 1); + } +}
--- a/src/main/java/christie/test/Remote/StartTest.java Tue Jan 09 17:37:43 2018 +0900 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,20 +0,0 @@ -package christie.test.Remote; - -import christie.codegear.CodeGearManager; -import christie.codegear.StartCodeGear; - -public class StartTest extends StartCodeGear { - - public static void main(String args[]){ - christie.test.TestLocal.StartTest start = new christie.test.TestLocal.StartTest(); - CodeGearManager cgm = start.createCGM("first"); - cgm.setup(start); - } - - @Override - protected void run(CodeGearManager cgm) { - cgm.setup(new RemotePutTest()); - cgm.createRemoteDGM("remote","localhost", 1000); - dgm("remote").put("test", 1); - } -}
--- a/src/main/java/christie/test/TestLocal/StartTest.java Tue Jan 09 17:37:43 2018 +0900 +++ b/src/main/java/christie/test/TestLocal/StartTest.java Wed Jan 10 20:37:47 2018 +0900 @@ -2,12 +2,18 @@ import christie.codegear.CodeGearManager; import christie.codegear.StartCodeGear; +import christie.daemon.Config; +import christie.daemon.RemoteConfig; public class StartTest extends StartCodeGear{ + public StartTest(Config conf) { + super(conf); + } + public static void main(String args[]){ - StartTest start = new StartTest(); + StartTest start = new StartTest(new Config(args)); CodeGearManager cgm = start.createCGM("first"); cgm.setup(start); }