Mercurial > hg > FederatedLinda
annotate src/fdl/FederatedLinda.java @ 122:ad73eacf560a default tip
remove warning
author | e095732 |
---|---|
date | Thu, 07 Feb 2013 22:32:26 +0900 |
parents | 8ae522e1a4bf |
children |
rev | line source |
---|---|
0 | 1 |
2 /* | |
3 * @(#)FederatedLinda.java 1.1 06/04/01 | |
4 * | |
5 * Copyright 2006 Shinji KONO | |
6 * | |
7 | |
8 PSX Lidna | |
9 Trasport layer of PSX Linda library | |
10 | |
11 */ | |
12 | |
13 package fdl; | |
14 | |
15 import java.io.IOException; | |
16 import java.nio.ByteBuffer; | |
17 import java.nio.channels.ClosedSelectorException; | |
18 import java.nio.channels.SelectionKey; | |
19 import java.nio.channels.Selector; | |
20 import java.util.Hashtable; | |
31 | 21 import java.util.Iterator; |
40 | 22 import java.util.logging.Level; |
0 | 23 |
24 /** | |
25 FederatedLinda | |
26 * | |
27 * @author Shinji Kono | |
28 * | |
29 * @param mytsid Tuple Space ID | |
30 | |
31 Initialize connection channel for a tuple space | |
32 | |
33 one instance for each Tuple space connection | |
34 | |
35 */ | |
36 | |
17 | 37 public class FederatedLinda { |
0 | 38 |
35
fe338d497c72
FederatedLinda was static singleton. It does not work on Thread based test.
kono
parents:
34
diff
changeset
|
39 FederatedLinda fdl; |
11 | 40 static int MAX_SEQUENCE = 2048; |
96 | 41 public static boolean debug = false; |
0 | 42 |
11 | 43 public int tid; |
44 public int seq; | |
45 public int qsize; | |
25 | 46 public PSXLinda linda; |
11 | 47 |
48 public Selector selector; | |
0 | 49 |
11 | 50 public PSXQueue q_top,q_end; |
51 public PSXReply r_top,r_end; | |
101 | 52 public Hashtable<Integer,PSXReply> seqHash = new Hashtable<Integer, PSXReply>(); |
0 | 53 |
22 | 54 public static FederatedLinda init() { |
35
fe338d497c72
FederatedLinda was static singleton. It does not work on Thread based test.
kono
parents:
34
diff
changeset
|
55 FederatedLinda fdl = new FederatedLinda(); |
11 | 56 return fdl; |
0 | 57 } |
58 | |
101 | 59 public static FederatedLinda init(Selector selector) { |
60 FederatedLinda fdl = new FederatedLinda(selector); | |
61 return fdl; | |
62 } | |
63 | |
22 | 64 private FederatedLinda() { |
65 try { | |
66 selector = Selector.open(); | |
67 } catch (IOException e) { | |
68 e.printStackTrace(); | |
69 } | |
101 | 70 } |
71 | |
72 public FederatedLinda(Selector selector) { | |
73 this.selector = selector; | |
11 | 74 } |
0 | 75 |
25 | 76 public PSXLinda open(String _host,int _port) |
11 | 77 throws IOException { |
78 tid++; | |
39 | 79 PSXLindaImpl newlinda = new PSXLindaImpl(this,selector,tid,_host,_port); |
11 | 80 linda = newlinda.add(linda); |
81 return linda; | |
82 } | |
0 | 83 |
39 | 84 PSXLinda openFromMetaLinda(MetaLinda metaLinda, String _host, int _port) |
85 throws IOException { | |
86 tid++; | |
87 PSXLindaImpl newlinda = new PSXLindaImpl(this,metaLinda.fds.selector,tid,_host,_port); | |
88 linda = newlinda.add(linda); | |
89 return newlinda; | |
90 } | |
91 | |
23 | 92 public PSXReply psx_queue(PSXLinda linda, int id, ByteBuffer s, int mode, PSXCallback callback) { |
93 PSXQueue c = new PSXQueue(linda,id,mode,s,callback); | |
0 | 94 |
11 | 95 if (q_top == null) { |
96 c = q_end = q_top = c; | |
97 } else { | |
98 q_end.next = c; | |
99 q_end = c; | |
100 } | |
101 qsize++; | |
0 | 102 |
17 | 103 if (mode != PSX.PSX_OUT) { |
104 PSXReply p = new PSXReply(PSX.PSX_REPLY,callback); | |
11 | 105 p.seq = seq(p); |
106 c.setSeq(p.seq); | |
107 if (r_top == null){ | |
108 r_end = r_top = p; | |
109 } else { | |
110 r_end.next = p; | |
111 r_end = p; | |
112 } | |
113 return p; | |
114 } | |
115 return null; | |
0 | 116 } |
117 | |
11 | 118 public int seq(PSXReply reply) { |
119 Integer s; | |
120 do { | |
121 seq++; | |
122 if (seq>MAX_SEQUENCE) { | |
123 seq = 0; | |
124 } | |
125 s = new Integer(seq); | |
126 } while (seqHash.containsKey(s)); | |
96 | 127 // log(Level.INFO,"hash value = "+s.hashCode()); |
11 | 128 seqHash.put(s,reply); |
129 seq++; | |
130 return seq-1; | |
131 } | |
0 | 132 |
11 | 133 public Selector selector() { |
134 return selector; | |
135 } | |
0 | 136 |
96 | 137 /** |
138 * sync with no wait | |
139 * @return 0 | |
140 * @throws IOException | |
141 */ | |
100 | 142 public int sync() { |
96 | 143 int key_num = 0; |
144 queueExec(); | |
145 | |
146 try { | |
100 | 147 if (selector.selectNow()>0) { |
96 | 148 for (Iterator<SelectionKey> it = selector.selectedKeys().iterator();it.hasNext(); ) { |
149 SelectionKey s = it.next(); | |
150 it.remove(); | |
151 try { | |
152 if (!s.isReadable()) | |
153 throw new IOException(); | |
154 TupleHandler handle = (TupleHandler)s.attachment(); | |
155 handle.handle(s); | |
156 } catch (IOException e) { | |
157 s.cancel(); | |
158 log(Level.INFO,""+s.channel()+" is closed."); | |
159 } | |
160 } | |
161 } | |
162 } catch (IOException e) { | |
163 e.printStackTrace(); | |
164 } catch (ClosedSelectorException e) { | |
165 // client should be know | |
166 } | |
167 | |
168 return key_num; | |
0 | 169 } |
170 | |
96 | 171 /** |
172 * sync with mtimeout msec wait | |
173 * @param mtimeout 0 means indifinite wait | |
174 * @return 0 | |
175 * @throws IOException | |
176 */ | |
100 | 177 public int sync(long mtimeout) { |
11 | 178 int key_num = 0; |
39 | 179 queueExec(); |
2 | 180 |
11 | 181 try { |
31 | 182 if (selector.select(mtimeout)>0) { |
183 for (Iterator<SelectionKey> it = selector.selectedKeys().iterator();it.hasNext(); ) { | |
184 SelectionKey s = it.next(); | |
185 it.remove(); | |
33 | 186 try { |
34 | 187 if (!s.isReadable()) |
188 throw new IOException(); | |
39 | 189 TupleHandler handle = (TupleHandler)s.attachment(); |
190 handle.handle(s); | |
33 | 191 } catch (IOException e) { |
34 | 192 s.cancel(); |
40 | 193 log(Level.INFO,""+s.channel()+" is closed."); |
33 | 194 } |
195 } | |
11 | 196 } |
197 } catch (IOException e) { | |
198 e.printStackTrace(); | |
199 } catch (ClosedSelectorException e) { | |
36 | 200 // client should be know |
11 | 201 } |
2 | 202 |
11 | 203 return key_num; |
204 } | |
205 | |
86 | 206 public void queueExec() { |
39 | 207 while (q_top != null){ |
208 PSXQueue c = q_top; | |
209 c.send(); | |
210 q_top = c.next; | |
211 qsize--; | |
0 | 212 } |
11 | 213 } |
39 | 214 |
215 PSXReply getReply(int rseq) { | |
24 | 216 Integer a; |
217 | |
218 PSXReply r = seqHash.get((a = new Integer(rseq))); | |
219 seqHash.remove(a); | |
220 return r; | |
221 } | |
39 | 222 |
40 | 223 public void log(Level level,String msg) { |
96 | 224 if (level!=Level.SEVERE && !debug) return; |
40 | 225 System.err.println(msg); |
226 if (level==Level.SEVERE) | |
227 new IOException().setStackTrace(null); | |
228 } | |
93 | 229 |
230 public void wakeup() { | |
231 selector.wakeup(); | |
232 } | |
101 | 233 |
0 | 234 } |
235 | |
236 /* end */ |