Mercurial > hg > Database > Alice
view src/main/java/alice/datasegment/RemoteDataSegmentManager.java @ 574:ea21af9a4762 dispose
delete serializeFlag, fix MessagePack pack&unpack
author | Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp> |
---|---|
date | Tue, 15 Dec 2015 11:49:07 +0900 |
parents | 5a9b83c64ddf |
children |
line wrap: on
line source
package alice.datasegment;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.log4j.Logger;

import alice.codesegment.CodeSegment;
import alice.daemon.Connection;
import alice.daemon.IncomingTcpConnection;
import alice.daemon.OutboundTcpConnection;

/**
 * DataSegmentManager that forwards every operation as a {@link Command}
 * over a TCP {@link Connection} to a remote host.
 *
 * <p>The socket is opened asynchronously by a background "Connect-" thread
 * started from the constructor; once it is connected, an
 * {@link IncomingTcpConnection} and an {@link OutboundTcpConnection} thread
 * are started on the same connection.
 */
public class RemoteDataSegmentManager extends DataSegmentManager {

    /** Delay between two connection attempts, in milliseconds. */
    private static final long RETRY_INTERVAL_MILLIS = 50;

    protected Connection connection;
    protected Logger logger;

    /** Creates a manager without a connection or logger; both fields stay {@code null}. */
    public RemoteDataSegmentManager() {
    }

    /**
     * Creates the manager and starts a background thread that keeps trying to
     * connect to {@code hostName:port} until it succeeds or is interrupted.
     *
     * @param connectionKey name of the logger and of the connection; also used
     *                      in the names of the connect and outbound threads
     * @param reverseKey    key handed to the {@link IncomingTcpConnection}; also
     *                      used in the name of the incoming thread
     * @param hostName      remote host to connect to
     * @param port          remote TCP port to connect to
     */
    public RemoteDataSegmentManager(final String connectionKey, final String reverseKey,
            final String hostName, final int port) {
        logger = Logger.getLogger(connectionKey);
        connection = new Connection();
        connection.name = connectionKey;
        final RemoteDataSegmentManager manager = this;
        new Thread("Connect-" + connectionKey) {
            @Override
            public void run() {
                boolean connect = true;
                do {
                    try {
                        SocketChannel sc = openChannel(hostName, port);
                        connection.socket = sc.socket();
                        connect = false;
                        logger.info("Connect to " + connection.getInfoString());
                    } catch (IOException e) {
                        if (logger.isDebugEnabled()) {
                            logger.debug("Can not connect to " + hostName + ":" + port, e);
                        }
                        try {
                            Thread.sleep(RETRY_INTERVAL_MILLIS);
                        } catch (InterruptedException e1) {
                            // An interrupt is a request to stop: keep the flag set
                            // and give up instead of retrying forever.
                            Thread.currentThread().interrupt();
                            logger.warn("Interrupted while connecting to " + hostName + ":" + port);
                            return;
                        }
                    }
                } while (connect);

                IncomingTcpConnection in = new IncomingTcpConnection(connection, manager, reverseKey);
                in.setName(reverseKey + "-IncomingTcp");
                in.setPriority(MAX_PRIORITY);
                in.start();

                OutboundTcpConnection out = new OutboundTcpConnection(connection);
                out.setName(connectionKey + "-OutboundTcp");
                out.setPriority(MAX_PRIORITY);
                out.start();
            }
        }.start();
    }

    /**
     * Opens a channel to the given address with TCP_NODELAY enabled.
     * The channel is closed again if it cannot be configured, so a failed
     * attempt does not leak an open socket.
     */
    private static SocketChannel openChannel(String hostName, int port) throws IOException {
        SocketChannel sc = SocketChannel.open(new InetSocketAddress(hostName, port));
        try {
            sc.socket().setTcpNoDelay(true);
            return sc;
        } catch (IOException e) {
            try {
                sc.close();
            } catch (IOException ignored) {
                // The configuration failure is the error worth reporting.
            }
            throw e;
        }
    }

