changeset 455:b004f62b83e5 dispose

refactor (remove quick method from DataSegmentManager and use flag)
author sugi
date Sun, 02 Nov 2014 18:07:43 +0900 (2014-11-02)
parents f8a8f869f016
children 212a81cf7a86
files src/main/java/alice/codesegment/InputDataSegment.java src/main/java/alice/codesegment/OutputDataSegment.java src/main/java/alice/daemon/Connection.java src/main/java/alice/daemon/IncomingTcpConnection.java src/main/java/alice/daemon/IncomingUdpConnection.java src/main/java/alice/datasegment/Command.java src/main/java/alice/datasegment/DataSegmentManager.java src/main/java/alice/datasegment/LocalDataSegmentManager.java src/main/java/alice/datasegment/RemoteDataSegmentManager.java
diffstat 9 files changed, 81 insertions(+), 122 deletions(-) [+]
line wrap: on
line diff
--- a/src/main/java/alice/codesegment/InputDataSegment.java	Tue Oct 28 17:34:26 2014 +0900
+++ b/src/main/java/alice/codesegment/InputDataSegment.java	Sun Nov 02 18:07:43 2014 +0900
@@ -7,6 +7,7 @@
 import alice.datasegment.DataSegment;
 import alice.datasegment.ReceiveData;
 import alice.datasegment.Receiver;
