345
|
1 package alice.datasegment;
|
|
2
|
452
|
3 import java.io.ByteArrayOutputStream;
|
345
|
4 import java.io.IOException;
|
443
|
5 import java.nio.ByteBuffer;
|
345
|
6 import java.util.concurrent.BlockingQueue;
|
452
|
7 import java.util.zip.Deflater;
|
|
8 import java.util.zip.DeflaterOutputStream;
|
345
|
9
|
443
|
10 import org.msgpack.MessagePack;
|
452
|
11
|
345
|
12 import alice.codesegment.CodeSegment;
|
|
13 import alice.codesegment.SingletonMessage;
|
|
14 import alice.daemon.CommandMessage;
|
|
15 import alice.daemon.Connection;
|
|
16
|
|
17 public class Command {
|
419
|
18 public CommandType type;
|
|
19 public String key;
|
|
20 public Receiver receiver;
|
458
|
21 public ReceiveData rData;
|
419
|
22 public int index;
|
|
23 public int seq;
|
|
24 public Connection connection; // for remote
|
|
25 public BlockingQueue<Command> replyQueue;
|
|
26 public CodeSegment cs;
|
|
27 public String reverseKey;
|
448
|
28 private boolean quickFlag = false;
|
452
|
29 private boolean compressFlag = false;
|
419
|
30
|
458
|
31 public Command(CommandType cmdType, Receiver receiver, String key, ReceiveData rData, int index, int seq, BlockingQueue<Command> replyQueue, CodeSegment cs, String reverseKey) {
|
419
|
32 this.type = cmdType;
|
|
33 this.receiver = receiver;
|
|
34 this.key = key;
|
458
|
35 this.rData = rData;
|
419
|
36 this.index = index;
|
|
37 this.seq = seq;
|
|
38 this.replyQueue = replyQueue;
|
|
39 this.cs = cs;
|
|
40 this.reverseKey = reverseKey;
|
|
41 }
|
|
42
|
458
|
43 public Command(CommandType cmdType, Receiver receiver, String key, ReceiveData rData, int index, int seq, CodeSegment cs, String reverseKey, Connection connection) {
|
419
|
44 this.type = cmdType;
|
|
45 this.receiver = receiver;
|
|
46 this.key = key;
|
458
|
47 this.rData = rData;
|
419
|
48 this.index = index;
|
|
49 this.seq = seq;
|
|
50 this.connection = connection;
|
|
51 this.cs = cs;
|
|
52 this.reverseKey = reverseKey;
|
|
53 }
|
|
54
|
|
55 public String getCommandString() {
|
|
56 String csName = "null";
|
|
57 if (cs != null) {
|
|
58 csName = cs.toString();
|
|
59 }
|
458
|
60 return this.type + "\t" + key + "\t" + rData + "\tindex=" + index + "\tcs=" + csName;
|
419
|
61 }
|
467
|
62
|
445
|
63 /**
|
|
64 * @return serialized ByteBuffer
|
|
65 */
|
443
|
66 public ByteBuffer convert() {
|
|
67 ByteBuffer buf = null;
|
|
68 MessagePack msg = SingletonMessage.getInstance();
|
|
69 try {
|
452
|
70 byte[] header = null;
|
|
71 byte[] data = null;
|
|
72 byte[] dataSize = null;
|
458
|
73 boolean serialized = false;
|
|
74 boolean compressed = false;
|
443
|
75 switch (type) {
|
452
|
76 /*
|
|
77 * UPDATE, PUT, REPLY need send DataSegment to RemoteDataSegment
|
|
78 * case UPDATE and PUT
|
|
79 * compress and serialize flag are selected by user, so if true, need convert.
|
|
80 * case REPLY
|
|
81 * these flags represent DataSegment status.
|
|
82 * for example, serializeFlag is true. DataSegment had already converted, so no need convert.
|
|
83 */
|
443
|
84 case UPDATE:
|
458
|
85 case PUT:
|
|
86 case REPLY:
|
|
87 if (rData.compressed()) {
|
|
88 // have already converted
|
|
89 data = (byte[]) rData.getObj();
|
|
90 compressed = rData.compressed(); // true
|
|
91 serialized = rData.serialized();
|
452
|
92 } else {
|
458
|
93 if (!rData.serialized() && !rData.isByteArray()) {
|
|
94 data = msg.write(rData.getObj());
|
|
95 serialized = true;
|
|
96 } else { // rData is RAW ByteArray or already serialized
|
|
97 data = (byte[]) rData.getObj();
|
|
98 serialized = rData.serialized();
|
461
|
99 }
|
|
100 if (compressFlag) {
|
458
|
101 data = zip(data);
|
|
102 compressed = true;
|
|
103 }
|
443
|
104 }
|
467
|
105
|
458
|
106 header = msg.write(new CommandMessage(type.id, index, seq, key, false, serialized, compressed));
|
461
|
107 dataSize = msg.write(data.length);
|
443
|
108 buf = ByteBuffer.allocate(header.length+dataSize.length+data.length);
|
|
109 buf.put(header);
|
|
110 buf.put(dataSize);
|
452
|
111 buf.put(data);
|
|
112 break;
|
443
|
113 default:
|
459
|
114 header = msg.write(new CommandMessage(type.id, index, seq, key, quickFlag, false, compressFlag));
|
443
|
115 buf = ByteBuffer.allocate(header.length);
|
|
116 buf.put(header);
|
|
117 break;
|
419
|
118 }
|
443
|
119
|
|
120 buf.flip();
|
|
121 } catch (IOException e) {
|
|
122 e.printStackTrace();
|
419
|
123 }
|
443
|
124 return buf;
|
419
|
125 }
|
467
|
126
|
458
|
127 /**
|
|
128 * If this flag is true, command isn't send queue.
|
|
129 * command is executed right now.
|
467
|
130 *
|
458
|
131 * @param flag
|
|
132 */
|
467
|
133
|
446
|
134 public void setQuickFlag(boolean flag){
|
|
135 quickFlag = flag;
|
|
136 }
|
467
|
137
|
448
|
138 public boolean getQuickFlag(){
|
|
139 return quickFlag;
|
|
140 }
|
467
|
141
|
458
|
142 /**
|
|
143 * Before sending Remote DataSegment, DataSegment type is ByteArray.
|
|
144 * If this flag true, ByteArray is compressed with ZRLEE(ZRIB) algorithm
|
467
|
145 *
|
458
|
146 * @param flag
|
|
147 */
|
467
|
148
|
446
|
149 public void setCompressFlag(boolean flag){
|
|
150 compressFlag = flag;
|
|
151 }
|
467
|
152
|
448
|
153 public boolean getCompressFlag(){
|
|
154 return compressFlag;
|
|
155 }
|
467
|
156
|
452
|
157 public byte[] zip(byte[] input) throws IOException{
|
|
158 Deflater deflater = new Deflater();
|
|
159 ByteArrayOutputStream os = new ByteArrayOutputStream();
|
|
160 DeflaterOutputStream dos = new DeflaterOutputStream(os, deflater);
|
|
161 dos.write(input);
|
467
|
162 dos.finish();
|
452
|
163 return os.toByteArray();
|
467
|
164 }
|
345
|
165 }
|