# HG changeset patch # User one # Date 1326341944 -32400 # Node ID c78a1cc2cd8f1c7158842926b148559502797eb2 # Parent 80375ae09a1faf532a2445598593b8ca39bece8f implements Reply diff -r 80375ae09a1f -r c78a1cc2cd8f src/alice/codesegment/CodeSegmentManager.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/alice/codesegment/CodeSegmentManager.java Thu Jan 12 13:19:04 2012 +0900 @@ -0,0 +1,35 @@ +package alice.codesegment; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; + +public class CodeSegmentManager { + private static CodeSegmentManager instance = new CodeSegmentManager(); + private LinkedBlockingQueue replyQueue = new LinkedBlockingQueue(); + private ConcurrentHashMap seqHash = new ConcurrentHashMap(); + + private CodeSegmentManager() { + Runnable replyThread = new Runnable() { + + @Override + public void run() { + while (true) { + try { + Reply reply = replyQueue.take(); + + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + } + }; + } + + public void create(CodeSegment cs) { + } + + public static CodeSegmentManager get() { + return instance; + } +} diff -r 80375ae09a1f -r c78a1cc2cd8f src/alice/codesegment/Input.java --- a/src/alice/codesegment/Input.java Wed Jan 11 23:28:02 2012 +0900 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,15 +0,0 @@ -package alice.codesegment; - -import java.lang.annotation.ElementType; -import java.lang.annotation.Retention; -import java.lang.annotation.RetentionPolicy; -import java.lang.annotation.Target; - -import alice.datasegment.CommandType; - -@Retention(RetentionPolicy.RUNTIME) -@Target(ElementType.FIELD) -public @interface Input { - public CommandType type(); - public int index(); -} diff -r 80375ae09a1f -r c78a1cc2cd8f src/alice/codesegment/Reply.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/alice/codesegment/Reply.java Thu Jan 12 13:19:04 2012 +0900 @@ -0,0 +1,15 @@ +package alice.codesegment; + +import org.msgpack.type.Value; + +public class Reply { + int seq; + int index; + Value val; + + public Reply(int seq, int index, Value val) { + this.seq = seq; + this.index = index; + this.val = val; + } +} diff -r 80375ae09a1f -r c78a1cc2cd8f src/alice/datasegment/Command.java --- a/src/alice/datasegment/Command.java Wed Jan 11 23:28:02 2012 +0900 +++ b/src/alice/datasegment/Command.java Thu Jan 12 13:19:04 2012 +0900 @@ -2,21 +2,19 @@ import org.msgpack.type.Value; -import alice.codesegment.CodeSegment; - public class Command { public CommandType cmdType; public Value val; public int index; - public CodeSegment cs; public int seq; + public DataSegmentManager manager; - public Command(CommandType cmdType, Value val, int index, CodeSegment cs, int seq) { - this.cs = cs; + public Command(CommandType cmdType, Value val, int index, int seq, DataSegmentManager manager) { this.cmdType = cmdType; this.val = val; this.index = index; this.seq = seq; + this.manager = manager; } } diff -r 80375ae09a1f -r c78a1cc2cd8f src/alice/datasegment/DataSegmentKey.java --- a/src/alice/datasegment/DataSegmentKey.java Wed Jan 11 23:28:02 2012 +0900 +++ b/src/alice/datasegment/DataSegmentKey.java Thu Jan 12 13:19:04 2012 +0900 @@ -4,6 +4,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; +import alice.codesegment.Reply; import alice.datasegment.Command; public class DataSegmentKey { @@ -36,12 +37,16 @@ } case PUT: int index = tailIndex.getAndIncrement(); - dataList.add(new DataSegmentValue(index, cmd.val)); + DataSegmentValue dsv = new DataSegmentValue(index, cmd.val); + dataList.add(dsv); // run waiting peek and take for (Command waitCmd : waitList) { if (waitCmd.index < index) { - // TODO: make and send reply msg - + waitCmd.manager.replyQueue.put(new Reply(waitCmd.seq, index, cmd.val)); + if (waitCmd.cmdType == CommandType.TAKE) { // delete data, if it run take cmd. + dataList.remove(dsv); + break; + } } } break; @@ -52,30 +57,25 @@ } for (DataSegmentValue data : dataList) { if (data.index > cmd.index) { - // TODO: make and send reply msg - + cmd.manager.replyQueue.put(new Reply(cmd.seq, data.index, data.val)); break; } } + waitList.add(cmd); break; case TAKE: if (cmd.index >= tailIndex.get()) { waitList.add(cmd); break; } - boolean waitFlag = true; for (DataSegmentValue data : dataList) { if (data.index > cmd.index) { - // TODO: make and send reply msg - - + cmd.manager.replyQueue.put(new Reply(cmd.seq, data.index, data.val)); dataList.remove(data); - waitFlag = false; break; } } - if (waitFlag) - waitList.add(cmd); + waitList.add(cmd); break; case REMOVE: // TODO: implements later @@ -88,7 +88,7 @@ } } }; - keyThread.run(); + new Thread(keyThread).start(); }; } diff -r 80375ae09a1f -r c78a1cc2cd8f src/alice/datasegment/DataSegmentManager.java --- a/src/alice/datasegment/DataSegmentManager.java Wed Jan 11 23:28:02 2012 +0900 +++ b/src/alice/datasegment/DataSegmentManager.java Thu Jan 12 13:19:04 2012 +0900 @@ -1,12 +1,17 @@ package alice.datasegment; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; + import org.msgpack.type.Value; import alice.codesegment.CodeSegment; +import alice.codesegment.Reply; public abstract class DataSegmentManager { - ConcurrentHashMap dataSegments = new ConcurrentHashMap(); + public ConcurrentHashMap dataSegments = new ConcurrentHashMap(); + public ConcurrentHashMap seqHash = new ConcurrentHashMap(); + public LinkedBlockingQueue replyQueue = new LinkedBlockingQueue(); public abstract void put(String key, Value val); public abstract void update(String key, Value val); diff -r 80375ae09a1f -r c78a1cc2cd8f src/alice/datasegment/LocalDataSegmentManager.java --- a/src/alice/datasegment/LocalDataSegmentManager.java Wed Jan 11 23:28:02 2012 +0900 +++ b/src/alice/datasegment/LocalDataSegmentManager.java Thu Jan 12 13:19:04 2012 +0900 @@ -5,11 +5,32 @@ import org.msgpack.type.Value; import alice.codesegment.CodeSegment; +import alice.codesegment.Reply; import alice.datasegment.CommandType; public class LocalDataSegmentManager extends DataSegmentManager { - private AtomicInteger seq = new AtomicInteger(1); + private AtomicInteger seq = new AtomicInteger(1); + private Runnable replyThread = new Runnable() { + + @Override + public void run() { + while (true) { + try { + Reply reply = replyQueue.take(); + + } catch (InterruptedException e) { + e.printStackTrace(); + } + + } + + } + + }; + public LocalDataSegmentManager() { + new Thread(replyThread).start(); + } private DataSegmentKey getDataSegmentKey(String key) { DataSegmentKey newDataSegmentKey = new DataSegmentKey(); @@ -23,33 +44,37 @@ @Override public void put(String key, Value val) { DataSegmentKey dataSegmentKey = getDataSegmentKey(key); - dataSegmentKey.addCommand(new Command(CommandType.PUT, val, 0, null, 0)); + dataSegmentKey.addCommand(new Command(CommandType.PUT, val, 0, 0, this)); } @Override public void update(String key, Value val) { DataSegmentKey dataSegmentKey = getDataSegmentKey(key); - dataSegmentKey.addCommand(new Command(CommandType.UPDATE, val, 0, null, 0)); + dataSegmentKey.addCommand(new Command(CommandType.UPDATE, val, 0, 0, this)); } @Override public void take(String key, int index, CodeSegment cs) { DataSegmentKey dataSegmentKey = getDataSegmentKey(key); int seq = this.seq.getAndIncrement(); - dataSegmentKey.addCommand(new Command(CommandType.TAKE, null, index, cs, seq)); + Command cmd = new Command(CommandType.TAKE, null, index, seq, this); + seqHash.put(seq, cmd); + dataSegmentKey.addCommand(cmd); } @Override public void peek(String key, int index, CodeSegment cs) { DataSegmentKey dataSegmentKey = getDataSegmentKey(key); int seq = this.seq.getAndIncrement(); - dataSegmentKey.addCommand(new Command(CommandType.PEEK, null, index, cs, seq)); + Command cmd = new Command(CommandType.PEEK, null, index, seq, this); + seqHash.put(seq, cmd); + dataSegmentKey.addCommand(cmd); } @Override public void remove(String key) { DataSegmentKey dataSegmentKey = getDataSegmentKey(key); - dataSegmentKey.addCommand(new Command(CommandType.REMOVE, null, 0, null, 0)); + dataSegmentKey.addCommand(new Command(CommandType.REMOVE, null, 0, 0, this)); } }