comparison src/main/java/alice/datasegment/DataSegmentManager.java @ 345:8f71c3e6f11d

Change directory structure Maven standard
author sugi
date Wed, 16 Apr 2014 18:26:07 +0900
parents
children 388e7d4b0624
comparison
equal deleted inserted replaced
344:9f97ec18f8c5 345:8f71c3e6f11d
1 package alice.datasegment;
2
3 import java.util.concurrent.ConcurrentHashMap;
4 import java.util.concurrent.LinkedBlockingQueue;
5 import java.util.concurrent.atomic.AtomicInteger;
6
7 import org.apache.log4j.Logger;
8
9 import alice.codesegment.CodeSegment;
10
11 public abstract class DataSegmentManager {
12
13 protected ConcurrentHashMap<Integer, Command> seqHash = new ConcurrentHashMap<Integer, Command>();
14 protected LinkedBlockingQueue<Command> replyQueue = new LinkedBlockingQueue<Command>();
15 protected AtomicInteger seq = new AtomicInteger(1); // waiting for PUT or UPDATE at unique sequence number
16 // but it doesn't need for Local
17
18 protected Runnable replyThread = new Runnable() {
19 Logger logger = Logger.getLogger("reply");
20 @Override
21 public void run() {
22 while (true) {
23 try {
24 Command reply = replyQueue.take();
25 Command cmd = getAndRemoveCmd(reply.seq);
26 if (cmd == null) {
27 logger.warn("conflict sequence number");
28 continue;
29 }
30 cmd.cs.ids.reply(cmd.receiver, reply);
31 if (logger.isDebugEnabled())
32 logger.debug(reply.getCommandString() + " " + cmd.getCommandString());
33 } catch (InterruptedException e) {
34 e.printStackTrace();
35 }
36 }
37 }
38
39 };
40
41 public Command getAndRemoveCmd(int index){
42 return seqHash.remove(index);
43 }
44
45 public void addReplyCommand(Command cmd) {
46 try {
47 replyQueue.put(cmd);
48 } catch (InterruptedException e) {
49 e.printStackTrace();
50 }
51 }
52
53 public abstract void put(String key, Object val);
54 public abstract void update(String key, Object val);
55 public abstract void take(Receiver receiver, CodeSegment cs);
56 public abstract void peek(Receiver receiver, CodeSegment cs);
57
58 public abstract void quickPut(String key, Object val);
59 public abstract void quickUpdate(String key, Object val);
60 public abstract void quickPeek(Receiver receiver, CodeSegment cs);
61 public abstract void quickTake(Receiver receiver, CodeSegment cs);
62
63 public abstract void remove(String key);
64 public abstract void shutdown(String key);
65 public abstract void close();
66 public abstract void finish();
67
68 public abstract void ping(String returnKey);
69 public abstract void response(String returnKey);
70
71 }