# HG changeset patch # User one # Date 1326343714 -32400 # Node ID 352eb19d837d6bdc65b8c2cd7556aa916f2aea53 # Parent c78a1cc2cd8f1c7158842926b148559502797eb2 implements reply of LocalDataSegment diff -r c78a1cc2cd8f -r 352eb19d837d src/alice/codesegment/CodeSegmentManager.java --- a/src/alice/codesegment/CodeSegmentManager.java Thu Jan 12 13:19:04 2012 +0900 +++ b/src/alice/codesegment/CodeSegmentManager.java Thu Jan 12 13:48:34 2012 +0900 @@ -3,6 +3,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; +import alice.datasegment.Reply; + public class CodeSegmentManager { private static CodeSegmentManager instance = new CodeSegmentManager(); private LinkedBlockingQueue replyQueue = new LinkedBlockingQueue(); diff -r c78a1cc2cd8f -r 352eb19d837d src/alice/codesegment/InputDataSegment.java --- a/src/alice/codesegment/InputDataSegment.java Thu Jan 12 13:19:04 2012 +0900 +++ b/src/alice/codesegment/InputDataSegment.java Thu Jan 12 13:48:34 2012 +0900 @@ -16,22 +16,25 @@ this.cs = cs; } - public void peek(String managerKey, String key) { - peek(managerKey, key, 0); + public void peek(String argKey, String managerKey, String key) { + peek(argKey, managerKey, key, 0); } - public void peek(String managerKey, String key, int index) { - DataSegment.get(managerKey).peek(key, index, cs); + public void peek(String argKey, String managerKey, String key, int index) { + DataSegment.get(managerKey).peek(argKey, key, index, cs); count.getAndIncrement(); } - public void take(String managerKey, String key) { - take(managerKey, key, 0); + public void take(String argKey,String managerKey, String key) { + take(argKey, managerKey, key, 0); } - public void take(String managerKey, String key, int index) { - DataSegment.get(managerKey).take(key, index, cs); + public void take(String argKey, String managerKey, String key, int index) { + DataSegment.get(managerKey).take(argKey, key, index, cs); count.getAndIncrement(); } - + + public void reply(String key, DataSegmentValue val) { + inputDataSegments.put(key, val); + } } diff -r c78a1cc2cd8f -r 352eb19d837d src/alice/codesegment/Reply.java --- a/src/alice/codesegment/Reply.java Thu Jan 12 13:19:04 2012 +0900 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,15 +0,0 @@ -package alice.codesegment; - -import org.msgpack.type.Value; - -public class Reply { - int seq; - int index; - Value val; - - public Reply(int seq, int index, Value val) { - this.seq = seq; - this.index = index; - this.val = val; - } -} diff -r c78a1cc2cd8f -r 352eb19d837d src/alice/datasegment/Command.java --- a/src/alice/datasegment/Command.java Thu Jan 12 13:19:04 2012 +0900 +++ b/src/alice/datasegment/Command.java Thu Jan 12 13:48:34 2012 +0900 @@ -2,19 +2,25 @@ import org.msgpack.type.Value; +import alice.codesegment.CodeSegment; + public class Command { public CommandType cmdType; + public String argKey; public Value val; public int index; public int seq; public DataSegmentManager manager; - - public Command(CommandType cmdType, Value val, int index, int seq, DataSegmentManager manager) { + public CodeSegment cs; + + public Command(CommandType cmdType, String argKey, Value val, int index, int seq, DataSegmentManager manager, CodeSegment cs) { this.cmdType = cmdType; + this.argKey = argKey; this.val = val; this.index = index; this.seq = seq; this.manager = manager; + this.cs = cs; } } diff -r c78a1cc2cd8f -r 352eb19d837d src/alice/datasegment/DataSegmentKey.java --- a/src/alice/datasegment/DataSegmentKey.java Thu Jan 12 13:19:04 2012 +0900 +++ b/src/alice/datasegment/DataSegmentKey.java Thu Jan 12 13:48:34 2012 +0900 @@ -4,7 +4,6 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; -import alice.codesegment.Reply; import alice.datasegment.Command; public class DataSegmentKey { diff -r c78a1cc2cd8f -r 352eb19d837d src/alice/datasegment/DataSegmentManager.java --- a/src/alice/datasegment/DataSegmentManager.java Thu Jan 12 13:19:04 2012 +0900 +++ b/src/alice/datasegment/DataSegmentManager.java Thu Jan 12 13:48:34 2012 +0900 @@ -6,7 +6,6 @@ import org.msgpack.type.Value; import alice.codesegment.CodeSegment; -import alice.codesegment.Reply; public abstract class DataSegmentManager { public ConcurrentHashMap dataSegments = new ConcurrentHashMap(); @@ -15,14 +14,14 @@ public abstract void put(String key, Value val); public abstract void update(String key, Value val); - public void take(String key, CodeSegment cs) { - take(key, 0, cs); + public void take(String argKey, String key, CodeSegment cs) { + take(argKey, key, 0, cs); } - public abstract void take(String key, int index, CodeSegment cs); - public void peek(String key, CodeSegment cs) { - peek(key, 0, cs); + public abstract void take(String argKey, String key, int index, CodeSegment cs); + public void peek(String argKey, String key, CodeSegment cs) { + peek(argKey, key, 0, cs); } - public abstract void peek(String key, int index, CodeSegment cs); + public abstract void peek(String argKey, String key, int index, CodeSegment cs); public abstract void remove(String key); } diff -r c78a1cc2cd8f -r 352eb19d837d src/alice/datasegment/LocalDataSegmentManager.java --- a/src/alice/datasegment/LocalDataSegmentManager.java Thu Jan 12 13:19:04 2012 +0900 +++ b/src/alice/datasegment/LocalDataSegmentManager.java Thu Jan 12 13:48:34 2012 +0900 @@ -5,7 +5,6 @@ import org.msgpack.type.Value; import alice.codesegment.CodeSegment; -import alice.codesegment.Reply; import alice.datasegment.CommandType; public class LocalDataSegmentManager extends DataSegmentManager { @@ -18,13 +17,12 @@ while (true) { try { Reply reply = replyQueue.take(); - + Command cmd = seqHash.get(reply.seq); + cmd.cs.ids.reply(cmd.argKey, new DataSegmentValue(reply.index, reply.val)); } catch (InterruptedException e) { e.printStackTrace(); } - } - } }; @@ -44,29 +42,29 @@ @Override public void put(String key, Value val) { DataSegmentKey dataSegmentKey = getDataSegmentKey(key); - dataSegmentKey.addCommand(new Command(CommandType.PUT, val, 0, 0, this)); + dataSegmentKey.addCommand(new Command(CommandType.PUT, null, val, 0, 0, this, null)); } @Override public void update(String key, Value val) { DataSegmentKey dataSegmentKey = getDataSegmentKey(key); - dataSegmentKey.addCommand(new Command(CommandType.UPDATE, val, 0, 0, this)); + dataSegmentKey.addCommand(new Command(CommandType.UPDATE, null, val, 0, 0, this, null)); } @Override - public void take(String key, int index, CodeSegment cs) { + public void take(String argKey, String key, int index, CodeSegment cs) { DataSegmentKey dataSegmentKey = getDataSegmentKey(key); int seq = this.seq.getAndIncrement(); - Command cmd = new Command(CommandType.TAKE, null, index, seq, this); + Command cmd = new Command(CommandType.TAKE, argKey, null, index, seq, this, cs); seqHash.put(seq, cmd); dataSegmentKey.addCommand(cmd); } @Override - public void peek(String key, int index, CodeSegment cs) { + public void peek(String argKey, String key, int index, CodeSegment cs) { DataSegmentKey dataSegmentKey = getDataSegmentKey(key); int seq = this.seq.getAndIncrement(); - Command cmd = new Command(CommandType.PEEK, null, index, seq, this); + Command cmd = new Command(CommandType.PEEK, argKey, null, index, seq, this, cs); seqHash.put(seq, cmd); dataSegmentKey.addCommand(cmd); } @@ -74,7 +72,7 @@ @Override public void remove(String key) { DataSegmentKey dataSegmentKey = getDataSegmentKey(key); - dataSegmentKey.addCommand(new Command(CommandType.REMOVE, null, 0, 0, this)); + dataSegmentKey.addCommand(new Command(CommandType.REMOVE, null, null, 0, 0, this, null)); } } diff -r c78a1cc2cd8f -r 352eb19d837d src/alice/datasegment/Reply.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/alice/datasegment/Reply.java Thu Jan 12 13:48:34 2012 +0900 @@ -0,0 +1,15 @@ +package alice.datasegment; + +import org.msgpack.type.Value; + +public class Reply { + int seq; + int index; + Value val; + + public Reply(int seq, int index, Value val) { + this.seq = seq; + this.index = index; + this.val = val; + } +} diff -r c78a1cc2cd8f -r 352eb19d837d src/alice/test/codesegment/TestCodeSegment.java --- a/src/alice/test/codesegment/TestCodeSegment.java Thu Jan 12 13:19:04 2012 +0900 +++ b/src/alice/test/codesegment/TestCodeSegment.java Thu Jan 12 13:48:34 2012 +0900 @@ -10,7 +10,7 @@ @Override public void prepare() { - ids.peek("local", "key1"); + ids.peek("arg1", "local", "key1"); } @Override