Mercurial > hg > Database > Alice
changeset 458:bcf6f4a6fcd0 dispose
need set Meta DataSegment PUT API
line wrap: on
line diff
--- a/src/main/java/alice/codesegment/InputDataSegment.java Mon Nov 03 13:21:36 2014 +0900 +++ b/src/main/java/alice/codesegment/InputDataSegment.java Mon Nov 03 17:12:53 2014 +0900 @@ -5,7 +5,6 @@ import alice.datasegment.Command; import alice.datasegment.CommandType; import alice.datasegment.DataSegment; -import alice.datasegment.ReceiveData; import alice.datasegment.Receiver; import alice.datasegment.SendOption; @@ -73,7 +72,7 @@ public void reply(Receiver receiver, Command reply) { receiver.index = reply.index; receiver.from = reply.reverseKey; - receiver.setData(new ReceiveData(reply.val, reply.getCompressFlag(), reply.getSerializeFlag())); + receiver.setData(reply.rData); receive(); }
--- a/src/main/java/alice/codesegment/OutputDataSegment.java Mon Nov 03 13:21:36 2014 +0900 +++ b/src/main/java/alice/codesegment/OutputDataSegment.java Mon Nov 03 17:12:53 2014 +0900 @@ -2,6 +2,7 @@ import alice.datasegment.CommandType; import alice.datasegment.DataSegment; +import alice.datasegment.ReceiveData; import alice.datasegment.Receiver; import alice.datasegment.SendOption; @@ -13,16 +14,16 @@ */ public void flip(Receiver receiver) { - DataSegment.getLocal().put(receiver.key, receiver.getObj(), null); + DataSegment.getLocal().put(receiver.key, receiver.getReceiveData(), null); } public void flip(Receiver receiver, CommandType type) { switch (type) { - case PUT: - DataSegment.getLocal().put(receiver.key, receiver.getObj(), null); + case PUT: + DataSegment.getLocal().put(receiver.key, receiver.getReceiveData(), null); break; case UPDATE: - DataSegment.getLocal().update(receiver.key, receiver.getObj(), null); + DataSegment.getLocal().update(receiver.key, receiver.getReceiveData(), null); break; default: break; @@ -30,11 +31,13 @@ } public void put(String key, Object val) { - DataSegment.getLocal().put(key, val, null); + ReceiveData rData = new ReceiveData(val, false, false); + DataSegment.getLocal().put(key, rData, null); } public void update(String key, Object val) { - DataSegment.getLocal().update(key, val, null); + ReceiveData rData = new ReceiveData(val, false, false); + DataSegment.getLocal().update(key, rData, null); } /** @@ -42,8 +45,9 @@ */ public void put(String managerKey, String key, Object val) { if (!managerKey.equals("local")){ + ReceiveData rData = new ReceiveData(val, false, false); SendOption option = new SendOption(false, compressFlag()); - DataSegment.get(managerKey).put(key, val, option); + DataSegment.get(managerKey).put(key, rData, option); } else { put(key, val); } @@ -51,8 +55,9 @@ public void quickPut(String managerKey, String key, Object val) { if (!managerKey.equals("local")){ + ReceiveData rData = new ReceiveData(val, false, false); SendOption option = new SendOption(true, compressFlag()); - DataSegment.get(managerKey).put(key, val, option); + DataSegment.get(managerKey).put(key, rData, option); } else { put(key, val); } @@ -60,8 +65,9 @@ public void update(String managerKey, String key, Object val) { if (!managerKey.equals("local")){ + ReceiveData rData = new ReceiveData(val, false, false); SendOption option = new SendOption(false, compressFlag()); - DataSegment.get(managerKey).update(key, val, option); + DataSegment.get(managerKey).update(key, rData, option); } else { update(key, val); } @@ -69,8 +75,9 @@ public void quickUpdate(String managerKey, String key, Object val) { if (!managerKey.equals("local")){ + ReceiveData rData = new ReceiveData(val, false, false); SendOption option = new SendOption(true, compressFlag()); - DataSegment.get(managerKey).update(key, val, option); + DataSegment.get(managerKey).update(key, rData, option); } else { update(key, val); }
--- a/src/main/java/alice/daemon/Connection.java Mon Nov 03 13:21:36 2014 +0900 +++ b/src/main/java/alice/daemon/Connection.java Mon Nov 03 17:12:53 2014 +0900 @@ -8,6 +8,7 @@ import alice.datasegment.Command; import alice.datasegment.DataSegment; +import alice.datasegment.ReceiveData; public class Connection { @@ -62,8 +63,9 @@ } public void putConnectionInfo() { - ConnectionInfo c = new ConnectionInfo(socket.getInetAddress().toString() ,socket.getPort()); - DataSegment.getLocal().put("disconnect", c, null); + ConnectionInfo c = new ConnectionInfo(socket.getInetAddress().toString(), socket.getPort()); + ReceiveData rData = new ReceiveData(c, false, false); + DataSegment.getLocal().put("disconnect", rData, null); } }
--- a/src/main/java/alice/daemon/IncomingTcpConnection.java Mon Nov 03 13:21:36 2014 +0900 +++ b/src/main/java/alice/daemon/IncomingTcpConnection.java Mon Nov 03 17:12:53 2014 +0900 @@ -13,6 +13,7 @@ import alice.datasegment.DataSegment; import alice.datasegment.DataSegmentManager; import alice.datasegment.LocalDataSegmentManager; +import alice.datasegment.ReceiveData; import alice.topology.manager.keeparive.RespondData; public class IncomingTcpConnection extends Thread { @@ -52,17 +53,14 @@ while (true) { try { Command cmd = null; - byte[] val = null; + ReceiveData rData = null; CommandMessage msg = unpacker.read(CommandMessage.class); CommandType type = CommandType.getCommandTypeFromId(msg.type); switch (type) { case UPDATE: case PUT: - val = getSerializedByteArray(unpacker); - cmd = new Command(type, null, null, val, 0, 0, null, null, reverseKey); - // these flags express DataSegment status - cmd.setCompressFlag(msg.compressed); - cmd.setSerializeFlag(msg.serialized); + rData = new ReceiveData(getSerializedByteArray(unpacker), msg.compressed, msg.serialized); + cmd = new Command(type, null, null, rData, 0, 0, null, null, reverseKey); lmanager.getDataSegmentKey(msg.key).runCommand(cmd); break; case PEEK: @@ -77,8 +75,8 @@ break; case REPLY: cmd = manager.getAndRemoveCmd(msg.seq); - val = getSerializedByteArray(unpacker); - Command rCmd = new Command(type, null, null, val, msg.index, msg.seq, null, null, ""); + rData = new ReceiveData(getSerializedByteArray(unpacker), msg.compressed, msg.serialized); + Command rCmd = new Command(type, null, null, rData, msg.index, msg.seq, null, null, ""); rCmd.setCompressFlag(msg.compressed); rCmd.setSerializeFlag(msg.serialized); cmd.cs.ids.reply(cmd.receiver, rCmd); @@ -87,7 +85,8 @@ DataSegment.get(reverseKey).response(msg.key); break; case RESPONSE: - DataSegment.getLocal().put(msg.key, new RespondData(reverseKey, System.currentTimeMillis()), null); + rData = new ReceiveData(new RespondData(reverseKey, System.currentTimeMillis()), false, false); + DataSegment.getLocal().put(msg.key, rData, null); break; default: break;
--- a/src/main/java/alice/daemon/IncomingUdpConnection.java Mon Nov 03 13:21:36 2014 +0900 +++ b/src/main/java/alice/daemon/IncomingUdpConnection.java Mon Nov 03 17:12:53 2014 +0900 @@ -12,6 +12,7 @@ import alice.datasegment.CommandType; import alice.datasegment.DataSegment; import alice.datasegment.DataSegmentManager; +import alice.datasegment.ReceiveData; import alice.topology.manager.keeparive.RespondData; public class IncomingUdpConnection extends IncomingTcpConnection { @@ -35,6 +36,7 @@ try { Command cmd = null; byte[] val = null; + ReceiveData rData = null; // Max data length is 65507 because of the max length of UDP payload ByteBuffer receive = ByteBuffer.allocate(65507); receiver.receive(receive); @@ -44,13 +46,11 @@ CommandType type = CommandType.getCommandTypeFromId(msg.type); switch (type) { case UPDATE: - case PUT: + case PUT: val = new byte[unpacker.readInt()]; receive.get(val); - cmd = new Command(type, null, null, val, 0, 0, null, null, reverseKey); - // these flags express DataSegment status - cmd.setCompressFlag(msg.compressed); - cmd.setSerializeFlag(msg.serialized); + rData = new ReceiveData(val, msg.compressed, msg.serialized); + cmd = new Command(type, null, null, rData, 0, 0, null, null, reverseKey); getLocalDataSegmentManager().getDataSegmentKey(msg.key).runCommand(cmd); break; case PEEK: @@ -67,17 +67,16 @@ cmd = manager.getAndRemoveCmd(msg.seq); val = new byte[unpacker.readInt()]; receive.get(val); - Command rCmd = new Command(type, null, null, val, msg.index, msg.seq, null, null, ""); - // these flags express DataSegment status - rCmd.setCompressFlag(msg.compressed); - rCmd.setSerializeFlag(msg.serialized); + rData = new ReceiveData(val, msg.compressed, msg.serialized); + Command rCmd = new Command(type, null, null, rData, msg.index, msg.seq, null, null, ""); cmd.cs.ids.reply(cmd.receiver, rCmd); break; case PING: DataSegment.get(reverseKey).response(msg.key); break; case RESPONSE: - DataSegment.getLocal().put(msg.key, new RespondData(reverseKey, System.currentTimeMillis()), null); + rData = new ReceiveData(new RespondData(reverseKey, System.currentTimeMillis()), false, false); + DataSegment.getLocal().put(msg.key, rData, null); break; default: break;
--- a/src/main/java/alice/datasegment/Command.java Mon Nov 03 13:21:36 2014 +0900 +++ b/src/main/java/alice/datasegment/Command.java Mon Nov 03 17:12:53 2014 +0900 @@ -18,7 +18,7 @@ public CommandType type; public String key; public Receiver receiver; - public Object val; + public ReceiveData rData; public int index; public int seq; public Connection connection; // for remote @@ -29,11 +29,11 @@ private boolean serializeFlag = false; private boolean compressFlag = false; - public Command(CommandType cmdType, Receiver receiver, String key, Object val, int index, int seq, BlockingQueue<Command> replyQueue, CodeSegment cs, String reverseKey) { + public Command(CommandType cmdType, Receiver receiver, String key, ReceiveData rData, int index, int seq, BlockingQueue<Command> replyQueue, CodeSegment cs, String reverseKey) { this.type = cmdType; this.receiver = receiver; this.key = key; - this.val = val; + this.rData = rData; this.index = index; this.seq = seq; this.replyQueue = replyQueue; @@ -41,11 +41,11 @@ this.reverseKey = reverseKey; } - public Command(CommandType cmdType, Receiver receiver, String key, Object val, int index, int seq, CodeSegment cs, String reverseKey, Connection connection) { + public Command(CommandType cmdType, Receiver receiver, String key, ReceiveData rData, int index, int seq, CodeSegment cs, String reverseKey, Connection connection) { this.type = cmdType; this.receiver = receiver; this.key = key; - this.val = val; + this.rData = rData; this.index = index; this.seq = seq; this.connection = connection; @@ -58,7 +58,7 @@ if (cs != null) { csName = cs.toString(); } - return this.type + "\t" + key + "\t" + val + "\tindex=" + index + "\tcs=" + csName; + return this.type + "\t" + key + "\t" + rData + "\tindex=" + index + "\tcs=" + csName; } /** @@ -71,6 +71,8 @@ byte[] header = null; byte[] data = null; byte[] dataSize = null; + boolean serialized = false; + boolean compressed = false; switch (type) { /* * UPDATE, PUT, REPLY need send DataSegment to RemoteDataSegment @@ -81,36 +83,34 @@ * for example, serializeFlag is true. DataSegment had already converted, so no need convert. */ case UPDATE: - case PUT: - if (!serializeFlag) { - data = (byte[]) val; + case PUT: + case REPLY: + if (rData.compressed()) { + // have already converted + data = (byte[]) rData.getObj(); + compressed = rData.compressed(); // true + serialized = rData.serialized(); } else { - data = msg.write(val); + if (!rData.serialized() && !rData.isByteArray()) { + data = msg.write(rData.getObj()); + serialized = true; + } else { // rData is RAW ByteArray or already serialized + data = (byte[]) rData.getObj(); + serialized = rData.serialized(); + } + if (compressFlag) { + data = zip(data); + compressed = true; + } } - if (compressFlag) { - data = zip(data); - } - header = msg.write(new CommandMessage(type.id, index, seq, key, quickFlag, serializeFlag, compressFlag)); + + header = msg.write(new CommandMessage(type.id, index, seq, key, false, serialized, compressed)); dataSize = msg.write(data.length); buf = ByteBuffer.allocate(header.length+dataSize.length+data.length); buf.put(header); buf.put(dataSize); buf.put(data); break; - case REPLY: // only serialize - if (serializeFlag) { - data = (byte[]) val; - } else { - data = msg.write(val); - this.serializeFlag = true; - } - header = msg.write(new CommandMessage(type.id, index, seq, key, quickFlag, serializeFlag, compressFlag)); - dataSize = msg.write(data.length); - buf = ByteBuffer.allocate(header.length+dataSize.length+data.length); - buf.put(header); - buf.put(dataSize); - buf.put(data); - break; default: header = msg.write(new CommandMessage(type.id, index, seq, key, quickFlag, serializeFlag, compressFlag)); buf = ByteBuffer.allocate(header.length); @@ -125,6 +125,13 @@ return buf; } + /** + * If this flag is true, command isn't send queue. + * command is executed right now. + * + * @param flag + */ + public void setQuickFlag(boolean flag){ quickFlag = flag; } @@ -133,6 +140,13 @@ return quickFlag; } + /** + * If this flag is true, DataSegment isn't serialized. + * Alice auto select true or false. + * + * @param flag + */ + public void setSerializeFlag(boolean flag){ serializeFlag = flag; } @@ -141,6 +155,13 @@ return serializeFlag; } + /** + * Before sending Remote DataSegment, DataSegment type is ByteArray. + * If this flag true, ByteArray is compressed with ZRLEE(ZRIB) algorithm + * + * @param flag + */ + public void setCompressFlag(boolean flag){ compressFlag = flag; }
--- a/src/main/java/alice/datasegment/DataSegmentKey.java Mon Nov 03 13:21:36 2014 +0900 +++ b/src/main/java/alice/datasegment/DataSegmentKey.java Mon Nov 03 17:12:53 2014 +0900 @@ -25,7 +25,7 @@ case PUT: int index = tailIndex; tailIndex++; - DataSegmentValue dsv = new DataSegmentValue(index, cmd.val, cmd.reverseKey, cmd.getCompressFlag(), cmd.getSerializeFlag()); + DataSegmentValue dsv = new DataSegmentValue(index, cmd.rData, cmd.reverseKey); dataList.add(dsv); // Process waiting peek and take commands for (Iterator<Command> iter = waitList.iterator(); iter.hasNext(); ) { @@ -83,14 +83,12 @@ } public void replyValue(Command cmd, DataSegmentValue data){ - Command rCmd = new Command(CommandType.REPLY, null, null, data.val, data.index, cmd.seq, null, null, data.from); - rCmd.setCompressFlag(data.compressed); - rCmd.setSerializeFlag(data.serialized); + Command rCmd = new Command(CommandType.REPLY, null, null, data.rData, data.index, cmd.seq, null, null, data.from); if (cmd.cs!=null){ // if cmd has cs-instance, it means Command from local. cmd.cs.ids.reply(cmd.receiver, rCmd); } else { try { - if (!cmd.getQuickFlag()){ + if (!cmd.getQuickFlag()) { cmd.connection.sendQueue.put(rCmd); } else { cmd.connection.write(rCmd);
--- a/src/main/java/alice/datasegment/DataSegmentManager.java Mon Nov 03 13:21:36 2014 +0900 +++ b/src/main/java/alice/datasegment/DataSegmentManager.java Mon Nov 03 17:12:53 2014 +0900 @@ -50,8 +50,8 @@ } } - public abstract void put(String key, Object val, SendOption option); - public abstract void update(String key, Object val, SendOption option); + public abstract void put(String key, ReceiveData rData, SendOption option); + public abstract void update(String key, ReceiveData rData, SendOption option); public abstract void peek(Receiver receiver, CodeSegment cs, SendOption option); public abstract void take(Receiver receiver, CodeSegment cs, SendOption option);
--- a/src/main/java/alice/datasegment/DataSegmentValue.java Mon Nov 03 13:21:36 2014 +0900 +++ b/src/main/java/alice/datasegment/DataSegmentValue.java Mon Nov 03 17:12:53 2014 +0900 @@ -3,18 +3,13 @@ public class DataSegmentValue { public int index; - public Object val; + public ReceiveData rData; public String from; - public boolean compressed; - public boolean serialized; - public DataSegmentValue(int index, Object val, String reverseKey, - boolean compressed, boolean serialized) { + public DataSegmentValue(int index, ReceiveData rData, String reverseKey) { this.index = index; - this.val = val; + this.rData = rData; this.from = reverseKey; - this.compressed = compressed; - this.serialized = serialized; } }
--- a/src/main/java/alice/datasegment/LocalDataSegmentManager.java Mon Nov 03 13:21:36 2014 +0900 +++ b/src/main/java/alice/datasegment/LocalDataSegmentManager.java Mon Nov 03 17:12:53 2014 +0900 @@ -65,9 +65,9 @@ } @Override - public void put(String key, Object val, SendOption option) { + public void put(String key, ReceiveData rData, SendOption option) { DataSegmentKey dataSegmentKey = getDataSegmentKey(key); - Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, null, null, reverseKey); + Command cmd = new Command(CommandType.PUT, null, key, rData, 0, 0, null, null, reverseKey); dataSegmentKey.runCommand(cmd); if (logger.isDebugEnabled()) logger.debug(cmd.getCommandString()); @@ -78,9 +78,9 @@ */ @Override - public void update(String key, Object val, SendOption option) { + public void update(String key, ReceiveData rData, SendOption option) { DataSegmentKey dataSegmentKey = getDataSegmentKey(key); - Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, null, null, reverseKey); + Command cmd = new Command(CommandType.UPDATE, null, key, rData, 0, 0, null, null, reverseKey); dataSegmentKey.runCommand(cmd); if (logger.isDebugEnabled()) logger.debug(cmd.getCommandString());
--- a/src/main/java/alice/datasegment/Receiver.java Mon Nov 03 13:21:36 2014 +0900 +++ b/src/main/java/alice/datasegment/Receiver.java Mon Nov 03 17:12:53 2014 +0900 @@ -109,4 +109,7 @@ return data.getVal(); } + public ReceiveData getReceiveData() { + return data; + } }
--- a/src/main/java/alice/datasegment/RemoteDataSegmentManager.java Mon Nov 03 13:21:36 2014 +0900 +++ b/src/main/java/alice/datasegment/RemoteDataSegmentManager.java Mon Nov 03 17:12:53 2014 +0900 @@ -51,8 +51,8 @@ * send put command to target DataSegment */ @Override - public void put(String key, Object val, SendOption option) { - Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, null, null, ""); + public void put(String key, ReceiveData rData, SendOption option) { + Command cmd = new Command(CommandType.PUT, null, key, rData, 0, 0, null, null, ""); cmd.setSerializeFlag(true); if (option.isQuick()){ connection.write(cmd); // put command is executed right now @@ -64,8 +64,8 @@ } @Override - public void update(String key, Object val, SendOption option) { - Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, null, null, ""); + public void update(String key, ReceiveData rData, SendOption option) { + Command cmd = new Command(CommandType.UPDATE, null, key, rData, 0, 0, null, null, ""); cmd.setSerializeFlag(true); if (option.isQuick()){ connection.write(cmd);