diff src/alice/datasegment/LocalDataSegmentManager.java @ 73:4bfd81352cfa

change to concurrent data segment
author kazz <kazz@cr.ie.u-ryukyu.ac.jp>
date Tue, 21 Feb 2012 22:55:17 +0900
parents a3a2605e16a2
children 82a1c25ca0c8
line wrap: on
line diff
--- a/src/alice/datasegment/LocalDataSegmentManager.java	Tue Feb 21 21:11:50 2012 +0900
+++ b/src/alice/datasegment/LocalDataSegmentManager.java	Tue Feb 21 22:55:17 2012 +0900
@@ -2,6 +2,8 @@
 
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.log4j.Logger;
 import org.msgpack.type.Value;
@@ -15,6 +17,12 @@
 	private LinkedBlockingQueue<KeyCommand> cmdQueue = new LinkedBlockingQueue<KeyCommand>();
 	private Logger logger = Logger.getLogger("local");
 
+	private ThreadPoolExecutor dataSegmentExecutor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), // initial number of threads
+			Runtime.getRuntime().availableProcessors(),
+			Integer.MAX_VALUE, // keepAliveTime
+			TimeUnit.SECONDS,
+			new LinkedBlockingQueue<Runnable>());
+	
 	private Runnable keyCommandThread = new Runnable() {
 
 		@Override
@@ -37,12 +45,16 @@
 		new Thread(keyCommandThread, "LocalDataSegmentManager-runKeyCommand").start();
 	}
 
-	public void addCommand(DataSegmentKey key, Command cmd) {
-		try {
-			cmdQueue.put(new KeyCommand(key, cmd));
-		} catch (InterruptedException e) {
-			e.printStackTrace();
-		}
+	public void submitCommand(final DataSegmentKey key, final Command cmd) {
+		Runnable runCommand = new Runnable() {
+
+			@Override
+			public void run() {
+				key.runCommand(cmd);
+			}
+			
+		};
+		dataSegmentExecutor.execute(runCommand);
 	}
 	
 	public DataSegmentKey getDataSegmentKey(String key) {
@@ -63,7 +75,7 @@
 	public void put(String key, Value val, CodeSegment cs) {
 		DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
 		Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, replyQueue, cs, reverseKey);
-		addCommand(dataSegmentKey, cmd);
+		submitCommand(dataSegmentKey, cmd);
 		if (logger.isDebugEnabled())
 			logger.debug(cmd.getCommandString());
 	}
@@ -75,7 +87,7 @@
 	public void update(String key, Value val, CodeSegment cs) {
 		DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
 		Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, replyQueue, cs, reverseKey);
-		addCommand(dataSegmentKey, cmd);
+		submitCommand(dataSegmentKey, cmd);
 		if (logger.isDebugEnabled())
 			logger.debug(cmd.getCommandString());
 	}
@@ -86,7 +98,7 @@
 		int seq = this.seq.getAndIncrement();
 		Command cmd = new Command(CommandType.TAKE, receiver, key, null, index, seq, replyQueue, cs, null);
 		seqHash.put(seq, cmd);
-		addCommand(dataSegmentKey, cmd);
+		submitCommand(dataSegmentKey, cmd);
 		if (logger.isDebugEnabled())
 			logger.debug(cmd.getCommandString());
 	}
@@ -97,7 +109,7 @@
 		int seq = this.seq.getAndIncrement();
 		Command cmd = new Command(CommandType.PEEK, receiver, key, null, index, seq, replyQueue, cs, null);
 		seqHash.put(seq, cmd); // waiting for PUT or UPDATE at unique sequence number
-		addCommand(dataSegmentKey, cmd);
+		submitCommand(dataSegmentKey, cmd);
 		if (logger.isDebugEnabled())
 			logger.debug(cmd.getCommandString());
 	}
@@ -106,7 +118,7 @@
 	public void remove(String key) {
 		DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
 		Command cmd = new Command(CommandType.REMOVE, null, key, null, 0, 0, replyQueue, null, null);
-		addCommand(dataSegmentKey, cmd);
+		submitCommand(dataSegmentKey, cmd);
 		if (logger.isDebugEnabled())
 			logger.debug(cmd.getCommandString());
 	}