# HG changeset patch # User one # Date 1326208647 -32400 # Node ID 91057e15065fd8b78dc1e4dac53be478ca2c4aa3 # Parent f71eabb1df2a82dbf1cc323568acd888ffb9eb0d add DataSegment API and CodeSegment diff -r f71eabb1df2a -r 91057e15065f src/alice/codesegment/CodeSegment.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/alice/codesegment/CodeSegment.java Wed Jan 11 00:17:27 2012 +0900 @@ -0,0 +1,12 @@ +package alice.codesegment; + +import alice.codesegment.InputDataSegment; + +public abstract class CodeSegment { + + public InputDataSegment ids = new InputDataSegment(this); + + public abstract void prepare(); + public abstract void run(); + +} diff -r f71eabb1df2a -r 91057e15065f src/alice/codesegment/Input.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/alice/codesegment/Input.java Wed Jan 11 00:17:27 2012 +0900 @@ -0,0 +1,15 @@ +package alice.codesegment; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +import alice.datasegment.CommandType; + +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.FIELD) +public @interface Input { + public CommandType type(); + public int index(); +} diff -r f71eabb1df2a -r 91057e15065f src/alice/codesegment/InputDataSegment.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/alice/codesegment/InputDataSegment.java Wed Jan 11 00:17:27 2012 +0900 @@ -0,0 +1,38 @@ +package alice.codesegment; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +import alice.datasegment.DataSegment; +import alice.datasegment.DataSegmentManager; +import alice.datasegment.DataSegmentValue; + +public class InputDataSegment { + + private ConcurrentHashMap inputDataSegments = new ConcurrentHashMap(); + private CodeSegment cs; + private AtomicInteger count = new AtomicInteger(); + + public InputDataSegment(CodeSegment cs) { + this.cs = cs; + } + + public void peek(String managerKey, String key) { + peek(managerKey, key, 0); + } + + public void peek(String managerKey, String key, int index) { + DataSegment.get(managerKey).peek(key, index, cs); + count.getAndIncrement(); + } + + public void take(String managerKey, String key) { + take(managerKey, key, 0); + } + + public void take(String managerKey, String key, int index) { + DataSegment.get(managerKey).take(key, index, cs); + count.getAndIncrement(); + } + +} diff -r f71eabb1df2a -r 91057e15065f src/alice/datasegment/Command.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/alice/datasegment/Command.java Wed Jan 11 00:17:27 2012 +0900 @@ -0,0 +1,22 @@ +package alice.datasegment; + +import org.msgpack.type.Value; + +import alice.codesegment.CodeSegment; + +public class Command { + public CommandType cmdType; + public Value val; + public int index; + public CodeSegment cs; + public int seq; + + public Command(CommandType cmdType, Value val, int index, CodeSegment cs, int seq) { + this.cs = cs; + this.cmdType = cmdType; + this.val = val; + this.index = index; + this.seq = seq; + } + +} diff -r f71eabb1df2a -r 91057e15065f src/alice/datasegment/CommandType.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/alice/datasegment/CommandType.java Wed Jan 11 00:17:27 2012 +0900 @@ -0,0 +1,8 @@ +package alice.datasegment; + +public enum CommandType { + PUT, + PEEK, + TAKE, + REMOVE, +} diff -r f71eabb1df2a -r 91057e15065f src/alice/datasegment/DataSegment.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/alice/datasegment/DataSegment.java Wed Jan 11 00:17:27 2012 +0900 @@ -0,0 +1,22 @@ +package alice.datasegment; + +import java.util.concurrent.ConcurrentHashMap; + +public class DataSegment { + + private static DataSegment dataSegment = new DataSegment(); + private ConcurrentHashMap dataSegmentManageres = new ConcurrentHashMap(); + + private DataSegment() { + dataSegmentManageres.put("local", new LocalDataSegmentManager()); + } + + public static DataSegmentManager get(String key) { + return dataSegment.dataSegmentManageres.get(key); + } + + public static void regist(String key, DataSegmentManager manager) { + dataSegment.dataSegmentManageres.put(key, manager); + } + +} diff -r f71eabb1df2a -r 91057e15065f src/alice/datasegment/DataSegmentKey.java --- a/src/alice/datasegment/DataSegmentKey.java Sun Dec 11 06:37:05 2011 +0900 +++ b/src/alice/datasegment/DataSegmentKey.java Wed Jan 11 00:17:27 2012 +0900 @@ -1,5 +1,86 @@ package alice.datasegment; +import java.util.ArrayList; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; + +import alice.datasegment.Command; + public class DataSegmentKey { + private LinkedBlockingQueue cmdQueue = new LinkedBlockingQueue(); + private ArrayList dataList = new ArrayList(); + private ArrayList waitList = new ArrayList(); + private AtomicInteger tailIndex = new AtomicInteger(1); + private Runnable keyThread; + + public DataSegmentKey() { + + } + + public void addCommand(Command cmd) { + cmdQueue.add(cmd); + } + + public void runKeyThread() { + keyThread = new Runnable() { + @Override + public void run() { + while (true) { + try { + Command cmd = cmdQueue.take(); + switch (cmd.cmdType) { + case PUT: + int index = tailIndex.getAndIncrement(); + dataList.add(new DataSegmentValue(index, cmd.val)); + // run waiting peek and take + for (Command waitCmd : waitList) { + if (waitCmd.index < index) { + // TODO: make and send reply msg + + } + } + break; + case PEEK: + if (cmd.index >= tailIndex.get()) { + waitList.add(cmd); + break; + } + for (DataSegmentValue data : dataList) { + if (data.index > cmd.index) { + // TODO: make and send reply msg + + break; + } + } + break; + case TAKE: + if (cmd.index >= tailIndex.get()) { + waitList.add(cmd); + break; + } + for (DataSegmentValue data : dataList) { + if (data.index > cmd.index) { + // TODO: make and send reply msg + + dataList.remove(data); + break; + } + } + break; + case REMOVE: + // TODO: implements later + break; + default: + } + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + } + }; + keyThread.run(); + }; + } diff -r f71eabb1df2a -r 91057e15065f src/alice/datasegment/DataSegmentManager.java --- a/src/alice/datasegment/DataSegmentManager.java Sun Dec 11 06:37:05 2011 +0900 +++ b/src/alice/datasegment/DataSegmentManager.java Wed Jan 11 00:17:27 2012 +0900 @@ -1,22 +1,22 @@ package alice.datasegment; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Future; +import org.msgpack.type.Value; -import org.msgpack.type.Value; +import alice.codesegment.CodeSegment; public abstract class DataSegmentManager { ConcurrentHashMap dataSegments = new ConcurrentHashMap(); public abstract void put(String key, Value val); - public Future take(String key) { - return take(key, 0); + public void take(String key, CodeSegment cs) { + take(key, 0, cs); } - public abstract Future take(String key, int index); - public Future peek(String key) { - return peek(key, 0); + public abstract void take(String key, int index, CodeSegment cs); + public void peek(String key, CodeSegment cs) { + peek(key, 0, cs); } - public abstract Future peek(String key, int index); - public abstract void delete(String key); + public abstract void peek(String key, int index, CodeSegment cs); + public abstract void remove(String key); } diff -r f71eabb1df2a -r 91057e15065f src/alice/datasegment/DataSegmentReceiver.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/alice/datasegment/DataSegmentReceiver.java Wed Jan 11 00:17:27 2012 +0900 @@ -0,0 +1,5 @@ +package alice.datasegment; + +public class DataSegmentReceiver { + +} diff -r f71eabb1df2a -r 91057e15065f src/alice/datasegment/DataSegmentValue.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/alice/datasegment/DataSegmentValue.java Wed Jan 11 00:17:27 2012 +0900 @@ -0,0 +1,15 @@ +package alice.datasegment; + +import org.msgpack.type.Value; + +public class DataSegmentValue { + + public int index; + public Value val; + + public DataSegmentValue(int index, Value val) { + this.index = index; + this.val = val; + } + +} diff -r f71eabb1df2a -r 91057e15065f src/alice/datasegment/LocalDataSegmentManager.java --- a/src/alice/datasegment/LocalDataSegmentManager.java Sun Dec 11 06:37:05 2011 +0900 +++ b/src/alice/datasegment/LocalDataSegmentManager.java Wed Jan 11 00:17:27 2012 +0900 @@ -1,33 +1,49 @@ package alice.datasegment; -import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; import org.msgpack.type.Value; +import alice.codesegment.CodeSegment; +import alice.datasegment.CommandType; + public class LocalDataSegmentManager extends DataSegmentManager { + private AtomicInteger seq = new AtomicInteger(1); + + private DataSegmentKey getDataSegmentKey(String key) { + DataSegmentKey newDataSegmentKey = new DataSegmentKey(); + DataSegmentKey dataSegmentKey = dataSegments.putIfAbsent(key, newDataSegmentKey); + if (dataSegmentKey == newDataSegmentKey) { + newDataSegmentKey.runKeyThread(); + } + return dataSegmentKey; + } + @Override public void put(String key, Value val) { - // TODO Auto-generated method stub - + DataSegmentKey dataSegmentKey = getDataSegmentKey(key); + dataSegmentKey.addCommand(new Command(CommandType.PUT, val, 0, null, 0)); } @Override - public Future take(String key, int index) { - // TODO Auto-generated method stub - return null; + public void take(String key, int index, CodeSegment cs) { + DataSegmentKey dataSegmentKey = getDataSegmentKey(key); + int seq = this.seq.getAndIncrement(); + dataSegmentKey.addCommand(new Command(CommandType.TAKE, null, index, cs, seq)); } @Override - public Future peek(String key, int index) { - // TODO Auto-generated method stub - return null; + public void peek(String key, int index, CodeSegment cs) { + DataSegmentKey dataSegmentKey = getDataSegmentKey(key); + int seq = this.seq.getAndIncrement(); + dataSegmentKey.addCommand(new Command(CommandType.PEEK, null, index, cs, seq)); } @Override - public void delete(String key) { - // TODO Auto-generated method stub - + public void remove(String key) { + DataSegmentKey dataSegmentKey = getDataSegmentKey(key); + dataSegmentKey.addCommand(new Command(CommandType.REMOVE, null, 0, null, 0)); } } diff -r f71eabb1df2a -r 91057e15065f src/alice/test/codesegment/TestCodeSegment.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/alice/test/codesegment/TestCodeSegment.java Wed Jan 11 00:17:27 2012 +0900 @@ -0,0 +1,22 @@ +package alice.test.codesegment; + +import alice.codesegment.CodeSegment; + +public class TestCodeSegment extends CodeSegment { + + public TestCodeSegment() { + + } + + @Override + public void prepare() { + ids.peek("local", "key1"); + } + + @Override + public void run() { + + + } + +}