Mercurial > hg > Database > Christie
changeset 17:59fabebb67d8
delete cgmName as String, add cgmID as int
line wrap: on
line diff
--- a/build.gradle Mon Jan 15 16:27:58 2018 +0900 +++ b/build.gradle Thu Jan 18 16:49:54 2018 +0900 @@ -2,8 +2,8 @@ apply plugin: 'eclipse' apply plugin: 'maven' -sourceCompatibility = 1.9; -targetCompatibility = 1.9; +sourceCompatibility = 1.8; +targetCompatibility = 1.8; [compileJava, compileTestJava]*.options*.encoding = 'UTF-8' group = 'cr.ie.u_ryukyu.ac.jp'
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/christie/annotation/RemoteTake.java Thu Jan 18 16:49:54 2018 +0900 @@ -0,0 +1,18 @@ +package christie.annotation; + + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Created by e125769 on 12/7/17. + */ +@Target(ElementType.FIELD) +@Retention(RetentionPolicy.RUNTIME) +public @interface RemoteTake { + String dsmName(); + String cgmName(); + String key(); +} \ No newline at end of file
--- a/src/main/java/christie/codegear/CodeGear.java Mon Jan 15 16:27:58 2018 +0900 +++ b/src/main/java/christie/codegear/CodeGear.java Thu Jan 18 16:49:54 2018 +0900 @@ -43,8 +43,8 @@ idg.finishInput(cgm, commandList); } - public DataGearManager dgm(String dsmName) { - return cgm.getDGM(dsmName); + public DataGearManager dgm(String dgmName) { + return cgm.getDGM(dgmName); } public void checkAndSetCommand(Field field, String name){ @@ -64,6 +64,6 @@ throw new NullPointerException("please initialize DataGear"); } - commandList.add(new Command(this, dg, cgm.cgmName,"local", name, CommandType.TAKE)); + commandList.add(new Command(this, dg, cgm.cgmID,"local", name, CommandType.TAKE)); } }
--- a/src/main/java/christie/codegear/CodeGearManager.java Mon Jan 15 16:27:58 2018 +0900 +++ b/src/main/java/christie/codegear/CodeGearManager.java Thu Jan 18 16:49:54 2018 +0900 @@ -16,18 +16,18 @@ */ public class CodeGearManager{ private ConcurrentHashMap<String, DataGearManager> dataGearManagers = new ConcurrentHashMap<String, DataGearManager>(); - private ConcurrentHashMap<String, CodeGearManager> cgms; + private ConcurrentHashMap<Integer, CodeGearManager> cgms; private ThreadPoolExecutor threadPoolExecutor; private LocalDataGearManager localDGM = new LocalDataGearManager(); private ConcurrentHashMap<String, IncomingTcpConnection> acceptHash = new ConcurrentHashMap<String, IncomingTcpConnection>(); - public String cgmName; + public int cgmID; public Config conf;//ToDo: make daemon from config - public CodeGearManager(String cgmName, ThreadPoolExecutor exe, ConcurrentHashMap<String, CodeGearManager> cgms, int localPort) { + public CodeGearManager(int cgmID, ThreadPoolExecutor exe, ConcurrentHashMap<Integer, CodeGearManager> cgms, int localPort) { dataGearManagers.put("local", localDGM); this.cgms = cgms; threadPoolExecutor = exe; - this.cgmName = cgmName; + this.cgmID = cgmID; new ChristieDaemon(localPort, this).listen(); } @@ -35,13 +35,18 @@ return localDGM; } - public DataGearManager getDGM(String dsmName){ - return dataGearManagers.get(dsmName); + public DataGearManager getDGM(String dgmName){ + if (dataGearManagers.containsKey(dgmName)){ + return dataGearManagers.get(dgmName); + } else { + throw new IllegalArgumentException("DGM "+ dgmName + " is not found."); + } } - public RemoteDataGearManager createRemoteDGM(String dsmName, String address, int port){ - RemoteDataGearManager remote = new RemoteDataGearManager(dsmName, address, port, this); - dataGearManagers.put(dsmName, remote); + public RemoteDataGearManager createRemoteDGM(String dgmName, String address, int port){ + RemoteDataGearManager remote = new RemoteDataGearManager(dgmName, address, port, this); + dataGearManagers.put(dgmName, remote); + return remote; } @@ -53,7 +58,7 @@ cg.setup(this); } - public ConcurrentHashMap<String, CodeGearManager> getCgms() { + public ConcurrentHashMap<Integer, CodeGearManager> getCgms() { return cgms; }
--- a/src/main/java/christie/codegear/Command.java Mon Jan 15 16:27:58 2018 +0900 +++ b/src/main/java/christie/codegear/Command.java Thu Jan 18 16:49:54 2018 +0900 @@ -10,29 +10,29 @@ public class Command { public CodeGear cg = null; public DataGear dg; - public String cgmName = "first"; - public String dsmName = "local"; + public int cgmID = 1; + public String dgmName = "local"; public String key; public CommandType type; public Class clazz = null; private MessagePack packer = new MessagePack(); //for put - public Command(DataGear dg, String cgmName, String dsmName, String key, CommandType type, Class clazz){ + public Command(DataGear dg, int cgmID, String dgmName, String key, CommandType type, Class clazz){ this.dg = dg; - this.cgmName = cgmName; - this.dsmName = dsmName; + this.cgmID = cgmID; + this.dgmName = dgmName; this.key = key; this.type = type; this.clazz = clazz; } //for take - public Command(CodeGear cg, DataGear dg, String cgmName, String dsmName, String key, CommandType type){ + public Command(CodeGear cg, DataGear dg, int cgmID, String dgmName, String key, CommandType type){ this.cg = cg; this.dg = dg; - this.cgmName = cgmName; - this.dsmName = dsmName; + this.cgmID = cgmID; + this.dgmName = dgmName; this.key = key; this.type = type; } @@ -48,7 +48,7 @@ switch (type) { case PUT: case REPLY: - RemoteMessage mes = new RemoteMessage(type.id, cgmName, key, clazz.getName()); + RemoteMessage mes = new RemoteMessage(type.id, cgmID, key, clazz.getName()); data = dg.getMessagePack(); command = packer.write(mes); @@ -60,7 +60,7 @@ buf.put(data); break; default: - command = packer.write(new RemoteMessage(type.id, cgmName, key, clazz.getName())); + command = packer.write(new RemoteMessage(type.id, cgmID, key, clazz.getName())); buf = ByteBuffer.allocate(command.length); buf.put(command); break;
--- a/src/main/java/christie/codegear/InputDataGear.java Mon Jan 15 16:27:58 2018 +0900 +++ b/src/main/java/christie/codegear/InputDataGear.java Thu Jan 18 16:49:54 2018 +0900 @@ -31,7 +31,7 @@ } for(Command cm : commandList){ - cgm.getDGM(cm.dsmName).take(cm); + cgm.getDGM(cm.dgmName).take(cm); } }
--- a/src/main/java/christie/codegear/OutputDataGear.java Mon Jan 15 16:27:58 2018 +0900 +++ b/src/main/java/christie/codegear/OutputDataGear.java Thu Jan 18 16:49:54 2018 +0900 @@ -21,7 +21,7 @@ cgm.getDGM("local").put(key, data); } - public void put(String dsmName, String key, Object data){ - cgm.getDGM(dsmName).put(key, data); + public void put(String dgmName, String key, Object data){ + cgm.getDGM(dgmName).put(key, data); } }
--- a/src/main/java/christie/codegear/StartCodeGear.java Mon Jan 15 16:27:58 2018 +0900 +++ b/src/main/java/christie/codegear/StartCodeGear.java Thu Jan 18 16:49:54 2018 +0900 @@ -9,32 +9,25 @@ import java.util.concurrent.TimeUnit; public abstract class StartCodeGear extends CodeGear{ - static ConcurrentHashMap<String, CodeGearManager> cgms = new ConcurrentHashMap<>(); + static ConcurrentHashMap<Integer, CodeGearManager> cgms = new ConcurrentHashMap<>(); static LinkedBlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<Runnable>(); static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), // initial number of threads Runtime.getRuntime().availableProcessors(), Integer.MAX_VALUE, // keepAliveTime TimeUnit.SECONDS, taskQueue); + static int cgmCount = 1; public StartCodeGear(CodeGearManager cgm){ cgm.setup(this); } - public static CodeGearManager createCGM(String name, int localPort){ - if (cgms.containsKey(name)){ - throw new IllegalArgumentException("CGM name '" + name +"' is already used"); - } - - CodeGearManager cgm = new CodeGearManager(name, threadPoolExecutor, cgms, localPort); - cgms.put(name, cgm); + public static CodeGearManager createCGM(int localPort){ + CodeGearManager cgm = new CodeGearManager(cgmCount, threadPoolExecutor, cgms, localPort); + cgms.put(cgmCount++, cgm); return cgm; } - public static CodeGearManager createCGM(String name){ - return createCGM(name, 10000); - } - public static CodeGearManager getCGM(String name){ return cgms.get(name); }
--- a/src/main/java/christie/daemon/IncomingTcpConnection.java Mon Jan 15 16:27:58 2018 +0900 +++ b/src/main/java/christie/daemon/IncomingTcpConnection.java Thu Jan 18 16:49:54 2018 +0900 @@ -16,7 +16,7 @@ public class IncomingTcpConnection extends Thread { - ConcurrentHashMap<String, CodeGearManager> cgms; + ConcurrentHashMap<Integer, CodeGearManager> cgms; Connection connection; private MessagePack packer = new MessagePack(); @@ -48,7 +48,14 @@ try { DataGear dg = new DataGear(); dg.setMessagePack(data, Class.forName(msg.clazz)); - cgms.get(msg.cgmName).getLocalDGM().put(msg.key, dg); + + if (cgms.containsKey(msg.cgmID)){ + cgms.get(msg.cgmID).getLocalDGM().put(msg.key, dg); + } else { + //addwaitList? newThread notify()? + throw new IllegalArgumentException("DGM_ID:" + msg.cgmID + "is not found"); + } + } catch (ClassNotFoundException e) { e.printStackTrace(); }
--- a/src/main/java/christie/daemon/RemoteMessage.java Mon Jan 15 16:27:58 2018 +0900 +++ b/src/main/java/christie/daemon/RemoteMessage.java Thu Jan 18 16:49:54 2018 +0900 @@ -5,15 +5,15 @@ @Message public class RemoteMessage { public int type;//PUT, PEEKなどのコマンドタイプ - public String cgmName; + public int cgmID; public String key;//DS key public String clazz; public RemoteMessage(){} - public RemoteMessage(int type, String cgmName, String key, String clazz) { + public RemoteMessage(int type, int cgmID, String key, String clazz) { this.type = type; - this.cgmName = cgmName; + this.cgmID = cgmID; this.key = key; this.clazz = clazz; }
--- a/src/main/java/christie/datagear/DataGearManager.java Mon Jan 15 16:27:58 2018 +0900 +++ b/src/main/java/christie/datagear/DataGearManager.java Thu Jan 18 16:49:54 2018 +0900 @@ -8,6 +8,7 @@ import java.util.HashMap; import java.util.Queue; import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; /** @@ -16,10 +17,10 @@ */ public abstract class DataGearManager { TreeMap<String, LinkedBlockingQueue<DataGear>> dataGears = new TreeMap<String, LinkedBlockingQueue<DataGear>>(); - HashMap<String, Command> waitList = new HashMap<String, Command>(); + ConcurrentHashMap<String, Command> waitList = new ConcurrentHashMap<String, Command>(); public abstract void take(Command cm); public abstract void put(String key, Object data); - public abstract void put(String dsmName, String key, Object data); + public abstract void put(int cgmID, String key, Object data); public abstract void runCommand(Command cm); public abstract void addWaitList(Command command); }
--- a/src/main/java/christie/datagear/LocalDataGearManager.java Mon Jan 15 16:27:58 2018 +0900 +++ b/src/main/java/christie/datagear/LocalDataGearManager.java Thu Jan 18 16:49:54 2018 +0900 @@ -1,6 +1,7 @@ package christie.datagear; import christie.codegear.Command; +import christie.codegear.CommandType; import java.util.TreeMap; import java.util.concurrent.LinkedBlockingQueue; @@ -24,28 +25,30 @@ } @Override - public void put(String dsmName, String key, Object data) { + public void put(int cgmID, String key, Object data) { put(key, data); } public void put(String key, DataGear dg){ - if(dataGears.containsKey(key)){ - dataGears.get(key).add(dg); - } else { - LinkedBlockingQueue<DataGear> queue = new LinkedBlockingQueue<DataGear>(); - queue.add(dg); - dataGears.put(key, queue); - } - - if (waitList.containsKey(key)){ - waitList.get(key).dg.setData(dg.getData()); - runCommand(waitList.get(key)); - } + runCommand(new Command(dg, 1, "local", key, CommandType.PUT, dg.getClazz())); } + public synchronized void runCommand(Command cm){ + switch (cm.type){ + case PUT: + if(dataGears.containsKey(cm.key)){ + dataGears.get(cm.key).add(cm.dg); + } else { + LinkedBlockingQueue<DataGear> queue = new LinkedBlockingQueue<DataGear>(); + queue.add(cm.dg); + dataGears.put(cm.key, queue); + } - public void runCommand(Command cm){ - switch (cm.type){ + if (waitList.containsKey(cm.key)){ + waitList.get(cm.key).dg.setData(cm.dg.getData()); + runCommand(waitList.get(cm.key)); + } + break; case TAKE: cm.cg.idg.setInputs(cm.key, cm.dg); TreeMap<String, LinkedBlockingQueue<DataGear>> hoge = dataGears; @@ -57,6 +60,7 @@ case PEEK: cm.cg.idg.setInputs(cm.key, cm.dg); break; + } waitList.remove(cm.key); }
--- a/src/main/java/christie/datagear/RemoteDataGearManager.java Mon Jan 15 16:27:58 2018 +0900 +++ b/src/main/java/christie/datagear/RemoteDataGearManager.java Thu Jan 18 16:49:54 2018 +0900 @@ -53,15 +53,14 @@ } - public void put(String cgmName, String key, Object data){//main use - Command cmd = new Command(new DataGear(data, data.getClass()), cgmName,"local", key, CommandType.PUT, data.getClass()); + public void put(int cgmID, String key, Object data){//meta + Command cmd = new Command(new DataGear(data, data.getClass()), cgmID,"local", key, CommandType.PUT, data.getClass()); connection.write(cmd); } @Override public void put(String key, Object data) { - Command cmd = new Command(new DataGear(data, data.getClass()), "first","local", key, CommandType.PUT, data.getClass()); - connection.write(cmd); + put(1, key, data); } @Override
--- a/src/main/java/christie/test/Remote/CreateRemotePutTest.java Mon Jan 15 16:27:58 2018 +0900 +++ b/src/main/java/christie/test/Remote/CreateRemotePutTest.java Thu Jan 18 16:49:54 2018 +0900 @@ -5,15 +5,15 @@ public class CreateRemotePutTest extends CodeGear { - String cgmName; + int cgmID; - public CreateRemotePutTest(String cgmName) { - this.cgmName = cgmName; + public CreateRemotePutTest(int cgmID) { + this.cgmID = cgmID; } @Override - protected void run(CodeGearManager cgm) { - cgm.setup(new RemotePutTest(cgmName)); - dgm("remote").put(cgmName,"hoge", 1); + protected void run(CodeGearManager cgm) {//できるだけ並列に走らせるためにStartCodeGearには書かない + cgm.setup(new RemotePutTest(cgmID)); + dgm("remote").put(cgmID,"hoge", 1); } }
--- a/src/main/java/christie/test/Remote/RemotePutTest.java Mon Jan 15 16:27:58 2018 +0900 +++ b/src/main/java/christie/test/Remote/RemotePutTest.java Thu Jan 18 16:49:54 2018 +0900 @@ -11,18 +11,18 @@ @Take("hoge") public DataGear<Integer> hoge = new DataGear<>(); - public String cgmName; + public int cgmID; - public RemotePutTest(String cgmName){ - this.cgmName = cgmName; + public RemotePutTest(int cgmID){ + this.cgmID = cgmID; } @Override protected void run(CodeGearManager cgm) { if (hoge.getData() != 10){ System.out.println(hoge.getData()); - cgm.setup(new RemotePutTest(cgmName)); - dgm("remote").put(cgmName,"hoge", hoge.getData() + 1); + cgm.setup(new RemotePutTest(cgmID)); + dgm("remote").put(cgmID,"hoge", hoge.getData() + 1); } }
--- a/src/main/java/christie/test/Remote/StartRemoteTest.java Mon Jan 15 16:27:58 2018 +0900 +++ b/src/main/java/christie/test/Remote/StartRemoteTest.java Thu Jan 18 16:49:54 2018 +0900 @@ -3,6 +3,8 @@ import christie.codegear.CodeGearManager; import christie.codegear.StartCodeGear; +import static java.lang.Thread.sleep; + public class StartRemoteTest extends StartCodeGear{ public StartRemoteTest(CodeGearManager cgm) { @@ -10,15 +12,15 @@ } public static void main(String args[]){ - CodeGearManager cgm = createCGM("first", 10000); + CodeGearManager cgm = createCGM(10000); new StartRemoteTest(cgm); - cgm.createRemoteDGM("remote", "localhost", 10001);//CS生成とremoteを立てる順番の依存性について - cgm.setup(new CreateRemotePutTest( "second")); + cgm.createRemoteDGM("remote", "localhost", 10001); + cgm.setup(new CreateRemotePutTest( 2));//この時点でcgm"second"は作られていない→notifyAllで対処? - CodeGearManager cgm2 = createCGM("second", 10001); + CodeGearManager cgm2 = createCGM(10001); cgm2.createRemoteDGM("remote", "localhost", 10000); - cgm2.setup(new CreateRemotePutTest("first")); + cgm2.setup(new CreateRemotePutTest(1)); + } - } }
--- a/src/main/java/christie/test/TestLocal/StartTest.java Mon Jan 15 16:27:58 2018 +0900 +++ b/src/main/java/christie/test/TestLocal/StartTest.java Thu Jan 18 16:49:54 2018 +0900 @@ -11,7 +11,7 @@ } public static void main(String args[]){ - StartTest start = new StartTest(createCGM("first")); + StartTest start = new StartTest(createCGM(10000)); } @Override