annotate src/alice/datasegment/LocalDataSegmentManager.java @ 205:28469b1671e7 working

repair flip API
author sugi
date Tue, 26 Mar 2013 01:45:05 +0900
parents 7f47231ef509
children 96110f25adcc
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
rev   line source
2
f71eabb1df2a create outline of DataSegment model
one
parents:
diff changeset
1 package alice.datasegment;
f71eabb1df2a create outline of DataSegment model
one
parents:
diff changeset
2
56
17f88fd202ae refactor data segment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 41
diff changeset
3 import java.util.concurrent.ConcurrentHashMap;
69
1d4f2b72fb31 delete KeyThread
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 68
diff changeset
4 import java.util.concurrent.LinkedBlockingQueue;
56
17f88fd202ae refactor data segment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 41
diff changeset
5
68
d4c7f7b1096b remove copy at OutboundTcpConnection
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 67
diff changeset
6 import org.apache.log4j.Logger;
2
f71eabb1df2a create outline of DataSegment model
one
parents:
diff changeset
7 import org.msgpack.type.Value;
f71eabb1df2a create outline of DataSegment model
one
parents:
diff changeset
8
3
91057e15065f add DataSegment API and CodeSegment
one
parents: 2
diff changeset
9 import alice.codesegment.CodeSegment;
91057e15065f add DataSegment API and CodeSegment
one
parents: 2
diff changeset
10
2
f71eabb1df2a create outline of DataSegment model
one
parents:
diff changeset
11 public class LocalDataSegmentManager extends DataSegmentManager {
f71eabb1df2a create outline of DataSegment model
one
parents:
diff changeset
12
56
17f88fd202ae refactor data segment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 41
diff changeset
13 private String reverseKey = "local";
17f88fd202ae refactor data segment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 41
diff changeset
14 private ConcurrentHashMap<String, DataSegmentKey> dataSegments = new ConcurrentHashMap<String, DataSegmentKey>();
69
1d4f2b72fb31 delete KeyThread
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 68
diff changeset
15 private LinkedBlockingQueue<KeyCommand> cmdQueue = new LinkedBlockingQueue<KeyCommand>();
68
d4c7f7b1096b remove copy at OutboundTcpConnection
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 67
diff changeset
16 private Logger logger = Logger.getLogger("local");
69
1d4f2b72fb31 delete KeyThread
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 68
diff changeset
17
1d4f2b72fb31 delete KeyThread
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 68
diff changeset
18 private Runnable keyCommandThread = new Runnable() {
1d4f2b72fb31 delete KeyThread
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 68
diff changeset
19
1d4f2b72fb31 delete KeyThread
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 68
diff changeset
20 @Override
1d4f2b72fb31 delete KeyThread
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 68
diff changeset
21 public void run() {
1d4f2b72fb31 delete KeyThread
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 68
diff changeset
22 while (true) {
1d4f2b72fb31 delete KeyThread
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 68
diff changeset
23 KeyCommand keyCmd = null;
1d4f2b72fb31 delete KeyThread
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 68
diff changeset
24 try {
1d4f2b72fb31 delete KeyThread
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 68
diff changeset
25 keyCmd = cmdQueue.take();
1d4f2b72fb31 delete KeyThread
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 68
diff changeset
26 } catch (InterruptedException e) {
1d4f2b72fb31 delete KeyThread
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 68
diff changeset
27 e.printStackTrace();
1d4f2b72fb31 delete KeyThread
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 68
diff changeset
28 }
1d4f2b72fb31 delete KeyThread
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 68
diff changeset
29 keyCmd.runCommand();
1d4f2b72fb31 delete KeyThread
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 68
diff changeset
30 }
1d4f2b72fb31 delete KeyThread
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 68
diff changeset
31 }
1d4f2b72fb31 delete KeyThread
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 68
diff changeset
32
1d4f2b72fb31 delete KeyThread
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 68
diff changeset
33 };
1d4f2b72fb31 delete KeyThread
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 68
diff changeset
34
6
c78a1cc2cd8f implements Reply
one
parents: 5
diff changeset
35 public LocalDataSegmentManager() {
69
1d4f2b72fb31 delete KeyThread
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 68
diff changeset
36 new Thread(replyThread, "LocalDataSegmentManager-replyCommand").start();
1d4f2b72fb31 delete KeyThread
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 68
diff changeset
37 new Thread(keyCommandThread, "LocalDataSegmentManager-runKeyCommand").start();
1d4f2b72fb31 delete KeyThread
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 68
diff changeset
38 }
1d4f2b72fb31 delete KeyThread
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 68
diff changeset
39
1d4f2b72fb31 delete KeyThread
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 68
diff changeset
40 public void addCommand(DataSegmentKey key, Command cmd) {
1d4f2b72fb31 delete KeyThread
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 68
diff changeset
41 try {
1d4f2b72fb31 delete KeyThread
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 68
diff changeset
42 cmdQueue.put(new KeyCommand(key, cmd));
1d4f2b72fb31 delete KeyThread
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 68
diff changeset
43 } catch (InterruptedException e) {
1d4f2b72fb31 delete KeyThread
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 68
diff changeset
44 e.printStackTrace();
1d4f2b72fb31 delete KeyThread
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 68
diff changeset
45 }
6
c78a1cc2cd8f implements Reply
one
parents: 5
diff changeset
46 }
3
91057e15065f add DataSegment API and CodeSegment
one
parents: 2
diff changeset
47
13
30f97d776a3e implements Alice daemon
one
parents: 12
diff changeset
48 public DataSegmentKey getDataSegmentKey(String key) {
64
7aaadd08288c add getLocal method to DataSegment
kazz
parents: 63
diff changeset
49 DataSegmentKey dsKey = dataSegments.get(key);
63
498d1d2524d3 change getDataSegmentKey
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 58
diff changeset
50 if (dsKey != null)
498d1d2524d3 change getDataSegmentKey
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 58
diff changeset
51 return dsKey;
498d1d2524d3 change getDataSegmentKey
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 58
diff changeset
52 if (key == null)
16
433e601a8e28 network bug fix
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 14
diff changeset
53 return null;
69
1d4f2b72fb31 delete KeyThread
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 68
diff changeset
54 DataSegmentKey newDataSegmentKey = new DataSegmentKey();
3
91057e15065f add DataSegment API and CodeSegment
one
parents: 2
diff changeset
55 DataSegmentKey dataSegmentKey = dataSegments.putIfAbsent(key, newDataSegmentKey);
8
78b415d019de Local DS and CS work! maybe...
one
parents: 7
diff changeset
56 if (dataSegmentKey == null) {
78b415d019de Local DS and CS work! maybe...
one
parents: 7
diff changeset
57 dataSegmentKey = newDataSegmentKey;
3
91057e15065f add DataSegment API and CodeSegment
one
parents: 2
diff changeset
58 }
91057e15065f add DataSegment API and CodeSegment
one
parents: 2
diff changeset
59 return dataSegmentKey;
91057e15065f add DataSegment API and CodeSegment
one
parents: 2
diff changeset
60 }
91057e15065f add DataSegment API and CodeSegment
one
parents: 2
diff changeset
61
2
f71eabb1df2a create outline of DataSegment model
one
parents:
diff changeset
62 @Override
132
1044a79ce4ef delete cs from OutputCodeSegment
sugi
parents: 71
diff changeset
63 public void put(String key, Value val) {
3
91057e15065f add DataSegment API and CodeSegment
one
parents: 2
diff changeset
64 DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
202
7f47231ef509 add new flip API
sugi
parents: 199
diff changeset
65 Command cmd = new Command(CommandType.PUT, null, key, val, 0, 0, null, null, reverseKey);
69
1d4f2b72fb31 delete KeyThread
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 68
diff changeset
66 addCommand(dataSegmentKey, cmd);
71
a3a2605e16a2 change checking debug mode of logger
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 69
diff changeset
67 if (logger.isDebugEnabled())
68
d4c7f7b1096b remove copy at OutboundTcpConnection
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 67
diff changeset
68 logger.debug(cmd.getCommandString());
2
f71eabb1df2a create outline of DataSegment model
one
parents:
diff changeset
69 }
57
7fa9ddb31f64 add comment
one
parents: 56
diff changeset
70
190
a85ff8dc16c1 add Object data
one
parents: 189
diff changeset
71 public void put(String key, Object obj) {
a85ff8dc16c1 add Object data
one
parents: 189
diff changeset
72 DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
202
7f47231ef509 add new flip API
sugi
parents: 199
diff changeset
73 Command cmd = new Command(CommandType.PUT, null, key, obj, 0, 0, null, null, reverseKey);
190
a85ff8dc16c1 add Object data
one
parents: 189
diff changeset
74 addCommand(dataSegmentKey, cmd);
a85ff8dc16c1 add Object data
one
parents: 189
diff changeset
75 if (logger.isDebugEnabled())
a85ff8dc16c1 add Object data
one
parents: 189
diff changeset
76 logger.debug(cmd.getCommandString());
a85ff8dc16c1 add Object data
one
parents: 189
diff changeset
77 }
a85ff8dc16c1 add Object data
one
parents: 189
diff changeset
78
57
7fa9ddb31f64 add comment
one
parents: 56
diff changeset
79 /**
7fa9ddb31f64 add comment
one
parents: 56
diff changeset
80 * Enqueue update command to the queue of each DataSegment key
7fa9ddb31f64 add comment
one
parents: 56
diff changeset
81 */
2
f71eabb1df2a create outline of DataSegment model
one
parents:
diff changeset
82 @Override
132
1044a79ce4ef delete cs from OutputCodeSegment
sugi
parents: 71
diff changeset
83 public void update(String key, Value val) {
5
80375ae09a1f add update api
one
parents: 3
diff changeset
84 DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
202
7f47231ef509 add new flip API
sugi
parents: 199
diff changeset
85 Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, null, null, reverseKey);
69
1d4f2b72fb31 delete KeyThread
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 68
diff changeset
86 addCommand(dataSegmentKey, cmd);
71
a3a2605e16a2 change checking debug mode of logger
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 69
diff changeset
87 if (logger.isDebugEnabled())
68
d4c7f7b1096b remove copy at OutboundTcpConnection
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 67
diff changeset
88 logger.debug(cmd.getCommandString());
5
80375ae09a1f add update api
one
parents: 3
diff changeset
89 }
190
a85ff8dc16c1 add Object data
one
parents: 189
diff changeset
90
a85ff8dc16c1 add Object data
one
parents: 189
diff changeset
91 public void update(String key, Object val) {
a85ff8dc16c1 add Object data
one
parents: 189
diff changeset
92 DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
202
7f47231ef509 add new flip API
sugi
parents: 199
diff changeset
93 Command cmd = new Command(CommandType.UPDATE, null, key, val, 0, 0, null, null, reverseKey);
190
a85ff8dc16c1 add Object data
one
parents: 189
diff changeset
94 addCommand(dataSegmentKey, cmd);
a85ff8dc16c1 add Object data
one
parents: 189
diff changeset
95 if (logger.isDebugEnabled())
a85ff8dc16c1 add Object data
one
parents: 189
diff changeset
96 logger.debug(cmd.getCommandString());
a85ff8dc16c1 add Object data
one
parents: 189
diff changeset
97 }
5
80375ae09a1f add update api
one
parents: 3
diff changeset
98
80375ae09a1f add update api
one
parents: 3
diff changeset
99 @Override
33
20c67f673224 change name of DataSegmentReceiver
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 30
diff changeset
100 public void take(Receiver receiver, String key, int index, CodeSegment cs) {
3
91057e15065f add DataSegment API and CodeSegment
one
parents: 2
diff changeset
101 DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
91057e15065f add DataSegment API and CodeSegment
one
parents: 2
diff changeset
102 int seq = this.seq.getAndIncrement();
40
20616fe4d28a add log viewer
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 39
diff changeset
103 Command cmd = new Command(CommandType.TAKE, receiver, key, null, index, seq, replyQueue, cs, null);
6
c78a1cc2cd8f implements Reply
one
parents: 5
diff changeset
104 seqHash.put(seq, cmd);
69
1d4f2b72fb31 delete KeyThread
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 68
diff changeset
105 addCommand(dataSegmentKey, cmd);
71
a3a2605e16a2 change checking debug mode of logger
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 69
diff changeset
106 if (logger.isDebugEnabled())
68
d4c7f7b1096b remove copy at OutboundTcpConnection
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 67
diff changeset
107 logger.debug(cmd.getCommandString());
2
f71eabb1df2a create outline of DataSegment model
one
parents:
diff changeset
108 }
f71eabb1df2a create outline of DataSegment model
one
parents:
diff changeset
109
f71eabb1df2a create outline of DataSegment model
one
parents:
diff changeset
110 @Override
33
20c67f673224 change name of DataSegmentReceiver
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 30
diff changeset
111 public void peek(Receiver receiver, String key, int index, CodeSegment cs) {
3
91057e15065f add DataSegment API and CodeSegment
one
parents: 2
diff changeset
112 DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
91057e15065f add DataSegment API and CodeSegment
one
parents: 2
diff changeset
113 int seq = this.seq.getAndIncrement();
40
20616fe4d28a add log viewer
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 39
diff changeset
114 Command cmd = new Command(CommandType.PEEK, receiver, key, null, index, seq, replyQueue, cs, null);
58
ebdcab7b9b04 add comment
one
parents: 57
diff changeset
115 seqHash.put(seq, cmd); // waiting for PUT or UPDATE at unique sequence number
69
1d4f2b72fb31 delete KeyThread
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 68
diff changeset
116 addCommand(dataSegmentKey, cmd);
71
a3a2605e16a2 change checking debug mode of logger
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 69
diff changeset
117 if (logger.isDebugEnabled())
68
d4c7f7b1096b remove copy at OutboundTcpConnection
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 67
diff changeset
118 logger.debug(cmd.getCommandString());
2
f71eabb1df2a create outline of DataSegment model
one
parents:
diff changeset
119 }
f71eabb1df2a create outline of DataSegment model
one
parents:
diff changeset
120
f71eabb1df2a create outline of DataSegment model
one
parents:
diff changeset
121 @Override
3
91057e15065f add DataSegment API and CodeSegment
one
parents: 2
diff changeset
122 public void remove(String key) {
91057e15065f add DataSegment API and CodeSegment
one
parents: 2
diff changeset
123 DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
40
20616fe4d28a add log viewer
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 39
diff changeset
124 Command cmd = new Command(CommandType.REMOVE, null, key, null, 0, 0, replyQueue, null, null);
69
1d4f2b72fb31 delete KeyThread
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 68
diff changeset
125 addCommand(dataSegmentKey, cmd);
71
a3a2605e16a2 change checking debug mode of logger
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 69
diff changeset
126 if (logger.isDebugEnabled())
68
d4c7f7b1096b remove copy at OutboundTcpConnection
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 67
diff changeset
127 logger.debug(cmd.getCommandString());
2
f71eabb1df2a create outline of DataSegment model
one
parents:
diff changeset
128 }
30
b5a21baf0b07 implements RingTopology
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 28
diff changeset
129
b5a21baf0b07 implements RingTopology
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 28
diff changeset
130 @Override public void finish() {
b5a21baf0b07 implements RingTopology
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 28
diff changeset
131 System.exit(0);
b5a21baf0b07 implements RingTopology
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 28
diff changeset
132 }
41
f9334781344a add close api
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 40
diff changeset
133
f9334781344a add close api
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 40
diff changeset
134 @Override
f9334781344a add close api
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 40
diff changeset
135 public void close() {
f9334781344a add close api
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 40
diff changeset
136
f9334781344a add close api
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents: 40
diff changeset
137 }
205
28469b1671e7 repair flip API
sugi
parents: 202
diff changeset
138
28469b1671e7 repair flip API
sugi
parents: 202
diff changeset
139
28469b1671e7 repair flip API
sugi
parents: 202
diff changeset
140 public void flip(String key, Object val,CommandType type) {
28469b1671e7 repair flip API
sugi
parents: 202
diff changeset
141 DataSegmentKey dataSegmentKey = getDataSegmentKey(key);
28469b1671e7 repair flip API
sugi
parents: 202
diff changeset
142 Command cmd = new Command(type, null, key, val, 0, 0, null, null, reverseKey);
28469b1671e7 repair flip API
sugi
parents: 202
diff changeset
143 addCommand(dataSegmentKey, cmd);
28469b1671e7 repair flip API
sugi
parents: 202
diff changeset
144 if (logger.isDebugEnabled())
28469b1671e7 repair flip API
sugi
parents: 202
diff changeset
145 logger.debug(cmd.getCommandString());
28469b1671e7 repair flip API
sugi
parents: 202
diff changeset
146 }
189
d2f5c885a367 add FLIP API in LocalDataSegmentManager class
e095732
parents: 184
diff changeset
147
202
7f47231ef509 add new flip API
sugi
parents: 199
diff changeset
148 public void flip(Command cmd){
7f47231ef509 add new flip API
sugi
parents: 199
diff changeset
149 DataSegmentKey dataSegmentKey = getDataSegmentKey(cmd.key);
7f47231ef509 add new flip API
sugi
parents: 199
diff changeset
150 addCommand(dataSegmentKey, cmd);
7f47231ef509 add new flip API
sugi
parents: 199
diff changeset
151 if (logger.isDebugEnabled())
7f47231ef509 add new flip API
sugi
parents: 199
diff changeset
152 logger.debug(cmd.getCommandString());
190
a85ff8dc16c1 add Object data
one
parents: 189
diff changeset
153 }
a85ff8dc16c1 add Object data
one
parents: 189
diff changeset
154
2
f71eabb1df2a create outline of DataSegment model
one
parents:
diff changeset
155 }