Mercurial > hg > Database > Alice
changeset 452:f68d103498e0 dispose
refactor (InputDataSegment holder class changed)
author | sugi |
---|---|
date | Tue, 28 Oct 2014 17:24:16 +0900 |
parents | ad1547756565 |
children | 8470db2523d5 |
files | src/main/java/alice/codesegment/InputDataSegment.java src/main/java/alice/daemon/CommandMessage.java src/main/java/alice/daemon/IncomingTcpConnection.java src/main/java/alice/datasegment/Command.java src/main/java/alice/datasegment/DataSegmentKey.java src/main/java/alice/datasegment/DataSegmentValue.java src/main/java/alice/datasegment/ReceiveData.java src/main/java/alice/datasegment/Receiver.java src/main/java/alice/datasegment/RemoteDataSegmentManager.java |
diffstat | 9 files changed, 201 insertions(+), 90 deletions(-) [+] |
line wrap: on
line diff
--- a/src/main/java/alice/codesegment/InputDataSegment.java Tue Oct 28 11:07:23 2014 +0900 +++ b/src/main/java/alice/codesegment/InputDataSegment.java Tue Oct 28 17:24:16 2014 +0900 @@ -5,8 +5,7 @@ import alice.datasegment.Command; import alice.datasegment.CommandType; import alice.datasegment.DataSegment; -import alice.datasegment.ReceiveLocalData; -import alice.datasegment.ReceiveRemoteData; +import alice.datasegment.ReceiveData; import alice.datasegment.Receiver; /** @@ -68,14 +67,8 @@ public void reply(Receiver receiver, Command reply) { receiver.index = reply.index; - receiver.from = reply.reverseKey; - if (reply.reverseKey==null){ - receiver.setData(new ReceiveRemoteData(reply.val)); - } else if (!reply.reverseKey.equals("local")) { - receiver.setData(new ReceiveRemoteData(reply.val)); - } else { - receiver.setData(new ReceiveLocalData(reply.obj)); - } + receiver.from = reply.reverseKey; + receiver.setData(new ReceiveData(reply.val, reply.getCompressFlag(), reply.getSerializeFlag())); receive(); }
--- a/src/main/java/alice/daemon/CommandMessage.java Tue Oct 28 11:07:23 2014 +0900 +++ b/src/main/java/alice/daemon/CommandMessage.java Tue Oct 28 17:24:16 2014 +0900 @@ -8,9 +8,9 @@ public int index; public int seq; public String key; - public boolean quickFlag; - public boolean serializedFlag; - public boolean compressedFlag; + public boolean quickFlag = false; + public boolean serialized = false; + public boolean compressed = false; public CommandMessage() {} @@ -21,7 +21,7 @@ this.seq = seq; this.key = key; this.quickFlag = qFlag; - this.serializedFlag = sFlag; - this.compressedFlag = cFlag; + this.serialized = sFlag; + this.compressed = cFlag; } }
--- a/src/main/java/alice/daemon/IncomingTcpConnection.java Tue Oct 28 11:07:23 2014 +0900 +++ b/src/main/java/alice/daemon/IncomingTcpConnection.java Tue Oct 28 17:24:16 2014 +0900 @@ -60,6 +60,9 @@ case PUT: val = getSerializedByteArray(unpacker); cmd = new Command(type, null, null, val, 0, 0, null, null, reverseKey); + // these flags express DataSegment status + cmd.setCompressFlag(msg.compressed); + cmd.setSerializeFlag(msg.serialized); lmanager.getDataSegmentKey(msg.key).runCommand(cmd); break; case PEEK: @@ -75,7 +78,10 @@ case REPLY: cmd = manager.getAndRemoveCmd(msg.seq); val = getSerializedByteArray(unpacker); - cmd.cs.ids.reply(cmd.receiver, new Command(type, null, null, val, msg.index, msg.seq, null, null, "")); + Command rCmd = new Command(type, null, null, val, msg.index, msg.seq, null, null, ""); + rCmd.setCompressFlag(msg.compressed); + rCmd.setSerializeFlag(msg.serialized); + cmd.cs.ids.reply(cmd.receiver, rCmd); break; case PING: DataSegment.get(reverseKey).response(msg.key);
--- a/src/main/java/alice/datasegment/Command.java Tue Oct 28 11:07:23 2014 +0900 +++ b/src/main/java/alice/datasegment/Command.java Tue Oct 28 17:24:16 2014 +0900 @@ -1,10 +1,14 @@ package alice.datasegment; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.util.concurrent.BlockingQueue; +import java.util.zip.Deflater; +import java.util.zip.DeflaterOutputStream; import org.msgpack.MessagePack; + import alice.codesegment.CodeSegment; import alice.codesegment.SingletonMessage; import alice.daemon.CommandMessage; @@ -14,19 +18,18 @@ public CommandType type; public String key; public Receiver receiver; - public byte[] val; + public Object val; public int index; public int seq; public Connection connection; // for remote public BlockingQueue<Command> replyQueue; public CodeSegment cs; public String reverseKey; - public Object obj; private boolean quickFlag = false; - private boolean serializeFlag = true; - private boolean compressFlag = true; + private boolean serializeFlag = false; + private boolean compressFlag = false; - public Command(CommandType cmdType, Receiver receiver, String key, byte[] val, int index, int seq, BlockingQueue<Command> replyQueue, CodeSegment cs, String reverseKey) { + public Command(CommandType cmdType, Receiver receiver, String key, Object val, int index, int seq, BlockingQueue<Command> replyQueue, CodeSegment cs, String reverseKey) { this.type = cmdType; this.receiver = receiver; this.key = key; @@ -36,10 +39,9 @@ this.replyQueue = replyQueue; this.cs = cs; this.reverseKey = reverseKey; - this.quickFlag = false; } - public Command(CommandType cmdType, Receiver receiver, String key, byte[] val, int index, int seq, CodeSegment cs, String reverseKey, Connection connection) { + public Command(CommandType cmdType, Receiver receiver, String key, Object val, int index, int seq, CodeSegment cs, String reverseKey, Connection connection) { this.type = cmdType; this.receiver = receiver; this.key = key; @@ -51,31 +53,6 @@ this.reverseKey = reverseKey; } - public Command(CommandType cmdType, Receiver receiver, String key, Object obj, int index, int seq, BlockingQueue<Command> replyQueue, CodeSegment cs, String reverseKey) { - this.type = cmdType; - this.receiver = receiver; - this.key = key; - this.obj = obj; - this.index = index; - this.seq = seq; - this.replyQueue = replyQueue; - this.cs = cs; - this.reverseKey = reverseKey; - } - - public Command(CommandType cmdType, Receiver receiver, String key, byte[] val, Object obj, int index, int seq, BlockingQueue<Command> replyQueue, CodeSegment cs, String reverseKey) { - this.type = cmdType; - this.receiver = receiver; - this.key = key; - this.val = val; - this.obj = obj; - this.index = index; - this.seq = seq; - this.replyQueue = replyQueue; - this.cs = cs; - this.reverseKey = reverseKey; - } - public String getCommandString() { String csName = "null"; if (cs != null) { @@ -91,28 +68,57 @@ ByteBuffer buf = null; MessagePack msg = SingletonMessage.getInstance(); try { - byte[] header = msg.write(new CommandMessage(type.id, index, seq, key, quickFlag, serializeFlag, compressFlag)); - + byte[] header = null; + byte[] data = null; + byte[] dataSize = null; switch (type) { + /* + * UPDATE, PUT, REPLY need send DataSegment to RemoteDataSegment + * case UPDATE and PUT + * compress and serialize flag are selected by user, so if true, need convert. + * case REPLY + * these flags represent DataSegment status. + * for example, serializeFlag is true. DataSegment had already converted, so no need convert. + */ case UPDATE: - case PUT: - case REPLY: - byte[] data = null; - if (val!=null) { - data = val; - } else if (!serializeFlag) { - data = (byte[]) obj; - } else if (val==null && obj!=null) { - data = msg.write(obj); + case PUT: + if (!serializeFlag) { + data = (byte[]) val; + } else { + long start = System.currentTimeMillis(); + data = msg.write(val); + long end = System.currentTimeMillis(); + System.out.println("convert DataSegment" +(end - start)); } - byte[] dataSize = msg.write(data.length); - + if (compressFlag) { + data = zip(data); + } + header = msg.write(new CommandMessage(type.id, index, seq, key, quickFlag, serializeFlag, compressFlag)); + dataSize = msg.write(data.length); buf = ByteBuffer.allocate(header.length+dataSize.length+data.length); buf.put(header); buf.put(dataSize); - buf.put(data); + long start = System.currentTimeMillis(); + buf.put(data); + long end = System.currentTimeMillis(); + System.out.println("put DataSegment" +(end - start)); + break; + case REPLY: // only serialize + if (serializeFlag) { + data = (byte[]) val; + } else { + data = msg.write(val); + this.serializeFlag = true; + } + header = msg.write(new CommandMessage(type.id, index, seq, key, quickFlag, serializeFlag, compressFlag)); + dataSize = msg.write(data.length); + buf = ByteBuffer.allocate(header.length+dataSize.length+data.length); + buf.put(header); + buf.put(dataSize); + buf.put(data); break; default: + header = msg.write(new CommandMessage(type.id, index, seq, key, quickFlag, serializeFlag, compressFlag)); buf = ByteBuffer.allocate(header.length); buf.put(header); break; @@ -148,4 +154,13 @@ public boolean getCompressFlag(){ return compressFlag; } + + public byte[] zip(byte[] input) throws IOException{ + Deflater deflater = new Deflater(); + ByteArrayOutputStream os = new ByteArrayOutputStream(); + DeflaterOutputStream dos = new DeflaterOutputStream(os, deflater); + dos.write(input); + dos.finish(); + return os.toByteArray(); + } }
--- a/src/main/java/alice/datasegment/DataSegmentKey.java Tue Oct 28 11:07:23 2014 +0900 +++ b/src/main/java/alice/datasegment/DataSegmentKey.java Tue Oct 28 17:24:16 2014 +0900 @@ -25,13 +25,13 @@ case PUT: int index = tailIndex; tailIndex++; - DataSegmentValue dsv = new DataSegmentValue(index, cmd.val, cmd.obj, cmd.reverseKey); + DataSegmentValue dsv = new DataSegmentValue(index, cmd.val, cmd.reverseKey, cmd.getCompressFlag(), cmd.getSerializeFlag()); dataList.add(dsv); // Process waiting peek and take commands for (Iterator<Command> iter = waitList.iterator(); iter.hasNext(); ) { Command waitCmd = iter.next(); if (waitCmd.index < index) { - replyValue(waitCmd ,dsv); + replyValue(waitCmd, dsv); iter.remove(); if (waitCmd.type == CommandType.TAKE) { // someone is waiting for this put or update command dataList.remove(dsv); @@ -48,7 +48,7 @@ boolean waitFlag2 = true; for (DataSegmentValue data : dataList) { if (data.index > cmd.index) { - replyValue(cmd ,data); + replyValue(cmd, data); waitFlag2 = false; break; } @@ -65,7 +65,7 @@ for (Iterator<DataSegmentValue> iter = dataList.iterator(); iter.hasNext(); ) { DataSegmentValue data = iter.next(); if (data.index > cmd.index) { - replyValue(cmd ,data); + replyValue(cmd, data); iter.remove(); waitFlag = false; break; @@ -83,17 +83,18 @@ } public void replyValue(Command cmd, DataSegmentValue data){ + Command rCmd = new Command(CommandType.REPLY, null, null, data.val, data.index, cmd.seq, null, null, data.from); + rCmd.setCompressFlag(data.compressed); + rCmd.setSerializeFlag(data.serialized); if (cmd.cs!=null){ // if cmd has cs-instance, it means Command from local. - cmd.cs.ids.reply(cmd.receiver, new Command(CommandType.REPLY, null, null, data.val, data.obj, data.index, cmd.seq, null, null, data.from)); + cmd.cs.ids.reply(cmd.receiver, rCmd); } else { try { if (!cmd.getQuickFlag()){ - cmd.connection.sendQueue.put(new Command(CommandType.REPLY, null, null, data.val, data.obj, data.index, cmd.seq, null, null, data.from)); + cmd.connection.sendQueue.put(rCmd); + } else { + cmd.connection.write(rCmd); } - else { - cmd.connection.write(new Command(CommandType.REPLY, null, null, data.val, data.obj, data.index, cmd.seq, null, null, data.from)); - } - } catch (InterruptedException e) { e.printStackTrace(); }
--- a/src/main/java/alice/datasegment/DataSegmentValue.java Tue Oct 28 11:07:23 2014 +0900 +++ b/src/main/java/alice/datasegment/DataSegmentValue.java Tue Oct 28 17:24:16 2014 +0900 @@ -3,17 +3,18 @@ public class DataSegmentValue { public int index; - public byte[] val; + public Object val; public String from; - public Object obj; public boolean compressed; public boolean serialized; - public DataSegmentValue(int index, byte[] val, Object obj, String reverseKey) { + public DataSegmentValue(int index, Object val, String reverseKey, + boolean compressed, boolean serialized) { this.index = index; this.val = val; this.from = reverseKey; - this.obj = obj; + this.compressed = compressed; + this.serialized = serialized; } }
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/alice/datasegment/ReceiveData.java Tue Oct 28 17:24:16 2014 +0900 @@ -0,0 +1,100 @@ +package alice.datasegment; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.zip.Inflater; +import java.util.zip.InflaterOutputStream; + +import org.msgpack.type.Value; + +import alice.codesegment.SingletonMessage; + +public class ReceiveData { + private Object val; + + // both flag have to be true or false except DataSegment is byteArray; + private boolean compressed = false; + private boolean serialized = false; + + public ReceiveData(Object obj, boolean cFlag, boolean sFlag){ + val = obj; + compressed = cFlag; + serialized = sFlag; + } + + public boolean compressed(){ + return compressed; + } + + public boolean serialized(){ + return serialized; + } + + public Object getObj(){ + return val; + } + + public String asString(){ + if (serialized){ + return asClass(String.class); + } else { + return (String) val; + } + } + + public int asInteger(){ + if (serialized){ + return asClass(Integer.class); + } else { + return (Integer) val; + } + } + + public Float asFloat(){ + if (serialized){ + return asClass(Float.class); + } else { + return (Float) val; + } + } + + public Value getVal(){ + if (serialized){ + return asClass(Value.class); + } else { + try { + return SingletonMessage.getInstance().unconvert(val); + } catch (IOException e) { + e.printStackTrace(); + } + return null; + } + } + + @SuppressWarnings("unchecked") + public <T> T asClass(Class<T> clazz) { + try { + if (!serialized) + return (T) val; + byte[] b = null; + if (compressed) { + b = unzip((byte[]) val); + } else { + b = (byte[]) val; + } + return SingletonMessage.getInstance().read(b, clazz); + } catch (IOException e) { + e.printStackTrace(); + } + return null; + } + + public byte[] unzip(byte[] zipped) throws IOException{ + Inflater inflater = new Inflater(); + ByteArrayOutputStream os = new ByteArrayOutputStream(); + InflaterOutputStream ios = new InflaterOutputStream(os, inflater); + ios.write(zipped); + ios.finish(); + return os.toByteArray(); + } +}
--- a/src/main/java/alice/datasegment/Receiver.java Tue Oct 28 11:07:23 2014 +0900 +++ b/src/main/java/alice/datasegment/Receiver.java Tue Oct 28 17:24:16 2014 +0900 @@ -9,7 +9,7 @@ * */ public class Receiver { - private ReceiverData data = new ReceiveRemoteData(); + private ReceiveData data; public InputDataSegment ids; public int index; public String from; @@ -85,7 +85,7 @@ ids.setKey(); } - public void setData(ReceiverData r) { + public void setData(ReceiveData r) { data = r; }
--- a/src/main/java/alice/datasegment/RemoteDataSegmentManager.java Tue Oct 28 11:07:23 2014 +0900 +++ b/src/main/java/alice/datasegment/RemoteDataSegmentManager.java Tue Oct 28 17:24:16 2014 +0900 @@ -52,15 +52,8 @@ */ @Override public void put(String key, Object val) { - Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, null, null, null); - connection.sendCommand(cmd); // put command on the transmission thread - if (logger.isDebugEnabled()) - logger.debug(cmd.getCommandString()); - } - - public void put(String key, byte[] val) { Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, null, null, ""); - cmd.setSerializeFlag(false); + cmd.setSerializeFlag(true); connection.sendCommand(cmd); // put command on the transmission thread if (logger.isDebugEnabled()) logger.debug(cmd.getCommandString()); @@ -68,7 +61,8 @@ @Override public void quickPut(String key, Object val) { - Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, null, null, null); + Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, null, null, ""); + cmd.setSerializeFlag(true); connection.write(cmd); // put command is executed right now if (logger.isDebugEnabled()) logger.debug(cmd.getCommandString()); @@ -76,7 +70,8 @@ @Override public void update(String key, Object val) { - Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, null, null, null); + Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, null, null, ""); + cmd.setSerializeFlag(true); connection.sendCommand(cmd); if (logger.isDebugEnabled()) logger.debug(cmd.getCommandString()); @@ -84,7 +79,8 @@ @Override public void quickUpdate(String key, Object val) { - Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, null, null, null); + Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, null, null, ""); + cmd.setSerializeFlag(true); connection.write(cmd); if (logger.isDebugEnabled()) logger.debug(cmd.getCommandString()); @@ -168,5 +164,4 @@ connection.close(); } - }