Mercurial > hg > Database > Alice
view src/main/java/alice/datasegment/DataSegmentManager.java @ 467:6e304a7a60e7 dispose
remove white space
author | sugi |
---|---|
date | Sat, 22 Nov 2014 12:08:24 +0900 |
parents | bcf6f4a6fcd0 |
children | 86c45738dd9e |
line wrap: on
line source
package alice.datasegment; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; import org.apache.log4j.Logger; import alice.codesegment.CodeSegment; public abstract class DataSegmentManager { protected ConcurrentHashMap<Integer, Command> seqHash = new ConcurrentHashMap<Integer, Command>(); protected LinkedBlockingQueue<Command> replyQueue = new LinkedBlockingQueue<Command>(); protected AtomicInteger seq = new AtomicInteger(1); // waiting for PUT or UPDATE at unique sequence number // but it doesn't need for Local protected Runnable replyThread = new Runnable() { Logger logger = Logger.getLogger("reply"); @Override public void run() { while (true) { try { Command reply = replyQueue.take(); Command cmd = getAndRemoveCmd(reply.seq); if (cmd == null) { logger.warn("conflict sequence number"); continue; } cmd.cs.ids.reply(cmd.receiver, reply); if (logger.isDebugEnabled()) logger.debug(reply.getCommandString() + " " + cmd.getCommandString()); } catch (InterruptedException e) { e.printStackTrace(); } } } }; public Command getAndRemoveCmd(int index){ return seqHash.remove(index); } public void addReplyCommand(Command cmd) { try { replyQueue.put(cmd); } catch (InterruptedException e) { e.printStackTrace(); } } public abstract void put(String key, ReceiveData rData, SendOption option); public abstract void update(String key, ReceiveData rData, SendOption option); public abstract void peek(Receiver receiver, CodeSegment cs, SendOption option); public abstract void take(Receiver receiver, CodeSegment cs, SendOption option); public abstract void remove(String key); public abstract void shutdown(); public abstract void close(); public abstract void finish(); public abstract void ping(String returnKey); public abstract void response(String returnKey); }