changeset 3:91057e15065f

add DataSegment API and CodeSegment
author one
date Wed, 11 Jan 2012 00:17:27 +0900
parents f71eabb1df2a
children cd3a8f97ea67
files src/alice/codesegment/CodeSegment.java src/alice/codesegment/Input.java src/alice/codesegment/InputDataSegment.java src/alice/datasegment/Command.java src/alice/datasegment/CommandType.java src/alice/datasegment/DataSegment.java src/alice/datasegment/DataSegmentKey.java src/alice/datasegment/DataSegmentManager.java src/alice/datasegment/DataSegmentReceiver.java src/alice/datasegment/DataSegmentValue.java src/alice/datasegment/LocalDataSegmentManager.java src/alice/test/codesegment/TestCodeSegment.java
diffstat 12 files changed, 277 insertions(+), 21 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/alice/codesegment/CodeSegment.java	Wed Jan 11 00:17:27 2012 +0900
@@ -0,0 +1,12 @@
+package alice.codesegment;
+
+import alice.codesegment.InputDataSegment;
+
+public abstract class CodeSegment {
+	
+	public InputDataSegment ids = new InputDataSegment(this);
+	
+	public abstract void prepare();
+	public abstract void run();
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/alice/codesegment/Input.java	Wed Jan 11 00:17:27 2012 +0900
@@ -0,0 +1,15 @@
+package alice.codesegment;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+import alice.datasegment.CommandType;
+
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.FIELD)
+public @interface Input {
+	public CommandType type();
+	public int index();
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/alice/codesegment/InputDataSegment.java	Wed Jan 11 00:17:27 2012 +0900
@@ -0,0 +1,38 @@
+package alice.codesegment;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import alice.datasegment.DataSegment;
+import alice.datasegment.DataSegmentManager;
+import alice.datasegment.DataSegmentValue;
+
+public class InputDataSegment {
+	
+	private ConcurrentHashMap<String, DataSegmentValue> inputDataSegments = new ConcurrentHashMap<String, DataSegmentValue>();
+	private CodeSegment cs;
+	private AtomicInteger count = new AtomicInteger();
+	
+	public InputDataSegment(CodeSegment cs) {
+		this.cs = cs;
+	}
+	
+	public void peek(String managerKey, String key) {
+		peek(managerKey, key, 0);
+	}
+	
+	public void peek(String managerKey, String key, int index) {
+		DataSegment.get(managerKey).peek(key, index, cs);
+		count.getAndIncrement();
+	}
+
+	public void take(String managerKey, String key) {
+		take(managerKey, key, 0);
+	}
+	
+	public void take(String managerKey, String key, int index) {
+		DataSegment.get(managerKey).take(key, index, cs);
+		count.getAndIncrement();
+	}
+	
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/alice/datasegment/Command.java	Wed Jan 11 00:17:27 2012 +0900
@@ -0,0 +1,22 @@
+package alice.datasegment;
+
+import org.msgpack.type.Value;
+
+import alice.codesegment.CodeSegment;
+
+public class Command {
+	public CommandType cmdType;
+	public Value val;
+	public int index;
+	public CodeSegment cs;
+	public int seq;
+
+	public Command(CommandType cmdType, Value val, int index, CodeSegment cs, int seq) {
+		this.cs = cs;
+		this.cmdType = cmdType;
+		this.val = val;
+		this.index = index;
+		this.seq = seq;
+	}
+	
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/alice/datasegment/CommandType.java	Wed Jan 11 00:17:27 2012 +0900
@@ -0,0 +1,8 @@
+package alice.datasegment;
+
+public enum CommandType {
+	PUT,
+	PEEK,
+	TAKE,
+	REMOVE,
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/alice/datasegment/DataSegment.java	Wed Jan 11 00:17:27 2012 +0900
@@ -0,0 +1,22 @@
+package alice.datasegment;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+public class DataSegment {
+	
+	private static DataSegment dataSegment = new DataSegment();
+	private ConcurrentHashMap<String, DataSegmentManager> dataSegmentManageres = new ConcurrentHashMap<String, DataSegmentManager>();
+	
+	private DataSegment() {
+		dataSegmentManageres.put("local", new LocalDataSegmentManager());
+	}
+	
+	public static DataSegmentManager get(String key) {
+		return dataSegment.dataSegmentManageres.get(key);
+	}
+	
+	public static void regist(String key, DataSegmentManager manager) {
+		dataSegment.dataSegmentManageres.put(key, manager);
+	}
+	
+}
--- a/src/alice/datasegment/DataSegmentKey.java	Sun Dec 11 06:37:05 2011 +0900
+++ b/src/alice/datasegment/DataSegmentKey.java	Wed Jan 11 00:17:27 2012 +0900
@@ -1,5 +1,86 @@
 package alice.datasegment;
 
+import java.util.ArrayList;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import alice.datasegment.Command; 
+
 public class DataSegmentKey {
 	
+	private LinkedBlockingQueue<Command> cmdQueue = new LinkedBlockingQueue<Command>();
+	private ArrayList<DataSegmentValue> dataList = new ArrayList<DataSegmentValue>();
+	private ArrayList<Command> waitList = new ArrayList<Command>();
+	private AtomicInteger tailIndex = new AtomicInteger(1);
+	private Runnable keyThread;
+	
+	public DataSegmentKey() {
+		
+	}
+	
+	public void addCommand(Command cmd) {
+		cmdQueue.add(cmd);
+	}
+	
+	public void runKeyThread() {
+		keyThread = new Runnable() {
+			@Override
+			public void run() {
+				while (true) {
+					try {
+						Command cmd = cmdQueue.take();
+						switch (cmd.cmdType) {
+						case PUT:
+							int index = tailIndex.getAndIncrement();
+							dataList.add(new DataSegmentValue(index, cmd.val));
+							// run waiting peek and take
+							for (Command waitCmd : waitList) {
+								if (waitCmd.index < index) {
+									// TODO: make and send reply msg
+									
+								}
+							}
+							break;
+						case PEEK:
+							if (cmd.index >= tailIndex.get()) {
+								waitList.add(cmd);
+								break;
+							}
+							for (DataSegmentValue data : dataList) {
+								if (data.index > cmd.index) {
+									// TODO: make and send reply msg
+									
+									break;
+								}
+							}
+							break;
+						case TAKE:
+							if (cmd.index >= tailIndex.get()) {
+								waitList.add(cmd);
+								break;
+							}
+							for (DataSegmentValue data : dataList) {
+								if (data.index > cmd.index) {
+									// TODO: make and send reply msg
+
+									dataList.remove(data);
+									break;
+								}
+							}
+							break;
+						case REMOVE:
+							// TODO: implements later
+							break;
+						default:
+						}
+					} catch (InterruptedException e) {
+						// TODO Auto-generated catch block
+						e.printStackTrace();
+					}
+				}
+			}
+		};
+		keyThread.run();
+	};
+	
 }
--- a/src/alice/datasegment/DataSegmentManager.java	Sun Dec 11 06:37:05 2011 +0900
+++ b/src/alice/datasegment/DataSegmentManager.java	Wed Jan 11 00:17:27 2012 +0900
@@ -1,22 +1,22 @@
 package alice.datasegment;
 
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Future;
+import org.msgpack.type.Value;
 
-import org.msgpack.type.Value;
+import alice.codesegment.CodeSegment;
 
 public abstract class DataSegmentManager {
 	ConcurrentHashMap<String, DataSegmentKey> dataSegments = new ConcurrentHashMap<String, DataSegmentKey>();
 	
 	public abstract void put(String key, Value val);
-	public Future<Value> take(String key) {
-		return take(key, 0);
+	public void take(String key, CodeSegment cs) {
+		take(key, 0, cs);
 	}
-	public abstract Future<Value> take(String key, int index);
-	public Future<Value> peek(String key) {
-		return peek(key, 0);
+	public abstract void take(String key, int index, CodeSegment cs);
+	public void peek(String key, CodeSegment cs) {
+		peek(key, 0, cs);
 	}
-	public abstract Future<Value> peek(String key, int index);
-	public abstract void delete(String key);
+	public abstract void peek(String key, int index, CodeSegment cs);
+	public abstract void remove(String key);
 	
 }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/alice/datasegment/DataSegmentReceiver.java	Wed Jan 11 00:17:27 2012 +0900
@@ -0,0 +1,5 @@
+package alice.datasegment;
+
+public class DataSegmentReceiver {
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/alice/datasegment/DataSegmentValue.java	Wed Jan 11 00:17:27 2012 +0900
@@ -0,0 +1,15 @@
+package alice.datasegment;
+
+import org.msgpack.type.Value;
+
+public class DataSegmentValue {
+
+	public int index;
+	public Value val;
+	
+	public DataSegmentValue(int index, Value val) {
+		this.index = index;
+		this.val = val;
+	}
+	
+}
--- a/src/alice/datasegment/LocalDataSegmentManager.java	Sun Dec 11 06:37:05 2011 +0900
+++ b/src/alice/datasegment/LocalDataSegmentManager.java	Wed Jan 11 00:17:27 2012 +0900
@@ -1,33 +1,49 @@
 package alice.datasegment;
 
-import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.msgpack.type.Value;
 
+import alice.codesegment.CodeSegment;
+import alice.datasegment.CommandType;
+
 public class LocalDataSegmentManager extends DataSegmentManager {
 	
+	private AtomicInteger seq = new AtomicInteger(1); 
+	
+	private DataSegmentKey getDataSegmentKey(String key) {
+		DataSegmentKey newDataSegmentKey = new DataSegmentKey();
+		DataSegmentKey dataSegmentKey = dataSegments.putIfAbsent(key, newDataSegmentKey);
+		if (dataSegmentKey == newDataSegmentKey) {
+			newDataSegmentKey.runKeyThread();
+		}
+		return dataSegmentKey;
+	}
+	
 	@Override
 	public void put(String key, Value val) {
-		// TODO Auto-generated method stub
-
+		DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
+		dataSegmentKey.addCommand(new Command(CommandType.PUT, val, 0, null, 0));
 	}
 
 	@Override
-	public Future<Value> take(String key, int index) {
-		// TODO Auto-generated method stub
-		return null;
+	public void take(String key, int index, CodeSegment cs) {
+		DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
+		int seq = this.seq.getAndIncrement();
+		dataSegmentKey.addCommand(new Command(CommandType.TAKE, null, index, cs, seq));
 	}
 
 	@Override
-	public Future<Value> peek(String key, int index) {
-		// TODO Auto-generated method stub
-		return null;
+	public void peek(String key, int index, CodeSegment cs) {
+		DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
+		int seq = this.seq.getAndIncrement();
+		dataSegmentKey.addCommand(new Command(CommandType.PEEK, null, index, cs, seq));
 	}
 
 	@Override
-	public void delete(String key) {
-		// TODO Auto-generated method stub
-
+	public void remove(String key) {
+		DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
+		dataSegmentKey.addCommand(new Command(CommandType.REMOVE, null, 0, null, 0));
 	}
 
 }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/alice/test/codesegment/TestCodeSegment.java	Wed Jan 11 00:17:27 2012 +0900
@@ -0,0 +1,22 @@
+package alice.test.codesegment;
+
+import alice.codesegment.CodeSegment;
+
+public class TestCodeSegment extends CodeSegment {
+	
+	public TestCodeSegment() {
+
+	}
+
+	@Override
+	public void prepare() {
+		ids.peek("local", "key1");
+	}
+	
+	@Override
+	public void run() {
+		
+
+	}
+
+}