Mercurial > hg > Database > Alice
annotate src/alice/datasegment/DataSegmentManager.java @ 190:a85ff8dc16c1 working
add Object data
author | one |
---|---|
date | Thu, 07 Mar 2013 21:27:00 +0900 |
parents | 6a69891b7232 |
children | f151dea22b2c |
rev | line source |
---|---|
2 | 1 package alice.datasegment; |
2 | |
3 import java.util.concurrent.ConcurrentHashMap; | |
6 | 4 import java.util.concurrent.LinkedBlockingQueue; |
14
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
5 import java.util.concurrent.atomic.AtomicInteger; |
6 | 6 |
39 | 7 import org.apache.log4j.Logger; |
3 | 8 import org.msgpack.type.Value; |
2 | 9 |
3 | 10 import alice.codesegment.CodeSegment; |
2 | 11 |
12 public abstract class DataSegmentManager { | |
14
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
13 |
170 | 14 protected ConcurrentHashMap<Integer, Command> seqHash = new ConcurrentHashMap<Integer, Command>(); //TODO Over Head |
69 | 15 protected LinkedBlockingQueue<Command> replyQueue = new LinkedBlockingQueue<Command>(); |
14
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
16 protected AtomicInteger seq = new AtomicInteger(1); |
68
d4c7f7b1096b
remove copy at OutboundTcpConnection
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
67
diff
changeset
|
17 |
14
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
18 protected Runnable replyThread = new Runnable() { |
39 | 19 Logger logger = Logger.getLogger("reply"); |
14
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
20 @Override |
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
21 public void run() { |
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
22 while (true) { |
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
23 try { |
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
24 Command reply = replyQueue.take(); |
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
25 Command cmd = seqHash.get(reply.seq); |
44 | 26 if (cmd == null) { |
27 logger.warn("conflict sequence number"); | |
28 continue; | |
29 } | |
30 seqHash.remove(reply.seq); | |
190 | 31 cmd.cs.ids.reply(cmd.receiver, new DataSegmentValue(reply.index, reply.val, reply.obj, reply.reverseKey)); |
71
a3a2605e16a2
change checking debug mode of logger
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
69
diff
changeset
|
32 if (logger.isDebugEnabled()) |
68
d4c7f7b1096b
remove copy at OutboundTcpConnection
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
67
diff
changeset
|
33 logger.debug(reply.getCommandString() + " " + cmd.getCommandString()); |
14
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
34 } catch (InterruptedException e) { |
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
35 e.printStackTrace(); |
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
36 } |
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
37 } |
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
38 } |
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
39 |
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
40 }; |
2 | 41 |
69 | 42 public void addReplyCommand(Command cmd) { |
43 try { | |
44 replyQueue.put(cmd); | |
45 } catch (InterruptedException e) { | |
46 e.printStackTrace(); | |
47 } | |
48 } | |
49 | |
132 | 50 public abstract void put(String key, Value val); |
51 public abstract void update(String key, Value val); | |
33
20c67f673224
change name of DataSegmentReceiver
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
30
diff
changeset
|
52 public void take(Receiver receiver, String key, CodeSegment cs) { |
18
72dd27d952b0
change InputDataSegment API
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
14
diff
changeset
|
53 take(receiver, key, 0, cs); |
2 | 54 } |
33
20c67f673224
change name of DataSegmentReceiver
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
30
diff
changeset
|
55 public abstract void take(Receiver receiver, String key, int index, CodeSegment cs); |
20c67f673224
change name of DataSegmentReceiver
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
30
diff
changeset
|
56 public void peek(Receiver receiver, String key, CodeSegment cs) { |
18
72dd27d952b0
change InputDataSegment API
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
14
diff
changeset
|
57 peek(receiver, key, 0, cs); |
2 | 58 } |
33
20c67f673224
change name of DataSegmentReceiver
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
30
diff
changeset
|
59 public abstract void peek(Receiver receiver, String key, int index, CodeSegment cs); |
3 | 60 public abstract void remove(String key); |
41 | 61 public abstract void close(); |
30 | 62 public abstract void finish(); |
2 | 63 |
64 } |