    /** Writes the command string to the debug log when debug logging is enabled. */
    private void logCommand(Command cmd) {
        if (logger.isDebugEnabled()) {
            logger.debug(cmd.getCommandString());
        }
    }

    /**
     * send put command to target DataSegment
     */
    @Override
    public void put(String key, ReceiveData rData, boolean quickFlag) {
        Command cmd = new Command(CommandType.PUT, null, key, rData, 0, 0, null, null, "");
        put1(quickFlag, cmd);
    }

    /**
     * Sends an already built command that expects no reply.
     *
     * @param quickFlag {@code true} to write the command on the calling thread
     *                  right now, {@code false} to hand it to the transmission thread
     * @param cmd       command to send
     */
    public void put1(boolean quickFlag, Command cmd) {
        if (quickFlag) {
            connection.write(cmd); // put command is executed right now
        } else {
            connection.sendCommand(cmd); // put command on the transmission thread
        }
        logCommand(cmd);
    }

    /** Sends an UPDATE command for {@code key}; delivery follows {@link #put1}. */
    @Override
    public void update(String key, ReceiveData rData, boolean quickFlag) {
        Command cmd = new Command(CommandType.UPDATE, null, key, rData, 0, 0, null, null, "");
        put1(quickFlag, cmd);
    }

    /** Sends a TAKE command for the receiver's key and index; see {@link #take1}. */
    @Override
    public void take(Receiver receiver, CodeSegment cs, boolean quickFlag) {
        Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, 0,
                replyQueue, cs, null);
        take1(quickFlag, cmd);
    }

    /**
     * Assigns the next sequence number to the command, registers it in
     * {@code seqHash} under that number, and sends it.
     *
     * @param quickFlag {@code true} to write the command on the calling thread,
     *                  {@code false} to hand it to the transmission thread
     * @param cmd       command to register and send
     */
    public void take1(boolean quickFlag, Command cmd) {
        int sequence = this.seq.getAndIncrement();
        cmd.setSeq(sequence);
        seqHash.put(sequence, cmd);
        cmd.setQuickFlag(quickFlag);
        if (quickFlag) {
            connection.write(cmd);
        } else {
            connection.sendCommand(cmd);
        }
        logCommand(cmd);
    }

    /** Sends a PEEK command for the receiver's key and index; see {@link #take1}. */
    @Override
    public void peek(Receiver receiver, CodeSegment cs, boolean quickFlag) {
        Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, 0,
                replyQueue, cs, null);
        take1(quickFlag, cmd);
    }

    /** Queues a REMOVE command for {@code key}. */
    @Override
    public void remove(String key) {
        Command cmd = new Command(CommandType.REMOVE, null, key, null, 0, 0, null, null, "");
        connection.sendCommand(cmd);
        logCommand(cmd);
    }

    /** Queues a FINISH command. */
    @Override
    public void finish() {
        Command cmd = new Command(CommandType.FINISH, null, null, null, 0, 0, null, null, "");
        connection.sendCommand(cmd);
    }

    /** Writes a PING command carrying {@code returnKey} directly to the connection. */
    @Override
    public void ping(String returnKey) {
        Command cmd = new Command(CommandType.PING, null, returnKey, null, 0, 0, null, null, "");
        connection.write(cmd);
    }

    /** Writes a RESPONSE command carrying {@code returnKey} directly to the connection. */
    @Override
    public void response(String returnKey) {
        Command cmd = new Command(CommandType.RESPONSE, null, returnKey, null, 0, 0, null, null, "");
        connection.write(cmd);
    }

    /** Clears the connection's {@code sendManager} flag, then queues a CLOSE command. */
    @Override
    public void close() {
        Command cmd = new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, "");
        connection.sendManager = false;
        connection.sendCommand(cmd);
    }

    /** Closes the connection and discards every command still waiting in its send queue. */
    @Override
    public void shutdown() {
        connection.close();
        LinkedBlockingQueue<Command> queue = connection.sendQueue;
        queue.clear();
    }

    /** Sets the connection's {@code sendManager} flag to {@code b}. */
    @Override
    public void setSendError(boolean b) {
        connection.sendManager = b;
    }
}