Mercurial > hg > Database > Alice
annotate src/alice/datasegment/RemoteDataSegmentManager.java @ 276:3e0d1ac4f4a8
Refactor iterating process with for-each sentence.
author | kazz <kazz@cr.ie.u-ryukyu.ac.jp> |
---|---|
date | Fri, 18 Oct 2013 01:51:15 +0900 |
parents | 99026285c5dc |
children | f5d7654b90ff |
rev | line source |
---|---|
12 | 1 package alice.datasegment; |
2 | |
23
54bf607118ae
change method to create RemoteDSM
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
20
diff
changeset
|
3 import java.io.IOException; |
54bf607118ae
change method to create RemoteDSM
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
20
diff
changeset
|
4 import java.net.InetSocketAddress; |
54bf607118ae
change method to create RemoteDSM
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
20
diff
changeset
|
5 import java.nio.channels.SocketChannel; |
54bf607118ae
change method to create RemoteDSM
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
20
diff
changeset
|
6 |
25 | 7 import org.apache.log4j.Logger; |
12 | 8 import alice.codesegment.CodeSegment; |
14
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
9 import alice.daemon.Connection; |
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
10 import alice.daemon.IncomingTcpConnection; |
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
11 import alice.daemon.OutboundTcpConnection; |
274 | 12 import alice.topology.HostMessage; |
13 import alice.topology.manager.reconnection.SendError; | |
12 | 14 |
14
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
15 public class RemoteDataSegmentManager extends DataSegmentManager { |
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
16 |
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
17 Connection connection; |
39 | 18 Logger logger; |
14
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
19 |
274 | 20 public RemoteDataSegmentManager(String connectionKey, final String reverseKey, final String hostName, final int port, final boolean rFlag) { |
39 | 21 logger = Logger.getLogger(connectionKey); |
23
54bf607118ae
change method to create RemoteDSM
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
20
diff
changeset
|
22 connection = new Connection(); |
54bf607118ae
change method to create RemoteDSM
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
20
diff
changeset
|
23 final RemoteDataSegmentManager manager = this; |
28
98ab26e09a98
Configuration Manager work and implements reverseKey
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
27
diff
changeset
|
24 new Thread(replyThread, "RemoteDataSegmentManager-" + connectionKey).start(); |
98ab26e09a98
Configuration Manager work and implements reverseKey
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
27
diff
changeset
|
25 new Thread("Connect-" + connectionKey) { |
23
54bf607118ae
change method to create RemoteDSM
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
20
diff
changeset
|
26 public void run() { |
54bf607118ae
change method to create RemoteDSM
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
20
diff
changeset
|
27 boolean connect = true; |
54bf607118ae
change method to create RemoteDSM
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
20
diff
changeset
|
28 do { |
54bf607118ae
change method to create RemoteDSM
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
20
diff
changeset
|
29 try { |
54bf607118ae
change method to create RemoteDSM
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
20
diff
changeset
|
30 SocketChannel sc = SocketChannel.open(new InetSocketAddress(hostName, port)); |
54bf607118ae
change method to create RemoteDSM
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
20
diff
changeset
|
31 connection.socket = sc.socket(); |
63 | 32 connection.socket.setTcpNoDelay(true); |
23
54bf607118ae
change method to create RemoteDSM
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
20
diff
changeset
|
33 connect = false; |
39 | 34 logger.info("Connect to " + connection.getInfoString()); |
23
54bf607118ae
change method to create RemoteDSM
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
20
diff
changeset
|
35 } catch (IOException e) { |
54bf607118ae
change method to create RemoteDSM
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
20
diff
changeset
|
36 try { |
275 | 37 System.out.println("WAITING"); |
54
27a64e771c4c
change connection wait time
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
41
diff
changeset
|
38 Thread.sleep(50); |
23
54bf607118ae
change method to create RemoteDSM
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
20
diff
changeset
|
39 } catch (InterruptedException e1) { |
54bf607118ae
change method to create RemoteDSM
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
20
diff
changeset
|
40 e1.printStackTrace(); |
54bf607118ae
change method to create RemoteDSM
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
20
diff
changeset
|
41 } |
54bf607118ae
change method to create RemoteDSM
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
20
diff
changeset
|
42 } |
274 | 43 } while (connect&&!rFlag); |
28
98ab26e09a98
Configuration Manager work and implements reverseKey
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
27
diff
changeset
|
44 new IncomingTcpConnection(connection, manager, reverseKey).start(); |
23
54bf607118ae
change method to create RemoteDSM
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
20
diff
changeset
|
45 new OutboundTcpConnection(connection).start(); |
274 | 46 // if connection failed need to stop these thread |
47 if (connect){ | |
48 System.out.println("send error"); | |
49 new SendError(new HostMessage(hostName, port)).execute(); | |
50 } | |
23
54bf607118ae
change method to create RemoteDSM
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
20
diff
changeset
|
51 } |
54bf607118ae
change method to create RemoteDSM
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
20
diff
changeset
|
52 }.start(); |
54bf607118ae
change method to create RemoteDSM
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
20
diff
changeset
|
53 } |
54bf607118ae
change method to create RemoteDSM
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
20
diff
changeset
|
54 |
58 | 55 /** |
56 * send put command to target DataSegment | |
57 */ | |
12 | 58 @Override |
264 | 59 public void put(String key, byte[] val) { |
60 Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, null, null, null); | |
61 connection.sendCommand(cmd); // put command on the transmission thread | |
62 if (logger.isDebugEnabled()) | |
63 logger.debug(cmd.getCommandString()); | |
64 } | |
65 | |
66 @Override | |
67 public void put(String key, Object val) { | |
132 | 68 Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, null, null, null); |
58 | 69 connection.sendCommand(cmd); // put command on the transmission thread |
71
a3a2605e16a2
change checking debug mode of logger
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
68
diff
changeset
|
70 if (logger.isDebugEnabled()) |
68
d4c7f7b1096b
remove copy at OutboundTcpConnection
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
66
diff
changeset
|
71 logger.debug(cmd.getCommandString()); |
12 | 72 } |
73 | |
74 @Override | |
264 | 75 public void quickPut(String key, byte[] val) { |
76 Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, null, null, null); | |
77 connection.write(cmd); // put command is executed right now | |
78 if (logger.isDebugEnabled()) | |
79 logger.debug(cmd.getCommandString()); | |
80 } | |
81 | |
82 @Override | |
83 public void quickPut(String key, Object val) { | |
252 | 84 Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, null, null, null); |
85 connection.write(cmd); // put command is executed right now | |
86 if (logger.isDebugEnabled()) | |
87 logger.debug(cmd.getCommandString()); | |
88 } | |
89 | |
90 @Override | |
264 | 91 public void update(String key, byte[] val) { |
132 | 92 Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, null, null, null); |
39 | 93 connection.sendCommand(cmd); |
71
a3a2605e16a2
change checking debug mode of logger
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
68
diff
changeset
|
94 if (logger.isDebugEnabled()) |
68
d4c7f7b1096b
remove copy at OutboundTcpConnection
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
66
diff
changeset
|
95 logger.debug(cmd.getCommandString()); |
12 | 96 } |
252 | 97 |
98 @Override | |
264 | 99 public void update(String key, Object val) { |
100 Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, null, null, null); | |
101 connection.sendCommand(cmd); | |
102 if (logger.isDebugEnabled()) | |
103 logger.debug(cmd.getCommandString()); | |
104 } | |
105 | |
106 @Override | |
107 public void quickUpdate(String key, byte[] val) { | |
252 | 108 Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, null, null, null); |
109 connection.write(cmd); | |
110 if (logger.isDebugEnabled()) | |
111 logger.debug(cmd.getCommandString()); | |
264 | 112 } |
113 | |
114 @Override | |
115 public void quickUpdate(String key, Object val) { | |
116 Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, null, null, null); | |
117 connection.write(cmd); | |
118 if (logger.isDebugEnabled()) | |
119 logger.debug(cmd.getCommandString()); | |
252 | 120 } |
12 | 121 |
122 @Override | |
254 | 123 public void take(Receiver receiver, CodeSegment cs) { |
14
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
124 int seq = this.seq.getAndIncrement(); |
254 | 125 Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null); |
14
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
126 seqHash.put(seq, cmd); |
39 | 127 connection.sendCommand(cmd); |
71
a3a2605e16a2
change checking debug mode of logger
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
68
diff
changeset
|
128 if (logger.isDebugEnabled()) |
68
d4c7f7b1096b
remove copy at OutboundTcpConnection
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
66
diff
changeset
|
129 logger.debug(cmd.getCommandString()); |
12 | 130 } |
252 | 131 |
254 | 132 public void quickTake(Receiver receiver, CodeSegment cs) { |
252 | 133 int seq = this.seq.getAndIncrement(); |
254 | 134 Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null, true); |
252 | 135 seqHash.put(seq, cmd); |
136 connection.write(cmd); | |
137 if (logger.isDebugEnabled()) | |
138 logger.debug(cmd.getCommandString()); | |
139 } | |
12 | 140 |
141 @Override | |
254 | 142 public void peek(Receiver receiver, CodeSegment cs) { |
14
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
143 int seq = this.seq.getAndIncrement(); |
254 | 144 Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null); |
14
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
145 seqHash.put(seq, cmd); |
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
146 connection.sendCommand(cmd); |
71
a3a2605e16a2
change checking debug mode of logger
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
68
diff
changeset
|
147 if (logger.isDebugEnabled()) |
68
d4c7f7b1096b
remove copy at OutboundTcpConnection
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
66
diff
changeset
|
148 logger.debug(cmd.getCommandString()); |
12 | 149 } |
252 | 150 |
254 | 151 public void quickPeek(Receiver receiver, CodeSegment cs) { |
252 | 152 int seq = this.seq.getAndIncrement(); |
254 | 153 Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null, true); |
252 | 154 seqHash.put(seq, cmd); |
155 connection.write(cmd); | |
156 if (logger.isDebugEnabled()) | |
157 logger.debug(cmd.getCommandString()); | |
158 | |
159 } | |
12 | 160 |
161 @Override | |
162 public void remove(String key) { | |
39 | 163 Command cmd = new Command(CommandType.REMOVE, null, key, null, 0, 0, null, null, null); |
164 connection.sendCommand(cmd); | |
71
a3a2605e16a2
change checking debug mode of logger
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
68
diff
changeset
|
165 if (logger.isDebugEnabled()) |
68
d4c7f7b1096b
remove copy at OutboundTcpConnection
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
66
diff
changeset
|
166 logger.debug(cmd.getCommandString()); |
12 | 167 } |
41 | 168 |
169 @Override | |
30 | 170 public void finish() { |
39 | 171 Command cmd = new Command(CommandType.FINISH, null, null, null, 0, 0, null, null, null); |
172 connection.sendCommand(cmd); | |
30 | 173 } |
13 | 174 |
41 | 175 @Override |
176 public void close() { | |
177 Command cmd = new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, null); | |
178 connection.sendCommand(cmd); | |
179 } | |
180 | |
252 | 181 |
12 | 182 } |