Mercurial > hg > Members > tatsuki > Alice
diff src/main/java/alice/datasegment/DataSegmentManager.java @ 345:8f71c3e6f11d
Change directory structure Maven standard
author | sugi |
---|---|
date | Wed, 16 Apr 2014 18:26:07 +0900 |
parents | |
children | 388e7d4b0624 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/alice/datasegment/DataSegmentManager.java Wed Apr 16 18:26:07 2014 +0900 @@ -0,0 +1,71 @@ +package alice.datasegment; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.log4j.Logger; + +import alice.codesegment.CodeSegment; + +public abstract class DataSegmentManager { + + protected ConcurrentHashMap<Integer, Command> seqHash = new ConcurrentHashMap<Integer, Command>(); + protected LinkedBlockingQueue<Command> replyQueue = new LinkedBlockingQueue<Command>(); + protected AtomicInteger seq = new AtomicInteger(1); // waiting for PUT or UPDATE at unique sequence number + // but it doesn't need for Local + + protected Runnable replyThread = new Runnable() { + Logger logger = Logger.getLogger("reply"); + @Override + public void run() { + while (true) { + try { + Command reply = replyQueue.take(); + Command cmd = getAndRemoveCmd(reply.seq); + if (cmd == null) { + logger.warn("conflict sequence number"); + continue; + } + cmd.cs.ids.reply(cmd.receiver, reply); + if (logger.isDebugEnabled()) + logger.debug(reply.getCommandString() + " " + cmd.getCommandString()); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + + }; + + public Command getAndRemoveCmd(int index){ + return seqHash.remove(index); + } + + public void addReplyCommand(Command cmd) { + try { + replyQueue.put(cmd); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + public abstract void put(String key, Object val); + public abstract void update(String key, Object val); + public abstract void take(Receiver receiver, CodeSegment cs); + public abstract void peek(Receiver receiver, CodeSegment cs); + + public abstract void quickPut(String key, Object val); + public abstract void quickUpdate(String key, Object val); + public abstract void quickPeek(Receiver receiver, CodeSegment cs); + public abstract void quickTake(Receiver receiver, CodeSegment cs); + + public abstract void remove(String key); + public abstract void shutdown(String key); + public abstract void close(); + public abstract void finish(); + + public abstract void ping(String returnKey); + public abstract void response(String returnKey); + +}