Mercurial > hg > Members > tatsuki > Alice
annotate src/alice/datasegment/LocalDataSegmentManager.java @ 184:4475ba30238f working
minor change
author | e095732 |
---|---|
date | Wed, 27 Feb 2013 06:30:14 +0900 |
parents | 75150396681c |
children | d2f5c885a367 |
rev | line source |
---|---|
2 | 1 package alice.datasegment; |
2 | |
56 | 3 import java.util.concurrent.ConcurrentHashMap; |
69 | 4 import java.util.concurrent.LinkedBlockingQueue; |
56 | 5 |
68
d4c7f7b1096b
remove copy at OutboundTcpConnection
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
67
diff
changeset
|
6 import org.apache.log4j.Logger; |
2 | 7 import org.msgpack.type.Value; |
8 | |
3 | 9 import alice.codesegment.CodeSegment; |
10 | |
2 | 11 public class LocalDataSegmentManager extends DataSegmentManager { |
12 | |
56 | 13 private String reverseKey = "local"; |
14 private ConcurrentHashMap<String, DataSegmentKey> dataSegments = new ConcurrentHashMap<String, DataSegmentKey>(); | |
69 | 15 private LinkedBlockingQueue<KeyCommand> cmdQueue = new LinkedBlockingQueue<KeyCommand>(); |
68
d4c7f7b1096b
remove copy at OutboundTcpConnection
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
67
diff
changeset
|
16 private Logger logger = Logger.getLogger("local"); |
69 | 17 |
18 private Runnable keyCommandThread = new Runnable() { | |
19 | |
20 @Override | |
21 public void run() { | |
22 while (true) { | |
23 KeyCommand keyCmd = null; | |
24 try { | |
25 keyCmd = cmdQueue.take(); | |
26 } catch (InterruptedException e) { | |
27 e.printStackTrace(); | |
28 } | |
29 keyCmd.runCommand(); | |
30 } | |
31 } | |
32 | |
33 }; | |
34 | |
6 | 35 public LocalDataSegmentManager() { |
69 | 36 new Thread(replyThread, "LocalDataSegmentManager-replyCommand").start(); |
37 new Thread(keyCommandThread, "LocalDataSegmentManager-runKeyCommand").start(); | |
38 } | |
39 | |
40 public void addCommand(DataSegmentKey key, Command cmd) { | |
41 try { | |
42 cmdQueue.put(new KeyCommand(key, cmd)); | |
43 } catch (InterruptedException e) { | |
44 e.printStackTrace(); | |
45 } | |
6 | 46 } |
3 | 47 |
13 | 48 public DataSegmentKey getDataSegmentKey(String key) { |
64 | 49 DataSegmentKey dsKey = dataSegments.get(key); |
63 | 50 if (dsKey != null) |
51 return dsKey; | |
52 if (key == null) | |
16 | 53 return null; |
69 | 54 DataSegmentKey newDataSegmentKey = new DataSegmentKey(); |
3 | 55 DataSegmentKey dataSegmentKey = dataSegments.putIfAbsent(key, newDataSegmentKey); |
8 | 56 if (dataSegmentKey == null) { |
57 dataSegmentKey = newDataSegmentKey; | |
3 | 58 } |
59 return dataSegmentKey; | |
60 } | |
61 | |
2 | 62 @Override |
132 | 63 public void put(String key, Value val) { |
3 | 64 DataSegmentKey dataSegmentKey = getDataSegmentKey(key); |
132 | 65 Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, replyQueue, null, reverseKey); |
69 | 66 addCommand(dataSegmentKey, cmd); |
71
a3a2605e16a2
change checking debug mode of logger
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
69
diff
changeset
|
67 if (logger.isDebugEnabled()) |
68
d4c7f7b1096b
remove copy at OutboundTcpConnection
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
67
diff
changeset
|
68 logger.debug(cmd.getCommandString()); |
2 | 69 } |
57 | 70 |
71 /** | |
72 * Enqueue update command to the queue of each DataSegment key | |
73 */ | |
2 | 74 @Override |
132 | 75 public void update(String key, Value val) { |
5 | 76 DataSegmentKey dataSegmentKey = getDataSegmentKey(key); |
132 | 77 Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, replyQueue, null, reverseKey); |
69 | 78 addCommand(dataSegmentKey, cmd); |
71
a3a2605e16a2
change checking debug mode of logger
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
69
diff
changeset
|
79 if (logger.isDebugEnabled()) |
68
d4c7f7b1096b
remove copy at OutboundTcpConnection
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
67
diff
changeset
|
80 logger.debug(cmd.getCommandString()); |
5 | 81 } |
82 | |
83 @Override | |
33
20c67f673224
change name of DataSegmentReceiver
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
30
diff
changeset
|
84 public void take(Receiver receiver, String key, int index, CodeSegment cs) { |
3 | 85 DataSegmentKey dataSegmentKey = getDataSegmentKey(key); |
86 int seq = this.seq.getAndIncrement(); | |
40 | 87 Command cmd = new Command(CommandType.TAKE, receiver, key, null, index, seq, replyQueue, cs, null); |
6 | 88 seqHash.put(seq, cmd); |
69 | 89 addCommand(dataSegmentKey, cmd); |
71
a3a2605e16a2
change checking debug mode of logger
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
69
diff
changeset
|
90 if (logger.isDebugEnabled()) |
68
d4c7f7b1096b
remove copy at OutboundTcpConnection
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
67
diff
changeset
|
91 logger.debug(cmd.getCommandString()); |
2 | 92 } |
93 | |
94 @Override | |
33
20c67f673224
change name of DataSegmentReceiver
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
30
diff
changeset
|
95 public void peek(Receiver receiver, String key, int index, CodeSegment cs) { |
3 | 96 DataSegmentKey dataSegmentKey = getDataSegmentKey(key); |
97 int seq = this.seq.getAndIncrement(); | |
40 | 98 Command cmd = new Command(CommandType.PEEK, receiver, key, null, index, seq, replyQueue, cs, null); |
58 | 99 seqHash.put(seq, cmd); // waiting for PUT or UPDATE at unique sequence number |
69 | 100 addCommand(dataSegmentKey, cmd); |
71
a3a2605e16a2
change checking debug mode of logger
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
69
diff
changeset
|
101 if (logger.isDebugEnabled()) |
68
d4c7f7b1096b
remove copy at OutboundTcpConnection
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
67
diff
changeset
|
102 logger.debug(cmd.getCommandString()); |
2 | 103 } |
104 | |
105 @Override | |
3 | 106 public void remove(String key) { |
107 DataSegmentKey dataSegmentKey = getDataSegmentKey(key); | |
40 | 108 Command cmd = new Command(CommandType.REMOVE, null, key, null, 0, 0, replyQueue, null, null); |
69 | 109 addCommand(dataSegmentKey, cmd); |
71
a3a2605e16a2
change checking debug mode of logger
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
69
diff
changeset
|
110 if (logger.isDebugEnabled()) |
68
d4c7f7b1096b
remove copy at OutboundTcpConnection
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
67
diff
changeset
|
111 logger.debug(cmd.getCommandString()); |
2 | 112 } |
30 | 113 |
114 @Override public void finish() { | |
115 System.exit(0); | |
116 } | |
41 | 117 |
118 @Override | |
119 public void close() { | |
120 | |
121 } | |
30 | 122 |
2 | 123 } |