Mercurial > hg > FederatedLinda
annotate src/fdl/FederatedLinda.java @ 101:d671c78d3757 fuchita
share selector in FDLindaServe and FederatedLinda
author | one |
---|---|
date | Wed, 26 May 2010 15:57:23 +0900 |
parents | 270093b61001 |
children | 8ae522e1a4bf |
rev | line source |
---|---|
0 | 1 |
2 /* | |
3 * @(#)FederatedLinda.java 1.1 06/04/01 | |
4 * | |
5 * Copyright 2006 Shinji KONO | |
6 * | |
7 | |
8 PSX Linda | |
9 Transport layer of PSX Linda library | |
10 | |
11 */ | |
12 | |
13 package fdl; | |
14 | |
15 import java.io.IOException; | |
16 import java.nio.ByteBuffer; | |
17 import java.nio.channels.ClosedSelectorException; | |
18 import java.nio.channels.SelectionKey; | |
19 import java.nio.channels.Selector; | |
101 | 20 import java.nio.channels.spi.AbstractSelector; |
0 | 21 import java.util.Hashtable; |
31 | 22 import java.util.Iterator; |
40 | 23 import java.util.logging.Level; |
0 | 24 |
25 /** | |
26 FederatedLinda | |
27 * | |
28 * @author Shinji Kono | |
29 * | |
30 * @param mytsid Tuple Space ID | |
31 | |
32 Initialize connection channel for a tuple space | |
33 | |
34 one instance for each Tuple space connection | |
35 | |
36 */ | |
37 | |
17 | 38 public class FederatedLinda { |
0 | 39 |
35
fe338d497c72
FederatedLinda was static singleton. It does not work on Thread based test.
kono
parents:
34
diff
changeset
|
40 FederatedLinda fdl; |
11 | 41 static int MAX_SEQUENCE = 2048; |
96 | 42 public static boolean debug = false; |
0 | 43 |
11 | 44 public int tid; |
45 public int seq; | |
46 public int qsize; | |
25 | 47 public PSXLinda linda; |
11 | 48 |
49 public Selector selector; | |
0 | 50 |
11 | 51 public PSXQueue q_top,q_end; |
52 public PSXReply r_top,r_end; | |
101 | 53 public Hashtable<Integer,PSXReply> seqHash = new Hashtable<Integer, PSXReply>(); |
0 | 54 |
22 | 55 public static FederatedLinda init() { |
35
fe338d497c72
FederatedLinda was static singleton. It does not work on Thread based test.
kono
parents:
34
diff
changeset
|
56 FederatedLinda fdl = new FederatedLinda(); |
11 | 57 return fdl; |
0 | 58 } |
59 | |
101 | 60 public static FederatedLinda init(Selector selector) { |
61 FederatedLinda fdl = new FederatedLinda(selector); | |
62 return fdl; | |
63 } | |
64 | |
22 | 65 private FederatedLinda() { |
66 try { | |
67 selector = Selector.open(); | |
68 } catch (IOException e) { | |
69 e.printStackTrace(); | |
70 } | |
101 | 71 } |
72 | |
73 public FederatedLinda(Selector selector) { | |
74 this.selector = selector; | |
11 | 75 } |
0 | 76 |
25 | 77 public PSXLinda open(String _host,int _port) |
11 | 78 throws IOException { |
79 tid++; | |
39 | 80 PSXLindaImpl newlinda = new PSXLindaImpl(this,selector,tid,_host,_port); |
11 | 81 linda = newlinda.add(linda); |
82 return linda; | |
83 } | |
0 | 84 |
39 | 85 PSXLinda openFromMetaLinda(MetaLinda metaLinda, String _host, int _port) |
86 throws IOException { | |
87 tid++; | |
88 PSXLindaImpl newlinda = new PSXLindaImpl(this,metaLinda.fds.selector,tid,_host,_port); | |
89 linda = newlinda.add(linda); | |
90 return newlinda; | |
91 } | |
92 | |
23 | 93 public PSXReply psx_queue(PSXLinda linda, int id, ByteBuffer s, int mode, PSXCallback callback) { |
94 PSXQueue c = new PSXQueue(linda,id,mode,s,callback); | |
0 | 95 |
11 | 96 if (q_top == null) { |
97 c = q_end = q_top = c; | |
98 } else { | |
99 q_end.next = c; | |
100 q_end = c; | |
101 } | |
102 qsize++; | |
0 | 103 |
17 | 104 if (mode != PSX.PSX_OUT) { |
105 PSXReply p = new PSXReply(PSX.PSX_REPLY,callback); | |
11 | 106 p.seq = seq(p); |
107 c.setSeq(p.seq); | |
108 if (r_top == null){ | |
109 r_end = r_top = p; | |
110 } else { | |
111 r_end.next = p; | |
112 r_end = p; | |
113 } | |
114 return p; | |
115 } | |
116 return null; | |
0 | 117 } |
118 | |
11 | 119 public int seq(PSXReply reply) { |
120 Integer s; | |
121 do { | |
122 seq++; | |
123 if (seq>MAX_SEQUENCE) { | |
124 seq = 0; | |
125 } | |
126 s = new Integer(seq); | |
127 } while (seqHash.containsKey(s)); | |
96 | 128 // log(Level.INFO,"hash value = "+s.hashCode()); |
11 | 129 seqHash.put(s,reply); |
130 seq++; | |
131 return seq-1; | |
132 } | |
0 | 133 |
11 | 134 public Selector selector() { |
135 return selector; | |
136 } | |
0 | 137 |
96 | 138 /** |
139 * sync with no wait | |
140 * @return 0 | |
141 * @throws IOException | |
142 */ | |
100 | 143 public int sync() { |
96 | 144 int key_num = 0; |
145 queueExec(); | |
146 | |
147 try { | |
100 | 148 if (selector.selectNow()>0) { |
96 | 149 for (Iterator<SelectionKey> it = selector.selectedKeys().iterator();it.hasNext(); ) { |
150 SelectionKey s = it.next(); | |
151 it.remove(); | |
152 try { | |
153 if (!s.isReadable()) | |
154 throw new IOException(); | |
155 TupleHandler handle = (TupleHandler)s.attachment(); | |
156 handle.handle(s); | |
157 } catch (IOException e) { | |
158 s.cancel(); | |
159 log(Level.INFO,""+s.channel()+" is closed."); | |
160 } | |
161 } | |
162 } | |
163 } catch (IOException e) { | |
164 e.printStackTrace(); | |
165 } catch (ClosedSelectorException e) { | |
166 // client should be know | |
167 } | |
168 | |
169 return key_num; | |
0 | 170 } |
171 | |
96 | 172 /** |
173 * sync with mtimeout msec wait | |
174 * @param mtimeout 0 means indifinite wait | |
175 * @return 0 | |
176 * @throws IOException | |
177 */ | |
100 | 178 public int sync(long mtimeout) { |
11 | 179 int key_num = 0; |
39 | 180 queueExec(); |
2 | 181 |
11 | 182 try { |
31 | 183 if (selector.select(mtimeout)>0) { |
184 for (Iterator<SelectionKey> it = selector.selectedKeys().iterator();it.hasNext(); ) { | |
185 SelectionKey s = it.next(); | |
186 it.remove(); | |
33 | 187 try { |
34 | 188 if (!s.isReadable()) |
189 throw new IOException(); | |
39 | 190 TupleHandler handle = (TupleHandler)s.attachment(); |
191 handle.handle(s); | |
33 | 192 } catch (IOException e) { |
34 | 193 s.cancel(); |
40 | 194 log(Level.INFO,""+s.channel()+" is closed."); |
33 | 195 } |
196 } | |
11 | 197 } |
198 } catch (IOException e) { | |
199 e.printStackTrace(); | |
200 } catch (ClosedSelectorException e) { | |
36 | 201 // client should be know |
11 | 202 } |
2 | 203 |
11 | 204 return key_num; |
205 } | |
206 | |
86 | 207 public void queueExec() { |
39 | 208 while (q_top != null){ |
209 PSXQueue c = q_top; | |
210 c.send(); | |
211 q_top = c.next; | |
212 qsize--; | |
0 | 213 } |
11 | 214 } |
39 | 215 |
216 PSXReply getReply(int rseq) { | |
24 | 217 Integer a; |
218 | |
219 PSXReply r = seqHash.get((a = new Integer(rseq))); | |
220 seqHash.remove(a); | |
221 return r; | |
222 } | |
39 | 223 |
40 | 224 public void log(Level level,String msg) { |
96 | 225 if (level!=Level.SEVERE && !debug) return; |
40 | 226 System.err.println(msg); |
227 if (level==Level.SEVERE) | |
228 new IOException().setStackTrace(null); | |
229 } | |
93 | 230 |
231 public void wakeup() { | |
232 selector.wakeup(); | |
233 } | |
101 | 234 |
0 | 235 } |
236 | |
237 /* end */ |