Mercurial > hg > Database > Alice
annotate src/main/java/alice/datasegment/LocalDataSegmentManager.java @ 650:4289b232b3fd
nulValue
author | suruga |
---|---|
date | Fri, 02 Feb 2018 18:26:49 +0900 |
parents | 8c17a9e66cc7 |
children | e91a574b69de 80a6c4a1c601 ffa43f252492 |
rev | line source |
---|---|
345 | 1 package alice.datasegment; |
2 | |
3 import java.util.concurrent.ConcurrentHashMap; | |
4 import java.util.concurrent.LinkedBlockingQueue; | |
5 import java.util.concurrent.ThreadPoolExecutor; | |
6 import java.util.concurrent.TimeUnit; | |
7 | |
8 import org.apache.log4j.Logger; | |
9 | |
10 import alice.codesegment.CodeSegment; | |
11 | |
12 public class LocalDataSegmentManager extends DataSegmentManager { | |
419 | 13 |
14 private String reverseKey = "local"; | |
15 private ConcurrentHashMap<String, DataSegmentKey> dataSegments = new ConcurrentHashMap<String, DataSegmentKey>(); | |
16 private Logger logger = Logger.getLogger("local"); | |
17 | |
18 private ThreadPoolExecutor dataSegmentExecutor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), // initial number of threads | |
19 Runtime.getRuntime().availableProcessors(), | |
20 Integer.MAX_VALUE, // keepAliveTime | |
21 TimeUnit.SECONDS, | |
22 new LinkedBlockingQueue<Runnable>()); | |
23 | |
24 public LocalDataSegmentManager() { | |
25 new Thread(replyThread, "LocalDataSegmentManager-replyCommand").start(); | |
26 } | |
27 | |
28 public void setReverseKey(String s){ | |
29 reverseKey = s; | |
30 } | |
31 | |
32 private class RunCommand implements Runnable { | |
33 | |
34 DataSegmentKey key; | |
35 Command cmd; | |
36 | |
37 public RunCommand(DataSegmentKey key, Command cmd) { | |
38 this.key = key; | |
39 this.cmd = cmd; | |
40 } | |
41 | |
42 @Override | |
43 public void run() { | |
44 key.runCommand(cmd); | |
45 } | |
46 | |
47 } | |
345 | 48 |
419 | 49 public void submitCommand(DataSegmentKey key, Command cmd) { |
50 dataSegmentExecutor.execute(new RunCommand(key, cmd)); | |
51 } | |
52 | |
53 public DataSegmentKey getDataSegmentKey(String key) { | |
54 DataSegmentKey dsKey = dataSegments.get(key); | |
55 if (dsKey != null) | |
56 return dsKey; | |
57 if (key == null) | |
58 return null; | |
59 DataSegmentKey newDataSegmentKey = new DataSegmentKey(); | |
60 DataSegmentKey dataSegmentKey = dataSegments.putIfAbsent(key, newDataSegmentKey); | |
61 if (dataSegmentKey == null) { | |
62 dataSegmentKey = newDataSegmentKey; | |
63 } | |
64 return dataSegmentKey; | |
65 } | |
66 | |
491 | 67 public void removeDataSegmentKey(String key) { |
68 if (key!=null) | |
69 dataSegments.remove(key); | |
70 } | |
71 | |
419 | 72 @Override |
533
b3c9554ccb1b
change compressed API to set data specified DSM name
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
529
diff
changeset
|
73 public void put(String key, ReceiveData rData, boolean quickFlag) { |
538
8c17a9e66cc7
Compressed LDSM refactoring & flip refactoring
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
533
diff
changeset
|
74 Command cmd = new Command(CommandType.PUT, null, key, rData, 0, 0, null, null, reverseKey); |
8c17a9e66cc7
Compressed LDSM refactoring & flip refactoring
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
533
diff
changeset
|
75 put1(key, cmd); |
8c17a9e66cc7
Compressed LDSM refactoring & flip refactoring
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
533
diff
changeset
|
76 } |
8c17a9e66cc7
Compressed LDSM refactoring & flip refactoring
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
533
diff
changeset
|
77 |
8c17a9e66cc7
Compressed LDSM refactoring & flip refactoring
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
533
diff
changeset
|
78 public void put1(String key, Command cmd) { |
419 | 79 DataSegmentKey dataSegmentKey = getDataSegmentKey(key); |
80 dataSegmentKey.runCommand(cmd); | |
81 if (logger.isDebugEnabled()) | |
82 logger.debug(cmd.getCommandString()); | |
83 } | |
84 | |
85 /** | |
86 * Enqueue update command to the queue of each DataSegment key | |
87 */ | |
345 | 88 |
419 | 89 @Override |
533
b3c9554ccb1b
change compressed API to set data specified DSM name
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
529
diff
changeset
|
90 public void update(String key, ReceiveData rData, boolean quickFlag) { |
458 | 91 Command cmd = new Command(CommandType.UPDATE, null, key, rData, 0, 0, null, null, reverseKey); |
538
8c17a9e66cc7
Compressed LDSM refactoring & flip refactoring
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
533
diff
changeset
|
92 put1(key, cmd); |
419 | 93 } |
94 | |
95 @Override | |
533
b3c9554ccb1b
change compressed API to set data specified DSM name
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
529
diff
changeset
|
96 public void take(Receiver receiver, CodeSegment cs, boolean quickFlag) { |
538
8c17a9e66cc7
Compressed LDSM refactoring & flip refactoring
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
533
diff
changeset
|
97 |
8c17a9e66cc7
Compressed LDSM refactoring & flip refactoring
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
533
diff
changeset
|
98 Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, 0, replyQueue, cs, null); |
8c17a9e66cc7
Compressed LDSM refactoring & flip refactoring
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
533
diff
changeset
|
99 take1(receiver, cmd); |
8c17a9e66cc7
Compressed LDSM refactoring & flip refactoring
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
533
diff
changeset
|
100 } |
8c17a9e66cc7
Compressed LDSM refactoring & flip refactoring
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
533
diff
changeset
|
101 |
8c17a9e66cc7
Compressed LDSM refactoring & flip refactoring
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
533
diff
changeset
|
102 public void take1(Receiver receiver, Command cmd) { |
419 | 103 int seq = this.seq.getAndIncrement(); |
538
8c17a9e66cc7
Compressed LDSM refactoring & flip refactoring
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
533
diff
changeset
|
104 cmd.setSeq(seq); |
8c17a9e66cc7
Compressed LDSM refactoring & flip refactoring
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
533
diff
changeset
|
105 //seqHash.put(seq, cmd); |
8c17a9e66cc7
Compressed LDSM refactoring & flip refactoring
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
533
diff
changeset
|
106 DataSegmentKey dataSegmentKey = getDataSegmentKey(receiver.key); |
419 | 107 dataSegmentKey.runCommand(cmd); |
108 if (logger.isDebugEnabled()) | |
109 logger.debug(cmd.getCommandString()); | |
110 } | |
111 | |
112 @Override | |
533
b3c9554ccb1b
change compressed API to set data specified DSM name
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
529
diff
changeset
|
113 public void peek(Receiver receiver, CodeSegment cs, boolean quickFlag) { |
538
8c17a9e66cc7
Compressed LDSM refactoring & flip refactoring
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
533
diff
changeset
|
114 Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, 0, replyQueue, cs, null); |
8c17a9e66cc7
Compressed LDSM refactoring & flip refactoring
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
533
diff
changeset
|
115 take1(receiver, cmd); |
419 | 116 } |
345 | 117 |
419 | 118 @Override |
119 public void remove(String key) { | |
120 DataSegmentKey dataSegmentKey = getDataSegmentKey(key); | |
121 Command cmd = new Command(CommandType.REMOVE, null, key, null, 0, 0, replyQueue, null, null); | |
122 dataSegmentKey.runCommand(cmd); | |
123 if (logger.isDebugEnabled()) | |
124 logger.debug(cmd.getCommandString()); | |
125 } | |
126 | |
127 @Override public void finish() { | |
128 System.exit(0); | |
129 } | |
130 | |
131 @Override | |
132 public void close() { | |
133 | |
134 } | |
345 | 135 |
419 | 136 public void recommand(Receiver receiver, CodeSegment cs) { |
137 DataSegmentKey dataSegmentKey = getDataSegmentKey(receiver.key); | |
138 int seq = this.seq.getAndIncrement(); | |
139 Command cmd = new Command(receiver.type, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null); | |
140 dataSegmentKey.runCommand(cmd); | |
141 if (logger.isDebugEnabled()) | |
142 logger.debug(cmd.getCommandString()); | |
143 } | |
345 | 144 |
419 | 145 @Override |
146 public void ping(String returnKey) { | |
147 | |
148 } | |
345 | 149 |
419 | 150 @Override |
151 public void response(String returnKey) { | |
152 | |
153 } | |
345 | 154 |
419 | 155 @Override |
156 public void shutdown() { | |
345 | 157 |
419 | 158 } |
345 | 159 |
483 | 160 @Override |
161 public void setSendError(boolean b) { | |
162 | |
163 } | |
345 | 164 } |