Mercurial > hg > Database > Alice
view src/main/java/alice/datasegment/Command.java @ 459:4419a2415661 dispose
can use compress option
author | sugi |
---|---|
date | Mon, 03 Nov 2014 20:26:05 +0900 |
parents | bcf6f4a6fcd0 |
children | 99e309768ac9 |
line wrap: on
line source
package alice.datasegment; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.util.concurrent.BlockingQueue; import java.util.zip.Deflater; import java.util.zip.DeflaterOutputStream; import org.msgpack.MessagePack; import alice.codesegment.CodeSegment; import alice.codesegment.SingletonMessage; import alice.daemon.CommandMessage; import alice.daemon.Connection; public class Command { public CommandType type; public String key; public Receiver receiver; public ReceiveData rData; public int index; public int seq; public Connection connection; // for remote public BlockingQueue<Command> replyQueue; public CodeSegment cs; public String reverseKey; private boolean quickFlag = false; private boolean compressFlag = false; public Command(CommandType cmdType, Receiver receiver, String key, ReceiveData rData, int index, int seq, BlockingQueue<Command> replyQueue, CodeSegment cs, String reverseKey) { this.type = cmdType; this.receiver = receiver; this.key = key; this.rData = rData; this.index = index; this.seq = seq; this.replyQueue = replyQueue; this.cs = cs; this.reverseKey = reverseKey; } public Command(CommandType cmdType, Receiver receiver, String key, ReceiveData rData, int index, int seq, CodeSegment cs, String reverseKey, Connection connection) { this.type = cmdType; this.receiver = receiver; this.key = key; this.rData = rData; this.index = index; this.seq = seq; this.connection = connection; this.cs = cs; this.reverseKey = reverseKey; } public String getCommandString() { String csName = "null"; if (cs != null) { csName = cs.toString(); } return this.type + "\t" + key + "\t" + rData + "\tindex=" + index + "\tcs=" + csName; } /** * @return serialized ByteBuffer */ public ByteBuffer convert() { ByteBuffer buf = null; MessagePack msg = SingletonMessage.getInstance(); try { byte[] header = null; byte[] data = null; byte[] dataSize = null; boolean serialized = false; boolean compressed = false; switch (type) { /* * UPDATE, PUT, REPLY need send DataSegment to RemoteDataSegment * case UPDATE and PUT * compress and serialize flag are selected by user, so if true, need convert. * case REPLY * these flags represent DataSegment status. * for example, serializeFlag is true. DataSegment had already converted, so no need convert. */ case UPDATE: case PUT: case REPLY: if (rData.compressed()) { // have already converted data = (byte[]) rData.getObj(); compressed = rData.compressed(); // true serialized = rData.serialized(); } else { if (!rData.serialized() && !rData.isByteArray()) { data = msg.write(rData.getObj()); serialized = true; } else { // rData is RAW ByteArray or already serialized data = (byte[]) rData.getObj(); serialized = rData.serialized(); } if (compressFlag) { data = zip(data); compressed = true; } } header = msg.write(new CommandMessage(type.id, index, seq, key, false, serialized, compressed)); dataSize = msg.write(data.length); buf = ByteBuffer.allocate(header.length+dataSize.length+data.length); buf.put(header); buf.put(dataSize); buf.put(data); break; default: header = msg.write(new CommandMessage(type.id, index, seq, key, quickFlag, false, compressFlag)); buf = ByteBuffer.allocate(header.length); buf.put(header); break; } buf.flip(); } catch (IOException e) { e.printStackTrace(); } return buf; } /** * If this flag is true, command isn't send queue. * command is executed right now. * * @param flag */ public void setQuickFlag(boolean flag){ quickFlag = flag; } public boolean getQuickFlag(){ return quickFlag; } /** * Before sending Remote DataSegment, DataSegment type is ByteArray. * If this flag true, ByteArray is compressed with ZRLEE(ZRIB) algorithm * * @param flag */ public void setCompressFlag(boolean flag){ compressFlag = flag; } public boolean getCompressFlag(){ return compressFlag; } public byte[] zip(byte[] input) throws IOException{ Deflater deflater = new Deflater(); ByteArrayOutputStream os = new ByteArrayOutputStream(); DeflaterOutputStream dos = new DeflaterOutputStream(os, deflater); dos.write(input); dos.finish(); return os.toByteArray(); } }