comparison src/fdl/FederatedLinda.java @ 0:083a0b5e12cc

Apply Debug Interface version start
author fuchita
date Thu, 07 Feb 2008 14:21:30 +0900
parents
children b49e593b2502
comparison
equal deleted inserted replaced
-1:000000000000 0:083a0b5e12cc
1
2 /*
3 * @(#)FederatedLinda.java 1.1 06/04/01
4 *
5 * Copyright 2006 Shinji KONO
6 *
7
8 PSX Linda
9 Transport layer of PSX Linda library
10
11 */
12
13 package fdl;
14
15 import java.io.IOException;
16 import java.nio.ByteBuffer;
17 import java.nio.ByteOrder;
18 import java.nio.channels.ClosedSelectorException;
19 import java.nio.channels.SelectionKey;
20 import java.nio.channels.Selector;
21 import java.nio.channels.SocketChannel;
22 import java.util.Hashtable;
23 import java.util.Set;
24
25
26 /**
27 FederatedLinda
28 *
29 * @author Shinji Kono
30 *
31 * @param mytsid Tuple Space ID
32
33 Initialize connection channel for a tuple space
34
35 one instance for each Tuple space connection
36
37 */
38
39 public class FederatedLinda implements PSXQueueInterface {
40
41 static FederatedLinda fdl;
42 static int MAX_SEQUENCE = 2048;
43 static final boolean debug = true;
44
45 public int tid;
46 public int seq;
47 public int qsize;
48 public PSXLinda linda;
49
50 public Selector selector;
51
52 public PSXQueue q_top,q_end;
53 public PSXReply r_top,r_end;
54 public Hashtable<Integer,PSXReply> seqHash;
55
56 static FederatedLinda init()
57 throws IOException {
58 if (fdl==null) {
59 fdl = new FederatedLinda();
60 }
61 return fdl;
62 }
63
64 private FederatedLinda()
65 throws IOException {
66 selector = Selector.open();
67 seqHash = new Hashtable<Integer, PSXReply>();
68 }
69
70 public PSXLinda open(String _host,int _port)
71 throws IOException {
72 tid++;
73 // System.out.print("Tid = ");
74 // System.out.println(tid);
75 PSXLinda newlinda = new PSXLinda(this,tid,_host,_port);
76 linda = newlinda.add(linda);
77 return linda;
78 }
79
80 /**
81 psx_queue (unsigned int tspace_id, unsigned int id,
82 unsigned int size, unsigned char *data, char mode,
83 void(*callback)(char*,void*), void * obj):
84 */
85
86 public PSXReply psx_queue(PSXLinda linda, int id, ByteBuffer s, int length, int mode, PSXCallback callback) {
87 PSXQueue c = new PSXQueue(linda,id,mode,s,length,callback);
88
89 if (q_top == null) {
90 c = q_end = q_top = c;
91 } else {
92 q_end.next = c;
93 q_end = c;
94 }
95 qsize++;
96
97 if (mode != PSX_OUT) {
98 PSXReply p = new PSXReply(PSX_REPLY,callback);
99 p.seq = seq(p);
100 c.setSeq(p.seq);
101 if (debug) {
102 System.out.print("Integer compare=");
103 System.out.println((new Integer(2)).equals(new Integer(2)));
104 System.out.print("Seding seq=");
105 System.out.println(p.seq);
106 }
107 if (r_top == null){
108 r_end = r_top = p;
109 } else {
110 r_end.next = p;
111 r_end = p;
112 }
113 return p;
114 }
115 return null;
116 }
117
118 public int seq(PSXReply reply) {
119 Integer s;
120 do {
121 seq++;
122 if (seq>MAX_SEQUENCE) {
123 seq = 0;
124 }
125 s = new Integer(seq);
126 } while (seqHash.containsKey(s));
127 if (debug) {
128 System.out.print("hash value = ");
129 System.out.println(s.hashCode());
130 }
131 seqHash.put(s,reply);
132 seq++;
133 return seq-1;
134 }
135
136 public Selector selector() {
137 return selector;
138 }
139
140 public int sync() throws IOException {
141 return sync(0);
142 }
143
144 public int sync(long mtimeout)
145 throws IOException {
146 int key_num = 0;
147 Set<SelectionKey> keys;
148
149 while (q_top != null){
150 PSXQueue c = q_top;
151 c.Send();
152 q_top = c.next;
153 // psx_free(c);
154 // q_top = c = t;
155 qsize--;
156 }
157
158 try {
159 key_num = selector.select(mtimeout);
160 keys = selector.selectedKeys();
161 for (SelectionKey key : keys) {
162 // System.out.println("selecting");
163 SocketChannel sock = (SocketChannel)key.channel();
164 chkServe(sock);
165 }
166 } catch (IOException e) {
167 e.printStackTrace();
168 } catch (ClosedSelectorException e) {
169 e.printStackTrace();
170 }
171
172 return key_num;
173 }
174
175 // should be in PSXLinda, but sock->linda is unknown here
176
177 private void chkServe(SocketChannel sock)
178 throws IOException {
179 int length;
180 ByteBuffer command = ByteBuffer.allocate(LINDA_HEADER_SIZE);
181 command.order(ByteOrder.BIG_ENDIAN);
182
183 sock.read(command);
184 command.rewind();
185 length = command.getInt(LINDA_DATA_LENGTH_OFFSET);
186 if (length>0) {
187 ByteBuffer data = ByteBuffer.allocate(length);
188 int read = length;
189 if (debug) {
190 System.out.print("reading:");
191 System.out.println(length);
192 }
193
194 data.order(ByteOrder.BIG_ENDIAN);
195 while(read>0) {
196 read -= sock.read(data);
197 }
198 data.rewind();
199 if (debug) {
200 System.out.print("header:");
201 for(int i=0;i<LINDA_HEADER_SIZE;i++) {
202 System.out.println(command.get(i));
203 }
204 System.out.print("data:");
205 for(int i=0;i<length;i++) {
206 System.out.println(data.get(i));
207 }
208 data.rewind();
209 }
210 int rseq = command.getInt(LINDA_SEQ_OFFSET);
211 int mode = command.get(LINDA_MODE_OFFSET);
212 Integer a;
213 if (debug) {
214 System.out.print("mode = ");
215 System.out.println(mode);
216 System.out.print("seq = ");
217 System.out.println(rseq);
218 }
219 try {
220 PSXReply r = seqHash.get((a = new Integer(rseq)));
221 if (debug) {
222 System.out.print("hash value = ");
223 System.out.println(a.hashCode());
224 }
225
226 r.setAnswer(mode,command,data);
227
228 if (r.callback != null ) {
229 r.callback.callback(data);
230 }
231 } catch (NullPointerException e ) {
232 if (debug) {
233 System.out.println("hashed reply not found");
234 }
235 // can't happen
236 return ;
237 }
238 }
239 }
240 }
241
242 /* end */