Mercurial > hg > Database > Alice
changeset 224:409d7679cf7b
merge
author | Shinji KONO <kono@ie.u-ryukyu.ac.jp> |
---|---|
date | Thu, 28 Mar 2013 00:21:38 +0900 |
parents | 82a1c25ca0c8 (diff) f4aaada20712 (current diff) |
children | bc061ee5f31f |
files | lib/msgpack-0.6.6-SNAPSHOT-sources.jar lib/msgpack-0.6.6-SNAPSHOT.jar scripts/view_log.sh src/alice/daemon/IncomingTcpConnection.java src/alice/datasegment/DataSegmentKey.java src/alice/datasegment/LocalDataSegmentManager.java |
diffstat | 4 files changed, 37 insertions(+), 53 deletions(-) [+] |
line wrap: on
line diff
--- a/src/alice/daemon/IncomingTcpConnection.java Thu Mar 28 00:08:35 2013 +0900 +++ b/src/alice/daemon/IncomingTcpConnection.java Thu Mar 28 00:21:38 2013 +0900 @@ -43,23 +43,23 @@ CommandType type = CommandType.getCommandTypeFromId(msg.type); switch (type) { case UPDATE: - lmanager.addCommand(getDataSegmentKey(msg), + lmanager.submitCommand(getDataSegmentKey(msg), new Command(type, null, null, msg.val, 0, 0, null, null, reverseKey)); break; case PUT: - lmanager.addCommand(getDataSegmentKey(msg), + lmanager.submitCommand(getDataSegmentKey(msg), new Command(type, null, null, msg.val, 0, 0, null, null, reverseKey)); break; case PEEK: - lmanager.addCommand(getDataSegmentKey(msg), + lmanager.submitCommand(getDataSegmentKey(msg), new Command(type, null, null, null, msg.index, msg.seq, connection.sendQueue, null, null)); break; case TAKE: - lmanager.addCommand(getDataSegmentKey(msg), + lmanager.submitCommand(getDataSegmentKey(msg), new Command(type, null, null, null, msg.index, msg.seq, connection.sendQueue, null, null)); break; case REMOVE: - lmanager.addCommand(getDataSegmentKey(msg), + lmanager.submitCommand(getDataSegmentKey(msg), new Command(type, null, null, null, 0, 0, null, null, null)); break; case REPLY:
--- a/src/alice/datasegment/DataSegmentKey.java Thu Mar 28 00:08:35 2013 +0900 +++ b/src/alice/datasegment/DataSegmentKey.java Thu Mar 28 00:21:38 2013 +0900 @@ -2,7 +2,6 @@ import java.util.ArrayList; import java.util.Iterator; -import java.util.concurrent.atomic.AtomicInteger; import alice.datasegment.Command; @@ -15,9 +14,9 @@ private ArrayList<DataSegmentValue> dataList = new ArrayList<DataSegmentValue>(); private ArrayList<Command> waitList = new ArrayList<Command>(); - private AtomicInteger tailIndex = new AtomicInteger(1); + private int tailIndex = 1; - public void runCommand(Command cmd) { + public synchronized void runCommand(Command cmd) { switch (cmd.type) { case UPDATE: if (dataList.size() != 0) { @@ -45,7 +44,7 @@ } break; case PEEK: - if (cmd.index >= tailIndex.get()) { + if (cmd.index >= tailIndex) { waitList.add(cmd); break; } @@ -65,7 +64,7 @@ waitList.add(cmd); break; case TAKE: - if (cmd.index >= tailIndex.get()) { + if (cmd.index >= tailIndex) { waitList.add(cmd); break; }
--- a/src/alice/datasegment/KeyCommand.java Thu Mar 28 00:08:35 2013 +0900 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,17 +0,0 @@ -package alice.datasegment; - -public class KeyCommand { - - DataSegmentKey key; - Command cmd; - - public KeyCommand(DataSegmentKey key, Command cmd) { - this.key = key; - this.cmd = cmd; - } - - public void runCommand() { - key.runCommand(cmd); - } - -} \ No newline at end of file
--- a/src/alice/datasegment/LocalDataSegmentManager.java Thu Mar 28 00:08:35 2013 +0900 +++ b/src/alice/datasegment/LocalDataSegmentManager.java Thu Mar 28 00:21:38 2013 +0900 @@ -2,6 +2,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import org.apache.log4j.Logger; import org.msgpack.type.Value; @@ -12,37 +14,37 @@ private String reverseKey = "local"; private ConcurrentHashMap<String, DataSegmentKey> dataSegments = new ConcurrentHashMap<String, DataSegmentKey>(); - private LinkedBlockingQueue<KeyCommand> cmdQueue = new LinkedBlockingQueue<KeyCommand>(); private Logger logger = Logger.getLogger("local"); - private Runnable keyCommandThread = new Runnable() { + private ThreadPoolExecutor dataSegmentExecutor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), // initial number of threads + Runtime.getRuntime().availableProcessors(), + Integer.MAX_VALUE, // keepAliveTime + TimeUnit.SECONDS, + new LinkedBlockingQueue<Runnable>()); + + public LocalDataSegmentManager() { + new Thread(replyThread, "LocalDataSegmentManager-replyCommand").start(); + } + private class RunCommand implements Runnable { + + DataSegmentKey key; + Command cmd; + + public RunCommand(DataSegmentKey key, Command cmd) { + this.key = key; + this.cmd = cmd; + } + @Override public void run() { - while (true) { - KeyCommand keyCmd = null; - try { - keyCmd = cmdQueue.take(); - } catch (InterruptedException e) { - e.printStackTrace(); - } - keyCmd.runCommand(); - } + key.runCommand(cmd); } - }; - - public LocalDataSegmentManager() { - new Thread(replyThread, "LocalDataSegmentManager-replyCommand").start(); - new Thread(keyCommandThread, "LocalDataSegmentManager-runKeyCommand").start(); } - - public void addCommand(DataSegmentKey key, Command cmd) { - try { - cmdQueue.put(new KeyCommand(key, cmd)); - } catch (InterruptedException e) { - e.printStackTrace(); - } + + public void submitCommand(DataSegmentKey key, Command cmd) { + dataSegmentExecutor.execute(new RunCommand(key, cmd)); } public DataSegmentKey getDataSegmentKey(String key) { @@ -102,7 +104,7 @@ int seq = this.seq.getAndIncrement(); Command cmd = new Command(CommandType.TAKE, receiver, key, null, index, seq, replyQueue, cs, null); seqHash.put(seq, cmd); - addCommand(dataSegmentKey, cmd); + submitCommand(dataSegmentKey, cmd); if (logger.isDebugEnabled()) logger.debug(cmd.getCommandString()); } @@ -113,7 +115,7 @@ int seq = this.seq.getAndIncrement(); Command cmd = new Command(CommandType.PEEK, receiver, key, null, index, seq, replyQueue, cs, null); seqHash.put(seq, cmd); // waiting for PUT or UPDATE at unique sequence number - addCommand(dataSegmentKey, cmd); + submitCommand(dataSegmentKey, cmd); if (logger.isDebugEnabled()) logger.debug(cmd.getCommandString()); } @@ -122,7 +124,7 @@ public void remove(String key) { DataSegmentKey dataSegmentKey = getDataSegmentKey(key); Command cmd = new Command(CommandType.REMOVE, null, key, null, 0, 0, replyQueue, null, null); - addCommand(dataSegmentKey, cmd); + submitCommand(dataSegmentKey, cmd); if (logger.isDebugEnabled()) logger.debug(cmd.getCommandString()); }