Mercurial > hg > Database > Alice
changeset 7:352eb19d837d
implements reply of LocalDataSegment
author | one |
---|---|
date | Thu, 12 Jan 2012 13:48:34 +0900 |
parents | c78a1cc2cd8f |
children | 78b415d019de |
files | src/alice/codesegment/CodeSegmentManager.java src/alice/codesegment/InputDataSegment.java src/alice/codesegment/Reply.java src/alice/datasegment/Command.java src/alice/datasegment/DataSegmentKey.java src/alice/datasegment/DataSegmentManager.java src/alice/datasegment/LocalDataSegmentManager.java src/alice/datasegment/Reply.java src/alice/test/codesegment/TestCodeSegment.java |
diffstat | 9 files changed, 53 insertions(+), 46 deletions(-) [+] |
line wrap: on
line diff
--- a/src/alice/codesegment/CodeSegmentManager.java Thu Jan 12 13:19:04 2012 +0900 +++ b/src/alice/codesegment/CodeSegmentManager.java Thu Jan 12 13:48:34 2012 +0900 @@ -3,6 +3,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; +import alice.datasegment.Reply; + public class CodeSegmentManager { private static CodeSegmentManager instance = new CodeSegmentManager(); private LinkedBlockingQueue<Reply> replyQueue = new LinkedBlockingQueue<Reply>();
--- a/src/alice/codesegment/InputDataSegment.java Thu Jan 12 13:19:04 2012 +0900 +++ b/src/alice/codesegment/InputDataSegment.java Thu Jan 12 13:48:34 2012 +0900 @@ -16,22 +16,25 @@ this.cs = cs; } - public void peek(String managerKey, String key) { - peek(managerKey, key, 0); + public void peek(String argKey, String managerKey, String key) { + peek(argKey, managerKey, key, 0); } - public void peek(String managerKey, String key, int index) { - DataSegment.get(managerKey).peek(key, index, cs); + public void peek(String argKey, String managerKey, String key, int index) { + DataSegment.get(managerKey).peek(argKey, key, index, cs); count.getAndIncrement(); } - public void take(String managerKey, String key) { - take(managerKey, key, 0); + public void take(String argKey,String managerKey, String key) { + take(argKey, managerKey, key, 0); } - public void take(String managerKey, String key, int index) { - DataSegment.get(managerKey).take(key, index, cs); + public void take(String argKey, String managerKey, String key, int index) { + DataSegment.get(managerKey).take(argKey, key, index, cs); count.getAndIncrement(); } - + + public void reply(String key, DataSegmentValue val) { + inputDataSegments.put(key, val); + } }
--- a/src/alice/codesegment/Reply.java Thu Jan 12 13:19:04 2012 +0900 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,15 +0,0 @@ -package alice.codesegment; - -import org.msgpack.type.Value; - -public class Reply { - int seq; - int index; - Value val; - - public Reply(int seq, int index, Value val) { - this.seq = seq; - this.index = index; - this.val = val; - } -}
--- a/src/alice/datasegment/Command.java Thu Jan 12 13:19:04 2012 +0900 +++ b/src/alice/datasegment/Command.java Thu Jan 12 13:48:34 2012 +0900 @@ -2,19 +2,25 @@ import org.msgpack.type.Value; +import alice.codesegment.CodeSegment; + public class Command { public CommandType cmdType; + public String argKey; public Value val; public int index; public int seq; public DataSegmentManager manager; - - public Command(CommandType cmdType, Value val, int index, int seq, DataSegmentManager manager) { + public CodeSegment cs; + + public Command(CommandType cmdType, String argKey, Value val, int index, int seq, DataSegmentManager manager, CodeSegment cs) { this.cmdType = cmdType; + this.argKey = argKey; this.val = val; this.index = index; this.seq = seq; this.manager = manager; + this.cs = cs; } }
--- a/src/alice/datasegment/DataSegmentKey.java Thu Jan 12 13:19:04 2012 +0900 +++ b/src/alice/datasegment/DataSegmentKey.java Thu Jan 12 13:48:34 2012 +0900 @@ -4,7 +4,6 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; -import alice.codesegment.Reply; import alice.datasegment.Command; public class DataSegmentKey {
--- a/src/alice/datasegment/DataSegmentManager.java Thu Jan 12 13:19:04 2012 +0900 +++ b/src/alice/datasegment/DataSegmentManager.java Thu Jan 12 13:48:34 2012 +0900 @@ -6,7 +6,6 @@ import org.msgpack.type.Value; import alice.codesegment.CodeSegment; -import alice.codesegment.Reply; public abstract class DataSegmentManager { public ConcurrentHashMap<String, DataSegmentKey> dataSegments = new ConcurrentHashMap<String, DataSegmentKey>(); @@ -15,14 +14,14 @@ public abstract void put(String key, Value val); public abstract void update(String key, Value val); - public void take(String key, CodeSegment cs) { - take(key, 0, cs); + public void take(String argKey, String key, CodeSegment cs) { + take(argKey, key, 0, cs); } - public abstract void take(String key, int index, CodeSegment cs); - public void peek(String key, CodeSegment cs) { - peek(key, 0, cs); + public abstract void take(String argKey, String key, int index, CodeSegment cs); + public void peek(String argKey, String key, CodeSegment cs) { + peek(argKey, key, 0, cs); } - public abstract void peek(String key, int index, CodeSegment cs); + public abstract void peek(String argKey, String key, int index, CodeSegment cs); public abstract void remove(String key); }
--- a/src/alice/datasegment/LocalDataSegmentManager.java Thu Jan 12 13:19:04 2012 +0900 +++ b/src/alice/datasegment/LocalDataSegmentManager.java Thu Jan 12 13:48:34 2012 +0900 @@ -5,7 +5,6 @@ import org.msgpack.type.Value; import alice.codesegment.CodeSegment; -import alice.codesegment.Reply; import alice.datasegment.CommandType; public class LocalDataSegmentManager extends DataSegmentManager { @@ -18,13 +17,12 @@ while (true) { try { Reply reply = replyQueue.take(); - + Command cmd = seqHash.get(reply.seq); + cmd.cs.ids.reply(cmd.argKey, new DataSegmentValue(reply.index, reply.val)); } catch (InterruptedException e) { e.printStackTrace(); } - } - } }; @@ -44,29 +42,29 @@ @Override public void put(String key, Value val) { DataSegmentKey dataSegmentKey = getDataSegmentKey(key); - dataSegmentKey.addCommand(new Command(CommandType.PUT, val, 0, 0, this)); + dataSegmentKey.addCommand(new Command(CommandType.PUT, null, val, 0, 0, this, null)); } @Override public void update(String key, Value val) { DataSegmentKey dataSegmentKey = getDataSegmentKey(key); - dataSegmentKey.addCommand(new Command(CommandType.UPDATE, val, 0, 0, this)); + dataSegmentKey.addCommand(new Command(CommandType.UPDATE, null, val, 0, 0, this, null)); } @Override - public void take(String key, int index, CodeSegment cs) { + public void take(String argKey, String key, int index, CodeSegment cs) { DataSegmentKey dataSegmentKey = getDataSegmentKey(key); int seq = this.seq.getAndIncrement(); - Command cmd = new Command(CommandType.TAKE, null, index, seq, this); + Command cmd = new Command(CommandType.TAKE, argKey, null, index, seq, this, cs); seqHash.put(seq, cmd); dataSegmentKey.addCommand(cmd); } @Override - public void peek(String key, int index, CodeSegment cs) { + public void peek(String argKey, String key, int index, CodeSegment cs) { DataSegmentKey dataSegmentKey = getDataSegmentKey(key); int seq = this.seq.getAndIncrement(); - Command cmd = new Command(CommandType.PEEK, null, index, seq, this); + Command cmd = new Command(CommandType.PEEK, argKey, null, index, seq, this, cs); seqHash.put(seq, cmd); dataSegmentKey.addCommand(cmd); } @@ -74,7 +72,7 @@ @Override public void remove(String key) { DataSegmentKey dataSegmentKey = getDataSegmentKey(key); - dataSegmentKey.addCommand(new Command(CommandType.REMOVE, null, 0, 0, this)); + dataSegmentKey.addCommand(new Command(CommandType.REMOVE, null, null, 0, 0, this, null)); } }
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/alice/datasegment/Reply.java Thu Jan 12 13:48:34 2012 +0900 @@ -0,0 +1,15 @@ +package alice.datasegment; + +import org.msgpack.type.Value; + +public class Reply { + int seq; + int index; + Value val; + + public Reply(int seq, int index, Value val) { + this.seq = seq; + this.index = index; + this.val = val; + } +}