345
|
1 package alice.datasegment;
|
|
2
|
|
3 import java.util.concurrent.ConcurrentHashMap;
|
|
4 import java.util.concurrent.LinkedBlockingQueue;
|
|
5 import java.util.concurrent.ThreadPoolExecutor;
|
|
6 import java.util.concurrent.TimeUnit;
|
|
7
|
|
8 import org.apache.log4j.Logger;
|
|
9
|
|
10 import alice.codesegment.CodeSegment;
|
|
11
|
|
12 public class LocalDataSegmentManager extends DataSegmentManager {
|
|
13
|
|
14 private String reverseKey = "local";
|
|
15 private ConcurrentHashMap<String, DataSegmentKey> dataSegments = new ConcurrentHashMap<String, DataSegmentKey>();
|
|
16 private Logger logger = Logger.getLogger("local");
|
|
17
|
|
18 private ThreadPoolExecutor dataSegmentExecutor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), // initial number of threads
|
|
19 Runtime.getRuntime().availableProcessors(),
|
|
20 Integer.MAX_VALUE, // keepAliveTime
|
|
21 TimeUnit.SECONDS,
|
|
22 new LinkedBlockingQueue<Runnable>());
|
|
23
|
|
24 public LocalDataSegmentManager() {
|
|
25 new Thread(replyThread, "LocalDataSegmentManager-replyCommand").start();
|
|
26 }
|
|
27
|
|
28 private class RunCommand implements Runnable {
|
|
29
|
|
30 DataSegmentKey key;
|
|
31 Command cmd;
|
|
32
|
|
33 public RunCommand(DataSegmentKey key, Command cmd) {
|
|
34 this.key = key;
|
|
35 this.cmd = cmd;
|
|
36 }
|
|
37
|
|
38 @Override
|
|
39 public void run() {
|
|
40 key.runCommand(cmd);
|
|
41 }
|
|
42
|
|
43 }
|
|
44
|
|
45 public void submitCommand(DataSegmentKey key, Command cmd) {
|
|
46 dataSegmentExecutor.execute(new RunCommand(key, cmd));
|
|
47 }
|
|
48
|
|
49 public DataSegmentKey getDataSegmentKey(String key) {
|
|
50 DataSegmentKey dsKey = dataSegments.get(key);
|
|
51 if (dsKey != null)
|
|
52 return dsKey;
|
|
53 if (key == null)
|
|
54 return null;
|
|
55 DataSegmentKey newDataSegmentKey = new DataSegmentKey();
|
|
56 DataSegmentKey dataSegmentKey = dataSegments.putIfAbsent(key, newDataSegmentKey);
|
|
57 if (dataSegmentKey == null) {
|
|
58 dataSegmentKey = newDataSegmentKey;
|
|
59 }
|
|
60 return dataSegmentKey;
|
|
61 }
|
|
62
|
|
63 @Override
|
|
64 public void put(String key, Object val) {
|
|
65 DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
|
|
66 Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, null, null, reverseKey);
|
|
67 dataSegmentKey.runCommand(cmd);
|
|
68 if (logger.isDebugEnabled())
|
|
69 logger.debug(cmd.getCommandString());
|
|
70 }
|
|
71
|
|
72 @Override
|
|
73 public void quickPut(String key, Object val) {
|
|
74 put(key, val);
|
|
75 }
|
|
76
|
|
77 /**
|
|
78 * Enqueue update command to the queue of each DataSegment key
|
|
79 */
|
|
80
|
|
81 @Override
|
|
82 public void update(String key, Object val) {
|
|
83 DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
|
|
84 Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, null, null, reverseKey);
|
|
85 dataSegmentKey.runCommand(cmd);
|
|
86 if (logger.isDebugEnabled())
|
|
87 logger.debug(cmd.getCommandString());
|
|
88 }
|
|
89
|
|
90
|
|
91 @Override
|
|
92 public void quickUpdate(String key, Object val) {
|
|
93 update(key, val);
|
|
94 }
|
|
95
|
|
96
|
|
97
|
|
98 @Override
|
|
99 public void take(Receiver receiver, CodeSegment cs) {
|
|
100 DataSegmentKey dataSegmentKey = getDataSegmentKey(receiver.key);
|
|
101 int seq = this.seq.getAndIncrement();
|
|
102 Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null);
|
|
103 dataSegmentKey.runCommand(cmd);
|
|
104 if (logger.isDebugEnabled())
|
|
105 logger.debug(cmd.getCommandString());
|
|
106 }
|
|
107
|
|
108 @Override
|
|
109 public void quickTake(Receiver receiver, CodeSegment cs) {
|
|
110 take(receiver, cs);
|
|
111 }
|
|
112
|
|
113 @Override
|
|
114 public void peek(Receiver receiver, CodeSegment cs) {
|
|
115 DataSegmentKey dataSegmentKey = getDataSegmentKey(receiver.key);
|
|
116 int seq = this.seq.getAndIncrement();
|
|
117 Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null);
|
|
118 dataSegmentKey.runCommand(cmd);
|
|
119 if (logger.isDebugEnabled())
|
|
120 logger.debug(cmd.getCommandString());
|
|
121 }
|
|
122
|
|
123 @Override
|
|
124 public void quickPeek(Receiver receiver, CodeSegment cs) {
|
|
125 peek(receiver, cs);
|
|
126 }
|
|
127
|
|
128
|
|
129 @Override
|
|
130 public void remove(String key) {
|
|
131 DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
|
|
132 Command cmd = new Command(CommandType.REMOVE, null, key, null, 0, 0, replyQueue, null, null);
|
|
133 dataSegmentKey.runCommand(cmd);
|
|
134 if (logger.isDebugEnabled())
|
|
135 logger.debug(cmd.getCommandString());
|
|
136 }
|
|
137
|
|
138 @Override public void finish() {
|
|
139 System.exit(0);
|
|
140 }
|
|
141
|
|
142 @Override
|
|
143 public void close() {
|
|
144
|
|
145 }
|
|
146
|
|
147 public void recommand(Receiver receiver, CodeSegment cs) {
|
|
148 DataSegmentKey dataSegmentKey = getDataSegmentKey(receiver.key);
|
|
149 int seq = this.seq.getAndIncrement();
|
|
150 Command cmd = new Command(receiver.type, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null);
|
|
151 dataSegmentKey.runCommand(cmd);
|
|
152 if (logger.isDebugEnabled())
|
|
153 logger.debug(cmd.getCommandString());
|
|
154
|
|
155 }
|
|
156
|
|
157 @Override
|
|
158 public void ping(String returnKey) {
|
|
159
|
|
160 }
|
|
161
|
|
162 @Override
|
|
163 public void response(String returnKey) {
|
|
164
|
|
165 }
|
|
166
|
|
167 @Override
|
350
|
168 public void shutdown() {
|
345
|
169
|
|
170 }
|
|
171
|
|
172 }
|