Mercurial > hg > Members > tatsuki > Alice
changeset 198:f151dea22b2c working
add flip api
author | sugi |
---|---|
date | Tue, 19 Mar 2013 01:25:09 +0900 (2013-03-18) |
parents | 2b28d3c16a58 |
children | 15b68b65f8a4 |
files | .settings/org.eclipse.core.resources.prefs src/alice/codesegment/InputDataSegment.java src/alice/datasegment/Command.java src/alice/datasegment/DataSegmentKey.java src/alice/datasegment/DataSegmentManager.java src/alice/datasegment/DataSegmentValue.java src/alice/datasegment/LocalDataSegmentManager.java src/alice/datasegment/Receiver.java src/alice/test/codesegment/api/FlipCodeSegment.java src/alice/test/codesegment/api/FlipTest.java src/alice/test/codesegment/api/StartCodeSegment.java src/alice/test/codesegment/api/TakeCodeSegment.java src/alice/test/codesegment/api/TestApiAlice.java |
diffstat | 13 files changed, 150 insertions(+), 50 deletions(-) [+] |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/.settings/org.eclipse.core.resources.prefs Tue Mar 19 01:25:09 2013 +0900 @@ -0,0 +1,2 @@ +eclipse.preferences.version=1 +encoding//src/alice/test/codesegment/local/bitonicsort/SortTest.java=UTF-8
--- a/src/alice/codesegment/InputDataSegment.java Fri Mar 15 17:25:25 2013 +0900 +++ b/src/alice/codesegment/InputDataSegment.java Tue Mar 19 01:25:09 2013 +0900 @@ -2,6 +2,8 @@ import java.util.concurrent.atomic.AtomicInteger; +import org.msgpack.type.Value; + import alice.datasegment.CommandType; import alice.datasegment.DataSegment; import alice.datasegment.DataSegmentValue; @@ -18,6 +20,8 @@ public CodeSegment cs; private AtomicInteger count = new AtomicInteger(1); // 1 for no input data segments private AtomicInteger keyCount = new AtomicInteger(0); // number of DataSegments + + private DataSegmentValue dsv; public InputDataSegment(CodeSegment cs) { this.cs = cs; @@ -54,12 +58,24 @@ public void take(Receiver receiver, String key, int index) { DataSegment.getLocal().take(receiver, key, index, cs); } + + public void flip(Receiver receiver, String key){ + + } + + public void flip(Receiver receiver, Value val, Object obj){ + //int index = DataSegment.getLocal().getDataSegmentKey(receiver.key).getIndex(); + //dsv.setValue(index, val, obj); + DataSegment.getLocal().flip(receiver.key, val, obj, dsv); + + } public void reply(Receiver receiver, DataSegmentValue val) { receiver.index = val.index; receiver.val = val.val; receiver.from = val.from; receiver.obj = val.obj; + setDataSegmentValue(val); receive(); } @@ -88,5 +104,9 @@ public Receiver create(CommandType type) { return new Receiver(this, type); } + + private void setDataSegmentValue(DataSegmentValue dsv){ + this.dsv = dsv; + } }
--- a/src/alice/datasegment/Command.java Fri Mar 15 17:25:25 2013 +0900 +++ b/src/alice/datasegment/Command.java Tue Mar 19 01:25:09 2013 +0900 @@ -1,7 +1,6 @@ package alice.datasegment; import java.util.concurrent.BlockingQueue; - import org.msgpack.type.Value; import alice.codesegment.CodeSegment; @@ -20,9 +19,9 @@ public DataSegmentValue dsv; public Command(CommandType cmdType, int seq, DataSegmentValue dsv){ - this.type = cmdType; - this.seq = seq; - this.dsv = dsv; + this.type=cmdType; + this.seq=seq; + this.dsv=dsv; } public Command(CommandType cmdType, Receiver receiver, String key, Value val, int index, int seq, BlockingQueue<Command> replyQueue, CodeSegment cs, String reverseKey) { @@ -62,6 +61,8 @@ } + + public String getCommandString() { String csName = "null"; if (cs != null) {
--- a/src/alice/datasegment/DataSegmentKey.java Fri Mar 15 17:25:25 2013 +0900 +++ b/src/alice/datasegment/DataSegmentKey.java Tue Mar 19 01:25:09 2013 +0900 @@ -17,8 +17,8 @@ private ArrayList<Command> waitList = new ArrayList<Command>(); private AtomicInteger tailIndex = new AtomicInteger(1); - public AtomicInteger getTailIndex(){ - return tailIndex; + public int getIndex(){ + return tailIndex.getAndIncrement(); } public ArrayList<DataSegmentValue> getDataList(){ @@ -40,7 +40,8 @@ Command waitCmd = iter.next(); if (waitCmd.index < index) { try { - waitCmd.replyQueue.put(new Command(CommandType.REPLY, null, null, cmd.val, cmd.obj, index, waitCmd.seq, null, null, cmd.reverseKey)); + //waitCmd.replyQueue.put(new Command(CommandType.REPLY, null, null, cmd.val, cmd.obj, index, waitCmd.seq, null, null, cmd.reverseKey)); + waitCmd.replyQueue.put(new Command(CommandType.REPLY, waitCmd.seq, dsv)); } catch (InterruptedException e) { e.printStackTrace(); } @@ -61,7 +62,8 @@ for (DataSegmentValue data : dataList) { if (data.index > cmd.index) { try { - cmd.replyQueue.put(new Command(CommandType.REPLY, null, null, data.val, data.obj, data.index, cmd.seq, null, null, data.from)); + //cmd.replyQueue.put(new Command(CommandType.REPLY, null, null, data.val, data.obj, data.index, cmd.seq, null, null, data.from)); + cmd.replyQueue.put(new Command(CommandType.REPLY, cmd.seq, data)); } catch (InterruptedException e) { e.printStackTrace(); } @@ -82,7 +84,8 @@ DataSegmentValue data = iter.next(); if (data.index > cmd.index) { try { - cmd.replyQueue.put(new Command(CommandType.REPLY, null, null, data.val, data.obj, data.index, cmd.seq, null, null, data.from)); + //cmd.replyQueue.put(new Command(CommandType.REPLY, null, null, data.val, data.obj, data.index, cmd.seq, null, null, data.from)); + cmd.replyQueue.put(new Command(CommandType.REPLY, cmd.seq, data)); } catch (InterruptedException e) { e.printStackTrace(); } @@ -95,9 +98,24 @@ waitList.add(cmd); break; case FLIP: - index = tailIndex.getAndIncrement(); - dataList.set(0, new DataSegmentValue(index, cmd.val, cmd.obj, cmd.reverseKey)); - // need to check waitList + index = cmd.dsv.index; + // need to check waitList + for (Iterator<Command> iter = waitList.iterator(); iter.hasNext(); ) { + Command waitCmd = iter.next(); + if (waitCmd.index < index) { + try { + //waitCmd.replyQueue.put(new Command(CommandType.REPLY, null, null, cmd.val, cmd.obj, index, waitCmd.seq, null, null, cmd.reverseKey)); + waitCmd.replyQueue.put(new Command(CommandType.REPLY, waitCmd.seq, cmd.dsv)); + } catch (InterruptedException e) { + e.printStackTrace(); + } + iter.remove(); + if (waitCmd.type == CommandType.TAKE) { // someone is waiting for this put or update command + dataList.remove(cmd.dsv); + break; + } + } + } break; case REMOVE: // TODO: implements later
--- a/src/alice/datasegment/DataSegmentManager.java Fri Mar 15 17:25:25 2013 +0900 +++ b/src/alice/datasegment/DataSegmentManager.java Tue Mar 19 01:25:09 2013 +0900 @@ -28,7 +28,8 @@ continue; } seqHash.remove(reply.seq); - cmd.cs.ids.reply(cmd.receiver, new DataSegmentValue(reply.index, reply.val, reply.obj, reply.reverseKey)); + //cmd.cs.ids.reply(cmd.receiver, new DataSegmentValue(reply.index, reply.val, reply.obj, reply.reverseKey)); + cmd.cs.ids.reply(cmd.receiver, reply.dsv); if (logger.isDebugEnabled()) logger.debug(reply.getCommandString() + " " + cmd.getCommandString()); } catch (InterruptedException e) {
--- a/src/alice/datasegment/DataSegmentValue.java Fri Mar 15 17:25:25 2013 +0900 +++ b/src/alice/datasegment/DataSegmentValue.java Tue Mar 19 01:25:09 2013 +0900 @@ -21,12 +21,11 @@ this.val = val; this.from = reverseKey; } - - public void setDataSegmentValue(int index, Value val, Object obj, String reverseKey){ + + public synchronized void setValue(int index, Value val, Object obj){ this.index = index; this.val = val; this.obj = obj; - this.from = reverseKey; } - + }
--- a/src/alice/datasegment/LocalDataSegmentManager.java Fri Mar 15 17:25:25 2013 +0900 +++ b/src/alice/datasegment/LocalDataSegmentManager.java Tue Mar 19 01:25:09 2013 +0900 @@ -136,18 +136,12 @@ } - public void flip(String key, Value val) { + public void flip(String key ,Value val, Object obj, DataSegmentValue dsv) { DataSegmentKey dataSegmentKey = getDataSegmentKey(key); - Command cmd = new Command(CommandType.FLIP, null, key, val, 0, 0, replyQueue, null, reverseKey); - addCommand(dataSegmentKey, cmd); - if (logger.isDebugEnabled()) - logger.debug(cmd.getCommandString()); - } - - public void flip(String key, Object val) { - DataSegmentKey dataSegmentKey = getDataSegmentKey(key); - Command cmd = new Command(CommandType.FLIP, null, key, val, 0, 0, replyQueue, null, reverseKey); - addCommand(dataSegmentKey, cmd); + int index = dataSegmentKey.getIndex(); + dsv.setValue(index, val, obj); + Command cmd = new Command(CommandType.FLIP, 0, dsv); + //addCommand(dataSegmentKey, cmd); if (logger.isDebugEnabled()) logger.debug(cmd.getCommandString()); }
--- a/src/alice/datasegment/Receiver.java Fri Mar 15 17:25:25 2013 +0900 +++ b/src/alice/datasegment/Receiver.java Tue Mar 19 01:25:09 2013 +0900 @@ -4,7 +4,6 @@ import org.msgpack.type.ArrayValue; import org.msgpack.type.Value; -import org.msgpack.type.ValueFactory; import org.msgpack.type.ValueType; import alice.codesegment.InputDataSegment; @@ -32,26 +31,12 @@ ids.regist(); } - // for same key - public void flip(Value val){ - DataSegment.getLocal().flip(this.key, val); - } - - public void flip(int val){ - DataSegment.getLocal().flip(this.key, ValueFactory.createIntegerValue(val)); + ids.flip(this, val, null); } - public void flip(String val){ - DataSegment.getLocal().flip(this.key, ValueFactory.createRawValue(val)); - } - - public void flip(byte[] val){ - DataSegment.getLocal().flip(this.key, ValueFactory.createRawValue(val, true)); - } - - public <T> void flip(T val) { - DataSegment.getLocal().flip(key, val); + public void flip(Object obj) { + ids.flip(this, null, obj); } public void setKey(String managerKey, String key) {
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/alice/test/codesegment/api/FlipCodeSegment.java Tue Mar 19 01:25:09 2013 +0900 @@ -0,0 +1,26 @@ +package alice.test.codesegment.api; + +import alice.codesegment.CodeSegment; + +public class FlipCodeSegment extends CodeSegment{ + + private String key; + public FlipCodeSegment(String _key){ + this.key = _key; + } + + @Override + public void run() { + Integer num = new Integer(0); + ods.put(key, num, false); + //System.out.println("Key is " +key); + new FlipTest(key); + //for (int i = 0; i < 1000000; i++){ + //ods.put(key, num, false); + //System.out.println("put"); + //} + + + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/alice/test/codesegment/api/FlipTest.java Tue Mar 19 01:25:09 2013 +0900 @@ -0,0 +1,48 @@ +package alice.test.codesegment.api; + +import alice.codesegment.CodeSegment; +import alice.datasegment.CommandType; +import alice.datasegment.Receiver; + +public class FlipTest extends CodeSegment{ + + private Receiver arg1 = ids.create(CommandType.PEEK); + public static long t = 0; + public static boolean flag = false; + public static int count = 0; + + public FlipTest(String key){ + arg1.setKey(key); + } + + public FlipTest(String key, int index){ + arg1.setKey(key, index); + } + + @Override + public void run() { + if (flag){ + System.out.println(System.currentTimeMillis() - t); + System.out.println(arg1.obj); + if (count >= 100) System.exit(0); + flag = false; + count++; + new FlipCodeSegment(Long.toString(t)).execute(); + + } else { + t = System.currentTimeMillis(); + + for (int i = 0;i<1000000;i++){ + Integer num = i; + arg1.flip(num); + //System.out.println("flip"); + //ods.update(arg1.key, num, false); + } + + flag = true; + new FlipTest(arg1.key); + //new FlipTest(arg1.key ,1000000); + } + } + +}
--- a/src/alice/test/codesegment/api/StartCodeSegment.java Fri Mar 15 17:25:25 2013 +0900 +++ b/src/alice/test/codesegment/api/StartCodeSegment.java Tue Mar 19 01:25:09 2013 +0900 @@ -24,8 +24,10 @@ } else if ("-take".equals(args[i])){ new PutCodeSegment().execute(); - new TakeCodeSegment(); + new TakeCodeSegment("num"); + } else if ("-flip".equals(args[i])){ + new FlipCodeSegment("key").execute(); }
--- a/src/alice/test/codesegment/api/TakeCodeSegment.java Fri Mar 15 17:25:25 2013 +0900 +++ b/src/alice/test/codesegment/api/TakeCodeSegment.java Tue Mar 19 01:25:09 2013 +0900 @@ -8,12 +8,12 @@ Receiver ds1 = ids.create(CommandType.TAKE); - public TakeCodeSegment(){ - this.ds1.setKey("num"); + public TakeCodeSegment(String key){ + this.ds1.setKey(key); } @Override public void run() { - new TakeCodeSegment(); + new TakeCodeSegment(ds1.key); } }
--- a/src/alice/test/codesegment/api/TestApiAlice.java Fri Mar 15 17:25:25 2013 +0900 +++ b/src/alice/test/codesegment/api/TestApiAlice.java Tue Mar 19 01:25:09 2013 +0900 @@ -1,11 +1,15 @@ package alice.test.codesegment.api; +import alice.daemon.AliceDaemon; +import alice.daemon.Config; + public class TestApiAlice { /** * @param args */ public static void main(String[] args) { + new AliceDaemon(new Config(args)).listen(); new StartCodeSegment(args).execute(); }