Mercurial > hg > FederatedLinda
annotate src/fdl/FederatedLinda.java @ 95:7bf2eeea23a0 fuchita
Merge with ea4ee892baf5df730093bbee7d5eb2f5b5acaf53
author | one |
---|---|
date | Tue, 25 May 2010 23:11:11 +0900 |
parents | a1d796c0e975 c0591636a71a |
children | 96c63bc659d4 |
rev | line source |
---|---|
0 | 1 |
2 /* | |
3 * @(#)FederatedLinda.java 1.1 06/04/01 | |
4 * | |
5 * Copyright 2006 Shinji KONO | |
6 * | |
7 | |
8 PSX Linda | |
9 Transport layer of PSX Linda library | |
10 | |
11 */ | |
12 | |
13 package fdl; | |
14 | |
15 import java.io.IOException; | |
16 import java.nio.ByteBuffer; | |
17 import java.nio.channels.ClosedSelectorException; | |
18 import java.nio.channels.SelectionKey; | |
19 import java.nio.channels.Selector; | |
20 import java.util.Hashtable; | |
31 | 21 import java.util.Iterator; |
40 | 22 import java.util.logging.Level; |
0 | 23 |
24 /** | |
25 FederatedLinda | |
26 * | |
27 * @author Shinji Kono | |
28 * | |
29 * @param mytsid Tuple Space ID | |
30 | |
31 Initialize connection channel for a tuple space | |
32 | |
33 one instance for each Tuple space connection | |
34 | |
35 */ | |
36 | |
17 | 37 public class FederatedLinda { |
0 | 38 |
35
fe338d497c72
FederatedLinda was static singleton. It does not work on Thread based test.
kono
parents:
34
diff
changeset
|
39 FederatedLinda fdl; |
11 | 40 static int MAX_SEQUENCE = 2048; |
33 | 41 static boolean debug = false; |
0 | 42 |
11 | 43 public int tid; |
44 public int seq; | |
45 public int qsize; | |
25 | 46 public PSXLinda linda; |
11 | 47 |
48 public Selector selector; | |
0 | 49 |
11 | 50 public PSXQueue q_top,q_end; |
51 public PSXReply r_top,r_end; | |
52 public Hashtable<Integer,PSXReply> seqHash; | |
0 | 53 |
22 | 54 public static FederatedLinda init() { |
35
fe338d497c72
FederatedLinda was static singleton. It does not work on Thread based test.
kono
parents:
34
diff
changeset
|
55 FederatedLinda fdl = new FederatedLinda(); |
11 | 56 return fdl; |
0 | 57 } |
58 | |
22 | 59 private FederatedLinda() { |
60 try { | |
61 selector = Selector.open(); | |
62 } catch (IOException e) { | |
63 e.printStackTrace(); | |
64 } | |
11 | 65 seqHash = new Hashtable<Integer, PSXReply>(); |
66 } | |
0 | 67 |
25 | 68 public PSXLinda open(String _host,int _port) |
11 | 69 throws IOException { |
70 tid++; | |
39 | 71 PSXLindaImpl newlinda = new PSXLindaImpl(this,selector,tid,_host,_port); |
11 | 72 linda = newlinda.add(linda); |
73 return linda; | |
74 } | |
0 | 75 |
39 | 76 PSXLinda openFromMetaLinda(MetaLinda metaLinda, String _host, int _port) |
77 throws IOException { | |
78 tid++; | |
79 PSXLindaImpl newlinda = new PSXLindaImpl(this,metaLinda.fds.selector,tid,_host,_port); | |
80 linda = newlinda.add(linda); | |
81 return newlinda; | |
82 } | |
83 | |
23 | 84 public PSXReply psx_queue(PSXLinda linda, int id, ByteBuffer s, int mode, PSXCallback callback) { |
85 PSXQueue c = new PSXQueue(linda,id,mode,s,callback); | |
0 | 86 |
11 | 87 if (q_top == null) { |
88 c = q_end = q_top = c; | |
89 } else { | |
90 q_end.next = c; | |
91 q_end = c; | |
92 } | |
93 qsize++; | |
0 | 94 |
17 | 95 if (mode != PSX.PSX_OUT) { |
96 PSXReply p = new PSXReply(PSX.PSX_REPLY,callback); | |
11 | 97 p.seq = seq(p); |
98 c.setSeq(p.seq); | |
99 if (r_top == null){ | |
100 r_end = r_top = p; | |
101 } else { | |
102 r_end.next = p; | |
103 r_end = p; | |
104 } | |
105 return p; | |
106 } | |
107 return null; | |
0 | 108 } |
109 | |
11 | 110 public int seq(PSXReply reply) { |
111 Integer s; | |
112 do { | |
113 seq++; | |
114 if (seq>MAX_SEQUENCE) { | |
115 seq = 0; | |
116 } | |
117 s = new Integer(seq); | |
118 } while (seqHash.containsKey(s)); | |
119 if (debug) { | |
40 | 120 log(Level.INFO,"hash value = "+s.hashCode()); |
11 | 121 } |
122 seqHash.put(s,reply); | |
123 seq++; | |
124 return seq-1; | |
125 } | |
0 | 126 |
11 | 127 public Selector selector() { |
128 return selector; | |
129 } | |
0 | 130 |
11 | 131 public int sync() throws IOException { |
132 return sync(0); | |
0 | 133 } |
134 | |
11 | 135 public int sync(long mtimeout) |
136 throws IOException { | |
137 int key_num = 0; | |
39 | 138 queueExec(); |
2 | 139 |
11 | 140 try { |
31 | 141 if (selector.select(mtimeout)>0) { |
142 for (Iterator<SelectionKey> it = selector.selectedKeys().iterator();it.hasNext(); ) { | |
143 SelectionKey s = it.next(); | |
144 it.remove(); | |
33 | 145 try { |
34 | 146 if (!s.isReadable()) |
147 throw new IOException(); | |
39 | 148 TupleHandler handle = (TupleHandler)s.attachment(); |
149 handle.handle(s); | |
33 | 150 } catch (IOException e) { |
34 | 151 s.cancel(); |
40 | 152 log(Level.INFO,""+s.channel()+" is closed."); |
33 | 153 } |
154 } | |
11 | 155 } |
156 } catch (IOException e) { | |
157 e.printStackTrace(); | |
158 } catch (ClosedSelectorException e) { | |
36 | 159 // client should be know |
11 | 160 } |
2 | 161 |
11 | 162 return key_num; |
163 } | |
164 | |
86 | 165 public void queueExec() { |
39 | 166 while (q_top != null){ |
167 PSXQueue c = q_top; | |
168 c.send(); | |
169 q_top = c.next; | |
170 qsize--; | |
0 | 171 } |
11 | 172 } |
39 | 173 |
174 PSXReply getReply(int rseq) { | |
24 | 175 Integer a; |
176 | |
177 PSXReply r = seqHash.get((a = new Integer(rseq))); | |
178 seqHash.remove(a); | |
179 return r; | |
180 } | |
39 | 181 |
40 | 182 public void log(Level level,String msg) { |
183 System.err.println(msg); | |
184 if (level==Level.SEVERE) | |
185 new IOException().setStackTrace(null); | |
186 } | |
93 | 187 |
188 public void wakeup() { | |
189 selector.wakeup(); | |
190 } | |
0 | 191 } |
192 | |
193 /* end */ |