Mercurial > hg > Members > tatsuki > Alice
changeset 248:913ee9dfec4f
add recycle method
author | sugi |
---|---|
date | Wed, 29 May 2013 15:39:55 +0900 |
parents | 6e042fde5662 |
children | 2a8bcf09bd06 |
files | src/alice/codesegment/CodeSegment.java src/alice/codesegment/InputDataSegment.java src/alice/datasegment/LocalDataSegmentManager.java src/alice/test/codesegment/local/TestCodeSegment.java src/alice/test/codesegment/local/wordcount/SeparateArray.java src/alice/test/codesegment/local/wordcount/WordCount.java |
diffstat | 6 files changed, 41 insertions(+), 13 deletions(-) [+] |
line wrap: on
line diff
--- a/src/alice/codesegment/CodeSegment.java Tue May 28 17:53:02 2013 +0900 +++ b/src/alice/codesegment/CodeSegment.java Wed May 29 15:39:55 2013 +0900 @@ -1,14 +1,27 @@ package alice.codesegment; +import java.util.ArrayList; +import java.util.Iterator; + import alice.codesegment.InputDataSegment; +import alice.datasegment.Receiver; public abstract class CodeSegment implements Runnable { public InputDataSegment ids = new InputDataSegment(this); public OutputDataSegment ods = new OutputDataSegment(); + public ArrayList<Receiver> list = new ArrayList<Receiver>(); public void execute() { ids.receive(); } + + public void recycle(){ + ids.setCounter(list.size()); + for (Iterator<Receiver> it = list.iterator();it.hasNext();){ + Receiver receiver = it.next(); + ids.recommand(receiver); + } + } }
--- a/src/alice/codesegment/InputDataSegment.java Tue May 28 17:53:02 2013 +0900 +++ b/src/alice/codesegment/InputDataSegment.java Wed May 29 15:39:55 2013 +0900 @@ -19,7 +19,6 @@ public CodeSegment cs; private AtomicInteger count = new AtomicInteger(1); // 1 for no input data segments private AtomicInteger keyCount = new AtomicInteger(0); // number of DataSegments - public InputDataSegment(CodeSegment cs) { this.cs = cs; } @@ -29,6 +28,7 @@ } public void peek(Receiver receiver, String managerKey, String key, int index) { + cs.list.add(receiver); DataSegment.get(managerKey).peek(receiver, key, index, cs); } @@ -37,6 +37,7 @@ } public void peek(Receiver receiver, String key, int index) { + cs.list.add(receiver); DataSegment.getLocal().peek(receiver, key, index, cs); } @@ -45,6 +46,7 @@ } public void take(Receiver receiver, String managerKey, String key, int index) { + cs.list.add(receiver); DataSegment.get(managerKey).take(receiver, key, index, cs); } @@ -53,6 +55,7 @@ } public void take(Receiver receiver, String key, int index) { + cs.list.add(receiver); DataSegment.getLocal().take(receiver, key, index, cs); } @@ -94,4 +97,13 @@ public Receiver create(CommandType type) { return new Receiver(this, type); } + + public void recommand(Receiver receiver) { + // TODO why only local? + DataSegment.getLocal().recommand(receiver, cs); + } + + public void setCounter(int cnt){ + count.set(cnt); + } }
--- a/src/alice/datasegment/LocalDataSegmentManager.java Tue May 28 17:53:02 2013 +0900 +++ b/src/alice/datasegment/LocalDataSegmentManager.java Wed May 29 15:39:55 2013 +0900 @@ -136,5 +136,14 @@ } - + public void recommand(Receiver receiver, CodeSegment cs) { + DataSegmentKey dataSegmentKey = getDataSegmentKey(receiver.key); + int seq = this.seq.getAndIncrement(); + Command cmd = new Command(receiver.type, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null); + dataSegmentKey.runCommand(cmd); + if (logger.isDebugEnabled()) + logger.debug(cmd.getCommandString()); + + } + }
--- a/src/alice/test/codesegment/local/TestCodeSegment.java Tue May 28 17:53:02 2013 +0900 +++ b/src/alice/test/codesegment/local/TestCodeSegment.java Wed May 29 15:39:55 2013 +0900 @@ -12,6 +12,7 @@ @Override public void run() { + System.out.println("type = " + arg1.type); System.out.println("index = " + arg1.index); System.out.println("data = " + arg1.getVal()); System.out.println(((Value)arg1.getVal()).getType());
--- a/src/alice/test/codesegment/local/wordcount/SeparateArray.java Tue May 28 17:53:02 2013 +0900 +++ b/src/alice/test/codesegment/local/wordcount/SeparateArray.java Wed May 29 15:39:55 2013 +0900 @@ -9,7 +9,6 @@ import alice.codesegment.CodeSegment; public class SeparateArray extends CodeSegment{ - private WordConfig conf; SeparateArray(WordConfig conf){ @@ -17,14 +16,12 @@ } @Override - public void run() { - + public void run() { // cpu分だけTaskをあらかじめ作成 int cpu_num = Runtime.getRuntime().availableProcessors(); for (int cnt =0;cnt < cpu_num;cnt++){ new WordCount(); } - BufferedReader br = null; int i = 0; try { @@ -36,8 +33,7 @@ int size = conf.division * 1024; // Kbyte Range range = new Range(size); int check = br.read(range.array); - for (;check!=-1;i++){ - + for (;check!=-1;i++){ char[] array; array = new char[size]; check = br.read(array);
--- a/src/alice/test/codesegment/local/wordcount/WordCount.java Tue May 28 17:53:02 2013 +0900 +++ b/src/alice/test/codesegment/local/wordcount/WordCount.java Wed May 29 15:39:55 2013 +0900 @@ -30,12 +30,8 @@ word_flag = 0; } } - //System.out.println(i); - //System.out.println(r.array.length); word_num += word_flag; if (r.nextchar!='\0'){ // null means last block - // buf.get(r.end) is next block's char. - // buf.get(r.end-1) is this block's last char if ((r.array[i-1] == 0x0A||r.array[i-1] == 0x20) && (r.nextchar == 0x0A||r.nextchar == 0x20)){ word_num--; @@ -44,7 +40,8 @@ Result result = new Result(line_num,word_num); ods.put("result", result); - new WordCount(); + recycle(); + } }