Mercurial > hg > Members > tatsuki > Alice
annotate src/alice/datasegment/LocalDataSegmentManager.java @ 205:28469b1671e7 working
repair flip API
author | sugi |
---|---|
date | Tue, 26 Mar 2013 01:45:05 +0900 |
parents | 7f47231ef509 |
children | 96110f25adcc |
rev | line source |
---|---|
2 | 1 package alice.datasegment; |
2 | |
56 | 3 import java.util.concurrent.ConcurrentHashMap; |
69 | 4 import java.util.concurrent.LinkedBlockingQueue; |
56 | 5 |
68
d4c7f7b1096b
remove copy at OutboundTcpConnection
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
67
diff
changeset
|
6 import org.apache.log4j.Logger; |
2 | 7 import org.msgpack.type.Value; |
8 | |
3 | 9 import alice.codesegment.CodeSegment; |
10 | |
2 | 11 public class LocalDataSegmentManager extends DataSegmentManager { |
12 | |
56 | 13 private String reverseKey = "local"; |
14 private ConcurrentHashMap<String, DataSegmentKey> dataSegments = new ConcurrentHashMap<String, DataSegmentKey>(); | |
69 | 15 private LinkedBlockingQueue<KeyCommand> cmdQueue = new LinkedBlockingQueue<KeyCommand>(); |
68
d4c7f7b1096b
remove copy at OutboundTcpConnection
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
67
diff
changeset
|
16 private Logger logger = Logger.getLogger("local"); |
69 | 17 |
18 private Runnable keyCommandThread = new Runnable() { | |
19 | |
20 @Override | |
21 public void run() { | |
22 while (true) { | |
23 KeyCommand keyCmd = null; | |
24 try { | |
25 keyCmd = cmdQueue.take(); | |
26 } catch (InterruptedException e) { | |
27 e.printStackTrace(); | |
28 } | |
29 keyCmd.runCommand(); | |
30 } | |
31 } | |
32 | |
33 }; | |
34 | |
6 | 35 public LocalDataSegmentManager() { |
69 | 36 new Thread(replyThread, "LocalDataSegmentManager-replyCommand").start(); |
37 new Thread(keyCommandThread, "LocalDataSegmentManager-runKeyCommand").start(); | |
38 } | |
39 | |
40 public void addCommand(DataSegmentKey key, Command cmd) { | |
41 try { | |
42 cmdQueue.put(new KeyCommand(key, cmd)); | |
43 } catch (InterruptedException e) { | |
44 e.printStackTrace(); | |
45 } | |
6 | 46 } |
3 | 47 |
13 | 48 public DataSegmentKey getDataSegmentKey(String key) { |
64 | 49 DataSegmentKey dsKey = dataSegments.get(key); |
63 | 50 if (dsKey != null) |
51 return dsKey; | |
52 if (key == null) | |
16 | 53 return null; |
69 | 54 DataSegmentKey newDataSegmentKey = new DataSegmentKey(); |
3 | 55 DataSegmentKey dataSegmentKey = dataSegments.putIfAbsent(key, newDataSegmentKey); |
8 | 56 if (dataSegmentKey == null) { |
57 dataSegmentKey = newDataSegmentKey; | |
3 | 58 } |
59 return dataSegmentKey; | |
60 } | |
61 | |
2 | 62 @Override |
132 | 63 public void put(String key, Value val) { |
3 | 64 DataSegmentKey dataSegmentKey = getDataSegmentKey(key); |
202 | 65 Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, null, null, reverseKey); |
69 | 66 addCommand(dataSegmentKey, cmd); |
71
a3a2605e16a2
change checking debug mode of logger
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
69
diff
changeset
|
67 if (logger.isDebugEnabled()) |
68
d4c7f7b1096b
remove copy at OutboundTcpConnection
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
67
diff
changeset
|
68 logger.debug(cmd.getCommandString()); |
2 | 69 } |
57 | 70 |
190 | 71 public void put(String key, Object obj) { |
72 DataSegmentKey dataSegmentKey = getDataSegmentKey(key); | |
202 | 73 Command cmd = new Command(CommandType.PUT, null, key, obj, 0, 0, null, null, reverseKey); |
190 | 74 addCommand(dataSegmentKey, cmd); |
75 if (logger.isDebugEnabled()) | |
76 logger.debug(cmd.getCommandString()); | |
77 } | |
78 | |
57 | 79 /** |
80 * Enqueue update command to the queue of each DataSegment key | |
81 */ | |
2 | 82 @Override |
132 | 83 public void update(String key, Value val) { |
5 | 84 DataSegmentKey dataSegmentKey = getDataSegmentKey(key); |
202 | 85 Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, null, null, reverseKey); |
69 | 86 addCommand(dataSegmentKey, cmd); |
71
a3a2605e16a2
change checking debug mode of logger
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
69
diff
changeset
|
87 if (logger.isDebugEnabled()) |
68
d4c7f7b1096b
remove copy at OutboundTcpConnection
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
67
diff
changeset
|
88 logger.debug(cmd.getCommandString()); |
5 | 89 } |
190 | 90 |
91 public void update(String key, Object val) { | |
92 DataSegmentKey dataSegmentKey = getDataSegmentKey(key); | |
202 | 93 Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, null, null, reverseKey); |
190 | 94 addCommand(dataSegmentKey, cmd); |
95 if (logger.isDebugEnabled()) | |
96 logger.debug(cmd.getCommandString()); | |
97 } | |
5 | 98 |
99 @Override | |
33
20c67f673224
change name of DataSegmentReceiver
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
30
diff
changeset
|
100 public void take(Receiver receiver, String key, int index, CodeSegment cs) { |
3 | 101 DataSegmentKey dataSegmentKey = getDataSegmentKey(key); |
102 int seq = this.seq.getAndIncrement(); | |
40 | 103 Command cmd = new Command(CommandType.TAKE, receiver, key, null, index, seq, replyQueue, cs, null); |
6 | 104 seqHash.put(seq, cmd); |
69 | 105 addCommand(dataSegmentKey, cmd); |
71
a3a2605e16a2
change checking debug mode of logger
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
69
diff
changeset
|
106 if (logger.isDebugEnabled()) |
68
d4c7f7b1096b
remove copy at OutboundTcpConnection
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
67
diff
changeset
|
107 logger.debug(cmd.getCommandString()); |
2 | 108 } |
109 | |
110 @Override | |
33
20c67f673224
change name of DataSegmentReceiver
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
30
diff
changeset
|
111 public void peek(Receiver receiver, String key, int index, CodeSegment cs) { |
3 | 112 DataSegmentKey dataSegmentKey = getDataSegmentKey(key); |
113 int seq = this.seq.getAndIncrement(); | |
40 | 114 Command cmd = new Command(CommandType.PEEK, receiver, key, null, index, seq, replyQueue, cs, null); |
58 | 115 seqHash.put(seq, cmd); // waiting for PUT or UPDATE at unique sequence number |
69 | 116 addCommand(dataSegmentKey, cmd); |
71
a3a2605e16a2
change checking debug mode of logger
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
69
diff
changeset
|
117 if (logger.isDebugEnabled()) |
68
d4c7f7b1096b
remove copy at OutboundTcpConnection
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
67
diff
changeset
|
118 logger.debug(cmd.getCommandString()); |
2 | 119 } |
120 | |
121 @Override | |
3 | 122 public void remove(String key) { |
123 DataSegmentKey dataSegmentKey = getDataSegmentKey(key); | |
40 | 124 Command cmd = new Command(CommandType.REMOVE, null, key, null, 0, 0, replyQueue, null, null); |
69 | 125 addCommand(dataSegmentKey, cmd); |
71
a3a2605e16a2
change checking debug mode of logger
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
69
diff
changeset
|
126 if (logger.isDebugEnabled()) |
68
d4c7f7b1096b
remove copy at OutboundTcpConnection
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
67
diff
changeset
|
127 logger.debug(cmd.getCommandString()); |
2 | 128 } |
30 | 129 |
130 @Override public void finish() { | |
131 System.exit(0); | |
132 } | |
41 | 133 |
134 @Override | |
135 public void close() { | |
136 | |
137 } | |
205 | 138 |
139 | |
140 public void flip(String key, Object val,CommandType type) { | |
141 DataSegmentKey dataSegmentKey = getDataSegmentKey(key); | |
142 Command cmd = new Command(type, null, key, val, 0, 0, null, null, reverseKey); | |
143 addCommand(dataSegmentKey, cmd); | |
144 if (logger.isDebugEnabled()) | |
145 logger.debug(cmd.getCommandString()); | |
146 } | |
189 | 147 |
202 | 148 public void flip(Command cmd){ |
149 DataSegmentKey dataSegmentKey = getDataSegmentKey(cmd.key); | |
150 addCommand(dataSegmentKey, cmd); | |
151 if (logger.isDebugEnabled()) | |
152 logger.debug(cmd.getCommandString()); | |
190 | 153 } |
154 | |
2 | 155 } |