2
|
1 package alice.datasegment;
|
|
2
|
3
|
3 import java.util.ArrayList;
|
17
|
4 import java.util.Iterator;
|
3
|
5 import java.util.concurrent.atomic.AtomicInteger;
|
|
6
|
|
7 import alice.datasegment.Command;
|
|
8
|
57
|
9 /**
|
|
10 * Synchronized DataSegment for each DataSegment key
|
|
11 * @author kazz
|
|
12 *
|
|
13 */
|
2
|
14 public class DataSegmentKey {
|
|
15
|
3
|
16 private ArrayList<DataSegmentValue> dataList = new ArrayList<DataSegmentValue>();
|
|
17 private ArrayList<Command> waitList = new ArrayList<Command>();
|
|
18 private AtomicInteger tailIndex = new AtomicInteger(1);
|
|
19
|
69
|
20 public void runCommand(Command cmd) {
|
|
21 switch (cmd.type) {
|
|
22 case UPDATE:
|
|
23 if (dataList.size() != 0) {
|
|
24 dataList.remove(0);
|
|
25 }
|
|
26 case PUT:
|
|
27 int index = tailIndex.getAndIncrement();
|
|
28 DataSegmentValue dsv = new DataSegmentValue(index, cmd.val, cmd.reverseKey);
|
|
29 dataList.add(dsv);
|
|
30 // Process waiting peek and take commands
|
|
31 for (Iterator<Command> iter = waitList.iterator(); iter.hasNext(); ) {
|
|
32 Command waitCmd = iter.next();
|
|
33 if (waitCmd.index < index) {
|
3
|
34 try {
|
69
|
35 waitCmd.replyQueue.put(new Command(CommandType.REPLY, null, null, cmd.val, index, waitCmd.seq, null, null, cmd.reverseKey));
|
3
|
36 } catch (InterruptedException e) {
|
|
37 e.printStackTrace();
|
|
38 }
|
69
|
39 iter.remove();
|
|
40 if (waitCmd.type == CommandType.TAKE) { // someone is waiting for this put or update command
|
|
41 dataList.remove(dsv);
|
|
42 break;
|
|
43 }
|
|
44 }
|
|
45 }
|
|
46 break;
|
|
47 case PEEK:
|
|
48 if (cmd.index >= tailIndex.get()) {
|
|
49 waitList.add(cmd);
|
|
50 break;
|
|
51 }
|
|
52 boolean waitFlag2 = true;
|
|
53 for (DataSegmentValue data : dataList) {
|
|
54 if (data.index > cmd.index) {
|
|
55 try {
|
|
56 cmd.replyQueue.put(new Command(CommandType.REPLY, null, null, data.val, data.index, cmd.seq, null, null, data.from));
|
|
57 } catch (InterruptedException e) {
|
|
58 e.printStackTrace();
|
|
59 }
|
|
60 waitFlag2 = false;
|
|
61 break;
|
3
|
62 }
|
|
63 }
|
69
|
64 if (waitFlag2)
|
|
65 waitList.add(cmd);
|
|
66 break;
|
|
67 case TAKE:
|
|
68 if (cmd.index >= tailIndex.get()) {
|
|
69 waitList.add(cmd);
|
|
70 break;
|
|
71 }
|
|
72 boolean waitFlag = true;
|
|
73 for (Iterator<DataSegmentValue> iter = dataList.iterator(); iter.hasNext(); ) {
|
|
74 DataSegmentValue data = iter.next();
|
|
75 if (data.index > cmd.index) {
|
|
76 try {
|
|
77 cmd.replyQueue.put(new Command(CommandType.REPLY, null, null, data.val, data.index, cmd.seq, null, null, data.from));
|
|
78 } catch (InterruptedException e) {
|
|
79 e.printStackTrace();
|
|
80 }
|
|
81 iter.remove();
|
|
82 waitFlag = false;
|
|
83 break;
|
|
84 }
|
|
85 }
|
|
86 if (waitFlag)
|
|
87 waitList.add(cmd);
|
|
88 break;
|
|
89 case REMOVE:
|
|
90 // TODO: implements later
|
|
91 break;
|
|
92 default:
|
|
93 }
|
|
94
|
20
|
95 }
|
3
|
96
|
2
|
97 }
|