Mercurial > hg > Members > tatsuki > Alice
annotate src/alice/datasegment/DataSegmentKey.java @ 17:bb075e103cd3
bug fix for take()
author | kazz <kazz@cr.ie.u-ryukyu.ac.jp> |
---|---|
date | Sun, 15 Jan 2012 15:18:01 +0900 |
parents | e3f1b21718b0 |
children | 72dd27d952b0 |
rev | line source |
---|---|
2 | 1 package alice.datasegment; |
2 | |
3 | 3 import java.util.ArrayList; |
17 | 4 import java.util.Iterator; |
10 | 5 import java.util.LinkedList; |
3 | 6 import java.util.concurrent.LinkedBlockingQueue; |
7 import java.util.concurrent.atomic.AtomicInteger; | |
8 | |
9 import alice.datasegment.Command; | |
10 | |
2 | 11 public class DataSegmentKey { |
12 | |
3 | 13 private LinkedBlockingQueue<Command> cmdQueue = new LinkedBlockingQueue<Command>(); |
14 private ArrayList<DataSegmentValue> dataList = new ArrayList<DataSegmentValue>(); | |
15 private ArrayList<Command> waitList = new ArrayList<Command>(); | |
16 private AtomicInteger tailIndex = new AtomicInteger(1); | |
17 private Runnable keyThread; | |
18 | |
19 public DataSegmentKey() { | |
20 | |
21 } | |
22 | |
23 public void addCommand(Command cmd) { | |
24 cmdQueue.add(cmd); | |
25 } | |
26 | |
27 public void runKeyThread() { | |
28 keyThread = new Runnable() { | |
29 @Override | |
30 public void run() { | |
31 while (true) { | |
32 try { | |
33 Command cmd = cmdQueue.take(); | |
13 | 34 switch (cmd.type) { |
5 | 35 case UPDATE: |
36 if (dataList.size() != 0) { | |
37 dataList.remove(0); | |
38 } | |
3 | 39 case PUT: |
40 int index = tailIndex.getAndIncrement(); | |
6 | 41 DataSegmentValue dsv = new DataSegmentValue(index, cmd.val); |
42 dataList.add(dsv); | |
3 | 43 // run waiting peek and take |
17 | 44 boolean takeFlag = true; |
45 for (Iterator<Command> iter = waitList.iterator(); iter.hasNext() && takeFlag; ) { | |
46 Command waitCmd = iter.next(); | |
3 | 47 if (waitCmd.index < index) { |
14
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
48 waitCmd.replyQueue.put(new Command(CommandType.REPLY, null, null, cmd.val, index, waitCmd.seq, null, null)); |
17 | 49 iter.remove(); |
13 | 50 if (waitCmd.type == CommandType.TAKE) { // delete data, if it run take cmd. |
6 | 51 dataList.remove(dsv); |
52 break; | |
53 } | |
3 | 54 } |
17 | 55 |
10 | 56 } |
3 | 57 break; |
58 case PEEK: | |
59 if (cmd.index >= tailIndex.get()) { | |
60 waitList.add(cmd); | |
61 break; | |
62 } | |
63 for (DataSegmentValue data : dataList) { | |
64 if (data.index > cmd.index) { | |
14
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
65 cmd.replyQueue.put(new Command(CommandType.REPLY, null, null, data.val, data.index, cmd.seq, null, null)); |
3 | 66 break; |
67 } | |
68 } | |
6 | 69 waitList.add(cmd); |
3 | 70 break; |
71 case TAKE: | |
72 if (cmd.index >= tailIndex.get()) { | |
73 waitList.add(cmd); | |
74 break; | |
75 } | |
17 | 76 boolean waitFlag = true; |
77 for (Iterator<DataSegmentValue> iter = dataList.iterator(); iter.hasNext(); ) { | |
78 DataSegmentValue data = iter.next(); | |
3 | 79 if (data.index > cmd.index) { |
14
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
80 cmd.replyQueue.put(new Command(CommandType.REPLY, null, null, data.val, data.index, cmd.seq, null, null)); |
17 | 81 iter.remove(); |
82 waitFlag = false; | |
3 | 83 break; |
84 } | |
85 } | |
17 | 86 if (waitFlag) |
87 waitList.add(cmd); | |
3 | 88 break; |
89 case REMOVE: | |
90 // TODO: implements later | |
91 break; | |
92 default: | |
93 } | |
94 } catch (InterruptedException e) { | |
95 e.printStackTrace(); | |
96 } | |
97 } | |
98 } | |
99 }; | |
6 | 100 new Thread(keyThread).start(); |
3 | 101 }; |
102 | |
2 | 103 } |