Mercurial > hg > Members > tatsuki > Alice
changeset 18:72dd27d952b0
change InputDataSegment API
author | kazz <kazz@cr.ie.u-ryukyu.ac.jp> |
---|---|
date | Sun, 15 Jan 2012 16:03:11 +0900 |
parents | bb075e103cd3 |
children | e7867328a2fb |
files | src/alice/codesegment/InputDataSegment.java src/alice/daemon/AcceptThread.java src/alice/datasegment/Command.java src/alice/datasegment/DataSegmentKey.java src/alice/datasegment/DataSegmentManager.java src/alice/datasegment/DataSegmentReceiver.java src/alice/datasegment/LocalDataSegmentManager.java src/alice/datasegment/RemoteDataSegmentManager.java src/alice/test/codesegment/RemoteIncrement.java src/alice/test/codesegment/RemoteStartCodeSegment.java src/alice/test/codesegment/StartCodeSegment.java src/alice/test/codesegment/TestCodeSegment.java |
diffstat | 12 files changed, 81 insertions(+), 55 deletions(-) [+] |
line wrap: on
line diff
--- a/src/alice/codesegment/InputDataSegment.java Sun Jan 15 15:18:01 2012 +0900 +++ b/src/alice/codesegment/InputDataSegment.java Sun Jan 15 16:03:11 2012 +0900 @@ -1,14 +1,13 @@ package alice.codesegment; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import alice.datasegment.DataSegment; +import alice.datasegment.DataSegmentReceiver; import alice.datasegment.DataSegmentValue; public class InputDataSegment { - private ConcurrentHashMap<String, DataSegmentValue> inputDataSegments = new ConcurrentHashMap<String, DataSegmentValue>(); private CodeSegment cs; private AtomicInteger count = new AtomicInteger(1); // for execute() @@ -16,26 +15,27 @@ this.cs = cs; } - public void peek(String argKey, String managerKey, String key) { - peek(argKey, managerKey, key, 0); + public void peek(DataSegmentReceiver receiver, String managerKey, String key) { + peek(receiver, managerKey, key, 0); } - public void peek(String argKey, String managerKey, String key, int index) { - DataSegment.get(managerKey).peek(argKey, key, index, cs); + public void peek(DataSegmentReceiver receiver, String managerKey, String key, int index) { + DataSegment.get(managerKey).peek(receiver, key, index, cs); count.getAndIncrement(); } - public void take(String argKey,String managerKey, String key) { - take(argKey, managerKey, key, 0); + public void take(DataSegmentReceiver receiver, String managerKey, String key) { + take(receiver, managerKey, key, 0); } - public void take(String argKey, String managerKey, String key, int index) { - DataSegment.get(managerKey).take(argKey, key, index, cs); + public void take(DataSegmentReceiver receiver, String managerKey, String key, int index) { + DataSegment.get(managerKey).take(receiver, key, index, cs); count.getAndIncrement(); } - public void reply(String key, DataSegmentValue val) { - inputDataSegments.put(key, val); + public void reply(DataSegmentReceiver receiver, DataSegmentValue val) { + receiver.index = val.index; + receiver.val = val.val; execute(); } @@ -48,8 +48,5 @@ } } } - - public DataSegmentValue get(String argKey) { - return inputDataSegments.get(argKey); - } + }
--- a/src/alice/daemon/AcceptThread.java Sun Jan 15 15:18:01 2012 +0900 +++ b/src/alice/daemon/AcceptThread.java Sun Jan 15 16:03:11 2012 +0900 @@ -4,7 +4,6 @@ import java.net.ServerSocket; import java.net.Socket; -import org.apache.log4j.Level; import org.apache.log4j.Logger; import alice.datasegment.DataSegment;
--- a/src/alice/datasegment/Command.java Sun Jan 15 15:18:01 2012 +0900 +++ b/src/alice/datasegment/Command.java Sun Jan 15 16:03:11 2012 +0900 @@ -9,16 +9,16 @@ public class Command { public CommandType type; public String key; - public String argKey; + public DataSegmentReceiver receiver; public Value val; public int index; public int seq; public BlockingQueue<Command> replyQueue; public CodeSegment cs; - public Command(CommandType cmdType, String argKey, String key, Value val, int index, int seq, BlockingQueue<Command> replyQueue, CodeSegment cs) { + public Command(CommandType cmdType, DataSegmentReceiver receiver, String key, Value val, int index, int seq, BlockingQueue<Command> replyQueue, CodeSegment cs) { this.type = cmdType; - this.argKey = argKey; + this.receiver = receiver; this.key = key; this.val = val; this.index = index;
--- a/src/alice/datasegment/DataSegmentKey.java Sun Jan 15 15:18:01 2012 +0900 +++ b/src/alice/datasegment/DataSegmentKey.java Sun Jan 15 16:03:11 2012 +0900 @@ -2,7 +2,6 @@ import java.util.ArrayList; import java.util.Iterator; -import java.util.LinkedList; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger;
--- a/src/alice/datasegment/DataSegmentManager.java Sun Jan 15 15:18:01 2012 +0900 +++ b/src/alice/datasegment/DataSegmentManager.java Sun Jan 15 16:03:11 2012 +0900 @@ -22,7 +22,7 @@ try { Command reply = replyQueue.take(); Command cmd = seqHash.get(reply.seq); - cmd.cs.ids.reply(cmd.argKey, new DataSegmentValue(reply.index, reply.val)); + cmd.cs.ids.reply(cmd.receiver, new DataSegmentValue(reply.index, reply.val)); } catch (InterruptedException e) { e.printStackTrace(); } @@ -33,14 +33,14 @@ public abstract void put(String key, Value val); public abstract void update(String key, Value val); - public void take(String argKey, String key, CodeSegment cs) { - take(argKey, key, 0, cs); + public void take(DataSegmentReceiver receiver, String key, CodeSegment cs) { + take(receiver, key, 0, cs); } - public abstract void take(String argKey, String key, int index, CodeSegment cs); - public void peek(String argKey, String key, CodeSegment cs) { - peek(argKey, key, 0, cs); + public abstract void take(DataSegmentReceiver receiver, String key, int index, CodeSegment cs); + public void peek(DataSegmentReceiver receiver, String key, CodeSegment cs) { + peek(receiver, key, 0, cs); } - public abstract void peek(String argKey, String key, int index, CodeSegment cs); + public abstract void peek(DataSegmentReceiver receiver, String key, int index, CodeSegment cs); public abstract void remove(String key); }
--- a/src/alice/datasegment/DataSegmentReceiver.java Sun Jan 15 15:18:01 2012 +0900 +++ b/src/alice/datasegment/DataSegmentReceiver.java Sun Jan 15 16:03:11 2012 +0900 @@ -1,5 +1,34 @@ package alice.datasegment; +import org.msgpack.type.Value; + +import alice.codesegment.InputDataSegment; + public class DataSegmentReceiver { + public InputDataSegment ids; + public int index; + public Value val; + public CommandType type; + + + public DataSegmentReceiver(InputDataSegment ids, CommandType type) { + this.ids = ids; + this.type = type; + } + + public void setKey(String managerKey, String key) { + setKey(managerKey, key, 0); + } + public void setKey(String managerKey, String key, int index) { + switch (type) { + case PEEK: + ids.peek(this, managerKey, key, index); + break; + case TAKE: + ids.take(this, managerKey, key, index); + break; + } + } + }
--- a/src/alice/datasegment/LocalDataSegmentManager.java Sun Jan 15 15:18:01 2012 +0900 +++ b/src/alice/datasegment/LocalDataSegmentManager.java Sun Jan 15 16:03:11 2012 +0900 @@ -37,19 +37,19 @@ } @Override - public void take(String argKey, String key, int index, CodeSegment cs) { + public void take(DataSegmentReceiver receiver, String key, int index, CodeSegment cs) { DataSegmentKey dataSegmentKey = getDataSegmentKey(key); int seq = this.seq.getAndIncrement(); - Command cmd = new Command(CommandType.TAKE, argKey, null, null, index, seq, replyQueue, cs); + Command cmd = new Command(CommandType.TAKE, receiver, null, null, index, seq, replyQueue, cs); seqHash.put(seq, cmd); dataSegmentKey.addCommand(cmd); } @Override - public void peek(String argKey, String key, int index, CodeSegment cs) { + public void peek(DataSegmentReceiver receiver, String key, int index, CodeSegment cs) { DataSegmentKey dataSegmentKey = getDataSegmentKey(key); int seq = this.seq.getAndIncrement(); - Command cmd = new Command(CommandType.PEEK, argKey, null, null, index, seq, replyQueue, cs); + Command cmd = new Command(CommandType.PEEK, receiver, null, null, index, seq, replyQueue, cs); seqHash.put(seq, cmd); dataSegmentKey.addCommand(cmd); }
--- a/src/alice/datasegment/RemoteDataSegmentManager.java Sun Jan 15 15:18:01 2012 +0900 +++ b/src/alice/datasegment/RemoteDataSegmentManager.java Sun Jan 15 16:03:11 2012 +0900 @@ -29,17 +29,17 @@ } @Override - public void take(String argKey, String key, int index, CodeSegment cs) { + public void take(DataSegmentReceiver receiver, String key, int index, CodeSegment cs) { int seq = this.seq.getAndIncrement(); - Command cmd = new Command(CommandType.TAKE, argKey, key, null, index, seq, replyQueue, cs); + Command cmd = new Command(CommandType.TAKE, receiver, key, null, index, seq, replyQueue, cs); seqHash.put(seq, cmd); connection.sendCommand(cmd); } @Override - public void peek(String argKey, String key, int index, CodeSegment cs) { + public void peek(DataSegmentReceiver receiver, String key, int index, CodeSegment cs) { int seq = this.seq.getAndIncrement(); - Command cmd = new Command(CommandType.PEEK, argKey, key, null, index, seq, replyQueue, cs); + Command cmd = new Command(CommandType.PEEK, receiver, key, null, index, seq, replyQueue, cs); seqHash.put(seq, cmd); connection.sendCommand(cmd); }
--- a/src/alice/test/codesegment/RemoteIncrement.java Sun Jan 15 15:18:01 2012 +0900 +++ b/src/alice/test/codesegment/RemoteIncrement.java Sun Jan 15 16:03:11 2012 +0900 @@ -3,19 +3,21 @@ import org.msgpack.type.ValueFactory; import alice.codesegment.CodeSegment; -import alice.datasegment.DataSegmentValue; +import alice.datasegment.CommandType; +import alice.datasegment.DataSegmentReceiver; public class RemoteIncrement extends CodeSegment { + public DataSegmentReceiver num = new DataSegmentReceiver(ids, CommandType.TAKE); + @Override public void run() { - DataSegmentValue data = ids.get("num"); - int num = data.val.asIntegerValue().getInt(); + int num = this.num.val.asIntegerValue().getInt(); System.out.println("[CodeSegment] " + num++); if (num == 10) System.exit(0); - CodeSegment cs = new RemoteIncrement(); - cs.ids.take("num", "remote", "num"); + RemoteIncrement cs = new RemoteIncrement(); + cs.num.setKey("remote", "num"); cs.ids.execute(); ods.put("local", "num", ValueFactory.createIntegerValue(num));
--- a/src/alice/test/codesegment/RemoteStartCodeSegment.java Sun Jan 15 15:18:01 2012 +0900 +++ b/src/alice/test/codesegment/RemoteStartCodeSegment.java Sun Jan 15 16:03:11 2012 +0900 @@ -6,11 +6,11 @@ import alice.codesegment.CodeSegment; public class RemoteStartCodeSegment extends CodeSegment { - + @Override public void run() { - CodeSegment cs = new RemoteIncrement(); - cs.ids.take("num", "remote", "num"); + RemoteIncrement cs = new RemoteIncrement(); + cs.num.setKey("remote", "num"); cs.ids.execute(); Value num = ValueFactory.createIntegerValue(0);
--- a/src/alice/test/codesegment/StartCodeSegment.java Sun Jan 15 15:18:01 2012 +0900 +++ b/src/alice/test/codesegment/StartCodeSegment.java Sun Jan 15 16:03:11 2012 +0900 @@ -11,8 +11,8 @@ public void run() { System.out.println("run StartCodeSegment"); - CodeSegment cs = new TestCodeSegment(); - cs.ids.peek("arg1", "local", "key1"); + TestCodeSegment cs = new TestCodeSegment(); + cs.arg1.setKey("local", "key1"); cs.ids.execute(); System.out.println("create TestCodeSegment");
--- a/src/alice/test/codesegment/TestCodeSegment.java Sun Jan 15 15:18:01 2012 +0900 +++ b/src/alice/test/codesegment/TestCodeSegment.java Sun Jan 15 16:03:11 2012 +0900 @@ -4,26 +4,26 @@ import org.msgpack.type.ValueFactory; import alice.codesegment.CodeSegment; -import alice.datasegment.DataSegmentValue; +import alice.datasegment.CommandType; +import alice.datasegment.DataSegmentReceiver; public class TestCodeSegment extends CodeSegment { - DataSegmentValue arg1; + DataSegmentReceiver arg1 = new DataSegmentReceiver(ids, CommandType.PEEK); @Override public void run() { - DataSegmentValue data = ids.get("arg1"); - System.out.println("index = " + data.index); - System.out.println("data = " + data.val); - System.out.println(data.val.getType()); + System.out.println("index = " + arg1.index); + System.out.println("data = " + arg1.val); + System.out.println(arg1.val.getType()); - if (data.index == 10) { + if (arg1.index == 10) { System.exit(0); return; } - CodeSegment cs = new TestCodeSegment(); - cs.ids.peek("arg1", "local", "key1", data.index); + TestCodeSegment cs = new TestCodeSegment(); + cs.arg1.setKey("local", "key1", arg1.index); cs.ids.execute(); Value val = ValueFactory.createRawValue("String data");