diff src/main/java/alice/datasegment/CompressedRemoteDataSegmentManager.java @ 529:cb7c31848d16 dispose

add CompressedDSMs
author Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
date Fri, 01 May 2015 18:19:16 +0900
parents
children 4aeebea0c9b5
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/alice/datasegment/CompressedRemoteDataSegmentManager.java	Fri May 01 18:19:16 2015 +0900
@@ -0,0 +1,172 @@
+package alice.datasegment;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.channels.SocketChannel;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.log4j.Logger;
+
+import alice.codesegment.CodeSegment;
+import alice.daemon.Connection;
+import alice.daemon.IncomingTcpConnection;
+import alice.daemon.OutboundTcpConnection;
+
+public class CompressedRemoteDataSegmentManager extends DataSegmentManager {
+    protected Connection connection;
+    protected Logger logger;
+
+    public CompressedRemoteDataSegmentManager(){}
+
+    public CompressedRemoteDataSegmentManager(String connectionKey, final String reverseKey, final String hostName, final int port) {
+        logger = Logger.getLogger(connectionKey);
+        connection = new Connection();
+        connection.name = connectionKey;
+        final CompressedRemoteDataSegmentManager manager = this;
+        //new Thread(replyThread, "RemoteDataSegmentManager-" + connectionKey).start();
+        new Thread("Connect-" + connectionKey) {
+            public void run() {
+                boolean connect = true;
+                do {
+                    try {
+                        SocketChannel sc = SocketChannel.open(new InetSocketAddress(hostName, port));
+                        connection.socket = sc.socket();
+                        connection.socket.setTcpNoDelay(true);
+                        connect = false;
+                        logger.info("Connect to " + connection.getInfoString());
+                    } catch (IOException e) {
+                        try {
+                            Thread.sleep(50);
+                        } catch (InterruptedException e1) {
+                            e1.printStackTrace();
+                        }
+                    }
+                } while (connect);
+                IncomingTcpConnection in = new IncomingTcpConnection(connection, manager, reverseKey);
+                in.setName(reverseKey+"-IncomingTcp");
+                in.setPriority(MAX_PRIORITY);
+                in.start();
+                OutboundTcpConnection out = new OutboundTcpConnection(connection);
+                out.setName(connectionKey+"-OutboundTcp");
+                out.setPriority(MAX_PRIORITY);
+                out.start();
+            }
+        }.start();
+    }
+
+    /**
+     * send put command to target DataSegment
+     */
+    @Override
+    public void put(String key, ReceiveData rData, SendOption option) {
+        try {
+            rData.zip();
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+        Command cmd = new Command(CommandType.PUT, null, key, rData, 0, 0, null, null, "");
+        cmd.setCompressFlag(option.isCompress());//true
+
+        if (option.isQuick()){
+            connection.write(cmd); // put command is executed right now
+        } else {
+            connection.sendCommand(cmd); // put command on the transmission thread
+        }
+        if (logger.isDebugEnabled())
+            logger.debug(cmd.getCommandString());
+    }
+
+    @Override
+    public void update(String key, ReceiveData rData, SendOption option) {
+        try {
+            rData.zip();
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+        Command cmd = new Command(CommandType.UPDATE, null, key, rData, 0, 0, null, null, "");
+        cmd.setCompressFlag(option.isCompress());
+
+        if (option.isQuick()){
+            connection.write(cmd);
+        } else {
+            connection.sendCommand(cmd);
+        }
+        if (logger.isDebugEnabled())
+            logger.debug(cmd.getCommandString());
+    }
+
+    @Override
+    public void take(Receiver receiver, CodeSegment cs, SendOption option) {
+        int seq = this.seq.getAndIncrement();
+        Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null);
+        cmd.setQuickFlag(option.isQuick());
+        seqHash.put(seq, cmd);
+        if (option.isQuick()){
+            connection.write(cmd);
+        } else {
+            connection.sendCommand(cmd);
+        }
+        if (logger.isDebugEnabled())
+            logger.debug(cmd.getCommandString());
+    }
+
+    @Override
+    public void peek(Receiver receiver, CodeSegment cs, SendOption option) {
+        int seq = this.seq.getAndIncrement();
+        Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null);
+        cmd.setQuickFlag(option.isQuick());
+        seqHash.put(seq, cmd);
+        if (option.isQuick()){
+            connection.write(cmd);
+        } else {
+            connection.sendCommand(cmd);
+        }
+        if (logger.isDebugEnabled())
+            logger.debug(cmd.getCommandString());
+    }
+
+    @Override
+    public void remove(String key) {
+        Command cmd = new Command(CommandType.REMOVE, null, key, null, 0, 0, null, null, "");
+        connection.sendCommand(cmd);
+        if (logger.isDebugEnabled())
+            logger.debug(cmd.getCommandString());
+    }
+
+    @Override
+    public void finish() {
+        Command cmd = new Command(CommandType.FINISH, null, null, null, 0, 0, null, null, "");
+        connection.sendCommand(cmd);
+    }
+
+    @Override
+    public void ping(String returnKey) {
+        Command cmd = new Command(CommandType.PING, null, returnKey, null, 0, 0, null, null, "");
+        connection.write(cmd);
+    }
+
+    @Override
+    public void response(String returnKey) {
+        Command cmd = new Command(CommandType.RESPONSE, null, returnKey, null, 0, 0, null, null, "");
+        connection.write(cmd);
+    }
+
+    @Override
+    public void close() {
+        Command cmd = new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, "");
+        connection.sendManager = false;
+        connection.sendCommand(cmd);
+    }
+
+    @Override
+    public void shutdown() {
+        connection.close();
+        LinkedBlockingQueue<Command> queue = connection.sendQueue;
+        if (!queue.isEmpty()) queue.clear();
+    }
+
+    @Override
+    public void setSendError(boolean b) {
+        connection.sendManager = b;
+    }
+}