Mercurial > hg > FederatedLinda
annotate src/fdl/FederatedLinda.java @ 24:35375016b2f0 simple-test-passed
cleanup.
author | kono |
---|---|
date | Wed, 20 Aug 2008 10:18:05 +0900 (2008-08-20) |
parents | b4fd7fb9135a |
children | 330fa49bc4fd |
rev | line source |
---|---|
0 | 1 |
2 /* | |
3 * @(#)FederatedLinda.java 1.1 06/04/01 | |
4 * | |
5 * Copyright 2006 Shinji KONO | |
6 * | |
7 | |
8 PSX Linda | |
9 Transport layer of PSX Linda library | |
10 | |
11 */ | |
12 | |
13 package fdl; | |
14 | |
15 import java.io.IOException; | |
16 import java.nio.ByteBuffer; | |
17 import java.nio.ByteOrder; | |
18 import java.nio.channels.ClosedSelectorException; | |
19 import java.nio.channels.SelectionKey; | |
20 import java.nio.channels.Selector; | |
21 import java.nio.channels.SocketChannel; | |
22 import java.util.Hashtable; | |
23 import java.util.Set; | |
24 | |
25 | |
26 /** | |
27 FederatedLinda | |
28 * | |
29 * @author Shinji Kono | |
30 * | |
31 * @param mytsid Tuple Space ID | |
32 | |
33 Initialize connection channel for a tuple space | |
34 | |
35 one instance for each Tuple space connection | |
36 | |
37 */ | |
38 | |
17 | 39 public class FederatedLinda { |
0 | 40 |
22 | 41 static FederatedLinda fdl = new FederatedLinda(); |
11 | 42 static int MAX_SEQUENCE = 2048; |
43 static boolean debug = true; | |
0 | 44 |
11 | 45 public int tid; |
46 public int seq; | |
47 public int qsize; | |
19
0243987383b7
Meta Protocol Engine and sample implementation of event logger.
kono
parents:
17
diff
changeset
|
48 public PSXLindaInterface linda; |
11 | 49 |
50 public Selector selector; | |
0 | 51 |
11 | 52 public PSXQueue q_top,q_end; |
53 public PSXReply r_top,r_end; | |
54 public Hashtable<Integer,PSXReply> seqHash; | |
0 | 55 |
22 | 56 public static FederatedLinda init() { |
11 | 57 return fdl; |
0 | 58 } |
59 | |
22 | 60 private FederatedLinda() { |
61 try { | |
62 selector = Selector.open(); | |
63 } catch (IOException e) { | |
64 e.printStackTrace(); | |
65 } | |
11 | 66 seqHash = new Hashtable<Integer, PSXReply>(); |
67 } | |
0 | 68 |
19
0243987383b7
Meta Protocol Engine and sample implementation of event logger.
kono
parents:
17
diff
changeset
|
69 public PSXLindaInterface open(String _host,int _port) |
11 | 70 throws IOException { |
71 tid++; | |
19
0243987383b7
Meta Protocol Engine and sample implementation of event logger.
kono
parents:
17
diff
changeset
|
72 PSXLindaInterface newlinda = new PSXLinda(this,tid,_host,_port); |
11 | 73 linda = newlinda.add(linda); |
74 return linda; | |
75 } | |
0 | 76 |
23 | 77 public PSXReply psx_queue(PSXLinda linda, int id, ByteBuffer s, int mode, PSXCallback callback) { |
78 PSXQueue c = new PSXQueue(linda,id,mode,s,callback); | |
0 | 79 |
11 | 80 if (q_top == null) { |
81 c = q_end = q_top = c; | |
82 } else { | |
83 q_end.next = c; | |
84 q_end = c; | |
85 } | |
86 qsize++; | |
0 | 87 |
17 | 88 if (mode != PSX.PSX_OUT) { |
89 PSXReply p = new PSXReply(PSX.PSX_REPLY,callback); | |
11 | 90 p.seq = seq(p); |
91 c.setSeq(p.seq); | |
92 if (r_top == null){ | |
93 r_end = r_top = p; | |
94 } else { | |
95 r_end.next = p; | |
96 r_end = p; | |
97 } | |
98 return p; | |
99 } | |
100 return null; | |
0 | 101 } |
102 | |
11 | 103 public int seq(PSXReply reply) { |
104 Integer s; | |
105 do { | |
106 seq++; | |
107 if (seq>MAX_SEQUENCE) { | |
108 seq = 0; | |
109 } | |
110 s = new Integer(seq); | |
111 } while (seqHash.containsKey(s)); | |
112 if (debug) { | |
113 System.out.print("hash value = "); | |
114 System.out.println(s.hashCode()); | |
115 } | |
116 seqHash.put(s,reply); | |
117 seq++; | |
118 return seq-1; | |
119 } | |
0 | 120 |
11 | 121 public Selector selector() { |
122 return selector; | |
123 } | |
0 | 124 |
11 | 125 public int sync() throws IOException { |
126 return sync(0); | |
0 | 127 } |
128 | |
11 | 129 public int sync(long mtimeout) |
130 throws IOException { | |
131 int key_num = 0; | |
132 Set<SelectionKey> keys; | |
0 | 133 |
11 | 134 while (q_top != null){ |
135 PSXQueue c = q_top; | |
23 | 136 c.send(); |
11 | 137 q_top = c.next; |
138 qsize--; | |
139 } | |
2 | 140 |
11 | 141 try { |
142 key_num = selector.select(mtimeout); | |
143 keys = selector.selectedKeys(); | |
144 for (SelectionKey key : keys) { | |
145 SocketChannel sock = (SocketChannel)key.channel(); | |
146 chkServe(sock); | |
147 } | |
148 } catch (IOException e) { | |
149 e.printStackTrace(); | |
150 } catch (ClosedSelectorException e) { | |
151 e.printStackTrace(); | |
152 } | |
2 | 153 |
11 | 154 return key_num; |
155 } | |
156 | |
2 | 157 private void chkServe(SocketChannel sock) |
11 | 158 throws IOException { |
159 int length; | |
17 | 160 ByteBuffer command = ByteBuffer.allocate(PSX.LINDA_HEADER_SIZE); |
11 | 161 command.order(ByteOrder.BIG_ENDIAN); |
0 | 162 |
11 | 163 sock.read(command); |
164 command.rewind(); | |
17 | 165 length = command.getInt(PSX.LINDA_DATA_LENGTH_OFFSET); |
11 | 166 if (length>0) { |
167 ByteBuffer data = ByteBuffer.allocate(length); | |
168 int read = length; | |
169 if (debug) { | |
170 System.out.print("reading:"); | |
171 System.out.println(length); | |
172 } | |
0 | 173 |
11 | 174 data.order(ByteOrder.BIG_ENDIAN); |
175 while(read>0) { | |
176 read -= sock.read(data); | |
177 } | |
178 data.rewind(); | |
2 | 179 |
11 | 180 if (debug) { |
17 | 181 PSX.printCommand(command, data); |
11 | 182 } |
183 | |
17 | 184 int rseq = command.getInt(PSX.LINDA_SEQ_OFFSET); |
185 int mode = command.get(PSX.LINDA_MODE_OFFSET); | |
24 | 186 PSXReply r = getReply(rseq); |
187 r.setAnswer(mode,command,data); | |
11 | 188 |
24 | 189 if (r.callback != null ) { |
190 r.callback.callback(data); | |
11 | 191 } |
0 | 192 } |
11 | 193 } |
24 | 194 |
195 private PSXReply getReply(int rseq) { | |
196 Integer a; | |
197 | |
198 PSXReply r = seqHash.get((a = new Integer(rseq))); | |
199 if (r==null) { | |
200 System.out.println("hashed reply not found"); | |
201 } | |
202 seqHash.remove(a); | |
203 return r; | |
204 } | |
0 | 205 } |
206 | |
207 /* end */ |