annotate src/main/java/alice/datasegment/RemoteDataSegmentManager.java @ 574:ea21af9a4762 dispose

delete serializeFlag, fix MessagePack pack&unpack
author Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
date Tue, 15 Dec 2015 11:49:07 +0900
parents 5a9b83c64ddf
children
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
rev   line source
345
8f71c3e6f11d Change directory structure Maven standard
sugi
parents:
diff changeset
1 package alice.datasegment;
8f71c3e6f11d Change directory structure Maven standard
sugi
parents:
diff changeset
2
8f71c3e6f11d Change directory structure Maven standard
sugi
parents:
diff changeset
3 import java.io.IOException;
8f71c3e6f11d Change directory structure Maven standard
sugi
parents:
diff changeset
4 import java.net.InetSocketAddress;
8f71c3e6f11d Change directory structure Maven standard
sugi
parents:
diff changeset
5 import java.nio.channels.SocketChannel;
470
780ae843cdac Delete disconnect managerKey from connection list
sugi
parents: 467
diff changeset
6 import java.util.concurrent.LinkedBlockingQueue;
345
8f71c3e6f11d Change directory structure Maven standard
sugi
parents:
diff changeset
7
8f71c3e6f11d Change directory structure Maven standard
sugi
parents:
diff changeset
8 import org.apache.log4j.Logger;
8f71c3e6f11d Change directory structure Maven standard
sugi
parents:
diff changeset
9
8f71c3e6f11d Change directory structure Maven standard
sugi
parents:
diff changeset
10 import alice.codesegment.CodeSegment;
8f71c3e6f11d Change directory structure Maven standard
sugi
parents:
diff changeset
11 import alice.daemon.Connection;
8f71c3e6f11d Change directory structure Maven standard
sugi
parents:
diff changeset
12 import alice.daemon.IncomingTcpConnection;
8f71c3e6f11d Change directory structure Maven standard
sugi
parents:
diff changeset
13 import alice.daemon.OutboundTcpConnection;
8f71c3e6f11d Change directory structure Maven standard
sugi
parents:
diff changeset
14
8f71c3e6f11d Change directory structure Maven standard
sugi
parents:
diff changeset
15 public class RemoteDataSegmentManager extends DataSegmentManager {
480
c06070403ed4 named IncomingTcp and OutboundTcp Thread
sugi
parents: 478
diff changeset
16 protected Connection connection;
c06070403ed4 named IncomingTcp and OutboundTcp Thread
sugi
parents: 478
diff changeset
17 protected Logger logger;
419
aefbe41fcf12 change tab to space
sugi
parents: 400
diff changeset
18
aefbe41fcf12 change tab to space
sugi
parents: 400
diff changeset
19 public RemoteDataSegmentManager(){}
aefbe41fcf12 change tab to space
sugi
parents: 400
diff changeset
20
553
5a9b83c64ddf fix gradle file
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents: 536
diff changeset
21 public RemoteDataSegmentManager(final String connectionKey, final String reverseKey, final String hostName, final int port) {
419
aefbe41fcf12 change tab to space
sugi
parents: 400
diff changeset
22 logger = Logger.getLogger(connectionKey);
aefbe41fcf12 change tab to space
sugi
parents: 400
diff changeset
23 connection = new Connection();
470
780ae843cdac Delete disconnect managerKey from connection list
sugi
parents: 467
diff changeset
24 connection.name = connectionKey;
419
aefbe41fcf12 change tab to space
sugi
parents: 400
diff changeset
25 final RemoteDataSegmentManager manager = this;
470
780ae843cdac Delete disconnect managerKey from connection list
sugi
parents: 467
diff changeset
26 //new Thread(replyThread, "RemoteDataSegmentManager-" + connectionKey).start();
419
aefbe41fcf12 change tab to space
sugi
parents: 400
diff changeset
27 new Thread("Connect-" + connectionKey) {
aefbe41fcf12 change tab to space
sugi
parents: 400
diff changeset
28 public void run() {
aefbe41fcf12 change tab to space
sugi
parents: 400
diff changeset
29 boolean connect = true;
aefbe41fcf12 change tab to space
sugi
parents: 400
diff changeset
30 do {
aefbe41fcf12 change tab to space
sugi
parents: 400
diff changeset
31 try {
535
dd20acf579bd resolve connection bug
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents: 533
diff changeset
32 //System.out.println("RemoteDSM connect hostname:" + hostName + " port:" + port);
419
aefbe41fcf12 change tab to space
sugi
parents: 400
diff changeset
33 SocketChannel sc = SocketChannel.open(new InetSocketAddress(hostName, port));
aefbe41fcf12 change tab to space
sugi
parents: 400
diff changeset
34 connection.socket = sc.socket();
aefbe41fcf12 change tab to space
sugi
parents: 400
diff changeset
35 connection.socket.setTcpNoDelay(true);
aefbe41fcf12 change tab to space
sugi
parents: 400
diff changeset
36 connect = false;
aefbe41fcf12 change tab to space
sugi
parents: 400
diff changeset
37 logger.info("Connect to " + connection.getInfoString());
aefbe41fcf12 change tab to space
sugi
parents: 400
diff changeset
38 } catch (IOException e) {
535
dd20acf579bd resolve connection bug
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents: 533
diff changeset
39 //System.err.println("Can not connect" + e);
419
aefbe41fcf12 change tab to space
sugi
parents: 400
diff changeset
40 try {
aefbe41fcf12 change tab to space
sugi
parents: 400
diff changeset
41 Thread.sleep(50);
aefbe41fcf12 change tab to space
sugi
parents: 400
diff changeset
42 } catch (InterruptedException e1) {
aefbe41fcf12 change tab to space
sugi
parents: 400
diff changeset
43 e1.printStackTrace();
aefbe41fcf12 change tab to space
sugi
parents: 400
diff changeset
44 }
aefbe41fcf12 change tab to space
sugi
parents: 400
diff changeset
45 }
aefbe41fcf12 change tab to space
sugi
parents: 400
diff changeset
46 } while (connect);
478
cf345b10a21a bug fix
sugi
parents: 471
diff changeset
47 IncomingTcpConnection in = new IncomingTcpConnection(connection, manager, reverseKey);
480
c06070403ed4 named IncomingTcp and OutboundTcp Thread
sugi
parents: 478
diff changeset
48 in.setName(reverseKey+"-IncomingTcp");
470
780ae843cdac Delete disconnect managerKey from connection list
sugi
parents: 467
diff changeset
49 in.setPriority(MAX_PRIORITY);
780ae843cdac Delete disconnect managerKey from connection list
sugi
parents: 467
diff changeset
50 in.start();
478
cf345b10a21a bug fix
sugi
parents: 471
diff changeset
51 OutboundTcpConnection out = new OutboundTcpConnection(connection);
533
b3c9554ccb1b change compressed API to set data specified DSM name
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents: 529
diff changeset
52 out.setName(connectionKey + "-OutboundTcp");
470
780ae843cdac Delete disconnect managerKey from connection list
sugi
parents: 467
diff changeset
53 out.setPriority(MAX_PRIORITY);
780ae843cdac Delete disconnect managerKey from connection list
sugi
parents: 467
diff changeset
54 out.start();
419
aefbe41fcf12 change tab to space
sugi
parents: 400
diff changeset
55 }
aefbe41fcf12 change tab to space
sugi
parents: 400
diff changeset
56 }.start();
aefbe41fcf12 change tab to space
sugi
parents: 400
diff changeset
57 }
aefbe41fcf12 change tab to space
sugi
parents: 400
diff changeset
58
aefbe41fcf12 change tab to space
sugi
parents: 400
diff changeset
59 /**
aefbe41fcf12 change tab to space
sugi
parents: 400
diff changeset
60 * send put command to target DataSegment
aefbe41fcf12 change tab to space
sugi
parents: 400
diff changeset
61 */
aefbe41fcf12 change tab to space
sugi
parents: 400
diff changeset
62 @Override
533
b3c9554ccb1b change compressed API to set data specified DSM name
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents: 529
diff changeset
63 public void put(String key, ReceiveData rData, boolean quickFlag) {
529
cb7c31848d16 add CompressedDSMs
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents: 527
diff changeset
64 Command cmd = new Command(CommandType.PUT, null, key, rData, 0, 0, null, null, "");
533
b3c9554ccb1b change compressed API to set data specified DSM name
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents: 529
diff changeset
65
536
d2f7d02c4976 remoteDSM refactoring
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents: 535
diff changeset
66 put1(quickFlag, cmd);
d2f7d02c4976 remoteDSM refactoring
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents: 535
diff changeset
67 }
d2f7d02c4976 remoteDSM refactoring
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents: 535
diff changeset
68
d2f7d02c4976 remoteDSM refactoring
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents: 535
diff changeset
69 public void put1(boolean quickFlag, Command cmd) {
533
b3c9554ccb1b change compressed API to set data specified DSM name
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents: 529
diff changeset
70 if (quickFlag){
455
b004f62b83e5 refactor (remove quick method from DataSegmentManager and use flag)
sugi
parents: 452
diff changeset
71 connection.write(cmd); // put command is executed right now
b004f62b83e5 refactor (remove quick method from DataSegmentManager and use flag)
sugi
parents: 452
diff changeset
72 } else {
467
6e304a7a60e7 remove white space
sugi
parents: 459
diff changeset
73 connection.sendCommand(cmd); // put command on the transmission thread
455
b004f62b83e5 refactor (remove quick method from DataSegmentManager and use flag)
sugi
parents: 452
diff changeset
74 }
445
86b74532e66c change Protocol
sugi
parents: 419
diff changeset
75 if (logger.isDebugEnabled())
86b74532e66c change Protocol
sugi
parents: 419
diff changeset
76 logger.debug(cmd.getCommandString());
86b74532e66c change Protocol
sugi
parents: 419
diff changeset
77 }
419
aefbe41fcf12 change tab to space
sugi
parents: 400
diff changeset
78
aefbe41fcf12 change tab to space
sugi
parents: 400
diff changeset
79 @Override
533
b3c9554ccb1b change compressed API to set data specified DSM name
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents: 529
diff changeset
80 public void update(String key, ReceiveData rData, boolean quickFlag) {
458
bcf6f4a6fcd0 need set Meta DataSegment PUT API
sugi
parents: 455
diff changeset
81 Command cmd = new Command(CommandType.UPDATE, null, key, rData, 0, 0, null, null, "");
533
b3c9554ccb1b change compressed API to set data specified DSM name
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents: 529
diff changeset
82
536
d2f7d02c4976 remoteDSM refactoring
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents: 535
diff changeset
83 put1(quickFlag, cmd);
419
aefbe41fcf12 change tab to space
sugi
parents: 400
diff changeset
84 }
345
8f71c3e6f11d Change directory structure Maven standard
sugi
parents:
diff changeset
85
419
aefbe41fcf12 change tab to space
sugi
parents: 400
diff changeset
86 @Override
533
b3c9554ccb1b change compressed API to set data specified DSM name
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents: 529
diff changeset
87 public void take(Receiver receiver, CodeSegment cs, boolean quickFlag) {
536
d2f7d02c4976 remoteDSM refactoring
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents: 535
diff changeset
88
d2f7d02c4976 remoteDSM refactoring
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents: 535
diff changeset
89 Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, 0, replyQueue, cs, null);
d2f7d02c4976 remoteDSM refactoring
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents: 535
diff changeset
90 take1(quickFlag, cmd);
d2f7d02c4976 remoteDSM refactoring
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents: 535
diff changeset
91 }
d2f7d02c4976 remoteDSM refactoring
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents: 535
diff changeset
92
d2f7d02c4976 remoteDSM refactoring
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents: 535
diff changeset
93 public void take1(boolean quickFlag, Command cmd) {
419
aefbe41fcf12 change tab to space
sugi
parents: 400
diff changeset
94 int seq = this.seq.getAndIncrement();
574
ea21af9a4762 delete serializeFlag, fix MessagePack pack&unpack
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents: 553
diff changeset
95 //System.err.println("DataSegment take seq :" + seq);
536
d2f7d02c4976 remoteDSM refactoring
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents: 535
diff changeset
96 cmd.setSeq(seq);
d2f7d02c4976 remoteDSM refactoring
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents: 535
diff changeset
97 seqHash.put(seq, cmd);
533
b3c9554ccb1b change compressed API to set data specified DSM name
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents: 529
diff changeset
98 cmd.setQuickFlag(quickFlag);
b3c9554ccb1b change compressed API to set data specified DSM name
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents: 529
diff changeset
99 if (quickFlag){
455
b004f62b83e5 refactor (remove quick method from DataSegmentManager and use flag)
sugi
parents: 452
diff changeset
100 connection.write(cmd);
b004f62b83e5 refactor (remove quick method from DataSegmentManager and use flag)
sugi
parents: 452
diff changeset
101 } else {
b004f62b83e5 refactor (remove quick method from DataSegmentManager and use flag)
sugi
parents: 452
diff changeset
102 connection.sendCommand(cmd);
b004f62b83e5 refactor (remove quick method from DataSegmentManager and use flag)
sugi
parents: 452
diff changeset
103 }
419
aefbe41fcf12 change tab to space
sugi
parents: 400
diff changeset
104 if (logger.isDebugEnabled())
aefbe41fcf12 change tab to space
sugi
parents: 400
diff changeset
105 logger.debug(cmd.getCommandString());
aefbe41fcf12 change tab to space
sugi
parents: 400
diff changeset
106 }
aefbe41fcf12 change tab to space
sugi
parents: 400
diff changeset
107
aefbe41fcf12 change tab to space
sugi
parents: 400
diff changeset
108 @Override
533
b3c9554ccb1b change compressed API to set data specified DSM name
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents: 529
diff changeset
109 public void peek(Receiver receiver, CodeSegment cs, boolean quickFlag) {
536
d2f7d02c4976 remoteDSM refactoring
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents: 535
diff changeset
110 Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, 0, replyQueue, cs, null);
d2f7d02c4976 remoteDSM refactoring
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents: 535
diff changeset
111 take1(quickFlag, cmd);
419
aefbe41fcf12 change tab to space
sugi
parents: 400
diff changeset
112 }
aefbe41fcf12 change tab to space
sugi
parents: 400
diff changeset
113
aefbe41fcf12 change tab to space
sugi
parents: 400
diff changeset
114 @Override
aefbe41fcf12 change tab to space
sugi
parents: 400
diff changeset
115 public void remove(String key) {
446
a91890dff56e refactor
sugi
parents: 445
diff changeset
116 Command cmd = new Command(CommandType.REMOVE, null, key, null, 0, 0, null, null, "");
419
aefbe41fcf12 change tab to space
sugi
parents: 400
diff changeset
117 connection.sendCommand(cmd);
aefbe41fcf12 change tab to space
sugi
parents: 400
diff changeset
118 if (logger.isDebugEnabled())
aefbe41fcf12 change tab to space
sugi
parents: 400
diff changeset
119 logger.debug(cmd.getCommandString());
aefbe41fcf12 change tab to space
sugi
parents: 400
diff changeset
120 }
aefbe41fcf12 change tab to space
sugi
parents: 400
diff changeset
121
aefbe41fcf12 change tab to space
sugi
parents: 400
diff changeset
122 @Override
aefbe41fcf12 change tab to space
sugi
parents: 400
diff changeset
123 public void finish() {
446
a91890dff56e refactor
sugi
parents: 445
diff changeset
124 Command cmd = new Command(CommandType.FINISH, null, null, null, 0, 0, null, null, "");
419
aefbe41fcf12 change tab to space
sugi
parents: 400
diff changeset
125 connection.sendCommand(cmd);
aefbe41fcf12 change tab to space
sugi
parents: 400
diff changeset
126 }
345
8f71c3e6f11d Change directory structure Maven standard
sugi
parents:
diff changeset
127
419
aefbe41fcf12 change tab to space
sugi
parents: 400
diff changeset
128 @Override
aefbe41fcf12 change tab to space
sugi
parents: 400
diff changeset
129 public void ping(String returnKey) {
446
a91890dff56e refactor
sugi
parents: 445
diff changeset
130 Command cmd = new Command(CommandType.PING, null, returnKey, null, 0, 0, null, null, "");
419
aefbe41fcf12 change tab to space
sugi
parents: 400
diff changeset
131 connection.write(cmd);
aefbe41fcf12 change tab to space
sugi
parents: 400
diff changeset
132 }
345
8f71c3e6f11d Change directory structure Maven standard
sugi
parents:
diff changeset
133
419
aefbe41fcf12 change tab to space
sugi
parents: 400
diff changeset
134 @Override
aefbe41fcf12 change tab to space
sugi
parents: 400
diff changeset
135 public void response(String returnKey) {
446
a91890dff56e refactor
sugi
parents: 445
diff changeset
136 Command cmd = new Command(CommandType.RESPONSE, null, returnKey, null, 0, 0, null, null, "");
419
aefbe41fcf12 change tab to space
sugi
parents: 400
diff changeset
137 connection.write(cmd);
aefbe41fcf12 change tab to space
sugi
parents: 400
diff changeset
138 }
345
8f71c3e6f11d Change directory structure Maven standard
sugi
parents:
diff changeset
139
419
aefbe41fcf12 change tab to space
sugi
parents: 400
diff changeset
140 @Override
aefbe41fcf12 change tab to space
sugi
parents: 400
diff changeset
141 public void close() {
446
a91890dff56e refactor
sugi
parents: 445
diff changeset
142 Command cmd = new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, "");
478
cf345b10a21a bug fix
sugi
parents: 471
diff changeset
143 connection.sendManager = false;
419
aefbe41fcf12 change tab to space
sugi
parents: 400
diff changeset
144 connection.sendCommand(cmd);
aefbe41fcf12 change tab to space
sugi
parents: 400
diff changeset
145 }
aefbe41fcf12 change tab to space
sugi
parents: 400
diff changeset
146
aefbe41fcf12 change tab to space
sugi
parents: 400
diff changeset
147 @Override
aefbe41fcf12 change tab to space
sugi
parents: 400
diff changeset
148 public void shutdown() {
aefbe41fcf12 change tab to space
sugi
parents: 400
diff changeset
149 connection.close();
470
780ae843cdac Delete disconnect managerKey from connection list
sugi
parents: 467
diff changeset
150 LinkedBlockingQueue<Command> queue = connection.sendQueue;
483
86c45738dd9e success fix topology ..... may be
sugi
parents: 480
diff changeset
151 if (!queue.isEmpty()) queue.clear();
86c45738dd9e success fix topology ..... may be
sugi
parents: 480
diff changeset
152 }
470
780ae843cdac Delete disconnect managerKey from connection list
sugi
parents: 467
diff changeset
153
483
86c45738dd9e success fix topology ..... may be
sugi
parents: 480
diff changeset
154 @Override
86c45738dd9e success fix topology ..... may be
sugi
parents: 480
diff changeset
155 public void setSendError(boolean b) {
86c45738dd9e success fix topology ..... may be
sugi
parents: 480
diff changeset
156 connection.sendManager = b;
496
f82f259ea93b putConnectionInfo only called from keepAlive deamon
sugi
parents: 483
diff changeset
157 }
533
b3c9554ccb1b change compressed API to set data specified DSM name
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents: 529
diff changeset
158
345
8f71c3e6f11d Change directory structure Maven standard
sugi
parents:
diff changeset
159 }