Mercurial > hg > Members > tatsuki > Alice
changeset 73:4bfd81352cfa
change to concurrent data segment
author | kazz <kazz@cr.ie.u-ryukyu.ac.jp> |
---|---|
date | Tue, 21 Feb 2012 22:55:17 +0900 (2012-02-21) |
parents | 728c254347a6 |
children | d199c38717ce 82a1c25ca0c8 |
files | src/alice/daemon/IncomingTcpConnection.java src/alice/datasegment/DataSegmentKey.java src/alice/datasegment/LocalDataSegmentManager.java |
diffstat | 3 files changed, 33 insertions(+), 22 deletions(-) [+] |
line wrap: on
line diff
--- a/src/alice/daemon/IncomingTcpConnection.java Tue Feb 21 21:11:50 2012 +0900 +++ b/src/alice/daemon/IncomingTcpConnection.java Tue Feb 21 22:55:17 2012 +0900 @@ -44,23 +44,23 @@ CommandType type = CommandType.getCommandTypeFromId(msg.type); switch (type) { case UPDATE: - lmanager.addCommand(getDataSegmentKey(msg), + lmanager.submitCommand(getDataSegmentKey(msg), new Command(type, null, null, msg.val, 0, 0, null, null, reverseKey)); break; case PUT: - lmanager.addCommand(getDataSegmentKey(msg), + lmanager.submitCommand(getDataSegmentKey(msg), new Command(type, null, null, msg.val, 0, 0, null, null, reverseKey)); break; case PEEK: - lmanager.addCommand(getDataSegmentKey(msg), + lmanager.submitCommand(getDataSegmentKey(msg), new Command(type, null, null, null, msg.index, msg.seq, connection.sendQueue, null, null)); break; case TAKE: - lmanager.addCommand(getDataSegmentKey(msg), + lmanager.submitCommand(getDataSegmentKey(msg), new Command(type, null, null, null, msg.index, msg.seq, connection.sendQueue, null, null)); break; case REMOVE: - lmanager.addCommand(getDataSegmentKey(msg), + lmanager.submitCommand(getDataSegmentKey(msg), new Command(type, null, null, null, 0, 0, null, null, null)); break; case REPLY:
--- a/src/alice/datasegment/DataSegmentKey.java Tue Feb 21 21:11:50 2012 +0900 +++ b/src/alice/datasegment/DataSegmentKey.java Tue Feb 21 22:55:17 2012 +0900 @@ -2,7 +2,6 @@ import java.util.ArrayList; import java.util.Iterator; -import java.util.concurrent.atomic.AtomicInteger; import alice.datasegment.Command; @@ -15,16 +14,16 @@ private ArrayList<DataSegmentValue> dataList = new ArrayList<DataSegmentValue>(); private ArrayList<Command> waitList = new ArrayList<Command>(); - private AtomicInteger tailIndex = new AtomicInteger(1); + private int tailIndex = 1; - public void runCommand(Command cmd) { + public synchronized void runCommand(Command cmd) { switch (cmd.type) { case UPDATE: if (dataList.size() != 0) { dataList.remove(0); } case PUT: - int index = tailIndex.getAndIncrement(); + int index = tailIndex++; DataSegmentValue dsv = new DataSegmentValue(index, cmd.val, cmd.reverseKey); dataList.add(dsv); // Process waiting peek and take commands @@ -45,7 +44,7 @@ } break; case PEEK: - if (cmd.index >= tailIndex.get()) { + if (cmd.index >= tailIndex) { waitList.add(cmd); break; } @@ -65,7 +64,7 @@ waitList.add(cmd); break; case TAKE: - if (cmd.index >= tailIndex.get()) { + if (cmd.index >= tailIndex) { waitList.add(cmd); break; }
--- a/src/alice/datasegment/LocalDataSegmentManager.java Tue Feb 21 21:11:50 2012 +0900 +++ b/src/alice/datasegment/LocalDataSegmentManager.java Tue Feb 21 22:55:17 2012 +0900 @@ -2,6 +2,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import org.apache.log4j.Logger; import org.msgpack.type.Value; @@ -15,6 +17,12 @@ private LinkedBlockingQueue<KeyCommand> cmdQueue = new LinkedBlockingQueue<KeyCommand>(); private Logger logger = Logger.getLogger("local"); + private ThreadPoolExecutor dataSegmentExecutor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), // initial number of threads + Runtime.getRuntime().availableProcessors(), + Integer.MAX_VALUE, // keepAliveTime + TimeUnit.SECONDS, + new LinkedBlockingQueue<Runnable>()); + private Runnable keyCommandThread = new Runnable() { @Override @@ -37,12 +45,16 @@ new Thread(keyCommandThread, "LocalDataSegmentManager-runKeyCommand").start(); } - public void addCommand(DataSegmentKey key, Command cmd) { - try { - cmdQueue.put(new KeyCommand(key, cmd)); - } catch (InterruptedException e) { - e.printStackTrace(); - } + public void submitCommand(final DataSegmentKey key, final Command cmd) { + Runnable runCommand = new Runnable() { + + @Override + public void run() { + key.runCommand(cmd); + } + + }; + dataSegmentExecutor.execute(runCommand); } public DataSegmentKey getDataSegmentKey(String key) { @@ -63,7 +75,7 @@ public void put(String key, Value val, CodeSegment cs) { DataSegmentKey dataSegmentKey = getDataSegmentKey(key); Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, replyQueue, cs, reverseKey); - addCommand(dataSegmentKey, cmd); + submitCommand(dataSegmentKey, cmd); if (logger.isDebugEnabled()) logger.debug(cmd.getCommandString()); } @@ -75,7 +87,7 @@ public void update(String key, Value val, CodeSegment cs) { DataSegmentKey dataSegmentKey = getDataSegmentKey(key); Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, replyQueue, cs, reverseKey); - addCommand(dataSegmentKey, cmd); + submitCommand(dataSegmentKey, cmd); if (logger.isDebugEnabled()) logger.debug(cmd.getCommandString()); } @@ -86,7 +98,7 @@ int seq = this.seq.getAndIncrement(); Command cmd = new Command(CommandType.TAKE, receiver, key, null, index, seq, replyQueue, cs, null); seqHash.put(seq, cmd); - addCommand(dataSegmentKey, cmd); + submitCommand(dataSegmentKey, cmd); if (logger.isDebugEnabled()) logger.debug(cmd.getCommandString()); } @@ -97,7 +109,7 @@ int seq = this.seq.getAndIncrement(); Command cmd = new Command(CommandType.PEEK, receiver, key, null, index, seq, replyQueue, cs, null); seqHash.put(seq, cmd); // waiting for PUT or UPDATE at unique sequence number - addCommand(dataSegmentKey, cmd); + submitCommand(dataSegmentKey, cmd); if (logger.isDebugEnabled()) logger.debug(cmd.getCommandString()); } @@ -106,7 +118,7 @@ public void remove(String key) { DataSegmentKey dataSegmentKey = getDataSegmentKey(key); Command cmd = new Command(CommandType.REMOVE, null, key, null, 0, 0, replyQueue, null, null); - addCommand(dataSegmentKey, cmd); + submitCommand(dataSegmentKey, cmd); if (logger.isDebugEnabled()) logger.debug(cmd.getCommandString()); }