+import alice.datasegment.SendOption;
 
 /**
  * InputDataSegment Manager
@@ -31,37 +32,41 @@
     public void quickPeek(Receiver receiver) {
         cs.list.add(receiver);
         if (receiver.managerKey==null){
-            DataSegment.getLocal().peek(receiver, cs);
+            DataSegment.getLocal().peek(receiver, cs, null);
         } else {
-            DataSegment.get(receiver.managerKey).quickPeek(receiver ,cs);
+            SendOption option = new SendOption(true, false);
+            DataSegment.get(receiver.managerKey).peek(receiver, cs, option);
+        }
+    }
+
+
+    public void peek(Receiver receiver) {
+        cs.list.add(receiver);
+        if (receiver.managerKey==null){
+            DataSegment.getLocal().peek(receiver, cs, null);
+        } else {
+            SendOption option = new SendOption(false, false);
+            DataSegment.get(receiver.managerKey).peek(receiver, cs, option);
         }
     }
 
     public void quickTake(Receiver receiver) {
         cs.list.add(receiver);
         if (receiver.managerKey==null){
-            DataSegment.getLocal().quickTake(receiver, cs);
+            DataSegment.getLocal().take(receiver, cs, null);
         } else {
-            DataSegment.get(receiver.managerKey).quickTake(receiver ,cs);
+            SendOption option = new SendOption(true, false);
+            DataSegment.get(receiver.managerKey).take(receiver, cs, option);
         }
     }
 
-    public void peek(Receiver receiver) {
-        cs.list.add(receiver);
-        if (receiver.managerKey==null){
-            DataSegment.getLocal().peek(receiver, cs);
-        } else {
-            DataSegment.get(receiver.managerKey).peek(receiver, cs);
-        }
-    }
-
-
     public void take(Receiver receiver) {
         cs.list.add(receiver);
         if (receiver.managerKey==null){
-            DataSegment.getLocal().take(receiver, cs);
+            DataSegment.getLocal().take(receiver, cs, null);
         } else {
-            DataSegment.get(receiver.managerKey).take(receiver, cs);
+            SendOption option = new SendOption(false, false);
+            DataSegment.get(receiver.managerKey).take(receiver, cs, option);
         }
     }
 
--- a/src/main/java/alice/codesegment/OutputDataSegment.java	Tue Oct 28 17:34:26 2014 +0900
+++ b/src/main/java/alice/codesegment/OutputDataSegment.java	Sun Nov 02 18:07:43 2014 +0900
@@ -3,24 +3,26 @@
 import alice.datasegment.CommandType;
 import alice.datasegment.DataSegment;
 import alice.datasegment.Receiver;
+import alice.datasegment.SendOption;
 
 public class OutputDataSegment {
+    private boolean compressFlag = false;
 
     /**
      * for local
      */
 
     public void flip(Receiver receiver) {
-        DataSegment.getLocal().put(receiver.key, receiver.getObj());
+        DataSegment.getLocal().put(receiver.key, receiver.getObj(), null);
     }
 
     public void flip(Receiver receiver, CommandType type) {
         switch (type) {
         case PUT: 
-            DataSegment.getLocal().put(receiver.key, receiver.getObj());
+            DataSegment.getLocal().put(receiver.key, receiver.getObj(), null);
             break;
         case UPDATE:
-            DataSegment.getLocal().update(receiver.key, receiver.getObj());
+            DataSegment.getLocal().update(receiver.key, receiver.getObj(), null);
             break;
         default:
             break;
@@ -28,19 +30,11 @@
     }
 
     public void put(String key, Object val) {
-        DataSegment.getLocal().put(key, val);
-    }
-
-    public void quickPut(String key, Object val) {
-        put(key, val);
+        DataSegment.getLocal().put(key, val, null);
     }
 
     public void update(String key, Object val) {
-        DataSegment.getLocal().update(key, val);
-    }
-
-    public void quickuUpdate(String key, Object val) {
-        update(key, val);
+        DataSegment.getLocal().update(key, val, null);
     }
 
     /**
@@ -48,7 +42,8 @@
      */
     public void put(String managerKey, String key, Object val) {
         if (!managerKey.equals("local")){
-            DataSegment.get(managerKey).put(key,val);
+            SendOption option = new SendOption(false, compressFlag());
+            DataSegment.get(managerKey).put(key, val, option);
         } else {
             put(key, val);
         }
@@ -56,7 +51,8 @@
 
     public void quickPut(String managerKey, String key, Object val) {
         if (!managerKey.equals("local")){
-            DataSegment.get(managerKey).quickPut(key, val);
+            SendOption option = new SendOption(true, compressFlag());
+            DataSegment.get(managerKey).put(key, val, option);
         } else {
             put(key, val);
         }
@@ -64,7 +60,8 @@
 
     public void update(String managerKey, String key, Object val) {
         if (!managerKey.equals("local")){
-            DataSegment.get(managerKey).update(key, val);
+            SendOption option = new SendOption(false, compressFlag());
+            DataSegment.get(managerKey).update(key, val, option);
         } else {
             update(key, val);
         }
@@ -72,7 +69,8 @@
 
     public void quickUpdate(String managerKey, String key, Object val) {
         if (!managerKey.equals("local")){
-            DataSegment.get(managerKey).update(key, val);
+            SendOption option = new SendOption(true, compressFlag());
+            DataSegment.get(managerKey).update(key, val, option);
         } else {
             update(key, val);
         }
@@ -118,4 +116,12 @@
     public void shutdown(String managerKey){
         DataSegment.get(managerKey).shutdown();
     }
+
+    public boolean compressFlag() {
+        return compressFlag;
+    }
+
+    public void setCompressFlag(boolean cFlag) {
+        compressFlag = cFlag;
+    }
 }
--- a/src/main/java/alice/daemon/Connection.java	Tue Oct 28 17:34:26 2014 +0900
+++ b/src/main/java/alice/daemon/Connection.java	Sun Nov 02 18:07:43 2014 +0900
@@ -63,7 +63,7 @@
 
     public void putConnectionInfo() {
         ConnectionInfo c = new ConnectionInfo(socket.getInetAddress().toString() ,socket.getPort());
-        DataSegment.getLocal().put("disconnect", c);
+        DataSegment.getLocal().put("disconnect", c, null);
 
     }
 }
--- a/src/main/java/alice/daemon/IncomingTcpConnection.java	Tue Oct 28 17:34:26 2014 +0900
+++ b/src/main/java/alice/daemon/IncomingTcpConnection.java	Sun Nov 02 18:07:43 2014 +0900
@@ -87,7 +87,7 @@
                     DataSegment.get(reverseKey).response(msg.key);
                     break;
                 case RESPONSE:
-                    DataSegment.getLocal().put(msg.key, new RespondData(reverseKey, System.currentTimeMillis()));
+                    DataSegment.getLocal().put(msg.key, new RespondData(reverseKey, System.currentTimeMillis()), null);
                     break;
                 default:
                     break;
--- a/src/main/java/alice/daemon/IncomingUdpConnection.java	Tue Oct 28 17:34:26 2014 +0900
+++ b/src/main/java/alice/daemon/IncomingUdpConnection.java	Sun Nov 02 18:07:43 2014 +0900
@@ -77,7 +77,7 @@
                     DataSegment.get(reverseKey).response(msg.key);
                     break;
                 case RESPONSE:
-                    DataSegment.getLocal().put(msg.key, new RespondData(reverseKey, System.currentTimeMillis()));
+                    DataSegment.getLocal().put(msg.key, new RespondData(reverseKey, System.currentTimeMillis()), null);
                     break;
                 default:
                     break;
--- a/src/main/java/alice/datasegment/Command.java	Tue Oct 28 17:34:26 2014 +0900
+++ b/src/main/java/alice/datasegment/Command.java	Sun Nov 02 18:07:43 2014 +0900
@@ -85,10 +85,7 @@
                 if (!serializeFlag) {
                     data = (byte[]) val;
                 } else {
-                    long start = System.currentTimeMillis();
                     data = msg.write(val);
-                    long end = System.currentTimeMillis();
-                    System.out.println("convert DataSegment" +(end - start));
                 }
                 if (compressFlag) {
                     data = zip(data);
@@ -98,10 +95,7 @@
                 buf = ByteBuffer.allocate(header.length+dataSize.length+data.length);
                 buf.put(header);
                 buf.put(dataSize);
-                long start = System.currentTimeMillis();
                 buf.put(data);
-                long end = System.currentTimeMillis();
-                System.out.println("put DataSegment" +(end - start));
                 break;
             case REPLY: // only serialize
                 if (serializeFlag) {
--- a/src/main/java/alice/datasegment/DataSegmentManager.java	Tue Oct 28 17:34:26 2014 +0900
+++ b/src/main/java/alice/datasegment/DataSegmentManager.java	Sun Nov 02 18:07:43 2014 +0900
@@ -50,15 +50,10 @@
         }
     }
 
-    public abstract void put(String key, Object val);
-    public abstract void update(String key, Object val);
-    public abstract void peek(Receiver receiver, CodeSegment cs);
-    public abstract void take(Receiver receiver, CodeSegment cs);
-
-    public abstract void quickPut(String key, Object val);
-    public abstract void quickUpdate(String key, Object val);
-    public abstract void quickPeek(Receiver receiver, CodeSegment cs);
-    public abstract void quickTake(Receiver receiver, CodeSegment cs);
+    public abstract void put(String key, Object val, SendOption option);
+    public abstract void update(String key, Object val, SendOption option);
+    public abstract void peek(Receiver receiver, CodeSegment cs, SendOption option);
+    public abstract void take(Receiver receiver, CodeSegment cs, SendOption option);
 
     public abstract void remove(String key);
     public abstract void shutdown();
--- a/src/main/java/alice/datasegment/LocalDataSegmentManager.java	Tue Oct 28 17:34:26 2014 +0900
+++ b/src/main/java/alice/datasegment/LocalDataSegmentManager.java	Sun Nov 02 18:07:43 2014 +0900
@@ -65,7 +65,7 @@
     }
 
     @Override
-    public void put(String key, Object val) {
+    public void put(String key, Object val, SendOption option) {
         DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
         Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, null, null, reverseKey);
         dataSegmentKey.runCommand(cmd);
@@ -73,17 +73,12 @@
             logger.debug(cmd.getCommandString());
     }
 
-    @Override
-    public void quickPut(String key, Object val) {
-        put(key, val);
-    }
-
     /**
      * Enqueue update command to the queue of each DataSegment key
      */
 
     @Override
-    public void update(String key, Object val) {
+    public void update(String key, Object val, SendOption option) {
         DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
         Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, null, null, reverseKey);
         dataSegmentKey.runCommand(cmd);
@@ -92,12 +87,7 @@
     }
 
     @Override
-    public void quickUpdate(String key, Object val) {
-        update(key, val);
-    }	
-
-    @Override
-    public void take(Receiver receiver, CodeSegment cs) {
+    public void take(Receiver receiver, CodeSegment cs, SendOption option) {
         DataSegmentKey dataSegmentKey = getDataSegmentKey(receiver.key);
         int seq = this.seq.getAndIncrement();
         Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null);
@@ -107,12 +97,7 @@
     }
 
     @Override
-    public void quickTake(Receiver receiver, CodeSegment cs) {
-        take(receiver, cs);		
-    }
-
-    @Override
-    public void peek(Receiver receiver, CodeSegment cs) {
+    public void peek(Receiver receiver, CodeSegment cs, SendOption option) {
         DataSegmentKey dataSegmentKey = getDataSegmentKey(receiver.key);
         int seq = this.seq.getAndIncrement();
         Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null);
@@ -122,11 +107,6 @@
     }
 
     @Override
-    public void quickPeek(Receiver receiver, CodeSegment cs) {
-        peek(receiver, cs);
-    }
-
-    @Override
     public void remove(String key) {
         DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
         Command cmd = new Command(CommandType.REMOVE, null, key, null, 0, 0, replyQueue, null, null);
--- a/src/main/java/alice/datasegment/RemoteDataSegmentManager.java	Tue Oct 28 17:34:26 2014 +0900
+++ b/src/main/java/alice/datasegment/RemoteDataSegmentManager.java	Sun Nov 02 18:07:43 2014 +0900
@@ -51,82 +51,61 @@
      * send put command to target DataSegment
      */
     @Override
-    public void put(String key, Object val) {
+    public void put(String key, Object val, SendOption option) {
         Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, null, null, "");
         cmd.setSerializeFlag(true);
-        connection.sendCommand(cmd); // put command on the transmission thread
+        if (option.isQuick()){
+            connection.write(cmd); // put command is executed right now
+        } else {
+            connection.sendCommand(cmd); // put command on the transmission thread            
+        }
         if (logger.isDebugEnabled())
             logger.debug(cmd.getCommandString());
     }
 
     @Override
-    public void quickPut(String key, Object val) {
-        Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, null, null, "");
-        cmd.setSerializeFlag(true);
-        connection.write(cmd); // put command is executed right now
-        if (logger.isDebugEnabled())
-            logger.debug(cmd.getCommandString());
-    }
-
-    @Override
-    public void update(String key, Object val) {
+    public void update(String key, Object val, SendOption option) {
         Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, null, null, "");
         cmd.setSerializeFlag(true);
-        connection.sendCommand(cmd);
-        if (logger.isDebugEnabled())
-            logger.debug(cmd.getCommandString());
-    }
-
-    @Override
-    public void quickUpdate(String key, Object val) {
-        Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, null, null, "");
-        cmd.setSerializeFlag(true);
-        connection.write(cmd);
+        if (option.isQuick()){
+            connection.write(cmd);
+        } else {
+            connection.sendCommand(cmd);
+        }
         if (logger.isDebugEnabled())
             logger.debug(cmd.getCommandString());
     }
 
     @Override
-    public void take(Receiver receiver, CodeSegment cs) {
+    public void take(Receiver receiver, CodeSegment cs, SendOption option) {
         int seq = this.seq.getAndIncrement();
         Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null);
+        cmd.setQuickFlag(option.isQuick());
         seqHash.put(seq, cmd);
-        connection.sendCommand(cmd);
-        if (logger.isDebugEnabled())
-            logger.debug(cmd.getCommandString());
-    }
-
-    public void quickTake(Receiver receiver, CodeSegment cs) {
-        int seq = this.seq.getAndIncrement();
-        Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null);
-        cmd.setQuickFlag(true);
-        seqHash.put(seq, cmd);
-        connection.write(cmd);
+        if (option.isQuick()){
+            connection.write(cmd);
+        } else {
+            connection.sendCommand(cmd);
+        }
         if (logger.isDebugEnabled())
             logger.debug(cmd.getCommandString());
     }
 
     @Override
-    public void peek(Receiver receiver, CodeSegment cs) {
+    public void peek(Receiver receiver, CodeSegment cs, SendOption option) {
         int seq = this.seq.getAndIncrement();
         Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null);
+        cmd.setQuickFlag(option.isQuick());
         seqHash.put(seq, cmd);
-        connection.sendCommand(cmd);
+        if (option.isQuick()){
+            connection.write(cmd);
+        } else {
+            connection.sendCommand(cmd);
+        }
         if (logger.isDebugEnabled())
             logger.debug(cmd.getCommandString());
     }
 
-    public void quickPeek(Receiver receiver, CodeSegment cs) {
-        int seq = this.seq.getAndIncrement();
-        Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null);
-        cmd.setQuickFlag(true);
-        seqHash.put(seq, cmd);
-        connection.write(cmd);
-        if (logger.isDebugEnabled())
-            logger.debug(cmd.getCommandString());
-
-    }
-
     @Override
     public void remove(String key) {
         Command cmd = new Command(CommandType.REMOVE, null, key, null, 0, 0, null, null, "");