Mercurial > hg > Members > tatsuki > Alice
annotate src/alice/datasegment/LocalDataSegmentManager.java @ 56:17f88fd202ae
refactor data segment
author | kazz <kazz@cr.ie.u-ryukyu.ac.jp> |
---|---|
date | Wed, 08 Feb 2012 14:31:16 +0900 |
parents | f9334781344a |
children | 7fa9ddb31f64 |
rev | line source |
---|---|
2 | 1 package alice.datasegment; |
2 | |
56 | 3 import java.util.concurrent.ConcurrentHashMap; |
4 | |
39 | 5 import org.apache.log4j.Logger; |
2 | 6 import org.msgpack.type.Value; |
7 | |
3 | 8 import alice.codesegment.CodeSegment; |
9 | |
2 | 10 public class LocalDataSegmentManager extends DataSegmentManager { |
11 | |
56 | 12 private String reverseKey = "local"; |
13 private ConcurrentHashMap<String, DataSegmentKey> dataSegments = new ConcurrentHashMap<String, DataSegmentKey>(); | |
28
98ab26e09a98
Configuration Manager work and implements reverseKey
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
20
diff
changeset
|
14 |
39 | 15 private Logger logger = Logger.getLogger("local"); |
16 | |
6 | 17 public LocalDataSegmentManager() { |
20 | 18 new Thread(replyThread, "LocalDataSegmentManager").start(); |
6 | 19 } |
3 | 20 |
13 | 21 public DataSegmentKey getDataSegmentKey(String key) { |
16 | 22 if (key == null) { |
23 return null; | |
24 } | |
20 | 25 DataSegmentKey newDataSegmentKey = new DataSegmentKey(key); |
3 | 26 DataSegmentKey dataSegmentKey = dataSegments.putIfAbsent(key, newDataSegmentKey); |
8 | 27 if (dataSegmentKey == null) { |
3 | 28 newDataSegmentKey.runKeyThread(); |
8 | 29 dataSegmentKey = newDataSegmentKey; |
3 | 30 } |
31 return dataSegmentKey; | |
32 } | |
33 | |
2 | 34 @Override |
40 | 35 public void put(String key, Value val, CodeSegment cs) { |
3 | 36 DataSegmentKey dataSegmentKey = getDataSegmentKey(key); |
40 | 37 Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, replyQueue, cs, reverseKey); |
39 | 38 dataSegmentKey.addCommand(cmd); |
40 | 39 logger.debug(cmd.getCommandString()); |
2 | 40 } |
41 | |
42 @Override | |
40 | 43 public void update(String key, Value val, CodeSegment cs) { |
5 | 44 DataSegmentKey dataSegmentKey = getDataSegmentKey(key); |
40 | 45 Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, replyQueue, cs, reverseKey); |
39 | 46 dataSegmentKey.addCommand(cmd); |
40 | 47 logger.debug(cmd.getCommandString()); |
5 | 48 } |
49 | |
50 @Override | |
33
20c67f673224
change name of DataSegmentReceiver
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
30
diff
changeset
|
51 public void take(Receiver receiver, String key, int index, CodeSegment cs) { |
3 | 52 DataSegmentKey dataSegmentKey = getDataSegmentKey(key); |
53 int seq = this.seq.getAndIncrement(); | |
40 | 54 Command cmd = new Command(CommandType.TAKE, receiver, key, null, index, seq, replyQueue, cs, null); |
6 | 55 seqHash.put(seq, cmd); |
56 dataSegmentKey.addCommand(cmd); | |
40 | 57 logger.debug(cmd.getCommandString()); |
2 | 58 } |
59 | |
60 @Override | |
33
20c67f673224
change name of DataSegmentReceiver
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
30
diff
changeset
|
61 public void peek(Receiver receiver, String key, int index, CodeSegment cs) { |
3 | 62 DataSegmentKey dataSegmentKey = getDataSegmentKey(key); |
63 int seq = this.seq.getAndIncrement(); | |
40 | 64 Command cmd = new Command(CommandType.PEEK, receiver, key, null, index, seq, replyQueue, cs, null); |
6 | 65 seqHash.put(seq, cmd); |
66 dataSegmentKey.addCommand(cmd); | |
40 | 67 logger.debug(cmd.getCommandString()); |
2 | 68 } |
69 | |
70 @Override | |
3 | 71 public void remove(String key) { |
72 DataSegmentKey dataSegmentKey = getDataSegmentKey(key); | |
40 | 73 Command cmd = new Command(CommandType.REMOVE, null, key, null, 0, 0, replyQueue, null, null); |
39 | 74 dataSegmentKey.addCommand(cmd); |
40 | 75 logger.debug(cmd.getCommandString()); |
2 | 76 } |
30 | 77 |
78 @Override public void finish() { | |
79 System.exit(0); | |
80 } | |
41 | 81 |
82 @Override | |
83 public void close() { | |
84 | |
85 } | |
30 | 86 |
2 | 87 } |