# HG changeset patch # User one # Date 1326449099 -32400 # Node ID 30f97d776a3e001ea978ad5af5475198d4877307 # Parent c4d6ff56b9bf17abec3c285edb84d60d3a012404 implements Alice daemon diff -r c4d6ff56b9bf -r 30f97d776a3e src/alice/daemon/AcceptThread.java --- a/src/alice/daemon/AcceptThread.java Fri Jan 13 07:04:38 2012 +0900 +++ b/src/alice/daemon/AcceptThread.java Fri Jan 13 19:04:59 2012 +0900 @@ -19,6 +19,8 @@ try { Socket socket = ss.accept(); Connection connection = new Connection(socket); + new IncomingTcpConnection(connection).start(); + new OutboundTcpConnection(connection).start(); } catch (IOException e) { e.printStackTrace(); } diff -r c4d6ff56b9bf -r 30f97d776a3e src/alice/daemon/CommandMessage.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/alice/daemon/CommandMessage.java Fri Jan 13 19:04:59 2012 +0900 @@ -0,0 +1,21 @@ +package alice.daemon; + +import org.msgpack.annotation.Message; +import org.msgpack.type.Value; + +@Message +public class CommandMessage { + int type; + int index; + int seq; + String key; + Value val; + + public CommandMessage(int type, int index, int seq, String key, Value val) { + this.type = type; + this.index = index; + this.seq = seq; + this.key = key; + this.val = val; + } +} diff -r c4d6ff56b9bf -r 30f97d776a3e src/alice/daemon/Connection.java --- a/src/alice/daemon/Connection.java Fri Jan 13 07:04:38 2012 +0900 +++ b/src/alice/daemon/Connection.java Fri Jan 13 19:04:59 2012 +0900 @@ -1,11 +1,17 @@ package alice.daemon; import java.net.Socket; +import java.util.concurrent.LinkedBlockingQueue; + +import alice.datasegment.Command; public class Connection { + public Socket socket; + public LinkedBlockingQueue sendQueue = new LinkedBlockingQueue(); + public Connection(Socket socket) { - // TODO Auto-generated constructor stub + this.socket = socket; } } diff -r c4d6ff56b9bf -r 30f97d776a3e src/alice/daemon/IncomingTcpConnection.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/alice/daemon/IncomingTcpConnection.java Fri Jan 13 19:04:59 2012 +0900 @@ -0,0 +1,83 @@ +package alice.daemon; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; + +import org.msgpack.MessagePack; + +import alice.datasegment.Command; +import alice.datasegment.CommandType; +import alice.datasegment.DataSegment; +import alice.datasegment.DataSegmentKey; +import alice.datasegment.LocalDataSegmentManager; +import alice.datasegment.SocketDataSegmentManager; + +public class IncomingTcpConnection extends Thread { + + public Connection connection; + public SocketDataSegmentManager manager; + + public IncomingTcpConnection(Connection connection) { + this.connection = connection; + } + + public void run() { + MessagePack msgpack = new MessagePack(); + while (true) { + SocketChannel ch = connection.socket.getChannel(); + ByteBuffer buf = ByteBuffer.allocateDirect(4); // for int + + try { + int allReadLen = 0; + do { + int readLen = ch.read(buf); + allReadLen += readLen; + } while (allReadLen < 4); + buf.rewind(); + int msgLen = buf.getInt(); + allReadLen = 0; + ByteBuffer msgBuf = ByteBuffer.allocateDirect(msgLen); + do { + int readLen = ch.read(msgBuf); + allReadLen += readLen; + } while (allReadLen < msgLen); + msgBuf.rewind(); + CommandMessage msg = msgpack.read(msgBuf, CommandMessage.class); + CommandType type = CommandType.getCommandTypeFromId(msg.type); + LocalDataSegmentManager manager = (LocalDataSegmentManager)DataSegment.get("local"); + DataSegmentKey dsKey = manager.getDataSegmentKey(msg.key); + switch (type) { + case UPDATE: + dsKey.addCommand(new Command(type, null, msg.val, 0, 0, null, null)); + break; + case PUT: + dsKey.addCommand(new Command(type, null, msg.val, 0, 0, null, null)); + break; + case PEEK: + //Command(CommandType cmdType, String argKey, Value val, int index, int seq, BlockingQueue replyQueue, CodeSegment cs) { + dsKey.addCommand(new Command(type, null, null, msg.index, msg.seq, connection.sendQueue, null)); + break; + case TAKE: + dsKey.addCommand(new Command(type, null, null, msg.index, msg.seq, connection.sendQueue, null)); + break; + case REMOVE: + dsKey.addCommand(new Command(type, null, null, 0, 0, null, null)); + break; + case REPLY: + try { + manager.replyQueue.put(new Command(type, null, msg.val, msg.index, msg.seq, null, null)); + } catch (InterruptedException e) { + e.printStackTrace(); + } + break; + default: + break; + } + } catch (IOException e) { + e.printStackTrace(); + } + } + } + +} diff -r c4d6ff56b9bf -r 30f97d776a3e src/alice/daemon/OutboundTcpConnection.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/alice/daemon/OutboundTcpConnection.java Fri Jan 13 19:04:59 2012 +0900 @@ -0,0 +1,41 @@ +package alice.daemon; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.msgpack.MessagePack; + +import alice.datasegment.Command; + +public class OutboundTcpConnection extends Thread { + + public Connection connection; + + public OutboundTcpConnection(Connection connection) { + this.connection = connection; + } + + public CommandMessage convert(Command cmd) { + return new CommandMessage(cmd.type.id, cmd.index, cmd.seq, null, cmd.val); + } + + public void run() { + MessagePack msgpack = new MessagePack(); + while (true) { + try { + CommandMessage cmdMsg = convert(connection.sendQueue.take()); + byte[] buf = msgpack.write(cmdMsg); + ByteBuffer buffer = ByteBuffer.allocateDirect(4 + buf.length); + buffer.putInt(buf.length); + buffer.put(buf); + connection.socket.getChannel().write(buffer); + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (IOException e) { + e.printStackTrace(); + } + + } + } + +} diff -r c4d6ff56b9bf -r 30f97d776a3e src/alice/datasegment/Command.java --- a/src/alice/datasegment/Command.java Fri Jan 13 07:04:38 2012 +0900 +++ b/src/alice/datasegment/Command.java Fri Jan 13 19:04:59 2012 +0900 @@ -1,25 +1,27 @@ package alice.datasegment; +import java.util.concurrent.BlockingQueue; + import org.msgpack.type.Value; import alice.codesegment.CodeSegment; public class Command { - public CommandType cmdType; + public CommandType type; public String argKey; public Value val; public int index; public int seq; - public DataSegmentManager manager; + public BlockingQueue replyQueue; public CodeSegment cs; - public Command(CommandType cmdType, String argKey, Value val, int index, int seq, DataSegmentManager manager, CodeSegment cs) { - this.cmdType = cmdType; + public Command(CommandType cmdType, String argKey, Value val, int index, int seq, BlockingQueue replyQueue, CodeSegment cs) { + this.type = cmdType; this.argKey = argKey; this.val = val; this.index = index; this.seq = seq; - this.manager = manager; + this.replyQueue = replyQueue; this.cs = cs; } diff -r c4d6ff56b9bf -r 30f97d776a3e src/alice/datasegment/DataSegmentKey.java --- a/src/alice/datasegment/DataSegmentKey.java Fri Jan 13 07:04:38 2012 +0900 +++ b/src/alice/datasegment/DataSegmentKey.java Fri Jan 13 19:04:59 2012 +0900 @@ -30,7 +30,7 @@ while (true) { try { Command cmd = cmdQueue.take(); - switch (cmd.cmdType) { + switch (cmd.type) { case UPDATE: if (dataList.size() != 0) { dataList.remove(0); @@ -43,9 +43,9 @@ LinkedList removeList = new LinkedList(); for (Command waitCmd : waitList) { if (waitCmd.index < index) { - waitCmd.manager.replyQueue.put(new Command(CommandType.REPLY, null, cmd.val, index, waitCmd.seq, null, null)); + waitCmd.replyQueue.put(new Command(CommandType.REPLY, null, cmd.val, index, waitCmd.seq, null, null)); removeList.add(waitCmd); - if (waitCmd.cmdType == CommandType.TAKE) { // delete data, if it run take cmd. + if (waitCmd.type == CommandType.TAKE) { // delete data, if it run take cmd. dataList.remove(dsv); break; } @@ -62,7 +62,7 @@ } for (DataSegmentValue data : dataList) { if (data.index > cmd.index) { - cmd.manager.replyQueue.put(new Command(CommandType.REPLY, null, data.val, data.index, cmd.seq, null, null)); + cmd.replyQueue.put(new Command(CommandType.REPLY, null, data.val, data.index, cmd.seq, null, null)); break; } } @@ -75,7 +75,7 @@ } for (DataSegmentValue data : dataList) { if (data.index > cmd.index) { - cmd.manager.replyQueue.put(new Command(CommandType.REPLY, null, data.val, data.index, cmd.seq, null, null)); + cmd.replyQueue.put(new Command(CommandType.REPLY, null, data.val, data.index, cmd.seq, null, null)); dataList.remove(data); break; } diff -r c4d6ff56b9bf -r 30f97d776a3e src/alice/datasegment/DataSegmentManager.java --- a/src/alice/datasegment/DataSegmentManager.java Fri Jan 13 07:04:38 2012 +0900 +++ b/src/alice/datasegment/DataSegmentManager.java Fri Jan 13 19:04:59 2012 +0900 @@ -8,8 +8,8 @@ import alice.codesegment.CodeSegment; public abstract class DataSegmentManager { - public ConcurrentHashMap dataSegments = new ConcurrentHashMap(); - public ConcurrentHashMap seqHash = new ConcurrentHashMap(); + protected ConcurrentHashMap dataSegments = new ConcurrentHashMap(); + protected ConcurrentHashMap seqHash = new ConcurrentHashMap(); public LinkedBlockingQueue replyQueue = new LinkedBlockingQueue(); public abstract void put(String key, Value val); diff -r c4d6ff56b9bf -r 30f97d776a3e src/alice/datasegment/LocalDataSegmentManager.java --- a/src/alice/datasegment/LocalDataSegmentManager.java Fri Jan 13 07:04:38 2012 +0900 +++ b/src/alice/datasegment/LocalDataSegmentManager.java Fri Jan 13 19:04:59 2012 +0900 @@ -26,11 +26,12 @@ } }; + public LocalDataSegmentManager() { new Thread(replyThread).start(); } - private DataSegmentKey getDataSegmentKey(String key) { + public DataSegmentKey getDataSegmentKey(String key) { DataSegmentKey newDataSegmentKey = new DataSegmentKey(); DataSegmentKey dataSegmentKey = dataSegments.putIfAbsent(key, newDataSegmentKey); if (dataSegmentKey == null) { @@ -43,20 +44,20 @@ @Override public void put(String key, Value val) { DataSegmentKey dataSegmentKey = getDataSegmentKey(key); - dataSegmentKey.addCommand(new Command(CommandType.PUT, null, val, 0, 0, this, null)); + dataSegmentKey.addCommand(new Command(CommandType.PUT, null, val, 0, 0, replyQueue, null)); } @Override public void update(String key, Value val) { DataSegmentKey dataSegmentKey = getDataSegmentKey(key); - dataSegmentKey.addCommand(new Command(CommandType.UPDATE, null, val, 0, 0, this, null)); + dataSegmentKey.addCommand(new Command(CommandType.UPDATE, null, val, 0, 0, replyQueue, null)); } @Override public void take(String argKey, String key, int index, CodeSegment cs) { DataSegmentKey dataSegmentKey = getDataSegmentKey(key); int seq = this.seq.getAndIncrement(); - Command cmd = new Command(CommandType.TAKE, argKey, null, index, seq, this, cs); + Command cmd = new Command(CommandType.TAKE, argKey, null, index, seq, replyQueue, cs); seqHash.put(seq, cmd); dataSegmentKey.addCommand(cmd); } @@ -65,7 +66,7 @@ public void peek(String argKey, String key, int index, CodeSegment cs) { DataSegmentKey dataSegmentKey = getDataSegmentKey(key); int seq = this.seq.getAndIncrement(); - Command cmd = new Command(CommandType.PEEK, argKey, null, index, seq, this, cs); + Command cmd = new Command(CommandType.PEEK, argKey, null, index, seq, replyQueue, cs); seqHash.put(seq, cmd); dataSegmentKey.addCommand(cmd); } @@ -73,7 +74,7 @@ @Override public void remove(String key) { DataSegmentKey dataSegmentKey = getDataSegmentKey(key); - dataSegmentKey.addCommand(new Command(CommandType.REMOVE, null, null, 0, 0, this, null)); + dataSegmentKey.addCommand(new Command(CommandType.REMOVE, null, null, 0, 0, replyQueue, null)); } } diff -r c4d6ff56b9bf -r 30f97d776a3e src/alice/datasegment/RemoteDataSegment.java --- a/src/alice/datasegment/RemoteDataSegment.java Fri Jan 13 07:04:38 2012 +0900 +++ b/src/alice/datasegment/RemoteDataSegment.java Fri Jan 13 19:04:59 2012 +0900 @@ -35,4 +35,5 @@ // TODO Auto-generated method stub } + } diff -r c4d6ff56b9bf -r 30f97d776a3e src/alice/datasegment/SocketDataSegmentManager.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/alice/datasegment/SocketDataSegmentManager.java Fri Jan 13 19:04:59 2012 +0900 @@ -0,0 +1,39 @@ +package alice.datasegment; + +import org.msgpack.type.Value; + +import alice.codesegment.CodeSegment; + +public class SocketDataSegmentManager extends DataSegmentManager { + + @Override + public void put(String key, Value val) { + // TODO Auto-generated method stub + + } + + @Override + public void update(String key, Value val) { + // TODO Auto-generated method stub + + } + + @Override + public void take(String argKey, String key, int index, CodeSegment cs) { + // TODO Auto-generated method stub + + } + + @Override + public void peek(String argKey, String key, int index, CodeSegment cs) { + // TODO Auto-generated method stub + + } + + @Override + public void remove(String key) { + // TODO Auto-generated method stub + + } + +} diff -r c4d6ff56b9bf -r 30f97d776a3e src/alice/test/codesegment/TestCodeSegment.java --- a/src/alice/test/codesegment/TestCodeSegment.java Fri Jan 13 07:04:38 2012 +0900 +++ b/src/alice/test/codesegment/TestCodeSegment.java Fri Jan 13 19:04:59 2012 +0900 @@ -13,6 +13,7 @@ DataSegmentValue data = ids.get("arg1"); System.out.println("index = " + data.index); System.out.println("data = " + data.val); + System.out.println(data.val.getType()); if (data.index == 10) { System.exit(0);