Mercurial > hg > Members > tatsuki > Alice
changeset 69:1d4f2b72fb31
delete KeyThread
author | kazz <kazz@cr.ie.u-ryukyu.ac.jp> |
---|---|
date | Tue, 21 Feb 2012 19:44:33 +0900 |
parents | d4c7f7b1096b |
children | f2d4a4686036 |
files | src/alice/daemon/IncomingTcpConnection.java src/alice/datasegment/DataSegmentKey.java src/alice/datasegment/DataSegmentManager.java src/alice/datasegment/KeyCommand.java src/alice/datasegment/LocalDataSegmentManager.java |
diffstat | 5 files changed, 147 insertions(+), 120 deletions(-) [+] |
line wrap: on
line diff
--- a/src/alice/daemon/IncomingTcpConnection.java Sat Feb 11 16:40:03 2012 +0900 +++ b/src/alice/daemon/IncomingTcpConnection.java Tue Feb 21 19:44:33 2012 +0900 @@ -20,6 +20,8 @@ public Connection connection; public DataSegmentManager manager; public String reverseKey; + private LocalDataSegmentManager lmanager = DataSegment.getLocal();; + public IncomingTcpConnection(Connection connection, DataSegmentManager manager, String reverseKey) { this.manager = manager; this.connection = connection; @@ -42,44 +44,36 @@ CommandType type = CommandType.getCommandTypeFromId(msg.type); switch (type) { case UPDATE: - getDataSegmentKey(msg).addCommand(new Command(type, null, null, msg.val, 0, 0, null, null, reverseKey)); + lmanager.addCommand(getDataSegmentKey(msg), + new Command(type, null, null, msg.val, 0, 0, null, null, reverseKey)); break; case PUT: - getDataSegmentKey(msg).addCommand(new Command(type, null, null, msg.val, 0, 0, null, null, reverseKey)); + lmanager.addCommand(getDataSegmentKey(msg), + new Command(type, null, null, msg.val, 0, 0, null, null, reverseKey)); break; case PEEK: - //Command(CommandType cmdType, String argKey, Value val, int index, int seq, BlockingQueue<Command> replyQueue, CodeSegment cs) { - getDataSegmentKey(msg).addCommand(new Command(type, null, null, null, msg.index, msg.seq, connection.sendQueue, null, null)); + lmanager.addCommand(getDataSegmentKey(msg), + new Command(type, null, null, null, msg.index, msg.seq, connection.sendQueue, null, null)); break; case TAKE: - getDataSegmentKey(msg).addCommand(new Command(type, null, null, null, msg.index, msg.seq, connection.sendQueue, null, null)); + lmanager.addCommand(getDataSegmentKey(msg), + new Command(type, null, null, null, msg.index, msg.seq, connection.sendQueue, null, null)); break; case REMOVE: - getDataSegmentKey(msg).addCommand(new Command(type, null, null, null, 0, 0, null, null, null)); + lmanager.addCommand(getDataSegmentKey(msg), + new Command(type, null, null, null, 0, 0, null, null, null)); break; case REPLY: - try { - manager.replyQueue.put(new Command(type, null, null, msg.val, msg.index, msg.seq, null, null, null)); - } catch (InterruptedException e) { - e.printStackTrace(); - } + manager.addReplyCommand(new Command(type, null, null, msg.val, msg.index, msg.seq, null, null, null)); break; default: break; } } catch (ClosedChannelException e) { - try { - connection.sendQueue.put(new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, null)); - } catch (InterruptedException e1) { - e1.printStackTrace(); - } + connection.sendCommand(new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, null)); return; } catch (EOFException e) { - try { - connection.sendQueue.put(new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, null)); - } catch (InterruptedException e1) { - e1.printStackTrace(); - } + connection.sendCommand(new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, null)); return; } catch (IOException e) { e.printStackTrace(); @@ -87,7 +81,6 @@ } } private DataSegmentKey getDataSegmentKey(CommandMessage msg) { - LocalDataSegmentManager lmanager = DataSegment.getLocal(); return lmanager.getDataSegmentKey(msg.key); } }
--- a/src/alice/datasegment/DataSegmentKey.java Sat Feb 11 16:40:03 2012 +0900 +++ b/src/alice/datasegment/DataSegmentKey.java Tue Feb 21 19:44:33 2012 +0900 @@ -2,7 +2,6 @@ import java.util.ArrayList; import java.util.Iterator; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; import alice.datasegment.Command; @@ -14,101 +13,85 @@ */ public class DataSegmentKey { - private String key; - private LinkedBlockingQueue<Command> cmdQueue = new LinkedBlockingQueue<Command>(); private ArrayList<DataSegmentValue> dataList = new ArrayList<DataSegmentValue>(); private ArrayList<Command> waitList = new ArrayList<Command>(); private AtomicInteger tailIndex = new AtomicInteger(1); - private Thread keyThread; - public DataSegmentKey(String key) { - this.key = key; - } - - public void addCommand(Command cmd) { - cmdQueue.add(cmd); - } - - /** - * too many threads are generated here - * single scheduling queue and waiting queue can be used in future - */ - public void runKeyThread() { - this.keyThread = new Thread() { - @Override - public void run() { - while (true) { + public void runCommand(Command cmd) { + switch (cmd.type) { + case UPDATE: + if (dataList.size() != 0) { + dataList.remove(0); + } + case PUT: + int index = tailIndex.getAndIncrement(); + DataSegmentValue dsv = new DataSegmentValue(index, cmd.val, cmd.reverseKey); + dataList.add(dsv); + // Process waiting peek and take commands + for (Iterator<Command> iter = waitList.iterator(); iter.hasNext(); ) { + Command waitCmd = iter.next(); + if (waitCmd.index < index) { try { - Command cmd = cmdQueue.take(); - switch (cmd.type) { - case UPDATE: - if (dataList.size() != 0) { - dataList.remove(0); - } - case PUT: - int index = tailIndex.getAndIncrement(); - DataSegmentValue dsv = new DataSegmentValue(index, cmd.val, cmd.reverseKey); - dataList.add(dsv); - // Process waiting peek and take commands - for (Iterator<Command> iter = waitList.iterator(); iter.hasNext(); ) { - Command waitCmd = iter.next(); - if (waitCmd.index < index) { - waitCmd.replyQueue.put(new Command(CommandType.REPLY, null, null, cmd.val, index, waitCmd.seq, null, null, cmd.reverseKey)); - iter.remove(); - if (waitCmd.type == CommandType.TAKE) { // someone is waiting for this put or update command - dataList.remove(dsv); - break; - } - } - } - break; - case PEEK: - if (cmd.index >= tailIndex.get()) { - waitList.add(cmd); - break; - } - boolean waitFlag2 = true; - for (DataSegmentValue data : dataList) { - if (data.index > cmd.index) { - cmd.replyQueue.put(new Command(CommandType.REPLY, null, null, data.val, data.index, cmd.seq, null, null, data.from)); - waitFlag2 = false; - break; - } - } - if (waitFlag2) - waitList.add(cmd); - break; - case TAKE: - if (cmd.index >= tailIndex.get()) { - waitList.add(cmd); - break; - } - boolean waitFlag = true; - for (Iterator<DataSegmentValue> iter = dataList.iterator(); iter.hasNext(); ) { - DataSegmentValue data = iter.next(); - if (data.index > cmd.index) { - cmd.replyQueue.put(new Command(CommandType.REPLY, null, null, data.val, data.index, cmd.seq, null, null, data.from)); - iter.remove(); - waitFlag = false; - break; - } - } - if (waitFlag) - waitList.add(cmd); - break; - case REMOVE: - // TODO: implements later - break; - default: - } + waitCmd.replyQueue.put(new Command(CommandType.REPLY, null, null, cmd.val, index, waitCmd.seq, null, null, cmd.reverseKey)); } catch (InterruptedException e) { e.printStackTrace(); } + iter.remove(); + if (waitCmd.type == CommandType.TAKE) { // someone is waiting for this put or update command + dataList.remove(dsv); + break; + } + } + } + break; + case PEEK: + if (cmd.index >= tailIndex.get()) { + waitList.add(cmd); + break; + } + boolean waitFlag2 = true; + for (DataSegmentValue data : dataList) { + if (data.index > cmd.index) { + try { + cmd.replyQueue.put(new Command(CommandType.REPLY, null, null, data.val, data.index, cmd.seq, null, null, data.from)); + } catch (InterruptedException e) { + e.printStackTrace(); + } + waitFlag2 = false; + break; } } - }; - keyThread.setName("DataSegmentKey-" + key); - keyThread.start(); + if (waitFlag2) + waitList.add(cmd); + break; + case TAKE: + if (cmd.index >= tailIndex.get()) { + waitList.add(cmd); + break; + } + boolean waitFlag = true; + for (Iterator<DataSegmentValue> iter = dataList.iterator(); iter.hasNext(); ) { + DataSegmentValue data = iter.next(); + if (data.index > cmd.index) { + try { + cmd.replyQueue.put(new Command(CommandType.REPLY, null, null, data.val, data.index, cmd.seq, null, null, data.from)); + } catch (InterruptedException e) { + e.printStackTrace(); + } + iter.remove(); + waitFlag = false; + break; + } + } + if (waitFlag) + waitList.add(cmd); + break; + case REMOVE: + // TODO: implements later + break; + default: + } + } }
--- a/src/alice/datasegment/DataSegmentManager.java Sat Feb 11 16:40:03 2012 +0900 +++ b/src/alice/datasegment/DataSegmentManager.java Tue Feb 21 19:44:33 2012 +0900 @@ -12,7 +12,7 @@ public abstract class DataSegmentManager { protected ConcurrentHashMap<Integer, Command> seqHash = new ConcurrentHashMap<Integer, Command>(); - public LinkedBlockingQueue<Command> replyQueue = new LinkedBlockingQueue<Command>(); + protected LinkedBlockingQueue<Command> replyQueue = new LinkedBlockingQueue<Command>(); protected AtomicInteger seq = new AtomicInteger(1); boolean debug = false; @@ -40,6 +40,14 @@ }; + public void addReplyCommand(Command cmd) { + try { + replyQueue.put(cmd); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + public abstract void put(String key, Value val, CodeSegment cs); public abstract void update(String key, Value val, CodeSegment cs); public void take(Receiver receiver, String key, CodeSegment cs) {
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/alice/datasegment/KeyCommand.java Tue Feb 21 19:44:33 2012 +0900 @@ -0,0 +1,17 @@ +package alice.datasegment; + +public class KeyCommand { + + DataSegmentKey key; + Command cmd; + + public KeyCommand(DataSegmentKey key, Command cmd) { + this.key = key; + this.cmd = cmd; + } + + public void runCommand() { + key.runCommand(cmd); + } + +} \ No newline at end of file
--- a/src/alice/datasegment/LocalDataSegmentManager.java Sat Feb 11 16:40:03 2012 +0900 +++ b/src/alice/datasegment/LocalDataSegmentManager.java Tue Feb 21 19:44:33 2012 +0900 @@ -1,6 +1,7 @@ package alice.datasegment; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; import org.apache.log4j.Logger; import org.msgpack.type.Value; @@ -11,12 +12,39 @@ private String reverseKey = "local"; private ConcurrentHashMap<String, DataSegmentKey> dataSegments = new ConcurrentHashMap<String, DataSegmentKey>(); - + private LinkedBlockingQueue<KeyCommand> cmdQueue = new LinkedBlockingQueue<KeyCommand>(); private Logger logger = Logger.getLogger("local"); - boolean debug = false; + private boolean debug = false; + + private Runnable keyCommandThread = new Runnable() { + + @Override + public void run() { + while (true) { + KeyCommand keyCmd = null; + try { + keyCmd = cmdQueue.take(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + keyCmd.runCommand(); + } + } + + }; + public LocalDataSegmentManager() { - new Thread(replyThread, "LocalDataSegmentManager").start(); + new Thread(replyThread, "LocalDataSegmentManager-replyCommand").start(); + new Thread(keyCommandThread, "LocalDataSegmentManager-runKeyCommand").start(); + } + + public void addCommand(DataSegmentKey key, Command cmd) { + try { + cmdQueue.put(new KeyCommand(key, cmd)); + } catch (InterruptedException e) { + e.printStackTrace(); + } } public DataSegmentKey getDataSegmentKey(String key) { @@ -25,10 +53,9 @@ return dsKey; if (key == null) return null; - DataSegmentKey newDataSegmentKey = new DataSegmentKey(key); + DataSegmentKey newDataSegmentKey = new DataSegmentKey(); DataSegmentKey dataSegmentKey = dataSegments.putIfAbsent(key, newDataSegmentKey); if (dataSegmentKey == null) { - newDataSegmentKey.runKeyThread(); dataSegmentKey = newDataSegmentKey; } return dataSegmentKey; @@ -38,12 +65,11 @@ public void put(String key, Value val, CodeSegment cs) { DataSegmentKey dataSegmentKey = getDataSegmentKey(key); Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, replyQueue, cs, reverseKey); - dataSegmentKey.addCommand(cmd); + addCommand(dataSegmentKey, cmd); if (debug) logger.debug(cmd.getCommandString()); } - /** * Enqueue update command to the queue of each DataSegment key */ @@ -51,7 +77,7 @@ public void update(String key, Value val, CodeSegment cs) { DataSegmentKey dataSegmentKey = getDataSegmentKey(key); Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, replyQueue, cs, reverseKey); - dataSegmentKey.addCommand(cmd); + addCommand(dataSegmentKey, cmd); if (debug) logger.debug(cmd.getCommandString()); } @@ -62,7 +88,7 @@ int seq = this.seq.getAndIncrement(); Command cmd = new Command(CommandType.TAKE, receiver, key, null, index, seq, replyQueue, cs, null); seqHash.put(seq, cmd); - dataSegmentKey.addCommand(cmd); + addCommand(dataSegmentKey, cmd); if (debug) logger.debug(cmd.getCommandString()); } @@ -73,7 +99,7 @@ int seq = this.seq.getAndIncrement(); Command cmd = new Command(CommandType.PEEK, receiver, key, null, index, seq, replyQueue, cs, null); seqHash.put(seq, cmd); // waiting for PUT or UPDATE at unique sequence number - dataSegmentKey.addCommand(cmd); + addCommand(dataSegmentKey, cmd); if (debug) logger.debug(cmd.getCommandString()); } @@ -82,7 +108,7 @@ public void remove(String key) { DataSegmentKey dataSegmentKey = getDataSegmentKey(key); Command cmd = new Command(CommandType.REMOVE, null, key, null, 0, 0, replyQueue, null, null); - dataSegmentKey.addCommand(cmd); + addCommand(dataSegmentKey, cmd); if (debug) logger.debug(cmd.getCommandString()); }