Mercurial > hg > Database > Alice
annotate src/main/java/alice/datasegment/DataSegmentManager.java @ 650:4289b232b3fd
nulValue
author | suruga |
---|---|
date | Fri, 02 Feb 2018 18:26:49 +0900 |
parents | ea21af9a4762 |
children |
rev | line source |
---|---|
345 | 1 package alice.datasegment; |
2 | |
3 import java.util.concurrent.ConcurrentHashMap; | |
4 import java.util.concurrent.LinkedBlockingQueue; | |
5 import java.util.concurrent.atomic.AtomicInteger; | |
6 | |
7 import org.apache.log4j.Logger; | |
8 | |
9 import alice.codesegment.CodeSegment; | |
10 | |
11 public abstract class DataSegmentManager { | |
419 | 12 |
13 protected ConcurrentHashMap<Integer, Command> seqHash = new ConcurrentHashMap<Integer, Command>(); | |
14 protected LinkedBlockingQueue<Command> replyQueue = new LinkedBlockingQueue<Command>(); | |
15 protected AtomicInteger seq = new AtomicInteger(1); // waiting for PUT or UPDATE at unique sequence number | |
16 // but it doesn't need for Local | |
17 | |
18 protected Runnable replyThread = new Runnable() { | |
19 Logger logger = Logger.getLogger("reply"); | |
20 @Override | |
523
145c425db88d
add CompressedLDSM
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
503
diff
changeset
|
21 public void run() {//SEDAのREPLYスレッドのなごり。消してもいい。 |
419 | 22 while (true) { |
23 try { | |
24 Command reply = replyQueue.take(); | |
25 Command cmd = getAndRemoveCmd(reply.seq); | |
26 if (cmd == null) { | |
27 logger.warn("conflict sequence number"); | |
28 continue; | |
29 } | |
30 cmd.cs.ids.reply(cmd.receiver, reply); | |
31 if (logger.isDebugEnabled()) | |
32 logger.debug(reply.getCommandString() + " " + cmd.getCommandString()); | |
33 } catch (InterruptedException e) { | |
34 e.printStackTrace(); | |
35 } | |
36 } | |
37 } | |
38 | |
39 }; | |
345 | 40 |
419 | 41 public Command getAndRemoveCmd(int index){ |
574
ea21af9a4762
delete serializeFlag, fix MessagePack pack&unpack
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
536
diff
changeset
|
42 //System.err.println("DSM getAndRemoveCmd seq : " + index); |
419 | 43 return seqHash.remove(index); |
44 } | |
45 | |
46 public void addReplyCommand(Command cmd) { | |
47 try { | |
48 replyQueue.put(cmd); | |
49 } catch (InterruptedException e) { | |
50 e.printStackTrace(); | |
51 } | |
52 } | |
53 | |
523
145c425db88d
add CompressedLDSM
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
503
diff
changeset
|
54 //各コマンドの抽象クラス |
533
b3c9554ccb1b
change compressed API to set data specified DSM name
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
527
diff
changeset
|
55 public abstract void put(String key, ReceiveData rData, boolean quickFlag); |
b3c9554ccb1b
change compressed API to set data specified DSM name
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
527
diff
changeset
|
56 public abstract void update(String key, ReceiveData rData, boolean quickFlag); |
b3c9554ccb1b
change compressed API to set data specified DSM name
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
527
diff
changeset
|
57 public abstract void peek(Receiver receiver, CodeSegment cs, boolean quickFlag); |
b3c9554ccb1b
change compressed API to set data specified DSM name
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
527
diff
changeset
|
58 public abstract void take(Receiver receiver, CodeSegment cs, boolean quickFlag); |
526
928907206d21
remove CompressedRDSM & CompressedLDSM class
Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
parents:
523
diff
changeset
|
59 |
419 | 60 public abstract void remove(String key); |
61 public abstract void shutdown(); | |
62 public abstract void close(); | |
63 public abstract void finish(); | |
64 | |
65 public abstract void ping(String returnKey); | |
66 public abstract void response(String returnKey); | |
496 | 67 |
483 | 68 public abstract void setSendError(boolean b); |
419 | 69 |
345 | 70 } |