345
|
1 package alice.datasegment;
|
|
2
|
452
|
3 import java.io.ByteArrayOutputStream;
|
345
|
4 import java.io.IOException;
|
443
|
5 import java.nio.ByteBuffer;
|
345
|
6 import java.util.concurrent.BlockingQueue;
|
452
|
7 import java.util.zip.Deflater;
|
|
8 import java.util.zip.DeflaterOutputStream;
|
345
|
9
|
443
|
10 import org.msgpack.MessagePack;
|
452
|
11
|
345
|
12 import alice.codesegment.CodeSegment;
|
|
13 import alice.codesegment.SingletonMessage;
|
|
14 import alice.daemon.CommandMessage;
|
|
15 import alice.daemon.Connection;
|
|
16
|
|
17 public class Command {
|
419
|
18 public CommandType type;
|
|
19 public String key;
|
|
20 public Receiver receiver;
|
458
|
21 public ReceiveData rData;
|
419
|
22 public int index;
|
|
23 public int seq;
|
|
24 public Connection connection; // for remote
|
|
25 public BlockingQueue<Command> replyQueue;
|
|
26 public CodeSegment cs;
|
|
27 public String reverseKey;
|
448
|
28 private boolean quickFlag = false;
|
452
|
29 private boolean serializeFlag = false;
|
|
30 private boolean compressFlag = false;
|
419
|
31
|
458
|
32 public Command(CommandType cmdType, Receiver receiver, String key, ReceiveData rData, int index, int seq, BlockingQueue<Command> replyQueue, CodeSegment cs, String reverseKey) {
|
419
|
33 this.type = cmdType;
|
|
34 this.receiver = receiver;
|
|
35 this.key = key;
|
458
|
36 this.rData = rData;
|
419
|
37 this.index = index;
|
|
38 this.seq = seq;
|
|
39 this.replyQueue = replyQueue;
|
|
40 this.cs = cs;
|
|
41 this.reverseKey = reverseKey;
|
|
42 }
|
|
43
|
458
|
44 public Command(CommandType cmdType, Receiver receiver, String key, ReceiveData rData, int index, int seq, CodeSegment cs, String reverseKey, Connection connection) {
|
419
|
45 this.type = cmdType;
|
|
46 this.receiver = receiver;
|
|
47 this.key = key;
|
458
|
48 this.rData = rData;
|
419
|
49 this.index = index;
|
|
50 this.seq = seq;
|
|
51 this.connection = connection;
|
|
52 this.cs = cs;
|
|
53 this.reverseKey = reverseKey;
|
|
54 }
|
|
55
|
|
56 public String getCommandString() {
|
|
57 String csName = "null";
|
|
58 if (cs != null) {
|
|
59 csName = cs.toString();
|
|
60 }
|
458
|
61 return this.type + "\t" + key + "\t" + rData + "\tindex=" + index + "\tcs=" + csName;
|
419
|
62 }
|
445
|
63
|
|
64 /**
|
|
65 * @return serialized ByteBuffer
|
|
66 */
|
443
|
67 public ByteBuffer convert() {
|
|
68 ByteBuffer buf = null;
|
|
69 MessagePack msg = SingletonMessage.getInstance();
|
|
70 try {
|
452
|
71 byte[] header = null;
|
|
72 byte[] data = null;
|
|
73 byte[] dataSize = null;
|
458
|
74 boolean serialized = false;
|
|
75 boolean compressed = false;
|
443
|
76 switch (type) {
|
452
|
77 /*
|
|
78 * UPDATE, PUT, REPLY need send DataSegment to RemoteDataSegment
|
|
79 * case UPDATE and PUT
|
|
80 * compress and serialize flag are selected by user, so if true, need convert.
|
|
81 * case REPLY
|
|
82 * these flags represent DataSegment status.
|
|
83 * for example, serializeFlag is true. DataSegment had already converted, so no need convert.
|
|
84 */
|
443
|
85 case UPDATE:
|
458
|
86 case PUT:
|
|
87 case REPLY:
|
|
88 if (rData.compressed()) {
|
|
89 // have already converted
|
|
90 data = (byte[]) rData.getObj();
|
|
91 compressed = rData.compressed(); // true
|
|
92 serialized = rData.serialized();
|
452
|
93 } else {
|
458
|
94 if (!rData.serialized() && !rData.isByteArray()) {
|
|
95 data = msg.write(rData.getObj());
|
|
96 serialized = true;
|
|
97 } else { // rData is RAW ByteArray or already serialized
|
|
98 data = (byte[]) rData.getObj();
|
|
99 serialized = rData.serialized();
|
|
100 }
|
|
101 if (compressFlag) {
|
|
102 data = zip(data);
|
|
103 compressed = true;
|
|
104 }
|
443
|
105 }
|
458
|
106
|
|
107 header = msg.write(new CommandMessage(type.id, index, seq, key, false, serialized, compressed));
|
452
|
108 dataSize = msg.write(data.length);
|
443
|
109 buf = ByteBuffer.allocate(header.length+dataSize.length+data.length);
|
|
110 buf.put(header);
|
|
111 buf.put(dataSize);
|
452
|
112 buf.put(data);
|
|
113 break;
|
443
|
114 default:
|
452
|
115 header = msg.write(new CommandMessage(type.id, index, seq, key, quickFlag, serializeFlag, compressFlag));
|
443
|
116 buf = ByteBuffer.allocate(header.length);
|
|
117 buf.put(header);
|
|
118 break;
|
419
|
119 }
|
443
|
120
|
|
121 buf.flip();
|
|
122 } catch (IOException e) {
|
|
123 e.printStackTrace();
|
419
|
124 }
|
443
|
125 return buf;
|
419
|
126 }
|
445
|
127
|
458
|
128 /**
|
|
129 * If this flag is true, command isn't send queue.
|
|
130 * command is executed right now.
|
|
131 *
|
|
132 * @param flag
|
|
133 */
|
|
134
|
446
|
135 public void setQuickFlag(boolean flag){
|
|
136 quickFlag = flag;
|
|
137 }
|
|
138
|
448
|
139 public boolean getQuickFlag(){
|
|
140 return quickFlag;
|
|
141 }
|
|
142
|
458
|
143 /**
|
|
144 * If this flag is true, DataSegment isn't serialized.
|
|
145 * Alice auto select true or false.
|
|
146 *
|
|
147 * @param flag
|
|
148 */
|
|
149
|
445
|
150 public void setSerializeFlag(boolean flag){
|
|
151 serializeFlag = flag;
|
|
152 }
|
446
|
153
|
448
|
154 public boolean getSerializeFlag(){
|
|
155 return serializeFlag;
|
|
156 }
|
|
157
|
458
|
158 /**
|
|
159 * Before sending Remote DataSegment, DataSegment type is ByteArray.
|
|
160 * If this flag true, ByteArray is compressed with ZRLEE(ZRIB) algorithm
|
|
161 *
|
|
162 * @param flag
|
|
163 */
|
|
164
|
446
|
165 public void setCompressFlag(boolean flag){
|
|
166 compressFlag = flag;
|
|
167 }
|
448
|
168
|
|
169 public boolean getCompressFlag(){
|
|
170 return compressFlag;
|
|
171 }
|
452
|
172
|
|
173 public byte[] zip(byte[] input) throws IOException{
|
|
174 Deflater deflater = new Deflater();
|
|
175 ByteArrayOutputStream os = new ByteArrayOutputStream();
|
|
176 DeflaterOutputStream dos = new DeflaterOutputStream(os, deflater);
|
|
177 dos.write(input);
|
|
178 dos.finish();
|
|
179 return os.toByteArray();
|
|
180 }
|
345
|
181 }
|