Mercurial > hg > Database > Christie
changeset 18:b8dc461b29f4
waitList use Queue
author | Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp> |
---|---|
date | Sun, 21 Jan 2018 23:03:36 +0900 |
parents | 59fabebb67d8 |
children | 0c76ab6b586a |
files | src/main/java/christie/codegear/CodeGear.java src/main/java/christie/codegear/CodeGearManager.java src/main/java/christie/codegear/Command.java src/main/java/christie/codegear/InputDataGear.java src/main/java/christie/daemon/IncomingTcpConnection.java src/main/java/christie/datagear/DataGear.java src/main/java/christie/datagear/DataGearManager.java src/main/java/christie/datagear/LocalDataGearManager.java src/main/java/christie/test/TestLocal/StartTest.java src/main/java/christie/test/TestLocal/TestCodeGear.java |
diffstat | 10 files changed, 77 insertions(+), 40 deletions(-) [+] |
line wrap: on
line diff
--- a/src/main/java/christie/codegear/CodeGear.java Thu Jan 18 16:49:54 2018 +0900 +++ b/src/main/java/christie/codegear/CodeGear.java Sun Jan 21 23:03:36 2018 +0900 @@ -15,12 +15,12 @@ * Annotationからのinputコマンドの生成、揃ったDataGearの値を返す */ public abstract class CodeGear{ - public InputDataGear idg = new InputDataGear(this); - public OutputDataGear odg = new OutputDataGear(this); - public ArrayList<Command> commandList = new ArrayList<Command>(); - public CodeGearManager cgm; - public DataGearManager localDGM; - public CodeGearExecutor cge; + private InputDataGear idg = new InputDataGear(this); + private OutputDataGear odg = new OutputDataGear(this); + private ArrayList<Command> commandList = new ArrayList<Command>(); + private CodeGearManager cgm; + private DataGearManager localDGM; + private CodeGearExecutor cge; protected abstract void run(CodeGearManager cgm); @@ -43,10 +43,6 @@ idg.finishInput(cgm, commandList); } - public DataGearManager dgm(String dgmName) { - return cgm.getDGM(dgmName); - } - public void checkAndSetCommand(Field field, String name){ if (!field.getName().equals(name)){ @@ -66,4 +62,24 @@ commandList.add(new Command(this, dg, cgm.cgmID,"local", name, CommandType.TAKE)); } + + public DataGearManager getLocalDGM() { + return localDGM; + } + + public DataGearManager dgm(String dgmName) { + return cgm.getDGM(dgmName); + } + + public InputDataGear getIdg() { + return idg; + } + + public OutputDataGear getOdg() { + return odg; + } + + public CodeGearExecutor getCge() { + return cge; + } }
--- a/src/main/java/christie/codegear/CodeGearManager.java Thu Jan 18 16:49:54 2018 +0900 +++ b/src/main/java/christie/codegear/CodeGearManager.java Sun Jan 21 23:03:36 2018 +0900 @@ -51,7 +51,7 @@ } public void submit(CodeGear cg){ - threadPoolExecutor.execute(cg.cge); + threadPoolExecutor.execute(cg.getCge()); } public void setup(CodeGear cg){
--- a/src/main/java/christie/codegear/Command.java Thu Jan 18 16:49:54 2018 +0900 +++ b/src/main/java/christie/codegear/Command.java Sun Jan 21 23:03:36 2018 +0900 @@ -15,7 +15,6 @@ public String key; public CommandType type; public Class clazz = null; - private MessagePack packer = new MessagePack(); //for put public Command(DataGear dg, int cgmID, String dgmName, String key, CommandType type, Class clazz){ @@ -44,6 +43,7 @@ byte[] command = null; byte[] data = null; byte[] dataSize = null; + MessagePack packer = new MessagePack(); switch (type) { case PUT:
--- a/src/main/java/christie/codegear/InputDataGear.java Thu Jan 18 16:49:54 2018 +0900 +++ b/src/main/java/christie/codegear/InputDataGear.java Sun Jan 21 23:03:36 2018 +0900 @@ -7,16 +7,19 @@ import java.lang.reflect.Field; import java.util.ArrayList; import java.util.HashMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicInteger; /** * Created by e125769 on 12/7/17. * inputDataGearの待ち合わせの管理 */ public class InputDataGear { - public HashMap<String, DataGear> inputValue = new HashMap<String, DataGear>();//受け皿 + public ConcurrentHashMap<String, DataGear> inputValue = new ConcurrentHashMap<String, DataGear>();//受け皿 public CodeGearManager cgm; public CodeGear cg; - private int count = 0; + private AtomicInteger count = new AtomicInteger(0); public InputDataGear(CodeGear cg){ this.cg = cg; @@ -24,9 +27,9 @@ void finishInput(CodeGearManager cgm, ArrayList<Command> commandList){ this.cgm = cgm; - count = commandList.size(); + count = new AtomicInteger(commandList.size()); - if(count == 0){ + if(count.get() == 0){ submitCG(); } @@ -40,8 +43,8 @@ count(); } - public void count(){//Commandが実行されるたびにデクリメント - if (--count == 0){ + public synchronized void count(){//Commandが実行されるたびにデクリメント + if (count.decrementAndGet() == 0){ setInputValue(); submitCG(); }
--- a/src/main/java/christie/daemon/IncomingTcpConnection.java Thu Jan 18 16:49:54 2018 +0900 +++ b/src/main/java/christie/daemon/IncomingTcpConnection.java Sun Jan 21 23:03:36 2018 +0900 @@ -46,13 +46,12 @@ case PUT: connection.socket.getInputStream().read(data); try { - DataGear dg = new DataGear(); + DataGear dg = new DataGear("hoge", String.class); dg.setMessagePack(data, Class.forName(msg.clazz)); if (cgms.containsKey(msg.cgmID)){ cgms.get(msg.cgmID).getLocalDGM().put(msg.key, dg); } else { - //addwaitList? newThread notify()? throw new IllegalArgumentException("DGM_ID:" + msg.cgmID + "is not found"); }
--- a/src/main/java/christie/datagear/DataGear.java Thu Jan 18 16:49:54 2018 +0900 +++ b/src/main/java/christie/datagear/DataGear.java Sun Jan 21 23:03:36 2018 +0900 @@ -20,12 +20,14 @@ @SuppressWarnings("unchecked") Class<T> type = (Class<T>) dg.getClass().getComponentType(); this.clazz = type; + //System.out.println("create:"+this); } - public void setMessagePack(byte[] messagePack, Class clazz){ + public synchronized void setMessagePack(byte[] messagePack, Class clazz){ this.data = null; setClazz(clazz); this.messagePack = messagePack; + //System.out.println("set:"+messagePack+" this:"+this); } public void setData(T data) { @@ -57,9 +59,10 @@ return clazz; }; - public T getData(){ + public synchronized T getData(){ if (data == null){ try { + //System.out.println("get:" +messagePack + " this:" + this); setData(packer.read(messagePack, clazz)); } catch (IOException e) { e.printStackTrace();
--- a/src/main/java/christie/datagear/DataGearManager.java Thu Jan 18 16:49:54 2018 +0900 +++ b/src/main/java/christie/datagear/DataGearManager.java Sun Jan 21 23:03:36 2018 +0900 @@ -16,8 +16,8 @@ * PUT/TAKEなどDataGearManagerに対するComandの実行。 */ public abstract class DataGearManager { - TreeMap<String, LinkedBlockingQueue<DataGear>> dataGears = new TreeMap<String, LinkedBlockingQueue<DataGear>>(); - ConcurrentHashMap<String, Command> waitList = new ConcurrentHashMap<String, Command>(); + protected TreeMap<String, LinkedBlockingQueue<DataGear>> dataGears = new TreeMap<String, LinkedBlockingQueue<DataGear>>(); + protected ConcurrentHashMap<String, LinkedBlockingQueue<Command>> waitList = new ConcurrentHashMap<>(); public abstract void take(Command cm); public abstract void put(String key, Object data); public abstract void put(int cgmID, String key, Object data);
--- a/src/main/java/christie/datagear/LocalDataGearManager.java Thu Jan 18 16:49:54 2018 +0900 +++ b/src/main/java/christie/datagear/LocalDataGearManager.java Sun Jan 21 23:03:36 2018 +0900 @@ -3,7 +3,9 @@ import christie.codegear.Command; import christie.codegear.CommandType; +import java.util.Map; import java.util.TreeMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.LinkedBlockingQueue; /** @@ -45,29 +47,43 @@ } if (waitList.containsKey(cm.key)){ - waitList.get(cm.key).dg.setData(cm.dg.getData()); - runCommand(waitList.get(cm.key)); + runCommand(waitList.get(cm.key).poll());//待ちコマンドの先頭をとる + if (waitList.get(cm.key).isEmpty()){ + waitList.remove(cm.key); + } } break; case TAKE: - cm.cg.idg.setInputs(cm.key, cm.dg); - TreeMap<String, LinkedBlockingQueue<DataGear>> hoge = dataGears; - dataGears.get(cm.key).poll(); - if (dataGears.get(cm.key).isEmpty()){ - dataGears.remove(cm.key); - } - break; + dataGears.get(cm.key).peek(); + cm.dg.setData(dataGears.get(cm.key).poll().getData()); + if (dataGears.get(cm.key).isEmpty()){ + dataGears.remove(cm.key); + } + + cm.cg.getIdg().setInputs(cm.key, cm.dg); + break; case PEEK: - cm.cg.idg.setInputs(cm.key, cm.dg); - break; + cm.dg.setData(dataGears.get(cm.key).peek().getData()); + cm.cg.getIdg().setInputs(cm.key, cm.dg); + break; } - waitList.remove(cm.key); } @Override - public void addWaitList(Command command) { - waitList.put(command.key, command); + public void addWaitList(Command cm) { + if(waitList.containsKey(cm.key)){ + waitList.get(cm.key).add(cm); + } else { + LinkedBlockingQueue<Command> queue = new LinkedBlockingQueue<>(); + queue.add(cm); + waitList.put(cm.key, queue); + } } + private void deleteElement(Map<String, LinkedBlockingQueue<Object>> map, String key){ + if (map.get(key).isEmpty()){ + map.remove(key); + } + } }
--- a/src/main/java/christie/test/TestLocal/StartTest.java Thu Jan 18 16:49:54 2018 +0900 +++ b/src/main/java/christie/test/TestLocal/StartTest.java Sun Jan 21 23:03:36 2018 +0900 @@ -18,6 +18,6 @@ protected void run(CodeGearManager cgm) { cgm.setup(new TestCodeGear()); - localDGM.put("hoge", 1); + getLocalDGM().put("hoge", 1); } }
--- a/src/main/java/christie/test/TestLocal/TestCodeGear.java Thu Jan 18 16:49:54 2018 +0900 +++ b/src/main/java/christie/test/TestLocal/TestCodeGear.java Sun Jan 21 23:03:36 2018 +0900 @@ -19,7 +19,7 @@ if (hoge.getData()!= 10){ cgm.setup(new TestCodeGear()); - localDGM.put("hoge", hoge.getData() + 1); + getLocalDGM().put("hoge", hoge.getData() + 1); } }