Mercurial > hg > Database > Alice
comparison src/main/java/alice/datasegment/CompressedRemoteDataSegmentManager.java @ 529:cb7c31848d16 dispose
add CompressedDSMs
author | Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp> |
---|---|
date | Fri, 01 May 2015 18:19:16 +0900 |
parents | |
children | 4aeebea0c9b5 |
comparison
equal
deleted
inserted
replaced
528:6ebddfac7ff6 | 529:cb7c31848d16 |
---|---|
1 package alice.datasegment; | |
2 | |
3 import java.io.IOException; | |
4 import java.net.InetSocketAddress; | |
5 import java.nio.channels.SocketChannel; | |
6 import java.util.concurrent.LinkedBlockingQueue; | |
7 | |
8 import org.apache.log4j.Logger; | |
9 | |
10 import alice.codesegment.CodeSegment; | |
11 import alice.daemon.Connection; | |
12 import alice.daemon.IncomingTcpConnection; | |
13 import alice.daemon.OutboundTcpConnection; | |
14 | |
15 public class CompressedRemoteDataSegmentManager extends DataSegmentManager { | |
16 protected Connection connection; | |
17 protected Logger logger; | |
18 | |
19 public CompressedRemoteDataSegmentManager(){} | |
20 | |
21 public CompressedRemoteDataSegmentManager(String connectionKey, final String reverseKey, final String hostName, final int port) { | |
22 logger = Logger.getLogger(connectionKey); | |
23 connection = new Connection(); | |
24 connection.name = connectionKey; | |
25 final CompressedRemoteDataSegmentManager manager = this; | |
26 //new Thread(replyThread, "RemoteDataSegmentManager-" + connectionKey).start(); | |
27 new Thread("Connect-" + connectionKey) { | |
28 public void run() { | |
29 boolean connect = true; | |
30 do { | |
31 try { | |
32 SocketChannel sc = SocketChannel.open(new InetSocketAddress(hostName, port)); | |
33 connection.socket = sc.socket(); | |
34 connection.socket.setTcpNoDelay(true); | |
35 connect = false; | |
36 logger.info("Connect to " + connection.getInfoString()); | |
37 } catch (IOException e) { | |
38 try { | |
39 Thread.sleep(50); | |
40 } catch (InterruptedException e1) { | |
41 e1.printStackTrace(); | |
42 } | |
43 } | |
44 } while (connect); | |
45 IncomingTcpConnection in = new IncomingTcpConnection(connection, manager, reverseKey); | |
46 in.setName(reverseKey+"-IncomingTcp"); | |
47 in.setPriority(MAX_PRIORITY); | |
48 in.start(); | |
49 OutboundTcpConnection out = new OutboundTcpConnection(connection); | |
50 out.setName(connectionKey+"-OutboundTcp"); | |
51 out.setPriority(MAX_PRIORITY); | |
52 out.start(); | |
53 } | |
54 }.start(); | |
55 } | |
56 | |
57 /** | |
58 * send put command to target DataSegment | |
59 */ | |
60 @Override | |
61 public void put(String key, ReceiveData rData, SendOption option) { | |
62 try { | |
63 rData.zip(); | |
64 } catch (IOException e) { | |
65 e.printStackTrace(); | |
66 } | |
67 Command cmd = new Command(CommandType.PUT, null, key, rData, 0, 0, null, null, ""); | |
68 cmd.setCompressFlag(option.isCompress());//true | |
69 | |
70 if (option.isQuick()){ | |
71 connection.write(cmd); // put command is executed right now | |
72 } else { | |
73 connection.sendCommand(cmd); // put command on the transmission thread | |
74 } | |
75 if (logger.isDebugEnabled()) | |
76 logger.debug(cmd.getCommandString()); | |
77 } | |
78 | |
79 @Override | |
80 public void update(String key, ReceiveData rData, SendOption option) { | |
81 try { | |
82 rData.zip(); | |
83 } catch (IOException e) { | |
84 e.printStackTrace(); | |
85 } | |
86 Command cmd = new Command(CommandType.UPDATE, null, key, rData, 0, 0, null, null, ""); | |
87 cmd.setCompressFlag(option.isCompress()); | |
88 | |
89 if (option.isQuick()){ | |
90 connection.write(cmd); | |
91 } else { | |
92 connection.sendCommand(cmd); | |
93 } | |
94 if (logger.isDebugEnabled()) | |
95 logger.debug(cmd.getCommandString()); | |
96 } | |
97 | |
98 @Override | |
99 public void take(Receiver receiver, CodeSegment cs, SendOption option) { | |
100 int seq = this.seq.getAndIncrement(); | |
101 Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null); | |
102 cmd.setQuickFlag(option.isQuick()); | |
103 seqHash.put(seq, cmd); | |
104 if (option.isQuick()){ | |
105 connection.write(cmd); | |
106 } else { | |
107 connection.sendCommand(cmd); | |
108 } | |
109 if (logger.isDebugEnabled()) | |
110 logger.debug(cmd.getCommandString()); | |
111 } | |
112 | |
113 @Override | |
114 public void peek(Receiver receiver, CodeSegment cs, SendOption option) { | |
115 int seq = this.seq.getAndIncrement(); | |
116 Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null); | |
117 cmd.setQuickFlag(option.isQuick()); | |
118 seqHash.put(seq, cmd); | |
119 if (option.isQuick()){ | |
120 connection.write(cmd); | |
121 } else { | |
122 connection.sendCommand(cmd); | |
123 } | |
124 if (logger.isDebugEnabled()) | |
125 logger.debug(cmd.getCommandString()); | |
126 } | |
127 | |
128 @Override | |
129 public void remove(String key) { | |
130 Command cmd = new Command(CommandType.REMOVE, null, key, null, 0, 0, null, null, ""); | |
131 connection.sendCommand(cmd); | |
132 if (logger.isDebugEnabled()) | |
133 logger.debug(cmd.getCommandString()); | |
134 } | |
135 | |
136 @Override | |
137 public void finish() { | |
138 Command cmd = new Command(CommandType.FINISH, null, null, null, 0, 0, null, null, ""); | |
139 connection.sendCommand(cmd); | |
140 } | |
141 | |
142 @Override | |
143 public void ping(String returnKey) { | |
144 Command cmd = new Command(CommandType.PING, null, returnKey, null, 0, 0, null, null, ""); | |
145 connection.write(cmd); | |
146 } | |
147 | |
148 @Override | |
149 public void response(String returnKey) { | |
150 Command cmd = new Command(CommandType.RESPONSE, null, returnKey, null, 0, 0, null, null, ""); | |
151 connection.write(cmd); | |
152 } | |
153 | |
154 @Override | |
155 public void close() { | |
156 Command cmd = new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, ""); | |
157 connection.sendManager = false; | |
158 connection.sendCommand(cmd); | |
159 } | |
160 | |
161 @Override | |
162 public void shutdown() { | |
163 connection.close(); | |
164 LinkedBlockingQueue<Command> queue = connection.sendQueue; | |
165 if (!queue.isEmpty()) queue.clear(); | |
166 } | |
167 | |
168 @Override | |
169 public void setSendError(boolean b) { | |
170 connection.sendManager = b; | |
171 } | |
172 } |