Mercurial > hg > Members > tatsuki > Alice
changeset 58:ebdcab7b9b04
add comment
author | one |
---|---|
date | Wed, 08 Feb 2012 17:06:39 +0900 |
parents | 7fa9ddb31f64 |
children | eb1714f41caf |
files | src/alice/codesegment/CodeSegmentManager.java src/alice/daemon/IncomingTcpConnection.java src/alice/daemon/OutboundTcpConnection.java src/alice/datasegment/DataSegmentKey.java src/alice/datasegment/LocalDataSegmentManager.java src/alice/datasegment/RemoteDataSegmentManager.java src/alice/test/codesegment/remote/RemoteIncrement.java |
diffstat | 7 files changed, 20 insertions(+), 9 deletions(-) [+] |
line wrap: on
line diff
--- a/src/alice/codesegment/CodeSegmentManager.java Wed Feb 08 16:07:33 2012 +0900 +++ b/src/alice/codesegment/CodeSegmentManager.java Wed Feb 08 17:06:39 2012 +0900 @@ -10,9 +10,9 @@ private static CodeSegmentManager instance = new CodeSegmentManager(); public LinkedBlockingQueue<CodeSegment> readyQueue = new LinkedBlockingQueue<CodeSegment>(); - ThreadPoolExecutor codeSegmentExecutor = new ThreadPoolExecutor(1, + ThreadPoolExecutor codeSegmentExecutor = new ThreadPoolExecutor(1, // initial number of threads Runtime.getRuntime().availableProcessors(), - Integer.MAX_VALUE, + Integer.MAX_VALUE, // keepAliveTime TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()); private Logger log = Logger.getLogger(CodeSegmentManager.class);
--- a/src/alice/daemon/IncomingTcpConnection.java Wed Feb 08 16:07:33 2012 +0900 +++ b/src/alice/daemon/IncomingTcpConnection.java Wed Feb 08 17:06:39 2012 +0900 @@ -26,6 +26,9 @@ this.reverseKey = reverseKey; } + /** + * pipeline thread for receiving + */ public void run() { Unpacker unpacker = null; try {
--- a/src/alice/daemon/OutboundTcpConnection.java Wed Feb 08 16:07:33 2012 +0900 +++ b/src/alice/daemon/OutboundTcpConnection.java Wed Feb 08 17:06:39 2012 +0900 @@ -19,6 +19,9 @@ return new CommandMessage(cmd.type.id, cmd.index, cmd.seq, cmd.key, cmd.val); } + /** + * pipeline thread for transmission + */ public void run() { MessagePack msgpack = new MessagePack(); while (true) {
--- a/src/alice/datasegment/DataSegmentKey.java Wed Feb 08 16:07:33 2012 +0900 +++ b/src/alice/datasegment/DataSegmentKey.java Wed Feb 08 17:06:39 2012 +0900 @@ -45,14 +45,13 @@ int index = tailIndex.getAndIncrement(); DataSegmentValue dsv = new DataSegmentValue(index, cmd.val, cmd.reverseKey); dataList.add(dsv); - // run waiting peek and take - boolean takeFlag = true; - for (Iterator<Command> iter = waitList.iterator(); iter.hasNext() && takeFlag; ) { + // Process waiting peek and take commands + for (Iterator<Command> iter = waitList.iterator(); iter.hasNext(); ) { Command waitCmd = iter.next(); if (waitCmd.index < index) { waitCmd.replyQueue.put(new Command(CommandType.REPLY, null, null, cmd.val, index, waitCmd.seq, null, null, cmd.reverseKey)); iter.remove(); - if (waitCmd.type == CommandType.TAKE) { // delete data, if it run take cmd. + if (waitCmd.type == CommandType.TAKE) { // someone is waiting for this put or update command dataList.remove(dsv); break; }
--- a/src/alice/datasegment/LocalDataSegmentManager.java Wed Feb 08 16:07:33 2012 +0900 +++ b/src/alice/datasegment/LocalDataSegmentManager.java Wed Feb 08 17:06:39 2012 +0900 @@ -66,7 +66,7 @@ DataSegmentKey dataSegmentKey = getDataSegmentKey(key); int seq = this.seq.getAndIncrement(); Command cmd = new Command(CommandType.PEEK, receiver, key, null, index, seq, replyQueue, cs, null); - seqHash.put(seq, cmd); + seqHash.put(seq, cmd); // waiting for PUT or UPDATE at unique sequence number dataSegmentKey.addCommand(cmd); logger.debug(cmd.getCommandString()); }
--- a/src/alice/datasegment/RemoteDataSegmentManager.java Wed Feb 08 16:07:33 2012 +0900 +++ b/src/alice/datasegment/RemoteDataSegmentManager.java Wed Feb 08 17:06:39 2012 +0900 @@ -45,10 +45,13 @@ }.start(); } + /** + * send put command to target DataSegment + */ @Override public void put(String key, Value val, CodeSegment cs) { Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, null, cs, null); - connection.sendCommand(cmd); + connection.sendCommand(cmd); // put command on the transmission thread logger.debug(cmd.getCommandString()); }
--- a/src/alice/test/codesegment/remote/RemoteIncrement.java Wed Feb 08 16:07:33 2012 +0900 +++ b/src/alice/test/codesegment/remote/RemoteIncrement.java Wed Feb 08 17:06:39 2012 +0900 @@ -7,7 +7,10 @@ public class RemoteIncrement extends CodeSegment { public Receiver num = ids.create(CommandType.TAKE); - + + /** + * Increment DataSegment value up to 10 + */ @Override public void run() { int num = this.num.asInteger();