Mercurial > hg > Database > Alice
view src/alice/datasegment/DataSegmentManager.java @ 68:d4c7f7b1096b
remove copy at OutboundTcpConnection
author | kazz <kazz@cr.ie.u-ryukyu.ac.jp> |
---|---|
date | Sat, 11 Feb 2012 16:40:03 +0900 |
parents | 2afbb6404840 |
children | 1d4f2b72fb31 |
line wrap: on
line source
package alice.datasegment;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.log4j.Logger;
import org.msgpack.type.Value;

import alice.codesegment.CodeSegment;

/**
 * Base class for data segment managers.
 *
 * <p>It holds the table of pending commands keyed by sequence number, the
 * queue of incoming replies, and a reply loop ({@link #replyThread}) that
 * matches each reply to its pending command and forwards the value to the
 * waiting receiver. The actual data operations are left to subclasses.
 */
public abstract class DataSegmentManager {

    /** Pending commands awaiting a reply, keyed by sequence number. */
    protected ConcurrentHashMap<Integer, Command> seqHash = new ConcurrentHashMap<Integer, Command>();

    /** Replies waiting to be matched against {@link #seqHash} by the reply loop. */
    public LinkedBlockingQueue<Command> replyQueue = new LinkedBlockingQueue<Command>();

    // Not used in this class; presumably subclasses draw sequence numbers
    // from it when registering commands in seqHash -- confirm in subclasses.
    protected AtomicInteger seq = new AtomicInteger(1);

    /** When true, the reply loop logs every matched reply/command pair. */
    boolean debug = false;

    /**
     * Reply loop. Blocks on {@link #replyQueue}, looks up the pending command
     * with the same sequence number and hands the reply value to that
     * command's receiver. The loop ends when the running thread is
     * interrupted.
     */
    protected Runnable replyThread = new Runnable() {
        Logger logger = Logger.getLogger("reply");

        @Override
        public void run() {
            while (true) {
                try {
                    Command reply = replyQueue.take();
                    // Look up and remove in one atomic step so that a pending
                    // command can never be answered twice, even if this
                    // Runnable is executed by more than one thread.
                    Command cmd = seqHash.remove(reply.seq);
                    if (cmd == null) {
                        logger.warn("conflict sequence number");
                        continue;
                    }
                    cmd.cs.ids.reply(cmd.receiver,
                            new DataSegmentValue(reply.index, reply.val, reply.reverseKey));
                    if (debug) {
                        logger.debug(reply.getCommandString() + " " + cmd.getCommandString());
                    }
                } catch (InterruptedException e) {
                    // Interruption is a request to stop: report it through the
                    // logger, restore the flag for the thread's owner and
                    // leave the loop instead of spinning on forever.
                    logger.warn("reply thread interrupted", e);
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    };

    /**
     * Put operation for {@code key}; semantics are defined by the subclass.
     *
     * @param key data segment key
     * @param val value to store
     * @param cs  code segment issuing the request
     */
    public abstract void put(String key, Value val, CodeSegment cs);

    /**
     * Update operation for {@code key}; semantics are defined by the subclass.
     *
     * @param key data segment key
     * @param val new value
     * @param cs  code segment issuing the request
     */
    public abstract void update(String key, Value val, CodeSegment cs);

    /**
     * Convenience overload of {@link #take(Receiver, String, int, CodeSegment)}
     * that passes {@code 0} as the index.
     *
     * @param receiver receiver that gets the result
     * @param key      data segment key
     * @param cs       code segment issuing the request
     */
    public void take(Receiver receiver, String key, CodeSegment cs) {
        take(receiver, key, 0, cs);
    }

    /**
     * Take operation for {@code key}; semantics are defined by the subclass.
     *
     * @param receiver receiver that gets the result
     * @param key      data segment key
     * @param index    index argument; its exact meaning is defined by the subclass
     * @param cs       code segment issuing the request
     */
    public abstract void take(Receiver receiver, String key, int index, CodeSegment cs);

    /**
     * Convenience overload of {@link #peek(Receiver, String, int, CodeSegment)}
     * that passes {@code 0} as the index.
     *
     * @param receiver receiver that gets the result
     * @param key      data segment key
     * @param cs       code segment issuing the request
     */
    public void peek(Receiver receiver, String key, CodeSegment cs) {
        peek(receiver, key, 0, cs);
    }

    /**
     * Peek operation for {@code key}; semantics are defined by the subclass.
     *
     * @param receiver receiver that gets the result
     * @param key      data segment key
     * @param index    index argument; its exact meaning is defined by the subclass
     * @param cs       code segment issuing the request
     */
    public abstract void peek(Receiver receiver, String key, int index, CodeSegment cs);

    /**
     * Remove operation for {@code key}; semantics are defined by the subclass.
     *
     * @param key data segment key
     */
    public abstract void remove(String key);

    /** Close hook; behavior is defined by the subclass. */
    public abstract void close();

    /** Finish hook; behavior is defined by the subclass. */
    public abstract void finish();
}