345
|
1 package alice.datasegment;
|
|
2
|
|
3 import java.io.IOException;
|
|
4 import java.net.InetSocketAddress;
|
|
5 import java.nio.channels.SocketChannel;
|
|
6
|
|
7 import org.apache.log4j.Logger;
|
|
8
|
|
9 import alice.codesegment.CodeSegment;
|
|
10 import alice.daemon.Connection;
|
|
11 import alice.daemon.IncomingTcpConnection;
|
|
12 import alice.daemon.OutboundTcpConnection;
|
|
13 import alice.topology.HostMessage;
|
|
14 import alice.topology.manager.reconnection.SendError;
|
|
15
|
|
16 public class RemoteDataSegmentManager extends DataSegmentManager {
|
|
17
|
|
18 Connection connection;
|
|
19 Logger logger;
|
|
20
|
|
21 public RemoteDataSegmentManager(String connectionKey, final String reverseKey, final String hostName, final int port, final boolean rFlag) {
|
|
22 logger = Logger.getLogger(connectionKey);
|
|
23 connection = new Connection();
|
|
24 final RemoteDataSegmentManager manager = this;
|
|
25 new Thread(replyThread, "RemoteDataSegmentManager-" + connectionKey).start();
|
|
26 new Thread("Connect-" + connectionKey) {
|
|
27 public void run() {
|
|
28 boolean connect = true;
|
|
29 do {
|
|
30 try {
|
|
31 SocketChannel sc = SocketChannel.open(new InetSocketAddress(hostName, port));
|
|
32 connection.socket = sc.socket();
|
|
33 connection.socket.setTcpNoDelay(true);
|
|
34 connect = false;
|
|
35 logger.info("Connect to " + connection.getInfoString());
|
|
36 } catch (IOException e) {
|
|
37 try {
|
|
38 Thread.sleep(50);
|
|
39 } catch (InterruptedException e1) {
|
|
40 e1.printStackTrace();
|
|
41 }
|
|
42 }
|
|
43 } while (connect&&!rFlag);
|
|
44 new IncomingTcpConnection(connection, manager, reverseKey).start();
|
|
45 new OutboundTcpConnection(connection).start();
|
|
46 // if connection failed need to stop these thread
|
|
47 if (connect){
|
|
48 new SendError(new HostMessage(hostName, port)).execute();
|
|
49 }
|
|
50 }
|
|
51 }.start();
|
|
52 }
|
|
53
|
|
54 /**
|
|
55 * send put command to target DataSegment
|
|
56 */
|
|
57 @Override
|
|
58 public void put(String key, Object val) {
|
|
59 Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, null, null, null);
|
|
60 connection.sendCommand(cmd); // put command on the transmission thread
|
|
61 if (logger.isDebugEnabled())
|
|
62 logger.debug(cmd.getCommandString());
|
|
63 }
|
|
64
|
|
65 @Override
|
|
66 public void quickPut(String key, Object val) {
|
|
67 Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, null, null, null);
|
|
68 connection.write(cmd); // put command is executed right now
|
|
69 if (logger.isDebugEnabled())
|
|
70 logger.debug(cmd.getCommandString());
|
|
71 }
|
|
72
|
|
73 @Override
|
|
74 public void update(String key, Object val) {
|
|
75 Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, null, null, null);
|
|
76 connection.sendCommand(cmd);
|
|
77 if (logger.isDebugEnabled())
|
|
78 logger.debug(cmd.getCommandString());
|
|
79 }
|
|
80
|
|
81 @Override
|
|
82 public void quickUpdate(String key, Object val) {
|
|
83 Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, null, null, null);
|
|
84 connection.write(cmd);
|
|
85 if (logger.isDebugEnabled())
|
|
86 logger.debug(cmd.getCommandString());
|
|
87 }
|
|
88
|
|
89 @Override
|
|
90 public void take(Receiver receiver, CodeSegment cs) {
|
|
91 int seq = this.seq.getAndIncrement();
|
|
92 Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null);
|
|
93 seqHash.put(seq, cmd);
|
|
94 connection.sendCommand(cmd);
|
|
95 if (logger.isDebugEnabled())
|
|
96 logger.debug(cmd.getCommandString());
|
|
97 }
|
|
98
|
|
99 public void quickTake(Receiver receiver, CodeSegment cs) {
|
|
100 int seq = this.seq.getAndIncrement();
|
|
101 Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null, true);
|
|
102 seqHash.put(seq, cmd);
|
|
103 connection.write(cmd);
|
|
104 if (logger.isDebugEnabled())
|
|
105 logger.debug(cmd.getCommandString());
|
|
106 }
|
|
107
|
|
108 @Override
|
|
109 public void peek(Receiver receiver, CodeSegment cs) {
|
|
110 int seq = this.seq.getAndIncrement();
|
|
111 Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null);
|
|
112 seqHash.put(seq, cmd);
|
|
113 connection.sendCommand(cmd);
|
|
114 if (logger.isDebugEnabled())
|
|
115 logger.debug(cmd.getCommandString());
|
|
116 }
|
|
117
|
|
118 public void quickPeek(Receiver receiver, CodeSegment cs) {
|
|
119 int seq = this.seq.getAndIncrement();
|
|
120 Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null, true);
|
|
121 seqHash.put(seq, cmd);
|
|
122 connection.write(cmd);
|
|
123 if (logger.isDebugEnabled())
|
|
124 logger.debug(cmd.getCommandString());
|
|
125
|
|
126 }
|
|
127
|
|
128 @Override
|
|
129 public void remove(String key) {
|
|
130 Command cmd = new Command(CommandType.REMOVE, null, key, null, 0, 0, null, null, null);
|
|
131 connection.sendCommand(cmd);
|
|
132 if (logger.isDebugEnabled())
|
|
133 logger.debug(cmd.getCommandString());
|
|
134 }
|
|
135
|
|
136 @Override
|
|
137 public void finish() {
|
|
138 Command cmd = new Command(CommandType.FINISH, null, null, null, 0, 0, null, null, null);
|
|
139 connection.sendCommand(cmd);
|
|
140 }
|
|
141
|
|
142 @Override
|
|
143 public void close() {
|
|
144 Command cmd = new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, null);
|
|
145 connection.sendCommand(cmd);
|
|
146 }
|
|
147
|
|
148 @Override
|
|
149 public void ping(String returnKey) {
|
|
150 Command cmd = new Command(CommandType.PING, null, returnKey, null, 0, 0, null, null, null);
|
|
151 connection.write(cmd);
|
|
152 }
|
|
153
|
|
154 @Override
|
|
155 public void response(String returnKey) {
|
|
156 Command cmd = new Command(CommandType.RESPONSE, null, returnKey, null, 0, 0, null, null, null);
|
|
157 connection.write(cmd);
|
|
158 }
|
|
159
|
|
160 @Override
|
|
161 public void shutdown(String key) {
|
|
162 connection.close();
|
|
163 }
|
|
164
|
|
165
|
|
166 }
|