view src/main/java/alice/datasegment/Command.java @ 574:ea21af9a4762 dispose

delete serializeFlag, fix MessagePack pack&unpack
author Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
date Tue, 15 Dec 2015 11:49:07 +0900
parents f1777341c5a2
children 3284428f525e
line wrap: on
line source

package alice.datasegment;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.BlockingQueue;

import org.msgpack.MessagePack;

import alice.codesegment.CodeSegment;
import alice.daemon.CommandMessage;
import alice.daemon.Connection;

/**
 * One command used by the DSM (PEEK, PUT, ...), together with the data it
 * carries (the DataSegment is read from the attached {@link ReceiveData}).
 *
 * <p>A command can be turned into its wire representation with
 * {@link #convert()}.
 */
public class Command {
    /** Command type, such as PEEK or PUT. */
    public CommandType type;
    public String key;
    public Receiver receiver;
    public ReceiveData rData;
    /** Not used. An index for access; legacy baggage. */
    public int index;
    /**
     * Unique number identifying the CodeSegment that is waiting on a
     * DataSegment. Used for remote operation to match a command with its
     * counterpart.
     */
    public int seq;
    /** For remote use. */
    public Connection connection;
    /** Reply queue; per the original note, possibly the return value needed by PEEK/TAKE. */
    public BlockingQueue<Command> replyQueue;
    public CodeSegment cs;
    /** Where the put/update came from. */
    public String reverseKey;
    /** Whether to process without going through SEDA; per the original note, true means remote. */
    private boolean quickFlag = false;
    /** When true, the data has to be compressed. */
    private boolean compressFlag = false;

    private static final MessagePack packer = new MessagePack();

    /**
     * Canonical constructor that both public constructors delegate to.
     * Arguments a public constructor does not supply are passed as null.
     */
    private Command(CommandType cmdType, Receiver receiver, String key, ReceiveData rData, int index, int seq,
                    BlockingQueue<Command> replyQueue, CodeSegment cs, String reverseKey, Connection connection) {
        this.type = cmdType;
        this.key = key;
        this.receiver = receiver;
        this.rData = rData;
        this.index = index;
        this.seq = seq;
        this.connection = connection;
        this.replyQueue = replyQueue;
        this.cs = cs;
        this.reverseKey = reverseKey;
    }

    /**
     * Creates a command for PEEK/TAKE. The connection is left null.
     */
    public Command(CommandType cmdType, Receiver receiver, String key, ReceiveData rData, int index, int seq, BlockingQueue<Command> replyQueue, CodeSegment cs, String reverseKey) {
        this(cmdType, receiver, key, rData, index, seq, replyQueue, cs, reverseKey, null);
    }

    /**
     * Creates a command for PUT/UPDATE/REPLY/PING/RESPONSE. The reply queue
     * is left null.
     */
    public Command(CommandType cmdType, Receiver receiver, String key, ReceiveData rData, int index, int seq, CodeSegment cs, String reverseKey, Connection connection) {
        this(cmdType, receiver, key, rData, index, seq, null, cs, reverseKey, connection);
    }

    /**
     * Renders this command as a tab-separated String. Probably meant for
     * log output.
     *
     * @return type, key, data, index and CodeSegment joined by tabs; the
     *         CodeSegment is shown as "null" when none is set
     */
    public String getCommandString() {
        final String codeSegmentName = (cs == null) ? "null" : cs.toString();
        return this.type + "\t" + key + "\t" + rData + "\tindex=" + index + "\tcs=" + codeSegmentName;
    }

    /**
     * Serializes this command with MessagePack.
     *
     * <p>UPDATE, PUT and REPLY carry a DataSegment, so they are written as
     * header + payload length + payload. Every other type is written as a
     * header only.
     *
     * @return the serialized command, flipped and ready for reading, or
     *         null when serialization failed with an IOException (the stack
     *         trace is printed in that case)
     */
    public ByteBuffer convert() {
        try {
            final ByteBuffer encoded;
            switch (type) {
                case UPDATE:
                case PUT:
                case REPLY:
                    encoded = packWithPayload();
                    break;
                default:
                    encoded = packHeaderOnly();
                    break;
            }
            encoded.flip();
            return encoded;
        } catch (IOException e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * Packs header, payload length and payload for the commands that send a
     * DataSegment.
     */
    private ByteBuffer packWithPayload() throws IOException {
        // The payload is fetched before anything else is read from rData;
        // keep this order, the later getters are called only afterwards.
        final byte[] payload = compressFlag ? rData.getZMessagePack() : rData.getMessagePack();

        // NOTE(review): the fifth argument is a literal false here, while the
        // header-only path passes quickFlag in that position - confirm this
        // is intended.
        final CommandMessage message =
                new CommandMessage(type.id, index, seq, key, false, compressFlag, rData.getDataSize());

        if (rData.getSetTime()) {
            message.setTime = true;
            message.time = rData.getTime();
            message.depth = rData.getDepth() + 1;
        }

        if (rData.getSetZipped()) {
            message.setZepped = true;
            message.zippedDataSize = rData.getZippedDataSize();
        }

        final byte[] headerBytes = packer.write(message);
        final byte[] lengthBytes = packer.write(payload.length);

        final ByteBuffer out = ByteBuffer.allocate(headerBytes.length + lengthBytes.length + payload.length);
        out.put(headerBytes).put(lengthBytes).put(payload);
        return out;
    }

    /**
     * Packs only the header, with a data size of 0, for the commands that
     * carry no DataSegment.
     */
    private ByteBuffer packHeaderOnly() throws IOException {
        final byte[] headerBytes =
                packer.write(new CommandMessage(type.id, index, seq, key, quickFlag, compressFlag, 0));
        final ByteBuffer out = ByteBuffer.allocate(headerBytes.length);
        out.put(headerBytes);
        return out;
    }

    /**
     * Sets the quick flag. When the flag is true the command is not sent to
     * the queue; it is executed right away.
     *
     * @param flag new value of the quick flag
     */
    public void setQuickFlag(boolean flag) {
        this.quickFlag = flag;
    }

    /**
     * @return the current quick flag
     */
    public boolean getQuickFlag() {
        return this.quickFlag;
    }

    /**
     * Sets the compress flag. Before being sent to a remote DataSegment the
     * DataSegment is a byte array; when this flag is true that byte array is
     * compressed (the original note names the algorithm "ZRLEE(ZRIB)" -
     * presumably zlib, TODO confirm).
     *
     * @param flag new value of the compress flag
     */
    public void setCompressFlag(boolean flag) {
        this.compressFlag = flag;
    }

    /**
     * @return the current compress flag
     */
    public boolean getCompressFlag() {
        return this.compressFlag;
    }

    /**
     * Replaces the sequence number of this command.
     *
     * @param seq new sequence number
     */
    public void setSeq(int seq) {
        this.seq = seq;
    }
}