Mercurial > hg > Database > Alice
changeset 445:86b74532e66c dispose
change Protocol
author | sugi |
---|---|
date | Sun, 26 Oct 2014 18:21:48 +0900 |
parents | 8f006f9d1b9c |
children | a91890dff56e |
files | src/main/java/alice/daemon/IncomingTcpConnection.java src/main/java/alice/datasegment/Command.java src/main/java/alice/datasegment/DataSegmentKey.java src/main/java/alice/datasegment/RemoteDataSegmentManager.java |
diffstat | 4 files changed, 31 insertions(+), 15 deletions(-) [+] |
line wrap: on
line diff
--- a/src/main/java/alice/daemon/IncomingTcpConnection.java Sat Oct 18 22:26:15 2014 +0900 +++ b/src/main/java/alice/daemon/IncomingTcpConnection.java Sun Oct 26 18:21:48 2014 +0900 @@ -42,7 +42,7 @@ public void run() { Unpacker unpacker = null; try { - unpacker = SingletonMessage.getInstance().createUnpacker(connection.socket.getInputStream()); + unpacker = SingletonMessage.getInstance().createUnpacker(connection.socket.getInputStream()); } catch (IOException e) { e.printStackTrace(); }
--- a/src/main/java/alice/datasegment/Command.java Sat Oct 18 22:26:15 2014 +0900 +++ b/src/main/java/alice/datasegment/Command.java Sun Oct 26 18:21:48 2014 +0900 @@ -22,7 +22,8 @@ public CodeSegment cs; public String reverseKey; public Object obj; - public boolean flag; + public boolean quickFlag; + private boolean serializeFlag = true; public Command(CommandType cmdType, Receiver receiver, String key, byte[] val, int index, int seq, BlockingQueue<Command> replyQueue, CodeSegment cs, String reverseKey) { this.type = cmdType; @@ -34,7 +35,7 @@ this.replyQueue = replyQueue; this.cs = cs; this.reverseKey = reverseKey; - this.flag = false; + this.quickFlag = false; } public Command(CommandType cmdType, Receiver receiver, String key, byte[] val, int index, int seq, BlockingQueue<Command> replyQueue, CodeSegment cs, String reverseKey, boolean flag) { @@ -47,7 +48,7 @@ this.replyQueue = replyQueue; this.cs = cs; this.reverseKey = reverseKey; - this.flag = flag; + this.quickFlag = flag; } public Command(CommandType cmdType, Receiver receiver, String key, byte[] val, int index, int seq, Connection connection, CodeSegment cs, String reverseKey, boolean flag) { @@ -60,7 +61,7 @@ this.connection = connection; this.cs = cs; this.reverseKey = reverseKey; - this.flag = flag; + this.quickFlag = flag; } public Command(CommandType cmdType, Receiver receiver, String key, Object obj, int index, int seq, BlockingQueue<Command> replyQueue, CodeSegment cs, String reverseKey) { @@ -73,7 +74,7 @@ this.replyQueue = replyQueue; this.cs = cs; this.reverseKey = reverseKey; - this.flag = false; + this.quickFlag = false; } public Command(CommandType cmdType, Receiver receiver, String key, byte[] val, Object obj, int index, int seq, BlockingQueue<Command> replyQueue, CodeSegment cs, String reverseKey) { @@ -87,7 +88,7 @@ this.replyQueue = replyQueue; this.cs = cs; this.reverseKey = reverseKey; - this.flag = false; + this.quickFlag = false; } public String getCommandString() { @@ -97,22 +98,27 @@ } return this.type + "\t" + key + "\t" + val + "\tindex=" + index + "\tcs=" + csName; } + + /** + * @return serialized ByteBuffer + */ public ByteBuffer convert() { ByteBuffer buf = null; MessagePack msg = SingletonMessage.getInstance(); try { - byte[] header = msg.write(new CommandMessage(type.id, index, seq, key, flag)); + byte[] header = msg.write(new CommandMessage(type.id, index, seq, key, quickFlag)); switch (type) { case UPDATE: case PUT: case REPLY: - case RESPONSE: byte[] data = null; - if (val==null&&obj!=null){ + if (val!=null) { + data = val; + } else if (!serializeFlag) { + data = (byte[]) obj; + } else if (val==null && obj!=null) { data = msg.write(obj); - } else if (val!=null) { - data = val; } byte[] dataSize = msg.write(data.length); @@ -121,7 +127,6 @@ buf.put(dataSize); buf.put(data); break; - default: buf = ByteBuffer.allocate(header.length); buf.put(header); @@ -134,5 +139,8 @@ } return buf; } - + + public void setSerializeFlag(boolean flag){ + serializeFlag = flag; + } }
--- a/src/main/java/alice/datasegment/DataSegmentKey.java Sat Oct 18 22:26:15 2014 +0900 +++ b/src/main/java/alice/datasegment/DataSegmentKey.java Sun Oct 26 18:21:48 2014 +0900 @@ -87,7 +87,7 @@ cmd.cs.ids.reply(cmd.receiver, new Command(CommandType.REPLY, null, null, data.val, data.obj, data.index, cmd.seq, null, null, data.from)); } else { try { - if (!cmd.flag){ + if (!cmd.quickFlag){ cmd.connection.sendQueue.put(new Command(CommandType.REPLY, null, null, data.val, data.obj, data.index, cmd.seq, null, null, data.from)); } else {
--- a/src/main/java/alice/datasegment/RemoteDataSegmentManager.java Sat Oct 18 22:26:15 2014 +0900 +++ b/src/main/java/alice/datasegment/RemoteDataSegmentManager.java Sun Oct 26 18:21:48 2014 +0900 @@ -57,6 +57,14 @@ if (logger.isDebugEnabled()) logger.debug(cmd.getCommandString()); } + + public void put(String key, byte[] val) { + Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, null, null, null); + cmd.setSerializeFlag(false); + connection.sendCommand(cmd); // put command on the transmission thread + if (logger.isDebugEnabled()) + logger.debug(cmd.getCommandString()); + } @Override public void quickPut(String key, Object val) {