package fdl.test.debug;

import java.io.*;
import java.net.InetAddress;
import java.net.UnknownHostException;

import java.nio.ByteBuffer;
import java.util.*;

import javax.xml.parsers.*;

import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;
import org.xml.sax.SAXException;

import fdl.*;

19 /**
|
|
20 * MetaProtocolEngine
|
|
21 *
|
|
22 * @author Kazuki Akamine
|
|
23 *
|
|
24 * 接続する機能までを実装した MetaEngine
|
|
25 * これを継承して、具体的な処理を書く
|
|
26 *
|
|
27 */
|
|
28
|
|
29 public class MetaProtocolEngine implements MetaEngine {
|
|
30 // Fields
|
|
31 public static final int DEFAULTPORT = 10000;
|
87
|
32
|
|
33 public static final int BODY = 100;
|
|
34 public static final int START = 101;
|
|
35 public static final int FINISH = 102;
|
|
36
|
84
|
37 public static final int MANAGE = 60000;
|
|
38 public static final int DEBUG = 61000;
|
|
39
|
87
|
40 public static final int TREETOP = MANAGE + 1;
|
|
41 public static final int TREELEFT = MANAGE + 2;
|
|
42 public static final int TREERIGHT = MANAGE + 3;
|
|
43
|
|
44 public static final int RINGLEFT = DEBUG + 1;
|
|
45 public static final int RINGRIGHT = DEBUG + 2;
|
|
46
|
|
47 public static final int DEBUGSTART = DEBUG + 1000;
|
|
48
|
84
|
49 private MetaLinda ml;
|
|
50 private String localHostName;
|
|
51 private int localPort;
|
|
52 private PSXLinda manager;
|
|
53 private String managerHostName;
|
|
54 private int managerPort = DEFAULTPORT;
|
|
55 private boolean running = true;
|
|
56 private boolean connected = false;
|
|
57 private boolean debugConnected = false;
|
|
58 private int nodeId;
|
85
|
59 private HashMap<Integer, Routing> nodes;
|
84
|
60
|
87
|
61 private int relayNum, relaySize, relayCounter;
|
|
62
|
84
|
63 // Callback class
|
|
64 class AcceptXMLCallback implements PSXCallback {
|
|
65 int tid;
|
|
66
|
|
67 private DocumentBuilderFactory dbFactory = null;
|
|
68 private DocumentBuilder docBuilder = null;
|
|
69 protected Document document;
|
|
70
|
|
71 public AcceptXMLCallback(int tid) {
|
|
72 this.tid = tid;
|
|
73 dbFactory = DocumentBuilderFactory.newInstance();
|
|
74 try {
|
|
75 docBuilder = dbFactory.newDocumentBuilder();
|
|
76 } catch (ParserConfigurationException e) {
|
|
77 e.printStackTrace();
|
|
78 }
|
|
79
|
|
80 }
|
|
81 public void callback(ByteBuffer reply) {
|
|
82 String xml = new String(reply.array());
|
|
83 print(xml);
|
|
84 parseXML(xml);
|
|
85
|
|
86 ml.in(tid, this);
|
|
87 }
|
|
88 @SuppressWarnings("deprecation")
|
|
89 protected void parseXML(String xml) {
|
|
90 Document doc = null;
|
|
91 try {
|
|
92 doc = docBuilder.parse(new StringBufferInputStream(xml));
|
|
93 } catch (SAXException e) {
|
|
94 e.printStackTrace();
|
|
95 } catch (IOException e) {
|
|
96 e.printStackTrace();
|
|
97 }
|
|
98
|
|
99 Element root = doc.getDocumentElement();
|
|
100 if(root.getTagName().equals("connections")) {
|
|
101 nodeId = new Integer(root.getAttribute("id")).intValue();
|
87
|
102 if (nodeId == 0) {
|
|
103 ml.in(START, new StartCallback());
|
|
104 ml.in(DEBUGSTART, new DebugStartCallback());
|
|
105 }
|
84
|
106 NodeList connections = root.getElementsByTagName("connection");
|
|
107 for (int i = 0; i < connections.getLength(); i++) {
|
|
108 Element connection = (Element)connections.item(i);
|
|
109 Element host = (Element)connection.getElementsByTagName("host").item(0);
|
|
110 Element port = (Element)connection.getElementsByTagName("port").item(0);
|
|
111 Element t = (Element)connection.getElementsByTagName("tid").item(0);
|
|
112 int srcId = new Integer(connection.getAttribute("id")).intValue();
|
|
113 String dstHostName = host.getTextContent();
|
|
114 int dstPort = new Integer(port.getAttribute("id")).intValue();
|
|
115 int dstId = new Integer(t.getAttribute("id")).intValue();
|
|
116 try {
|
|
117 PSXLindaImpl linda = (PSXLindaImpl) ml.open(dstHostName, dstPort);
|
|
118 Routing r = new Routing(linda, dstId);
|
85
|
119 nodes.put(new Integer(srcId), r);
|
|
120 ml.in(srcId, new RoutingCallback(srcId, r));
|
84
|
121 } catch (IOException e) {
|
|
122 e.printStackTrace();
|
|
123 }
|
|
124 }
|
|
125 } else if (root.getTagName().equals("routing")) {
|
|
126 print("Routing xml received!");
|
85
|
127
|
84
|
128 NodeList routing = root.getElementsByTagName("source");
|
|
129 for (int i = 0; i < routing.getLength(); i++) {
|
|
130 Element src = (Element) routing.item(i);
|
85
|
131 Integer srcId = new Integer(src.getAttribute("id"));
|
|
132 Routing r = nodes.get(srcId);
|
84
|
133 NodeList dest = src.getElementsByTagName("dest");
|
85
|
134 for (int j = 0; j < dest.getLength(); j++) {
|
|
135 Element dst = (Element) dest.item(j);
|
|
136 Integer dstId = new Integer(dst.getAttribute("id"));
|
|
137 r.route.add(dstId);
|
84
|
138 }
|
|
139 }
|
|
140
|
|
141 }
|
|
142 if (tid == MANAGE) connected = true;
|
|
143 else if (tid == DEBUG) debugConnected = true;
|
|
144 if (connected && debugConnected) {
|
|
145 sendLocalHostName();
|
|
146 print("Send local host name");
|
|
147 connected = debugConnected = false;
|
|
148 }
|
|
149 }
|
|
150
|
|
151 }
|
|
152
|
85
|
153 private class RoutingCallback implements PSXCallback {
|
|
154 int tid;
|
|
155 Routing routing;
|
87
|
156
|
85
|
157 public RoutingCallback(int tid, Routing routing) {
|
|
158 this.tid = tid;
|
|
159 this.routing = routing;
|
87
|
160 ml.out(BODY, ByteBuffer.wrap("dummy".getBytes()));
|
85
|
161 }
|
|
162
|
|
163 public void callback(ByteBuffer reply) {
|
89
|
164 String str = new String(reply.array());
|
|
165
|
87
|
166 if (tid == TREETOP || tid == TREELEFT || tid == TREERIGHT) {
|
|
167 ml.in(BODY);
|
|
168 ml.out(BODY, reply);
|
|
169 print("Update body");
|
|
170 } else if (nodeId == 0 && tid == RINGLEFT) {
|
|
171 relayCounter++;
|
|
172 print(new Integer(relayCounter).toString() + " relay");
|
|
173 if (relayCounter >= relayNum) {
|
89
|
174 sendLocalHostName();
|
87
|
175 ml.in(tid, this);
|
|
176 return;
|
|
177 }
|
89
|
178 } else if (str.equals("shutdown")) {
|
|
179 if (tid == RINGLEFT) {
|
|
180 Routing r = nodes.get(new Integer(RINGRIGHT));
|
|
181 r.linda.out(r.dstId, ByteBuffer.wrap("shutdown".getBytes()));
|
|
182 try {
|
|
183 r.linda.sync(1);
|
|
184 } catch (IOException e) {
|
|
185 e.printStackTrace();
|
|
186 }
|
|
187 running = false;
|
|
188 return;
|
|
189 } else if (tid == RINGRIGHT) {
|
|
190 Routing r = nodes.get(new Integer(RINGLEFT));
|
|
191 r.linda.out(r.dstId, ByteBuffer.wrap("shutdown".getBytes()));
|
|
192 try {
|
|
193 r.linda.sync(1);
|
|
194 } catch (IOException e) {
|
|
195 e.printStackTrace();
|
|
196 }
|
|
197 running = false;
|
|
198 return;
|
|
199 }
|
87
|
200 }
|
89
|
201
|
85
|
202 Iterator<Integer> it = routing.route.iterator();
|
|
203 while (it.hasNext()) {
|
|
204 Integer dstId = it.next();
|
|
205 Routing r = nodes.get(dstId);
|
|
206 r.linda.out(r.dstId, reply);
|
|
207 ml.in(tid, this);
|
|
208 }
|
|
209 }
|
|
210
|
|
211 }
|
|
212
|
87
|
213 private class StartCallback implements PSXCallback {
|
|
214
|
|
215 public void callback(ByteBuffer reply) {
|
|
216 Routing r;
|
|
217
|
|
218 // 子があるならば、子にタプルを伝搬
|
|
219 if (nodes.containsKey(new Integer(TREERIGHT))) {
|
|
220 r = nodes.get(new Integer(TREERIGHT));
|
|
221 r.linda.out(r.dstId, reply);
|
|
222 }
|
|
223 if (nodes.containsKey(new Integer(TREELEFT))) {
|
|
224 r = nodes.get(new Integer(TREELEFT));
|
|
225 r.linda.out(r.dstId, reply);
|
|
226 }
|
|
227 ml.in(START, this);
|
|
228 }
|
|
229
|
|
230 }
|
|
231
|
|
232 private class DebugStartCallback implements PSXCallback {
|
|
233 public void callback(ByteBuffer reply) {
|
|
234 String[] commands = new String(reply.array()).split(",");
|
|
235 String command = commands[0];
|
|
236 if (command.equals("relay")) {
|
|
237 relayNum = new Integer(commands[1]).intValue();
|
|
238 relaySize = new Integer(commands[2]).intValue();
|
|
239 relayCounter = 0;
|
|
240
|
|
241 print("relay num=" + new Integer(relayNum).toString() + " size=" + new Integer(relaySize).toString());
|
|
242
|
|
243 Routing r = nodes.get(new Integer(RINGRIGHT));
|
|
244 r.linda.out(r.dstId, ByteBuffer.wrap(new byte[relaySize]));
|
|
245 ml.in(DEBUGSTART, this);
|
89
|
246 } else if (command.equals("shutdown")) {
|
|
247 Routing r = nodes.get(new Integer(RINGRIGHT));
|
|
248 r.linda.out(r.dstId, ByteBuffer.wrap("shutdown".getBytes()));
|
|
249 running = false;
|
87
|
250 }
|
|
251 }
|
|
252 }
|
|
253
|
84
|
254 // Constructor
|
|
255 public MetaProtocolEngine(int port, MetaLinda ml, String managerHostName, int managerPort) {
|
|
256 this.ml = ml;
|
|
257 this.localPort = port;
|
|
258 this.managerHostName = managerHostName;
|
|
259 this.managerPort = managerPort;
|
|
260 this.nodes = new HashMap<Integer, Routing>();
|
|
261 try {
|
|
262 this.localHostName = InetAddress.getLocalHost().getHostName();
|
|
263 } catch (UnknownHostException e) {
|
|
264 e.printStackTrace();
|
|
265 }
|
87
|
266 manager = connectServer(this.managerHostName, this.managerPort);
|
84
|
267 sendLocalHostName();
|
|
268 }
|
|
269
|
|
270 public void mainLoop() {
|
|
271 initPoller();
|
|
272 while (running) {
|
88
|
273 ml.sync();
|
84
|
274 }
|
|
275 }
|
|
276
|
|
277 protected void initPoller() {
|
|
278 ml.in(MANAGE, new AcceptXMLCallback(MANAGE));
|
|
279 ml.in(DEBUG, new AcceptXMLCallback(DEBUG));
|
|
280 }
|
|
281
|
|
282 protected void sendLocalHostName() {
|
|
283 // TopologyManager に自分のホストネームを送信して、起動を伝える
|
|
284 ByteBuffer local;
|
|
285 local = ByteBuffer.wrap((localHostName + ":" + new Integer(localPort).toString()).getBytes());
|
|
286 manager.out(MANAGE, local);
|
|
287 }
|
|
288
|
|
289 protected PSXLinda connectServer(String hostName, int port) {
|
|
290 PSXLinda linda = null;
|
|
291 boolean connectPSX = true;
|
|
292 while (connectPSX) {
|
|
293 try {
|
|
294 linda = ml.open(hostName, port);
|
|
295 connectPSX = false;
|
|
296 } catch (IOException e) {
|
|
297 try {
|
|
298 Thread.sleep(40);
|
|
299 } catch (InterruptedException e1) {
|
|
300 }
|
|
301 }
|
|
302 }
|
|
303 print("Connect to " + hostName);
|
|
304 return linda;
|
|
305 }
|
|
306
|
|
307 void print(String str) {
|
|
308 System.err.println("[DEBUG] " + localHostName + ": " + str);
|
89
|
309 System.err.flush();
|
84
|
310 }
|
|
311
|
|
312 }
|