Mercurial > hg > Database > Alice
view src/main/java/alice/datasegment/LocalDataSegmentManager.java @ 527:bfec2c3ff1b8 dispose
change unzip
author | Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp> |
---|---|
date | Thu, 30 Apr 2015 18:14:02 +0900 |
parents | 928907206d21 |
children | cb7c31848d16 |
line wrap: on
line source
package alice.datasegment;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.log4j.Logger;

import alice.codesegment.CodeSegment;

/**
 * Handles operations on local DataSegments. This class does not hold the
 * DataSegment contents itself (original note: see ReceivedData); it builds a
 * {@link Command} for each operation and hands it to
 * {@link DataSegmentKey#runCommand} for execution.
 */
public class LocalDataSegmentManager extends DataSegmentManager {

    /** Value passed as the last constructor argument of PUT/UPDATE commands; defaults to "local". */
    private String reverseKey = "local";

    /** Per-key command handlers, created on first use by {@link #getDataSegmentKey(String)}. */
    private final ConcurrentHashMap<String, DataSegmentKey> dataSegments = new ConcurrentHashMap<String, DataSegmentKey>();

    private final Logger logger = Logger.getLogger("local");

    /** Fixed-size pool used only by {@link #submitCommand(DataSegmentKey, Command)}. */
    private final ThreadPoolExecutor dataSegmentExecutor = new ThreadPoolExecutor(
            Runtime.getRuntime().availableProcessors(), // initial number of threads
            Runtime.getRuntime().availableProcessors(),
            Integer.MAX_VALUE, // keepAliveTime
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>());

    /**
     * Constructor. Starts a thread named "LocalDataSegmentManager-replyCommand"
     * that runs the inherited {@code replyThread}.
     */
    public LocalDataSegmentManager() {
        new Thread(replyThread, "LocalDataSegmentManager-replyCommand").start();
    }

    /**
     * Replaces the reverse key attached to subsequently created PUT/UPDATE commands.
     *
     * @param s the new reverse key
     */
    public void setReverseKey(String s) {
        reverseKey = s;
    }

    /**
     * Runnable that executes one command against one DataSegmentKey.
     * Declared static because it needs no state from the enclosing manager.
     */
    private static class RunCommand implements Runnable {

        final DataSegmentKey key;
        final Command cmd;

        public RunCommand(DataSegmentKey key, Command cmd) {
            this.key = key;
            this.cmd = cmd;
        }

        @Override
        public void run() {
            key.runCommand(cmd);
        }

    }

    /**
     * Schedules {@code key.runCommand(cmd)} on the internal thread pool
     * instead of running it on the calling thread.
     *
     * @param key the DataSegmentKey that will run the command
     * @param cmd the command to run
     */
    public void submitCommand(DataSegmentKey key, Command cmd) {
        dataSegmentExecutor.execute(new RunCommand(key, cmd));
    }

    /**
     * Returns the DataSegmentKey registered for {@code key}, creating and
     * registering a new one if none exists yet.
     *
     * @param key the DataSegment key name; may be {@code null}
     * @return the DataSegmentKey for {@code key}, or {@code null} when {@code key} is {@code null}
     */
    public DataSegmentKey getDataSegmentKey(String key) {
        // ConcurrentHashMap throws NullPointerException for a null key, so the
        // null check has to happen before any map access to be effective.
        if (key == null) {
            return null;
        }
        DataSegmentKey existing = dataSegments.get(key);
        if (existing != null) {
            return existing;
        }
        DataSegmentKey created = new DataSegmentKey();
        // putIfAbsent returns the value another thread registered first, if any;
        // in that case use the registered instance and drop ours.
        DataSegmentKey registered = dataSegments.putIfAbsent(key, created);
        if (registered == null) {
            registered = created;
        }
        return registered;
    }

    /**
     * Removes the DataSegmentKey registered for {@code key}; does nothing when
     * {@code key} is {@code null}.
     *
     * @param key the DataSegment key name to remove
     */
    public void removeDataSegmentKey(String key) {
        if (key != null) {
            dataSegments.remove(key);
        }
    }

