Mercurial > hg > Members > tatsuki > Alice
view src/alice/datasegment/DataSegmentManager.java @ 254:2ec10cfa8cc3
refactor
author | sugi |
---|---|
date | Mon, 01 Jul 2013 20:00:07 +0900 |
parents | b78f52865b8d |
children | b4690114a0cd |
line wrap: on
line source
package alice.datasegment; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; import org.apache.log4j.Logger; import org.msgpack.type.Value; import alice.codesegment.CodeSegment; public abstract class DataSegmentManager { protected ConcurrentHashMap<Integer, Command> seqHash = new ConcurrentHashMap<Integer, Command>(); //TODO Over Head protected LinkedBlockingQueue<Command> replyQueue = new LinkedBlockingQueue<Command>(); protected AtomicInteger seq = new AtomicInteger(1); // waiting for PUT or UPDATE at unique sequence number // but it doesn't need for Local protected Runnable replyThread = new Runnable() { Logger logger = Logger.getLogger("reply"); @Override public void run() { while (true) { try { Command reply = replyQueue.take(); Command cmd = getAndRemoveCmd(reply.seq); if (cmd == null) { logger.warn("conflict sequence number"); continue; } cmd.cs.ids.reply(cmd.receiver, reply); if (logger.isDebugEnabled()) logger.debug(reply.getCommandString() + " " + cmd.getCommandString()); } catch (InterruptedException e) { e.printStackTrace(); } } } }; public Command getAndRemoveCmd(int index){ return seqHash.remove(index); } public void addReplyCommand(Command cmd) { try { replyQueue.put(cmd); } catch (InterruptedException e) { e.printStackTrace(); } } public abstract void put(String key, Value val); public abstract void update(String key, Value val); public abstract void take(Receiver receiver, CodeSegment cs); public abstract void peek(Receiver receiver, CodeSegment cs); public abstract void remove(String key); public abstract void close(); public abstract void finish(); public abstract void quickPut(String key, Value val); public abstract void quickUpdate(String key, Value val); public abstract void quickPeek(Receiver receiver, CodeSegment cs); public abstract void quickTake(Receiver receiver, CodeSegment cs); }