Mercurial > hg > Database > Alice
changeset 51:919389066887
change protocol header: remove an integer representing message length (work?)
author | kazz <kazz@cr.ie.u-ryukyu.ac.jp> |
---|---|
date | Sun, 05 Feb 2012 06:26:26 +0900 |
parents | cc440cb8582e |
children | 0f773308a863 |
files | src/alice/daemon/IncomingTcpConnection.java src/alice/daemon/OutboundTcpConnection.java |
diffstat | 2 files changed, 17 insertions(+), 24 deletions(-) [+] |
line wrap: on
line diff
--- a/src/alice/daemon/IncomingTcpConnection.java Sat Feb 04 21:08:38 2012 +0900 +++ b/src/alice/daemon/IncomingTcpConnection.java Sun Feb 05 06:26:26 2012 +0900 @@ -1,11 +1,11 @@ package alice.daemon; +import java.io.EOFException; import java.io.IOException; -import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; -import java.nio.channels.SocketChannel; import org.msgpack.MessagePack; +import org.msgpack.unpacker.Unpacker; import alice.datasegment.Command; import alice.datasegment.CommandType; @@ -27,28 +27,15 @@ public void run() { MessagePack msgpack = new MessagePack(); + Unpacker unpacker = null; + try { + unpacker = msgpack.createUnpacker(connection.socket.getInputStream()); + } catch (IOException e2) { + e2.printStackTrace(); + } while (true) { - SocketChannel ch = connection.socket.getChannel(); - ByteBuffer buf = ByteBuffer.allocateDirect(4); // for int try { - int allReadLen = 0; - do { - int readLen = ch.read(buf); - if (readLen < 0) return; - allReadLen += readLen; - } while (allReadLen < 4); - buf.rewind(); - int msgLen = buf.getInt(); - allReadLen = 0; - ByteBuffer msgBuf = ByteBuffer.allocateDirect(msgLen); - do { - int readLen = ch.read(msgBuf); - if (readLen < 0) return; - allReadLen += readLen; - } while (allReadLen < msgLen); - msgBuf.flip(); - CommandMessage msg = msgpack.read(msgBuf, CommandMessage.class); - msgBuf.flip(); + CommandMessage msg = unpacker.read(CommandMessage.class); CommandType type = CommandType.getCommandTypeFromId(msg.type); LocalDataSegmentManager lmanager = (LocalDataSegmentManager)DataSegment.get("local"); DataSegmentKey dsKey = lmanager.getDataSegmentKey(msg.key); @@ -86,6 +73,13 @@ e1.printStackTrace(); } return; + } catch (EOFException e) { + try { + connection.sendQueue.put(new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, null)); + } catch (InterruptedException e1) { + e1.printStackTrace(); + } + return; } catch (IOException e) { e.printStackTrace(); }
--- a/src/alice/daemon/OutboundTcpConnection.java Sat Feb 04 21:08:38 2012 +0900 +++ b/src/alice/daemon/OutboundTcpConnection.java Sun Feb 05 06:26:26 2012 +0900 @@ -34,8 +34,7 @@ } CommandMessage cmdMsg = convert(cmd); byte[] buf = msgpack.write(cmdMsg); - ByteBuffer buffer = ByteBuffer.allocateDirect(4 + buf.length); - buffer.putInt(buf.length); + ByteBuffer buffer = ByteBuffer.allocateDirect(buf.length); buffer.put(buf); buffer.flip(); while (buffer.hasRemaining()) {