changeset 18:b8dc461b29f4

waitList use Queue
author Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
date Sun, 21 Jan 2018 23:03:36 +0900
parents 59fabebb67d8
children 0c76ab6b586a
files src/main/java/christie/codegear/CodeGear.java src/main/java/christie/codegear/CodeGearManager.java src/main/java/christie/codegear/Command.java src/main/java/christie/codegear/InputDataGear.java src/main/java/christie/daemon/IncomingTcpConnection.java src/main/java/christie/datagear/DataGear.java src/main/java/christie/datagear/DataGearManager.java src/main/java/christie/datagear/LocalDataGearManager.java src/main/java/christie/test/TestLocal/StartTest.java src/main/java/christie/test/TestLocal/TestCodeGear.java
diffstat 10 files changed, 77 insertions(+), 40 deletions(-) [+]
line wrap: on
line diff
--- a/src/main/java/christie/codegear/CodeGear.java	Thu Jan 18 16:49:54 2018 +0900
+++ b/src/main/java/christie/codegear/CodeGear.java	Sun Jan 21 23:03:36 2018 +0900
@@ -15,12 +15,12 @@
  * Annotationからのinputコマンドの生成、揃ったDataGearの値を返す
  */
 public abstract class CodeGear{
-    public InputDataGear idg = new InputDataGear(this);
-    public OutputDataGear odg = new OutputDataGear(this);
-    public ArrayList<Command> commandList = new ArrayList<Command>();
-    public CodeGearManager cgm;
-    public DataGearManager localDGM;
-    public CodeGearExecutor cge;
+    private InputDataGear idg = new InputDataGear(this);
+    private OutputDataGear odg = new OutputDataGear(this);
+    private ArrayList<Command> commandList = new ArrayList<Command>();
+    private CodeGearManager cgm;
+    private DataGearManager localDGM;
+    private CodeGearExecutor cge;
 
     protected abstract void run(CodeGearManager cgm);
 
@@ -43,10 +43,6 @@
         idg.finishInput(cgm, commandList);
     }
 
-    public DataGearManager dgm(String dgmName) {
-        return cgm.getDGM(dgmName);
-    }
-
     public void checkAndSetCommand(Field field, String name){
 
         if (!field.getName().equals(name)){
@@ -66,4 +62,24 @@
 
         commandList.add(new Command(this, dg, cgm.cgmID,"local", name, CommandType.TAKE));
     }
+
+    public DataGearManager getLocalDGM() {
+        return localDGM;
+    }
+
+    public DataGearManager dgm(String dgmName) {
+        return cgm.getDGM(dgmName);
+    }
+
+    public InputDataGear getIdg() {
+        return idg;
+    }
+
+    public OutputDataGear getOdg() {
+        return odg;
+    }
+
+    public CodeGearExecutor getCge() {
+        return cge;
+    }
 }
