Mercurial > hg > Database > Alice
view src/main/java/alice/datasegment/DataSegmentManager.java @ 650:4289b232b3fd
nulValue
author | suruga |
---|---|
date | Fri, 02 Feb 2018 18:26:49 +0900 |
parents | ea21af9a4762 |
children |
line wrap: on
line source
package alice.datasegment;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.log4j.Logger;

import alice.codesegment.CodeSegment;

/**
 * Base class for data segment managers.
 *
 * <p>Holds the bookkeeping shared by concrete managers: a map of commands
 * keyed by sequence number, a queue of incoming reply commands, and a
 * sequence counter. The actual data segment operations are declared
 * abstract and left to subclasses.
 */
public abstract class DataSegmentManager {

    /** Commands keyed by sequence number; entries are removed by {@link #getAndRemoveCmd(int)}. */
    protected ConcurrentHashMap<Integer, Command> seqHash = new ConcurrentHashMap<Integer, Command>();

    /** Reply commands queued by {@link #addReplyCommand(Command)} and drained by {@link #replyThread}. */
    protected LinkedBlockingQueue<Command> replyQueue = new LinkedBlockingQueue<Command>();

    /**
     * Sequence counter, starting at 1.
     * Original note: waiting for PUT or UPDATE at a unique sequence number,
     * but this is not needed for the Local manager.
     */
    protected AtomicInteger seq = new AtomicInteger(1);

    /**
     * Loop that takes replies off {@link #replyQueue}, looks up the command
     * registered under the same sequence number, and hands the reply to that
     * command's receiver.
     *
     * <p>Original note: this is a remnant of the SEDA reply thread and may be
     * removed.
     */
    protected Runnable replyThread = new Runnable() {
        Logger logger = Logger.getLogger("reply");

        @Override
        public void run() {
            while (true) {
                try {
                    Command reply = replyQueue.take();
                    Command cmd = getAndRemoveCmd(reply.seq);
                    if (cmd == null) {
                        // No command is registered under this sequence number.
                        logger.warn("conflict sequence number");
                        continue;
                    }
                    cmd.cs.ids.reply(cmd.receiver, reply);
                    if (logger.isDebugEnabled()) {
                        logger.debug(reply.getCommandString() + " " + cmd.getCommandString());
                    }
                } catch (InterruptedException e) {
                    // take() clears the interrupt status when it throws. Restore it
                    // and leave the loop so the thread can actually be stopped,
                    // instead of swallowing the interrupt and blocking again.
                    Thread.currentThread().interrupt();
                    logger.warn("reply thread interrupted", e);
                    return;
                }
            }
        }
    };

    /**
     * Removes and returns the command registered under the given sequence number.
     *
     * @param index sequence number used as the key into {@link #seqHash}
     * @return the removed command, or {@code null} if none was registered
     */
    public Command getAndRemoveCmd(int index) {
        return seqHash.remove(index);
    }

    /**
     * Appends a reply command to {@link #replyQueue}.
     *
     * <p>If the calling thread is interrupted while enqueuing, the stack trace
     * is printed and the thread's interrupt status is restored so callers can
     * still observe the interruption.
     *
     * @param cmd the reply command to enqueue
     */
    public void addReplyCommand(Command cmd) {
        try {
            replyQueue.put(cmd);
        } catch (InterruptedException e) {
            e.printStackTrace();
            // put() cleared the interrupt status; do not hide it from the caller.
            Thread.currentThread().interrupt();
        }
    }

    // Abstract operations for each command; implemented by concrete managers.

    public abstract void put(String key, ReceiveData rData, boolean quickFlag);

    public abstract void update(String key, ReceiveData rData, boolean quickFlag);

    public abstract void peek(Receiver receiver, CodeSegment cs, boolean quickFlag);

    public abstract void take(Receiver receiver, CodeSegment cs, boolean quickFlag);

    public abstract void remove(String key);

    public abstract void shutdown();

    public abstract void close();

    public abstract void finish();

    public abstract void ping(String returnKey);

    public abstract void response(String returnKey);

    public abstract void setSendError(boolean b);
}