changeset 12:c4d6ff56b9bf

unite Command and Reply and add Network outline
author one
date Fri, 13 Jan 2012 07:04:38 +0900 (2012-01-12)
parents 2ea5acb0ed16
children 30f97d776a3e
files src/alice/daemon/AcceptThread.java src/alice/daemon/AliceDaemon.java src/alice/daemon/Config.java src/alice/daemon/Connection.java src/alice/datasegment/CommandType.java src/alice/datasegment/DataSegmentKey.java src/alice/datasegment/DataSegmentManager.java src/alice/datasegment/LocalDataSegmentManager.java src/alice/datasegment/RemoteDataSegment.java src/alice/datasegment/Reply.java src/alice/test/codesegment/StartCodeSegment.java src/alice/test/codesegment/TestCodeSegment.java
diffstat 12 files changed, 171 insertions(+), 24 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/alice/daemon/AcceptThread.java	Fri Jan 13 07:04:38 2012 +0900
@@ -0,0 +1,28 @@
+package alice.daemon;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.net.Socket;
+
+public class AcceptThread extends Thread {
+
+	private ServerSocket ss;
+
+	public AcceptThread(ServerSocket ss, String name) {
+		super(name);
+		this.ss = ss;
+	}
+	
+	@Override
+	public void run() {
+		while (true) {
+			try {
+				Socket socket = ss.accept();
+				Connection connection = new Connection(socket);
+			} catch (IOException e) {
+				e.printStackTrace();
+			}
+		}
+	}
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/alice/daemon/AliceDaemon.java	Fri Jan 13 07:04:38 2012 +0900
@@ -0,0 +1,32 @@
+package alice.daemon;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.nio.channels.ServerSocketChannel;
+
+public class AliceDaemon {
+	
+	private Config conf;
+	private AcceptThread acceptThread;
+	
+	public AliceDaemon(String[] args) {
+		this.conf = new Config(args);
+	}
+	
+	public void listen() {
+		try {
+			ServerSocketChannel ssChannel = ServerSocketChannel.open();
+			ServerSocket ss = ssChannel.socket();
+			ss.setReuseAddress(true);
+			ss.bind(new InetSocketAddress(InetAddress.getLocalHost(), conf.port));
+			acceptThread = new AcceptThread(ss, "ACCEPT" + conf.port);
+			acceptThread.start();
+			
+		} catch (IOException e) {
+			e.printStackTrace();
+		}
+		
+	}
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/alice/daemon/Config.java	Fri Jan 13 07:04:38 2012 +0900
@@ -0,0 +1,15 @@
+package alice.daemon;
+
+public class Config {
+
+	public int port = 10000;
+
+	public Config(String[] args) {
+		for (int i = 0; i< args.length; i++) {
+			if ("-p".equals(args[i])) {
+				port = Integer.parseInt(args[++i]);
+			}
+		}
+	}
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/alice/daemon/Connection.java	Fri Jan 13 07:04:38 2012 +0900
@@ -0,0 +1,11 @@
+package alice.daemon;
+
+import java.net.Socket;
+
+public class Connection {
+
+	public Connection(Socket socket) {
+		// TODO Auto-generated constructor stub
+	}
+
+}
--- a/src/alice/datasegment/CommandType.java	Thu Jan 12 20:22:23 2012 +0900
+++ b/src/alice/datasegment/CommandType.java	Fri Jan 13 07:04:38 2012 +0900
@@ -1,9 +1,44 @@
 package alice.datasegment;
 
+import java.util.HashMap;
+
 public enum CommandType {
 	PUT,
 	UPDATE,
 	PEEK,
 	TAKE,
 	REMOVE,
+	REPLY;
+	
+	public int id;
+	public static HashMap<Integer, CommandType> hash = new HashMap<Integer, CommandType>();
+	private static int lastId = 0;
+	
+	private CommandType(int id) {
+		this.id = id;
+		setLastId(id);
+	}
+	
+	private CommandType() {
+		this.id = incrementLastId();
+	}
+	
+	private void setLastId(int id) {
+		lastId =id;
+	}
+	
+	private int incrementLastId() {
+		return ++lastId;
+	}
+	
+	public static CommandType getCommandTypeFromId(int id) {
+		return hash.get(id);
+	}
+	
+	static {
+		for (CommandType type : CommandType.values()) {
+			hash.put(type.id, type);
+		}
+	}
+	
 }
--- a/src/alice/datasegment/DataSegmentKey.java	Thu Jan 12 20:22:23 2012 +0900
+++ b/src/alice/datasegment/DataSegmentKey.java	Fri Jan 13 07:04:38 2012 +0900
@@ -43,7 +43,7 @@
 							LinkedList<Command> removeList = new LinkedList<Command>();
 							for (Command waitCmd : waitList) {
 								if (waitCmd.index < index) {
-									waitCmd.manager.replyQueue.put(new Reply(waitCmd.seq, index, cmd.val));
+									waitCmd.manager.replyQueue.put(new Command(CommandType.REPLY, null, cmd.val, index, waitCmd.seq, null, null));
 									removeList.add(waitCmd);
 									if (waitCmd.cmdType == CommandType.TAKE) { // delete data, if it run take cmd.
 										dataList.remove(dsv);
@@ -62,7 +62,7 @@
 							}
 							for (DataSegmentValue data : dataList) {
 								if (data.index > cmd.index) {
-									cmd.manager.replyQueue.put(new Reply(cmd.seq, data.index, data.val));
+									cmd.manager.replyQueue.put(new Command(CommandType.REPLY, null, data.val, data.index, cmd.seq, null, null));
 									break;
 								}
 							}
@@ -75,7 +75,7 @@
 							}
 							for (DataSegmentValue data : dataList) {
 								if (data.index > cmd.index) {
-									cmd.manager.replyQueue.put(new Reply(cmd.seq, data.index, data.val));
+									cmd.manager.replyQueue.put(new Command(CommandType.REPLY, null, data.val, data.index, cmd.seq, null, null));
 									dataList.remove(data);
 									break;
 								}
--- a/src/alice/datasegment/DataSegmentManager.java	Thu Jan 12 20:22:23 2012 +0900
+++ b/src/alice/datasegment/DataSegmentManager.java	Fri Jan 13 07:04:38 2012 +0900
@@ -10,7 +10,7 @@
 public abstract class DataSegmentManager {
 	public ConcurrentHashMap<String, DataSegmentKey> dataSegments = new ConcurrentHashMap<String, DataSegmentKey>();
 	public ConcurrentHashMap<Integer, Command> seqHash = new ConcurrentHashMap<Integer, Command>();
-	public LinkedBlockingQueue<Reply> replyQueue = new LinkedBlockingQueue<Reply>();  
+	public LinkedBlockingQueue<Command> replyQueue = new LinkedBlockingQueue<Command>();  
 	
 	public abstract void put(String key, Value val);
 	public abstract void update(String key, Value val);
--- a/src/alice/datasegment/LocalDataSegmentManager.java	Thu Jan 12 20:22:23 2012 +0900
+++ b/src/alice/datasegment/LocalDataSegmentManager.java	Fri Jan 13 07:04:38 2012 +0900
@@ -16,7 +16,7 @@
 		public void run() {
 			while (true) {
 				try {
-					Reply reply = replyQueue.take();
+					Command reply = replyQueue.take();
 					Command cmd = seqHash.get(reply.seq);
 					cmd.cs.ids.reply(cmd.argKey, new DataSegmentValue(reply.index, reply.val));
 				} catch (InterruptedException e) {
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/alice/datasegment/RemoteDataSegment.java	Fri Jan 13 07:04:38 2012 +0900
@@ -0,0 +1,38 @@
+package alice.datasegment;
+
+import org.msgpack.type.Value;
+
+import alice.codesegment.CodeSegment;
+
+public class RemoteDataSegment extends DataSegmentManager {
+
+	@Override
+	public void put(String key, Value val) {
+		// TODO Auto-generated method stub
+		
+	}
+
+	@Override
+	public void update(String key, Value val) {
+		// TODO Auto-generated method stub
+		
+	}
+
+	@Override
+	public void take(String argKey, String key, int index, CodeSegment cs) {
+		// TODO Auto-generated method stub
+		
+	}
+
+	@Override
+	public void peek(String argKey, String key, int index, CodeSegment cs) {
+		// TODO Auto-generated method stub
+		
+	}
+
+	@Override
+	public void remove(String key) {
+		// TODO Auto-generated method stub
+		
+	}
+}
--- a/src/alice/datasegment/Reply.java	Thu Jan 12 20:22:23 2012 +0900
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,15 +0,0 @@
-package alice.datasegment;
-
-import org.msgpack.type.Value;
-
-public class Reply {
-	int seq;
-	int index;
-	Value val;
-	
-	public Reply(int seq, int index, Value val) {
-		this.seq = seq;
-		this.index = index;
-		this.val = val;
-	}
-}
--- a/src/alice/test/codesegment/StartCodeSegment.java	Thu Jan 12 20:22:23 2012 +0900
+++ b/src/alice/test/codesegment/StartCodeSegment.java	Fri Jan 13 07:04:38 2012 +0900
@@ -17,7 +17,7 @@
 		System.out.println("create TestCodeSegment");
 		
 		Value val = ValueFactory.createRawValue("String data");
-		ods.put("local", "key1", val);
+		ods.update("local", "key1", val);
 	}
 
 }
--- a/src/alice/test/codesegment/TestCodeSegment.java	Thu Jan 12 20:22:23 2012 +0900
+++ b/src/alice/test/codesegment/TestCodeSegment.java	Fri Jan 13 07:04:38 2012 +0900
@@ -14,14 +14,17 @@
 		System.out.println("index = " + data.index);
 		System.out.println("data = " + data.val);
 		
-		if (data.index == 10) return;
+		if (data.index == 10) {
+			System.exit(0);
+			return;
+		}
 		
 		CodeSegment cs = new TestCodeSegment();
-		cs.ids.take("arg1", "local", "key1", data.index);
+		cs.ids.peek("arg1", "local", "key1", data.index);
 		cs.ids.execute();
 		
 		Value val = ValueFactory.createRawValue("String data");
-		ods.put("local", "key1", val);
+		ods.update("local", "key1", val);
 	}
 
 }