comparison src/main/java/alice/datasegment/CompressedRemoteDataSegmentManager.java @ 529:cb7c31848d16 dispose

add CompressedDSMs
author Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
date Fri, 01 May 2015 18:19:16 +0900
parents
children 4aeebea0c9b5
comparison
equal deleted inserted replaced
528:6ebddfac7ff6 529:cb7c31848d16
1 package alice.datasegment;
2
3 import java.io.IOException;
4 import java.net.InetSocketAddress;
5 import java.nio.channels.SocketChannel;
6 import java.util.concurrent.LinkedBlockingQueue;
7
8 import org.apache.log4j.Logger;
9
10 import alice.codesegment.CodeSegment;
11 import alice.daemon.Connection;
12 import alice.daemon.IncomingTcpConnection;
13 import alice.daemon.OutboundTcpConnection;
14
15 public class CompressedRemoteDataSegmentManager extends DataSegmentManager {
16 protected Connection connection;
17 protected Logger logger;
18
19 public CompressedRemoteDataSegmentManager(){}
20
21 public CompressedRemoteDataSegmentManager(String connectionKey, final String reverseKey, final String hostName, final int port) {
22 logger = Logger.getLogger(connectionKey);
23 connection = new Connection();
24 connection.name = connectionKey;
25 final CompressedRemoteDataSegmentManager manager = this;
26 //new Thread(replyThread, "RemoteDataSegmentManager-" + connectionKey).start();
27 new Thread("Connect-" + connectionKey) {
28 public void run() {
29 boolean connect = true;
30 do {
31 try {
32 SocketChannel sc = SocketChannel.open(new InetSocketAddress(hostName, port));
33 connection.socket = sc.socket();
34 connection.socket.setTcpNoDelay(true);
35 connect = false;
36 logger.info("Connect to " + connection.getInfoString());
37 } catch (IOException e) {
38 try {
39 Thread.sleep(50);
40 } catch (InterruptedException e1) {
41 e1.printStackTrace();
42 }
43 }
44 } while (connect);
45 IncomingTcpConnection in = new IncomingTcpConnection(connection, manager, reverseKey);
46 in.setName(reverseKey+"-IncomingTcp");
47 in.setPriority(MAX_PRIORITY);
48 in.start();
49 OutboundTcpConnection out = new OutboundTcpConnection(connection);
50 out.setName(connectionKey+"-OutboundTcp");
51 out.setPriority(MAX_PRIORITY);
52 out.start();
53 }
54 }.start();
55 }
56
57 /**
58 * send put command to target DataSegment
59 */
60 @Override
61 public void put(String key, ReceiveData rData, SendOption option) {
62 try {
63 rData.zip();
64 } catch (IOException e) {
65 e.printStackTrace();
66 }
67 Command cmd = new Command(CommandType.PUT, null, key, rData, 0, 0, null, null, "");
68 cmd.setCompressFlag(option.isCompress());//true
69
70 if (option.isQuick()){
71 connection.write(cmd); // put command is executed right now
72 } else {
73 connection.sendCommand(cmd); // put command on the transmission thread
74 }
75 if (logger.isDebugEnabled())
76 logger.debug(cmd.getCommandString());
77 }
78
79 @Override
80 public void update(String key, ReceiveData rData, SendOption option) {
81 try {
82 rData.zip();
83 } catch (IOException e) {
84 e.printStackTrace();
85 }
86 Command cmd = new Command(CommandType.UPDATE, null, key, rData, 0, 0, null, null, "");
87 cmd.setCompressFlag(option.isCompress());
88
89 if (option.isQuick()){
90 connection.write(cmd);
91 } else {
92 connection.sendCommand(cmd);
93 }
94 if (logger.isDebugEnabled())
95 logger.debug(cmd.getCommandString());
96 }
97
98 @Override
99 public void take(Receiver receiver, CodeSegment cs, SendOption option) {
100 int seq = this.seq.getAndIncrement();
101 Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null);
102 cmd.setQuickFlag(option.isQuick());
103 seqHash.put(seq, cmd);
104 if (option.isQuick()){
105 connection.write(cmd);
106 } else {
107 connection.sendCommand(cmd);
108 }
109 if (logger.isDebugEnabled())
110 logger.debug(cmd.getCommandString());
111 }
112
113 @Override
114 public void peek(Receiver receiver, CodeSegment cs, SendOption option) {
115 int seq = this.seq.getAndIncrement();
116 Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null);
117 cmd.setQuickFlag(option.isQuick());
118 seqHash.put(seq, cmd);
119 if (option.isQuick()){
120 connection.write(cmd);
121 } else {
122 connection.sendCommand(cmd);
123 }
124 if (logger.isDebugEnabled())
125 logger.debug(cmd.getCommandString());
126 }
127
128 @Override
129 public void remove(String key) {
130 Command cmd = new Command(CommandType.REMOVE, null, key, null, 0, 0, null, null, "");
131 connection.sendCommand(cmd);
132 if (logger.isDebugEnabled())
133 logger.debug(cmd.getCommandString());
134 }
135
136 @Override
137 public void finish() {
138 Command cmd = new Command(CommandType.FINISH, null, null, null, 0, 0, null, null, "");
139 connection.sendCommand(cmd);
140 }
141
142 @Override
143 public void ping(String returnKey) {
144 Command cmd = new Command(CommandType.PING, null, returnKey, null, 0, 0, null, null, "");
145 connection.write(cmd);
146 }
147
148 @Override
149 public void response(String returnKey) {
150 Command cmd = new Command(CommandType.RESPONSE, null, returnKey, null, 0, 0, null, null, "");
151 connection.write(cmd);
152 }
153
154 @Override
155 public void close() {
156 Command cmd = new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, "");
157 connection.sendManager = false;
158 connection.sendCommand(cmd);
159 }
160
161 @Override
162 public void shutdown() {
163 connection.close();
164 LinkedBlockingQueue<Command> queue = connection.sendQueue;
165 if (!queue.isEmpty()) queue.clear();
166 }
167
168 @Override
169 public void setSendError(boolean b) {
170 connection.sendManager = b;
171 }
172 }