changeset 538:8c17a9e66cc7 dispose work-compressedDSM

Compressed LDSM refactoring & flip refactoring
author Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
date Fri, 19 Jun 2015 14:06:10 +0900 (2015-06-19)
parents 8f949fa80653
children 0832af83583f 3841f137cbae 767d93626b88 5a9b83c64ddf ae13e4854c55
files src/main/java/alice/codesegment/OutputDataSegment.java src/main/java/alice/datasegment/CompressedLocalDataSegmentManager.java src/main/java/alice/datasegment/DataSegment.java src/main/java/alice/datasegment/LocalDataSegmentManager.java
diffstat 4 files changed, 52 insertions(+), 75 deletions(-) [+]
line wrap: on
line diff
--- a/src/main/java/alice/codesegment/OutputDataSegment.java	Mon Jun 15 19:34:00 2015 +0900
+++ b/src/main/java/alice/codesegment/OutputDataSegment.java	Fri Jun 19 14:06:10 2015 +0900
@@ -23,7 +23,16 @@
         }
     }
 
-    public void flip(Receiver receiver, CommandType type) {// ToDo: add remote
+    public void flip(String managerKey, String key, Receiver receiver){
+        if (receiver.isCompressed()){
+            DataSegment.get("compressed" + managerKey).put(key, receiver.getReceiveData(), false);
+        } else {
+            DataSegment.get(managerKey).put(key, receiver.getReceiveData(), false);
+        }
+
+    }
+
+    public void flip(Receiver receiver, CommandType type) {
         switch (type) {
             case PUT:
                 if (receiver.isCompressed()){
--- a/src/main/java/alice/datasegment/CompressedLocalDataSegmentManager.java	Mon Jun 15 19:34:00 2015 +0900
+++ b/src/main/java/alice/datasegment/CompressedLocalDataSegmentManager.java	Fri Jun 19 14:06:10 2015 +0900
@@ -12,18 +12,12 @@
 
 public class CompressedLocalDataSegmentManager extends DataSegmentManager {
 
-    private String reverseKey = "local";
-    private ConcurrentHashMap<String, DataSegmentKey> dataSegments = new ConcurrentHashMap<String, DataSegmentKey>();
-    private Logger logger = Logger.getLogger("local");
+    LocalDataSegmentManager manager;
+    private String reverseKey = "compressedlocal";
 
-    private ThreadPoolExecutor dataSegmentExecutor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), // initial number of threads
-            Runtime.getRuntime().availableProcessors(),
-            Integer.MAX_VALUE, // keepAliveTime
-            TimeUnit.SECONDS,
-            new LinkedBlockingQueue<Runnable>());
-
-    public CompressedLocalDataSegmentManager() {
-        new Thread(replyThread, "LocalDataSegmentManager-replyCommand").start();
+    public CompressedLocalDataSegmentManager(LocalDataSegmentManager manager) {
+        this.manager = manager;
+        new Thread(replyThread, "CompressedLocalDataSegmentManager-replyCommand").start();
     }
 
     public void setReverseKey(String s){
@@ -48,31 +42,19 @@
     }
 
     public void submitCommand(DataSegmentKey key, Command cmd) {
-        dataSegmentExecutor.execute(new RunCommand(key, cmd));
+        manager.submitCommand(key, cmd);
     }
 
     public DataSegmentKey getDataSegmentKey(String key) {
-        DataSegmentKey dsKey = dataSegments.get(key);
-        if (dsKey != null)
-            return dsKey;
-        if (key == null)
-            return null;
-        DataSegmentKey newDataSegmentKey = new DataSegmentKey();
-        DataSegmentKey dataSegmentKey = dataSegments.putIfAbsent(key, newDataSegmentKey);
-        if (dataSegmentKey == null) {
-            dataSegmentKey = newDataSegmentKey;
-        }
-        return dataSegmentKey;
+        return manager.getDataSegmentKey(key);
     }
 
     public void removeDataSegmentKey(String key) {
-        if (key!=null)
-            dataSegments.remove(key);
+        manager.removeDataSegmentKey(key);
     }
 
     @Override
     public void put(String key, ReceiveData rData, boolean quickFlag) {
-        DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
         if (!rData.compressed()){
             try {
                 rData.zip();
@@ -84,9 +66,7 @@
         Command cmd = new Command(CommandType.PUT, null, key, rData, 0, 0, null, null, reverseKey);
         cmd.setCompressFlag(true);
 
-        dataSegmentKey.runCommand(cmd);
-        if (logger.isDebugEnabled())
-            logger.debug(cmd.getCommandString());
+        manager.put1(key, cmd);
     }
 
     /**
@@ -95,7 +75,7 @@
 
     @Override
     public void update(String key, ReceiveData rData, boolean quickFlag) {
-        DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
+
         if (!rData.compressed()){
             try {
                 rData.zip();
@@ -107,46 +87,32 @@
         Command cmd = new Command(CommandType.UPDATE, null, key, rData, 0, 0, null, null, reverseKey);
         cmd.setCompressFlag(true);
 
-        dataSegmentKey.runCommand(cmd);
-        if (logger.isDebugEnabled())
-            logger.debug(cmd.getCommandString());
+        manager.put1(key, cmd);
     }
 
     @Override
     public void take(Receiver receiver, CodeSegment cs, boolean quickFlag) {
-        DataSegmentKey dataSegmentKey = getDataSegmentKey(receiver.key);
-        int seq = this.seq.getAndIncrement();
-        Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null);
+        Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, 0, replyQueue, cs, null);
         cmd.setCompressFlag(true);
 
-        dataSegmentKey.runCommand(cmd);
-        if (logger.isDebugEnabled())
-            logger.debug(cmd.getCommandString());
+        manager.take1(receiver, cmd);
     }
 
     @Override
     public void peek(Receiver receiver, CodeSegment cs, boolean quickFlag) {
-        DataSegmentKey dataSegmentKey = getDataSegmentKey(receiver.key);
-        int seq = this.seq.getAndIncrement();
-        Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null);
+        Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, 0, replyQueue, cs, null);
         cmd.setCompressFlag(true);
 
-        dataSegmentKey.runCommand(cmd);
-        if (logger.isDebugEnabled())
-            logger.debug(cmd.getCommandString());
+        manager.take1(receiver, cmd);
     }
 
     @Override
     public void remove(String key) {
-        DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
-        Command cmd = new Command(CommandType.REMOVE, null, key, null, 0, 0, replyQueue, null, null);
-        dataSegmentKey.runCommand(cmd);
-        if (logger.isDebugEnabled())
-            logger.debug(cmd.getCommandString());
+        manager.remove(key);
     }
 
     @Override public void finish() {
-        System.exit(0);
+        manager.finish();
     }
 
     @Override
@@ -155,13 +121,7 @@
     }
 
     public void recommand(Receiver receiver, CodeSegment cs) {
-        DataSegmentKey dataSegmentKey = getDataSegmentKey(receiver.key);
-        int seq = this.seq.getAndIncrement();
-        Command cmd = new Command(receiver.type, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null);
-        dataSegmentKey.runCommand(cmd);
-        if (logger.isDebugEnabled())
-            logger.debug(cmd.getCommandString());
-
+        manager.recommand(receiver, cs);
     }
 
     @Override
--- a/src/main/java/alice/datasegment/DataSegment.java	Mon Jun 15 19:34:00 2015 +0900
+++ b/src/main/java/alice/datasegment/DataSegment.java	Fri Jun 19 14:06:10 2015 +0900
@@ -10,7 +10,7 @@
 
     private static DataSegment dataSegment = new DataSegment();
     private LocalDataSegmentManager local = new LocalDataSegmentManager();
-    private CompressedLocalDataSegmentManager compressedLocal = new CompressedLocalDataSegmentManager();//追加
+    private CompressedLocalDataSegmentManager compressedLocal = new CompressedLocalDataSegmentManager(local);//追加
     private ConcurrentHashMap<String, DataSegmentManager> dataSegmentManagers = new ConcurrentHashMap<String, DataSegmentManager>(); //TODO Over Head
     private ConcurrentHashMap<String, IncomingTcpConnection> acceptHash = new ConcurrentHashMap<String, IncomingTcpConnection>();
 
@@ -20,7 +20,11 @@
     }
 
     public static DataSegmentManager get(String key) {
-        return dataSegment.dataSegmentManagers.get(key);
+        if (key == null){
+            return dataSegment.dataSegmentManagers.get("local");
+        } else {
+            return dataSegment.dataSegmentManagers.get(key);
+        }
     }
 
     public static LocalDataSegmentManager getLocal() {
@@ -41,7 +45,7 @@
             System.exit(0);
         }
         RemoteDataSegmentManager manager = new RemoteDataSegmentManager(connectionKey, reverseKey, hostName, port);
-        CompressedRemoteDataSegmentManager compressedManager = new CompressedRemoteDataSegmentManager(manager.connection);
+        CompressedRemoteDataSegmentManager compressedManager = new CompressedRemoteDataSegmentManager(manager);
 
         register(connectionKey, manager);
         register("compressed" + connectionKey, compressedManager);
--- a/src/main/java/alice/datasegment/LocalDataSegmentManager.java	Mon Jun 15 19:34:00 2015 +0900
+++ b/src/main/java/alice/datasegment/LocalDataSegmentManager.java	Fri Jun 19 14:06:10 2015 +0900
@@ -71,8 +71,12 @@
 
     @Override
     public void put(String key, ReceiveData rData, boolean quickFlag) {
+        Command cmd = new Command(CommandType.PUT, null, key, rData, 0, 0, null, null, reverseKey);
+        put1(key, cmd);
+    }
+
+    public void put1(String key, Command cmd) {
         DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
-        Command cmd = new Command(CommandType.PUT, null, key, rData, 0, 0, null, null, reverseKey);
         dataSegmentKey.runCommand(cmd);
         if (logger.isDebugEnabled())
             logger.debug(cmd.getCommandString());
@@ -84,18 +88,22 @@
 
     @Override
     public void update(String key, ReceiveData rData, boolean quickFlag) {
-        DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
         Command cmd = new Command(CommandType.UPDATE, null, key, rData, 0, 0, null, null, reverseKey);
-        dataSegmentKey.runCommand(cmd);
-        if (logger.isDebugEnabled())
-            logger.debug(cmd.getCommandString());
+        put1(key, cmd);
     }
 
     @Override
     public void take(Receiver receiver, CodeSegment cs, boolean quickFlag) {
-        DataSegmentKey dataSegmentKey = getDataSegmentKey(receiver.key);
+
+        Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, 0, replyQueue, cs, null);
+        take1(receiver, cmd);
+    }
+
+    public void take1(Receiver receiver, Command cmd) {
         int seq = this.seq.getAndIncrement();
-        Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null);
+        cmd.setSeq(seq);
+        //seqHash.put(seq, cmd);
+        DataSegmentKey dataSegmentKey = getDataSegmentKey(receiver.key);
         dataSegmentKey.runCommand(cmd);
         if (logger.isDebugEnabled())
             logger.debug(cmd.getCommandString());
@@ -103,12 +111,8 @@
 
     @Override
     public void peek(Receiver receiver, CodeSegment cs, boolean quickFlag) {
-        DataSegmentKey dataSegmentKey = getDataSegmentKey(receiver.key);
-        int seq = this.seq.getAndIncrement();
-        Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null);
-        dataSegmentKey.runCommand(cmd);
-        if (logger.isDebugEnabled())
-            logger.debug(cmd.getCommandString());
+        Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, 0, replyQueue, cs, null);
+        take1(receiver, cmd);
     }
 
     @Override