Mercurial > hg > FederatedLinda
comparison src/fdl/FederatedLinda.java @ 0:083a0b5e12cc
Apply Debug Interface version start
author | fuchita |
---|---|
date | Thu, 07 Feb 2008 14:21:30 +0900 |
parents | |
children | b49e593b2502 |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:083a0b5e12cc |
---|---|
1 | |
2 /* | |
3 * @(#)FederatedLinda.java 1.1 06/04/01 | |
4 * | |
5 * Copyright 2006 Shinji KONO | |
6 * | |
7 | |
8 PSX Linda | |
9 Transport layer of PSX Linda library | |
10 | |
11 */ | |
12 | |
13 package fdl; | |
14 | |
15 import java.io.IOException; | |
16 import java.nio.ByteBuffer; | |
17 import java.nio.ByteOrder; | |
18 import java.nio.channels.ClosedSelectorException; | |
19 import java.nio.channels.SelectionKey; | |
20 import java.nio.channels.Selector; | |
21 import java.nio.channels.SocketChannel; | |
22 import java.util.Hashtable; | |
23 import java.util.Set; | |
24 | |
25 | |
26 /** | |
27 FederatedLinda | |
28 * | |
29 * @author Shinji Kono | |
30 * | |
31 * @param mytsid Tuple Space ID | |
32 | |
33 Initialize connection channel for a tuple space | |
34 | |
35 one instance for each Tuple space connection | |
36 | |
37 */ | |
38 | |
39 public class FederatedLinda implements PSXQueueInterface { | |
40 | |
41 static FederatedLinda fdl; | |
42 static int MAX_SEQUENCE = 2048; | |
43 static final boolean debug = true; | |
44 | |
45 public int tid; | |
46 public int seq; | |
47 public int qsize; | |
48 public PSXLinda linda; | |
49 | |
50 public Selector selector; | |
51 | |
52 public PSXQueue q_top,q_end; | |
53 public PSXReply r_top,r_end; | |
54 public Hashtable<Integer,PSXReply> seqHash; | |
55 | |
56 static FederatedLinda init() | |
57 throws IOException { | |
58 if (fdl==null) { | |
59 fdl = new FederatedLinda(); | |
60 } | |
61 return fdl; | |
62 } | |
63 | |
64 private FederatedLinda() | |
65 throws IOException { | |
66 selector = Selector.open(); | |
67 seqHash = new Hashtable<Integer, PSXReply>(); | |
68 } | |
69 | |
70 public PSXLinda open(String _host,int _port) | |
71 throws IOException { | |
72 tid++; | |
73 // System.out.print("Tid = "); | |
74 // System.out.println(tid); | |
75 PSXLinda newlinda = new PSXLinda(this,tid,_host,_port); | |
76 linda = newlinda.add(linda); | |
77 return linda; | |
78 } | |
79 | |
80 /** | |
81 psx_queue (unsigned int tspace_id, unsigned int id, | |
82 unsigned int size, unsigned char *data, char mode, | |
83 void(*callback)(char*,void*), void * obj): | |
84 */ | |
85 | |
86 public PSXReply psx_queue(PSXLinda linda, int id, ByteBuffer s, int length, int mode, PSXCallback callback) { | |
87 PSXQueue c = new PSXQueue(linda,id,mode,s,length,callback); | |
88 | |
89 if (q_top == null) { | |
90 c = q_end = q_top = c; | |
91 } else { | |
92 q_end.next = c; | |
93 q_end = c; | |
94 } | |
95 qsize++; | |
96 | |
97 if (mode != PSX_OUT) { | |
98 PSXReply p = new PSXReply(PSX_REPLY,callback); | |
99 p.seq = seq(p); | |
100 c.setSeq(p.seq); | |
101 if (debug) { | |
102 System.out.print("Integer compare="); | |
103 System.out.println((new Integer(2)).equals(new Integer(2))); | |
104 System.out.print("Seding seq="); | |
105 System.out.println(p.seq); | |
106 } | |
107 if (r_top == null){ | |
108 r_end = r_top = p; | |
109 } else { | |
110 r_end.next = p; | |
111 r_end = p; | |
112 } | |
113 return p; | |
114 } | |
115 return null; | |
116 } | |
117 | |
118 public int seq(PSXReply reply) { | |
119 Integer s; | |
120 do { | |
121 seq++; | |
122 if (seq>MAX_SEQUENCE) { | |
123 seq = 0; | |
124 } | |
125 s = new Integer(seq); | |
126 } while (seqHash.containsKey(s)); | |
127 if (debug) { | |
128 System.out.print("hash value = "); | |
129 System.out.println(s.hashCode()); | |
130 } | |
131 seqHash.put(s,reply); | |
132 seq++; | |
133 return seq-1; | |
134 } | |
135 | |
136 public Selector selector() { | |
137 return selector; | |
138 } | |
139 | |
140 public int sync() throws IOException { | |
141 return sync(0); | |
142 } | |
143 | |
144 public int sync(long mtimeout) | |
145 throws IOException { | |
146 int key_num = 0; | |
147 Set<SelectionKey> keys; | |
148 | |
149 while (q_top != null){ | |
150 PSXQueue c = q_top; | |
151 c.Send(); | |
152 q_top = c.next; | |
153 // psx_free(c); | |
154 // q_top = c = t; | |
155 qsize--; | |
156 } | |
157 | |
158 try { | |
159 key_num = selector.select(mtimeout); | |
160 keys = selector.selectedKeys(); | |
161 for (SelectionKey key : keys) { | |
162 // System.out.println("selecting"); | |
163 SocketChannel sock = (SocketChannel)key.channel(); | |
164 chkServe(sock); | |
165 } | |
166 } catch (IOException e) { | |
167 e.printStackTrace(); | |
168 } catch (ClosedSelectorException e) { | |
169 e.printStackTrace(); | |
170 } | |
171 | |
172 return key_num; | |
173 } | |
174 | |
175 // should be in PSXLinda, but sock->linda is unknown here | |
176 | |
177 private void chkServe(SocketChannel sock) | |
178 throws IOException { | |
179 int length; | |
180 ByteBuffer command = ByteBuffer.allocate(LINDA_HEADER_SIZE); | |
181 command.order(ByteOrder.BIG_ENDIAN); | |
182 | |
183 sock.read(command); | |
184 command.rewind(); | |
185 length = command.getInt(LINDA_DATA_LENGTH_OFFSET); | |
186 if (length>0) { | |
187 ByteBuffer data = ByteBuffer.allocate(length); | |
188 int read = length; | |
189 if (debug) { | |
190 System.out.print("reading:"); | |
191 System.out.println(length); | |
192 } | |
193 | |
194 data.order(ByteOrder.BIG_ENDIAN); | |
195 while(read>0) { | |
196 read -= sock.read(data); | |
197 } | |
198 data.rewind(); | |
199 if (debug) { | |
200 System.out.print("header:"); | |
201 for(int i=0;i<LINDA_HEADER_SIZE;i++) { | |
202 System.out.println(command.get(i)); | |
203 } | |
204 System.out.print("data:"); | |
205 for(int i=0;i<length;i++) { | |
206 System.out.println(data.get(i)); | |
207 } | |
208 data.rewind(); | |
209 } | |
210 int rseq = command.getInt(LINDA_SEQ_OFFSET); | |
211 int mode = command.get(LINDA_MODE_OFFSET); | |
212 Integer a; | |
213 if (debug) { | |
214 System.out.print("mode = "); | |
215 System.out.println(mode); | |
216 System.out.print("seq = "); | |
217 System.out.println(rseq); | |
218 } | |
219 try { | |
220 PSXReply r = seqHash.get((a = new Integer(rseq))); | |
221 if (debug) { | |
222 System.out.print("hash value = "); | |
223 System.out.println(a.hashCode()); | |
224 } | |
225 | |
226 r.setAnswer(mode,command,data); | |
227 | |
228 if (r.callback != null ) { | |
229 r.callback.callback(data); | |
230 } | |
231 } catch (NullPointerException e ) { | |
232 if (debug) { | |
233 System.out.println("hashed reply not found"); | |
234 } | |
235 // can't happen | |
236 return ; | |
237 } | |
238 } | |
239 } | |
240 } | |
241 | |
242 /* end */ |