Mercurial > hg > FederatedLinda
comparison src/fdl/TupleSpace.java @ 17:609b288f47f9
*** empty log message ***
author | kono |
---|---|
date | Mon, 18 Aug 2008 07:28:29 +0900 |
parents | cccf34386cad |
children | 0243987383b7 |
comparison
equal
deleted
inserted
replaced
16:cccf34386cad | 17:609b288f47f9 |
---|---|
7 import java.nio.channels.SocketChannel; | 7 import java.nio.channels.SocketChannel; |
8 import java.nio.charset.CharacterCodingException; | 8 import java.nio.charset.CharacterCodingException; |
9 import java.nio.charset.Charset; | 9 import java.nio.charset.Charset; |
10 import java.nio.charset.CharsetDecoder; | 10 import java.nio.charset.CharsetDecoder; |
11 | 11 |
12 public class TupleSpace implements PSXQueueInterface{ | 12 public class TupleSpace { |
13 static final boolean debug = true; | 13 static final boolean debug = true; |
14 static final int CAPSIZE = 4194304; | 14 static final int CAPSIZE = 4194304; |
15 public Tuple[] tuple_space; | 15 public Tuple[] tuple_space; |
16 | 16 |
17 public TupleSpace(Tuple[] _tuple_space) { | 17 public TupleSpace(Tuple[] _tuple_space) { |
23 public TupleSpace() { | 23 public TupleSpace() { |
24 // TODO Auto-generated constructor stub | 24 // TODO Auto-generated constructor stub |
25 } | 25 } |
26 | 26 |
27 protected String Out(ByteBuffer command, ByteBuffer data) throws IOException { | 27 protected String Out(ByteBuffer command, ByteBuffer data) throws IOException { |
28 Tuple tmpTuple; | 28 Tuple tuple; |
29 int id; | 29 int id; |
30 int datasize; | 30 int datasize; |
31 char idc = (char)command.getShort(LINDA_ID_OFFSET); | 31 char idc = (char)command.getShort(PSX.LINDA_ID_OFFSET); |
32 command.rewind(); | 32 command.rewind(); |
33 id = (int)idc; | 33 id = (int)idc; |
34 String sendtext = "none"; | 34 String sendtext = "none"; |
35 | 35 |
36 datasize = command.getInt(LINDA_DATA_LENGTH_OFFSET); | 36 datasize = command.getInt(PSX.LINDA_DATA_LENGTH_OFFSET); |
37 command.rewind(); | 37 command.rewind(); |
38 | 38 |
39 System.out.println("*** out command : id = "+id +" ***"); | 39 System.out.println("*** out command : id = "+id +" ***"); |
40 | 40 |
41 while((tuple_space[id] != null) && | 41 while((tuple_space[id] != null) && |
42 ((tuple_space[id].mode == PSX_WAIT_RD)||(tuple_space[id].mode == PSX_RD))) { | 42 ((tuple_space[id].mode == PSX.PSX_WAIT_RD)||(tuple_space[id].mode == PSX.PSX_RD))) { |
43 command.put(LINDA_MODE_OFFSET, (byte)'a'); | 43 PSX.setAnserCommand(command, tuple_space[id].getSeq()); |
44 command.rewind(); | |
45 command.putInt(LINDA_SEQ_OFFSET, tuple_space[id].getSeq()); | |
46 command.rewind(); | |
47 //if(debug){ | 44 //if(debug){ |
48 //int sendsize = tmpTuple.getdataLength()+LINDA_HEADER_SIZE; | 45 //int sendsize = tmpTuple.getdataLength()+PSX.LINDA_HEADER_SIZE; |
49 //System.out.println("send size "+sendsize+" : mode = "+(char)mode); | 46 //System.out.println("send size "+sendsize+" : mode = "+(char)mode); |
50 //} | 47 //} |
51 //ByteBuffer sendcommand = tmpTuple.getCommand(); | 48 //ByteBuffer sendcommand = tmpTuple.getCommand(); |
52 //ByteBuffer senddata = tmpTuple.getData(); | 49 //ByteBuffer senddata = tmpTuple.getData(); |
53 send(tuple_space[id].ch, command, data); | 50 send(tuple_space[id].ch, command, data); |
54 | 51 |
55 sendtext = getdataString(data); | 52 sendtext = getdataString(data); |
56 | 53 |
57 | 54 |
58 //後処理 | 55 removeTuple(id); |
59 tmpTuple = tuple_space[id]; | 56 tuple = null; |
60 tuple_space[id] = tmpTuple.next; | 57 } |
61 tmpTuple = null; | 58 if(tuple_space[id] != null && tuple_space[id].mode == PSX.PSX_IN) { |
62 } | 59 PSX.setAnserCommand(command, tuple_space[id].getSeq()); |
63 if(tuple_space[id] != null && tuple_space[id].mode == PSX_IN) { | |
64 command.put(LINDA_MODE_OFFSET, (byte)'a'); | |
65 command.rewind(); | |
66 command.putInt(LINDA_SEQ_OFFSET, tuple_space[id].getSeq()); | |
67 command.rewind(); | |
68 | 60 |
69 if(debug){ | 61 if(debug){ |
70 int sendsize = datasize+LINDA_HEADER_SIZE; | 62 int sendsize = datasize+PSX.LINDA_HEADER_SIZE; |
71 System.out.println("send size "+sendsize+" : mode = "+(char)'a'); | 63 System.out.println("send size "+sendsize+" : mode = "+(char)'a'); |
72 } | 64 } |
73 //ByteBuffer sendcommand = tmpTuple.getCommand(); | 65 //ByteBuffer sendcommand = tmpTuple.getCommand(); |
74 //ByteBuffer senddata = tmpTuple.getData(); | 66 //ByteBuffer senddata = tmpTuple.getData(); |
75 send(tuple_space[id].ch, command, data); | 67 send(tuple_space[id].ch, command, data); |
76 | 68 |
77 sendtext = getdataString(data); | 69 sendtext = getdataString(data); |
78 | 70 |
79 //後処理 | 71 removeTuple(id); |
80 tmpTuple = tuple_space[id]; | 72 tuple = null; |
81 tuple_space[id] = tmpTuple.next; | 73 } else if ((tuple_space[id] == null)|| (tuple_space[id].getMode() == PSX.PSX_OUT)) { |
82 tmpTuple = null; | 74 if((tuple = tuple_space[id]) == null) { |
83 } else if ((tuple_space[id] == null)|| (tuple_space[id].getMode() == PSX_OUT)) { | 75 tuple = tuple_space[id] = new Tuple(); |
84 if((tmpTuple = tuple_space[id]) == null) { | 76 tuple.next = null; |
85 tmpTuple = tuple_space[id] = new Tuple(); | |
86 tmpTuple.next = null; | |
87 } | 77 } |
88 else { | 78 else { |
89 while(tmpTuple.next != null) tmpTuple = tmpTuple.next; | 79 while(tuple.next != null) tuple = tuple.next; |
90 tmpTuple.next = new Tuple(); | 80 tuple.next = new Tuple(); |
91 tmpTuple = tmpTuple.next; | 81 tuple = tuple.next; |
92 tmpTuple.next = null; | 82 tuple.next = null; |
93 } | 83 } |
94 | 84 |
95 tmpTuple.setMode('o'); | 85 tuple.setMode('o'); |
96 int seq = command.getInt(LINDA_SEQ_OFFSET); | 86 int seq = command.getInt(PSX.LINDA_SEQ_OFFSET); |
97 command.rewind(); | 87 command.rewind(); |
98 tmpTuple.setSeq(seq); | 88 tuple.setSeq(seq); |
99 tmpTuple.setData(data); | 89 tuple.setData(data); |
100 tmpTuple.setDataLength(datasize); | 90 tuple.setDataLength(datasize); |
101 if(debug){ | 91 if(debug){ |
102 System.out.println("data inserted len = "+tmpTuple.getdataLength()+" : id = "+id); | 92 System.out.println("data inserted len = "+tuple.getdataLength()+" : id = "+id); |
103 } | 93 } |
104 } | 94 } |
105 else { | 95 else { |
106 System.out.println("Incorrect mode :"+(char)tuple_space[id].getMode()); | 96 System.out.println("Incorrect mode :"+(char)tuple_space[id].getMode()); |
107 command.clear(); | 97 command.clear(); |
109 System.exit(1); | 99 System.exit(1); |
110 } | 100 } |
111 return sendtext; | 101 return sendtext; |
112 } | 102 } |
113 | 103 |
104 private void removeTuple(int id) { | |
105 Tuple tuple; | |
106 //後処理 | |
107 tuple = tuple_space[id]; | |
108 tuple_space[id] = tuple.next; | |
109 } | |
110 | |
114 protected void Wait_Rd(SelectionKey key, ByteBuffer command, int mode) { | 111 protected void Wait_Rd(SelectionKey key, ByteBuffer command, int mode) { |
115 Tuple tmpTuple; | 112 Tuple tuple; |
116 int id; | 113 int id; |
117 | 114 |
118 char idc = (char)command.getShort(LINDA_ID_OFFSET); | 115 char idc = (char)command.getShort(PSX.LINDA_ID_OFFSET); |
119 command.rewind(); | 116 command.rewind(); |
120 id = (int)idc; | 117 id = (int)idc; |
121 | 118 |
122 if (debug) | 119 if (debug) |
123 System.out.println("*** "+(char)mode+" command : id = "+ id +" ***\n"); | 120 System.out.println("*** "+(char)mode+" command : id = "+ id +" ***\n"); |
124 | 121 |
125 tmpTuple = new Tuple(); | 122 tuple = new Tuple(); |
126 tmpTuple.setMode(mode); | 123 tuple.setMode(mode); |
127 int seq = command.getInt(LINDA_SEQ_OFFSET); | 124 int seq = command.getInt(PSX.LINDA_SEQ_OFFSET); |
128 command.rewind(); | 125 command.rewind(); |
129 tmpTuple.setSeq(seq); | 126 tuple.setSeq(seq); |
130 tmpTuple.ch = (SocketChannel) key.channel(); | 127 tuple.ch = (SocketChannel) key.channel(); |
131 tmpTuple.setDataLength(0); | 128 tuple.setDataLength(0); |
132 ByteBuffer buff = ByteBuffer.allocate(0); | 129 ByteBuffer buff = ByteBuffer.allocate(0); |
133 tmpTuple.setData(buff); | 130 tuple.setData(buff); |
134 tmpTuple.next = tuple_space[id]; | 131 tuple.next = tuple_space[id]; |
135 tuple_space[id] = tmpTuple; | 132 tuple_space[id] = tuple; |
136 if(debug){ | 133 if(debug){ |
137 System.out.println("data inserted insert seq = "+seq +", id = "+id); | 134 System.out.println("data inserted insert seq = "+seq +", id = "+id); |
138 } | 135 } |
139 } | 136 } |
140 | 137 |
141 protected String In_Rd(SelectionKey key, ByteBuffer command, int mode) | 138 protected String In_Rd(SelectionKey key, ByteBuffer command, int mode) |
142 throws IOException { | 139 throws IOException { |
143 Tuple tmpTuple = read_in_1(key, command, mode); | 140 Tuple tuple = read_in_1(key, command, mode); |
144 | 141 |
145 if (tmpTuple!=null) { | 142 if (tuple!=null) { |
146 //send | 143 //send |
147 ByteBuffer sendcommand = tmpTuple.getCommand(); | 144 ByteBuffer sendcommand = tuple.getCommand(); |
148 ByteBuffer senddata = tmpTuple.getData(); | 145 ByteBuffer senddata = tuple.getData(); |
149 send(key,sendcommand, senddata); | 146 send(key,sendcommand, senddata); |
150 } | 147 } |
151 String sendtext = getdataString(tmpTuple.getData()); | 148 String sendtext = getdataString(tuple.getData()); |
152 return sendtext; | 149 return sendtext; |
153 } | 150 } |
154 | 151 |
155 private Tuple read_in_1(SelectionKey key, ByteBuffer command, int mode) { | 152 private Tuple read_in_1(SelectionKey key, ByteBuffer command, int mode) { |
156 Tuple tmpTuple; | 153 Tuple tuple; |
157 int id; | 154 int id; |
158 //id = command.getInt(LINDA_ID_OFFSET); | 155 //id = command.getInt(PSX.LINDA_ID_OFFSET); |
159 //int mode = command.getInt(LINDA_MODE_OFFSET); | 156 //int mode = command.getInt(PSX.LINDA_MODE_OFFSET); |
160 Tuple temp = null; | 157 Tuple temp = null; |
161 | 158 |
162 char idc = (char)command.getShort(LINDA_ID_OFFSET); | 159 char idc = (char)command.getShort(PSX.LINDA_ID_OFFSET); |
163 command.rewind(); | 160 command.rewind(); |
164 id = (int)idc; | 161 id = (int)idc; |
165 | 162 |
166 | 163 |
167 System.out.println("*** "+(char)mode+" command : id = "+ id +" ***\n"); | 164 System.out.println("*** "+(char)mode+" command : id = "+ id +" ***\n"); |
168 | 165 |
169 tmpTuple = tuple_space[id]; | 166 tuple = tuple_space[id]; |
170 | 167 |
171 //wを無視 | 168 //wを無視 |
172 while(tmpTuple != null && tmpTuple.next != null && (tmpTuple.mode == 'w')){ | 169 while(tuple != null && tuple.next != null && (tuple.mode == 'w')){ |
173 temp = tmpTuple; | 170 temp = tuple; |
174 tmpTuple = tmpTuple.next; | 171 tuple = tuple.next; |
175 } | 172 } |
176 | 173 |
177 if (tmpTuple != null && (tmpTuple.mode == 'o')){ | 174 if (tuple != null && (tuple.mode == 'o')){ |
178 //tmpTuple = new Tuple((SocketChannel)key.channel()); | 175 //tmpTuple = new Tuple((SocketChannel)key.channel()); |
179 int seq = command.getInt(LINDA_SEQ_OFFSET); | 176 int seq = command.getInt(PSX.LINDA_SEQ_OFFSET); |
180 command.rewind(); | 177 command.rewind(); |
181 tmpTuple.setCommand('a', seq); | 178 tuple.setCommand('a', seq); |
182 | 179 |
183 if(debug){ | 180 if(debug){ |
184 int sendsize = tmpTuple.getdataLength()+LINDA_HEADER_SIZE; | 181 int sendsize = tuple.getdataLength()+PSX.LINDA_HEADER_SIZE; |
185 System.out.println("send size "+sendsize+" : mode = "+(char)tmpTuple.getMode()); | 182 System.out.println("send size "+sendsize+" : mode = "+(char)tuple.getMode()); |
186 } | 183 } |
187 | 184 |
188 | 185 |
189 | 186 |
190 //INの場合はremoveする | 187 //INの場合はremoveする |
191 if(mode == PSX_IN) { | 188 if(mode == PSX.PSX_IN) { |
192 if(tmpTuple.data != null){ | 189 if(tuple.data != null){ |
193 //ByteBuffer buff = ByteBuffer.allocate(0); | 190 //ByteBuffer buff = ByteBuffer.allocate(0); |
194 //tmpTuple.setData(buff); | 191 //tmpTuple.setData(buff); |
195 tmpTuple.data = null; | 192 tuple.data = null; |
196 } | 193 } |
197 if(temp != null){ | 194 if(temp != null){ |
198 temp.next = tmpTuple.next; | 195 temp.next = tuple.next; |
199 } | 196 } |
200 else { | 197 else { |
201 tuple_space[id] = tmpTuple.next; | 198 tuple_space[id] = tuple.next; |
202 } | 199 } |
203 } | 200 } |
204 } else { | 201 } else { |
205 if(tmpTuple == null) { | 202 if(tuple == null) { |
206 //ServerSocketChannel sc = (ServerSocketChannel)key.channel(); | 203 //ServerSocketChannel sc = (ServerSocketChannel)key.channel(); |
207 tmpTuple = tuple_space[id] = new Tuple((SocketChannel)key.channel()); | 204 tuple = tuple_space[id] = new Tuple((SocketChannel)key.channel()); |
208 tmpTuple.next = null; | 205 tuple.next = null; |
209 }else { | 206 }else { |
210 while(tmpTuple.next !=null) tmpTuple =tmpTuple.next; | 207 while(tuple.next !=null) tuple =tuple.next; |
211 tmpTuple.next= new Tuple((SocketChannel)key.channel()); | 208 tuple.next= new Tuple((SocketChannel)key.channel()); |
212 tmpTuple = tmpTuple.next; | 209 tuple = tuple.next; |
213 tmpTuple.next = null; | 210 tuple.next = null; |
214 } | 211 } |
215 | 212 |
216 tmpTuple.setMode(mode); | 213 tuple.setMode(mode); |
217 int seq2 = command.getInt(LINDA_SEQ_OFFSET); | 214 int seq2 = command.getInt(PSX.LINDA_SEQ_OFFSET); |
218 command.rewind(); | 215 command.rewind(); |
219 tmpTuple.setSeq(seq2); | 216 tuple.setSeq(seq2); |
220 tmpTuple.ch = (SocketChannel) key.channel(); | 217 tuple.ch = (SocketChannel) key.channel(); |
221 tmpTuple.setDataLength(0); | 218 tuple.setDataLength(0); |
222 ByteBuffer buff = ByteBuffer.allocate(0); | 219 ByteBuffer buff = ByteBuffer.allocate(0); |
223 buff.rewind(); | 220 buff.rewind(); |
224 tmpTuple.setData(buff); | 221 tuple.setData(buff); |
225 tmpTuple = null; | 222 tuple = null; |
226 | 223 |
227 if(debug){ | 224 if(debug){ |
228 System.out.println("data inserted insert seq = "+seq2 +", id = "+id); | 225 System.out.println("data inserted insert seq = "+seq2 +", id = "+id); |
229 } | 226 } |
230 } | 227 } |
231 return tmpTuple; | 228 return tuple; |
232 } | 229 } |
233 | 230 |
234 protected String Check(SelectionKey key, ByteBuffer command) throws IOException { | 231 protected String Check(SelectionKey key, ByteBuffer command) throws IOException { |
235 String sendtext; | 232 String sendtext; |
236 ByteBuffer data = check1(command); | 233 ByteBuffer data = check1(command); |
243 | 240 |
244 private ByteBuffer check1(ByteBuffer command) { | 241 private ByteBuffer check1(ByteBuffer command) { |
245 ByteBuffer data; | 242 ByteBuffer data; |
246 Tuple tmpTuple; | 243 Tuple tmpTuple; |
247 int id; | 244 int id; |
248 char idc = (char)command.getShort(LINDA_ID_OFFSET); | 245 char idc = (char)command.getShort(PSX.LINDA_ID_OFFSET); |
249 command.rewind(); | 246 command.rewind(); |
250 id = (int)idc; | 247 id = (int)idc; |
251 | 248 |
252 tmpTuple = tuple_space[id]; | 249 tmpTuple = tuple_space[id]; |
253 while(tmpTuple != null && tmpTuple.next != null && (tmpTuple.mode == 'w')){ | 250 while(tmpTuple != null && tmpTuple.next != null && (tmpTuple.mode == 'w')){ |
254 tmpTuple = tmpTuple.next; | 251 tmpTuple = tmpTuple.next; |
255 } | 252 } |
256 if (tmpTuple != null && (tmpTuple.mode == 'o')) { | 253 if (tmpTuple != null && (tmpTuple.mode == 'o')) { |
257 command.putInt(LINDA_DATA_LENGTH_OFFSET, tmpTuple.datalen); | 254 command.putInt(PSX.LINDA_DATA_LENGTH_OFFSET, tmpTuple.datalen); |
258 command.rewind(); | 255 command.rewind(); |
259 data = tmpTuple.getData(); | 256 data = tmpTuple.getData(); |
260 }else { | 257 }else { |
261 //means no out tuple | 258 //means no out tuple |
262 command.putInt(LINDA_DATA_LENGTH_OFFSET, 0); | 259 command.putInt(PSX.LINDA_DATA_LENGTH_OFFSET, 0); |
263 command.rewind(); | 260 command.rewind(); |
264 data = ByteBuffer.allocate(0); | 261 data = ByteBuffer.allocate(0); |
265 } | 262 } |
266 return data; | 263 return data; |
267 } | 264 } |
274 } | 271 } |
275 if (data == null) { | 272 if (data == null) { |
276 System.out.println("Manager_run: data is null"); | 273 System.out.println("Manager_run: data is null"); |
277 } | 274 } |
278 } | 275 } |
279 int send_size = LINDA_HEADER_SIZE; | 276 int send_size = PSX.LINDA_HEADER_SIZE; |
280 int count = 0; | 277 int count = 0; |
281 | 278 |
282 //command Send | 279 //command Send |
283 command.rewind(); | 280 command.rewind(); |
284 while(send_size > 0){ | 281 while(send_size > 0){ |