Mercurial > hg > FederatedLinda
view src/fdl/test/debug/MetaProtocolEngine.java @ 91:4df1d50df52a
Ring: fdl.test.debug
author | kazz <kazz@cr.ie.u-ryukyu.ac.jp> |
---|---|
date | Tue, 16 Feb 2010 03:58:06 +0900 |
parents | 9cdc24bae625 |
children | 0ea086f0e96f |
line wrap: on
line source
package fdl.test.debug; import java.io.*; import java.net.InetAddress; import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.util.*; import javax.xml.parsers.*; import org.w3c.dom.Document; import org.w3c.dom.Element; import org.w3c.dom.NodeList; import org.xml.sax.SAXException; import fdl.*; /** * MetaProtocolEngine * * @author Kazuki Akamine * * 接続する機能までを実装した MetaEngine * これを継承して、具体的な処理を書く * */ public class MetaProtocolEngine implements MetaEngine { // Fields public static final int DEFAULTPORT = 10000; public static final int BODY = 100; public static final int START = 101; public static final int FINISH = 102; public static final int MANAGE = 60000; public static final int DEBUG = 61000; public static final int TREETOP = MANAGE + 1; public static final int TREELEFT = MANAGE + 2; public static final int TREERIGHT = MANAGE + 3; public static final int RINGLEFT = DEBUG + 1; public static final int RINGRIGHT = DEBUG + 2; public static final int DEBUGSTART = DEBUG + 1000; private MetaLinda ml; private String localHostName; private int localPort; private PSXLinda manager; private String managerHostName; private int managerPort = DEFAULTPORT; private boolean running = true; private boolean connected = false; private boolean debugConnected = false; private int nodeId; private HashMap<Integer, Routing> nodes; private int relayNum, relaySize, relayCounter; private Date startTime, endTime; // Callback class class AcceptXMLCallback implements PSXCallback { int tid; private DocumentBuilderFactory dbFactory = null; private DocumentBuilder docBuilder = null; public AcceptXMLCallback(int tid) { this.tid = tid; dbFactory = DocumentBuilderFactory.newInstance(); try { docBuilder = dbFactory.newDocumentBuilder(); } catch (ParserConfigurationException e) { e.printStackTrace(); } } public void callback(ByteBuffer reply) { String xml = new String(reply.array()); print(xml); parseXML(xml); ml.in(tid, this); } @SuppressWarnings("deprecation") protected void parseXML(String xml) { Document doc = null; try { doc = docBuilder.parse(new StringBufferInputStream(xml)); } catch (SAXException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } Element root = doc.getDocumentElement(); if(root.getTagName().equals("connections")) { nodeId = Integer.parseInt(root.getAttribute("id")); if (nodeId == 0) { ml.in(START, new StartCallback()); ml.in(DEBUGSTART, new DebugStartCallback()); } NodeList connections = root.getElementsByTagName("connection"); for (int i = 0; i < connections.getLength(); i++) { Element connection = (Element)connections.item(i); Element host = (Element)connection.getElementsByTagName("host").item(0); Element port = (Element)connection.getElementsByTagName("port").item(0); Element t = (Element)connection.getElementsByTagName("tid").item(0); int srcId = Integer.parseInt(connection.getAttribute("id")); String dstHostName = host.getTextContent(); int dstPort = Integer.parseInt(port.getAttribute("id")); int dstId = Integer.parseInt(t.getAttribute("id")); try { PSXLindaImpl linda = (PSXLindaImpl) ml.open(dstHostName, dstPort); Routing r = new Routing(linda, dstId); nodes.put(srcId, r); ml.in(srcId, new RoutingCallback(srcId, r)); } catch (IOException e) { e.printStackTrace(); } } } else if (root.getTagName().equals("routing")) { print("Routing xml received!"); NodeList routing = root.getElementsByTagName("source"); for (int i = 0; i < routing.getLength(); i++) { Element src = (Element) routing.item(i); Integer srcId = Integer.parseInt(src.getAttribute("id")); Routing r = nodes.get(srcId); NodeList dest = src.getElementsByTagName("dest"); for (int j = 0; j < dest.getLength(); j++) { Element dst = (Element) dest.item(j); r.route.add(Integer.parseInt(dst.getAttribute("id"))); } } } if (tid == MANAGE) connected = true; else if (tid == DEBUG) debugConnected = true; if (connected && debugConnected) { sendLocalHostName(); print("Send local host name"); connected = debugConnected = false; } } } private class RoutingCallback implements PSXCallback { int tid; Routing routing; public RoutingCallback(int tid, Routing routing) { this.tid = tid; this.routing = routing; ml.out(BODY, ByteBuffer.wrap("dummy".getBytes())); } public void callback(ByteBuffer reply) { String str = new String(reply.array()); print("get message"); if (tid == TREETOP || tid == TREELEFT || tid == TREERIGHT) { ml.in(BODY); ml.out(BODY, reply); print("Update body"); } else if (str.equals("shutdown") && (tid == RINGLEFT || tid == RINGRIGHT)) { print("get shutdown command id: " + nodeId); if (nodeId != 0) { Routing r = null; if (tid == RINGLEFT) { r = nodes.get(new Integer(RINGRIGHT)); } else if (tid == RINGRIGHT) { r = nodes.get(new Integer(RINGLEFT)); } r.linda.out(r.dstId, ByteBuffer.wrap("shutdown".getBytes())); print("out"); ml.fdl.queueExec(); print("sync"); } else { print("shutdown reaches last node!"); } running = false; return; } else if (nodeId == 0 && tid == RINGLEFT) { relayCounter++; print("" + relayCounter + " relay"); if (relayCounter >= relayNum) { // 実験終了 endTime = new Date(); Double resultTime = new Double(((endTime.getTime() - startTime.getTime()) / (double)relayNum)); ByteBuffer data = ByteBuffer.wrap(resultTime.toString().getBytes()); manager.out(MANAGE, data); ml.in(tid, this); return; } } Iterator<Integer> it = routing.route.iterator(); while (it.hasNext()) { Integer dstId = it.next(); Routing r = nodes.get(dstId); r.linda.out(r.dstId, reply); ml.in(tid, this); } } } private class StartCallback implements PSXCallback { public void callback(ByteBuffer reply) { Routing r; // 子があるならば、子にタプルを伝搬 if (nodes.containsKey(TREERIGHT)) { r = nodes.get(TREERIGHT); r.linda.out(r.dstId, reply); } if (nodes.containsKey(TREELEFT)) { r = nodes.get(TREELEFT); r.linda.out(r.dstId, reply); } ml.in(START, this); } } private class DebugStartCallback implements PSXCallback { public void callback(ByteBuffer reply) { String[] commands = new String(reply.array()).split(","); String command = commands[0]; if (command.equals("relay")) { relayNum = Integer.parseInt(commands[1]); relaySize = Integer.parseInt(commands[2]); relayCounter = 0; print("relay num=" + relayNum + " size=" + relaySize); Routing r = nodes.get(RINGRIGHT); // 実験開始 startTime = new Date(); r.linda.out(r.dstId, ByteBuffer.wrap(new byte[relaySize])); ml.in(DEBUGSTART, this); } else if (command.equals("shutdown")) { Routing r = nodes.get(RINGRIGHT); r.linda.out(r.dstId, ByteBuffer.wrap("shutdown".getBytes())); //running = false; } } } // Constructor public MetaProtocolEngine(int port, MetaLinda ml, String managerHostName, int managerPort) { this.ml = ml; this.localPort = port; this.managerHostName = managerHostName; this.managerPort = managerPort; this.nodes = new HashMap<Integer, Routing>(); try { this.localHostName = InetAddress.getLocalHost().getHostName(); } catch (UnknownHostException e) { e.printStackTrace(); } manager = connectServer(this.managerHostName, this.managerPort); sendLocalHostName(); } public void mainLoop() { initPoller(); while (running) { ml.sync(); } print("Terminated" + nodeId + " replies=" + ml.replies.size() + " qsize=" + ml.fdl.qsize); } protected void initPoller() { ml.in(MANAGE, new AcceptXMLCallback(MANAGE)); ml.in(DEBUG, new AcceptXMLCallback(DEBUG)); } protected void sendLocalHostName() { // TopologyManager に自分のホストネームを送信して、起動を伝える ByteBuffer local = ByteBuffer.wrap((localHostName + ":" + localPort).getBytes()); manager.out(MANAGE, local); } protected PSXLinda connectServer(String hostName, int port) { PSXLinda linda = null; boolean connectPSX = true; while (connectPSX) { try { linda = ml.open(hostName, port); connectPSX = false; } catch (IOException e) { try { Thread.sleep(40); } catch (InterruptedException e1) { } } } print("Connect to " + hostName); return linda; } void print(String str) { System.err.println("[DEBUG] " + localHostName + ": " + str); System.err.flush(); } }