Mercurial > hg > Database > Alice
annotate src/alice/datasegment/DataSegmentManager.java @ 30:b5a21baf0b07
implements RingTopology
author | kazz <kazz@cr.ie.u-ryukyu.ac.jp> |
---|---|
date | Tue, 17 Jan 2012 16:13:03 +0900 |
parents | 98ab26e09a98 |
children | 20c67f673224 |
rev | line source |
---|---|
2 | 1 package alice.datasegment; |
2 | |
3 import java.util.concurrent.ConcurrentHashMap; | |
6 | 4 import java.util.concurrent.LinkedBlockingQueue; |
14
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
5 import java.util.concurrent.atomic.AtomicInteger; |
6 | 6 |
3 | 7 import org.msgpack.type.Value; |
2 | 8 |
3 | 9 import alice.codesegment.CodeSegment; |
2 | 10 |
11 public abstract class DataSegmentManager { | |
14
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
12 |
13 | 13 protected ConcurrentHashMap<String, DataSegmentKey> dataSegments = new ConcurrentHashMap<String, DataSegmentKey>(); |
14 protected ConcurrentHashMap<Integer, Command> seqHash = new ConcurrentHashMap<Integer, Command>(); | |
14
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
15 public LinkedBlockingQueue<Command> replyQueue = new LinkedBlockingQueue<Command>(); |
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
16 protected AtomicInteger seq = new AtomicInteger(1); |
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
17 protected Runnable replyThread = new Runnable() { |
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
18 |
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
19 @Override |
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
20 public void run() { |
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
21 while (true) { |
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
22 try { |
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
23 Command reply = replyQueue.take(); |
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
24 Command cmd = seqHash.get(reply.seq); |
28
98ab26e09a98
Configuration Manager work and implements reverseKey
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
18
diff
changeset
|
25 cmd.cs.ids.reply(cmd.receiver, new DataSegmentValue(reply.index, reply.val, reply.reverseKey)); |
14
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
26 } catch (InterruptedException e) { |
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
27 e.printStackTrace(); |
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
28 } |
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
29 } |
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
30 } |
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
31 |
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
32 }; |
2 | 33 |
34 public abstract void put(String key, Value val); | |
5 | 35 public abstract void update(String key, Value val); |
18
72dd27d952b0
change InputDataSegment API
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
14
diff
changeset
|
36 public void take(DataSegmentReceiver receiver, String key, CodeSegment cs) { |
72dd27d952b0
change InputDataSegment API
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
14
diff
changeset
|
37 take(receiver, key, 0, cs); |
2 | 38 } |
18
72dd27d952b0
change InputDataSegment API
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
14
diff
changeset
|
39 public abstract void take(DataSegmentReceiver receiver, String key, int index, CodeSegment cs); |
72dd27d952b0
change InputDataSegment API
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
14
diff
changeset
|
40 public void peek(DataSegmentReceiver receiver, String key, CodeSegment cs) { |
72dd27d952b0
change InputDataSegment API
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
14
diff
changeset
|
41 peek(receiver, key, 0, cs); |
2 | 42 } |
18
72dd27d952b0
change InputDataSegment API
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
14
diff
changeset
|
43 public abstract void peek(DataSegmentReceiver receiver, String key, int index, CodeSegment cs); |
3 | 44 public abstract void remove(String key); |
30 | 45 public abstract void finish(); |
2 | 46 |
47 } |