Mercurial > hg > FederatedLinda
comparison src/fdl/test/debug/MetaProtocolEngine.java @ 91:4df1d50df52a
Ring: fdl.test.debug
author | kazz <kazz@cr.ie.u-ryukyu.ac.jp> |
---|---|
date | Tue, 16 Feb 2010 03:58:06 +0900 |
parents | 9cdc24bae625 |
children | 0ea086f0e96f |
comparison
equal
deleted
inserted
replaced
90:9cdc24bae625 | 91:4df1d50df52a |
---|---|
66 class AcceptXMLCallback implements PSXCallback { | 66 class AcceptXMLCallback implements PSXCallback { |
67 int tid; | 67 int tid; |
68 | 68 |
69 private DocumentBuilderFactory dbFactory = null; | 69 private DocumentBuilderFactory dbFactory = null; |
70 private DocumentBuilder docBuilder = null; | 70 private DocumentBuilder docBuilder = null; |
71 protected Document document; | |
72 | 71 |
73 public AcceptXMLCallback(int tid) { | 72 public AcceptXMLCallback(int tid) { |
74 this.tid = tid; | 73 this.tid = tid; |
75 dbFactory = DocumentBuilderFactory.newInstance(); | 74 dbFactory = DocumentBuilderFactory.newInstance(); |
76 try { | 75 try { |
98 e.printStackTrace(); | 97 e.printStackTrace(); |
99 } | 98 } |
100 | 99 |
101 Element root = doc.getDocumentElement(); | 100 Element root = doc.getDocumentElement(); |
102 if(root.getTagName().equals("connections")) { | 101 if(root.getTagName().equals("connections")) { |
103 nodeId = new Integer(root.getAttribute("id")).intValue(); | 102 nodeId = Integer.parseInt(root.getAttribute("id")); |
104 if (nodeId == 0) { | 103 if (nodeId == 0) { |
105 ml.in(START, new StartCallback()); | 104 ml.in(START, new StartCallback()); |
106 ml.in(DEBUGSTART, new DebugStartCallback()); | 105 ml.in(DEBUGSTART, new DebugStartCallback()); |
107 } | 106 } |
108 NodeList connections = root.getElementsByTagName("connection"); | 107 NodeList connections = root.getElementsByTagName("connection"); |
109 for (int i = 0; i < connections.getLength(); i++) { | 108 for (int i = 0; i < connections.getLength(); i++) { |
110 Element connection = (Element)connections.item(i); | 109 Element connection = (Element)connections.item(i); |
111 Element host = (Element)connection.getElementsByTagName("host").item(0); | 110 Element host = (Element)connection.getElementsByTagName("host").item(0); |
112 Element port = (Element)connection.getElementsByTagName("port").item(0); | 111 Element port = (Element)connection.getElementsByTagName("port").item(0); |
113 Element t = (Element)connection.getElementsByTagName("tid").item(0); | 112 Element t = (Element)connection.getElementsByTagName("tid").item(0); |
114 int srcId = new Integer(connection.getAttribute("id")).intValue(); | 113 int srcId = Integer.parseInt(connection.getAttribute("id")); |
115 String dstHostName = host.getTextContent(); | 114 String dstHostName = host.getTextContent(); |
116 int dstPort = new Integer(port.getAttribute("id")).intValue(); | 115 int dstPort = Integer.parseInt(port.getAttribute("id")); |
117 int dstId = new Integer(t.getAttribute("id")).intValue(); | 116 int dstId = Integer.parseInt(t.getAttribute("id")); |
118 try { | 117 try { |
119 PSXLindaImpl linda = (PSXLindaImpl) ml.open(dstHostName, dstPort); | 118 PSXLindaImpl linda = (PSXLindaImpl) ml.open(dstHostName, dstPort); |
120 Routing r = new Routing(linda, dstId); | 119 Routing r = new Routing(linda, dstId); |
121 nodes.put(new Integer(srcId), r); | 120 nodes.put(srcId, r); |
122 ml.in(srcId, new RoutingCallback(srcId, r)); | 121 ml.in(srcId, new RoutingCallback(srcId, r)); |
123 } catch (IOException e) { | 122 } catch (IOException e) { |
124 e.printStackTrace(); | 123 e.printStackTrace(); |
125 } | 124 } |
126 } | 125 } |
128 print("Routing xml received!"); | 127 print("Routing xml received!"); |
129 | 128 |
130 NodeList routing = root.getElementsByTagName("source"); | 129 NodeList routing = root.getElementsByTagName("source"); |
131 for (int i = 0; i < routing.getLength(); i++) { | 130 for (int i = 0; i < routing.getLength(); i++) { |
132 Element src = (Element) routing.item(i); | 131 Element src = (Element) routing.item(i); |
133 Integer srcId = new Integer(src.getAttribute("id")); | 132 Integer srcId = Integer.parseInt(src.getAttribute("id")); |
134 Routing r = nodes.get(srcId); | 133 Routing r = nodes.get(srcId); |
135 NodeList dest = src.getElementsByTagName("dest"); | 134 NodeList dest = src.getElementsByTagName("dest"); |
136 for (int j = 0; j < dest.getLength(); j++) { | 135 for (int j = 0; j < dest.getLength(); j++) { |
137 Element dst = (Element) dest.item(j); | 136 Element dst = (Element) dest.item(j); |
138 Integer dstId = new Integer(dst.getAttribute("id")); | 137 r.route.add(Integer.parseInt(dst.getAttribute("id"))); |
139 r.route.add(dstId); | |
140 } | 138 } |
141 } | 139 } |
142 | 140 |
143 } | 141 } |
144 if (tid == MANAGE) connected = true; | 142 if (tid == MANAGE) connected = true; |
162 ml.out(BODY, ByteBuffer.wrap("dummy".getBytes())); | 160 ml.out(BODY, ByteBuffer.wrap("dummy".getBytes())); |
163 } | 161 } |
164 | 162 |
165 public void callback(ByteBuffer reply) { | 163 public void callback(ByteBuffer reply) { |
166 String str = new String(reply.array()); | 164 String str = new String(reply.array()); |
167 | 165 print("get message"); |
168 if (tid == TREETOP || tid == TREELEFT || tid == TREERIGHT) { | 166 if (tid == TREETOP || tid == TREELEFT || tid == TREERIGHT) { |
169 ml.in(BODY); | 167 ml.in(BODY); |
170 ml.out(BODY, reply); | 168 ml.out(BODY, reply); |
171 print("Update body"); | 169 print("Update body"); |
172 } else if (str.equals("shutdown") && (tid == RINGLEFT || tid == RINGRIGHT)) { | 170 } else if (str.equals("shutdown") && (tid == RINGLEFT || tid == RINGRIGHT)) { |
173 print("get shutdown command"); | 171 print("get shutdown command id: " + nodeId); |
174 if (nodeId != 0) { | 172 if (nodeId != 0) { |
175 Routing r = null; | 173 Routing r = null; |
176 if (tid == RINGLEFT) { | 174 if (tid == RINGLEFT) { |
177 r = nodes.get(new Integer(RINGRIGHT)); | 175 r = nodes.get(new Integer(RINGRIGHT)); |
178 } else if (tid == RINGRIGHT) { | 176 } else if (tid == RINGRIGHT) { |
179 r = nodes.get(new Integer(RINGLEFT)); | 177 r = nodes.get(new Integer(RINGLEFT)); |
180 } | 178 } |
179 r.linda.out(r.dstId, ByteBuffer.wrap("shutdown".getBytes())); | |
181 print("out"); | 180 print("out"); |
182 PSXReply o = r.linda.out(r.dstId, ByteBuffer.wrap("shutdown".getBytes())); | 181 ml.fdl.queueExec(); |
183 while (ml.fdl.qsize != 0) | |
184 ml.sync(1); | |
185 print("sync"); | 182 print("sync"); |
186 ((PSXLindaImpl) r.linda).close(); | 183 } else { |
184 print("shutdown reaches last node!"); | |
187 } | 185 } |
188 running = false; | 186 running = false; |
189 return; | 187 return; |
190 } else if (nodeId == 0 && tid == RINGLEFT) { | 188 } else if (nodeId == 0 && tid == RINGLEFT) { |
191 relayCounter++; | 189 relayCounter++; |
192 print(new Integer(relayCounter).toString() + " relay"); | 190 print("" + relayCounter + " relay"); |
193 if (relayCounter >= relayNum) { | 191 if (relayCounter >= relayNum) { |
194 // 実験終了 | 192 // 実験終了 |
195 endTime = new Date(); | 193 endTime = new Date(); |
196 Double resultTime = new Double(((endTime.getTime() - startTime.getTime()) / (double)relayNum)); | 194 Double resultTime = new Double(((endTime.getTime() - startTime.getTime()) / (double)relayNum)); |
197 ByteBuffer data = ByteBuffer.wrap(resultTime.toString().getBytes()); | 195 ByteBuffer data = ByteBuffer.wrap(resultTime.toString().getBytes()); |
198 manager.out(MANAGE, data); | 196 manager.out(MANAGE, data); |
199 //sendLocalHostName(); | |
200 ml.in(tid, this); | 197 ml.in(tid, this); |
201 return; | 198 return; |
202 } | 199 } |
203 } | 200 } |
204 | 201 |
212 } | 209 } |
213 | 210 |
214 } | 211 } |
215 | 212 |
216 private class StartCallback implements PSXCallback { | 213 private class StartCallback implements PSXCallback { |
217 | |
218 public void callback(ByteBuffer reply) { | 214 public void callback(ByteBuffer reply) { |
219 Routing r; | 215 Routing r; |
220 | 216 |
221 // 子があるならば、子にタプルを伝搬 | 217 // 子があるならば、子にタプルを伝搬 |
222 if (nodes.containsKey(new Integer(TREERIGHT))) { | 218 if (nodes.containsKey(TREERIGHT)) { |
223 r = nodes.get(new Integer(TREERIGHT)); | 219 r = nodes.get(TREERIGHT); |
224 r.linda.out(r.dstId, reply); | 220 r.linda.out(r.dstId, reply); |
225 } | 221 } |
226 if (nodes.containsKey(new Integer(TREELEFT))) { | 222 if (nodes.containsKey(TREELEFT)) { |
227 r = nodes.get(new Integer(TREELEFT)); | 223 r = nodes.get(TREELEFT); |
228 r.linda.out(r.dstId, reply); | 224 r.linda.out(r.dstId, reply); |
229 } | 225 } |
230 ml.in(START, this); | 226 ml.in(START, this); |
231 } | 227 } |
232 | 228 |
235 private class DebugStartCallback implements PSXCallback { | 231 private class DebugStartCallback implements PSXCallback { |
236 public void callback(ByteBuffer reply) { | 232 public void callback(ByteBuffer reply) { |
237 String[] commands = new String(reply.array()).split(","); | 233 String[] commands = new String(reply.array()).split(","); |
238 String command = commands[0]; | 234 String command = commands[0]; |
239 if (command.equals("relay")) { | 235 if (command.equals("relay")) { |
240 relayNum = new Integer(commands[1]).intValue(); | 236 relayNum = Integer.parseInt(commands[1]); |
241 relaySize = new Integer(commands[2]).intValue(); | 237 relaySize = Integer.parseInt(commands[2]); |
242 relayCounter = 0; | 238 relayCounter = 0; |
243 | 239 print("relay num=" + relayNum + " size=" + relaySize); |
244 print("relay num=" + new Integer(relayNum).toString() + " size=" + new Integer(relaySize).toString()); | 240 Routing r = nodes.get(RINGRIGHT); |
245 | |
246 Routing r = nodes.get(new Integer(RINGRIGHT)); | |
247 | |
248 // 実験開始 | 241 // 実験開始 |
249 startTime = new Date(); | 242 startTime = new Date(); |
250 r.linda.out(r.dstId, ByteBuffer.wrap(new byte[relaySize])); | 243 r.linda.out(r.dstId, ByteBuffer.wrap(new byte[relaySize])); |
251 ml.in(DEBUGSTART, this); | 244 ml.in(DEBUGSTART, this); |
252 } else if (command.equals("shutdown")) { | 245 } else if (command.equals("shutdown")) { |
253 Routing r = nodes.get(new Integer(RINGRIGHT)); | 246 Routing r = nodes.get(RINGRIGHT); |
254 r.linda.out(r.dstId, ByteBuffer.wrap("shutdown".getBytes())); | 247 r.linda.out(r.dstId, ByteBuffer.wrap("shutdown".getBytes())); |
255 //running = false; | 248 //running = false; |
256 } | 249 } |
257 } | 250 } |
258 } | 251 } |
276 public void mainLoop() { | 269 public void mainLoop() { |
277 initPoller(); | 270 initPoller(); |
278 while (running) { | 271 while (running) { |
279 ml.sync(); | 272 ml.sync(); |
280 } | 273 } |
281 print("Terminated"); | 274 print("Terminated" + nodeId |
282 | 275 + " replies=" + ml.replies.size() |
276 + " qsize=" + ml.fdl.qsize); | |
283 } | 277 } |
284 | 278 |
285 protected void initPoller() { | 279 protected void initPoller() { |
286 ml.in(MANAGE, new AcceptXMLCallback(MANAGE)); | 280 ml.in(MANAGE, new AcceptXMLCallback(MANAGE)); |
287 ml.in(DEBUG, new AcceptXMLCallback(DEBUG)); | 281 ml.in(DEBUG, new AcceptXMLCallback(DEBUG)); |
288 } | 282 } |
289 | 283 |
290 protected void sendLocalHostName() { | 284 protected void sendLocalHostName() { |
291 // TopologyManager に自分のホストネームを送信して、起動を伝える | 285 // TopologyManager に自分のホストネームを送信して、起動を伝える |
292 ByteBuffer local; | 286 ByteBuffer local = ByteBuffer.wrap((localHostName + ":" + localPort).getBytes()); |
293 local = ByteBuffer.wrap((localHostName + ":" + new Integer(localPort).toString()).getBytes()); | |
294 manager.out(MANAGE, local); | 287 manager.out(MANAGE, local); |
295 } | 288 } |
296 | 289 |
297 protected PSXLinda connectServer(String hostName, int port) { | 290 protected PSXLinda connectServer(String hostName, int port) { |
298 PSXLinda linda = null; | 291 PSXLinda linda = null; |