345
|
1 package alice.datasegment;
|
|
2
|
|
3 import java.util.concurrent.ConcurrentHashMap;
|
|
4 import java.util.concurrent.LinkedBlockingQueue;
|
|
5 import java.util.concurrent.ThreadPoolExecutor;
|
|
6 import java.util.concurrent.TimeUnit;
|
|
7
|
|
8 import org.apache.log4j.Logger;
|
|
9
|
|
10 import alice.codesegment.CodeSegment;
|
|
11
|
|
12 public class LocalDataSegmentManager extends DataSegmentManager {
|
419
|
13
|
|
14 private String reverseKey = "local";
|
|
15 private ConcurrentHashMap<String, DataSegmentKey> dataSegments = new ConcurrentHashMap<String, DataSegmentKey>();
|
|
16 private Logger logger = Logger.getLogger("local");
|
|
17
|
|
18 private ThreadPoolExecutor dataSegmentExecutor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), // initial number of threads
|
|
19 Runtime.getRuntime().availableProcessors(),
|
|
20 Integer.MAX_VALUE, // keepAliveTime
|
|
21 TimeUnit.SECONDS,
|
|
22 new LinkedBlockingQueue<Runnable>());
|
|
23
|
|
24 public LocalDataSegmentManager() {
|
|
25 new Thread(replyThread, "LocalDataSegmentManager-replyCommand").start();
|
|
26 }
|
|
27
|
|
28 public void setReverseKey(String s){
|
|
29 reverseKey = s;
|
|
30 }
|
|
31
|
|
32 private class RunCommand implements Runnable {
|
|
33
|
|
34 DataSegmentKey key;
|
|
35 Command cmd;
|
|
36
|
|
37 public RunCommand(DataSegmentKey key, Command cmd) {
|
|
38 this.key = key;
|
|
39 this.cmd = cmd;
|
|
40 }
|
|
41
|
|
42 @Override
|
|
43 public void run() {
|
|
44 key.runCommand(cmd);
|
|
45 }
|
|
46
|
|
47 }
|
345
|
48
|
419
|
49 public void submitCommand(DataSegmentKey key, Command cmd) {
|
|
50 dataSegmentExecutor.execute(new RunCommand(key, cmd));
|
|
51 }
|
|
52
|
|
53 public DataSegmentKey getDataSegmentKey(String key) {
|
|
54 DataSegmentKey dsKey = dataSegments.get(key);
|
|
55 if (dsKey != null)
|
|
56 return dsKey;
|
|
57 if (key == null)
|
|
58 return null;
|
|
59 DataSegmentKey newDataSegmentKey = new DataSegmentKey();
|
|
60 DataSegmentKey dataSegmentKey = dataSegments.putIfAbsent(key, newDataSegmentKey);
|
|
61 if (dataSegmentKey == null) {
|
|
62 dataSegmentKey = newDataSegmentKey;
|
|
63 }
|
|
64 return dataSegmentKey;
|
|
65 }
|
|
66
|
|
67 @Override
|
|
68 public void put(String key, Object val) {
|
|
69 DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
|
|
70 Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, null, null, reverseKey);
|
|
71 dataSegmentKey.runCommand(cmd);
|
|
72 if (logger.isDebugEnabled())
|
|
73 logger.debug(cmd.getCommandString());
|
|
74 }
|
|
75
|
|
76 @Override
|
|
77 public void quickPut(String key, Object val) {
|
|
78 put(key, val);
|
|
79 }
|
|
80
|
|
81 /**
|
|
82 * Enqueue update command to the queue of each DataSegment key
|
|
83 */
|
345
|
84
|
419
|
85 @Override
|
|
86 public void update(String key, Object val) {
|
|
87 DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
|
|
88 Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, null, null, reverseKey);
|
|
89 dataSegmentKey.runCommand(cmd);
|
|
90 if (logger.isDebugEnabled())
|
|
91 logger.debug(cmd.getCommandString());
|
|
92 }
|
|
93
|
|
94 @Override
|
|
95 public void quickUpdate(String key, Object val) {
|
|
96 update(key, val);
|
|
97 }
|
|
98
|
|
99 @Override
|
|
100 public void take(Receiver receiver, CodeSegment cs) {
|
|
101 DataSegmentKey dataSegmentKey = getDataSegmentKey(receiver.key);
|
|
102 int seq = this.seq.getAndIncrement();
|
|
103 Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null);
|
|
104 dataSegmentKey.runCommand(cmd);
|
|
105 if (logger.isDebugEnabled())
|
|
106 logger.debug(cmd.getCommandString());
|
|
107 }
|
|
108
|
|
109 @Override
|
|
110 public void quickTake(Receiver receiver, CodeSegment cs) {
|
|
111 take(receiver, cs);
|
|
112 }
|
|
113
|
|
114 @Override
|
|
115 public void peek(Receiver receiver, CodeSegment cs) {
|
|
116 DataSegmentKey dataSegmentKey = getDataSegmentKey(receiver.key);
|
|
117 int seq = this.seq.getAndIncrement();
|
|
118 Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null);
|
|
119 dataSegmentKey.runCommand(cmd);
|
|
120 if (logger.isDebugEnabled())
|
|
121 logger.debug(cmd.getCommandString());
|
|
122 }
|
345
|
123
|
419
|
124 @Override
|
|
125 public void quickPeek(Receiver receiver, CodeSegment cs) {
|
|
126 peek(receiver, cs);
|
|
127 }
|
|
128
|
|
129 @Override
|
|
130 public void remove(String key) {
|
|
131 DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
|
|
132 Command cmd = new Command(CommandType.REMOVE, null, key, null, 0, 0, replyQueue, null, null);
|
|
133 dataSegmentKey.runCommand(cmd);
|
|
134 if (logger.isDebugEnabled())
|
|
135 logger.debug(cmd.getCommandString());
|
|
136 }
|
|
137
|
|
138 @Override public void finish() {
|
|
139 System.exit(0);
|
|
140 }
|
|
141
|
|
142 @Override
|
|
143 public void close() {
|
|
144
|
|
145 }
|
345
|
146
|
419
|
147 public void recommand(Receiver receiver, CodeSegment cs) {
|
|
148 DataSegmentKey dataSegmentKey = getDataSegmentKey(receiver.key);
|
|
149 int seq = this.seq.getAndIncrement();
|
|
150 Command cmd = new Command(receiver.type, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null);
|
|
151 dataSegmentKey.runCommand(cmd);
|
|
152 if (logger.isDebugEnabled())
|
|
153 logger.debug(cmd.getCommandString());
|
345
|
154
|
419
|
155 }
|
345
|
156
|
419
|
157 @Override
|
|
158 public void ping(String returnKey) {
|
|
159
|
|
160 }
|
345
|
161
|
419
|
162 @Override
|
|
163 public void response(String returnKey) {
|
|
164
|
|
165 }
|
345
|
166
|
419
|
167 @Override
|
|
168 public void shutdown() {
|
345
|
169
|
419
|
170 }
|
345
|
171
|
|
172 }
|