Mercurial > hg > Database > Alice
annotate src/alice/datasegment/DataSegmentManager.java @ 41:f9334781344a
add close api
author | kazz <kazz@cr.ie.u-ryukyu.ac.jp> |
---|---|
date | Thu, 02 Feb 2012 10:48:39 +0900 |
parents | 20616fe4d28a |
children | 73158dc54c59 |
rev | line source |
---|---|
2 | 1 package alice.datasegment; |
2 | |
3 import java.util.concurrent.ConcurrentHashMap; | |
6 | 4 import java.util.concurrent.LinkedBlockingQueue; |
14
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
5 import java.util.concurrent.atomic.AtomicInteger; |
6 | 6 |
39 | 7 import org.apache.log4j.Logger; |
3 | 8 import org.msgpack.type.Value; |
2 | 9 |
3 | 10 import alice.codesegment.CodeSegment; |
2 | 11 |
12 public abstract class DataSegmentManager { | |
14
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
13 |
13 | 14 protected ConcurrentHashMap<String, DataSegmentKey> dataSegments = new ConcurrentHashMap<String, DataSegmentKey>(); |
15 protected ConcurrentHashMap<Integer, Command> seqHash = new ConcurrentHashMap<Integer, Command>(); | |
14
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
16 public LinkedBlockingQueue<Command> replyQueue = new LinkedBlockingQueue<Command>(); |
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
17 protected AtomicInteger seq = new AtomicInteger(1); |
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
18 protected Runnable replyThread = new Runnable() { |
39 | 19 Logger logger = Logger.getLogger("reply"); |
14
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
20 @Override |
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
21 public void run() { |
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
22 while (true) { |
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
23 try { |
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
24 Command reply = replyQueue.take(); |
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
25 Command cmd = seqHash.get(reply.seq); |
28
98ab26e09a98
Configuration Manager work and implements reverseKey
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
18
diff
changeset
|
26 cmd.cs.ids.reply(cmd.receiver, new DataSegmentValue(reply.index, reply.val, reply.reverseKey)); |
39 | 27 logger.debug(reply.getCommandString() + " " + cmd.getCommandString()); |
14
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
28 } catch (InterruptedException e) { |
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
29 e.printStackTrace(); |
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
30 } |
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
31 } |
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
32 } |
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
33 |
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
34 }; |
2 | 35 |
40 | 36 public abstract void put(String key, Value val, CodeSegment cs); |
37 public abstract void update(String key, Value val, CodeSegment cs); | |
33
20c67f673224
change name of DataSegmentReceiver
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
30
diff
changeset
|
38 public void take(Receiver receiver, String key, CodeSegment cs) { |
18
72dd27d952b0
change InputDataSegment API
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
14
diff
changeset
|
39 take(receiver, key, 0, cs); |
2 | 40 } |
33
20c67f673224
change name of DataSegmentReceiver
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
30
diff
changeset
|
41 public abstract void take(Receiver receiver, String key, int index, CodeSegment cs); |
20c67f673224
change name of DataSegmentReceiver
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
30
diff
changeset
|
42 public void peek(Receiver receiver, String key, CodeSegment cs) { |
18
72dd27d952b0
change InputDataSegment API
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
14
diff
changeset
|
43 peek(receiver, key, 0, cs); |
2 | 44 } |
33
20c67f673224
change name of DataSegmentReceiver
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
30
diff
changeset
|
45 public abstract void peek(Receiver receiver, String key, int index, CodeSegment cs); |
3 | 46 public abstract void remove(String key); |
41 | 47 public abstract void close(); |
30 | 48 public abstract void finish(); |
2 | 49 |
50 } |