0
|
1
|
|
2 /*
|
|
3 * @(#)FederatedLinda.java 1.1 06/04/01
|
|
4 *
|
|
5 * Copyright 2006 Shinji KONO
|
|
6 *
|
|
7
|
|
8 PSX Lidna
|
|
9 Trasport layer of PSX Linda library
|
|
10
|
|
11 */
|
|
12
|
|
13 package fdl;
|
|
14
|
|
15 import java.io.IOException;
|
|
16 import java.nio.ByteBuffer;
|
|
17 import java.nio.ByteOrder;
|
|
18 import java.nio.channels.ClosedSelectorException;
|
|
19 import java.nio.channels.SelectionKey;
|
|
20 import java.nio.channels.Selector;
|
|
21 import java.nio.channels.SocketChannel;
|
|
22 import java.util.Hashtable;
|
31
|
23 import java.util.Iterator;
|
0
|
24
|
|
25 /**
|
|
26 FederatedLinda
|
|
27 *
|
|
28 * @author Shinji Kono
|
|
29 *
|
|
30 * @param mytsid Tuple Space ID
|
|
31
|
|
32 Initialize connection channel for a tuple space
|
|
33
|
|
34 one instance for each Tuple space connection
|
|
35
|
|
36 */
|
|
37
|
17
|
38 public class FederatedLinda {
|
0
|
39
|
22
|
40 static FederatedLinda fdl = new FederatedLinda();
|
11
|
41 static int MAX_SEQUENCE = 2048;
|
|
42 static boolean debug = true;
|
0
|
43
|
11
|
44 public int tid;
|
|
45 public int seq;
|
|
46 public int qsize;
|
25
|
47 public PSXLinda linda;
|
11
|
48
|
|
49 public Selector selector;
|
0
|
50
|
11
|
51 public PSXQueue q_top,q_end;
|
|
52 public PSXReply r_top,r_end;
|
|
53 public Hashtable<Integer,PSXReply> seqHash;
|
0
|
54
|
22
|
55 public static FederatedLinda init() {
|
11
|
56 return fdl;
|
0
|
57 }
|
|
58
|
22
|
59 private FederatedLinda() {
|
|
60 try {
|
|
61 selector = Selector.open();
|
|
62 } catch (IOException e) {
|
|
63 e.printStackTrace();
|
|
64 }
|
11
|
65 seqHash = new Hashtable<Integer, PSXReply>();
|
|
66 }
|
0
|
67
|
25
|
68 public PSXLinda open(String _host,int _port)
|
11
|
69 throws IOException {
|
|
70 tid++;
|
25
|
71 PSXLindaImpl newlinda = new PSXLindaImpl(this,tid,_host,_port);
|
11
|
72 linda = newlinda.add(linda);
|
|
73 return linda;
|
|
74 }
|
0
|
75
|
23
|
76 public PSXReply psx_queue(PSXLinda linda, int id, ByteBuffer s, int mode, PSXCallback callback) {
|
|
77 PSXQueue c = new PSXQueue(linda,id,mode,s,callback);
|
0
|
78
|
11
|
79 if (q_top == null) {
|
|
80 c = q_end = q_top = c;
|
|
81 } else {
|
|
82 q_end.next = c;
|
|
83 q_end = c;
|
|
84 }
|
|
85 qsize++;
|
0
|
86
|
17
|
87 if (mode != PSX.PSX_OUT) {
|
|
88 PSXReply p = new PSXReply(PSX.PSX_REPLY,callback);
|
11
|
89 p.seq = seq(p);
|
|
90 c.setSeq(p.seq);
|
|
91 if (r_top == null){
|
|
92 r_end = r_top = p;
|
|
93 } else {
|
|
94 r_end.next = p;
|
|
95 r_end = p;
|
|
96 }
|
|
97 return p;
|
|
98 }
|
|
99 return null;
|
0
|
100 }
|
|
101
|
11
|
102 public int seq(PSXReply reply) {
|
|
103 Integer s;
|
|
104 do {
|
|
105 seq++;
|
|
106 if (seq>MAX_SEQUENCE) {
|
|
107 seq = 0;
|
|
108 }
|
|
109 s = new Integer(seq);
|
|
110 } while (seqHash.containsKey(s));
|
|
111 if (debug) {
|
|
112 System.out.print("hash value = ");
|
|
113 System.out.println(s.hashCode());
|
|
114 }
|
|
115 seqHash.put(s,reply);
|
|
116 seq++;
|
|
117 return seq-1;
|
|
118 }
|
0
|
119
|
11
|
120 public Selector selector() {
|
|
121 return selector;
|
|
122 }
|
0
|
123
|
11
|
124 public int sync() throws IOException {
|
|
125 return sync(0);
|
0
|
126 }
|
|
127
|
11
|
128 public int sync(long mtimeout)
|
|
129 throws IOException {
|
|
130 int key_num = 0;
|
|
131 while (q_top != null){
|
|
132 PSXQueue c = q_top;
|
23
|
133 c.send();
|
11
|
134 q_top = c.next;
|
|
135 qsize--;
|
|
136 }
|
2
|
137
|
11
|
138 try {
|
31
|
139 if (selector.select(mtimeout)>0) {
|
|
140 for (Iterator<SelectionKey> it = selector.selectedKeys().iterator();it.hasNext(); ) {
|
|
141 SelectionKey s = it.next();
|
|
142 it.remove();
|
|
143 chkServe((SocketChannel)s.channel());
|
|
144 }
|
11
|
145 }
|
|
146 } catch (IOException e) {
|
|
147 e.printStackTrace();
|
|
148 } catch (ClosedSelectorException e) {
|
|
149 e.printStackTrace();
|
|
150 }
|
2
|
151
|
11
|
152 return key_num;
|
|
153 }
|
|
154
|
2
|
155 private void chkServe(SocketChannel sock)
|
11
|
156 throws IOException {
|
|
157 int length;
|
17
|
158 ByteBuffer command = ByteBuffer.allocate(PSX.LINDA_HEADER_SIZE);
|
11
|
159 command.order(ByteOrder.BIG_ENDIAN);
|
31
|
160 PSX.receive(sock, command, PSX.LINDA_HEADER_SIZE);
|
0
|
161
|
17
|
162 length = command.getInt(PSX.LINDA_DATA_LENGTH_OFFSET);
|
31
|
163 ByteBuffer data = ByteBuffer.allocate(length);
|
|
164 int read = length;
|
|
165 if (debug) {
|
|
166 System.out.print("client reading:");
|
|
167 System.out.println(length);
|
|
168 }
|
|
169
|
|
170 data.order(ByteOrder.BIG_ENDIAN);
|
|
171 while(read>0) {
|
|
172 read -= sock.read(data);
|
|
173 }
|
|
174 data.rewind();
|
0
|
175
|
31
|
176 if (debug) {
|
|
177 PSX.printCommand("chkServe:",command, data);
|
|
178 }
|
2
|
179
|
31
|
180 int rseq = command.getInt(PSX.LINDA_SEQ_OFFSET);
|
|
181 int mode = command.get(PSX.LINDA_MODE_OFFSET);
|
|
182 PSXReply r = getReply(rseq);
|
|
183 if (r==null) {
|
|
184 System.err.println("Illegal answer sequence.");
|
|
185 return;
|
|
186 }
|
|
187 r.setAnswer(mode,command,data);
|
11
|
188
|
31
|
189 if (r.callback != null ) {
|
|
190 r.callback.callback(data);
|
0
|
191 }
|
11
|
192 }
|
24
|
193
|
|
194 private PSXReply getReply(int rseq) {
|
|
195 Integer a;
|
|
196
|
|
197 PSXReply r = seqHash.get((a = new Integer(rseq)));
|
|
198 seqHash.remove(a);
|
|
199 return r;
|
|
200 }
|
0
|
201 }
|
|
202
|
|
203 /* end */
|