Mercurial > hg > Members > tatsuki > Alice
comparison src/alice/datasegment/DataSegmentKey.java @ 69:1d4f2b72fb31
delete KeyThread
author | kazz <kazz@cr.ie.u-ryukyu.ac.jp> |
---|---|
date | Tue, 21 Feb 2012 19:44:33 +0900 |
parents | eb1714f41caf |
children | 4bfd81352cfa 8d3cb7e5fa57 |
comparison legend:
- equal
- deleted
- inserted
- replaced
68:d4c7f7b1096b | 69:1d4f2b72fb31 |
---|---|
1 package alice.datasegment; | 1 package alice.datasegment; |
2 | 2 |
3 import java.util.ArrayList; | 3 import java.util.ArrayList; |
4 import java.util.Iterator; | 4 import java.util.Iterator; |
5 import java.util.concurrent.LinkedBlockingQueue; | |
6 import java.util.concurrent.atomic.AtomicInteger; | 5 import java.util.concurrent.atomic.AtomicInteger; |
7 | 6 |
8 import alice.datasegment.Command; | 7 import alice.datasegment.Command; |
9 | 8 |
10 /** | 9 /** |
12 * @author kazz | 11 * @author kazz |
13 * | 12 * |
14 */ | 13 */ |
15 public class DataSegmentKey { | 14 public class DataSegmentKey { |
16 | 15 |
17 private String key; | |
18 private LinkedBlockingQueue<Command> cmdQueue = new LinkedBlockingQueue<Command>(); | |
19 private ArrayList<DataSegmentValue> dataList = new ArrayList<DataSegmentValue>(); | 16 private ArrayList<DataSegmentValue> dataList = new ArrayList<DataSegmentValue>(); |
20 private ArrayList<Command> waitList = new ArrayList<Command>(); | 17 private ArrayList<Command> waitList = new ArrayList<Command>(); |
21 private AtomicInteger tailIndex = new AtomicInteger(1); | 18 private AtomicInteger tailIndex = new AtomicInteger(1); |
22 private Thread keyThread; | |
23 | 19 |
24 public DataSegmentKey(String key) { | 20 public void runCommand(Command cmd) { |
25 this.key = key; | 21 switch (cmd.type) { |
26 } | 22 case UPDATE: |
27 | 23 if (dataList.size() != 0) { |
28 public void addCommand(Command cmd) { | 24 dataList.remove(0); |
29 cmdQueue.add(cmd); | 25 } |
30 } | 26 case PUT: |
31 | 27 int index = tailIndex.getAndIncrement(); |
32 /** | 28 DataSegmentValue dsv = new DataSegmentValue(index, cmd.val, cmd.reverseKey); |
33 * too many threads are generated here | 29 dataList.add(dsv); |
34 * single scheduling queue and waiting queue can be used in future | 30 // Process waiting peek and take commands |
35 */ | 31 for (Iterator<Command> iter = waitList.iterator(); iter.hasNext(); ) { |
36 public void runKeyThread() { | 32 Command waitCmd = iter.next(); |
37 this.keyThread = new Thread() { | 33 if (waitCmd.index < index) { |
38 @Override | |
39 public void run() { | |
40 while (true) { | |
41 try { | 34 try { |
42 Command cmd = cmdQueue.take(); | 35 waitCmd.replyQueue.put(new Command(CommandType.REPLY, null, null, cmd.val, index, waitCmd.seq, null, null, cmd.reverseKey)); |
43 switch (cmd.type) { | |
44 case UPDATE: | |
45 if (dataList.size() != 0) { | |
46 dataList.remove(0); | |
47 } | |
48 case PUT: | |
49 int index = tailIndex.getAndIncrement(); | |
50 DataSegmentValue dsv = new DataSegmentValue(index, cmd.val, cmd.reverseKey); | |
51 dataList.add(dsv); | |
52 // Process waiting peek and take commands | |
53 for (Iterator<Command> iter = waitList.iterator(); iter.hasNext(); ) { | |
54 Command waitCmd = iter.next(); | |
55 if (waitCmd.index < index) { | |
56 waitCmd.replyQueue.put(new Command(CommandType.REPLY, null, null, cmd.val, index, waitCmd.seq, null, null, cmd.reverseKey)); | |
57 iter.remove(); | |
58 if (waitCmd.type == CommandType.TAKE) { // someone is waiting for this put or update command | |
59 dataList.remove(dsv); | |
60 break; | |
61 } | |
62 } | |
63 } | |
64 break; | |
65 case PEEK: | |
66 if (cmd.index >= tailIndex.get()) { | |
67 waitList.add(cmd); | |
68 break; | |
69 } | |
70 boolean waitFlag2 = true; | |
71 for (DataSegmentValue data : dataList) { | |
72 if (data.index > cmd.index) { | |
73 cmd.replyQueue.put(new Command(CommandType.REPLY, null, null, data.val, data.index, cmd.seq, null, null, data.from)); | |
74 waitFlag2 = false; | |
75 break; | |
76 } | |
77 } | |
78 if (waitFlag2) | |
79 waitList.add(cmd); | |
80 break; | |
81 case TAKE: | |
82 if (cmd.index >= tailIndex.get()) { | |
83 waitList.add(cmd); | |
84 break; | |
85 } | |
86 boolean waitFlag = true; | |
87 for (Iterator<DataSegmentValue> iter = dataList.iterator(); iter.hasNext(); ) { | |
88 DataSegmentValue data = iter.next(); | |
89 if (data.index > cmd.index) { | |
90 cmd.replyQueue.put(new Command(CommandType.REPLY, null, null, data.val, data.index, cmd.seq, null, null, data.from)); | |
91 iter.remove(); | |
92 waitFlag = false; | |
93 break; | |
94 } | |
95 } | |
96 if (waitFlag) | |
97 waitList.add(cmd); | |
98 break; | |
99 case REMOVE: | |
100 // TODO: implements later | |
101 break; | |
102 default: | |
103 } | |
104 } catch (InterruptedException e) { | 36 } catch (InterruptedException e) { |
105 e.printStackTrace(); | 37 e.printStackTrace(); |
106 } | 38 } |
39 iter.remove(); | |
40 if (waitCmd.type == CommandType.TAKE) { // someone is waiting for this put or update command | |
41 dataList.remove(dsv); | |
42 break; | |
43 } | |
107 } | 44 } |
108 } | 45 } |
109 }; | 46 break; |
110 keyThread.setName("DataSegmentKey-" + key); | 47 case PEEK: |
111 keyThread.start(); | 48 if (cmd.index >= tailIndex.get()) { |
49 waitList.add(cmd); | |
50 break; | |
51 } | |
52 boolean waitFlag2 = true; | |
53 for (DataSegmentValue data : dataList) { | |
54 if (data.index > cmd.index) { | |
55 try { | |
56 cmd.replyQueue.put(new Command(CommandType.REPLY, null, null, data.val, data.index, cmd.seq, null, null, data.from)); | |
57 } catch (InterruptedException e) { | |
58 e.printStackTrace(); | |
59 } | |
60 waitFlag2 = false; | |
61 break; | |
62 } | |
63 } | |
64 if (waitFlag2) | |
65 waitList.add(cmd); | |
66 break; | |
67 case TAKE: | |
68 if (cmd.index >= tailIndex.get()) { | |
69 waitList.add(cmd); | |
70 break; | |
71 } | |
72 boolean waitFlag = true; | |
73 for (Iterator<DataSegmentValue> iter = dataList.iterator(); iter.hasNext(); ) { | |
74 DataSegmentValue data = iter.next(); | |
75 if (data.index > cmd.index) { | |
76 try { | |
77 cmd.replyQueue.put(new Command(CommandType.REPLY, null, null, data.val, data.index, cmd.seq, null, null, data.from)); | |
78 } catch (InterruptedException e) { | |
79 e.printStackTrace(); | |
80 } | |
81 iter.remove(); | |
82 waitFlag = false; | |
83 break; | |
84 } | |
85 } | |
86 if (waitFlag) | |
87 waitList.add(cmd); | |
88 break; | |
89 case REMOVE: | |
90 // TODO: implements later | |
91 break; | |
92 default: | |
93 } | |
94 | |
112 } | 95 } |
113 | 96 |
114 } | 97 } |