Mercurial > hg > Database > Alice
diff src/alice/datasegment/LocalDataSegmentManager.java @ 73:4bfd81352cfa
change to concurrent data segment
author | kazz <kazz@cr.ie.u-ryukyu.ac.jp> |
---|---|
date | Tue, 21 Feb 2012 22:55:17 +0900 |
parents | a3a2605e16a2 |
children | 82a1c25ca0c8 |
line wrap: on
line diff
--- a/src/alice/datasegment/LocalDataSegmentManager.java Tue Feb 21 21:11:50 2012 +0900 +++ b/src/alice/datasegment/LocalDataSegmentManager.java Tue Feb 21 22:55:17 2012 +0900 @@ -2,6 +2,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import org.apache.log4j.Logger; import org.msgpack.type.Value; @@ -15,6 +17,12 @@ private LinkedBlockingQueue<KeyCommand> cmdQueue = new LinkedBlockingQueue<KeyCommand>(); private Logger logger = Logger.getLogger("local"); + private ThreadPoolExecutor dataSegmentExecutor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), // initial number of threads + Runtime.getRuntime().availableProcessors(), + Integer.MAX_VALUE, // keepAliveTime + TimeUnit.SECONDS, + new LinkedBlockingQueue<Runnable>()); + private Runnable keyCommandThread = new Runnable() { @Override @@ -37,12 +45,16 @@ new Thread(keyCommandThread, "LocalDataSegmentManager-runKeyCommand").start(); } - public void addCommand(DataSegmentKey key, Command cmd) { - try { - cmdQueue.put(new KeyCommand(key, cmd)); - } catch (InterruptedException e) { - e.printStackTrace(); - } + public void submitCommand(final DataSegmentKey key, final Command cmd) { + Runnable runCommand = new Runnable() { + + @Override + public void run() { + key.runCommand(cmd); + } + + }; + dataSegmentExecutor.execute(runCommand); } public DataSegmentKey getDataSegmentKey(String key) { @@ -63,7 +75,7 @@ public void put(String key, Value val, CodeSegment cs) { DataSegmentKey dataSegmentKey = getDataSegmentKey(key); Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, replyQueue, cs, reverseKey); - addCommand(dataSegmentKey, cmd); + submitCommand(dataSegmentKey, cmd); if (logger.isDebugEnabled()) logger.debug(cmd.getCommandString()); } @@ -75,7 +87,7 @@ public void update(String key, Value val, CodeSegment cs) { DataSegmentKey dataSegmentKey = getDataSegmentKey(key); Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, replyQueue, cs, reverseKey); - addCommand(dataSegmentKey, cmd); + submitCommand(dataSegmentKey, cmd); if (logger.isDebugEnabled()) logger.debug(cmd.getCommandString()); } @@ -86,7 +98,7 @@ int seq = this.seq.getAndIncrement(); Command cmd = new Command(CommandType.TAKE, receiver, key, null, index, seq, replyQueue, cs, null); seqHash.put(seq, cmd); - addCommand(dataSegmentKey, cmd); + submitCommand(dataSegmentKey, cmd); if (logger.isDebugEnabled()) logger.debug(cmd.getCommandString()); } @@ -97,7 +109,7 @@ int seq = this.seq.getAndIncrement(); Command cmd = new Command(CommandType.PEEK, receiver, key, null, index, seq, replyQueue, cs, null); seqHash.put(seq, cmd); // waiting for PUT or UPDATE at unique sequence number - addCommand(dataSegmentKey, cmd); + submitCommand(dataSegmentKey, cmd); if (logger.isDebugEnabled()) logger.debug(cmd.getCommandString()); } @@ -106,7 +118,7 @@ public void remove(String key) { DataSegmentKey dataSegmentKey = getDataSegmentKey(key); Command cmd = new Command(CommandType.REMOVE, null, key, null, 0, 0, replyQueue, null, null); - addCommand(dataSegmentKey, cmd); + submitCommand(dataSegmentKey, cmd); if (logger.isDebugEnabled()) logger.debug(cmd.getCommandString()); }