Mercurial > hg > Members > tatsuki > Alice
view src/alice/datasegment/RemoteDataSegmentManager.java @ 275:99026285c5dc
changeset: 274 is failed But this is success
author | sugi |
---|---|
date | Sun, 29 Sep 2013 17:40:26 +0900 |
parents | f866178f3018 |
children | f5d7654b90ff |
line wrap: on
line source
package alice.datasegment; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.SocketChannel; import org.apache.log4j.Logger; import alice.codesegment.CodeSegment; import alice.daemon.Connection; import alice.daemon.IncomingTcpConnection; import alice.daemon.OutboundTcpConnection; import alice.topology.HostMessage; import alice.topology.manager.reconnection.SendError; public class RemoteDataSegmentManager extends DataSegmentManager { Connection connection; Logger logger; public RemoteDataSegmentManager(String connectionKey, final String reverseKey, final String hostName, final int port, final boolean rFlag) { logger = Logger.getLogger(connectionKey); connection = new Connection(); final RemoteDataSegmentManager manager = this; new Thread(replyThread, "RemoteDataSegmentManager-" + connectionKey).start(); new Thread("Connect-" + connectionKey) { public void run() { boolean connect = true; do { try { SocketChannel sc = SocketChannel.open(new InetSocketAddress(hostName, port)); connection.socket = sc.socket(); connection.socket.setTcpNoDelay(true); connect = false; logger.info("Connect to " + connection.getInfoString()); } catch (IOException e) { try { System.out.println("WAITING"); Thread.sleep(50); } catch (InterruptedException e1) { e1.printStackTrace(); } } } while (connect&&!rFlag); new IncomingTcpConnection(connection, manager, reverseKey).start(); new OutboundTcpConnection(connection).start(); // if connection failed need to stop these thread if (connect){ System.out.println("send error"); new SendError(new HostMessage(hostName, port)).execute(); } } }.start(); } /** * send put command to target DataSegment */ @Override public void put(String key, byte[] val) { Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, null, null, null); connection.sendCommand(cmd); // put command on the transmission thread if (logger.isDebugEnabled()) logger.debug(cmd.getCommandString()); } @Override public void put(String key, Object val) { Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, null, null, null); connection.sendCommand(cmd); // put command on the transmission thread if (logger.isDebugEnabled()) logger.debug(cmd.getCommandString()); } @Override public void quickPut(String key, byte[] val) { Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, null, null, null); connection.write(cmd); // put command is executed right now if (logger.isDebugEnabled()) logger.debug(cmd.getCommandString()); } @Override public void quickPut(String key, Object val) { Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, null, null, null); connection.write(cmd); // put command is executed right now if (logger.isDebugEnabled()) logger.debug(cmd.getCommandString()); } @Override public void update(String key, byte[] val) { Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, null, null, null); connection.sendCommand(cmd); if (logger.isDebugEnabled()) logger.debug(cmd.getCommandString()); } @Override public void update(String key, Object val) { Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, null, null, null); connection.sendCommand(cmd); if (logger.isDebugEnabled()) logger.debug(cmd.getCommandString()); } @Override public void quickUpdate(String key, byte[] val) { Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, null, null, null); connection.write(cmd); if (logger.isDebugEnabled()) logger.debug(cmd.getCommandString()); } @Override public void quickUpdate(String key, Object val) { Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, null, null, null); connection.write(cmd); if (logger.isDebugEnabled()) logger.debug(cmd.getCommandString()); } @Override public void take(Receiver receiver, CodeSegment cs) { int seq = this.seq.getAndIncrement(); Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null); seqHash.put(seq, cmd); connection.sendCommand(cmd); if (logger.isDebugEnabled()) logger.debug(cmd.getCommandString()); } public void quickTake(Receiver receiver, CodeSegment cs) { int seq = this.seq.getAndIncrement(); Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null, true); seqHash.put(seq, cmd); connection.write(cmd); if (logger.isDebugEnabled()) logger.debug(cmd.getCommandString()); } @Override public void peek(Receiver receiver, CodeSegment cs) { int seq = this.seq.getAndIncrement(); Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null); seqHash.put(seq, cmd); connection.sendCommand(cmd); if (logger.isDebugEnabled()) logger.debug(cmd.getCommandString()); } public void quickPeek(Receiver receiver, CodeSegment cs) { int seq = this.seq.getAndIncrement(); Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null, true); seqHash.put(seq, cmd); connection.write(cmd); if (logger.isDebugEnabled()) logger.debug(cmd.getCommandString()); } @Override public void remove(String key) { Command cmd = new Command(CommandType.REMOVE, null, key, null, 0, 0, null, null, null); connection.sendCommand(cmd); if (logger.isDebugEnabled()) logger.debug(cmd.getCommandString()); } @Override public void finish() { Command cmd = new Command(CommandType.FINISH, null, null, null, 0, 0, null, null, null); connection.sendCommand(cmd); } @Override public void close() { Command cmd = new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, null); connection.sendCommand(cmd); } }