Mercurial > hg > Database > Alice
annotate src/main/java/alice/datasegment/RemoteDataSegmentManager.java @ 574:ea21af9a4762 dispose
delete serializeFlag, fix MessagePack pack&unpack
author | Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp> |
---|---|
date | Tue, 15 Dec 2015 11:49:07 +0900 |
parents | 5a9b83c64ddf |
children |
rev | line source |
---|---|
345 | 1 package alice.datasegment; |
2 | |
3 import java.io.IOException; | |
4 import java.net.InetSocketAddress; | |
5 import java.nio.channels.SocketChannel; | |
470 | 6 import java.util.concurrent.LinkedBlockingQueue; |
345 | 7 |
8 import org.apache.log4j.Logger; | |
9 | |
10 import alice.codesegment.CodeSegment; | |
11 import alice.daemon.Connection; | |
12 import alice.daemon.IncomingTcpConnection; | |
13 import alice.daemon.OutboundTcpConnection; | |
14 | |
15 public class RemoteDataSegmentManager extends DataSegmentManager { | |
480 | 16 protected Connection connection; |
17 protected Logger logger; | |
419 | 18 |
19 public RemoteDataSegmentManager(){} | |
20 | |
553 | 21 public RemoteDataSegmentManager(final String connectionKey, final String reverseKey, final String hostName, final int port) { |
419 | 22 logger = Logger.getLogger(connectionKey); |
23 connection = new Connection(); | |
470 | 24 connection.name = connectionKey; |
419 | 25 final RemoteDataSegmentManager manager = this; |
470 | 26 //new Thread(replyThread, "RemoteDataSegmentManager-" + connectionKey).start(); |
419 | 27 new Thread("Connect-" + connectionKey) { |
28 public void run() { | |
29 boolean connect = true; | |
30 do { | |
31 try { | |
535
dd20acf579bd
resolve connection bug
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
533
diff
changeset
|
32 //System.out.println("RemoteDSM connect hostname:" + hostName + " port:" + port); |
419 | 33 SocketChannel sc = SocketChannel.open(new InetSocketAddress(hostName, port)); |
34 connection.socket = sc.socket(); | |
35 connection.socket.setTcpNoDelay(true); | |
36 connect = false; | |
37 logger.info("Connect to " + connection.getInfoString()); | |
38 } catch (IOException e) { | |
535
dd20acf579bd
resolve connection bug
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
533
diff
changeset
|
39 //System.err.println("Can not connect" + e); |
419 | 40 try { |
41 Thread.sleep(50); | |
42 } catch (InterruptedException e1) { | |
43 e1.printStackTrace(); | |
44 } | |
45 } | |
46 } while (connect); | |
478 | 47 IncomingTcpConnection in = new IncomingTcpConnection(connection, manager, reverseKey); |
480 | 48 in.setName(reverseKey+"-IncomingTcp"); |
470 | 49 in.setPriority(MAX_PRIORITY); |
50 in.start(); | |
478 | 51 OutboundTcpConnection out = new OutboundTcpConnection(connection); |
533
b3c9554ccb1b
change compressed API to set data specified DSM name
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
529
diff
changeset
|
52 out.setName(connectionKey + "-OutboundTcp"); |
470 | 53 out.setPriority(MAX_PRIORITY); |
54 out.start(); | |
419 | 55 } |
56 }.start(); | |
57 } | |
58 | |
59 /** | |
60 * send put command to target DataSegment | |
61 */ | |
62 @Override | |
533
b3c9554ccb1b
change compressed API to set data specified DSM name
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
529
diff
changeset
|
63 public void put(String key, ReceiveData rData, boolean quickFlag) { |
529
cb7c31848d16
add CompressedDSMs
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
527
diff
changeset
|
64 Command cmd = new Command(CommandType.PUT, null, key, rData, 0, 0, null, null, ""); |
533
b3c9554ccb1b
change compressed API to set data specified DSM name
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
529
diff
changeset
|
65 |
536
d2f7d02c4976
remoteDSM refactoring
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
535
diff
changeset
|
66 put1(quickFlag, cmd); |
d2f7d02c4976
remoteDSM refactoring
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
535
diff
changeset
|
67 } |
d2f7d02c4976
remoteDSM refactoring
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
535
diff
changeset
|
68 |
d2f7d02c4976
remoteDSM refactoring
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
535
diff
changeset
|
69 public void put1(boolean quickFlag, Command cmd) { |
533
b3c9554ccb1b
change compressed API to set data specified DSM name
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
529
diff
changeset
|
70 if (quickFlag){ |
455
b004f62b83e5
refactor (remove quick method from DataSegmentManager and use flag)
sugi
parents:
452
diff
changeset
|
71 connection.write(cmd); // put command is executed right now |
b004f62b83e5
refactor (remove quick method from DataSegmentManager and use flag)
sugi
parents:
452
diff
changeset
|
72 } else { |
467 | 73 connection.sendCommand(cmd); // put command on the transmission thread |
455
b004f62b83e5
refactor (remove quick method from DataSegmentManager and use flag)
sugi
parents:
452
diff
changeset
|
74 } |
445 | 75 if (logger.isDebugEnabled()) |
76 logger.debug(cmd.getCommandString()); | |
77 } | |
419 | 78 |
79 @Override | |
533
b3c9554ccb1b
change compressed API to set data specified DSM name
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
529
diff
changeset
|
80 public void update(String key, ReceiveData rData, boolean quickFlag) { |
458 | 81 Command cmd = new Command(CommandType.UPDATE, null, key, rData, 0, 0, null, null, ""); |
533
b3c9554ccb1b
change compressed API to set data specified DSM name
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
529
diff
changeset
|
82 |
536
d2f7d02c4976
remoteDSM refactoring
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
535
diff
changeset
|
83 put1(quickFlag, cmd); |
419 | 84 } |
345 | 85 |
419 | 86 @Override |
533
b3c9554ccb1b
change compressed API to set data specified DSM name
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
529
diff
changeset
|
87 public void take(Receiver receiver, CodeSegment cs, boolean quickFlag) { |
536
d2f7d02c4976
remoteDSM refactoring
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
535
diff
changeset
|
88 |
d2f7d02c4976
remoteDSM refactoring
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
535
diff
changeset
|
89 Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, 0, replyQueue, cs, null); |
d2f7d02c4976
remoteDSM refactoring
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
535
diff
changeset
|
90 take1(quickFlag, cmd); |
d2f7d02c4976
remoteDSM refactoring
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
535
diff
changeset
|
91 } |
d2f7d02c4976
remoteDSM refactoring
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
535
diff
changeset
|
92 |
d2f7d02c4976
remoteDSM refactoring
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
535
diff
changeset
|
93 public void take1(boolean quickFlag, Command cmd) { |
419 | 94 int seq = this.seq.getAndIncrement(); |
574
ea21af9a4762
delete serializeFlag, fix MessagePack pack&unpack
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
553
diff
changeset
|
95 //System.err.println("DataSegment take seq :" + seq); |
536
d2f7d02c4976
remoteDSM refactoring
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
535
diff
changeset
|
96 cmd.setSeq(seq); |
d2f7d02c4976
remoteDSM refactoring
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
535
diff
changeset
|
97 seqHash.put(seq, cmd); |
533
b3c9554ccb1b
change compressed API to set data specified DSM name
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
529
diff
changeset
|
98 cmd.setQuickFlag(quickFlag); |
b3c9554ccb1b
change compressed API to set data specified DSM name
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
529
diff
changeset
|
99 if (quickFlag){ |
455
b004f62b83e5
refactor (remove quick method from DataSegmentManager and use flag)
sugi
parents:
452
diff
changeset
|
100 connection.write(cmd); |
b004f62b83e5
refactor (remove quick method from DataSegmentManager and use flag)
sugi
parents:
452
diff
changeset
|
101 } else { |
b004f62b83e5
refactor (remove quick method from DataSegmentManager and use flag)
sugi
parents:
452
diff
changeset
|
102 connection.sendCommand(cmd); |
b004f62b83e5
refactor (remove quick method from DataSegmentManager and use flag)
sugi
parents:
452
diff
changeset
|
103 } |
419 | 104 if (logger.isDebugEnabled()) |
105 logger.debug(cmd.getCommandString()); | |
106 } | |
107 | |
108 @Override | |
533
b3c9554ccb1b
change compressed API to set data specified DSM name
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
529
diff
changeset
|
109 public void peek(Receiver receiver, CodeSegment cs, boolean quickFlag) { |
536
d2f7d02c4976
remoteDSM refactoring
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
535
diff
changeset
|
110 Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, 0, replyQueue, cs, null); |
d2f7d02c4976
remoteDSM refactoring
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
535
diff
changeset
|
111 take1(quickFlag, cmd); |
419 | 112 } |
113 | |
114 @Override | |
115 public void remove(String key) { | |
446 | 116 Command cmd = new Command(CommandType.REMOVE, null, key, null, 0, 0, null, null, ""); |
419 | 117 connection.sendCommand(cmd); |
118 if (logger.isDebugEnabled()) | |
119 logger.debug(cmd.getCommandString()); | |
120 } | |
121 | |
122 @Override | |
123 public void finish() { | |
446 | 124 Command cmd = new Command(CommandType.FINISH, null, null, null, 0, 0, null, null, ""); |
419 | 125 connection.sendCommand(cmd); |
126 } | |
345 | 127 |
419 | 128 @Override |
129 public void ping(String returnKey) { | |
446 | 130 Command cmd = new Command(CommandType.PING, null, returnKey, null, 0, 0, null, null, ""); |
419 | 131 connection.write(cmd); |
132 } | |
345 | 133 |
419 | 134 @Override |
135 public void response(String returnKey) { | |
446 | 136 Command cmd = new Command(CommandType.RESPONSE, null, returnKey, null, 0, 0, null, null, ""); |
419 | 137 connection.write(cmd); |
138 } | |
345 | 139 |
419 | 140 @Override |
141 public void close() { | |
446 | 142 Command cmd = new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, ""); |
478 | 143 connection.sendManager = false; |
419 | 144 connection.sendCommand(cmd); |
145 } | |
146 | |
147 @Override | |
148 public void shutdown() { | |
149 connection.close(); | |
470 | 150 LinkedBlockingQueue<Command> queue = connection.sendQueue; |
483 | 151 if (!queue.isEmpty()) queue.clear(); |
152 } | |
470 | 153 |
483 | 154 @Override |
155 public void setSendError(boolean b) { | |
156 connection.sendManager = b; | |
496 | 157 } |
533
b3c9554ccb1b
change compressed API to set data specified DSM name
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
529
diff
changeset
|
158 |
345 | 159 } |