2
|
1 package alice.datasegment;
|
|
2
|
3
|
3 import java.util.concurrent.atomic.AtomicInteger;
|
2
|
4
|
|
5 import org.msgpack.type.Value;
|
|
6
|
3
|
7 import alice.codesegment.CodeSegment;
|
|
8 import alice.datasegment.CommandType;
|
|
9
|
2
|
10 public class LocalDataSegmentManager extends DataSegmentManager {
|
|
11
|
3
|
12 private AtomicInteger seq = new AtomicInteger(1);
|
|
13
|
|
14 private DataSegmentKey getDataSegmentKey(String key) {
|
|
15 DataSegmentKey newDataSegmentKey = new DataSegmentKey();
|
|
16 DataSegmentKey dataSegmentKey = dataSegments.putIfAbsent(key, newDataSegmentKey);
|
|
17 if (dataSegmentKey == newDataSegmentKey) {
|
|
18 newDataSegmentKey.runKeyThread();
|
|
19 }
|
|
20 return dataSegmentKey;
|
|
21 }
|
|
22
|
2
|
23 @Override
|
|
24 public void put(String key, Value val) {
|
3
|
25 DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
|
|
26 dataSegmentKey.addCommand(new Command(CommandType.PUT, val, 0, null, 0));
|
2
|
27 }
|
|
28
|
|
29 @Override
|
5
|
30 public void update(String key, Value val) {
|
|
31 DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
|
|
32 dataSegmentKey.addCommand(new Command(CommandType.UPDATE, val, 0, null, 0));
|
|
33 }
|
|
34
|
|
35 @Override
|
3
|
36 public void take(String key, int index, CodeSegment cs) {
|
|
37 DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
|
|
38 int seq = this.seq.getAndIncrement();
|
|
39 dataSegmentKey.addCommand(new Command(CommandType.TAKE, null, index, cs, seq));
|
2
|
40 }
|
|
41
|
|
42 @Override
|
3
|
43 public void peek(String key, int index, CodeSegment cs) {
|
|
44 DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
|
|
45 int seq = this.seq.getAndIncrement();
|
|
46 dataSegmentKey.addCommand(new Command(CommandType.PEEK, null, index, cs, seq));
|
2
|
47 }
|
|
48
|
|
49 @Override
|
3
|
50 public void remove(String key) {
|
|
51 DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
|
|
52 dataSegmentKey.addCommand(new Command(CommandType.REMOVE, null, 0, null, 0));
|
2
|
53 }
|
|
54
|
|
55 }
|