changeset 248:913ee9dfec4f

add recycle method
author sugi
date Wed, 29 May 2013 15:39:55 +0900
parents 6e042fde5662
children 2a8bcf09bd06
files src/alice/codesegment/CodeSegment.java src/alice/codesegment/InputDataSegment.java src/alice/datasegment/LocalDataSegmentManager.java src/alice/test/codesegment/local/TestCodeSegment.java src/alice/test/codesegment/local/wordcount/SeparateArray.java src/alice/test/codesegment/local/wordcount/WordCount.java
diffstat 6 files changed, 41 insertions(+), 13 deletions(-) [+]
line wrap: on
line diff
--- a/src/alice/codesegment/CodeSegment.java	Tue May 28 17:53:02 2013 +0900
+++ b/src/alice/codesegment/CodeSegment.java	Wed May 29 15:39:55 2013 +0900
@@ -1,14 +1,27 @@
 package alice.codesegment;
 
+import java.util.ArrayList;
+import java.util.Iterator;
+
 import alice.codesegment.InputDataSegment;
+import alice.datasegment.Receiver;
 
 public abstract class CodeSegment implements Runnable {
 	
 	public InputDataSegment ids = new InputDataSegment(this);
 	public OutputDataSegment ods = new OutputDataSegment();
+	public ArrayList<Receiver> list = new ArrayList<Receiver>();
 	
 	public void execute() {
 		ids.receive();
 	}
+	
+	public void recycle(){
+		ids.setCounter(list.size());
+		for (Iterator<Receiver> it = list.iterator();it.hasNext();){
+			Receiver receiver = it.next();
+			ids.recommand(receiver);
+		}
+	}
 
 }
--- a/src/alice/codesegment/InputDataSegment.java	Tue May 28 17:53:02 2013 +0900
+++ b/src/alice/codesegment/InputDataSegment.java	Wed May 29 15:39:55 2013 +0900
@@ -19,7 +19,6 @@
 	public CodeSegment cs;
 	private AtomicInteger count = new AtomicInteger(1); // 1 for no input data segments
 	private AtomicInteger keyCount = new AtomicInteger(0); // number of DataSegments
-	
 	public InputDataSegment(CodeSegment cs) {
 		this.cs = cs;
 	}
@@ -29,6 +28,7 @@
 	}
 	
 	public void peek(Receiver receiver, String managerKey, String key, int index) {
+		cs.list.add(receiver);
 		DataSegment.get(managerKey).peek(receiver, key, index, cs);
 	}
 
@@ -37,6 +37,7 @@
 	}
 	
 	public void peek(Receiver receiver, String key, int index) {
+		cs.list.add(receiver);
 		DataSegment.getLocal().peek(receiver, key, index, cs);
 	}
 
@@ -45,6 +46,7 @@
 	}
 	
 	public void take(Receiver receiver, String managerKey, String key, int index) {
+		cs.list.add(receiver);
 		DataSegment.get(managerKey).take(receiver, key, index, cs);
 	}
 
@@ -53,6 +55,7 @@
 	}
 	
 	public void take(Receiver receiver, String key, int index) {
+		cs.list.add(receiver);
 		DataSegment.getLocal().take(receiver, key, index, cs);
 	}
 
@@ -94,4 +97,13 @@
 	public Receiver create(CommandType type) {
 		return new Receiver(this, type);
 	}
+
+	public void recommand(Receiver receiver) {
+		// TODO why only local?
+		DataSegment.getLocal().recommand(receiver, cs);
+	}
+	
+	public void setCounter(int cnt){
+		count.set(cnt);
+	}
 }
--- a/src/alice/datasegment/LocalDataSegmentManager.java	Tue May 28 17:53:02 2013 +0900
+++ b/src/alice/datasegment/LocalDataSegmentManager.java	Wed May 29 15:39:55 2013 +0900
@@ -136,5 +136,14 @@
 		
 	}
 
-	
+	public void recommand(Receiver receiver, CodeSegment cs) {
+		DataSegmentKey dataSegmentKey = getDataSegmentKey(receiver.key);
+		int seq = this.seq.getAndIncrement();
+		Command cmd = new Command(receiver.type, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null);
+		dataSegmentKey.runCommand(cmd);
+		if (logger.isDebugEnabled())
+			logger.debug(cmd.getCommandString());
+		
+	}
+
 }
--- a/src/alice/test/codesegment/local/TestCodeSegment.java	Tue May 28 17:53:02 2013 +0900
+++ b/src/alice/test/codesegment/local/TestCodeSegment.java	Wed May 29 15:39:55 2013 +0900
@@ -12,6 +12,7 @@
 	
 	@Override
 	public void run() {
+		System.out.println("type = " + arg1.type);
 		System.out.println("index = " + arg1.index);
 		System.out.println("data = " + arg1.getVal());
 		System.out.println(((Value)arg1.getVal()).getType());
--- a/src/alice/test/codesegment/local/wordcount/SeparateArray.java	Tue May 28 17:53:02 2013 +0900
+++ b/src/alice/test/codesegment/local/wordcount/SeparateArray.java	Wed May 29 15:39:55 2013 +0900
@@ -9,7 +9,6 @@
 import alice.codesegment.CodeSegment;
 
 public class SeparateArray extends CodeSegment{
-
 	private WordConfig conf;
 	
 	SeparateArray(WordConfig conf){
@@ -17,14 +16,12 @@
 	}
 	
 	@Override
-	public void run() {
-		
+	public void run() {	
 		// cpu分だけTaskをあらかじめ作成
 	 	int cpu_num = Runtime.getRuntime().availableProcessors();
 	 	for (int cnt =0;cnt < cpu_num;cnt++){
 	 		new WordCount();
 	 	}
-	 	
 		BufferedReader br = null;
 		int i = 0;
 		try {
@@ -36,8 +33,7 @@
 			int size = conf.division * 1024; // Kbyte
 			Range range = new Range(size);
 			int check = br.read(range.array);
-			for (;check!=-1;i++){
-			
+			for (;check!=-1;i++){			
 				char[] array;
 				array = new char[size];
 				check = br.read(array);
--- a/src/alice/test/codesegment/local/wordcount/WordCount.java	Tue May 28 17:53:02 2013 +0900
+++ b/src/alice/test/codesegment/local/wordcount/WordCount.java	Wed May 29 15:39:55 2013 +0900
@@ -30,12 +30,8 @@
 				word_flag = 0;
 			}
 		}
-		//System.out.println(i);
-		//System.out.println(r.array.length);
 		word_num += word_flag;
 		if (r.nextchar!='\0'){ // null means last block
-			// buf.get(r.end) is next block's char.
-			// buf.get(r.end-1) is this block's last char
 			if ((r.array[i-1] == 0x0A||r.array[i-1] == 0x20) &&
 					(r.nextchar == 0x0A||r.nextchar == 0x20)){
 				word_num--;
@@ -44,7 +40,8 @@
 		
 		Result result = new Result(line_num,word_num);
 		ods.put("result", result);
-		new WordCount();
+		recycle();
+		
 	}
 
 }