1 package alice.datasegment;
3 import java.io.IOException;
4 import java.net.InetSocketAddress;
5 import java.nio.channels.SocketChannel;
7 import org.apache.log4j.Logger;
9 import alice.codesegment.CodeSegment;
10 import alice.daemon.Connection;
11 import alice.daemon.IncomingTcpConnection;
12 import alice.daemon.OutboundTcpConnection;
13 import alice.topology.HostMessage;
14 import alice.topology.manager.reconnection.SendError;
16 public class RemoteDataSegmentManager extends DataSegmentManager {
18 Connection connection;
19 Logger logger;
21 public RemoteDataSegmentManager(String connectionKey, final String reverseKey, final String hostName, final int port, final boolean rFlag) {
22 logger = Logger.getLogger(connectionKey);
23 connection = new Connection();
24 final RemoteDataSegmentManager manager = this;
25 new Thread(replyThread, "RemoteDataSegmentManager-" + connectionKey).start();
26 new Thread("Connect-" + connectionKey) {
27 public void run() {
28 boolean connect = true;
29 do {
30 try {
31 SocketChannel sc = SocketChannel.open(new InetSocketAddress(hostName, port));
32 connection.socket = sc.socket();
33 connection.socket.setTcpNoDelay(true);
34 connect = false;
35 logger.info("Connect to " + connection.getInfoString());
36 } catch (IOException e) {
37 try {
38 Thread.sleep(50);
39 } catch (InterruptedException e1) {
40 e1.printStackTrace();
41 }
42 }
43 } while (connect&&!rFlag);
44 new IncomingTcpConnection(connection, manager, reverseKey).start();
45 new OutboundTcpConnection(connection).start();
46 // if connection failed need to stop these thread
47 if (connect){
48 new SendError(new HostMessage(hostName, port)).execute();
49 }
50 }
51 }.start();
52 }
54 /**
55 * send put command to target DataSegment
56 */
57 @Override
58 public void put(String key, Object val) {
59 Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, null, null, null);
60 connection.sendCommand(cmd); // put command on the transmission thread
61 if (logger.isDebugEnabled())
62 logger.debug(cmd.getCommandString());
63 }
65 @Override
66 public void quickPut(String key, Object val) {
67 Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, null, null, null);
68 connection.write(cmd); // put command is executed right now
69 if (logger.isDebugEnabled())
70 logger.debug(cmd.getCommandString());
71 }
73 @Override
74 public void update(String key, Object val) {
75 Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, null, null, null);
76 connection.sendCommand(cmd);
77 if (logger.isDebugEnabled())
78 logger.debug(cmd.getCommandString());
79 }
81 @Override
82 public void quickUpdate(String key, Object val) {
83 Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, null, null, null);
84 connection.write(cmd);
85 if (logger.isDebugEnabled())
86 logger.debug(cmd.getCommandString());
87 }
89 @Override
90 public void take(Receiver receiver, CodeSegment cs) {
91 int seq = this.seq.getAndIncrement();
92 Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null);
93 seqHash.put(seq, cmd);
94 connection.sendCommand(cmd);
95 if (logger.isDebugEnabled())
96 logger.debug(cmd.getCommandString());
97 }
99 public void quickTake(Receiver receiver, CodeSegment cs) {
100 int seq = this.seq.getAndIncrement();
101 Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null, true);
102 seqHash.put(seq, cmd);
103 connection.write(cmd);
104 if (logger.isDebugEnabled())
105 logger.debug(cmd.getCommandString());
106 }
108 @Override
109 public void peek(Receiver receiver, CodeSegment cs) {
110 int seq = this.seq.getAndIncrement();
111 Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null);
112 seqHash.put(seq, cmd);
113 connection.sendCommand(cmd);
114 if (logger.isDebugEnabled())
115 logger.debug(cmd.getCommandString());
116 }
118 public void quickPeek(Receiver receiver, CodeSegment cs) {
119 int seq = this.seq.getAndIncrement();
120 Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null, true);
121 seqHash.put(seq, cmd);
122 connection.write(cmd);
123 if (logger.isDebugEnabled())
124 logger.debug(cmd.getCommandString());
126 }
128 @Override
129 public void remove(String key) {
130 Command cmd = new Command(CommandType.REMOVE, null, key, null, 0, 0, null, null, null);
131 connection.sendCommand(cmd);
132 if (logger.isDebugEnabled())
133 logger.debug(cmd.getCommandString());
134 }
136 @Override
137 public void finish() {
138 Command cmd = new Command(CommandType.FINISH, null, null, null, 0, 0, null, null, null);
139 connection.sendCommand(cmd);
140 }
142 @Override
143 public void close() {
144 Command cmd = new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, null);
145 connection.sendCommand(cmd);
146 }
148 @Override
149 public void ping(String returnKey) {
150 Command cmd = new Command(CommandType.PING, null, returnKey, null, 0, 0, null, null, null);
151 connection.write(cmd);
152 }
154 @Override
155 public void response(String returnKey) {
156 Command cmd = new Command(CommandType.RESPONSE, null, returnKey, null, 0, 0, null, null, null);
157 connection.write(cmd);
158 }
160 @Override
161 public void shutdown(String key) {
162 connection.close();
163 }
166 }