Mercurial > hg > Members > tatsuki > Alice
changeset 12:c4d6ff56b9bf
unite Command and Reply
and add Network outline
author | one |
---|---|
date | Fri, 13 Jan 2012 07:04:38 +0900 (2012-01-12) |
parents | 2ea5acb0ed16 |
children | 30f97d776a3e |
files | src/alice/daemon/AcceptThread.java src/alice/daemon/AliceDaemon.java src/alice/daemon/Config.java src/alice/daemon/Connection.java src/alice/datasegment/CommandType.java src/alice/datasegment/DataSegmentKey.java src/alice/datasegment/DataSegmentManager.java src/alice/datasegment/LocalDataSegmentManager.java src/alice/datasegment/RemoteDataSegment.java src/alice/datasegment/Reply.java src/alice/test/codesegment/StartCodeSegment.java src/alice/test/codesegment/TestCodeSegment.java |
diffstat | 12 files changed, 171 insertions(+), 24 deletions(-) [+] |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/alice/daemon/AcceptThread.java Fri Jan 13 07:04:38 2012 +0900 @@ -0,0 +1,28 @@ +package alice.daemon; + +import java.io.IOException; +import java.net.ServerSocket; +import java.net.Socket; + +public class AcceptThread extends Thread { + + private ServerSocket ss; + + public AcceptThread(ServerSocket ss, String name) { + super(name); + this.ss = ss; + } + + @Override + public void run() { + while (true) { + try { + Socket socket = ss.accept(); + Connection connection = new Connection(socket); + } catch (IOException e) { + e.printStackTrace(); + } + } + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/alice/daemon/AliceDaemon.java Fri Jan 13 07:04:38 2012 +0900 @@ -0,0 +1,32 @@ +package alice.daemon; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.nio.channels.ServerSocketChannel; + +public class AliceDaemon { + + private Config conf; + private AcceptThread acceptThread; + + public AliceDaemon(String[] args) { + this.conf = new Config(args); + } + + public void listen() { + try { + ServerSocketChannel ssChannel = ServerSocketChannel.open(); + ServerSocket ss = ssChannel.socket(); + ss.setReuseAddress(true); + ss.bind(new InetSocketAddress(InetAddress.getLocalHost(), conf.port)); + acceptThread = new AcceptThread(ss, "ACCEPT" + conf.port); + acceptThread.start(); + + } catch (IOException e) { + e.printStackTrace(); + } + + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/alice/daemon/Config.java Fri Jan 13 07:04:38 2012 +0900 @@ -0,0 +1,15 @@ +package alice.daemon; + +public class Config { + + public int port = 10000; + + public Config(String[] args) { + for (int i = 0; i< args.length; i++) { + if ("-p".equals(args[i])) { + port = Integer.parseInt(args[++i]); + } + } + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/alice/daemon/Connection.java Fri Jan 13 07:04:38 2012 +0900 @@ -0,0 +1,11 @@ +package alice.daemon; + +import java.net.Socket; + +public class Connection { + + public Connection(Socket socket) { + // TODO Auto-generated constructor stub + } + +}
--- a/src/alice/datasegment/CommandType.java Thu Jan 12 20:22:23 2012 +0900 +++ b/src/alice/datasegment/CommandType.java Fri Jan 13 07:04:38 2012 +0900 @@ -1,9 +1,44 @@ package alice.datasegment; +import java.util.HashMap; + public enum CommandType { PUT, UPDATE, PEEK, TAKE, REMOVE, + REPLY; + + public int id; + public static HashMap<Integer, CommandType> hash = new HashMap<Integer, CommandType>(); + private static int lastId = 0; + + private CommandType(int id) { + this.id = id; + setLastId(id); + } + + private CommandType() { + this.id = incrementLastId(); + } + + private void setLastId(int id) { + lastId =id; + } + + private int incrementLastId() { + return ++lastId; + } + + public static CommandType getCommandTypeFromId(int id) { + return hash.get(id); + } + + static { + for (CommandType type : CommandType.values()) { + hash.put(type.id, type); + } + } + }
--- a/src/alice/datasegment/DataSegmentKey.java Thu Jan 12 20:22:23 2012 +0900 +++ b/src/alice/datasegment/DataSegmentKey.java Fri Jan 13 07:04:38 2012 +0900 @@ -43,7 +43,7 @@ LinkedList<Command> removeList = new LinkedList<Command>(); for (Command waitCmd : waitList) { if (waitCmd.index < index) { - waitCmd.manager.replyQueue.put(new Reply(waitCmd.seq, index, cmd.val)); + waitCmd.manager.replyQueue.put(new Command(CommandType.REPLY, null, cmd.val, index, waitCmd.seq, null, null)); removeList.add(waitCmd); if (waitCmd.cmdType == CommandType.TAKE) { // delete data, if it run take cmd. dataList.remove(dsv); @@ -62,7 +62,7 @@ } for (DataSegmentValue data : dataList) { if (data.index > cmd.index) { - cmd.manager.replyQueue.put(new Reply(cmd.seq, data.index, data.val)); + cmd.manager.replyQueue.put(new Command(CommandType.REPLY, null, data.val, data.index, cmd.seq, null, null)); break; } } @@ -75,7 +75,7 @@ } for (DataSegmentValue data : dataList) { if (data.index > cmd.index) { - cmd.manager.replyQueue.put(new Reply(cmd.seq, data.index, data.val)); + cmd.manager.replyQueue.put(new Command(CommandType.REPLY, null, data.val, data.index, cmd.seq, null, null)); dataList.remove(data); break; }
--- a/src/alice/datasegment/DataSegmentManager.java Thu Jan 12 20:22:23 2012 +0900 +++ b/src/alice/datasegment/DataSegmentManager.java Fri Jan 13 07:04:38 2012 +0900 @@ -10,7 +10,7 @@ public abstract class DataSegmentManager { public ConcurrentHashMap<String, DataSegmentKey> dataSegments = new ConcurrentHashMap<String, DataSegmentKey>(); public ConcurrentHashMap<Integer, Command> seqHash = new ConcurrentHashMap<Integer, Command>(); - public LinkedBlockingQueue<Reply> replyQueue = new LinkedBlockingQueue<Reply>(); + public LinkedBlockingQueue<Command> replyQueue = new LinkedBlockingQueue<Command>(); public abstract void put(String key, Value val); public abstract void update(String key, Value val);
--- a/src/alice/datasegment/LocalDataSegmentManager.java Thu Jan 12 20:22:23 2012 +0900 +++ b/src/alice/datasegment/LocalDataSegmentManager.java Fri Jan 13 07:04:38 2012 +0900 @@ -16,7 +16,7 @@ public void run() { while (true) { try { - Reply reply = replyQueue.take(); + Command reply = replyQueue.take(); Command cmd = seqHash.get(reply.seq); cmd.cs.ids.reply(cmd.argKey, new DataSegmentValue(reply.index, reply.val)); } catch (InterruptedException e) {
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/alice/datasegment/RemoteDataSegment.java Fri Jan 13 07:04:38 2012 +0900 @@ -0,0 +1,38 @@ +package alice.datasegment; + +import org.msgpack.type.Value; + +import alice.codesegment.CodeSegment; + +public class RemoteDataSegment extends DataSegmentManager { + + @Override + public void put(String key, Value val) { + // TODO Auto-generated method stub + + } + + @Override + public void update(String key, Value val) { + // TODO Auto-generated method stub + + } + + @Override + public void take(String argKey, String key, int index, CodeSegment cs) { + // TODO Auto-generated method stub + + } + + @Override + public void peek(String argKey, String key, int index, CodeSegment cs) { + // TODO Auto-generated method stub + + } + + @Override + public void remove(String key) { + // TODO Auto-generated method stub + + } +}
--- a/src/alice/datasegment/Reply.java Thu Jan 12 20:22:23 2012 +0900 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,15 +0,0 @@ -package alice.datasegment; - -import org.msgpack.type.Value; - -public class Reply { - int seq; - int index; - Value val; - - public Reply(int seq, int index, Value val) { - this.seq = seq; - this.index = index; - this.val = val; - } -}
--- a/src/alice/test/codesegment/StartCodeSegment.java Thu Jan 12 20:22:23 2012 +0900 +++ b/src/alice/test/codesegment/StartCodeSegment.java Fri Jan 13 07:04:38 2012 +0900 @@ -17,7 +17,7 @@ System.out.println("create TestCodeSegment"); Value val = ValueFactory.createRawValue("String data"); - ods.put("local", "key1", val); + ods.update("local", "key1", val); } }
--- a/src/alice/test/codesegment/TestCodeSegment.java Thu Jan 12 20:22:23 2012 +0900 +++ b/src/alice/test/codesegment/TestCodeSegment.java Fri Jan 13 07:04:38 2012 +0900 @@ -14,14 +14,17 @@ System.out.println("index = " + data.index); System.out.println("data = " + data.val); - if (data.index == 10) return; + if (data.index == 10) { + System.exit(0); + return; + } CodeSegment cs = new TestCodeSegment(); - cs.ids.take("arg1", "local", "key1", data.index); + cs.ids.peek("arg1", "local", "key1", data.index); cs.ids.execute(); Value val = ValueFactory.createRawValue("String data"); - ods.put("local", "key1", val); + ods.update("local", "key1", val); } }