Mercurial > hg > Database > Alice
annotate src/alice/datasegment/RemoteDataSegmentManager.java @ 254:2ec10cfa8cc3
refactor
author | sugi |
---|---|
date | Mon, 01 Jul 2013 20:00:07 +0900 |
parents | 32e7d5271477 |
children | b4690114a0cd |
rev | line source |
---|---|
12 | 1 package alice.datasegment; |
2 | |
23
54bf607118ae
change method to create RemoteDSM
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
20
diff
changeset
|
3 import java.io.IOException; |
54bf607118ae
change method to create RemoteDSM
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
20
diff
changeset
|
4 import java.net.InetSocketAddress; |
54bf607118ae
change method to create RemoteDSM
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
20
diff
changeset
|
5 import java.nio.channels.SocketChannel; |
54bf607118ae
change method to create RemoteDSM
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
20
diff
changeset
|
6 |
25 | 7 import org.apache.log4j.Logger; |
12 | 8 import org.msgpack.type.Value; |
9 | |
10 import alice.codesegment.CodeSegment; | |
14
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
11 import alice.daemon.Connection; |
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
12 import alice.daemon.IncomingTcpConnection; |
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
13 import alice.daemon.OutboundTcpConnection; |
12 | 14 |
14
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
15 public class RemoteDataSegmentManager extends DataSegmentManager { |
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
16 |
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
17 Connection connection; |
39 | 18 Logger logger; |
14
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
19 |
28
98ab26e09a98
Configuration Manager work and implements reverseKey
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
27
diff
changeset
|
20 public RemoteDataSegmentManager(String connectionKey, final String reverseKey, final String hostName, final int port) { |
39 | 21 logger = Logger.getLogger(connectionKey); |
23
54bf607118ae
change method to create RemoteDSM
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
20
diff
changeset
|
22 connection = new Connection(); |
54bf607118ae
change method to create RemoteDSM
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
20
diff
changeset
|
23 final RemoteDataSegmentManager manager = this; |
28
98ab26e09a98
Configuration Manager work and implements reverseKey
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
27
diff
changeset
|
24 new Thread(replyThread, "RemoteDataSegmentManager-" + connectionKey).start(); |
98ab26e09a98
Configuration Manager work and implements reverseKey
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
27
diff
changeset
|
25 new Thread("Connect-" + connectionKey) { |
23
54bf607118ae
change method to create RemoteDSM
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
20
diff
changeset
|
26 public void run() { |
54bf607118ae
change method to create RemoteDSM
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
20
diff
changeset
|
27 boolean connect = true; |
54bf607118ae
change method to create RemoteDSM
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
20
diff
changeset
|
28 do { |
54bf607118ae
change method to create RemoteDSM
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
20
diff
changeset
|
29 try { |
54bf607118ae
change method to create RemoteDSM
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
20
diff
changeset
|
30 SocketChannel sc = SocketChannel.open(new InetSocketAddress(hostName, port)); |
54bf607118ae
change method to create RemoteDSM
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
20
diff
changeset
|
31 connection.socket = sc.socket(); |
63 | 32 connection.socket.setTcpNoDelay(true); |
23
54bf607118ae
change method to create RemoteDSM
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
20
diff
changeset
|
33 connect = false; |
39 | 34 logger.info("Connect to " + connection.getInfoString()); |
23
54bf607118ae
change method to create RemoteDSM
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
20
diff
changeset
|
35 } catch (IOException e) { |
54bf607118ae
change method to create RemoteDSM
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
20
diff
changeset
|
36 try { |
54
27a64e771c4c
change connection wait time
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
41
diff
changeset
|
37 Thread.sleep(50); |
23
54bf607118ae
change method to create RemoteDSM
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
20
diff
changeset
|
38 } catch (InterruptedException e1) { |
54bf607118ae
change method to create RemoteDSM
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
20
diff
changeset
|
39 e1.printStackTrace(); |
54bf607118ae
change method to create RemoteDSM
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
20
diff
changeset
|
40 } |
54bf607118ae
change method to create RemoteDSM
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
20
diff
changeset
|
41 } |
54bf607118ae
change method to create RemoteDSM
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
20
diff
changeset
|
42 } while (connect); |
28
98ab26e09a98
Configuration Manager work and implements reverseKey
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
27
diff
changeset
|
43 new IncomingTcpConnection(connection, manager, reverseKey).start(); |
23
54bf607118ae
change method to create RemoteDSM
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
20
diff
changeset
|
44 new OutboundTcpConnection(connection).start(); |
54bf607118ae
change method to create RemoteDSM
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
20
diff
changeset
|
45 } |
54bf607118ae
change method to create RemoteDSM
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
20
diff
changeset
|
46 }.start(); |
54bf607118ae
change method to create RemoteDSM
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
20
diff
changeset
|
47 } |
54bf607118ae
change method to create RemoteDSM
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
20
diff
changeset
|
48 |
58 | 49 /** |
50 * send put command to target DataSegment | |
51 */ | |
12 | 52 @Override |
132 | 53 public void put(String key, Value val) { |
54 Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, null, null, null); | |
58 | 55 connection.sendCommand(cmd); // put command on the transmission thread |
71
a3a2605e16a2
change checking debug mode of logger
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
68
diff
changeset
|
56 if (logger.isDebugEnabled()) |
68
d4c7f7b1096b
remove copy at OutboundTcpConnection
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
66
diff
changeset
|
57 logger.debug(cmd.getCommandString()); |
12 | 58 } |
59 | |
60 @Override | |
252 | 61 public void quickPut(String key, Value val) { |
62 Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, null, null, null); | |
63 connection.write(cmd); // put command is executed right now | |
64 if (logger.isDebugEnabled()) | |
65 logger.debug(cmd.getCommandString()); | |
66 } | |
67 | |
68 @Override | |
132 | 69 public void update(String key, Value val) { |
70 Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, null, null, null); | |
39 | 71 connection.sendCommand(cmd); |
71
a3a2605e16a2
change checking debug mode of logger
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
68
diff
changeset
|
72 if (logger.isDebugEnabled()) |
68
d4c7f7b1096b
remove copy at OutboundTcpConnection
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
66
diff
changeset
|
73 logger.debug(cmd.getCommandString()); |
12 | 74 } |
252 | 75 |
76 @Override | |
77 public void quickUpdate(String key, Value val) { | |
78 Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, null, null, null); | |
79 connection.write(cmd); | |
80 if (logger.isDebugEnabled()) | |
81 logger.debug(cmd.getCommandString()); | |
82 | |
83 } | |
12 | 84 |
85 @Override | |
254 | 86 public void take(Receiver receiver, CodeSegment cs) { |
14
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
87 int seq = this.seq.getAndIncrement(); |
254 | 88 Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null); |
14
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
89 seqHash.put(seq, cmd); |
39 | 90 connection.sendCommand(cmd); |
71
a3a2605e16a2
change checking debug mode of logger
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
68
diff
changeset
|
91 if (logger.isDebugEnabled()) |
68
d4c7f7b1096b
remove copy at OutboundTcpConnection
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
66
diff
changeset
|
92 logger.debug(cmd.getCommandString()); |
12 | 93 } |
252 | 94 |
95 @Override | |
254 | 96 public void quickTake(Receiver receiver, CodeSegment cs) { |
252 | 97 int seq = this.seq.getAndIncrement(); |
254 | 98 Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null, true); |
252 | 99 seqHash.put(seq, cmd); |
100 connection.write(cmd); | |
101 if (logger.isDebugEnabled()) | |
102 logger.debug(cmd.getCommandString()); | |
103 } | |
12 | 104 |
105 @Override | |
254 | 106 public void peek(Receiver receiver, CodeSegment cs) { |
14
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
107 int seq = this.seq.getAndIncrement(); |
254 | 108 Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null); |
14
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
109 seqHash.put(seq, cmd); |
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
110 connection.sendCommand(cmd); |
71
a3a2605e16a2
change checking debug mode of logger
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
68
diff
changeset
|
111 if (logger.isDebugEnabled()) |
68
d4c7f7b1096b
remove copy at OutboundTcpConnection
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
66
diff
changeset
|
112 logger.debug(cmd.getCommandString()); |
12 | 113 } |
252 | 114 |
115 | |
116 @Override | |
254 | 117 public void quickPeek(Receiver receiver, CodeSegment cs) { |
252 | 118 int seq = this.seq.getAndIncrement(); |
254 | 119 Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null, true); |
252 | 120 seqHash.put(seq, cmd); |
121 connection.write(cmd); | |
122 if (logger.isDebugEnabled()) | |
123 logger.debug(cmd.getCommandString()); | |
124 | |
125 } | |
12 | 126 |
127 @Override | |
128 public void remove(String key) { | |
39 | 129 Command cmd = new Command(CommandType.REMOVE, null, key, null, 0, 0, null, null, null); |
130 connection.sendCommand(cmd); | |
71
a3a2605e16a2
change checking debug mode of logger
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
68
diff
changeset
|
131 if (logger.isDebugEnabled()) |
68
d4c7f7b1096b
remove copy at OutboundTcpConnection
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
66
diff
changeset
|
132 logger.debug(cmd.getCommandString()); |
12 | 133 } |
41 | 134 |
135 @Override | |
30 | 136 public void finish() { |
39 | 137 Command cmd = new Command(CommandType.FINISH, null, null, null, 0, 0, null, null, null); |
138 connection.sendCommand(cmd); | |
30 | 139 } |
13 | 140 |
41 | 141 @Override |
142 public void close() { | |
143 Command cmd = new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, null); | |
144 connection.sendCommand(cmd); | |
145 } | |
146 | |
252 | 147 |
12 | 148 } |