comparison src/alice/datasegment/DataSegmentKey.java @ 69:1d4f2b72fb31

delete KeyThread
author kazz <kazz@cr.ie.u-ryukyu.ac.jp>
date Tue, 21 Feb 2012 19:44:33 +0900
parents eb1714f41caf
children 4bfd81352cfa 8d3cb7e5fa57
comparison
equal deleted inserted replaced
68:d4c7f7b1096b 69:1d4f2b72fb31
1 package alice.datasegment; 1 package alice.datasegment;
2 2
3 import java.util.ArrayList; 3 import java.util.ArrayList;
4 import java.util.Iterator; 4 import java.util.Iterator;
5 import java.util.concurrent.LinkedBlockingQueue;
6 import java.util.concurrent.atomic.AtomicInteger; 5 import java.util.concurrent.atomic.AtomicInteger;
7 6
8 import alice.datasegment.Command; 7 import alice.datasegment.Command;
9 8
10 /** 9 /**
12 * @author kazz 11 * @author kazz
13 * 12 *
14 */ 13 */
15 public class DataSegmentKey { 14 public class DataSegmentKey {
16 15
17 private String key;
18 private LinkedBlockingQueue<Command> cmdQueue = new LinkedBlockingQueue<Command>();
19 private ArrayList<DataSegmentValue> dataList = new ArrayList<DataSegmentValue>(); 16 private ArrayList<DataSegmentValue> dataList = new ArrayList<DataSegmentValue>();
20 private ArrayList<Command> waitList = new ArrayList<Command>(); 17 private ArrayList<Command> waitList = new ArrayList<Command>();
21 private AtomicInteger tailIndex = new AtomicInteger(1); 18 private AtomicInteger tailIndex = new AtomicInteger(1);
22 private Thread keyThread;
23 19
24 public DataSegmentKey(String key) { 20 public void runCommand(Command cmd) {
25 this.key = key; 21 switch (cmd.type) {
26 } 22 case UPDATE:
27 23 if (dataList.size() != 0) {
28 public void addCommand(Command cmd) { 24 dataList.remove(0);
29 cmdQueue.add(cmd); 25 }
30 } 26 case PUT:
31 27 int index = tailIndex.getAndIncrement();
32 /** 28 DataSegmentValue dsv = new DataSegmentValue(index, cmd.val, cmd.reverseKey);
33 * too many threads are generated here 29 dataList.add(dsv);
34 * single scheduling queue and waiting queue can be used in future 30 // Process waiting peek and take commands
35 */ 31 for (Iterator<Command> iter = waitList.iterator(); iter.hasNext(); ) {
36 public void runKeyThread() { 32 Command waitCmd = iter.next();
37 this.keyThread = new Thread() { 33 if (waitCmd.index < index) {
38 @Override
39 public void run() {
40 while (true) {
41 try { 34 try {
42 Command cmd = cmdQueue.take(); 35 waitCmd.replyQueue.put(new Command(CommandType.REPLY, null, null, cmd.val, index, waitCmd.seq, null, null, cmd.reverseKey));
43 switch (cmd.type) {
44 case UPDATE:
45 if (dataList.size() != 0) {
46 dataList.remove(0);
47 }
48 case PUT:
49 int index = tailIndex.getAndIncrement();
50 DataSegmentValue dsv = new DataSegmentValue(index, cmd.val, cmd.reverseKey);
51 dataList.add(dsv);
52 // Process waiting peek and take commands
53 for (Iterator<Command> iter = waitList.iterator(); iter.hasNext(); ) {
54 Command waitCmd = iter.next();
55 if (waitCmd.index < index) {
56 waitCmd.replyQueue.put(new Command(CommandType.REPLY, null, null, cmd.val, index, waitCmd.seq, null, null, cmd.reverseKey));
57 iter.remove();
58 if (waitCmd.type == CommandType.TAKE) { // someone is waiting for this put or update command
59 dataList.remove(dsv);
60 break;
61 }
62 }
63 }
64 break;
65 case PEEK:
66 if (cmd.index >= tailIndex.get()) {
67 waitList.add(cmd);
68 break;
69 }
70 boolean waitFlag2 = true;
71 for (DataSegmentValue data : dataList) {
72 if (data.index > cmd.index) {
73 cmd.replyQueue.put(new Command(CommandType.REPLY, null, null, data.val, data.index, cmd.seq, null, null, data.from));
74 waitFlag2 = false;
75 break;
76 }
77 }
78 if (waitFlag2)
79 waitList.add(cmd);
80 break;
81 case TAKE:
82 if (cmd.index >= tailIndex.get()) {
83 waitList.add(cmd);
84 break;
85 }
86 boolean waitFlag = true;
87 for (Iterator<DataSegmentValue> iter = dataList.iterator(); iter.hasNext(); ) {
88 DataSegmentValue data = iter.next();
89 if (data.index > cmd.index) {
90 cmd.replyQueue.put(new Command(CommandType.REPLY, null, null, data.val, data.index, cmd.seq, null, null, data.from));
91 iter.remove();
92 waitFlag = false;
93 break;
94 }
95 }
96 if (waitFlag)
97 waitList.add(cmd);
98 break;
99 case REMOVE:
100 // TODO: implements later
101 break;
102 default:
103 }
104 } catch (InterruptedException e) { 36 } catch (InterruptedException e) {
105 e.printStackTrace(); 37 e.printStackTrace();
106 } 38 }
39 iter.remove();
40 if (waitCmd.type == CommandType.TAKE) { // someone is waiting for this put or update command
41 dataList.remove(dsv);
42 break;
43 }
107 } 44 }
108 } 45 }
109 }; 46 break;
110 keyThread.setName("DataSegmentKey-" + key); 47 case PEEK:
111 keyThread.start(); 48 if (cmd.index >= tailIndex.get()) {
49 waitList.add(cmd);
50 break;
51 }
52 boolean waitFlag2 = true;
53 for (DataSegmentValue data : dataList) {
54 if (data.index > cmd.index) {
55 try {
56 cmd.replyQueue.put(new Command(CommandType.REPLY, null, null, data.val, data.index, cmd.seq, null, null, data.from));
57 } catch (InterruptedException e) {
58 e.printStackTrace();
59 }
60 waitFlag2 = false;
61 break;
62 }
63 }
64 if (waitFlag2)
65 waitList.add(cmd);
66 break;
67 case TAKE:
68 if (cmd.index >= tailIndex.get()) {
69 waitList.add(cmd);
70 break;
71 }
72 boolean waitFlag = true;
73 for (Iterator<DataSegmentValue> iter = dataList.iterator(); iter.hasNext(); ) {
74 DataSegmentValue data = iter.next();
75 if (data.index > cmd.index) {
76 try {
77 cmd.replyQueue.put(new Command(CommandType.REPLY, null, null, data.val, data.index, cmd.seq, null, null, data.from));
78 } catch (InterruptedException e) {
79 e.printStackTrace();
80 }
81 iter.remove();
82 waitFlag = false;
83 break;
84 }
85 }
86 if (waitFlag)
87 waitList.add(cmd);
88 break;
89 case REMOVE:
90 // TODO: implements later
91 break;
92 default:
93 }
94
112 } 95 }
113 96
114 } 97 }