    /**
     * Builds a PUT command carrying {@code rData} and runs it directly on the
     * calling thread through the key's DataSegmentKey. The compress flag from
     * {@code option} is copied onto both the command and {@code rData}.
     *
     * @param key    the DataSegment key name
     * @param rData  the data to put
     * @param option supplies the compress flag
     */
    @Override
    public void put(String key, ReceiveData rData, SendOption option) {
        DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
        // Original note: the "MP" conversion is done here (presumably MessagePack — confirm).
        Command cmd = new Command(CommandType.PUT, null, key, rData, 0, 0, null, null, reverseKey);
        cmd.setCompressFlag(option.getCompressFlag());
        rData.setCompressFlag(option.getCompressFlag());
        dataSegmentKey.runCommand(cmd);
        if (logger.isDebugEnabled()) {
            logger.debug(cmd.getCommandString());
        }
    }

    /**
     * Enqueue update command to the queue of each DataSegment key
     *
     * @param key    the DataSegment key name
     * @param rData  the data to update with
     * @param option supplies the compress flag, copied onto the command and {@code rData}
     */
    @Override
    public void update(String key, ReceiveData rData, SendOption option) {
        DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
        Command cmd = new Command(CommandType.UPDATE, null, key, rData, 0, 0, null, null, reverseKey);
        cmd.setCompressFlag(option.getCompressFlag());
        rData.setCompressFlag(option.getCompressFlag());
        dataSegmentKey.runCommand(cmd);
        if (logger.isDebugEnabled()) {
            logger.debug(cmd.getCommandString());
        }
    }

    /**
     * Builds a TAKE command for {@code receiver.key} with a fresh sequence
     * number and the inherited reply queue, then runs it on the calling thread.
     *
     * @param receiver the receiver whose key and index identify the request
     * @param cs       the CodeSegment attached to the command
     * @param option   supplies the compress flag
     */
    @Override
    public void take(Receiver receiver, CodeSegment cs, SendOption option) {
        DataSegmentKey dataSegmentKey = getDataSegmentKey(receiver.key);
        int seq = this.seq.getAndIncrement();
        Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null);
        cmd.setCompressFlag(option.getCompressFlag());
        dataSegmentKey.runCommand(cmd);
        if (logger.isDebugEnabled()) {
            logger.debug(cmd.getCommandString());
        }
    }

    /**
     * Builds a PEEK command for {@code receiver.key} with a fresh sequence
     * number and the inherited reply queue, then runs it on the calling thread.
     *
     * @param receiver the receiver whose key and index identify the request
     * @param cs       the CodeSegment attached to the command
     * @param option   supplies the compress flag
     */
    @Override
    public void peek(Receiver receiver, CodeSegment cs, SendOption option) {
        DataSegmentKey dataSegmentKey = getDataSegmentKey(receiver.key);
        int seq = this.seq.getAndIncrement();
        Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null);
        cmd.setCompressFlag(option.getCompressFlag());
        dataSegmentKey.runCommand(cmd);
        if (logger.isDebugEnabled()) {
            logger.debug(cmd.getCommandString());
        }
    }

    /**
     * Builds a REMOVE command for {@code key} and runs it on the calling thread.
     * (The original author left the question "what is this command for?" here.)
     *
     * @param key the DataSegment key name
     */
    @Override
    public void remove(String key) {
        DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
        Command cmd = new Command(CommandType.REMOVE, null, key, null, 0, 0, replyQueue, null, null);
        dataSegmentKey.runCommand(cmd);
        if (logger.isDebugEnabled()) {
            logger.debug(cmd.getCommandString());
        }
    }

    /** Terminates the JVM with exit status 0. */
    @Override
    public void finish() {
        System.exit(0);
    }

    /** No-op for the local manager. */
    @Override
    public void close() {

    }

    /**
     * Re-issues a command of type {@code receiver.type} for {@code receiver.key}
     * with a fresh sequence number. Unlike take/peek, no compress flag is set.
     * (The original author marked this method with "?".)
     *
     * @param receiver the receiver whose type, key and index define the command
     * @param cs       the CodeSegment attached to the command
     */
    public void recommand(Receiver receiver, CodeSegment cs) {
        DataSegmentKey dataSegmentKey = getDataSegmentKey(receiver.key);
        int seq = this.seq.getAndIncrement();
        Command cmd = new Command(receiver.type, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null);
        dataSegmentKey.runCommand(cmd);
        if (logger.isDebugEnabled()) {
            logger.debug(cmd.getCommandString());
        }
    }

    /** No-op for the local manager. */
    @Override
    public void ping(String returnKey) {

    }

    /** No-op for the local manager. */
    @Override
    public void response(String returnKey) {

    }

    /** No-op for the local manager. */
    @Override
    public void shutdown() {

    }

    /** No-op for the local manager. */
    @Override
    public void setSendError(boolean b) {

    }

}