2
|
1 package alice.datasegment;
|
|
2
|
3
|
3 import java.util.ArrayList;
|
|
4 import java.util.concurrent.LinkedBlockingQueue;
|
|
5 import java.util.concurrent.atomic.AtomicInteger;
|
|
6
|
6
|
7 import alice.codesegment.Reply;
|
3
|
8 import alice.datasegment.Command;
|
|
9
|
2
|
10 public class DataSegmentKey {
|
|
11
|
3
|
12 private LinkedBlockingQueue<Command> cmdQueue = new LinkedBlockingQueue<Command>();
|
|
13 private ArrayList<DataSegmentValue> dataList = new ArrayList<DataSegmentValue>();
|
|
14 private ArrayList<Command> waitList = new ArrayList<Command>();
|
|
15 private AtomicInteger tailIndex = new AtomicInteger(1);
|
|
16 private Runnable keyThread;
|
|
17
|
|
18 public DataSegmentKey() {
|
|
19
|
|
20 }
|
|
21
|
|
22 public void addCommand(Command cmd) {
|
|
23 cmdQueue.add(cmd);
|
|
24 }
|
|
25
|
|
26 public void runKeyThread() {
|
|
27 keyThread = new Runnable() {
|
|
28 @Override
|
|
29 public void run() {
|
|
30 while (true) {
|
|
31 try {
|
|
32 Command cmd = cmdQueue.take();
|
|
33 switch (cmd.cmdType) {
|
5
|
34 case UPDATE:
|
|
35 if (dataList.size() != 0) {
|
|
36 dataList.remove(0);
|
|
37 }
|
3
|
38 case PUT:
|
|
39 int index = tailIndex.getAndIncrement();
|
6
|
40 DataSegmentValue dsv = new DataSegmentValue(index, cmd.val);
|
|
41 dataList.add(dsv);
|
3
|
42 // run waiting peek and take
|
|
43 for (Command waitCmd : waitList) {
|
|
44 if (waitCmd.index < index) {
|
6
|
45 waitCmd.manager.replyQueue.put(new Reply(waitCmd.seq, index, cmd.val));
|
|
46 if (waitCmd.cmdType == CommandType.TAKE) { // delete data, if it run take cmd.
|
|
47 dataList.remove(dsv);
|
|
48 break;
|
|
49 }
|
3
|
50 }
|
|
51 }
|
|
52 break;
|
|
53 case PEEK:
|
|
54 if (cmd.index >= tailIndex.get()) {
|
|
55 waitList.add(cmd);
|
|
56 break;
|
|
57 }
|
|
58 for (DataSegmentValue data : dataList) {
|
|
59 if (data.index > cmd.index) {
|
6
|
60 cmd.manager.replyQueue.put(new Reply(cmd.seq, data.index, data.val));
|
3
|
61 break;
|
|
62 }
|
|
63 }
|
6
|
64 waitList.add(cmd);
|
3
|
65 break;
|
|
66 case TAKE:
|
|
67 if (cmd.index >= tailIndex.get()) {
|
|
68 waitList.add(cmd);
|
|
69 break;
|
|
70 }
|
|
71 for (DataSegmentValue data : dataList) {
|
|
72 if (data.index > cmd.index) {
|
6
|
73 cmd.manager.replyQueue.put(new Reply(cmd.seq, data.index, data.val));
|
3
|
74 dataList.remove(data);
|
|
75 break;
|
|
76 }
|
|
77 }
|
6
|
78 waitList.add(cmd);
|
3
|
79 break;
|
|
80 case REMOVE:
|
|
81 // TODO: implements later
|
|
82 break;
|
|
83 default:
|
|
84 }
|
|
85 } catch (InterruptedException e) {
|
|
86 e.printStackTrace();
|
|
87 }
|
|
88 }
|
|
89 }
|
|
90 };
|
6
|
91 new Thread(keyThread).start();
|
3
|
92 };
|
|
93
|
2
|
94 }
|