0
|
1
|
|
2 /*
|
|
3 * @(#)FederatedLinda.java 1.1 06/04/01
|
|
4 *
|
|
5 * Copyright 2006 Shinji KONO
|
|
6 *
|
|
7
|
|
8 PSX Lidna
|
|
9 Trasport layer of PSX Linda library
|
|
10
|
|
11 */
|
|
12
|
|
13 package fdl;
|
|
14
|
|
15 import java.io.IOException;
|
|
16 import java.nio.ByteBuffer;
|
|
17 import java.nio.ByteOrder;
|
2
|
18 import java.nio.CharBuffer;
|
0
|
19 import java.nio.channels.ClosedSelectorException;
|
|
20 import java.nio.channels.SelectionKey;
|
|
21 import java.nio.channels.Selector;
|
|
22 import java.nio.channels.SocketChannel;
|
|
23 import java.util.Hashtable;
|
|
24 import java.util.Set;
|
|
25
|
|
26
|
|
27 /**
|
|
28 FederatedLinda
|
|
29 *
|
|
30 * @author Shinji Kono
|
|
31 *
|
|
32 * @param mytsid Tuple Space ID
|
|
33
|
|
34 Initialize connection channel for a tuple space
|
|
35
|
|
36 one instance for each Tuple space connection
|
|
37
|
|
38 */
|
|
39
|
17
|
40 public class FederatedLinda {
|
0
|
41
|
11
|
42 static FederatedLinda fdl;
|
|
43 static int MAX_SEQUENCE = 2048;
|
|
44 static boolean debug = true;
|
0
|
45
|
11
|
46 public int tid;
|
|
47 public int seq;
|
|
48 public int qsize;
|
|
49 public PSXLinda linda;
|
|
50
|
|
51 public Selector selector;
|
0
|
52
|
11
|
53 public PSXQueue q_top,q_end;
|
|
54 public PSXReply r_top,r_end;
|
|
55 public Hashtable<Integer,PSXReply> seqHash;
|
0
|
56
|
11
|
57 static FederatedLinda init()
|
|
58 throws IOException {
|
|
59 if (fdl==null) {
|
|
60 fdl = new FederatedLinda();
|
|
61 }
|
|
62 return fdl;
|
0
|
63 }
|
|
64
|
11
|
65 private FederatedLinda()
|
|
66 throws IOException {
|
|
67 selector = Selector.open();
|
|
68 seqHash = new Hashtable<Integer, PSXReply>();
|
|
69 }
|
0
|
70
|
11
|
71 public PSXLinda open(String _host,int _port)
|
|
72 throws IOException {
|
|
73 tid++;
|
|
74 // System.out.print("Tid = ");
|
|
75 // System.out.println(tid);
|
|
76 PSXLinda newlinda = new PSXLinda(this,tid,_host,_port);
|
|
77 linda = newlinda.add(linda);
|
|
78 return linda;
|
|
79 }
|
0
|
80
|
11
|
81 /**
|
0
|
82 psx_queue (unsigned int tspace_id, unsigned int id,
|
|
83 unsigned int size, unsigned char *data, char mode,
|
|
84 void(*callback)(char*,void*), void * obj):
|
11
|
85 */
|
|
86
|
|
87 public PSXReply psx_queue(PSXLinda linda, int id, ByteBuffer s, int length, int mode, PSXCallback callback) {
|
|
88 PSXQueue c = new PSXQueue(linda,id,mode,s,length,callback);
|
0
|
89
|
11
|
90 if (q_top == null) {
|
|
91 c = q_end = q_top = c;
|
|
92 } else {
|
|
93 q_end.next = c;
|
|
94 q_end = c;
|
|
95 }
|
|
96 qsize++;
|
0
|
97
|
17
|
98 if (mode != PSX.PSX_OUT) {
|
|
99 PSXReply p = new PSXReply(PSX.PSX_REPLY,callback);
|
11
|
100 p.seq = seq(p);
|
|
101 c.setSeq(p.seq);
|
|
102 if (debug) {
|
|
103 System.out.print("Integer compare=");
|
|
104 System.out.println((new Integer(2)).equals(new Integer(2)));
|
|
105 System.out.print("Seding seq=");
|
|
106 System.out.println(p.seq);
|
|
107 }
|
|
108 if (r_top == null){
|
|
109 r_end = r_top = p;
|
|
110 } else {
|
|
111 r_end.next = p;
|
|
112 r_end = p;
|
|
113 }
|
|
114 return p;
|
|
115 }
|
|
116 return null;
|
0
|
117 }
|
|
118
|
11
|
119 public int seq(PSXReply reply) {
|
|
120 Integer s;
|
|
121 do {
|
|
122 seq++;
|
|
123 if (seq>MAX_SEQUENCE) {
|
|
124 seq = 0;
|
|
125 }
|
|
126 s = new Integer(seq);
|
|
127 } while (seqHash.containsKey(s));
|
|
128 if (debug) {
|
|
129 System.out.print("hash value = ");
|
|
130 System.out.println(s.hashCode());
|
|
131 }
|
|
132 seqHash.put(s,reply);
|
|
133 seq++;
|
|
134 return seq-1;
|
|
135 }
|
0
|
136
|
11
|
137 public Selector selector() {
|
|
138 return selector;
|
|
139 }
|
0
|
140
|
11
|
141 public int sync() throws IOException {
|
|
142 return sync(0);
|
0
|
143 }
|
|
144
|
11
|
145 public int sync(long mtimeout)
|
|
146 throws IOException {
|
|
147 int key_num = 0;
|
|
148 Set<SelectionKey> keys;
|
0
|
149
|
11
|
150 while (q_top != null){
|
|
151 PSXQueue c = q_top;
|
|
152 c.Send();
|
|
153 q_top = c.next;
|
|
154 // psx_free(c);
|
|
155 // q_top = c = t;
|
|
156 qsize--;
|
|
157 }
|
2
|
158
|
11
|
159 try {
|
|
160 key_num = selector.select(mtimeout);
|
|
161 keys = selector.selectedKeys();
|
|
162 for (SelectionKey key : keys) {
|
|
163 // System.out.println("selecting");
|
|
164 SocketChannel sock = (SocketChannel)key.channel();
|
|
165 chkServe(sock);
|
|
166 }
|
|
167 } catch (IOException e) {
|
|
168 e.printStackTrace();
|
|
169 } catch (ClosedSelectorException e) {
|
|
170 e.printStackTrace();
|
|
171 }
|
2
|
172
|
11
|
173 return key_num;
|
|
174 }
|
|
175
|
|
176 public int sync_com(long mtimeout)
|
|
177 throws IOException {
|
|
178 int key_num = 0;
|
|
179 Set<SelectionKey> keys;
|
|
180
|
|
181 while (q_top != null){
|
|
182 PSXQueue c = q_top;
|
|
183 c.Send();
|
|
184 q_top = c.next;
|
|
185 // psx_free(c);
|
|
186 // q_top = c = t;
|
|
187 qsize--;
|
|
188 }
|
2
|
189
|
11
|
190 try {
|
|
191 key_num = selector.select(mtimeout);
|
|
192 keys = selector.selectedKeys();
|
|
193 for (SelectionKey key : keys) {
|
|
194 SocketChannel sock = (SocketChannel)key.channel();
|
|
195 chkCom(sock);
|
|
196 }
|
|
197 } catch (IOException e) {
|
|
198 e.printStackTrace();
|
|
199 } catch (ClosedSelectorException e) {
|
|
200 e.printStackTrace();
|
|
201 }
|
2
|
202
|
11
|
203 return key_num;
|
|
204 }
|
|
205
|
|
206 // should be in PSXLinda, but sock->linda is unknown here
|
2
|
207
|
11
|
208 private void chkCom(SocketChannel sock) throws IOException {
|
|
209
|
|
210 int length;
|
17
|
211 ByteBuffer command = ByteBuffer.allocate(PSX.LINDA_HEADER_SIZE);
|
11
|
212 command.order(ByteOrder.BIG_ENDIAN);
|
|
213 debug = false;
|
2
|
214
|
11
|
215 sock.read(command);
|
|
216 command.rewind();
|
17
|
217 length = command.getInt(PSX.LINDA_DATA_LENGTH_OFFSET);
|
11
|
218 if (length>0) {
|
|
219 ByteBuffer data = ByteBuffer.allocate(length);
|
|
220 int read = length;
|
|
221 if (debug) {
|
|
222 System.out.print("reading:");
|
|
223 System.out.println(length);
|
|
224 }
|
|
225
|
|
226 data.order(ByteOrder.BIG_ENDIAN);
|
|
227 while(read>0) {
|
|
228 read -= sock.read(data);
|
|
229 }
|
|
230 data.rewind();
|
2
|
231
|
11
|
232 if (debug) {
|
17
|
233 PSX.printCommand(command, data);
|
11
|
234 }
|
|
235 //if (debug_com) {
|
|
236 String comdata ="";
|
|
237 CharBuffer chardata = data.asCharBuffer();
|
|
238 comdata = chardata.toString();
|
2
|
239
|
11
|
240 //System.out.println("Com_data =>");
|
|
241 System.out.println(comdata);
|
|
242 //}
|
|
243 /***if (debug) {
|
2
|
244 System.out.print("header:");
|
|
245 for(int i=0;i<LINDA_HEADER_SIZE;i++) {
|
|
246 System.out.println(command.get(i));
|
|
247 }
|
|
248 System.out.print("data:");
|
|
249 for(int i=0;i<length;i++) {
|
|
250 System.out.println(data.get(i));
|
|
251 }
|
|
252 data.rewind();
|
|
253 }***/
|
11
|
254
|
17
|
255 int rseq = command.getInt(PSX.LINDA_SEQ_OFFSET);
|
|
256 int mode = command.get(PSX.LINDA_MODE_OFFSET);
|
11
|
257 Integer a;
|
|
258 /***
|
2
|
259 if (debug) {
|
|
260 System.out.print("mode = ");
|
|
261 System.out.println(mode);
|
|
262 System.out.print("seq = ");
|
|
263 System.out.println(rseq);
|
|
264 }***/
|
11
|
265 try {
|
|
266 PSXReply r = seqHash.get((a = new Integer(rseq)));
|
|
267 seqHash.put(a, null);
|
|
268 if (debug) {
|
|
269 System.out.print("hash value = ");
|
|
270 System.out.println(a.hashCode());
|
|
271 }
|
|
272
|
|
273 r.setAnswer(mode,command,data);
|
2
|
274
|
11
|
275 if (r.callback != null ) {
|
|
276 r.callback.callback(data);
|
|
277 }
|
|
278 } catch (NullPointerException e ) {
|
|
279 if (debug) {
|
|
280 System.out.println("hashed reply not found");
|
|
281 }
|
|
282 // can't happen
|
|
283 return ;
|
|
284 }
|
|
285 }
|
2
|
286 }
|
|
287
|
|
288 private void chkServe(SocketChannel sock)
|
11
|
289 throws IOException {
|
|
290 int length;
|
17
|
291 ByteBuffer command = ByteBuffer.allocate(PSX.LINDA_HEADER_SIZE);
|
11
|
292 command.order(ByteOrder.BIG_ENDIAN);
|
0
|
293
|
11
|
294 sock.read(command);
|
|
295 command.rewind();
|
17
|
296 length = command.getInt(PSX.LINDA_DATA_LENGTH_OFFSET);
|
11
|
297 if (length>0) {
|
|
298 ByteBuffer data = ByteBuffer.allocate(length);
|
|
299 int read = length;
|
|
300 if (debug) {
|
|
301 System.out.print("reading:");
|
|
302 System.out.println(length);
|
|
303 }
|
0
|
304
|
11
|
305 data.order(ByteOrder.BIG_ENDIAN);
|
|
306 while(read>0) {
|
|
307 read -= sock.read(data);
|
|
308 }
|
|
309 data.rewind();
|
2
|
310
|
11
|
311 if (debug) {
|
17
|
312 PSX.printCommand(command, data);
|
11
|
313 }
|
|
314 /***if (debug) {
|
0
|
315 System.out.print("header:");
|
|
316 for(int i=0;i<LINDA_HEADER_SIZE;i++) {
|
|
317 System.out.println(command.get(i));
|
|
318 }
|
|
319 System.out.print("data:");
|
|
320 for(int i=0;i<length;i++) {
|
|
321 System.out.println(data.get(i));
|
|
322 }
|
|
323 data.rewind();
|
2
|
324 }***/
|
11
|
325
|
17
|
326 int rseq = command.getInt(PSX.LINDA_SEQ_OFFSET);
|
|
327 int mode = command.get(PSX.LINDA_MODE_OFFSET);
|
11
|
328 Integer a;
|
|
329 /***
|
0
|
330 if (debug) {
|
|
331 System.out.print("mode = ");
|
|
332 System.out.println(mode);
|
|
333 System.out.print("seq = ");
|
|
334 System.out.println(rseq);
|
2
|
335 }***/
|
11
|
336 try {
|
|
337 PSXReply r = seqHash.get((a = new Integer(rseq)));
|
15
|
338 seqHash.put(a, null); // should be clear or delete
|
11
|
339 if (debug) {
|
|
340 System.out.print("hash value = ");
|
|
341 System.out.println(a.hashCode());
|
|
342 }
|
|
343
|
|
344 r.setAnswer(mode,command,data);
|
0
|
345
|
11
|
346 if (r.callback != null ) {
|
|
347 r.callback.callback(data);
|
|
348 }
|
|
349 } catch (NullPointerException e ) {
|
|
350 if (debug) {
|
|
351 System.out.println("hashed reply not found");
|
|
352 }
|
|
353 // can't happen
|
|
354 return ;
|
|
355 }
|
0
|
356 }
|
11
|
357 }
|
0
|
358 }
|
|
359
|
|
360 /* end */
|