comparison src/alice/datasegment/DataSegmentKey.java @ 6:c78a1cc2cd8f

implements Reply
author one
date Thu, 12 Jan 2012 13:19:04 +0900
parents 80375ae09a1f
children 352eb19d837d
comparison
equal deleted inserted replaced
5:80375ae09a1f 6:c78a1cc2cd8f
2 2
3 import java.util.ArrayList; 3 import java.util.ArrayList;
4 import java.util.concurrent.LinkedBlockingQueue; 4 import java.util.concurrent.LinkedBlockingQueue;
5 import java.util.concurrent.atomic.AtomicInteger; 5 import java.util.concurrent.atomic.AtomicInteger;
6 6
7 import alice.codesegment.Reply;
7 import alice.datasegment.Command; 8 import alice.datasegment.Command;
8 9
9 public class DataSegmentKey { 10 public class DataSegmentKey {
10 11
11 private LinkedBlockingQueue<Command> cmdQueue = new LinkedBlockingQueue<Command>(); 12 private LinkedBlockingQueue<Command> cmdQueue = new LinkedBlockingQueue<Command>();
34 if (dataList.size() != 0) { 35 if (dataList.size() != 0) {
35 dataList.remove(0); 36 dataList.remove(0);
36 } 37 }
37 case PUT: 38 case PUT:
38 int index = tailIndex.getAndIncrement(); 39 int index = tailIndex.getAndIncrement();
39 dataList.add(new DataSegmentValue(index, cmd.val)); 40 DataSegmentValue dsv = new DataSegmentValue(index, cmd.val);
41 dataList.add(dsv);
40 // run waiting peek and take 42 // run waiting peek and take
41 for (Command waitCmd : waitList) { 43 for (Command waitCmd : waitList) {
42 if (waitCmd.index < index) { 44 if (waitCmd.index < index) {
43 // TODO: make and send reply msg 45 waitCmd.manager.replyQueue.put(new Reply(waitCmd.seq, index, cmd.val));
44 46 if (waitCmd.cmdType == CommandType.TAKE) { // delete data, if it run take cmd.
47 dataList.remove(dsv);
48 break;
49 }
45 } 50 }
46 } 51 }
47 break; 52 break;
48 case PEEK: 53 case PEEK:
49 if (cmd.index >= tailIndex.get()) { 54 if (cmd.index >= tailIndex.get()) {
50 waitList.add(cmd); 55 waitList.add(cmd);
51 break; 56 break;
52 } 57 }
53 for (DataSegmentValue data : dataList) { 58 for (DataSegmentValue data : dataList) {
54 if (data.index > cmd.index) { 59 if (data.index > cmd.index) {
55 // TODO: make and send reply msg 60 cmd.manager.replyQueue.put(new Reply(cmd.seq, data.index, data.val));
56
57 break; 61 break;
58 } 62 }
59 } 63 }
64 waitList.add(cmd);
60 break; 65 break;
61 case TAKE: 66 case TAKE:
62 if (cmd.index >= tailIndex.get()) { 67 if (cmd.index >= tailIndex.get()) {
63 waitList.add(cmd); 68 waitList.add(cmd);
64 break; 69 break;
65 } 70 }
66 boolean waitFlag = true;
67 for (DataSegmentValue data : dataList) { 71 for (DataSegmentValue data : dataList) {
68 if (data.index > cmd.index) { 72 if (data.index > cmd.index) {
69 // TODO: make and send reply msg 73 cmd.manager.replyQueue.put(new Reply(cmd.seq, data.index, data.val));
70
71
72 dataList.remove(data); 74 dataList.remove(data);
73 waitFlag = false;
74 break; 75 break;
75 } 76 }
76 } 77 }
77 if (waitFlag) 78 waitList.add(cmd);
78 waitList.add(cmd);
79 break; 79 break;
80 case REMOVE: 80 case REMOVE:
81 // TODO: implements later 81 // TODO: implements later
82 break; 82 break;
83 default: 83 default:
86 e.printStackTrace(); 86 e.printStackTrace();
87 } 87 }
88 } 88 }
89 } 89 }
90 }; 90 };
91 keyThread.run(); 91 new Thread(keyThread).start();
92 }; 92 };
93 93
94 } 94 }