Mercurial > hg > Database > Alice
annotate src/main/java/alice/datasegment/Command.java @ 529:cb7c31848d16 dispose
add CompressedDSMs
author | Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp> |
---|---|
date | Fri, 01 May 2015 18:19:16 +0900 |
parents | 6ebddfac7ff6 |
children | 4aeebea0c9b5 |
rev | line source |
---|---|
345 | 1 package alice.datasegment; |
2 | |
3 import java.io.IOException; | |
443 | 4 import java.nio.ByteBuffer; |
345 | 5 import java.util.concurrent.BlockingQueue; |
6 | |
443 | 7 import org.msgpack.MessagePack; |
452 | 8 |
345 | 9 import alice.codesegment.CodeSegment; |
10 import alice.daemon.CommandMessage; | |
11 import alice.daemon.Connection; | |
12 | |
523
145c425db88d
add CompressedLDSM
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
488
diff
changeset
|
13 /** |
145c425db88d
add CompressedLDSM
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
488
diff
changeset
|
14 * DSMで使われる各コマンドのセット(ReceiveDataからのDSの読み込み) |
145c425db88d
add CompressedLDSM
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
488
diff
changeset
|
15 */ |
345 | 16 public class Command { |
523
145c425db88d
add CompressedLDSM
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
488
diff
changeset
|
17 public CommandType type;//PEEK, PUTなどのコマンドタイプ |
419 | 18 public String key; |
19 public Receiver receiver; | |
458 | 20 public ReceiveData rData; |
523
145c425db88d
add CompressedLDSM
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
488
diff
changeset
|
21 public int index;//使ってない。アクセス用のindex。負の遺産。 |
145c425db88d
add CompressedLDSM
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
488
diff
changeset
|
22 public int seq;//DSの待ち合わせを行っているCSを表すunique number。リモート用。対応コマンドを表す。 |
419 | 23 public Connection connection; // for remote |
523
145c425db88d
add CompressedLDSM
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
488
diff
changeset
|
24 public BlockingQueue<Command> replyQueue;//PEEK/TAKE必要な返り値? |
419 | 25 public CodeSegment cs; |
523
145c425db88d
add CompressedLDSM
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
488
diff
changeset
|
26 public String reverseKey;//どこからput/updateされたか |
145c425db88d
add CompressedLDSM
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
488
diff
changeset
|
27 private boolean quickFlag = false;//SEDAを挟まずに処理を行うかどうか。trueだとリモート |
145c425db88d
add CompressedLDSM
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
488
diff
changeset
|
28 private boolean compressFlag = false;//trueだったら圧縮する必要がある |
419 | 29 |
529
cb7c31848d16
add CompressedDSMs
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
528
diff
changeset
|
30 private static final MessagePack packer = new MessagePack(); |
cb7c31848d16
add CompressedDSMs
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
528
diff
changeset
|
31 |
523
145c425db88d
add CompressedLDSM
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
488
diff
changeset
|
32 /** |
145c425db88d
add CompressedLDSM
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
488
diff
changeset
|
33 * for PEEK/TAKE |
145c425db88d
add CompressedLDSM
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
488
diff
changeset
|
34 */ |
458 | 35 public Command(CommandType cmdType, Receiver receiver, String key, ReceiveData rData, int index, int seq, BlockingQueue<Command> replyQueue, CodeSegment cs, String reverseKey) { |
419 | 36 this.type = cmdType; |
37 this.receiver = receiver; | |
38 this.key = key; | |
458 | 39 this.rData = rData; |
419 | 40 this.index = index; |
41 this.seq = seq; | |
42 this.replyQueue = replyQueue; | |
43 this.cs = cs; | |
44 this.reverseKey = reverseKey; | |
45 } | |
46 | |
523
145c425db88d
add CompressedLDSM
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
488
diff
changeset
|
47 /** |
145c425db88d
add CompressedLDSM
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
488
diff
changeset
|
48 * for PUT/UPDATE/REPLY/PING/RESPONSE |
145c425db88d
add CompressedLDSM
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
488
diff
changeset
|
49 */ |
458 | 50 public Command(CommandType cmdType, Receiver receiver, String key, ReceiveData rData, int index, int seq, CodeSegment cs, String reverseKey, Connection connection) { |
419 | 51 this.type = cmdType; |
52 this.receiver = receiver; | |
53 this.key = key; | |
458 | 54 this.rData = rData; |
419 | 55 this.index = index; |
56 this.seq = seq; | |
57 this.connection = connection; | |
58 this.cs = cs; | |
59 this.reverseKey = reverseKey; | |
60 } | |
61 | |
523
145c425db88d
add CompressedLDSM
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
488
diff
changeset
|
62 /** |
145c425db88d
add CompressedLDSM
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
488
diff
changeset
|
63 * String型でコマンドを取得するメソッド。たぶんログ表示用。 |
145c425db88d
add CompressedLDSM
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
488
diff
changeset
|
64 * @return |
145c425db88d
add CompressedLDSM
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
488
diff
changeset
|
65 */ |
419 | 66 public String getCommandString() { |
67 String csName = "null"; | |
68 if (cs != null) { | |
69 csName = cs.toString(); | |
70 } | |
458 | 71 return this.type + "\t" + key + "\t" + rData + "\tindex=" + index + "\tcs=" + csName; |
419 | 72 } |
445 | 73 /** |
74 * @return serialized ByteBuffer | |
75 */ | |
529
cb7c31848d16
add CompressedDSMs
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
528
diff
changeset
|
76 public ByteBuffer convert() { |
cb7c31848d16
add CompressedDSMs
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
528
diff
changeset
|
77 ByteBuffer buf = null; |
527 | 78 |
529
cb7c31848d16
add CompressedDSMs
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
528
diff
changeset
|
79 try { |
cb7c31848d16
add CompressedDSMs
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
528
diff
changeset
|
80 byte[] header = null; |
cb7c31848d16
add CompressedDSMs
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
528
diff
changeset
|
81 byte[] data = null; |
cb7c31848d16
add CompressedDSMs
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
528
diff
changeset
|
82 byte[] dataSize = null; |
cb7c31848d16
add CompressedDSMs
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
528
diff
changeset
|
83 boolean serialized = false; |
cb7c31848d16
add CompressedDSMs
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
528
diff
changeset
|
84 boolean compressed = false; |
cb7c31848d16
add CompressedDSMs
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
528
diff
changeset
|
85 switch (type) { |
527 | 86 /* |
87 * UPDATE, PUT, REPLY need send DataSegment to RemoteDataSegment | |
88 * case UPDATE and PUT | |
89 * compress and serialize flag are selected by user, so if true, need convert. | |
90 * case REPLY | |
91 * these flags represent DataSegment status. | |
92 * for example, serializeFlag is true. DataSegment had already converted, so no need convert. | |
93 */ | |
529
cb7c31848d16
add CompressedDSMs
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
528
diff
changeset
|
94 case UPDATE: |
cb7c31848d16
add CompressedDSMs
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
528
diff
changeset
|
95 case PUT: |
cb7c31848d16
add CompressedDSMs
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
528
diff
changeset
|
96 case REPLY: |
cb7c31848d16
add CompressedDSMs
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
528
diff
changeset
|
97 if (rData.compressed()) { |
cb7c31848d16
add CompressedDSMs
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
528
diff
changeset
|
98 // have already converted |
cb7c31848d16
add CompressedDSMs
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
528
diff
changeset
|
99 data = (byte[]) rData.getObj(); |
cb7c31848d16
add CompressedDSMs
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
528
diff
changeset
|
100 compressed = rData.compressed(); // true |
cb7c31848d16
add CompressedDSMs
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
528
diff
changeset
|
101 serialized = rData.serialized(); |
cb7c31848d16
add CompressedDSMs
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
528
diff
changeset
|
102 } else { |
cb7c31848d16
add CompressedDSMs
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
528
diff
changeset
|
103 if (!rData.serialized() && !rData.isByteArray()) { |
cb7c31848d16
add CompressedDSMs
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
528
diff
changeset
|
104 data = packer.write(rData.getObj()); |
cb7c31848d16
add CompressedDSMs
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
528
diff
changeset
|
105 serialized = true; |
cb7c31848d16
add CompressedDSMs
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
528
diff
changeset
|
106 } else { // rData is RAW ByteArray or already serialized |
cb7c31848d16
add CompressedDSMs
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
528
diff
changeset
|
107 data = (byte[]) rData.getObj(); |
cb7c31848d16
add CompressedDSMs
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
528
diff
changeset
|
108 serialized = rData.serialized(); |
cb7c31848d16
add CompressedDSMs
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
528
diff
changeset
|
109 } |
cb7c31848d16
add CompressedDSMs
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
528
diff
changeset
|
110 if (compressFlag) { |
cb7c31848d16
add CompressedDSMs
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
528
diff
changeset
|
111 rData.zip(); |
cb7c31848d16
add CompressedDSMs
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
528
diff
changeset
|
112 compressed = true; |
cb7c31848d16
add CompressedDSMs
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
528
diff
changeset
|
113 } |
cb7c31848d16
add CompressedDSMs
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
528
diff
changeset
|
114 } |
cb7c31848d16
add CompressedDSMs
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
528
diff
changeset
|
115 CommandMessage cm = new CommandMessage(type.id, index, seq, key, false, serialized, compressed); |
cb7c31848d16
add CompressedDSMs
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
528
diff
changeset
|
116 if (rData.setTime) { |
cb7c31848d16
add CompressedDSMs
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
528
diff
changeset
|
117 cm.setTime = true; |
cb7c31848d16
add CompressedDSMs
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
528
diff
changeset
|
118 cm.time = rData.time; |
cb7c31848d16
add CompressedDSMs
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
528
diff
changeset
|
119 cm.depth = rData.depth + 1; |
cb7c31848d16
add CompressedDSMs
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
528
diff
changeset
|
120 } |
525
30a74eee59c7
working TestRemoteAlice
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
523
diff
changeset
|
121 |
529
cb7c31848d16
add CompressedDSMs
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
528
diff
changeset
|
122 header = packer.write(cm); |
cb7c31848d16
add CompressedDSMs
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
528
diff
changeset
|
123 dataSize = packer.write(data.length); |
cb7c31848d16
add CompressedDSMs
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
528
diff
changeset
|
124 buf = ByteBuffer.allocate(header.length+dataSize.length+data.length); |
cb7c31848d16
add CompressedDSMs
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
528
diff
changeset
|
125 buf.put(header); |
cb7c31848d16
add CompressedDSMs
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
528
diff
changeset
|
126 buf.put(dataSize); |
cb7c31848d16
add CompressedDSMs
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
528
diff
changeset
|
127 buf.put(data); |
cb7c31848d16
add CompressedDSMs
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
528
diff
changeset
|
128 break; |
cb7c31848d16
add CompressedDSMs
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
528
diff
changeset
|
129 default: |
cb7c31848d16
add CompressedDSMs
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
528
diff
changeset
|
130 header = packer.write(new CommandMessage(type.id, index, seq, key, quickFlag, false, compressFlag)); |
cb7c31848d16
add CompressedDSMs
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
528
diff
changeset
|
131 buf = ByteBuffer.allocate(header.length); |
cb7c31848d16
add CompressedDSMs
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
528
diff
changeset
|
132 buf.put(header); |
cb7c31848d16
add CompressedDSMs
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
528
diff
changeset
|
133 break; |
cb7c31848d16
add CompressedDSMs
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
528
diff
changeset
|
134 } |
528
6ebddfac7ff6
delete RecieveData.setCompressFlag
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
527
diff
changeset
|
135 |
529
cb7c31848d16
add CompressedDSMs
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
528
diff
changeset
|
136 buf.flip(); |
cb7c31848d16
add CompressedDSMs
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
528
diff
changeset
|
137 } catch (IOException e) { |
cb7c31848d16
add CompressedDSMs
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
528
diff
changeset
|
138 e.printStackTrace(); |
527 | 139 } |
443 | 140 return buf; |
419 | 141 } |
467 | 142 |
458 | 143 /** |
144 * If this flag is true, command isn't send queue. | |
145 * command is executed right now. | |
467 | 146 * |
458 | 147 * @param flag |
148 */ | |
467 | 149 |
529
cb7c31848d16
add CompressedDSMs
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
528
diff
changeset
|
150 public void setQuickFlag(boolean flag){ |
446 | 151 quickFlag = flag; |
152 } | |
467 | 153 |
529
cb7c31848d16
add CompressedDSMs
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
528
diff
changeset
|
154 public boolean getQuickFlag(){ |
448 | 155 return quickFlag; |
156 } | |
467 | 157 |
458 | 158 /** |
159 * Before sending Remote DataSegment, DataSegment type is ByteArray. | |
160 * If this flag true, ByteArray is compressed with ZRLEE(ZRIB) algorithm | |
467 | 161 * |
458 | 162 * @param flag |
163 */ | |
467 | 164 |
529
cb7c31848d16
add CompressedDSMs
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
528
diff
changeset
|
165 public void setCompressFlag(boolean flag){ |
446 | 166 compressFlag = flag; |
167 } | |
467 | 168 |
529
cb7c31848d16
add CompressedDSMs
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
528
diff
changeset
|
169 public boolean getCompressFlag(){ |
448 | 170 return compressFlag; |
171 } | |
345 | 172 } |