changeset 51:919389066887

change protocol header: remove an integer representing message length (work?)
author kazz <kazz@cr.ie.u-ryukyu.ac.jp>
date Sun, 05 Feb 2012 06:26:26 +0900
parents cc440cb8582e
children 0f773308a863
files src/alice/daemon/IncomingTcpConnection.java src/alice/daemon/OutboundTcpConnection.java
diffstat 2 files changed, 17 insertions(+), 24 deletions(-) [+]
line wrap: on
line diff
--- a/src/alice/daemon/IncomingTcpConnection.java	Sat Feb 04 21:08:38 2012 +0900
+++ b/src/alice/daemon/IncomingTcpConnection.java	Sun Feb 05 06:26:26 2012 +0900
@@ -1,11 +1,11 @@
 package alice.daemon;
 
+import java.io.EOFException;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.nio.channels.ClosedChannelException;
-import java.nio.channels.SocketChannel;
 
 import org.msgpack.MessagePack;
+import org.msgpack.unpacker.Unpacker;
 
 import alice.datasegment.Command;
 import alice.datasegment.CommandType;
@@ -27,28 +27,15 @@
 	
 	public void run() {
 		MessagePack msgpack = new MessagePack();
+		Unpacker unpacker = null;
+		try {
+			unpacker = msgpack.createUnpacker(connection.socket.getInputStream());
+		} catch (IOException e2) {
+			e2.printStackTrace();
+		}
 		while (true) {
-			SocketChannel ch = connection.socket.getChannel();
-			ByteBuffer buf = ByteBuffer.allocateDirect(4); // for int
 			try {
-				int allReadLen = 0;
-				do {
-					int readLen = ch.read(buf);
-					if (readLen < 0) return;
-					allReadLen += readLen;
-				} while (allReadLen < 4);
-				buf.rewind();
-				int msgLen = buf.getInt();
-				allReadLen = 0;
-				ByteBuffer msgBuf = ByteBuffer.allocateDirect(msgLen);
-				do {
-					int readLen = ch.read(msgBuf);
-					if (readLen < 0) return;
-					allReadLen += readLen;
-				} while (allReadLen < msgLen);
-				msgBuf.flip();
-				CommandMessage msg = msgpack.read(msgBuf, CommandMessage.class);
-				msgBuf.flip();
+				CommandMessage msg = unpacker.read(CommandMessage.class);
 				CommandType type = CommandType.getCommandTypeFromId(msg.type);
 				LocalDataSegmentManager lmanager = (LocalDataSegmentManager)DataSegment.get("local");
 				DataSegmentKey dsKey = lmanager.getDataSegmentKey(msg.key);
@@ -86,6 +73,13 @@
 					e1.printStackTrace();
 				}
 				return;
+			} catch (EOFException e) {
+				try {
+					connection.sendQueue.put(new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, null));
+				} catch (InterruptedException e1) {
+					e1.printStackTrace();
+				}
+				return;
 			} catch (IOException e) {
 				e.printStackTrace();
 			}
--- a/src/alice/daemon/OutboundTcpConnection.java	Sat Feb 04 21:08:38 2012 +0900
+++ b/src/alice/daemon/OutboundTcpConnection.java	Sun Feb 05 06:26:26 2012 +0900
@@ -34,8 +34,7 @@
 				}
 				CommandMessage cmdMsg = convert(cmd);
 				byte[] buf = msgpack.write(cmdMsg);
-				ByteBuffer buffer = ByteBuffer.allocateDirect(4 + buf.length);
-				buffer.putInt(buf.length);
+				ByteBuffer buffer = ByteBuffer.allocateDirect(buf.length);
 				buffer.put(buf);
 				buffer.flip();
 				while (buffer.hasRemaining()) {