Mercurial > hg > Members > tatsuki > Alice
changeset 13:30f97d776a3e
implements Alice daemon
author | one |
---|---|
date | Fri, 13 Jan 2012 19:04:59 +0900 |
parents | c4d6ff56b9bf |
children | e3f1b21718b0 |
files | src/alice/daemon/AcceptThread.java src/alice/daemon/CommandMessage.java src/alice/daemon/Connection.java src/alice/daemon/IncomingTcpConnection.java src/alice/daemon/OutboundTcpConnection.java src/alice/datasegment/Command.java src/alice/datasegment/DataSegmentKey.java src/alice/datasegment/DataSegmentManager.java src/alice/datasegment/LocalDataSegmentManager.java src/alice/datasegment/RemoteDataSegment.java src/alice/datasegment/SocketDataSegmentManager.java src/alice/test/codesegment/TestCodeSegment.java |
diffstat | 12 files changed, 216 insertions(+), 19 deletions(-) [+] |
line wrap: on
line diff
--- a/src/alice/daemon/AcceptThread.java Fri Jan 13 07:04:38 2012 +0900 +++ b/src/alice/daemon/AcceptThread.java Fri Jan 13 19:04:59 2012 +0900 @@ -19,6 +19,8 @@ try { Socket socket = ss.accept(); Connection connection = new Connection(socket); + new IncomingTcpConnection(connection).start(); + new OutboundTcpConnection(connection).start(); } catch (IOException e) { e.printStackTrace(); }
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/alice/daemon/CommandMessage.java Fri Jan 13 19:04:59 2012 +0900 @@ -0,0 +1,21 @@ +package alice.daemon; + +import org.msgpack.annotation.Message; +import org.msgpack.type.Value; + +@Message +public class CommandMessage { + int type; + int index; + int seq; + String key; + Value val; + + public CommandMessage(int type, int index, int seq, String key, Value val) { + this.type = type; + this.index = index; + this.seq = seq; + this.key = key; + this.val = val; + } +}
--- a/src/alice/daemon/Connection.java Fri Jan 13 07:04:38 2012 +0900 +++ b/src/alice/daemon/Connection.java Fri Jan 13 19:04:59 2012 +0900 @@ -1,11 +1,17 @@ package alice.daemon; import java.net.Socket; +import java.util.concurrent.LinkedBlockingQueue; + +import alice.datasegment.Command; public class Connection { + public Socket socket; + public LinkedBlockingQueue<Command> sendQueue = new LinkedBlockingQueue<Command>(); + public Connection(Socket socket) { - // TODO Auto-generated constructor stub + this.socket = socket; } }
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/alice/daemon/IncomingTcpConnection.java Fri Jan 13 19:04:59 2012 +0900 @@ -0,0 +1,83 @@ +package alice.daemon; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; + +import org.msgpack.MessagePack; + +import alice.datasegment.Command; +import alice.datasegment.CommandType; +import alice.datasegment.DataSegment; +import alice.datasegment.DataSegmentKey; +import alice.datasegment.LocalDataSegmentManager; +import alice.datasegment.SocketDataSegmentManager; + +public class IncomingTcpConnection extends Thread { + + public Connection connection; + public SocketDataSegmentManager manager; + + public IncomingTcpConnection(Connection connection) { + this.connection = connection; + } + + public void run() { + MessagePack msgpack = new MessagePack(); + while (true) { + SocketChannel ch = connection.socket.getChannel(); + ByteBuffer buf = ByteBuffer.allocateDirect(4); // for int + + try { + int allReadLen = 0; + do { + int readLen = ch.read(buf); + allReadLen += readLen; + } while (allReadLen < 4); + buf.rewind(); + int msgLen = buf.getInt(); + allReadLen = 0; + ByteBuffer msgBuf = ByteBuffer.allocateDirect(msgLen); + do { + int readLen = ch.read(msgBuf); + allReadLen += readLen; + } while (allReadLen < msgLen); + msgBuf.rewind(); + CommandMessage msg = msgpack.read(msgBuf, CommandMessage.class); + CommandType type = CommandType.getCommandTypeFromId(msg.type); + LocalDataSegmentManager manager = (LocalDataSegmentManager)DataSegment.get("local"); + DataSegmentKey dsKey = manager.getDataSegmentKey(msg.key); + switch (type) { + case UPDATE: + dsKey.addCommand(new Command(type, null, msg.val, 0, 0, null, null)); + break; + case PUT: + dsKey.addCommand(new Command(type, null, msg.val, 0, 0, null, null)); + break; + case PEEK: + //Command(CommandType cmdType, String argKey, Value val, int index, int seq, BlockingQueue<Command> replyQueue, CodeSegment cs) { + dsKey.addCommand(new Command(type, null, null, msg.index, msg.seq, connection.sendQueue, null)); + break; + case TAKE: + dsKey.addCommand(new Command(type, null, null, msg.index, msg.seq, connection.sendQueue, null)); + break; + case REMOVE: + dsKey.addCommand(new Command(type, null, null, 0, 0, null, null)); + break; + case REPLY: + try { + manager.replyQueue.put(new Command(type, null, msg.val, msg.index, msg.seq, null, null)); + } catch (InterruptedException e) { + e.printStackTrace(); + } + break; + default: + break; + } + } catch (IOException e) { + e.printStackTrace(); + } + } + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/alice/daemon/OutboundTcpConnection.java Fri Jan 13 19:04:59 2012 +0900 @@ -0,0 +1,41 @@ +package alice.daemon; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.msgpack.MessagePack; + +import alice.datasegment.Command; + +public class OutboundTcpConnection extends Thread { + + public Connection connection; + + public OutboundTcpConnection(Connection connection) { + this.connection = connection; + } + + public CommandMessage convert(Command cmd) { + return new CommandMessage(cmd.type.id, cmd.index, cmd.seq, null, cmd.val); + } + + public void run() { + MessagePack msgpack = new MessagePack(); + while (true) { + try { + CommandMessage cmdMsg = convert(connection.sendQueue.take()); + byte[] buf = msgpack.write(cmdMsg); + ByteBuffer buffer = ByteBuffer.allocateDirect(4 + buf.length); + buffer.putInt(buf.length); + buffer.put(buf); + connection.socket.getChannel().write(buffer); + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (IOException e) { + e.printStackTrace(); + } + + } + } + +}
--- a/src/alice/datasegment/Command.java Fri Jan 13 07:04:38 2012 +0900 +++ b/src/alice/datasegment/Command.java Fri Jan 13 19:04:59 2012 +0900 @@ -1,25 +1,27 @@ package alice.datasegment; +import java.util.concurrent.BlockingQueue; + import org.msgpack.type.Value; import alice.codesegment.CodeSegment; public class Command { - public CommandType cmdType; + public CommandType type; public String argKey; public Value val; public int index; public int seq; - public DataSegmentManager manager; + public BlockingQueue<Command> replyQueue; public CodeSegment cs; - public Command(CommandType cmdType, String argKey, Value val, int index, int seq, DataSegmentManager manager, CodeSegment cs) { - this.cmdType = cmdType; + public Command(CommandType cmdType, String argKey, Value val, int index, int seq, BlockingQueue<Command> replyQueue, CodeSegment cs) { + this.type = cmdType; this.argKey = argKey; this.val = val; this.index = index; this.seq = seq; - this.manager = manager; + this.replyQueue = replyQueue; this.cs = cs; }
--- a/src/alice/datasegment/DataSegmentKey.java Fri Jan 13 07:04:38 2012 +0900 +++ b/src/alice/datasegment/DataSegmentKey.java Fri Jan 13 19:04:59 2012 +0900 @@ -30,7 +30,7 @@ while (true) { try { Command cmd = cmdQueue.take(); - switch (cmd.cmdType) { + switch (cmd.type) { case UPDATE: if (dataList.size() != 0) { dataList.remove(0); @@ -43,9 +43,9 @@ LinkedList<Command> removeList = new LinkedList<Command>(); for (Command waitCmd : waitList) { if (waitCmd.index < index) { - waitCmd.manager.replyQueue.put(new Command(CommandType.REPLY, null, cmd.val, index, waitCmd.seq, null, null)); + waitCmd.replyQueue.put(new Command(CommandType.REPLY, null, cmd.val, index, waitCmd.seq, null, null)); removeList.add(waitCmd); - if (waitCmd.cmdType == CommandType.TAKE) { // delete data, if it run take cmd. + if (waitCmd.type == CommandType.TAKE) { // delete data, if it run take cmd. dataList.remove(dsv); break; } @@ -62,7 +62,7 @@ } for (DataSegmentValue data : dataList) { if (data.index > cmd.index) { - cmd.manager.replyQueue.put(new Command(CommandType.REPLY, null, data.val, data.index, cmd.seq, null, null)); + cmd.replyQueue.put(new Command(CommandType.REPLY, null, data.val, data.index, cmd.seq, null, null)); break; } } @@ -75,7 +75,7 @@ } for (DataSegmentValue data : dataList) { if (data.index > cmd.index) { - cmd.manager.replyQueue.put(new Command(CommandType.REPLY, null, data.val, data.index, cmd.seq, null, null)); + cmd.replyQueue.put(new Command(CommandType.REPLY, null, data.val, data.index, cmd.seq, null, null)); dataList.remove(data); break; }
--- a/src/alice/datasegment/DataSegmentManager.java Fri Jan 13 07:04:38 2012 +0900 +++ b/src/alice/datasegment/DataSegmentManager.java Fri Jan 13 19:04:59 2012 +0900 @@ -8,8 +8,8 @@ import alice.codesegment.CodeSegment; public abstract class DataSegmentManager { - public ConcurrentHashMap<String, DataSegmentKey> dataSegments = new ConcurrentHashMap<String, DataSegmentKey>(); - public ConcurrentHashMap<Integer, Command> seqHash = new ConcurrentHashMap<Integer, Command>(); + protected ConcurrentHashMap<String, DataSegmentKey> dataSegments = new ConcurrentHashMap<String, DataSegmentKey>(); + protected ConcurrentHashMap<Integer, Command> seqHash = new ConcurrentHashMap<Integer, Command>(); public LinkedBlockingQueue<Command> replyQueue = new LinkedBlockingQueue<Command>(); public abstract void put(String key, Value val);
--- a/src/alice/datasegment/LocalDataSegmentManager.java Fri Jan 13 07:04:38 2012 +0900 +++ b/src/alice/datasegment/LocalDataSegmentManager.java Fri Jan 13 19:04:59 2012 +0900 @@ -26,11 +26,12 @@ } }; + public LocalDataSegmentManager() { new Thread(replyThread).start(); } - private DataSegmentKey getDataSegmentKey(String key) { + public DataSegmentKey getDataSegmentKey(String key) { DataSegmentKey newDataSegmentKey = new DataSegmentKey(); DataSegmentKey dataSegmentKey = dataSegments.putIfAbsent(key, newDataSegmentKey); if (dataSegmentKey == null) { @@ -43,20 +44,20 @@ @Override public void put(String key, Value val) { DataSegmentKey dataSegmentKey = getDataSegmentKey(key); - dataSegmentKey.addCommand(new Command(CommandType.PUT, null, val, 0, 0, this, null)); + dataSegmentKey.addCommand(new Command(CommandType.PUT, null, val, 0, 0, replyQueue, null)); } @Override public void update(String key, Value val) { DataSegmentKey dataSegmentKey = getDataSegmentKey(key); - dataSegmentKey.addCommand(new Command(CommandType.UPDATE, null, val, 0, 0, this, null)); + dataSegmentKey.addCommand(new Command(CommandType.UPDATE, null, val, 0, 0, replyQueue, null)); } @Override public void take(String argKey, String key, int index, CodeSegment cs) { DataSegmentKey dataSegmentKey = getDataSegmentKey(key); int seq = this.seq.getAndIncrement(); - Command cmd = new Command(CommandType.TAKE, argKey, null, index, seq, this, cs); + Command cmd = new Command(CommandType.TAKE, argKey, null, index, seq, replyQueue, cs); seqHash.put(seq, cmd); dataSegmentKey.addCommand(cmd); } @@ -65,7 +66,7 @@ public void peek(String argKey, String key, int index, CodeSegment cs) { DataSegmentKey dataSegmentKey = getDataSegmentKey(key); int seq = this.seq.getAndIncrement(); - Command cmd = new Command(CommandType.PEEK, argKey, null, index, seq, this, cs); + Command cmd = new Command(CommandType.PEEK, argKey, null, index, seq, replyQueue, cs); seqHash.put(seq, cmd); dataSegmentKey.addCommand(cmd); } @@ -73,7 +74,7 @@ @Override public void remove(String key) { DataSegmentKey dataSegmentKey = getDataSegmentKey(key); - dataSegmentKey.addCommand(new Command(CommandType.REMOVE, null, null, 0, 0, this, null)); + dataSegmentKey.addCommand(new Command(CommandType.REMOVE, null, null, 0, 0, replyQueue, null)); } }
--- a/src/alice/datasegment/RemoteDataSegment.java Fri Jan 13 07:04:38 2012 +0900 +++ b/src/alice/datasegment/RemoteDataSegment.java Fri Jan 13 19:04:59 2012 +0900 @@ -35,4 +35,5 @@ // TODO Auto-generated method stub } + }
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/alice/datasegment/SocketDataSegmentManager.java Fri Jan 13 19:04:59 2012 +0900 @@ -0,0 +1,39 @@ +package alice.datasegment; + +import org.msgpack.type.Value; + +import alice.codesegment.CodeSegment; + +public class SocketDataSegmentManager extends DataSegmentManager { + + @Override + public void put(String key, Value val) { + // TODO Auto-generated method stub + + } + + @Override + public void update(String key, Value val) { + // TODO Auto-generated method stub + + } + + @Override + public void take(String argKey, String key, int index, CodeSegment cs) { + // TODO Auto-generated method stub + + } + + @Override + public void peek(String argKey, String key, int index, CodeSegment cs) { + // TODO Auto-generated method stub + + } + + @Override + public void remove(String key) { + // TODO Auto-generated method stub + + } + +}
--- a/src/alice/test/codesegment/TestCodeSegment.java Fri Jan 13 07:04:38 2012 +0900 +++ b/src/alice/test/codesegment/TestCodeSegment.java Fri Jan 13 19:04:59 2012 +0900 @@ -13,6 +13,7 @@ DataSegmentValue data = ids.get("arg1"); System.out.println("index = " + data.index); System.out.println("data = " + data.val); + System.out.println(data.val.getType()); if (data.index == 10) { System.exit(0);