view src/alice/datasegment/DataSegmentManager.java @ 170:6a69891b7232 working

change view point
author sugi
date Thu, 27 Dec 2012 14:38:42 +0900
parents 1044a79ce4ef
children a85ff8dc16c1
line wrap: on
line source

package alice.datasegment;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.log4j.Logger;
import org.msgpack.type.Value;

import alice.codesegment.CodeSegment;

public abstract class DataSegmentManager {
	
	protected ConcurrentHashMap<Integer, Command> seqHash = new ConcurrentHashMap<Integer, Command>(); //TODO Over Head
	protected LinkedBlockingQueue<Command> replyQueue = new LinkedBlockingQueue<Command>();
	protected AtomicInteger seq = new AtomicInteger(1);
	
	protected Runnable replyThread = new Runnable() {
		Logger logger = Logger.getLogger("reply");
		@Override
		public void run() {
			while (true) {
				try {
					Command reply = replyQueue.take();
					Command cmd = seqHash.get(reply.seq);
					if (cmd == null) {
						logger.warn("conflict sequence number");
						continue;
					}
					seqHash.remove(reply.seq);
					cmd.cs.ids.reply(cmd.receiver, new DataSegmentValue(reply.index, reply.val, reply.reverseKey));
					if (logger.isDebugEnabled())
						logger.debug(reply.getCommandString() + " " + cmd.getCommandString());
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		}
		
	};
	
	public void addReplyCommand(Command cmd) {
		try {
			replyQueue.put(cmd);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
	
	public abstract void put(String key, Value val);
	public abstract void update(String key, Value val);
	public void take(Receiver receiver, String key, CodeSegment cs) {
		take(receiver, key, 0, cs);
	}
	public abstract void take(Receiver receiver, String key, int index, CodeSegment cs);
	public void peek(Receiver receiver, String key, CodeSegment cs) {
		peek(receiver, key, 0, cs);
	}
	public abstract void peek(Receiver receiver, String key, int index, CodeSegment cs);
	public abstract void remove(String key);
	public abstract void close();
	public abstract void finish();
	
}