# HG changeset patch # User Shinji KONO # Date 1364397698 -32400 # Node ID 409d7679cf7b3155b9b03a78b06b622bca951082 # Parent 82a1c25ca0c87e4d941ce3a5fff451c528ae4de6# Parent f4aaada207121ec26e797d0d9e70177f0c9073bf merge diff -r f4aaada20712 -r 409d7679cf7b src/alice/daemon/IncomingTcpConnection.java --- a/src/alice/daemon/IncomingTcpConnection.java Thu Mar 28 00:08:35 2013 +0900 +++ b/src/alice/daemon/IncomingTcpConnection.java Thu Mar 28 00:21:38 2013 +0900 @@ -43,23 +43,23 @@ CommandType type = CommandType.getCommandTypeFromId(msg.type); switch (type) { case UPDATE: - lmanager.addCommand(getDataSegmentKey(msg), + lmanager.submitCommand(getDataSegmentKey(msg), new Command(type, null, null, msg.val, 0, 0, null, null, reverseKey)); break; case PUT: - lmanager.addCommand(getDataSegmentKey(msg), + lmanager.submitCommand(getDataSegmentKey(msg), new Command(type, null, null, msg.val, 0, 0, null, null, reverseKey)); break; case PEEK: - lmanager.addCommand(getDataSegmentKey(msg), + lmanager.submitCommand(getDataSegmentKey(msg), new Command(type, null, null, null, msg.index, msg.seq, connection.sendQueue, null, null)); break; case TAKE: - lmanager.addCommand(getDataSegmentKey(msg), + lmanager.submitCommand(getDataSegmentKey(msg), new Command(type, null, null, null, msg.index, msg.seq, connection.sendQueue, null, null)); break; case REMOVE: - lmanager.addCommand(getDataSegmentKey(msg), + lmanager.submitCommand(getDataSegmentKey(msg), new Command(type, null, null, null, 0, 0, null, null, null)); break; case REPLY: diff -r f4aaada20712 -r 409d7679cf7b src/alice/datasegment/DataSegmentKey.java --- a/src/alice/datasegment/DataSegmentKey.java Thu Mar 28 00:08:35 2013 +0900 +++ b/src/alice/datasegment/DataSegmentKey.java Thu Mar 28 00:21:38 2013 +0900 @@ -2,7 +2,6 @@ import java.util.ArrayList; import java.util.Iterator; -import java.util.concurrent.atomic.AtomicInteger; import alice.datasegment.Command; @@ -15,9 +14,9 @@ private ArrayList dataList = new ArrayList(); private ArrayList waitList = new ArrayList(); - private AtomicInteger tailIndex = new AtomicInteger(1); + private int tailIndex = 1; - public void runCommand(Command cmd) { + public synchronized void runCommand(Command cmd) { switch (cmd.type) { case UPDATE: if (dataList.size() != 0) { @@ -45,7 +44,7 @@ } break; case PEEK: - if (cmd.index >= tailIndex.get()) { + if (cmd.index >= tailIndex) { waitList.add(cmd); break; } @@ -65,7 +64,7 @@ waitList.add(cmd); break; case TAKE: - if (cmd.index >= tailIndex.get()) { + if (cmd.index >= tailIndex) { waitList.add(cmd); break; } diff -r f4aaada20712 -r 409d7679cf7b src/alice/datasegment/KeyCommand.java --- a/src/alice/datasegment/KeyCommand.java Thu Mar 28 00:08:35 2013 +0900 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,17 +0,0 @@ -package alice.datasegment; - -public class KeyCommand { - - DataSegmentKey key; - Command cmd; - - public KeyCommand(DataSegmentKey key, Command cmd) { - this.key = key; - this.cmd = cmd; - } - - public void runCommand() { - key.runCommand(cmd); - } - -} \ No newline at end of file diff -r f4aaada20712 -r 409d7679cf7b src/alice/datasegment/LocalDataSegmentManager.java --- a/src/alice/datasegment/LocalDataSegmentManager.java Thu Mar 28 00:08:35 2013 +0900 +++ b/src/alice/datasegment/LocalDataSegmentManager.java Thu Mar 28 00:21:38 2013 +0900 @@ -2,6 +2,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import org.apache.log4j.Logger; import org.msgpack.type.Value; @@ -12,37 +14,37 @@ private String reverseKey = "local"; private ConcurrentHashMap dataSegments = new ConcurrentHashMap(); - private LinkedBlockingQueue cmdQueue = new LinkedBlockingQueue(); private Logger logger = Logger.getLogger("local"); - private Runnable keyCommandThread = new Runnable() { + private ThreadPoolExecutor dataSegmentExecutor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), // initial number of threads + Runtime.getRuntime().availableProcessors(), + Integer.MAX_VALUE, // keepAliveTime + TimeUnit.SECONDS, + new LinkedBlockingQueue()); + + public LocalDataSegmentManager() { + new Thread(replyThread, "LocalDataSegmentManager-replyCommand").start(); + } + private class RunCommand implements Runnable { + + DataSegmentKey key; + Command cmd; + + public RunCommand(DataSegmentKey key, Command cmd) { + this.key = key; + this.cmd = cmd; + } + @Override public void run() { - while (true) { - KeyCommand keyCmd = null; - try { - keyCmd = cmdQueue.take(); - } catch (InterruptedException e) { - e.printStackTrace(); - } - keyCmd.runCommand(); - } + key.runCommand(cmd); } - }; - - public LocalDataSegmentManager() { - new Thread(replyThread, "LocalDataSegmentManager-replyCommand").start(); - new Thread(keyCommandThread, "LocalDataSegmentManager-runKeyCommand").start(); } - - public void addCommand(DataSegmentKey key, Command cmd) { - try { - cmdQueue.put(new KeyCommand(key, cmd)); - } catch (InterruptedException e) { - e.printStackTrace(); - } + + public void submitCommand(DataSegmentKey key, Command cmd) { + dataSegmentExecutor.execute(new RunCommand(key, cmd)); } public DataSegmentKey getDataSegmentKey(String key) { @@ -102,7 +104,7 @@ int seq = this.seq.getAndIncrement(); Command cmd = new Command(CommandType.TAKE, receiver, key, null, index, seq, replyQueue, cs, null); seqHash.put(seq, cmd); - addCommand(dataSegmentKey, cmd); + submitCommand(dataSegmentKey, cmd); if (logger.isDebugEnabled()) logger.debug(cmd.getCommandString()); } @@ -113,7 +115,7 @@ int seq = this.seq.getAndIncrement(); Command cmd = new Command(CommandType.PEEK, receiver, key, null, index, seq, replyQueue, cs, null); seqHash.put(seq, cmd); // waiting for PUT or UPDATE at unique sequence number - addCommand(dataSegmentKey, cmd); + submitCommand(dataSegmentKey, cmd); if (logger.isDebugEnabled()) logger.debug(cmd.getCommandString()); } @@ -122,7 +124,7 @@ public void remove(String key) { DataSegmentKey dataSegmentKey = getDataSegmentKey(key); Command cmd = new Command(CommandType.REMOVE, null, key, null, 0, 0, replyQueue, null, null); - addCommand(dataSegmentKey, cmd); + submitCommand(dataSegmentKey, cmd); if (logger.isDebugEnabled()) logger.debug(cmd.getCommandString()); }