Mercurial > hg > Members > tatsuki > Alice
view src/alice/datasegment/LocalDataSegmentManager.java @ 69:1d4f2b72fb31
delete KeyThread
author | kazz <kazz@cr.ie.u-ryukyu.ac.jp> |
---|---|
date | Tue, 21 Feb 2012 19:44:33 +0900 |
parents | d4c7f7b1096b |
children | a3a2605e16a2 |
line wrap: on
line source
package alice.datasegment; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; import org.apache.log4j.Logger; import org.msgpack.type.Value; import alice.codesegment.CodeSegment; public class LocalDataSegmentManager extends DataSegmentManager { private String reverseKey = "local"; private ConcurrentHashMap<String, DataSegmentKey> dataSegments = new ConcurrentHashMap<String, DataSegmentKey>(); private LinkedBlockingQueue<KeyCommand> cmdQueue = new LinkedBlockingQueue<KeyCommand>(); private Logger logger = Logger.getLogger("local"); private boolean debug = false; private Runnable keyCommandThread = new Runnable() { @Override public void run() { while (true) { KeyCommand keyCmd = null; try { keyCmd = cmdQueue.take(); } catch (InterruptedException e) { e.printStackTrace(); } keyCmd.runCommand(); } } }; public LocalDataSegmentManager() { new Thread(replyThread, "LocalDataSegmentManager-replyCommand").start(); new Thread(keyCommandThread, "LocalDataSegmentManager-runKeyCommand").start(); } public void addCommand(DataSegmentKey key, Command cmd) { try { cmdQueue.put(new KeyCommand(key, cmd)); } catch (InterruptedException e) { e.printStackTrace(); } } public DataSegmentKey getDataSegmentKey(String key) { DataSegmentKey dsKey = dataSegments.get(key); if (dsKey != null) return dsKey; if (key == null) return null; DataSegmentKey newDataSegmentKey = new DataSegmentKey(); DataSegmentKey dataSegmentKey = dataSegments.putIfAbsent(key, newDataSegmentKey); if (dataSegmentKey == null) { dataSegmentKey = newDataSegmentKey; } return dataSegmentKey; } @Override public void put(String key, Value val, CodeSegment cs) { DataSegmentKey dataSegmentKey = getDataSegmentKey(key); Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, replyQueue, cs, reverseKey); addCommand(dataSegmentKey, cmd); if (debug) logger.debug(cmd.getCommandString()); } /** * Enqueue update command to the queue of each DataSegment key */ @Override public void update(String key, Value val, CodeSegment cs) { DataSegmentKey dataSegmentKey = getDataSegmentKey(key); Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, replyQueue, cs, reverseKey); addCommand(dataSegmentKey, cmd); if (debug) logger.debug(cmd.getCommandString()); } @Override public void take(Receiver receiver, String key, int index, CodeSegment cs) { DataSegmentKey dataSegmentKey = getDataSegmentKey(key); int seq = this.seq.getAndIncrement(); Command cmd = new Command(CommandType.TAKE, receiver, key, null, index, seq, replyQueue, cs, null); seqHash.put(seq, cmd); addCommand(dataSegmentKey, cmd); if (debug) logger.debug(cmd.getCommandString()); } @Override public void peek(Receiver receiver, String key, int index, CodeSegment cs) { DataSegmentKey dataSegmentKey = getDataSegmentKey(key); int seq = this.seq.getAndIncrement(); Command cmd = new Command(CommandType.PEEK, receiver, key, null, index, seq, replyQueue, cs, null); seqHash.put(seq, cmd); // waiting for PUT or UPDATE at unique sequence number addCommand(dataSegmentKey, cmd); if (debug) logger.debug(cmd.getCommandString()); } @Override public void remove(String key) { DataSegmentKey dataSegmentKey = getDataSegmentKey(key); Command cmd = new Command(CommandType.REMOVE, null, key, null, 0, 0, replyQueue, null, null); addCommand(dataSegmentKey, cmd); if (debug) logger.debug(cmd.getCommandString()); } @Override public void finish() { System.exit(0); } @Override public void close() { } }