2
|
1 package alice.datasegment;
|
|
2
|
3
|
3 import java.util.concurrent.atomic.AtomicInteger;
|
2
|
4
|
|
5 import org.msgpack.type.Value;
|
|
6
|
3
|
7 import alice.codesegment.CodeSegment;
|
|
8 import alice.datasegment.CommandType;
|
|
9
|
2
|
10 public class LocalDataSegmentManager extends DataSegmentManager {
|
|
11
|
6
|
12 private AtomicInteger seq = new AtomicInteger(1);
|
|
13 private Runnable replyThread = new Runnable() {
|
|
14
|
|
15 @Override
|
|
16 public void run() {
|
|
17 while (true) {
|
|
18 try {
|
|
19 Reply reply = replyQueue.take();
|
7
|
20 Command cmd = seqHash.get(reply.seq);
|
|
21 cmd.cs.ids.reply(cmd.argKey, new DataSegmentValue(reply.index, reply.val));
|
6
|
22 } catch (InterruptedException e) {
|
|
23 e.printStackTrace();
|
|
24 }
|
|
25 }
|
|
26 }
|
|
27
|
|
28 };
|
|
29 public LocalDataSegmentManager() {
|
|
30 new Thread(replyThread).start();
|
|
31 }
|
3
|
32
|
|
33 private DataSegmentKey getDataSegmentKey(String key) {
|
|
34 DataSegmentKey newDataSegmentKey = new DataSegmentKey();
|
|
35 DataSegmentKey dataSegmentKey = dataSegments.putIfAbsent(key, newDataSegmentKey);
|
8
|
36 if (dataSegmentKey == null) {
|
3
|
37 newDataSegmentKey.runKeyThread();
|
8
|
38 dataSegmentKey = newDataSegmentKey;
|
3
|
39 }
|
|
40 return dataSegmentKey;
|
|
41 }
|
|
42
|
2
|
43 @Override
|
|
44 public void put(String key, Value val) {
|
3
|
45 DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
|
7
|
46 dataSegmentKey.addCommand(new Command(CommandType.PUT, null, val, 0, 0, this, null));
|
2
|
47 }
|
|
48
|
|
49 @Override
|
5
|
50 public void update(String key, Value val) {
|
|
51 DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
|
7
|
52 dataSegmentKey.addCommand(new Command(CommandType.UPDATE, null, val, 0, 0, this, null));
|
5
|
53 }
|
|
54
|
|
55 @Override
|
7
|
56 public void take(String argKey, String key, int index, CodeSegment cs) {
|
3
|
57 DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
|
|
58 int seq = this.seq.getAndIncrement();
|
7
|
59 Command cmd = new Command(CommandType.TAKE, argKey, null, index, seq, this, cs);
|
6
|
60 seqHash.put(seq, cmd);
|
|
61 dataSegmentKey.addCommand(cmd);
|
2
|
62 }
|
|
63
|
|
64 @Override
|
7
|
65 public void peek(String argKey, String key, int index, CodeSegment cs) {
|
3
|
66 DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
|
|
67 int seq = this.seq.getAndIncrement();
|
7
|
68 Command cmd = new Command(CommandType.PEEK, argKey, null, index, seq, this, cs);
|
6
|
69 seqHash.put(seq, cmd);
|
|
70 dataSegmentKey.addCommand(cmd);
|
2
|
71 }
|
|
72
|
|
73 @Override
|
3
|
74 public void remove(String key) {
|
|
75 DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
|
7
|
76 dataSegmentKey.addCommand(new Command(CommandType.REMOVE, null, null, 0, 0, this, null));
|
2
|
77 }
|
|
78
|
|
79 }
|