Mercurial > hg > Database > Alice
annotate src/alice/datasegment/LocalDataSegmentManager.java @ 73:4bfd81352cfa
change to concurrent data segment
author | kazz <kazz@cr.ie.u-ryukyu.ac.jp> |
---|---|
date | Tue, 21 Feb 2012 22:55:17 +0900 |
parents | a3a2605e16a2 |
children | 82a1c25ca0c8 |
rev | line source |
---|---|
2 | 1 package alice.datasegment; |
2 | |
56 | 3 import java.util.concurrent.ConcurrentHashMap; |
69 | 4 import java.util.concurrent.LinkedBlockingQueue; |
73
4bfd81352cfa
change to concurrent data segment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
71
diff
changeset
|
5 import java.util.concurrent.ThreadPoolExecutor; |
4bfd81352cfa
change to concurrent data segment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
71
diff
changeset
|
6 import java.util.concurrent.TimeUnit; |
56 | 7 |
68
d4c7f7b1096b
remove copy at OutboundTcpConnection
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
67
diff
changeset
|
8 import org.apache.log4j.Logger; |
2 | 9 import org.msgpack.type.Value; |
10 | |
3 | 11 import alice.codesegment.CodeSegment; |
12 | |
2 | 13 public class LocalDataSegmentManager extends DataSegmentManager { |
14 | |
56 | 15 private String reverseKey = "local"; |
16 private ConcurrentHashMap<String, DataSegmentKey> dataSegments = new ConcurrentHashMap<String, DataSegmentKey>(); | |
69 | 17 private LinkedBlockingQueue<KeyCommand> cmdQueue = new LinkedBlockingQueue<KeyCommand>(); |
68
d4c7f7b1096b
remove copy at OutboundTcpConnection
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
67
diff
changeset
|
18 private Logger logger = Logger.getLogger("local"); |
69 | 19 |
73
4bfd81352cfa
change to concurrent data segment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
71
diff
changeset
|
20 private ThreadPoolExecutor dataSegmentExecutor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), // initial number of threads |
4bfd81352cfa
change to concurrent data segment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
71
diff
changeset
|
21 Runtime.getRuntime().availableProcessors(), |
4bfd81352cfa
change to concurrent data segment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
71
diff
changeset
|
22 Integer.MAX_VALUE, // keepAliveTime |
4bfd81352cfa
change to concurrent data segment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
71
diff
changeset
|
23 TimeUnit.SECONDS, |
4bfd81352cfa
change to concurrent data segment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
71
diff
changeset
|
24 new LinkedBlockingQueue<Runnable>()); |
4bfd81352cfa
change to concurrent data segment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
71
diff
changeset
|
25 |
69 | 26 private Runnable keyCommandThread = new Runnable() { |
27 | |
28 @Override | |
29 public void run() { | |
30 while (true) { | |
31 KeyCommand keyCmd = null; | |
32 try { | |
33 keyCmd = cmdQueue.take(); | |
34 } catch (InterruptedException e) { | |
35 e.printStackTrace(); | |
36 } | |
37 keyCmd.runCommand(); | |
38 } | |
39 } | |
40 | |
41 }; | |
42 | |
6 | 43 public LocalDataSegmentManager() { |
69 | 44 new Thread(replyThread, "LocalDataSegmentManager-replyCommand").start(); |
45 new Thread(keyCommandThread, "LocalDataSegmentManager-runKeyCommand").start(); | |
46 } | |
47 | |
73
4bfd81352cfa
change to concurrent data segment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
71
diff
changeset
|
48 public void submitCommand(final DataSegmentKey key, final Command cmd) { |
4bfd81352cfa
change to concurrent data segment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
71
diff
changeset
|
49 Runnable runCommand = new Runnable() { |
4bfd81352cfa
change to concurrent data segment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
71
diff
changeset
|
50 |
4bfd81352cfa
change to concurrent data segment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
71
diff
changeset
|
51 @Override |
4bfd81352cfa
change to concurrent data segment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
71
diff
changeset
|
52 public void run() { |
4bfd81352cfa
change to concurrent data segment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
71
diff
changeset
|
53 key.runCommand(cmd); |
4bfd81352cfa
change to concurrent data segment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
71
diff
changeset
|
54 } |
4bfd81352cfa
change to concurrent data segment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
71
diff
changeset
|
55 |
4bfd81352cfa
change to concurrent data segment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
71
diff
changeset
|
56 }; |
4bfd81352cfa
change to concurrent data segment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
71
diff
changeset
|
57 dataSegmentExecutor.execute(runCommand); |
6 | 58 } |
3 | 59 |
13 | 60 public DataSegmentKey getDataSegmentKey(String key) { |
64 | 61 DataSegmentKey dsKey = dataSegments.get(key); |
63 | 62 if (dsKey != null) |
63 return dsKey; | |
64 if (key == null) | |
16 | 65 return null; |
69 | 66 DataSegmentKey newDataSegmentKey = new DataSegmentKey(); |
3 | 67 DataSegmentKey dataSegmentKey = dataSegments.putIfAbsent(key, newDataSegmentKey); |
8 | 68 if (dataSegmentKey == null) { |
69 dataSegmentKey = newDataSegmentKey; | |
3 | 70 } |
71 return dataSegmentKey; | |
72 } | |
73 | |
2 | 74 @Override |
40 | 75 public void put(String key, Value val, CodeSegment cs) { |
3 | 76 DataSegmentKey dataSegmentKey = getDataSegmentKey(key); |
40 | 77 Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, replyQueue, cs, reverseKey); |
73
4bfd81352cfa
change to concurrent data segment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
71
diff
changeset
|
78 submitCommand(dataSegmentKey, cmd); |
71
a3a2605e16a2
change checking debug mode of logger
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
69
diff
changeset
|
79 if (logger.isDebugEnabled()) |
68
d4c7f7b1096b
remove copy at OutboundTcpConnection
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
67
diff
changeset
|
80 logger.debug(cmd.getCommandString()); |
2 | 81 } |
57 | 82 |
83 /** | |
84 * Enqueue update command to the queue of each DataSegment key | |
85 */ | |
2 | 86 @Override |
40 | 87 public void update(String key, Value val, CodeSegment cs) { |
5 | 88 DataSegmentKey dataSegmentKey = getDataSegmentKey(key); |
40 | 89 Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, replyQueue, cs, reverseKey); |
73
4bfd81352cfa
change to concurrent data segment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
71
diff
changeset
|
90 submitCommand(dataSegmentKey, cmd); |
71
a3a2605e16a2
change checking debug mode of logger
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
69
diff
changeset
|
91 if (logger.isDebugEnabled()) |
68
d4c7f7b1096b
remove copy at OutboundTcpConnection
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
67
diff
changeset
|
92 logger.debug(cmd.getCommandString()); |
5 | 93 } |
94 | |
95 @Override | |
33
20c67f673224
change name of DataSegmentReceiver
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
30
diff
changeset
|
96 public void take(Receiver receiver, String key, int index, CodeSegment cs) { |
3 | 97 DataSegmentKey dataSegmentKey = getDataSegmentKey(key); |
98 int seq = this.seq.getAndIncrement(); | |
40 | 99 Command cmd = new Command(CommandType.TAKE, receiver, key, null, index, seq, replyQueue, cs, null); |
6 | 100 seqHash.put(seq, cmd); |
73
4bfd81352cfa
change to concurrent data segment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
71
diff
changeset
|
101 submitCommand(dataSegmentKey, cmd); |
71
a3a2605e16a2
change checking debug mode of logger
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
69
diff
changeset
|
102 if (logger.isDebugEnabled()) |
68
d4c7f7b1096b
remove copy at OutboundTcpConnection
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
67
diff
changeset
|
103 logger.debug(cmd.getCommandString()); |
2 | 104 } |
105 | |
106 @Override | |
33
20c67f673224
change name of DataSegmentReceiver
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
30
diff
changeset
|
107 public void peek(Receiver receiver, String key, int index, CodeSegment cs) { |
3 | 108 DataSegmentKey dataSegmentKey = getDataSegmentKey(key); |
109 int seq = this.seq.getAndIncrement(); | |
40 | 110 Command cmd = new Command(CommandType.PEEK, receiver, key, null, index, seq, replyQueue, cs, null); |
58 | 111 seqHash.put(seq, cmd); // waiting for PUT or UPDATE at unique sequence number |
73
4bfd81352cfa
change to concurrent data segment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
71
diff
changeset
|
112 submitCommand(dataSegmentKey, cmd); |
71
a3a2605e16a2
change checking debug mode of logger
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
69
diff
changeset
|
113 if (logger.isDebugEnabled()) |
68
d4c7f7b1096b
remove copy at OutboundTcpConnection
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
67
diff
changeset
|
114 logger.debug(cmd.getCommandString()); |
2 | 115 } |
116 | |
117 @Override | |
3 | 118 public void remove(String key) { |
119 DataSegmentKey dataSegmentKey = getDataSegmentKey(key); | |
40 | 120 Command cmd = new Command(CommandType.REMOVE, null, key, null, 0, 0, replyQueue, null, null); |
73
4bfd81352cfa
change to concurrent data segment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
71
diff
changeset
|
121 submitCommand(dataSegmentKey, cmd); |
71
a3a2605e16a2
change checking debug mode of logger
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
69
diff
changeset
|
122 if (logger.isDebugEnabled()) |
68
d4c7f7b1096b
remove copy at OutboundTcpConnection
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
67
diff
changeset
|
123 logger.debug(cmd.getCommandString()); |
2 | 124 } |
30 | 125 |
126 @Override public void finish() { | |
127 System.exit(0); | |
128 } | |
41 | 129 |
130 @Override | |
131 public void close() { | |
132 | |
133 } | |
30 | 134 |
2 | 135 } |