Mercurial > hg > Members > tatsuki > Alice
changeset 190:a85ff8dc16c1 working
add Object data
author | one |
---|---|
date | Thu, 07 Mar 2013 21:27:00 +0900 |
parents | d2f5c885a367 |
children | 60051454e443 |
files | src/alice/codesegment/InputDataSegment.java src/alice/codesegment/OutputDataSegment.java src/alice/datasegment/Command.java src/alice/datasegment/DataSegmentKey.java src/alice/datasegment/DataSegmentManager.java src/alice/datasegment/DataSegmentValue.java src/alice/datasegment/LocalDataSegmentManager.java src/alice/datasegment/Receiver.java src/alice/test/codesegment/local/StartCodeSegment.java src/alice/test/codesegment/local/TestCodeSegment.java |
diffstat | 10 files changed, 83 insertions(+), 16 deletions(-) [+] |
line wrap: on
line diff
--- a/src/alice/codesegment/InputDataSegment.java Thu Mar 07 12:31:05 2013 +0900 +++ b/src/alice/codesegment/InputDataSegment.java Thu Mar 07 21:27:00 2013 +0900 @@ -59,6 +59,7 @@ receiver.index = val.index; receiver.val = val.val; receiver.from = val.from; + receiver.obj = val.obj; receive(); }
--- a/src/alice/codesegment/OutputDataSegment.java Thu Mar 07 12:31:05 2013 +0900 +++ b/src/alice/codesegment/OutputDataSegment.java Thu Mar 07 21:27:00 2013 +0900 @@ -31,11 +31,7 @@ } public <T> void put(String key, T val) { - try { - DataSegment.getLocal().put(key, SingletonMessage.getInstance().unconvert(val)); - } catch (IOException e) { - e.printStackTrace(); - } + DataSegment.getLocal().put(key, val); } public void update(String key, Value val) {
--- a/src/alice/datasegment/Command.java Thu Mar 07 12:31:05 2013 +0900 +++ b/src/alice/datasegment/Command.java Thu Mar 07 21:27:00 2013 +0900 @@ -16,6 +16,7 @@ public BlockingQueue<Command> replyQueue; public CodeSegment cs; public String reverseKey; + public Object obj; public Command(CommandType cmdType, Receiver receiver, String key, Value val, int index, int seq, BlockingQueue<Command> replyQueue, CodeSegment cs, String reverseKey) { this.type = cmdType; @@ -28,6 +29,30 @@ this.cs = cs; this.reverseKey = reverseKey; } + public Command(CommandType cmdType, Receiver receiver, String key, Object obj, int index, int seq, BlockingQueue<Command> replyQueue, CodeSegment cs, String reverseKey) { + this.type = cmdType; + this.receiver = receiver; + this.key = key; + this.obj = obj; + this.index = index; + this.seq = seq; + this.replyQueue = replyQueue; + this.cs = cs; + this.reverseKey = reverseKey; + } + + public Command(CommandType cmdType, Receiver receiver, String key, Value val, Object obj, int index, int seq, BlockingQueue<Command> replyQueue, CodeSegment cs, String reverseKey) { + this.type = cmdType; + this.receiver = receiver; + this.key = key; + this.val = val; + this.index = index; + this.seq = seq; + this.replyQueue = replyQueue; + this.cs = cs; + this.reverseKey = reverseKey; + } + public String getCommandString() { String csName = "null";
--- a/src/alice/datasegment/DataSegmentKey.java Thu Mar 07 12:31:05 2013 +0900 +++ b/src/alice/datasegment/DataSegmentKey.java Thu Mar 07 21:27:00 2013 +0900 @@ -33,14 +33,14 @@ } case PUT: int index = tailIndex.getAndIncrement(); - DataSegmentValue dsv = new DataSegmentValue(index, cmd.val, cmd.reverseKey); + DataSegmentValue dsv = new DataSegmentValue(index, cmd.val, cmd.obj,cmd.reverseKey); dataList.add(dsv); // Process waiting peek and take commands for (Iterator<Command> iter = waitList.iterator(); iter.hasNext(); ) { Command waitCmd = iter.next(); if (waitCmd.index < index) { try { - waitCmd.replyQueue.put(new Command(CommandType.REPLY, null, null, cmd.val, index, waitCmd.seq, null, null, cmd.reverseKey)); + waitCmd.replyQueue.put(new Command(CommandType.REPLY, null, null, cmd.val, cmd.obj, index, waitCmd.seq, null, null, cmd.reverseKey)); } catch (InterruptedException e) { e.printStackTrace(); } @@ -61,7 +61,7 @@ for (DataSegmentValue data : dataList) { if (data.index > cmd.index) { try { - cmd.replyQueue.put(new Command(CommandType.REPLY, null, null, data.val, data.index, cmd.seq, null, null, data.from)); + cmd.replyQueue.put(new Command(CommandType.REPLY, null, null, data.val, data.obj, data.index, cmd.seq, null, null, data.from)); } catch (InterruptedException e) { e.printStackTrace(); } @@ -82,7 +82,7 @@ DataSegmentValue data = iter.next(); if (data.index > cmd.index) { try { - cmd.replyQueue.put(new Command(CommandType.REPLY, null, null, data.val, data.index, cmd.seq, null, null, data.from)); + cmd.replyQueue.put(new Command(CommandType.REPLY, null, null, data.val, data.obj, data.index, cmd.seq, null, null, data.from)); } catch (InterruptedException e) { e.printStackTrace(); } @@ -96,7 +96,7 @@ break; case FLIP: index = tailIndex.getAndIncrement(); - dataList.get(0).setDataSegmentValue(index, cmd.val, cmd.reverseKey); + dataList.get(0).setDataSegmentValue(index, cmd.val, cmd.obj, cmd.reverseKey); // need to check waitList break; case REMOVE:
--- a/src/alice/datasegment/DataSegmentManager.java Thu Mar 07 12:31:05 2013 +0900 +++ b/src/alice/datasegment/DataSegmentManager.java Thu Mar 07 21:27:00 2013 +0900 @@ -28,7 +28,7 @@ continue; } seqHash.remove(reply.seq); - cmd.cs.ids.reply(cmd.receiver, new DataSegmentValue(reply.index, reply.val, reply.reverseKey)); + cmd.cs.ids.reply(cmd.receiver, new DataSegmentValue(reply.index, reply.val, reply.obj, reply.reverseKey)); if (logger.isDebugEnabled()) logger.debug(reply.getCommandString() + " " + cmd.getCommandString()); } catch (InterruptedException e) {
--- a/src/alice/datasegment/DataSegmentValue.java Thu Mar 07 12:31:05 2013 +0900 +++ b/src/alice/datasegment/DataSegmentValue.java Thu Mar 07 21:27:00 2013 +0900 @@ -7,16 +7,25 @@ public int index; public Value val; public String from; + public Object obj; - public DataSegmentValue(int index, Value val, String reverseKey) { + public DataSegmentValue(int index, Value val, Object obj,String reverseKey) { + this.index = index; + this.val = val; + this.from = reverseKey; + this.obj = obj; + } + + public DataSegmentValue(int index, Value val,String reverseKey) { this.index = index; this.val = val; this.from = reverseKey; } - - public void setDataSegmentValue(int index, Value val, String reverseKey){ + + public void setDataSegmentValue(int index, Value val, Object obj,String reverseKey){ this.index = index; this.val = val; + this.obj = obj; this.from = reverseKey; }
--- a/src/alice/datasegment/LocalDataSegmentManager.java Thu Mar 07 12:31:05 2013 +0900 +++ b/src/alice/datasegment/LocalDataSegmentManager.java Thu Mar 07 21:27:00 2013 +0900 @@ -68,6 +68,14 @@ logger.debug(cmd.getCommandString()); } + public void put(String key, Object obj) { + DataSegmentKey dataSegmentKey = getDataSegmentKey(key); + Command cmd = new Command(CommandType.PUT, null, key, obj, 0, 0, replyQueue, null, reverseKey); + addCommand(dataSegmentKey, cmd); + if (logger.isDebugEnabled()) + logger.debug(cmd.getCommandString()); + } + /** * Enqueue update command to the queue of each DataSegment key */ @@ -79,6 +87,14 @@ if (logger.isDebugEnabled()) logger.debug(cmd.getCommandString()); } + + public void update(String key, Object val) { + DataSegmentKey dataSegmentKey = getDataSegmentKey(key); + Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, replyQueue, null, reverseKey); + addCommand(dataSegmentKey, cmd); + if (logger.isDebugEnabled()) + logger.debug(cmd.getCommandString()); + } @Override public void take(Receiver receiver, String key, int index, CodeSegment cs) { @@ -128,4 +144,12 @@ logger.debug(cmd.getCommandString()); } + public void flip(String key, Object val) { + DataSegmentKey dataSegmentKey = getDataSegmentKey(key); + Command cmd = new Command(CommandType.FLIP, null, key, val, 0, 0, replyQueue, null, reverseKey); + addCommand(dataSegmentKey, cmd); + if (logger.isDebugEnabled()) + logger.debug(cmd.getCommandString()); + } + }
--- a/src/alice/datasegment/Receiver.java Thu Mar 07 12:31:05 2013 +0900 +++ b/src/alice/datasegment/Receiver.java Thu Mar 07 21:27:00 2013 +0900 @@ -19,6 +19,7 @@ public InputDataSegment ids; public int index; public Value val; + public Object obj; public String from; public CommandType type;
--- a/src/alice/test/codesegment/local/StartCodeSegment.java Thu Mar 07 12:31:05 2013 +0900 +++ b/src/alice/test/codesegment/local/StartCodeSegment.java Thu Mar 07 21:27:00 2013 +0900 @@ -1,6 +1,9 @@ package alice.test.codesegment.local; +import java.util.Random; + import alice.codesegment.CodeSegment; +import alice.test.codesegment.local.bitonicsort.DataList; public class StartCodeSegment extends CodeSegment { @@ -17,8 +20,14 @@ ods.update("local", "key1", 0); // bind string data to datasegment local.key1 // this startup TestCodeSegment. */ + DataList list = new DataList(); + int size = 10; + for (int i = 0; i < size; i++){ + Random rnd = new Random(); + list.table.add(rnd.nextInt(100000)); + } t = System.currentTimeMillis(); - ods.put("local", "key1", 0); + ods.put("key1", list); //ods.put("local", "key1", 1); //ods.put("local", "key1", 2); //ods.put("local", "key1", 3);
--- a/src/alice/test/codesegment/local/TestCodeSegment.java Thu Mar 07 12:31:05 2013 +0900 +++ b/src/alice/test/codesegment/local/TestCodeSegment.java Thu Mar 07 21:27:00 2013 +0900 @@ -19,10 +19,12 @@ @Override public void run() { - if(count > 10000){ + if(count > 1){ System.out.println(System.currentTimeMillis() - StartCodeSegment.t); System.exit(1); } + //System.out.println(arg1.obj); + //System.out.println(arg1.val); //ods.update("key1",arg1.asInteger()+1); new TestCodeSegment();