comparison src/fdl/test/debug/MetaProtocolEngine.java @ 91:4df1d50df52a

Ring: fdl.test.debug
author kazz <kazz@cr.ie.u-ryukyu.ac.jp>
date Tue, 16 Feb 2010 03:58:06 +0900
parents 9cdc24bae625
children 0ea086f0e96f
comparison
equal deleted inserted replaced
90:9cdc24bae625 91:4df1d50df52a
66 class AcceptXMLCallback implements PSXCallback { 66 class AcceptXMLCallback implements PSXCallback {
67 int tid; 67 int tid;
68 68
69 private DocumentBuilderFactory dbFactory = null; 69 private DocumentBuilderFactory dbFactory = null;
70 private DocumentBuilder docBuilder = null; 70 private DocumentBuilder docBuilder = null;
71 protected Document document;
72 71
73 public AcceptXMLCallback(int tid) { 72 public AcceptXMLCallback(int tid) {
74 this.tid = tid; 73 this.tid = tid;
75 dbFactory = DocumentBuilderFactory.newInstance(); 74 dbFactory = DocumentBuilderFactory.newInstance();
76 try { 75 try {
98 e.printStackTrace(); 97 e.printStackTrace();
99 } 98 }
100 99
101 Element root = doc.getDocumentElement(); 100 Element root = doc.getDocumentElement();
102 if(root.getTagName().equals("connections")) { 101 if(root.getTagName().equals("connections")) {
103 nodeId = new Integer(root.getAttribute("id")).intValue(); 102 nodeId = Integer.parseInt(root.getAttribute("id"));
104 if (nodeId == 0) { 103 if (nodeId == 0) {
105 ml.in(START, new StartCallback()); 104 ml.in(START, new StartCallback());
106 ml.in(DEBUGSTART, new DebugStartCallback()); 105 ml.in(DEBUGSTART, new DebugStartCallback());
107 } 106 }
108 NodeList connections = root.getElementsByTagName("connection"); 107 NodeList connections = root.getElementsByTagName("connection");
109 for (int i = 0; i < connections.getLength(); i++) { 108 for (int i = 0; i < connections.getLength(); i++) {
110 Element connection = (Element)connections.item(i); 109 Element connection = (Element)connections.item(i);
111 Element host = (Element)connection.getElementsByTagName("host").item(0); 110 Element host = (Element)connection.getElementsByTagName("host").item(0);
112 Element port = (Element)connection.getElementsByTagName("port").item(0); 111 Element port = (Element)connection.getElementsByTagName("port").item(0);
113 Element t = (Element)connection.getElementsByTagName("tid").item(0); 112 Element t = (Element)connection.getElementsByTagName("tid").item(0);
114 int srcId = new Integer(connection.getAttribute("id")).intValue(); 113 int srcId = Integer.parseInt(connection.getAttribute("id"));
115 String dstHostName = host.getTextContent(); 114 String dstHostName = host.getTextContent();
116 int dstPort = new Integer(port.getAttribute("id")).intValue(); 115 int dstPort = Integer.parseInt(port.getAttribute("id"));
117 int dstId = new Integer(t.getAttribute("id")).intValue(); 116 int dstId = Integer.parseInt(t.getAttribute("id"));
118 try { 117 try {
119 PSXLindaImpl linda = (PSXLindaImpl) ml.open(dstHostName, dstPort); 118 PSXLindaImpl linda = (PSXLindaImpl) ml.open(dstHostName, dstPort);
120 Routing r = new Routing(linda, dstId); 119 Routing r = new Routing(linda, dstId);
121 nodes.put(new Integer(srcId), r); 120 nodes.put(srcId, r);
122 ml.in(srcId, new RoutingCallback(srcId, r)); 121 ml.in(srcId, new RoutingCallback(srcId, r));
123 } catch (IOException e) { 122 } catch (IOException e) {
124 e.printStackTrace(); 123 e.printStackTrace();
125 } 124 }
126 } 125 }
128 print("Routing xml received!"); 127 print("Routing xml received!");
129 128
130 NodeList routing = root.getElementsByTagName("source"); 129 NodeList routing = root.getElementsByTagName("source");
131 for (int i = 0; i < routing.getLength(); i++) { 130 for (int i = 0; i < routing.getLength(); i++) {
132 Element src = (Element) routing.item(i); 131 Element src = (Element) routing.item(i);
133 Integer srcId = new Integer(src.getAttribute("id")); 132 Integer srcId = Integer.parseInt(src.getAttribute("id"));
134 Routing r = nodes.get(srcId); 133 Routing r = nodes.get(srcId);
135 NodeList dest = src.getElementsByTagName("dest"); 134 NodeList dest = src.getElementsByTagName("dest");
136 for (int j = 0; j < dest.getLength(); j++) { 135 for (int j = 0; j < dest.getLength(); j++) {
137 Element dst = (Element) dest.item(j); 136 Element dst = (Element) dest.item(j);
138 Integer dstId = new Integer(dst.getAttribute("id")); 137 r.route.add(Integer.parseInt(dst.getAttribute("id")));
139 r.route.add(dstId);
140 } 138 }
141 } 139 }
142 140
143 } 141 }
144 if (tid == MANAGE) connected = true; 142 if (tid == MANAGE) connected = true;
162 ml.out(BODY, ByteBuffer.wrap("dummy".getBytes())); 160 ml.out(BODY, ByteBuffer.wrap("dummy".getBytes()));
163 } 161 }
164 162
165 public void callback(ByteBuffer reply) { 163 public void callback(ByteBuffer reply) {
166 String str = new String(reply.array()); 164 String str = new String(reply.array());
167 165 print("get message");
168 if (tid == TREETOP || tid == TREELEFT || tid == TREERIGHT) { 166 if (tid == TREETOP || tid == TREELEFT || tid == TREERIGHT) {
169 ml.in(BODY); 167 ml.in(BODY);
170 ml.out(BODY, reply); 168 ml.out(BODY, reply);
171 print("Update body"); 169 print("Update body");
172 } else if (str.equals("shutdown") && (tid == RINGLEFT || tid == RINGRIGHT)) { 170 } else if (str.equals("shutdown") && (tid == RINGLEFT || tid == RINGRIGHT)) {
173 print("get shutdown command"); 171 print("get shutdown command id: " + nodeId);
174 if (nodeId != 0) { 172 if (nodeId != 0) {
175 Routing r = null; 173 Routing r = null;
176 if (tid == RINGLEFT) { 174 if (tid == RINGLEFT) {
177 r = nodes.get(new Integer(RINGRIGHT)); 175 r = nodes.get(new Integer(RINGRIGHT));
178 } else if (tid == RINGRIGHT) { 176 } else if (tid == RINGRIGHT) {
179 r = nodes.get(new Integer(RINGLEFT)); 177 r = nodes.get(new Integer(RINGLEFT));
180 } 178 }
179 r.linda.out(r.dstId, ByteBuffer.wrap("shutdown".getBytes()));
181 print("out"); 180 print("out");
182 PSXReply o = r.linda.out(r.dstId, ByteBuffer.wrap("shutdown".getBytes())); 181 ml.fdl.queueExec();
183 while (ml.fdl.qsize != 0)
184 ml.sync(1);
185 print("sync"); 182 print("sync");
186 ((PSXLindaImpl) r.linda).close(); 183 } else {
184 print("shutdown reaches last node!");
187 } 185 }
188 running = false; 186 running = false;
189 return; 187 return;
190 } else if (nodeId == 0 && tid == RINGLEFT) { 188 } else if (nodeId == 0 && tid == RINGLEFT) {
191 relayCounter++; 189 relayCounter++;
192 print(new Integer(relayCounter).toString() + " relay"); 190 print("" + relayCounter + " relay");
193 if (relayCounter >= relayNum) { 191 if (relayCounter >= relayNum) {
194 // 実験終了 192 // 実験終了
195 endTime = new Date(); 193 endTime = new Date();
196 Double resultTime = new Double(((endTime.getTime() - startTime.getTime()) / (double)relayNum)); 194 Double resultTime = new Double(((endTime.getTime() - startTime.getTime()) / (double)relayNum));
197 ByteBuffer data = ByteBuffer.wrap(resultTime.toString().getBytes()); 195 ByteBuffer data = ByteBuffer.wrap(resultTime.toString().getBytes());
198 manager.out(MANAGE, data); 196 manager.out(MANAGE, data);
199 //sendLocalHostName();
200 ml.in(tid, this); 197 ml.in(tid, this);
201 return; 198 return;
202 } 199 }
203 } 200 }
204 201
212 } 209 }
213 210
214 } 211 }
215 212
216 private class StartCallback implements PSXCallback { 213 private class StartCallback implements PSXCallback {
217
218 public void callback(ByteBuffer reply) { 214 public void callback(ByteBuffer reply) {
219 Routing r; 215 Routing r;
220 216
221 // 子があるならば、子にタプルを伝搬 217 // 子があるならば、子にタプルを伝搬
222 if (nodes.containsKey(new Integer(TREERIGHT))) { 218 if (nodes.containsKey(TREERIGHT)) {
223 r = nodes.get(new Integer(TREERIGHT)); 219 r = nodes.get(TREERIGHT);
224 r.linda.out(r.dstId, reply); 220 r.linda.out(r.dstId, reply);
225 } 221 }
226 if (nodes.containsKey(new Integer(TREELEFT))) { 222 if (nodes.containsKey(TREELEFT)) {
227 r = nodes.get(new Integer(TREELEFT)); 223 r = nodes.get(TREELEFT);
228 r.linda.out(r.dstId, reply); 224 r.linda.out(r.dstId, reply);
229 } 225 }
230 ml.in(START, this); 226 ml.in(START, this);
231 } 227 }
232 228
235 private class DebugStartCallback implements PSXCallback { 231 private class DebugStartCallback implements PSXCallback {
236 public void callback(ByteBuffer reply) { 232 public void callback(ByteBuffer reply) {
237 String[] commands = new String(reply.array()).split(","); 233 String[] commands = new String(reply.array()).split(",");
238 String command = commands[0]; 234 String command = commands[0];
239 if (command.equals("relay")) { 235 if (command.equals("relay")) {
240 relayNum = new Integer(commands[1]).intValue(); 236 relayNum = Integer.parseInt(commands[1]);
241 relaySize = new Integer(commands[2]).intValue(); 237 relaySize = Integer.parseInt(commands[2]);
242 relayCounter = 0; 238 relayCounter = 0;
243 239 print("relay num=" + relayNum + " size=" + relaySize);
244 print("relay num=" + new Integer(relayNum).toString() + " size=" + new Integer(relaySize).toString()); 240 Routing r = nodes.get(RINGRIGHT);
245
246 Routing r = nodes.get(new Integer(RINGRIGHT));
247
248 // 実験開始 241 // 実験開始
249 startTime = new Date(); 242 startTime = new Date();
250 r.linda.out(r.dstId, ByteBuffer.wrap(new byte[relaySize])); 243 r.linda.out(r.dstId, ByteBuffer.wrap(new byte[relaySize]));
251 ml.in(DEBUGSTART, this); 244 ml.in(DEBUGSTART, this);
252 } else if (command.equals("shutdown")) { 245 } else if (command.equals("shutdown")) {
253 Routing r = nodes.get(new Integer(RINGRIGHT)); 246 Routing r = nodes.get(RINGRIGHT);
254 r.linda.out(r.dstId, ByteBuffer.wrap("shutdown".getBytes())); 247 r.linda.out(r.dstId, ByteBuffer.wrap("shutdown".getBytes()));
255 //running = false; 248 //running = false;
256 } 249 }
257 } 250 }
258 } 251 }
276 public void mainLoop() { 269 public void mainLoop() {
277 initPoller(); 270 initPoller();
278 while (running) { 271 while (running) {
279 ml.sync(); 272 ml.sync();
280 } 273 }
281 print("Terminated"); 274 print("Terminated" + nodeId
282 275 + " replies=" + ml.replies.size()
276 + " qsize=" + ml.fdl.qsize);
283 } 277 }
284 278
285 protected void initPoller() { 279 protected void initPoller() {
286 ml.in(MANAGE, new AcceptXMLCallback(MANAGE)); 280 ml.in(MANAGE, new AcceptXMLCallback(MANAGE));
287 ml.in(DEBUG, new AcceptXMLCallback(DEBUG)); 281 ml.in(DEBUG, new AcceptXMLCallback(DEBUG));
288 } 282 }
289 283
290 protected void sendLocalHostName() { 284 protected void sendLocalHostName() {
291 // TopologyManager に自分のホストネームを送信して、起動を伝える 285 // TopologyManager に自分のホストネームを送信して、起動を伝える
292 ByteBuffer local; 286 ByteBuffer local = ByteBuffer.wrap((localHostName + ":" + localPort).getBytes());
293 local = ByteBuffer.wrap((localHostName + ":" + new Integer(localPort).toString()).getBytes());
294 manager.out(MANAGE, local); 287 manager.out(MANAGE, local);
295 } 288 }
296 289
297 protected PSXLinda connectServer(String hostName, int port) { 290 protected PSXLinda connectServer(String hostName, int port) {
298 PSXLinda linda = null; 291 PSXLinda linda = null;