view src/alice/datasegment/DataSegmentManager.java @ 254:2ec10cfa8cc3

refactor
author sugi
date Mon, 01 Jul 2013 20:00:07 +0900
parents b78f52865b8d
children b4690114a0cd
line wrap: on
line source

package alice.datasegment;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.log4j.Logger;
import org.msgpack.type.Value;

import alice.codesegment.CodeSegment;

/**
 * Base class for data segment managers.
 *
 * <p>Holds the state shared by concrete managers: a table of commands keyed by
 * sequence number, a queue of incoming replies, and a {@link Runnable} that
 * matches each queued reply to its command and dispatches it.
 *
 * <p>NOTE(review): this class never starts {@link #replyThread} or inserts into
 * {@link #seqHash} itself; presumably subclasses do both -- confirm against them.
 */
public abstract class DataSegmentManager {
	
	/**
	 * Commands keyed by sequence number. The reply loop removes the entry whose
	 * key equals the {@code seq} of an incoming reply.
	 */
	protected ConcurrentHashMap<Integer, Command> seqHash = new ConcurrentHashMap<Integer, Command>(); //TODO Over Head
	/** Replies enqueued by {@link #addReplyCommand(Command)} and consumed by {@link #replyThread}. */
	protected LinkedBlockingQueue<Command> replyQueue = new LinkedBlockingQueue<Command>();
	/**
	 * Sequence number counter, starting at 1. Per the original author's note it
	 * identifies commands waiting for a PUT or UPDATE, and is not needed by the
	 * local manager.
	 */
	protected AtomicInteger seq = new AtomicInteger(1);
	
	/**
	 * Reply dispatch loop: blocks on {@link #replyQueue}, looks up the command
	 * registered under the reply's sequence number, and hands the reply to that
	 * command's code segment. Runs until the executing thread is interrupted.
	 */
	protected Runnable replyThread = new Runnable() {
		private final Logger logger = Logger.getLogger("reply");
		@Override
		public void run() {
			while (true) {
				try {
					Command reply = replyQueue.take();
					Command cmd = getAndRemoveCmd(reply.seq);
					if (cmd == null) {
						// No command is registered under this sequence number; drop the reply.
						logger.warn("conflict sequence number");
						continue;
					}
					cmd.cs.ids.reply(cmd.receiver, reply);
					if (logger.isDebugEnabled()) {
						logger.debug(reply.getCommandString() + " " + cmd.getCommandString());
					}
				} catch (InterruptedException e) {
					// Catching the exception clears the interrupt status. Restore it and
					// leave the loop so that interrupting this thread actually stops it;
					// looping again would either hide the request or spin on take().
					logger.warn("reply thread interrupted, stopping", e);
					Thread.currentThread().interrupt();
					return;
				}
			}
		}
		
	};
	
	/**
	 * Removes and returns the command registered under the given sequence number.
	 *
	 * @param index sequence number used as the key in {@link #seqHash}
	 * @return the removed command, or {@code null} if none was registered
	 */
	public Command getAndRemoveCmd(int index){
		return seqHash.remove(index);
	}
	
	/**
	 * Enqueues a reply for processing by {@link #replyThread}.
	 *
	 * <p>If the calling thread is interrupted while enqueueing, the command is not
	 * added; the stack trace is printed and the thread's interrupt status is
	 * restored so the caller can observe the interruption.
	 *
	 * @param cmd reply command to enqueue
	 */
	public void addReplyCommand(Command cmd) {
		try {
			replyQueue.put(cmd);
		} catch (InterruptedException e) {
			e.printStackTrace();
			// Do not swallow the interrupt: re-assert it for the caller.
			Thread.currentThread().interrupt();
		}
	}
	
	// Data segment operations. Their semantics are defined by the concrete
	// subclasses; nothing in this class constrains them.
	public abstract void put(String key, Value val);
	public abstract void update(String key, Value val);
	public abstract void take(Receiver receiver, CodeSegment cs);
	public abstract void peek(Receiver receiver, CodeSegment cs);
	public abstract void remove(String key);
	public abstract void close();
	public abstract void finish();
	
	// "quick" variants of put/update.
	// NOTE(review): how these differ from the plain versions is not visible here -- see subclasses.
	public abstract void quickPut(String key, Value val);
	public abstract void quickUpdate(String key, Value val);
	
	// "quick" variants of peek/take (same caveat as above).
	public abstract void quickPeek(Receiver receiver, CodeSegment cs);
	public abstract void quickTake(Receiver receiver, CodeSegment cs);

	
}