Mercurial > hg > FederatedLinda
annotate src/fdl/FederatedLinda.java @ 23:b4fd7fb9135a
Simple Test run.
author | kono |
---|---|
date | Wed, 20 Aug 2008 03:28:45 +0900 |
parents | 56e015e8f5dc |
children | 35375016b2f0 |
rev | line source |
---|---|
0 | 1 |
2 /* | |
3 * @(#)FederatedLinda.java 1.1 06/04/01 | |
4 * | |
5 * Copyright 2006 Shinji KONO | |
6 * | |
7 | |
8 PSX Linda | |
9 Transport layer of PSX Linda library | |
10 | |
11 */ | |
12 | |
13 package fdl; | |
14 | |
15 import java.io.IOException; | |
16 import java.nio.ByteBuffer; | |
17 import java.nio.ByteOrder; | |
18 import java.nio.channels.ClosedSelectorException; | |
19 import java.nio.channels.SelectionKey; | |
20 import java.nio.channels.Selector; | |
21 import java.nio.channels.SocketChannel; | |
22 import java.util.Hashtable; | |
23 import java.util.Set; | |
24 | |
25 | |
26 /** | |
27 FederatedLinda | |
28 * | |
29 * @author Shinji Kono | |
30 * | |
31 * @param mytsid Tuple Space ID | |
32 | |
33 Initialize connection channel for a tuple space | |
34 | |
35 one instance for each Tuple space connection | |
36 | |
37 */ | |
38 | |
17 | 39 public class FederatedLinda { |
0 | 40 |
22 | 41 static FederatedLinda fdl = new FederatedLinda(); |
11 | 42 static int MAX_SEQUENCE = 2048; |
43 static boolean debug = true; | |
0 | 44 |
11 | 45 public int tid; |
46 public int seq; | |
47 public int qsize; | |
19
0243987383b7
Meta Protocol Engine and sample implementation of event logger.
kono
parents:
17
diff
changeset
|
48 public PSXLindaInterface linda; |
11 | 49 |
50 public Selector selector; | |
0 | 51 |
11 | 52 public PSXQueue q_top,q_end; |
53 public PSXReply r_top,r_end; | |
54 public Hashtable<Integer,PSXReply> seqHash; | |
0 | 55 |
22 | 56 public static FederatedLinda init() { |
11 | 57 return fdl; |
0 | 58 } |
59 | |
/**
 * Allocates the table of pending replies and opens the NIO selector.
 * When the selector cannot be opened the stack trace is printed and
 * {@code selector} is left unset.
 */
private FederatedLinda() {
    this.seqHash = new Hashtable<Integer, PSXReply>();
    try {
        this.selector = Selector.open();
    } catch (IOException openFailure) {
        openFailure.printStackTrace();
    }
}
0 | 68 |
19
0243987383b7
Meta Protocol Engine and sample implementation of event logger.
kono
parents:
17
diff
changeset
|
69 public PSXLindaInterface open(String _host,int _port) |
11 | 70 throws IOException { |
71 tid++; | |
72 // System.out.print("Tid = "); | |
73 // System.out.println(tid); | |
19
0243987383b7
Meta Protocol Engine and sample implementation of event logger.
kono
parents:
17
diff
changeset
|
74 PSXLindaInterface newlinda = new PSXLinda(this,tid,_host,_port); |
11 | 75 linda = newlinda.add(linda); |
76 return linda; | |
77 } | |
0 | 78 |
11 | 79 /** |
0 | 80 psx_queue (unsigned int tspace_id, unsigned int id, |
81 unsigned int size, unsigned char *data, char mode, | |
82 void(*callback)(char*,void*), void * obj): | |
11 | 83 */ |
84 | |
23 | 85 public PSXReply psx_queue(PSXLinda linda, int id, ByteBuffer s, int mode, PSXCallback callback) { |
86 PSXQueue c = new PSXQueue(linda,id,mode,s,callback); | |
0 | 87 |
11 | 88 if (q_top == null) { |
89 c = q_end = q_top = c; | |
90 } else { | |
91 q_end.next = c; | |
92 q_end = c; | |
93 } | |
94 qsize++; | |
0 | 95 |
17 | 96 if (mode != PSX.PSX_OUT) { |
97 PSXReply p = new PSXReply(PSX.PSX_REPLY,callback); | |
11 | 98 p.seq = seq(p); |
99 c.setSeq(p.seq); | |
100 if (debug) { | |
101 System.out.print("Integer compare="); | |
102 System.out.println((new Integer(2)).equals(new Integer(2))); | |
23 | 103 System.out.print("Sedning seq="); |
11 | 104 System.out.println(p.seq); |
105 } | |
106 if (r_top == null){ | |
107 r_end = r_top = p; | |
108 } else { | |
109 r_end.next = p; | |
110 r_end = p; | |
111 } | |
112 return p; | |
113 } | |
114 return null; | |
0 | 115 } |
116 | |
11 | 117 public int seq(PSXReply reply) { |
118 Integer s; | |
119 do { | |
120 seq++; | |
121 if (seq>MAX_SEQUENCE) { | |
122 seq = 0; | |
123 } | |
124 s = new Integer(seq); | |
125 } while (seqHash.containsKey(s)); | |
126 if (debug) { | |
127 System.out.print("hash value = "); | |
128 System.out.println(s.hashCode()); | |
129 } | |
130 seqHash.put(s,reply); | |
131 seq++; | |
132 return seq-1; | |
133 } | |
0 | 134 |
11 | 135 public Selector selector() { |
136 return selector; | |
137 } | |
0 | 138 |
11 | 139 public int sync() throws IOException { |
140 return sync(0); | |
0 | 141 } |
142 | |
11 | 143 public int sync(long mtimeout) |
144 throws IOException { | |
145 int key_num = 0; | |
146 Set<SelectionKey> keys; | |
0 | 147 |
11 | 148 while (q_top != null){ |
149 PSXQueue c = q_top; | |
23 | 150 c.send(); |
11 | 151 q_top = c.next; |
152 // psx_free(c); | |
153 // q_top = c = t; | |
154 qsize--; | |
155 } | |
2 | 156 |
11 | 157 try { |
158 key_num = selector.select(mtimeout); | |
159 keys = selector.selectedKeys(); | |
160 for (SelectionKey key : keys) { | |
161 // System.out.println("selecting"); | |
162 SocketChannel sock = (SocketChannel)key.channel(); | |
163 chkServe(sock); | |
164 } | |
165 } catch (IOException e) { | |
166 e.printStackTrace(); | |
167 } catch (ClosedSelectorException e) { | |
168 e.printStackTrace(); | |
169 } | |
2 | 170 |
11 | 171 return key_num; |
172 } | |
173 | |
2 | 174 private void chkServe(SocketChannel sock) |
11 | 175 throws IOException { |
176 int length; | |
17 | 177 ByteBuffer command = ByteBuffer.allocate(PSX.LINDA_HEADER_SIZE); |
11 | 178 command.order(ByteOrder.BIG_ENDIAN); |
0 | 179 |
11 | 180 sock.read(command); |
181 command.rewind(); | |
17 | 182 length = command.getInt(PSX.LINDA_DATA_LENGTH_OFFSET); |
11 | 183 if (length>0) { |
184 ByteBuffer data = ByteBuffer.allocate(length); | |
185 int read = length; | |
186 if (debug) { | |
187 System.out.print("reading:"); | |
188 System.out.println(length); | |
189 } | |
0 | 190 |
11 | 191 data.order(ByteOrder.BIG_ENDIAN); |
192 while(read>0) { | |
193 read -= sock.read(data); | |
194 } | |
195 data.rewind(); | |
2 | 196 |
11 | 197 if (debug) { |
17 | 198 PSX.printCommand(command, data); |
11 | 199 } |
200 /***if (debug) { | |
0 | 201 System.out.print("header:"); |
202 for(int i=0;i<LINDA_HEADER_SIZE;i++) { | |
203 System.out.println(command.get(i)); | |
204 } | |
205 System.out.print("data:"); | |
206 for(int i=0;i<length;i++) { | |
207 System.out.println(data.get(i)); | |
208 } | |
209 data.rewind(); | |
2 | 210 }***/ |
11 | 211 |
17 | 212 int rseq = command.getInt(PSX.LINDA_SEQ_OFFSET); |
213 int mode = command.get(PSX.LINDA_MODE_OFFSET); | |
11 | 214 Integer a; |
215 /*** | |
0 | 216 if (debug) { |
217 System.out.print("mode = "); | |
218 System.out.println(mode); | |
219 System.out.print("seq = "); | |
220 System.out.println(rseq); | |
2 | 221 }***/ |
11 | 222 try { |
223 PSXReply r = seqHash.get((a = new Integer(rseq))); | |
19
0243987383b7
Meta Protocol Engine and sample implementation of event logger.
kono
parents:
17
diff
changeset
|
224 seqHash.remove(a); |
11 | 225 if (debug) { |
226 System.out.print("hash value = "); | |
227 System.out.println(a.hashCode()); | |
228 } | |
229 | |
230 r.setAnswer(mode,command,data); | |
0 | 231 |
11 | 232 if (r.callback != null ) { |
233 r.callback.callback(data); | |
234 } | |
235 } catch (NullPointerException e ) { | |
236 if (debug) { | |
237 System.out.println("hashed reply not found"); | |
238 } | |
239 // can't happen | |
240 return ; | |
241 } | |
0 | 242 } |
11 | 243 } |
0 | 244 } |
245 | |
246 /* end */ |