diff src/alice/datasegment/DataSegmentKey.java @ 6:c78a1cc2cd8f

implements Reply
author one
date Thu, 12 Jan 2012 13:19:04 +0900
parents 80375ae09a1f
children 352eb19d837d
line wrap: on
line diff
--- a/src/alice/datasegment/DataSegmentKey.java	Wed Jan 11 23:28:02 2012 +0900
+++ b/src/alice/datasegment/DataSegmentKey.java	Thu Jan 12 13:19:04 2012 +0900
@@ -4,6 +4,7 @@
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import alice.codesegment.Reply;
 import alice.datasegment.Command; 
 
 public class DataSegmentKey {
@@ -36,12 +37,16 @@
 							}
 						case PUT:
 							int index = tailIndex.getAndIncrement();
-							dataList.add(new DataSegmentValue(index, cmd.val));
+							DataSegmentValue dsv = new DataSegmentValue(index, cmd.val); 
+							dataList.add(dsv);
 							// run waiting peek and take
 							for (Command waitCmd : waitList) {
 								if (waitCmd.index < index) {
-									// TODO: make and send reply msg
-									
+									waitCmd.manager.replyQueue.put(new Reply(waitCmd.seq, index, cmd.val));
+									if (waitCmd.cmdType == CommandType.TAKE) { // delete data, if it run take cmd.
+										dataList.remove(dsv);
+										break;
+									}
 								}
 							}
 							break;
@@ -52,30 +57,25 @@
 							}
 							for (DataSegmentValue data : dataList) {
 								if (data.index > cmd.index) {
-									// TODO: make and send reply msg
-									
+									cmd.manager.replyQueue.put(new Reply(cmd.seq, data.index, data.val));
 									break;
 								}
 							}
+							waitList.add(cmd);
 							break;
 						case TAKE:
 							if (cmd.index >= tailIndex.get()) {
 								waitList.add(cmd);
 								break;
 							}
-							boolean waitFlag = true;
 							for (DataSegmentValue data : dataList) {
 								if (data.index > cmd.index) {
-									// TODO: make and send reply msg
-									
-									
+									cmd.manager.replyQueue.put(new Reply(cmd.seq, data.index, data.val));
 									dataList.remove(data);
-									waitFlag = false;
 									break;
 								}
 							}
-							if (waitFlag)
-								waitList.add(cmd);
+							waitList.add(cmd);
 							break;
 						case REMOVE:
 							// TODO: implements later
@@ -88,7 +88,7 @@
 				}
 			}
 		};
-		keyThread.run();
+		new Thread(keyThread).start();
 	};
 	
 }