Mercurial > hg > Database > Alice
annotate src/alice/datasegment/LocalDataSegmentManager.java @ 225:bc061ee5f31f
bitonic sort work but data is not sorted
author | sugi |
---|---|
date | Thu, 28 Mar 2013 18:10:24 +0900 |
parents | 409d7679cf7b |
children | ca1c9c477f54 |
rev | line source |
---|---|
2 | 1 package alice.datasegment; |
2 | |
56 | 3 import java.util.concurrent.ConcurrentHashMap; |
69 | 4 import java.util.concurrent.LinkedBlockingQueue; |
73
4bfd81352cfa
change to concurrent data segment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
71
diff
changeset
|
5 import java.util.concurrent.ThreadPoolExecutor; |
4bfd81352cfa
change to concurrent data segment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
71
diff
changeset
|
6 import java.util.concurrent.TimeUnit; |
56 | 7 |
68
d4c7f7b1096b
remove copy at OutboundTcpConnection
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
67
diff
changeset
|
8 import org.apache.log4j.Logger; |
2 | 9 import org.msgpack.type.Value; |
10 | |
3 | 11 import alice.codesegment.CodeSegment; |
12 | |
2 | 13 public class LocalDataSegmentManager extends DataSegmentManager { |
14 | |
56 | 15 private String reverseKey = "local"; |
16 private ConcurrentHashMap<String, DataSegmentKey> dataSegments = new ConcurrentHashMap<String, DataSegmentKey>(); | |
68
d4c7f7b1096b
remove copy at OutboundTcpConnection
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
67
diff
changeset
|
17 private Logger logger = Logger.getLogger("local"); |
69 | 18 |
73
4bfd81352cfa
change to concurrent data segment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
71
diff
changeset
|
19 private ThreadPoolExecutor dataSegmentExecutor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), // initial number of threads |
4bfd81352cfa
change to concurrent data segment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
71
diff
changeset
|
20 Runtime.getRuntime().availableProcessors(), |
4bfd81352cfa
change to concurrent data segment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
71
diff
changeset
|
21 Integer.MAX_VALUE, // keepAliveTime |
4bfd81352cfa
change to concurrent data segment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
71
diff
changeset
|
22 TimeUnit.SECONDS, |
4bfd81352cfa
change to concurrent data segment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
71
diff
changeset
|
23 new LinkedBlockingQueue<Runnable>()); |
4bfd81352cfa
change to concurrent data segment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
71
diff
changeset
|
24 |
172 | 25 public LocalDataSegmentManager() { |
26 new Thread(replyThread, "LocalDataSegmentManager-replyCommand").start(); | |
27 } | |
69 | 28 |
172 | 29 private class RunCommand implements Runnable { |
30 | |
31 DataSegmentKey key; | |
32 Command cmd; | |
33 | |
34 public RunCommand(DataSegmentKey key, Command cmd) { | |
35 this.key = key; | |
36 this.cmd = cmd; | |
69 | 37 } |
38 | |
39 @Override | |
40 public void run() { | |
172 | 41 key.runCommand(cmd); |
69 | 42 } |
43 | |
44 } | |
172 | 45 |
46 public void submitCommand(DataSegmentKey key, Command cmd) { | |
47 dataSegmentExecutor.execute(new RunCommand(key, cmd)); | |
6 | 48 } |
3 | 49 |
13 | 50 public DataSegmentKey getDataSegmentKey(String key) { |
64 | 51 DataSegmentKey dsKey = dataSegments.get(key); |
63 | 52 if (dsKey != null) |
53 return dsKey; | |
54 if (key == null) | |
16 | 55 return null; |
69 | 56 DataSegmentKey newDataSegmentKey = new DataSegmentKey(); |
3 | 57 DataSegmentKey dataSegmentKey = dataSegments.putIfAbsent(key, newDataSegmentKey); |
8 | 58 if (dataSegmentKey == null) { |
59 dataSegmentKey = newDataSegmentKey; | |
3 | 60 } |
61 return dataSegmentKey; | |
62 } | |
63 | |
2 | 64 @Override |
132 | 65 public void put(String key, Value val) { |
3 | 66 DataSegmentKey dataSegmentKey = getDataSegmentKey(key); |
202 | 67 Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, null, null, reverseKey); |
225 | 68 submitCommand(dataSegmentKey, cmd); |
71
a3a2605e16a2
change checking debug mode of logger
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
69
diff
changeset
|
69 if (logger.isDebugEnabled()) |
68
d4c7f7b1096b
remove copy at OutboundTcpConnection
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
67
diff
changeset
|
70 logger.debug(cmd.getCommandString()); |
2 | 71 } |
57 | 72 |
215 | 73 public void putObject(String key, Object obj) { |
190 | 74 DataSegmentKey dataSegmentKey = getDataSegmentKey(key); |
202 | 75 Command cmd = new Command(CommandType.PUT, null, key, obj, 0, 0, null, null, reverseKey); |
225 | 76 submitCommand(dataSegmentKey, cmd); |
190 | 77 if (logger.isDebugEnabled()) |
78 logger.debug(cmd.getCommandString()); | |
79 } | |
80 | |
57 | 81 /** |
82 * Enqueue update command to the queue of each DataSegment key | |
83 */ | |
2 | 84 @Override |
132 | 85 public void update(String key, Value val) { |
5 | 86 DataSegmentKey dataSegmentKey = getDataSegmentKey(key); |
202 | 87 Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, null, null, reverseKey); |
225 | 88 submitCommand(dataSegmentKey, cmd); |
71
a3a2605e16a2
change checking debug mode of logger
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
69
diff
changeset
|
89 if (logger.isDebugEnabled()) |
68
d4c7f7b1096b
remove copy at OutboundTcpConnection
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
67
diff
changeset
|
90 logger.debug(cmd.getCommandString()); |
5 | 91 } |
190 | 92 |
215 | 93 public void updateObject(String key, Object val) { |
190 | 94 DataSegmentKey dataSegmentKey = getDataSegmentKey(key); |
202 | 95 Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, null, null, reverseKey); |
225 | 96 submitCommand(dataSegmentKey, cmd); |
190 | 97 if (logger.isDebugEnabled()) |
98 logger.debug(cmd.getCommandString()); | |
99 } | |
5 | 100 |
101 @Override | |
33
20c67f673224
change name of DataSegmentReceiver
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
30
diff
changeset
|
102 public void take(Receiver receiver, String key, int index, CodeSegment cs) { |
3 | 103 DataSegmentKey dataSegmentKey = getDataSegmentKey(key); |
104 int seq = this.seq.getAndIncrement(); | |
40 | 105 Command cmd = new Command(CommandType.TAKE, receiver, key, null, index, seq, replyQueue, cs, null); |
6 | 106 seqHash.put(seq, cmd); |
73
4bfd81352cfa
change to concurrent data segment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
71
diff
changeset
|
107 submitCommand(dataSegmentKey, cmd); |
71
a3a2605e16a2
change checking debug mode of logger
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
69
diff
changeset
|
108 if (logger.isDebugEnabled()) |
68
d4c7f7b1096b
remove copy at OutboundTcpConnection
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
67
diff
changeset
|
109 logger.debug(cmd.getCommandString()); |
2 | 110 } |
111 | |
112 @Override | |
33
20c67f673224
change name of DataSegmentReceiver
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
30
diff
changeset
|
113 public void peek(Receiver receiver, String key, int index, CodeSegment cs) { |
3 | 114 DataSegmentKey dataSegmentKey = getDataSegmentKey(key); |
115 int seq = this.seq.getAndIncrement(); | |
40 | 116 Command cmd = new Command(CommandType.PEEK, receiver, key, null, index, seq, replyQueue, cs, null); |
58 | 117 seqHash.put(seq, cmd); // waiting for PUT or UPDATE at unique sequence number |
73
4bfd81352cfa
change to concurrent data segment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
71
diff
changeset
|
118 submitCommand(dataSegmentKey, cmd); |
71
a3a2605e16a2
change checking debug mode of logger
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
69
diff
changeset
|
119 if (logger.isDebugEnabled()) |
68
d4c7f7b1096b
remove copy at OutboundTcpConnection
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
67
diff
changeset
|
120 logger.debug(cmd.getCommandString()); |
2 | 121 } |
122 | |
123 @Override | |
3 | 124 public void remove(String key) { |
125 DataSegmentKey dataSegmentKey = getDataSegmentKey(key); | |
40 | 126 Command cmd = new Command(CommandType.REMOVE, null, key, null, 0, 0, replyQueue, null, null); |
73
4bfd81352cfa
change to concurrent data segment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
71
diff
changeset
|
127 submitCommand(dataSegmentKey, cmd); |
71
a3a2605e16a2
change checking debug mode of logger
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
69
diff
changeset
|
128 if (logger.isDebugEnabled()) |
68
d4c7f7b1096b
remove copy at OutboundTcpConnection
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
67
diff
changeset
|
129 logger.debug(cmd.getCommandString()); |
2 | 130 } |
30 | 131 |
132 @Override public void finish() { | |
133 System.exit(0); | |
134 } | |
41 | 135 |
136 @Override | |
137 public void close() { | |
138 | |
139 } | |
189 | 140 |
209 | 141 |
2 | 142 } |