view src/main/java/alice/datasegment/Command.java @ 459:4419a2415661 dispose

can use compress option
author sugi
date Mon, 03 Nov 2014 20:26:05 +0900
parents bcf6f4a6fcd0
children 99e309768ac9
line wrap: on
line source

package alice.datasegment;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.BlockingQueue;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;

import org.msgpack.MessagePack;

import alice.codesegment.CodeSegment;
import alice.codesegment.SingletonMessage;
import alice.daemon.CommandMessage;
import alice.daemon.Connection;

/**
 * A single DataSegment command together with everything needed to either
 * deliver it locally (via {@link #replyQueue}) or serialize it for a remote
 * peer (via {@link #convert()}).
 */
public class Command {
    public CommandType type;
    public String key;
    public Receiver receiver;
    public ReceiveData rData;
    public int index;
    public int seq;
    public Connection connection; // for remote
    public BlockingQueue<Command> replyQueue;
    public CodeSegment cs;
    public String reverseKey;
    private boolean quickFlag = false;
    private boolean compressFlag = false;

    /**
     * Creates a command that carries a reply queue; {@link #connection} is left null.
     */
    public Command(CommandType cmdType, Receiver receiver, String key, ReceiveData rData, int index, int seq, BlockingQueue<Command> replyQueue, CodeSegment cs, String reverseKey) {
        this.type = cmdType;
        this.receiver = receiver;
        this.key = key;
        this.rData = rData;
        this.index = index;
        this.seq = seq;
        this.replyQueue = replyQueue;
        this.cs = cs;
        this.reverseKey = reverseKey;
    }

    /**
     * Creates a command that carries a connection; {@link #replyQueue} is left null.
     */
    public Command(CommandType cmdType, Receiver receiver, String key, ReceiveData rData, int index, int seq, CodeSegment cs, String reverseKey, Connection connection) {
        this.type = cmdType;
        this.receiver = receiver;
        this.key = key;
        this.rData = rData;
        this.index = index;
        this.seq = seq;
        this.connection = connection;
        this.cs = cs;
        this.reverseKey = reverseKey;
    }

    /**
     * Builds a tab-separated, human-readable description of this command.
     *
     * @return type, key, data, index and CodeSegment name ("null" when no CodeSegment is set)
     */
    public String getCommandString() {
        String csName = "null";
        if (cs != null) {
            csName = cs.toString();
        }
        return this.type + "\t" + key + "\t" + rData + "\tindex=" + index + "\tcs=" + csName;
    }

    /**
     * Serializes this command into a ByteBuffer ready for reading.
     *
     * For UPDATE, PUT and REPLY the buffer holds header, payload length and
     * payload; for every other type it holds the header only.
     *
     * @return serialized ByteBuffer (already flipped), or null when
     *         serialization or compression failed with an IOException
     */
    public ByteBuffer convert() {
        ByteBuffer buf = null;
        MessagePack msg = SingletonMessage.getInstance();
        try {
            byte[] header = null;
            byte[] data = null;
            byte[] dataSize = null;
            boolean serialized = false;
            boolean compressed = false;
            switch (type) {
            /*
             * UPDATE, PUT and REPLY have to send the DataSegment to the RemoteDataSegment.
             * For UPDATE and PUT the compress and serialize flags are chosen by the user,
             * so when they are set the payload has to be converted here.
             * For REPLY the flags describe the current state of the DataSegment:
             * e.g. if it is already serialized, it must not be serialized again.
             */
            case UPDATE:
            case PUT:
            case REPLY:
                if (rData.compressed()) {
                    // payload was converted earlier; forward it untouched
                    data = (byte[]) rData.getObj();
                    compressed = true;
                    serialized = rData.serialized();
                } else {
                    if (!rData.serialized() && !rData.isByteArray()) {
                        data = msg.write(rData.getObj());
                        serialized = true;
                    } else { // rData is a raw byte array or is already serialized
                        data = (byte[]) rData.getObj();
                        serialized = rData.serialized();
                    }
                    if (compressFlag) {
                        data = zip(data);
                        compressed = true;
                    }
                }

                // the quick flag is always sent as false for commands that carry data
                header = msg.write(new CommandMessage(type.id, index, seq, key, false, serialized, compressed));
                dataSize = msg.write(data.length);
                buf = ByteBuffer.allocate(header.length + dataSize.length + data.length);
                buf.put(header);
                buf.put(dataSize);
                buf.put(data);
                break;
            default:
                header = msg.write(new CommandMessage(type.id, index, seq, key, quickFlag, false, compressFlag));
                buf = ByteBuffer.allocate(header.length);
                buf.put(header);
                break;
            }

            buf.flip();
        } catch (IOException e) {
            // Kept for backward compatibility: the failure is reported on stderr
            // and signalled to the caller through a null return value.
            e.printStackTrace();
            return null;
        }
        return buf;
    }

    /**
     * Sets the quick flag.
     * If this flag is true, the command is not put on the send queue;
     * it is executed right away.
     *
     * @param flag new value of the quick flag
     */
    public void setQuickFlag(boolean flag) {
        quickFlag = flag;
    }

    /**
     * @return current value of the quick flag
     */
    public boolean getQuickFlag() {
        return quickFlag;
    }

    /**
     * Sets the compress flag.
     * A DataSegment is turned into a byte array before it is sent to a remote
     * DataSegment. If this flag is true, {@link #convert()} compresses that
     * byte array with {@link #zip(byte[])} (zlib/DEFLATE).
     *
     * @param flag new value of the compress flag
     */
    public void setCompressFlag(boolean flag) {
        compressFlag = flag;
    }

    /**
     * @return current value of the compress flag
     */
    public boolean getCompressFlag() {
        return compressFlag;
    }

    /**
     * Compresses a byte array with a default-configured {@link Deflater}.
     *
     * @param input bytes to compress
     * @return the compressed bytes
     * @throws IOException if writing to the compressing stream fails
     */
    public byte[] zip(byte[] input) throws IOException {
        Deflater deflater = new Deflater();
        try {
            ByteArrayOutputStream os = new ByteArrayOutputStream();
            DeflaterOutputStream dos = new DeflaterOutputStream(os, deflater);
            dos.write(input);
            dos.finish();
            return os.toByteArray();
        } finally {
            // DeflaterOutputStream never ends a Deflater supplied by the caller,
            // so release its native resources explicitly.
            deflater.end();
        }
    }
}