--- a/src/main/java/christie/codegear/CodeGearManager.java	Thu Jan 18 16:49:54 2018 +0900
+++ b/src/main/java/christie/codegear/CodeGearManager.java	Sun Jan 21 23:03:36 2018 +0900
@@ -51,7 +51,7 @@
     }
 
     public void submit(CodeGear cg){
-        threadPoolExecutor.execute(cg.cge);
+        threadPoolExecutor.execute(cg.getCge());
     }
 
     public void setup(CodeGear cg){
--- a/src/main/java/christie/codegear/Command.java	Thu Jan 18 16:49:54 2018 +0900
+++ b/src/main/java/christie/codegear/Command.java	Sun Jan 21 23:03:36 2018 +0900
@@ -15,7 +15,6 @@
     public String key;
     public CommandType type;
     public Class clazz = null;
-    private MessagePack packer = new MessagePack();
 
     //for put
     public Command(DataGear dg, int cgmID, String dgmName, String key, CommandType type, Class clazz){
@@ -44,6 +43,7 @@
             byte[] command = null;
             byte[] data = null;
             byte[] dataSize = null;
+            MessagePack packer = new MessagePack();
 
             switch (type) {
                 case PUT:
--- a/src/main/java/christie/codegear/InputDataGear.java	Thu Jan 18 16:49:54 2018 +0900
+++ b/src/main/java/christie/codegear/InputDataGear.java	Sun Jan 21 23:03:36 2018 +0900
@@ -7,16 +7,19 @@
 import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * Created by e125769 on 12/7/17.
  * inputDataGearの待ち合わせの管理
  */
 public class InputDataGear {
-    public HashMap<String, DataGear> inputValue = new HashMap<String, DataGear>();//受け皿
+    public ConcurrentHashMap<String, DataGear> inputValue = new ConcurrentHashMap<String, DataGear>();//受け皿
     public CodeGearManager cgm;
     public CodeGear cg;
-    private int count = 0;
+    private AtomicInteger count = new AtomicInteger(0);
 
     public InputDataGear(CodeGear cg){
         this.cg = cg;
@@ -24,9 +27,9 @@
 
     void finishInput(CodeGearManager cgm, ArrayList<Command> commandList){
         this.cgm = cgm;
-        count = commandList.size();
+        count = new AtomicInteger(commandList.size());
 
-        if(count == 0){
+        if(count.get() == 0){
             submitCG();
         }
 
@@ -40,8 +43,8 @@
         count();
     }
 
-    public void count(){//Commandが実行されるたびにデクリメント
-        if (--count == 0){
+    public synchronized void count(){//Commandが実行されるたびにデクリメント
+        if (count.decrementAndGet() == 0){
             setInputValue();
             submitCG();
         }
--- a/src/main/java/christie/daemon/IncomingTcpConnection.java	Thu Jan 18 16:49:54 2018 +0900
+++ b/src/main/java/christie/daemon/IncomingTcpConnection.java	Sun Jan 21 23:03:36 2018 +0900
@@ -46,13 +46,12 @@
                     case PUT:
                         connection.socket.getInputStream().read(data);
                         try {
-                            DataGear dg = new DataGear();
+                            DataGear dg = new DataGear("hoge", String.class);
                             dg.setMessagePack(data, Class.forName(msg.clazz));
 
                             if (cgms.containsKey(msg.cgmID)){
                                 cgms.get(msg.cgmID).getLocalDGM().put(msg.key, dg);
                             } else {
-                                //addwaitList? newThread notify()?
                                 throw new IllegalArgumentException("DGM_ID:" + msg.cgmID + "is not found");
                             }
 
--- a/src/main/java/christie/datagear/DataGear.java	Thu Jan 18 16:49:54 2018 +0900
+++ b/src/main/java/christie/datagear/DataGear.java	Sun Jan 21 23:03:36 2018 +0900
@@ -20,12 +20,14 @@
         @SuppressWarnings("unchecked")
         Class<T> type = (Class<T>) dg.getClass().getComponentType();
         this.clazz = type;
+        //System.out.println("create:"+this);
     }
 
-    public void setMessagePack(byte[] messagePack, Class clazz){
+    public synchronized void setMessagePack(byte[] messagePack, Class clazz){
         this.data = null;
         setClazz(clazz);
         this.messagePack = messagePack;
+        //System.out.println("set:"+messagePack+" this:"+this);
     }
 
     public void setData(T data) {
@@ -57,9 +59,10 @@
         return clazz;
     };
 
-    public T getData(){
+    public synchronized T getData(){
         if (data == null){
             try {
+                //System.out.println("get:" +messagePack + " this:" + this);
                 setData(packer.read(messagePack, clazz));
             } catch (IOException e) {
                 e.printStackTrace();
--- a/src/main/java/christie/datagear/DataGearManager.java	Thu Jan 18 16:49:54 2018 +0900
+++ b/src/main/java/christie/datagear/DataGearManager.java	Sun Jan 21 23:03:36 2018 +0900
@@ -16,8 +16,8 @@
  * PUT/TAKEなどDataGearManagerに対するComandの実行。
  */
 public abstract class DataGearManager {
-    TreeMap<String, LinkedBlockingQueue<DataGear>> dataGears = new TreeMap<String, LinkedBlockingQueue<DataGear>>();
-    ConcurrentHashMap<String, Command> waitList = new ConcurrentHashMap<String, Command>();
+    protected TreeMap<String, LinkedBlockingQueue<DataGear>> dataGears = new TreeMap<String, LinkedBlockingQueue<DataGear>>();
+    protected ConcurrentHashMap<String, LinkedBlockingQueue<Command>> waitList = new ConcurrentHashMap<>();
     public abstract void take(Command cm);
     public abstract void put(String key, Object data);
     public abstract void put(int cgmID, String key, Object data);
--- a/src/main/java/christie/datagear/LocalDataGearManager.java	Thu Jan 18 16:49:54 2018 +0900
+++ b/src/main/java/christie/datagear/LocalDataGearManager.java	Sun Jan 21 23:03:36 2018 +0900
@@ -3,7 +3,9 @@
 import christie.codegear.Command;
 import christie.codegear.CommandType;
 
+import java.util.Map;
 import java.util.TreeMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.LinkedBlockingQueue;
 
 /**
@@ -45,29 +47,43 @@
                 }
 
                 if (waitList.containsKey(cm.key)){
-                    waitList.get(cm.key).dg.setData(cm.dg.getData());
-                    runCommand(waitList.get(cm.key));
+                    runCommand(waitList.get(cm.key).poll());//待ちコマンドの先頭をとる
+                    if (waitList.get(cm.key).isEmpty()){
+                        waitList.remove(cm.key);
+                    }
                 }
                 break;
             case TAKE:
-                    cm.cg.idg.setInputs(cm.key, cm.dg);
-                    TreeMap<String, LinkedBlockingQueue<DataGear>> hoge = dataGears;
-                    dataGears.get(cm.key).poll();
-                    if (dataGears.get(cm.key).isEmpty()){
-                        dataGears.remove(cm.key);
-                    }
-                    break;
+                dataGears.get(cm.key).peek();
+                cm.dg.setData(dataGears.get(cm.key).poll().getData());
+                if (dataGears.get(cm.key).isEmpty()){
+                    dataGears.remove(cm.key);
+                }
+
+                cm.cg.getIdg().setInputs(cm.key, cm.dg);
+                break;
             case PEEK:
-                    cm.cg.idg.setInputs(cm.key, cm.dg);
-                    break;
+                cm.dg.setData(dataGears.get(cm.key).peek().getData());
+                cm.cg.getIdg().setInputs(cm.key, cm.dg);
+                break;
 
         }
-        waitList.remove(cm.key);
     }
 
     @Override
-    public void addWaitList(Command command) {
-        waitList.put(command.key, command);
+    public void addWaitList(Command cm) {
+        if(waitList.containsKey(cm.key)){
+            waitList.get(cm.key).add(cm);
+        } else {
+            LinkedBlockingQueue<Command> queue = new LinkedBlockingQueue<>();
+            queue.add(cm);
+            waitList.put(cm.key, queue);
+        }
     }
 
+    private void deleteElement(Map<String, LinkedBlockingQueue<Object>> map, String key){
+        if (map.get(key).isEmpty()){
+            map.remove(key);
+        }
+    }
 }
--- a/src/main/java/christie/test/TestLocal/StartTest.java	Thu Jan 18 16:49:54 2018 +0900
+++ b/src/main/java/christie/test/TestLocal/StartTest.java	Sun Jan 21 23:03:36 2018 +0900
@@ -18,6 +18,6 @@
     protected void run(CodeGearManager cgm) {
         cgm.setup(new TestCodeGear());
 
-        localDGM.put("hoge", 1);
+        getLocalDGM().put("hoge", 1);
     }
 }
--- a/src/main/java/christie/test/TestLocal/TestCodeGear.java	Thu Jan 18 16:49:54 2018 +0900
+++ b/src/main/java/christie/test/TestLocal/TestCodeGear.java	Sun Jan 21 23:03:36 2018 +0900
@@ -19,7 +19,7 @@
 
         if (hoge.getData()!= 10){
             cgm.setup(new TestCodeGear());
-            localDGM.put("hoge", hoge.getData() + 1);
+            getLocalDGM().put("hoge", hoge.getData() + 1);
         }
     }