84
|
1 package fdl.test.debug;
|
|
2
|
|
3 import java.io.*;
|
|
4 import java.net.InetAddress;
|
|
5 import java.net.UnknownHostException;
|
|
6
|
|
7 import java.nio.ByteBuffer;
|
|
8 import java.util.*;
|
|
9
|
|
10 import javax.xml.parsers.*;
|
|
11
|
|
12 import org.w3c.dom.Document;
|
|
13 import org.w3c.dom.Element;
|
|
14 import org.w3c.dom.NodeList;
|
|
15 import org.xml.sax.SAXException;
|
|
16
|
|
17 import fdl.*;
|
|
18
|
|
/**
 * MetaProtocolEngine
 *
 * @author Kazuki Akamine
 *
 * A MetaEngine that implements everything up to establishing the connections.
 * Extend this class to write the concrete processing.
 *
 */
|
|
28
|
|
29 public class MetaProtocolEngine implements MetaEngine {
|
|
30 // Fields
|
|
31 public static final int DEFAULTPORT = 10000;
|
87
|
32
|
|
33 public static final int BODY = 100;
|
|
34 public static final int START = 101;
|
|
35 public static final int FINISH = 102;
|
|
36
|
84
|
37 public static final int MANAGE = 60000;
|
|
38 public static final int DEBUG = 61000;
|
|
39
|
87
|
40 public static final int TREETOP = MANAGE + 1;
|
|
41 public static final int TREELEFT = MANAGE + 2;
|
|
42 public static final int TREERIGHT = MANAGE + 3;
|
|
43
|
|
44 public static final int RINGLEFT = DEBUG + 1;
|
|
45 public static final int RINGRIGHT = DEBUG + 2;
|
|
46
|
|
47 public static final int DEBUGSTART = DEBUG + 1000;
|
|
48
|
84
|
49 private MetaLinda ml;
|
|
50 private String localHostName;
|
|
51 private int localPort;
|
|
52 private PSXLinda manager;
|
|
53 private String managerHostName;
|
|
54 private int managerPort = DEFAULTPORT;
|
|
55 private boolean running = true;
|
|
56 private boolean connected = false;
|
|
57 private boolean debugConnected = false;
|
|
58 private int nodeId;
|
85
|
59 private HashMap<Integer, Routing> nodes;
|
84
|
60
|
87
|
61 private int relayNum, relaySize, relayCounter;
|
|
62
|
90
|
63 private Date startTime, endTime;
|
|
64
|
84
|
65 // Callback class
|
|
66 class AcceptXMLCallback implements PSXCallback {
|
|
67 int tid;
|
|
68
|
|
69 private DocumentBuilderFactory dbFactory = null;
|
|
70 private DocumentBuilder docBuilder = null;
|
|
71
|
|
72 public AcceptXMLCallback(int tid) {
|
|
73 this.tid = tid;
|
|
74 dbFactory = DocumentBuilderFactory.newInstance();
|
|
75 try {
|
|
76 docBuilder = dbFactory.newDocumentBuilder();
|
|
77 } catch (ParserConfigurationException e) {
|
|
78 e.printStackTrace();
|
|
79 }
|
|
80
|
|
81 }
|
|
82 public void callback(ByteBuffer reply) {
|
|
83 String xml = new String(reply.array());
|
|
84 print(xml);
|
|
85 parseXML(xml);
|
|
86
|
|
87 ml.in(tid, this);
|
|
88 }
|
|
89 @SuppressWarnings("deprecation")
|
|
90 protected void parseXML(String xml) {
|
|
91 Document doc = null;
|
|
92 try {
|
|
93 doc = docBuilder.parse(new StringBufferInputStream(xml));
|
|
94 } catch (SAXException e) {
|
|
95 e.printStackTrace();
|
|
96 } catch (IOException e) {
|
|
97 e.printStackTrace();
|
|
98 }
|
|
99
|
|
100 Element root = doc.getDocumentElement();
|
|
101 if(root.getTagName().equals("connections")) {
|
91
|
102 nodeId = Integer.parseInt(root.getAttribute("id"));
|
87
|
103 if (nodeId == 0) {
|
|
104 ml.in(START, new StartCallback());
|
|
105 ml.in(DEBUGSTART, new DebugStartCallback());
|
|
106 }
|
84
|
107 NodeList connections = root.getElementsByTagName("connection");
|
|
108 for (int i = 0; i < connections.getLength(); i++) {
|
|
109 Element connection = (Element)connections.item(i);
|
|
110 Element host = (Element)connection.getElementsByTagName("host").item(0);
|
|
111 Element port = (Element)connection.getElementsByTagName("port").item(0);
|
|
112 Element t = (Element)connection.getElementsByTagName("tid").item(0);
|
91
|
113 int srcId = Integer.parseInt(connection.getAttribute("id"));
|
84
|
114 String dstHostName = host.getTextContent();
|
91
|
115 int dstPort = Integer.parseInt(port.getAttribute("id"));
|
|
116 int dstId = Integer.parseInt(t.getAttribute("id"));
|
84
|
117 try {
|
|
118 PSXLindaImpl linda = (PSXLindaImpl) ml.open(dstHostName, dstPort);
|
|
119 Routing r = new Routing(linda, dstId);
|
91
|
120 nodes.put(srcId, r);
|
85
|
121 ml.in(srcId, new RoutingCallback(srcId, r));
|
84
|
122 } catch (IOException e) {
|
|
123 e.printStackTrace();
|
|
124 }
|
|
125 }
|
|
126 } else if (root.getTagName().equals("routing")) {
|
|
127 print("Routing xml received!");
|
85
|
128
|
84
|
129 NodeList routing = root.getElementsByTagName("source");
|
|
130 for (int i = 0; i < routing.getLength(); i++) {
|
|
131 Element src = (Element) routing.item(i);
|
91
|
132 Integer srcId = Integer.parseInt(src.getAttribute("id"));
|
85
|
133 Routing r = nodes.get(srcId);
|
84
|
134 NodeList dest = src.getElementsByTagName("dest");
|
85
|
135 for (int j = 0; j < dest.getLength(); j++) {
|
|
136 Element dst = (Element) dest.item(j);
|
91
|
137 r.route.add(Integer.parseInt(dst.getAttribute("id")));
|
84
|
138 }
|
|
139 }
|
|
140
|
|
141 }
|
|
142 if (tid == MANAGE) connected = true;
|
|
143 else if (tid == DEBUG) debugConnected = true;
|
|
144 if (connected && debugConnected) {
|
|
145 sendLocalHostName();
|
|
146 print("Send local host name");
|
|
147 connected = debugConnected = false;
|
|
148 }
|
|
149 }
|
|
150
|
|
151 }
|
|
152
|
85
|
153 private class RoutingCallback implements PSXCallback {
|
|
154 int tid;
|
|
155 Routing routing;
|
87
|
156
|
85
|
157 public RoutingCallback(int tid, Routing routing) {
|
|
158 this.tid = tid;
|
|
159 this.routing = routing;
|
87
|
160 ml.out(BODY, ByteBuffer.wrap("dummy".getBytes()));
|
85
|
161 }
|
|
162
|
|
163 public void callback(ByteBuffer reply) {
|
89
|
164 String str = new String(reply.array());
|
91
|
165 print("get message");
|
87
|
166 if (tid == TREETOP || tid == TREELEFT || tid == TREERIGHT) {
|
|
167 ml.in(BODY);
|
|
168 ml.out(BODY, reply);
|
|
169 print("Update body");
|
90
|
170 } else if (str.equals("shutdown") && (tid == RINGLEFT || tid == RINGRIGHT)) {
|
91
|
171 print("get shutdown command id: " + nodeId);
|
90
|
172 if (nodeId != 0) {
|
|
173 Routing r = null;
|
|
174 if (tid == RINGLEFT) {
|
|
175 r = nodes.get(new Integer(RINGRIGHT));
|
|
176 } else if (tid == RINGRIGHT) {
|
|
177 r = nodes.get(new Integer(RINGLEFT));
|
|
178 }
|
91
|
179 r.linda.out(r.dstId, ByteBuffer.wrap("shutdown".getBytes()));
|
90
|
180 print("out");
|
91
|
181 ml.fdl.queueExec();
|
90
|
182 print("sync");
|
91
|
183 } else {
|
|
184 print("shutdown reaches last node!");
|
90
|
185 }
|
|
186 running = false;
|
|
187 return;
|
87
|
188 } else if (nodeId == 0 && tid == RINGLEFT) {
|
|
189 relayCounter++;
|
91
|
190 print("" + relayCounter + " relay");
|
87
|
191 if (relayCounter >= relayNum) {
|
90
|
192 // 実験終了
|
|
193 endTime = new Date();
|
|
194 Double resultTime = new Double(((endTime.getTime() - startTime.getTime()) / (double)relayNum));
|
|
195 ByteBuffer data = ByteBuffer.wrap(resultTime.toString().getBytes());
|
|
196 manager.out(MANAGE, data);
|
87
|
197 ml.in(tid, this);
|
|
198 return;
|
|
199 }
|
|
200 }
|
89
|
201
|
85
|
202 Iterator<Integer> it = routing.route.iterator();
|
|
203 while (it.hasNext()) {
|
|
204 Integer dstId = it.next();
|
|
205 Routing r = nodes.get(dstId);
|
|
206 r.linda.out(r.dstId, reply);
|
|
207 ml.in(tid, this);
|
|
208 }
|
|
209 }
|
|
210
|
|
211 }
|
|
212
|
87
|
213 private class StartCallback implements PSXCallback {
|
|
214 public void callback(ByteBuffer reply) {
|
|
215 Routing r;
|
|
216
|
|
217 // 子があるならば、子にタプルを伝搬
|
91
|
218 if (nodes.containsKey(TREERIGHT)) {
|
|
219 r = nodes.get(TREERIGHT);
|
87
|
220 r.linda.out(r.dstId, reply);
|
|
221 }
|
91
|
222 if (nodes.containsKey(TREELEFT)) {
|
|
223 r = nodes.get(TREELEFT);
|
87
|
224 r.linda.out(r.dstId, reply);
|
|
225 }
|
|
226 ml.in(START, this);
|
|
227 }
|
|
228
|
|
229 }
|
|
230
|
|
231 private class DebugStartCallback implements PSXCallback {
|
|
232 public void callback(ByteBuffer reply) {
|
|
233 String[] commands = new String(reply.array()).split(",");
|
|
234 String command = commands[0];
|
|
235 if (command.equals("relay")) {
|
91
|
236 relayNum = Integer.parseInt(commands[1]);
|
|
237 relaySize = Integer.parseInt(commands[2]);
|
87
|
238 relayCounter = 0;
|
91
|
239 print("relay num=" + relayNum + " size=" + relaySize);
|
|
240 Routing r = nodes.get(RINGRIGHT);
|
90
|
241 // 実験開始
|
|
242 startTime = new Date();
|
87
|
243 r.linda.out(r.dstId, ByteBuffer.wrap(new byte[relaySize]));
|
|
244 ml.in(DEBUGSTART, this);
|
89
|
245 } else if (command.equals("shutdown")) {
|
91
|
246 Routing r = nodes.get(RINGRIGHT);
|
89
|
247 r.linda.out(r.dstId, ByteBuffer.wrap("shutdown".getBytes()));
|
90
|
248 //running = false;
|
87
|
249 }
|
|
250 }
|
|
251 }
|
|
252
|
84
|
253 // Constructor
|
|
254 public MetaProtocolEngine(int port, MetaLinda ml, String managerHostName, int managerPort) {
|
|
255 this.ml = ml;
|
|
256 this.localPort = port;
|
|
257 this.managerHostName = managerHostName;
|
|
258 this.managerPort = managerPort;
|
|
259 this.nodes = new HashMap<Integer, Routing>();
|
|
260 try {
|
|
261 this.localHostName = InetAddress.getLocalHost().getHostName();
|
|
262 } catch (UnknownHostException e) {
|
|
263 e.printStackTrace();
|
|
264 }
|
87
|
265 manager = connectServer(this.managerHostName, this.managerPort);
|
84
|
266 sendLocalHostName();
|
|
267 }
|
|
268
|
|
269 public void mainLoop() {
|
|
270 initPoller();
|
|
271 while (running) {
|
88
|
272 ml.sync();
|
84
|
273 }
|
91
|
274 print("Terminated" + nodeId
|
|
275 + " replies=" + ml.replies.size()
|
|
276 + " qsize=" + ml.fdl.qsize);
|
84
|
277 }
|
|
278
|
|
279 protected void initPoller() {
|
|
280 ml.in(MANAGE, new AcceptXMLCallback(MANAGE));
|
|
281 ml.in(DEBUG, new AcceptXMLCallback(DEBUG));
|
|
282 }
|
|
283
|
|
284 protected void sendLocalHostName() {
|
|
285 // TopologyManager に自分のホストネームを送信して、起動を伝える
|
91
|
286 ByteBuffer local = ByteBuffer.wrap((localHostName + ":" + localPort).getBytes());
|
84
|
287 manager.out(MANAGE, local);
|
|
288 }
|
|
289
|
|
290 protected PSXLinda connectServer(String hostName, int port) {
|
|
291 PSXLinda linda = null;
|
|
292 boolean connectPSX = true;
|
|
293 while (connectPSX) {
|
|
294 try {
|
|
295 linda = ml.open(hostName, port);
|
|
296 connectPSX = false;
|
|
297 } catch (IOException e) {
|
|
298 try {
|
|
299 Thread.sleep(40);
|
|
300 } catch (InterruptedException e1) {
|
|
301 }
|
|
302 }
|
|
303 }
|
|
304 print("Connect to " + hostName);
|
|
305 return linda;
|
|
306 }
|
|
307
|
|
308 void print(String str) {
|
|
309 System.err.println("[DEBUG] " + localHostName + ": " + str);
|
89
|
310 System.err.flush();
|
84
|
311 }
|
|
312
|
|
313 }
|