0
|
1
|
|
2 /*
|
|
3 * @(#)FederatedLinda.java 1.1 06/04/01
|
|
4 *
|
|
5 * Copyright 2006 Shinji KONO
|
|
6 *
|
|
7
|
|
8 PSX Lidna
|
|
9 Trasport layer of PSX Linda library
|
|
10
|
|
11 */
|
|
12
|
|
13 package fdl;
|
|
14
|
|
15 import java.io.IOException;
|
|
16 import java.nio.ByteBuffer;
|
|
17 import java.nio.ByteOrder;
|
2
|
18 import java.nio.CharBuffer;
|
0
|
19 import java.nio.channels.ClosedSelectorException;
|
|
20 import java.nio.channels.SelectionKey;
|
|
21 import java.nio.channels.Selector;
|
|
22 import java.nio.channels.SocketChannel;
|
|
23 import java.util.Hashtable;
|
|
24 import java.util.Set;
|
|
25
|
|
26
|
|
27 /**
|
|
28 FederatedLinda
|
|
29 *
|
|
30 * @author Shinji Kono
|
|
31 *
|
|
32 * @param mytsid Tuple Space ID
|
|
33
|
|
34 Initialize connection channel for a tuple space
|
|
35
|
|
36 one instance for each Tuple space connection
|
|
37
|
|
38 */
|
|
39
|
|
40 public class FederatedLinda implements PSXQueueInterface {
|
|
41
|
11
|
42 static FederatedLinda fdl;
|
|
43 static int MAX_SEQUENCE = 2048;
|
|
44 static boolean debug = true;
|
0
|
45
|
11
|
46 public int tid;
|
|
47 public int seq;
|
|
48 public int qsize;
|
|
49 public PSXLinda linda;
|
|
50
|
|
51 public Selector selector;
|
0
|
52
|
11
|
53 public PSXQueue q_top,q_end;
|
|
54 public PSXReply r_top,r_end;
|
|
55 public Hashtable<Integer,PSXReply> seqHash;
|
0
|
56
|
11
|
57 static FederatedLinda init()
|
|
58 throws IOException {
|
|
59 if (fdl==null) {
|
|
60 fdl = new FederatedLinda();
|
|
61 }
|
|
62 return fdl;
|
0
|
63 }
|
|
64
|
11
|
65 private FederatedLinda()
|
|
66 throws IOException {
|
|
67 selector = Selector.open();
|
|
68 seqHash = new Hashtable<Integer, PSXReply>();
|
|
69 }
|
0
|
70
|
11
|
71 public PSXLinda open(String _host,int _port)
|
|
72 throws IOException {
|
|
73 tid++;
|
|
74 // System.out.print("Tid = ");
|
|
75 // System.out.println(tid);
|
|
76 PSXLinda newlinda = new PSXLinda(this,tid,_host,_port);
|
|
77 linda = newlinda.add(linda);
|
|
78 return linda;
|
|
79 }
|
0
|
80
|
11
|
81 /**
|
0
|
82 psx_queue (unsigned int tspace_id, unsigned int id,
|
|
83 unsigned int size, unsigned char *data, char mode,
|
|
84 void(*callback)(char*,void*), void * obj):
|
11
|
85 */
|
|
86
|
|
87 public PSXReply psx_queue(PSXLinda linda, int id, ByteBuffer s, int length, int mode, PSXCallback callback) {
|
|
88 PSXQueue c = new PSXQueue(linda,id,mode,s,length,callback);
|
0
|
89
|
11
|
90 if (q_top == null) {
|
|
91 c = q_end = q_top = c;
|
|
92 } else {
|
|
93 q_end.next = c;
|
|
94 q_end = c;
|
|
95 }
|
|
96 qsize++;
|
0
|
97
|
11
|
98 if (mode != PSX_OUT) {
|
|
99 PSXReply p = new PSXReply(PSX_REPLY,callback);
|
|
100 p.seq = seq(p);
|
|
101 c.setSeq(p.seq);
|
|
102 if (debug) {
|
|
103 System.out.print("Integer compare=");
|
|
104 System.out.println((new Integer(2)).equals(new Integer(2)));
|
|
105 System.out.print("Seding seq=");
|
|
106 System.out.println(p.seq);
|
|
107 }
|
|
108 if (r_top == null){
|
|
109 r_end = r_top = p;
|
|
110 } else {
|
|
111 r_end.next = p;
|
|
112 r_end = p;
|
|
113 }
|
|
114 return p;
|
|
115 }
|
|
116 return null;
|
0
|
117 }
|
|
118
|
11
|
119 public int seq(PSXReply reply) {
|
|
120 Integer s;
|
|
121 do {
|
|
122 seq++;
|
|
123 if (seq>MAX_SEQUENCE) {
|
|
124 seq = 0;
|
|
125 }
|
|
126 s = new Integer(seq);
|
|
127 } while (seqHash.containsKey(s));
|
|
128 if (debug) {
|
|
129 System.out.print("hash value = ");
|
|
130 System.out.println(s.hashCode());
|
|
131 }
|
|
132 seqHash.put(s,reply);
|
|
133 seq++;
|
|
134 return seq-1;
|
|
135 }
|
0
|
136
|
11
|
137 public Selector selector() {
|
|
138 return selector;
|
|
139 }
|
0
|
140
|
11
|
141 public int sync() throws IOException {
|
|
142 return sync(0);
|
0
|
143 }
|
|
144
|
11
|
145 public int sync(long mtimeout)
|
|
146 throws IOException {
|
|
147 int key_num = 0;
|
|
148 Set<SelectionKey> keys;
|
0
|
149
|
11
|
150 while (q_top != null){
|
|
151 PSXQueue c = q_top;
|
|
152 c.Send();
|
|
153 q_top = c.next;
|
|
154 // psx_free(c);
|
|
155 // q_top = c = t;
|
|
156 qsize--;
|
|
157 }
|
2
|
158
|
11
|
159 try {
|
|
160 key_num = selector.select(mtimeout);
|
|
161 keys = selector.selectedKeys();
|
|
162 for (SelectionKey key : keys) {
|
|
163 // System.out.println("selecting");
|
|
164 SocketChannel sock = (SocketChannel)key.channel();
|
|
165 chkServe(sock);
|
|
166 }
|
|
167 } catch (IOException e) {
|
|
168 e.printStackTrace();
|
|
169 } catch (ClosedSelectorException e) {
|
|
170 e.printStackTrace();
|
|
171 }
|
2
|
172
|
11
|
173 return key_num;
|
|
174 }
|
|
175
|
|
176 public int sync_com(long mtimeout)
|
|
177 throws IOException {
|
|
178 int key_num = 0;
|
|
179 Set<SelectionKey> keys;
|
|
180
|
|
181 while (q_top != null){
|
|
182 PSXQueue c = q_top;
|
|
183 c.Send();
|
|
184 q_top = c.next;
|
|
185 // psx_free(c);
|
|
186 // q_top = c = t;
|
|
187 qsize--;
|
|
188 }
|
2
|
189
|
11
|
190 try {
|
|
191 key_num = selector.select(mtimeout);
|
|
192 keys = selector.selectedKeys();
|
|
193 for (SelectionKey key : keys) {
|
|
194 SocketChannel sock = (SocketChannel)key.channel();
|
|
195 chkCom(sock);
|
|
196 }
|
|
197 } catch (IOException e) {
|
|
198 e.printStackTrace();
|
|
199 } catch (ClosedSelectorException e) {
|
|
200 e.printStackTrace();
|
|
201 }
|
2
|
202
|
11
|
203 return key_num;
|
|
204 }
|
|
205
|
|
206 // should be in PSXLinda, but sock->linda is unknown here
|
2
|
207
|
11
|
208 private void chkCom(SocketChannel sock) throws IOException {
|
|
209
|
|
210 int length;
|
|
211 ByteBuffer command = ByteBuffer.allocate(LINDA_HEADER_SIZE);
|
|
212 command.order(ByteOrder.BIG_ENDIAN);
|
|
213 debug = false;
|
2
|
214
|
11
|
215 sock.read(command);
|
|
216 command.rewind();
|
|
217 length = command.getInt(LINDA_DATA_LENGTH_OFFSET);
|
|
218 if (length>0) {
|
|
219 ByteBuffer data = ByteBuffer.allocate(length);
|
|
220 int read = length;
|
|
221 if (debug) {
|
|
222 System.out.print("reading:");
|
|
223 System.out.println(length);
|
|
224 }
|
|
225
|
|
226 data.order(ByteOrder.BIG_ENDIAN);
|
|
227 while(read>0) {
|
|
228 read -= sock.read(data);
|
|
229 }
|
|
230 data.rewind();
|
2
|
231
|
11
|
232 if (debug) {
|
|
233 char id = (char)command.getShort(LINDA_ID_OFFSET);
|
|
234 System.out.println("LENGTH:"+command.getInt(LINDA_PACKET_LENGTH_OFFSET)+" "+
|
|
235 "MODE:"+(char)command.get(LINDA_MODE_OFFSET)+" "+
|
|
236 "ID:"+(int)id+" "+
|
|
237 "SEQ:"+command.getInt(LINDA_SEQ_OFFSET)+" "+
|
|
238 "DATA LENGTH:"+command.getInt(LINDA_DATA_LENGTH_OFFSET)+" ");
|
|
239 System.out.println("DATA:"+data);
|
|
240 command.rewind();
|
|
241 }
|
|
242 //if (debug_com) {
|
|
243 String comdata ="";
|
|
244 CharBuffer chardata = data.asCharBuffer();
|
|
245 comdata = chardata.toString();
|
2
|
246
|
11
|
247 //System.out.println("Com_data =>");
|
|
248 System.out.println(comdata);
|
|
249 //}
|
|
250 /***if (debug) {
|
2
|
251 System.out.print("header:");
|
|
252 for(int i=0;i<LINDA_HEADER_SIZE;i++) {
|
|
253 System.out.println(command.get(i));
|
|
254 }
|
|
255 System.out.print("data:");
|
|
256 for(int i=0;i<length;i++) {
|
|
257 System.out.println(data.get(i));
|
|
258 }
|
|
259 data.rewind();
|
|
260 }***/
|
11
|
261
|
|
262 int rseq = command.getInt(LINDA_SEQ_OFFSET);
|
|
263 int mode = command.get(LINDA_MODE_OFFSET);
|
|
264 Integer a;
|
|
265 /***
|
2
|
266 if (debug) {
|
|
267 System.out.print("mode = ");
|
|
268 System.out.println(mode);
|
|
269 System.out.print("seq = ");
|
|
270 System.out.println(rseq);
|
|
271 }***/
|
11
|
272 try {
|
|
273 PSXReply r = seqHash.get((a = new Integer(rseq)));
|
|
274 seqHash.put(a, null);
|
|
275 if (debug) {
|
|
276 System.out.print("hash value = ");
|
|
277 System.out.println(a.hashCode());
|
|
278 }
|
|
279
|
|
280 r.setAnswer(mode,command,data);
|
2
|
281
|
11
|
282 if (r.callback != null ) {
|
|
283 r.callback.callback(data);
|
|
284 }
|
|
285 } catch (NullPointerException e ) {
|
|
286 if (debug) {
|
|
287 System.out.println("hashed reply not found");
|
|
288 }
|
|
289 // can't happen
|
|
290 return ;
|
|
291 }
|
|
292 }
|
2
|
293 }
|
|
294
|
|
295 private void chkServe(SocketChannel sock)
|
11
|
296 throws IOException {
|
|
297 int length;
|
|
298 ByteBuffer command = ByteBuffer.allocate(LINDA_HEADER_SIZE);
|
|
299 command.order(ByteOrder.BIG_ENDIAN);
|
0
|
300
|
11
|
301 sock.read(command);
|
|
302 command.rewind();
|
|
303 length = command.getInt(LINDA_DATA_LENGTH_OFFSET);
|
|
304 if (length>0) {
|
|
305 ByteBuffer data = ByteBuffer.allocate(length);
|
|
306 int read = length;
|
|
307 if (debug) {
|
|
308 System.out.print("reading:");
|
|
309 System.out.println(length);
|
|
310 }
|
0
|
311
|
11
|
312 data.order(ByteOrder.BIG_ENDIAN);
|
|
313 while(read>0) {
|
|
314 read -= sock.read(data);
|
|
315 }
|
|
316 data.rewind();
|
2
|
317
|
11
|
318 if (debug) {
|
|
319 char id = (char)command.getShort(LINDA_ID_OFFSET);
|
|
320 System.out.println("LENGTH:"+command.getInt(LINDA_PACKET_LENGTH_OFFSET)+" "+
|
|
321 "MODE:"+(char)command.get(LINDA_MODE_OFFSET)+" "+
|
|
322 "ID:"+(int)id+" "+
|
|
323 "SEQ:"+command.getInt(LINDA_SEQ_OFFSET)+" "+
|
|
324 "DATA LENGTH:"+command.getInt(LINDA_DATA_LENGTH_OFFSET)+" ");
|
|
325 System.out.println("DATA:"+data);
|
|
326 command.rewind();
|
|
327 }
|
|
328 /***if (debug) {
|
0
|
329 System.out.print("header:");
|
|
330 for(int i=0;i<LINDA_HEADER_SIZE;i++) {
|
|
331 System.out.println(command.get(i));
|
|
332 }
|
|
333 System.out.print("data:");
|
|
334 for(int i=0;i<length;i++) {
|
|
335 System.out.println(data.get(i));
|
|
336 }
|
|
337 data.rewind();
|
2
|
338 }***/
|
11
|
339
|
|
340 int rseq = command.getInt(LINDA_SEQ_OFFSET);
|
|
341 int mode = command.get(LINDA_MODE_OFFSET);
|
|
342 Integer a;
|
|
343 /***
|
0
|
344 if (debug) {
|
|
345 System.out.print("mode = ");
|
|
346 System.out.println(mode);
|
|
347 System.out.print("seq = ");
|
|
348 System.out.println(rseq);
|
2
|
349 }***/
|
11
|
350 try {
|
|
351 PSXReply r = seqHash.get((a = new Integer(rseq)));
|
|
352 seqHash.put(a, null);
|
|
353 if (debug) {
|
|
354 System.out.print("hash value = ");
|
|
355 System.out.println(a.hashCode());
|
|
356 }
|
|
357
|
|
358 r.setAnswer(mode,command,data);
|
0
|
359
|
11
|
360 if (r.callback != null ) {
|
|
361 r.callback.callback(data);
|
|
362 }
|
|
363 } catch (NullPointerException e ) {
|
|
364 if (debug) {
|
|
365 System.out.println("hashed reply not found");
|
|
366 }
|
|
367 // can't happen
|
|
368 return ;
|
|
369 }
|
0
|
370 }
|
11
|
371 }
|
0
|
372 }
|
|
373
|
|
374 /* end */
|