diff src/alice/datasegment/LocalDataSegmentManager.java @ 7:352eb19d837d

implements reply of LocalDataSegment
author one
date Thu, 12 Jan 2012 13:48:34 +0900
parents c78a1cc2cd8f
children 78b415d019de
line wrap: on
line diff
--- a/src/alice/datasegment/LocalDataSegmentManager.java	Thu Jan 12 13:19:04 2012 +0900
+++ b/src/alice/datasegment/LocalDataSegmentManager.java	Thu Jan 12 13:48:34 2012 +0900
@@ -5,7 +5,6 @@
 import org.msgpack.type.Value;
 
 import alice.codesegment.CodeSegment;
-import alice.codesegment.Reply;
 import alice.datasegment.CommandType;
 
 public class LocalDataSegmentManager extends DataSegmentManager {
@@ -18,13 +17,12 @@
 			while (true) {
 				try {
 					Reply reply = replyQueue.take();
-					
+					Command cmd = seqHash.get(reply.seq);
+					cmd.cs.ids.reply(cmd.argKey, new DataSegmentValue(reply.index, reply.val));
 				} catch (InterruptedException e) {
 					e.printStackTrace();
 				}
-				
 			}
-			
 		}
 		
 	};
@@ -44,29 +42,29 @@
 	@Override
 	public void put(String key, Value val) {
 		DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
-		dataSegmentKey.addCommand(new Command(CommandType.PUT, val, 0, 0, this));
+		dataSegmentKey.addCommand(new Command(CommandType.PUT, null, val, 0, 0, this, null));
 	}
 
 	@Override
 	public void update(String key, Value val) {
 		DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
-		dataSegmentKey.addCommand(new Command(CommandType.UPDATE, val, 0, 0, this));
+		dataSegmentKey.addCommand(new Command(CommandType.UPDATE, null, val, 0, 0, this, null));
 	}
 
 	@Override
-	public void take(String key, int index, CodeSegment cs) {
+	public void take(String argKey, String key, int index, CodeSegment cs) {
 		DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
 		int seq = this.seq.getAndIncrement();
-		Command cmd = new Command(CommandType.TAKE, null, index, seq, this);
+		Command cmd = new Command(CommandType.TAKE, argKey, null, index, seq, this, cs);
 		seqHash.put(seq, cmd);
 		dataSegmentKey.addCommand(cmd);
 	}
 
 	@Override
-	public void peek(String key, int index, CodeSegment cs) {
+	public void peek(String argKey, String key, int index, CodeSegment cs) {
 		DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
 		int seq = this.seq.getAndIncrement();
-		Command cmd = new Command(CommandType.PEEK, null, index, seq, this);
+		Command cmd = new Command(CommandType.PEEK, argKey, null, index, seq, this, cs);
 		seqHash.put(seq, cmd);
 		dataSegmentKey.addCommand(cmd);
 	}
@@ -74,7 +72,7 @@
 	@Override
 	public void remove(String key) {
 		DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
-		dataSegmentKey.addCommand(new Command(CommandType.REMOVE, null, 0, 0, this));
+		dataSegmentKey.addCommand(new Command(CommandType.REMOVE, null, null, 0, 0, this, null));
 	}
 
 }