changeset 252:b78f52865b8d

no use Queue API
author sugi
date Tue, 18 Jun 2013 16:05:53 +0900
parents 88be2824a989
children 32e7d5271477
files src/alice/datasegment/Command.java src/alice/datasegment/DataSegmentManager.java src/alice/datasegment/LocalDataSegmentManager.java src/alice/datasegment/RemoteDataSegmentManager.java
diffstat 4 files changed, 90 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
--- a/src/alice/datasegment/Command.java	Mon Jun 17 18:56:54 2013 +0900
+++ b/src/alice/datasegment/Command.java	Tue Jun 18 16:05:53 2013 +0900
@@ -36,6 +36,19 @@
 		this.flag = false;
 	}
 	
+	public Command(CommandType cmdType, Receiver receiver, String key, Value val, int index, int seq, BlockingQueue<Command> replyQueue, CodeSegment cs, String reverseKey, boolean flag) {
+		this.type = cmdType;
+		this.receiver = receiver;
+		this.key = key;
+		this.val = val;
+		this.index = index;
+		this.seq = seq;
+		this.replyQueue = replyQueue;
+		this.cs = cs;
+		this.reverseKey = reverseKey;
+		this.flag = flag;
+	}
+	
 	public Command(CommandType cmdType, Receiver receiver, String key, Value val, int index, int seq, Connection connection, CodeSegment cs, String reverseKey, boolean flag) {
 		this.type = cmdType;
 		this.receiver = receiver;
--- a/src/alice/datasegment/DataSegmentManager.java	Mon Jun 17 18:56:54 2013 +0900
+++ b/src/alice/datasegment/DataSegmentManager.java	Tue Jun 18 16:05:53 2013 +0900
@@ -65,4 +65,19 @@
 	public abstract void close();
 	public abstract void finish();
 	
+	public abstract void quickPut(String key, Value val);
+	public abstract void quickUpdate(String key, Value val);
+	
+	public void quickPeek(Receiver receiver, String key, CodeSegment cs) {
+		quickPeek(receiver, key, 0, cs);
+	}
+	public abstract void quickPeek(Receiver receiver, String key, int index, CodeSegment cs);
+	
+	public void quickTake(Receiver receiver, String key, CodeSegment cs) {
+		quickTake(receiver, key, 0, cs);
+	}
+	
+	public abstract void quickTake(Receiver receiver, String key, int index, CodeSegment cs);
+
+	
 }
--- a/src/alice/datasegment/LocalDataSegmentManager.java	Mon Jun 17 18:56:54 2013 +0900
+++ b/src/alice/datasegment/LocalDataSegmentManager.java	Tue Jun 18 16:05:53 2013 +0900
@@ -146,4 +146,26 @@
 		
 	}
 
+	@Override
+	public void quickPeek(Receiver receiver, String key, int index, CodeSegment cs) {
+		
+	}
+
+	@Override
+	public void quickTake(Receiver receiver, String key, int index, CodeSegment cs) {
+		
+	}
+
+	@Override
+	public void quickPut(String key, Value val) {
+
+	}
+
+	@Override
+	public void quickUpdate(String key, Value val) {
+		
+	}
+
+	
+
 }
--- a/src/alice/datasegment/RemoteDataSegmentManager.java	Mon Jun 17 18:56:54 2013 +0900
+++ b/src/alice/datasegment/RemoteDataSegmentManager.java	Tue Jun 18 16:05:53 2013 +0900
@@ -58,12 +58,29 @@
 	}
 
 	@Override
+	public void quickPut(String key, Value val) {
+		Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, null, null, null);
+		connection.write(cmd); // put command is executed right now
+		if (logger.isDebugEnabled())
+			logger.debug(cmd.getCommandString());
+	}
+	
+	@Override
 	public void update(String key, Value val) {
 		Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, null, null, null);
 		connection.sendCommand(cmd);
 		if (logger.isDebugEnabled())
 			logger.debug(cmd.getCommandString());
 	}
+	
+	@Override
+	public void quickUpdate(String key, Value val) {
+		Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, null, null, null);
+		connection.write(cmd);
+		if (logger.isDebugEnabled())
+			logger.debug(cmd.getCommandString());
+		
+	}
 
 	@Override
 	public void take(Receiver receiver, String key, int index, CodeSegment cs) {
@@ -74,6 +91,16 @@
 		if (logger.isDebugEnabled())
 			logger.debug(cmd.getCommandString());
 	}
+	
+	@Override
+	public void quickTake(Receiver receiver, String key, int index, CodeSegment cs) {
+		int seq = this.seq.getAndIncrement();
+		Command cmd = new Command(CommandType.TAKE, receiver, key, null, index, seq, replyQueue, cs, null);
+		seqHash.put(seq, cmd);
+		connection.write(cmd);
+		if (logger.isDebugEnabled())
+			logger.debug(cmd.getCommandString());
+	}
 
 	@Override
 	public void peek(Receiver receiver, String key, int index, CodeSegment cs) {
@@ -84,6 +111,18 @@
 		if (logger.isDebugEnabled())
 			logger.debug(cmd.getCommandString());
 	}
+	
+
+	@Override
+	public void quickPeek(Receiver receiver, String key, int index, CodeSegment cs) {
+		int seq = this.seq.getAndIncrement();
+		Command cmd = new Command(CommandType.PEEK, receiver, key, null, index, seq, replyQueue, cs, null, true);
+		seqHash.put(seq, cmd);
+		connection.write(cmd);
+		if (logger.isDebugEnabled())
+			logger.debug(cmd.getCommandString());
+		
+	}
 
 	@Override
 	public void remove(String key) {
@@ -105,4 +144,5 @@
 		connection.sendCommand(cmd);
 	}
 
+
 }