Mercurial > hg > Members > tatsuki > Alice
changeset 16:433e601a8e28
network bug fix
author | kazz <kazz@cr.ie.u-ryukyu.ac.jp> |
---|---|
date | Sun, 15 Jan 2012 12:17:30 +0900 |
parents | 45e98e74db96 |
children | bb075e103cd3 |
files | src/alice/daemon/AcceptThread.java src/alice/daemon/CommandMessage.java src/alice/daemon/IncomingTcpConnection.java src/alice/daemon/OutboundTcpConnection.java src/alice/datasegment/Command.java src/alice/datasegment/LocalDataSegmentManager.java src/alice/test/codesegment/RemoteIncrement.java src/alice/test/codesegment/TestCodeSegment.java src/alice/test/codesegment/TestRemoteAlice.java src/log4j.xml |
diffstat | 10 files changed, 30 insertions(+), 15 deletions(-) [+] |
line wrap: on
line diff
--- a/src/alice/daemon/AcceptThread.java Sun Jan 15 01:19:54 2012 +0900 +++ b/src/alice/daemon/AcceptThread.java Sun Jan 15 12:17:30 2012 +0900 @@ -4,11 +4,16 @@ import java.net.ServerSocket; import java.net.Socket; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; + import alice.datasegment.DataSegment; public class AcceptThread extends Thread { private ServerSocket ss; + private Logger log = Logger.getLogger(AcceptThread.class); + public AcceptThread(ServerSocket ss, String name) { super(name); @@ -19,9 +24,8 @@ public void run() { while (true) { try { - System.out.println("wait accept..."); Socket socket = ss.accept(); - System.out.println("accepted!"); + log.info("Accept " + socket.getInetAddress().getHostName() + ":" + socket.getPort()); Connection connection = new Connection(socket); new IncomingTcpConnection(connection, DataSegment.get("local")).start(); new OutboundTcpConnection(connection).start();
--- a/src/alice/daemon/CommandMessage.java Sun Jan 15 01:19:54 2012 +0900 +++ b/src/alice/daemon/CommandMessage.java Sun Jan 15 12:17:30 2012 +0900 @@ -5,11 +5,13 @@ @Message public class CommandMessage { - int type; - int index; - int seq; - String key; - Value val; + public int type = 0; + public int index = 0; + public int seq = 0; + public String key = null; + public Value val = null; + + public CommandMessage() {} public CommandMessage(int type, int index, int seq, String key, Value val) { this.type = type;
--- a/src/alice/daemon/IncomingTcpConnection.java Sun Jan 15 01:19:54 2012 +0900 +++ b/src/alice/daemon/IncomingTcpConnection.java Sun Jan 15 12:17:30 2012 +0900 @@ -28,11 +28,11 @@ while (true) { SocketChannel ch = connection.socket.getChannel(); ByteBuffer buf = ByteBuffer.allocateDirect(4); // for int - try { int allReadLen = 0; do { int readLen = ch.read(buf); + if (readLen < 0) return; allReadLen += readLen; } while (allReadLen < 4); buf.rewind(); @@ -41,12 +41,15 @@ ByteBuffer msgBuf = ByteBuffer.allocateDirect(msgLen); do { int readLen = ch.read(msgBuf); + if (readLen < 0) return; allReadLen += readLen; } while (allReadLen < msgLen); - msgBuf.rewind(); + msgBuf.flip(); CommandMessage msg = msgpack.read(msgBuf, CommandMessage.class); + msgBuf.flip(); + System.out.println(msgpack.read(msgBuf)); CommandType type = CommandType.getCommandTypeFromId(msg.type); - LocalDataSegmentManager lmanager = (LocalDataSegmentManager)DataSegment.get("local"); + LocalDataSegmentManager lmanager = (LocalDataSegmentManager)DataSegment.get("local"); DataSegmentKey dsKey = lmanager.getDataSegmentKey(msg.key); switch (type) { case UPDATE:
--- a/src/alice/daemon/OutboundTcpConnection.java Sun Jan 15 01:19:54 2012 +0900 +++ b/src/alice/daemon/OutboundTcpConnection.java Sun Jan 15 12:17:30 2012 +0900 @@ -28,6 +28,7 @@ ByteBuffer buffer = ByteBuffer.allocateDirect(4 + buf.length); buffer.putInt(buf.length); buffer.put(buf); + buffer.flip(); connection.socket.getChannel().write(buffer); } catch (InterruptedException e) { e.printStackTrace();
--- a/src/alice/datasegment/Command.java Sun Jan 15 01:19:54 2012 +0900 +++ b/src/alice/datasegment/Command.java Sun Jan 15 12:17:30 2012 +0900 @@ -19,6 +19,7 @@ public Command(CommandType cmdType, String argKey, String key, Value val, int index, int seq, BlockingQueue<Command> replyQueue, CodeSegment cs) { this.type = cmdType; this.argKey = argKey; + this.key = key; this.val = val; this.index = index; this.seq = seq;
--- a/src/alice/datasegment/LocalDataSegmentManager.java Sun Jan 15 01:19:54 2012 +0900 +++ b/src/alice/datasegment/LocalDataSegmentManager.java Sun Jan 15 12:17:30 2012 +0900 @@ -12,6 +12,9 @@ } public DataSegmentKey getDataSegmentKey(String key) { + if (key == null) { + return null; + } DataSegmentKey newDataSegmentKey = new DataSegmentKey(); DataSegmentKey dataSegmentKey = dataSegments.putIfAbsent(key, newDataSegmentKey); if (dataSegmentKey == null) {
--- a/src/alice/test/codesegment/RemoteIncrement.java Sun Jan 15 01:19:54 2012 +0900 +++ b/src/alice/test/codesegment/RemoteIncrement.java Sun Jan 15 12:17:30 2012 +0900 @@ -11,7 +11,7 @@ public void run() { DataSegmentValue data = ids.get("num"); int num = data.val.asIntegerValue().getInt(); - System.out.println(num++); + System.out.println("[CodeSegment] " + num++); if (num == 10) System.exit(0); CodeSegment cs = new RemoteIncrement();
--- a/src/alice/test/codesegment/TestCodeSegment.java Sun Jan 15 01:19:54 2012 +0900 +++ b/src/alice/test/codesegment/TestCodeSegment.java Sun Jan 15 12:17:30 2012 +0900 @@ -8,6 +8,8 @@ public class TestCodeSegment extends CodeSegment { + DataSegmentValue arg1; + @Override public void run() { DataSegmentValue data = ids.get("arg1");
--- a/src/alice/test/codesegment/TestRemoteAlice.java Sun Jan 15 01:19:54 2012 +0900 +++ b/src/alice/test/codesegment/TestRemoteAlice.java Sun Jan 15 12:17:30 2012 +0900 @@ -2,6 +2,7 @@ import java.io.IOException; import java.net.InetSocketAddress; +import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import alice.codesegment.CodeSegment; @@ -39,10 +40,8 @@ RemoteDataSegmentManager manager = new RemoteDataSegmentManager(connection); DataSegment.regist(conf.key, manager); connect = false; - System.out.println("connected"); } catch (IOException e) { try { - System.out.println("wait"); Thread.sleep(500); } catch (InterruptedException e1) { e1.printStackTrace();
--- a/src/log4j.xml Sun Jan 15 01:19:54 2012 +0900 +++ b/src/log4j.xml Sun Jan 15 12:17:30 2012 +0900 @@ -1,13 +1,13 @@ <!DOCTYPE log4j:configuration SYSTEM "log4j.dtd"> <log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/"> <appender name="Appender1" class="org.apache.log4j.FileAppender"> - <param name="File" value="alice.log" /> + <param name="File" value="alice.log" ></param> <layout class="org.apache.log4j.PatternLayout"> <param name="ConversionPattern" value="%d %-5p %c - %m [%t] (%F:%L)%n"/> </layout> </appender> <root> - <level value="warn" /> + <level value="debug" /> <appender-ref ref="Appender1" /> </root> </log4j:configuration>