diff src/main/java/alice/datasegment/DataSegmentManager.java @ 345:8f71c3e6f11d

Change directory structure Maven standard
author sugi
date Wed, 16 Apr 2014 18:26:07 +0900
parents
children 388e7d4b0624
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/alice/datasegment/DataSegmentManager.java	Wed Apr 16 18:26:07 2014 +0900
@@ -0,0 +1,71 @@
+package alice.datasegment;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.log4j.Logger;
+
+import alice.codesegment.CodeSegment;
+
+public abstract class DataSegmentManager {
+	
+	protected ConcurrentHashMap<Integer, Command> seqHash = new ConcurrentHashMap<Integer, Command>();
+	protected LinkedBlockingQueue<Command> replyQueue = new LinkedBlockingQueue<Command>();
+	protected AtomicInteger seq = new AtomicInteger(1); // waiting for PUT or UPDATE at unique sequence number
+														// but it doesn't need for Local
+	
+	protected Runnable replyThread = new Runnable() {
+		Logger logger = Logger.getLogger("reply");
+		@Override
+		public void run() {
+			while (true) {
+				try {
+					Command reply = replyQueue.take();
+					Command cmd = getAndRemoveCmd(reply.seq);
+					if (cmd == null) {
+						logger.warn("conflict sequence number");
+						continue;
+					}
+					cmd.cs.ids.reply(cmd.receiver, reply);
+					if (logger.isDebugEnabled())
+						logger.debug(reply.getCommandString() + " " + cmd.getCommandString());
+				} catch (InterruptedException e) {
+					e.printStackTrace();
+				}
+			}
+		}
+		
+	};
+	
+	public Command getAndRemoveCmd(int index){
+		return seqHash.remove(index);
+	}
+	
+	public void addReplyCommand(Command cmd) {
+		try {
+			replyQueue.put(cmd);
+		} catch (InterruptedException e) {
+			e.printStackTrace();
+		}
+	}
+		
+	public abstract void put(String key, Object val);
+	public abstract void update(String key, Object val);
+	public abstract void take(Receiver receiver, CodeSegment cs);
+	public abstract void peek(Receiver receiver, CodeSegment cs);
+	
+	public abstract void quickPut(String key, Object val);
+	public abstract void quickUpdate(String key, Object val);
+	public abstract void quickPeek(Receiver receiver, CodeSegment cs);
+	public abstract void quickTake(Receiver receiver, CodeSegment cs);
+
+	public abstract void remove(String key);
+	public abstract void shutdown(String key);
+	public abstract void close();
+	public abstract void finish();
+	
+	public abstract void ping(String returnKey);
+	public abstract void response(String returnKey);
+		
+}