Mercurial > hg > Database > Alice
diff src/main/java/alice/datasegment/Command.java @ 523:145c425db88d dispose
add CompressedLDSM
author | Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp> |
---|---|
date | Thu, 09 Apr 2015 18:36:26 +0900 |
parents | 7ef0ebb40c9b |
children | 30a74eee59c7 |
line wrap: on
line diff
--- a/src/main/java/alice/datasegment/Command.java Fri Jan 23 23:53:36 2015 +0900 +++ b/src/main/java/alice/datasegment/Command.java Thu Apr 09 18:36:26 2015 +0900 @@ -7,6 +7,7 @@ import java.util.zip.Deflater; import java.util.zip.DeflaterOutputStream; +import org.apache.log4j.Logger; import org.msgpack.MessagePack; import alice.codesegment.CodeSegment; @@ -14,20 +15,28 @@ import alice.daemon.CommandMessage; import alice.daemon.Connection; +/** + * DSMで使われる各コマンドのセット(ReceiveDataからのDSの読み込み) + */ public class Command { - public CommandType type; + public CommandType type;//PEEK, PUTなどのコマンドタイプ public String key; public Receiver receiver; public ReceiveData rData; - public int index; - public int seq; + public int index;//使ってない。アクセス用のindex。負の遺産。 + public int seq;//DSの待ち合わせを行っているCSを表すunique number。リモート用。対応コマンドを表す。 public Connection connection; // for remote - public BlockingQueue<Command> replyQueue; + public BlockingQueue<Command> replyQueue;//PEEK/TAKE必要な返り値? public CodeSegment cs; - public String reverseKey; - private boolean quickFlag = false; - private boolean compressFlag = false; + public String reverseKey;//どこからput/updateされたか + private boolean quickFlag = false;//SEDAを挟まずに処理を行うかどうか。trueだとリモート + private boolean compressFlag = false;//trueだったら圧縮する必要がある + private Logger logger = Logger.getLogger("MessagePack"); + + /** + * for PEEK/TAKE + */ public Command(CommandType cmdType, Receiver receiver, String key, ReceiveData rData, int index, int seq, BlockingQueue<Command> replyQueue, CodeSegment cs, String reverseKey) { this.type = cmdType; this.receiver = receiver; @@ -40,6 +49,9 @@ this.reverseKey = reverseKey; } + /** + * for PUT/UPDATE/REPLY/PING/RESPONSE + */ public Command(CommandType cmdType, Receiver receiver, String key, ReceiveData rData, int index, int seq, CodeSegment cs, String reverseKey, Connection connection) { this.type = cmdType; this.receiver = receiver; @@ -52,6 +64,10 @@ this.reverseKey = reverseKey; } + /** + * String型でコマンドを取得するメソッド。たぶんログ表示用。 + * @return + */ public String getCommandString() { String csName = "null"; if (cs != null) { @@ -63,15 +79,15 @@ /** * @return serialized ByteBuffer */ - public ByteBuffer convert() { + public ByteBuffer convert() {//メッセージパックでbyteArrayに変換 ByteBuffer buf = null; MessagePack msg = SingletonMessage.getInstance(); try { - byte[] header = null; - byte[] data = null; - byte[] dataSize = null; - boolean serialized = false; - boolean compressed = false; + byte[] header = null;//DSのメタデータ用byteArray + byte[] data = null;//DS本体用byteArray + byte[] dataSize = null;//DSのサイズ + boolean serialized = false;//DSがシリアライズ状態かのフラグ + boolean compressed = false;//DSが圧縮されているかのフラグ switch (type) { /* * UPDATE, PUT, REPLY need send DataSegment to RemoteDataSegment @@ -83,50 +99,56 @@ */ case UPDATE: case PUT: - case REPLY: - if (rData.compressed()) { + case REPLY://ReceiveDataからREPLYするDSを取得 + if (rData.compressed()) {//圧縮されている場合:各フラグの状態とDS本体を取得 // have already converted data = (byte[]) rData.getObj(); - compressed = rData.compressed(); // true - serialized = rData.serialized(); - } else { + compressed = rData.compressed(); + serialized = rData.serialized();//シリアライズされているか=Remoteから送られてきたか + } else {//圧縮されていない場合:状態にあわせてDS本体を取得し各フラグを立てる if (!rData.serialized() && !rData.isByteArray()) { - data = msg.write(rData.getObj()); + data = msg.write(rData.getObj());//シリアライズ serialized = true; } else { // rData is RAW ByteArray or already serialized data = (byte[]) rData.getObj(); serialized = rData.serialized(); } - if (compressFlag) { - data = zip(data); + if (compressFlag) {//圧縮する ここはあとで消す + data = rData.zip(data); compressed = true; } } CommandMessage cm = new CommandMessage(type.id, index, seq, key, false, serialized, compressed); - if (rData.setTime) { + if (rData.setTime) {//AliceVNCの計測用(消してもいい) cm.setTime = true; cm.time = rData.time; cm.depth = rData.depth + 1; } + //MessagePackでDSを作成(ヘッダー・データ本体のサイズ・データ本体) header = msg.write(cm); dataSize = msg.write(data.length); buf = ByteBuffer.allocate(header.length+dataSize.length+data.length); buf.put(header); buf.put(dataSize); buf.put(data); + + //System.out.print("messagePack = " + Integer.toHexString(buf.getInt() & 0xff)); + logger.debug("messagePack = " + Integer.toHexString(buf.getInt() & 0xff)); + break; - default: + default://PEEK, TAKE, RemoveならCommandMessageとそのサイズのみセット header = msg.write(new CommandMessage(type.id, index, seq, key, quickFlag, false, compressFlag)); buf = ByteBuffer.allocate(header.length); buf.put(header); + //System.out.print("messagePack = " + Integer.toHexString(buf.getInt() & 0xff)); break; } buf.flip(); } catch (IOException e) { e.printStackTrace(); - } + }//ここに圧縮機能を入れる return buf; } @@ -137,11 +159,11 @@ * @param flag */ - public void setQuickFlag(boolean flag){ + public void setQuickFlag(boolean flag){//SEDA処理の有無フラグのsetter quickFlag = flag; } - public boolean getQuickFlag(){ + public boolean getQuickFlag(){//SEDA処理の有無フラグのgetter return quickFlag; } @@ -152,20 +174,12 @@ * @param flag */ - public void setCompressFlag(boolean flag){ + public void setCompressFlag(boolean flag){//圧縮フラグのsetter compressFlag = flag; } - public boolean getCompressFlag(){ + public boolean getCompressFlag(){//圧縮フラグのgetter return compressFlag; } - public byte[] zip(byte[] input) throws IOException{ - Deflater deflater = new Deflater(); - ByteArrayOutputStream os = new ByteArrayOutputStream(); - DeflaterOutputStream dos = new DeflaterOutputStream(os, deflater); - dos.write(input); - dos.finish(); - return os.toByteArray(); - } }