Mercurial > hg > Database > Alice
changeset 537:8f949fa80653 dispose
Compressed RDSM refactoring
author | Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp> |
---|---|
date | Mon, 15 Jun 2015 19:34:00 +0900 |
parents | d2f7d02c4976 |
children | 8c17a9e66cc7 |
files | src/main/java/alice/datasegment/CompressedRemoteDataSegmentManager.java |
diffstat | 1 files changed, 16 insertions(+), 98 deletions(-) [+] |
line wrap: on
line diff
--- a/src/main/java/alice/datasegment/CompressedRemoteDataSegmentManager.java Mon Jun 15 19:27:06 2015 +0900 +++ b/src/main/java/alice/datasegment/CompressedRemoteDataSegmentManager.java Mon Jun 15 19:34:00 2015 +0900 @@ -13,51 +13,10 @@ import alice.daemon.OutboundTcpConnection; public class CompressedRemoteDataSegmentManager extends DataSegmentManager { - protected Connection connection; - protected Logger logger; - - public CompressedRemoteDataSegmentManager(){} - - public CompressedRemoteDataSegmentManager(Connection c) { - logger = Logger.getLogger(c.name); - connection = c; - connection.name = "compressed" + c.name; - } + RemoteDataSegmentManager manager; - public CompressedRemoteDataSegmentManager(String connectionKey, final String reverseKey, final String hostName, final int port) { - logger = Logger.getLogger(connectionKey); - connection = new Connection(); - connection.name = connectionKey; - final CompressedRemoteDataSegmentManager manager = this; - //new Thread(replyThread, "RemoteDataSegmentManager-" + connectionKey).start(); - new Thread("Connect-" + connectionKey) { - public void run() { - boolean connect = true; - do { - try { - SocketChannel sc = SocketChannel.open(new InetSocketAddress(hostName, port)); - connection.socket = sc.socket(); - connection.socket.setTcpNoDelay(true); - connect = false; - logger.info("Connect to " + connection.getInfoString()); - } catch (IOException e) { - try { - Thread.sleep(50); - } catch (InterruptedException e1) { - e1.printStackTrace(); - } - } - } while (connect); - IncomingTcpConnection in = new IncomingTcpConnection(connection, manager, reverseKey); - in.setName(reverseKey+"-IncomingTcp"); - in.setPriority(MAX_PRIORITY); - in.start(); - OutboundTcpConnection out = new OutboundTcpConnection(connection); - out.setName(connectionKey+"-OutboundTcp"); - out.setPriority(MAX_PRIORITY); - out.start(); - } - }.start(); + public CompressedRemoteDataSegmentManager(RemoteDataSegmentManager manager) { + this.manager = manager; } /** @@ -75,13 +34,7 @@ Command cmd = new Command(CommandType.PUT, null, key, rData, 0, 0, null, null, ""); cmd.setCompressFlag(true); - if (quickFlag){ - connection.write(cmd); // put command is executed right now - } else { - connection.sendCommand(cmd); // put command on the transmission thread - } - if (logger.isDebugEnabled()) - logger.debug(cmd.getCommandString()); + manager.put1(quickFlag, cmd); } @Override @@ -96,93 +49,58 @@ Command cmd = new Command(CommandType.UPDATE, null, key, rData, 0, 0, null, null, ""); cmd.setCompressFlag(true); - if (quickFlag){ - connection.write(cmd); - } else { - connection.sendCommand(cmd); - } - if (logger.isDebugEnabled() - logger.debug(cmd.getCommandString()); + manager.put1(quickFlag, cmd); } @Override public void take(Receiver receiver, CodeSegment cs, boolean quickFlag) { - int seq = this.seq.getAndIncrement(); - System.err.println("CompressedDataSegment take seq :" + seq); - Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null); + Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, 0, replyQueue, cs, null); cmd.setCompressFlag(true); - cmd.setQuickFlag(quickFlag); - seqHash.put(seq, cmd); - if (quickFlag){ - connection.write(cmd); - } else { - connection.sendCommand(cmd); - } - if (logger.isDebugEnabled()) - logger.debug(cmd.getCommandString()); + manager.take1(quickFlag, cmd); } @Override public void peek(Receiver receiver, CodeSegment cs, boolean quickFlag) { - int seq = this.seq.getAndIncrement(); - Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null); + Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, 0, replyQueue, cs, null); cmd.setCompressFlag(true); - cmd.setQuickFlag(quickFlag); - seqHash.put(seq, cmd); - if (quickFlag){ - connection.write(cmd); - } else { - connection.sendCommand(cmd); - } - if (logger.isDebugEnabled()) - logger.debug(cmd.getCommandString()); + manager.take1(quickFlag, cmd); } @Override public void remove(String key) { - Command cmd = new Command(CommandType.REMOVE, null, key, null, 0, 0, null, null, ""); - connection.sendCommand(cmd); - if (logger.isDebugEnabled()) - logger.debug(cmd.getCommandString()); + manager.remove(key); } @Override public void finish() { - Command cmd = new Command(CommandType.FINISH, null, null, null, 0, 0, null, null, ""); - connection.sendCommand(cmd); + manager.finish(); } @Override public void ping(String returnKey) { - Command cmd = new Command(CommandType.PING, null, returnKey, null, 0, 0, null, null, ""); - connection.write(cmd); + manager.ping(returnKey); } @Override public void response(String returnKey) { - Command cmd = new Command(CommandType.RESPONSE, null, returnKey, null, 0, 0, null, null, ""); - connection.write(cmd); + manager.response(returnKey); } @Override public void close() { - Command cmd = new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, ""); - connection.sendManager = false; - connection.sendCommand(cmd); + manager.close(); } @Override public void shutdown() { - connection.close(); - LinkedBlockingQueue<Command> queue = connection.sendQueue; - if (!queue.isEmpty()) queue.clear(); + manager.shutdown(); } @Override public void setSendError(boolean b) { - connection.sendManager = b; + manager.setSendError(b); } }