Mercurial > hg > FederatedLinda
view src/fdl/test/debug/MetaProtocolEngine.java @ 85:d0d8aeaebccf
add routing table
author | one |
---|---|
date | Mon, 08 Feb 2010 11:07:57 +0900 |
parents | c0575f877591 |
children | c0591636a71a |
line wrap: on
line source
package fdl.test.debug; import java.io.*; import java.net.InetAddress; import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.util.*; import javax.xml.parsers.*; import org.w3c.dom.Document; import org.w3c.dom.Element; import org.w3c.dom.NodeList; import org.xml.sax.SAXException; import fdl.*; /** * MetaProtocolEngine * * @author Kazuki Akamine * * 接続する機能までを実装した MetaEngine * これを継承して、具体的な処理を書く * */ public class MetaProtocolEngine implements MetaEngine { // Fields public static final int DEFAULTPORT = 10000; public static final int MANAGE = 60000; public static final int DEBUG = 61000; private MetaLinda ml; private String localHostName; private int localPort; private PSXLinda manager; private String managerHostName; private int managerPort = DEFAULTPORT; private boolean running = true; private boolean connected = false; private boolean debugConnected = false; private int nodeId; private HashMap<Integer, Routing> nodes; // Callback class class AcceptXMLCallback implements PSXCallback { int tid; private DocumentBuilderFactory dbFactory = null; private DocumentBuilder docBuilder = null; protected Document document; public AcceptXMLCallback(int tid) { this.tid = tid; dbFactory = DocumentBuilderFactory.newInstance(); try { docBuilder = dbFactory.newDocumentBuilder(); } catch (ParserConfigurationException e) { e.printStackTrace(); } } public void callback(ByteBuffer reply) { String xml = new String(reply.array()); print(xml); parseXML(xml); ml.in(tid, this); } @SuppressWarnings("deprecation") protected void parseXML(String xml) { Document doc = null; try { doc = docBuilder.parse(new StringBufferInputStream(xml)); } catch (SAXException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } Element root = doc.getDocumentElement(); if(root.getTagName().equals("connections")) { nodeId = new Integer(root.getAttribute("id")).intValue(); NodeList connections = root.getElementsByTagName("connection"); for (int i = 0; i < connections.getLength(); i++) { Element connection = (Element)connections.item(i); Element host = (Element)connection.getElementsByTagName("host").item(0); Element port = (Element)connection.getElementsByTagName("port").item(0); Element t = (Element)connection.getElementsByTagName("tid").item(0); int srcId = new Integer(connection.getAttribute("id")).intValue(); String dstHostName = host.getTextContent(); int dstPort = new Integer(port.getAttribute("id")).intValue(); int dstId = new Integer(t.getAttribute("id")).intValue(); try { PSXLindaImpl linda = (PSXLindaImpl) ml.open(dstHostName, dstPort); Routing r = new Routing(linda, dstId); nodes.put(new Integer(srcId), r); ml.in(srcId, new RoutingCallback(srcId, r)); } catch (IOException e) { e.printStackTrace(); } } } else if (root.getTagName().equals("routing")) { print("Routing xml received!"); NodeList routing = root.getElementsByTagName("source"); for (int i = 0; i < routing.getLength(); i++) { Element src = (Element) routing.item(i); Integer srcId = new Integer(src.getAttribute("id")); Routing r = nodes.get(srcId); NodeList dest = src.getElementsByTagName("dest"); for (int j = 0; j < dest.getLength(); j++) { Element dst = (Element) dest.item(j); Integer dstId = new Integer(dst.getAttribute("id")); r.route.add(dstId); } } } if (tid == MANAGE) connected = true; else if (tid == DEBUG) debugConnected = true; if (connected && debugConnected) { sendLocalHostName(); print("Send local host name"); connected = debugConnected = false; } } } private class RoutingCallback implements PSXCallback { int tid; Routing routing; public RoutingCallback(int tid, Routing routing) { this.tid = tid; this.routing = routing; } public void callback(ByteBuffer reply) { Iterator<Integer> it = routing.route.iterator(); while (it.hasNext()) { Integer dstId = it.next(); Routing r = nodes.get(dstId); r.linda.out(r.dstId, reply); try { r.linda.sync(1); } catch (IOException e) { e.printStackTrace(); } ml.in(tid, this); } } } // Constructor public MetaProtocolEngine(int port, MetaLinda ml, String managerHostName, int managerPort) { this.ml = ml; this.localPort = port; this.managerHostName = managerHostName; this.managerPort = managerPort; this.nodes = new HashMap<Integer, Routing>(); try { this.localHostName = InetAddress.getLocalHost().getHostName(); } catch (UnknownHostException e) { e.printStackTrace(); } manager = connectServer(managerHostName, managerPort); sendLocalHostName(); } public void mainLoop() { initPoller(); while (running) { ml.sync(); } } protected void initPoller() { ml.in(MANAGE, new AcceptXMLCallback(MANAGE)); ml.in(DEBUG, new AcceptXMLCallback(DEBUG)); } protected void sendLocalHostName() { // TopologyManager に自分のホストネームを送信して、起動を伝える ByteBuffer local; local = ByteBuffer.wrap((localHostName + ":" + new Integer(localPort).toString()).getBytes()); manager.out(MANAGE, local); try { manager.sync(1); } catch (IOException e) { e.printStackTrace(); } } protected PSXLinda connectServer(String hostName, int port) { PSXLinda linda = null; boolean connectPSX = true; while (connectPSX) { try { linda = ml.open(hostName, port); connectPSX = false; } catch (IOException e) { try { Thread.sleep(40); } catch (InterruptedException e1) { } } } print("Connect to " + hostName); return linda; } void print(String str) { System.err.println("[DEBUG] " + localHostName + ": " + str); } }