view src/alice/datasegment/DataSegmentManager.java @ 68:d4c7f7b1096b

remove copy at OutboundTcpConnection
author kazz <kazz@cr.ie.u-ryukyu.ac.jp>
date Sat, 11 Feb 2012 16:40:03 +0900
parents 2afbb6404840
children 1d4f2b72fb31
line wrap: on
line source

package alice.datasegment;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.log4j.Logger;
import org.msgpack.type.Value;

import alice.codesegment.CodeSegment;

/**
 * Abstract base for data segment managers.
 *
 * Besides declaring the put/update/take/peek/remove operations that concrete
 * managers implement, this class owns the reply dispatch loop
 * ({@link #replyThread}): replies arriving on {@link #replyQueue} are matched
 * by sequence number against the commands stored in {@link #seqHash} and
 * handed to the code segment recorded in the matching command.
 */
public abstract class DataSegmentManager {
	
	/**
	 * Commands looked up by sequence number when a reply arrives. The reply
	 * loop removes an entry once its reply has been matched.
	 * NOTE(review): presumably filled by subclasses when they issue a
	 * request -- confirm against the implementations.
	 */
	protected ConcurrentHashMap<Integer, Command> seqHash = new ConcurrentHashMap<Integer, Command>();
	/** Replies waiting to be dispatched; consumed by {@link #replyThread}. */
	public LinkedBlockingQueue<Command> replyQueue = new LinkedBlockingQueue<Command>();
	/**
	 * Counter starting at 1.
	 * NOTE(review): presumably the source of sequence numbers used as keys
	 * of {@link #seqHash} -- confirm against the subclasses.
	 */
	protected AtomicInteger seq = new AtomicInteger(1);
	/** When true, the reply loop logs every dispatched reply at debug level. */
	boolean debug = false;
	
	/**
	 * Reply dispatch loop. Blocks on {@link #replyQueue}, finds the command
	 * registered under the reply's sequence number and passes the reply's
	 * payload to that command's code segment. The loop ends when the thread
	 * running it is interrupted.
	 */
	protected Runnable replyThread = new Runnable() {
		Logger logger = Logger.getLogger("reply");
		@Override
		public void run() {
			while (true) {
				Command reply;
				try {
					reply = replyQueue.take();
				} catch (InterruptedException e) {
					// Restore the interrupt flag and stop: swallowing the
					// interrupt would make this loop impossible to shut down.
					Thread.currentThread().interrupt();
					logger.warn("reply thread interrupted", e);
					return;
				}
				// remove() looks up and deletes in one atomic step, so no
				// other thread can observe or consume the same entry between
				// a separate get() and remove().
				Command cmd = seqHash.remove(reply.seq);
				if (cmd == null) {
					// No command is registered under this sequence number.
					logger.warn("conflict sequence number");
					continue;
				}
				cmd.cs.ids.reply(cmd.receiver, new DataSegmentValue(reply.index, reply.val, reply.reverseKey));
				if (debug) {
					logger.debug(reply.getCommandString() + " " + cmd.getCommandString());
				}
			}
		}
		
	};
	
	/**
	 * Put operation for {@code key}; semantics are defined by the subclass.
	 *
	 * @param key data segment key
	 * @param val value to store
	 * @param cs  code segment issuing the request
	 */
	public abstract void put(String key, Value val, CodeSegment cs);
	/**
	 * Update operation for {@code key}; semantics are defined by the subclass.
	 *
	 * @param key data segment key
	 * @param val new value
	 * @param cs  code segment issuing the request
	 */
	public abstract void update(String key, Value val, CodeSegment cs);
	/**
	 * Convenience overload of {@link #take(Receiver, String, int, CodeSegment)}
	 * that passes index 0.
	 *
	 * @param receiver receiver the result is delivered to
	 * @param key      data segment key
	 * @param cs       code segment issuing the request
	 */
	public void take(Receiver receiver, String key, CodeSegment cs) {
		take(receiver, key, 0, cs);
	}
	/**
	 * Take operation for {@code key}; semantics are defined by the subclass.
	 *
	 * @param receiver receiver the result is delivered to
	 * @param key      data segment key
	 * @param index    index argument (0 when called through the short overload)
	 * @param cs       code segment issuing the request
	 */
	public abstract void take(Receiver receiver, String key, int index, CodeSegment cs);
	/**
	 * Convenience overload of {@link #peek(Receiver, String, int, CodeSegment)}
	 * that passes index 0.
	 *
	 * @param receiver receiver the result is delivered to
	 * @param key      data segment key
	 * @param cs       code segment issuing the request
	 */
	public void peek(Receiver receiver, String key, CodeSegment cs) {
		peek(receiver, key, 0, cs);
	}
	/**
	 * Peek operation for {@code key}; semantics are defined by the subclass.
	 *
	 * @param receiver receiver the result is delivered to
	 * @param key      data segment key
	 * @param index    index argument (0 when called through the short overload)
	 * @param cs       code segment issuing the request
	 */
	public abstract void peek(Receiver receiver, String key, int index, CodeSegment cs);
	/**
	 * Remove operation for {@code key}; semantics are defined by the subclass.
	 *
	 * @param key data segment key
	 */
	public abstract void remove(String key);
	/** Closes this manager; the exact effect is defined by the subclass. */
	public abstract void close();
	/** Finishes this manager; the exact effect is defined by the subclass. */
	public abstract void finish();
	
}