Mercurial > hg > Members > tatsuki > Alice
comparison src/main/java/alice/datasegment/DataSegmentManager.java @ 345:8f71c3e6f11d
Change directory structure Maven standard
author | sugi |
---|---|
date | Wed, 16 Apr 2014 18:26:07 +0900 |
parents | |
children | 388e7d4b0624 |
comparison
equal
deleted
inserted
replaced
344:9f97ec18f8c5 | 345:8f71c3e6f11d |
---|---|
1 package alice.datasegment; | |
2 | |
3 import java.util.concurrent.ConcurrentHashMap; | |
4 import java.util.concurrent.LinkedBlockingQueue; | |
5 import java.util.concurrent.atomic.AtomicInteger; | |
6 | |
7 import org.apache.log4j.Logger; | |
8 | |
9 import alice.codesegment.CodeSegment; | |
10 | |
11 public abstract class DataSegmentManager { | |
12 | |
13 protected ConcurrentHashMap<Integer, Command> seqHash = new ConcurrentHashMap<Integer, Command>(); | |
14 protected LinkedBlockingQueue<Command> replyQueue = new LinkedBlockingQueue<Command>(); | |
15 protected AtomicInteger seq = new AtomicInteger(1); // waiting for PUT or UPDATE at unique sequence number | |
16 // but it doesn't need for Local | |
17 | |
18 protected Runnable replyThread = new Runnable() { | |
19 Logger logger = Logger.getLogger("reply"); | |
20 @Override | |
21 public void run() { | |
22 while (true) { | |
23 try { | |
24 Command reply = replyQueue.take(); | |
25 Command cmd = getAndRemoveCmd(reply.seq); | |
26 if (cmd == null) { | |
27 logger.warn("conflict sequence number"); | |
28 continue; | |
29 } | |
30 cmd.cs.ids.reply(cmd.receiver, reply); | |
31 if (logger.isDebugEnabled()) | |
32 logger.debug(reply.getCommandString() + " " + cmd.getCommandString()); | |
33 } catch (InterruptedException e) { | |
34 e.printStackTrace(); | |
35 } | |
36 } | |
37 } | |
38 | |
39 }; | |
40 | |
41 public Command getAndRemoveCmd(int index){ | |
42 return seqHash.remove(index); | |
43 } | |
44 | |
45 public void addReplyCommand(Command cmd) { | |
46 try { | |
47 replyQueue.put(cmd); | |
48 } catch (InterruptedException e) { | |
49 e.printStackTrace(); | |
50 } | |
51 } | |
52 | |
53 public abstract void put(String key, Object val); | |
54 public abstract void update(String key, Object val); | |
55 public abstract void take(Receiver receiver, CodeSegment cs); | |
56 public abstract void peek(Receiver receiver, CodeSegment cs); | |
57 | |
58 public abstract void quickPut(String key, Object val); | |
59 public abstract void quickUpdate(String key, Object val); | |
60 public abstract void quickPeek(Receiver receiver, CodeSegment cs); | |
61 public abstract void quickTake(Receiver receiver, CodeSegment cs); | |
62 | |
63 public abstract void remove(String key); | |
64 public abstract void shutdown(String key); | |
65 public abstract void close(); | |
66 public abstract void finish(); | |
67 | |
68 public abstract void ping(String returnKey); | |
69 public abstract void response(String returnKey); | |
70 | |
71 } |