Mercurial > hg > FederatedLinda
changeset 92:ea4ee892baf5
commit
line wrap: on
line diff
--- a/src/fdl/FDLindaServ.java Tue Feb 16 03:58:06 2010 +0900 +++ b/src/fdl/FDLindaServ.java Thu Apr 22 16:13:03 2010 +0900 @@ -92,7 +92,7 @@ // TupleHandler handler = (TupleHandler)s.attachment(); // handler.handle(s); // } - for (Iterator<SelectionKey> it = selector.selectedKeys().iterator();it.hasNext(); ) { + for (Iterator<SelectionKey> it = selector.selectedKeys().iterator();it.hasNext(); ) { SelectionKey s = it.next(); it.remove(); TupleHandler handler = (TupleHandler)s.attachment();
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/fdl/test/debug2/AcceptXMLCallback.java Thu Apr 22 16:13:03 2010 +0900 @@ -0,0 +1,117 @@ +package fdl.test.debug2; + +import java.io.IOException; +import java.io.StringReader; +import java.nio.ByteBuffer; + +import javax.xml.parsers.DocumentBuilder; +import javax.xml.parsers.DocumentBuilderFactory; +import javax.xml.parsers.ParserConfigurationException; + +import org.w3c.dom.Document; +import org.w3c.dom.Element; +import org.w3c.dom.NodeList; +import org.xml.sax.InputSource; +import org.xml.sax.SAXException; + +import fdl.PSXCallback; +import fdl.PSXLindaImpl; + +class AcceptXMLCallback implements PSXCallback { + TupleId tid; + NodeProperty np; + TreeProperty tp; + DebugProperty dp; + + private DocumentBuilderFactory dbFactory = null; + private DocumentBuilder docBuilder = null; + + public AcceptXMLCallback(TupleId tid, NodeProperty np) { + this.tid = tid; + this.np = np; + dbFactory = DocumentBuilderFactory.newInstance(); + try { + docBuilder = dbFactory.newDocumentBuilder(); + } catch (ParserConfigurationException e) { + e.printStackTrace(); + } + + } + public void callback(ByteBuffer reply) { + String xml = new String(reply.array()); + Debug.print(xml); + parseXML(xml); + + np.ml.in(tid.id, this); + } + protected void parseXML(String xml) { + Document doc = null; + try { + doc = docBuilder.parse(new InputSource(new StringReader(xml))); + } catch (SAXException e) { + e.printStackTrace(); + } catch (IOException e) { + e.printStackTrace(); + } + + Element root = doc.getDocumentElement(); + if(root.getTagName().equals("connections")) { + np.nodeId = Integer.parseInt(root.getAttribute("id")); + if (np.nodeId == 0) { + np.ml.in(TupleId.START.id, new StartCallback(np, tp = new TreeProperty(np))); + np.ml.in(TupleId.DEBUGSTART.id, new DebugStartCallback(np, dp = new DebugProperty())); + } + NodeList connections = root.getElementsByTagName("connection"); + makeConnections(connections); + } else if (root.getTagName().equals("routing")) { + Debug.print("Routing xml received!"); + + NodeList routing = root.getElementsByTagName("source"); + for (int i = 0; i < routing.getLength(); i++) { + Element src = (Element) routing.item(i); + Integer srcId = Integer.parseInt(src.getAttribute("id")); + Routing r = np.nodes.get(TupleId.getTupleIdFromId(srcId)); + NodeList dest = src.getElementsByTagName("dest"); + for (int j = 0; j < dest.getLength(); j++) { + Element dst = (Element) dest.item(j); + r.route.add(Integer.parseInt(dst.getAttribute("id"))); + } + } + + } + switch (tid) { + case MANAGE: + np.connected = true; + break; + case DEBUG: + np.debugConnected = true; + break; + } + if (np.connected && np.debugConnected) { + np.sendLocalHostName(); + Debug.print("Send local host name"); + np.connected = np.debugConnected = false; + } + } + private void makeConnections(NodeList connections) { + for (int i = 0; i < connections.getLength(); i++) { + Element connection = (Element)connections.item(i); + Element host = (Element)connection.getElementsByTagName("host").item(0); + Element port = (Element)connection.getElementsByTagName("port").item(0); + Element t = (Element)connection.getElementsByTagName("tid").item(0); + int srcId = Integer.parseInt(connection.getAttribute("id")); + String dstHostName = host.getTextContent(); + int dstPort = Integer.parseInt(port.getAttribute("id")); + int dstId = Integer.parseInt(t.getAttribute("id")); + try { + PSXLindaImpl linda = (PSXLindaImpl) np.ml.open(dstHostName, dstPort); + Routing r = new Routing(linda, dstId); + np.nodes.put(TupleId.getTupleIdFromId(srcId), r); + np.ml.in(srcId, new RoutingCallback(TupleId.getTupleIdFromId(srcId), r, np, tp, dp)); + } catch (IOException e) { + e.printStackTrace(); + } + } + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/fdl/test/debug2/ConfigurationManager.java Thu Apr 22 16:13:03 2010 +0900 @@ -0,0 +1,45 @@ +package fdl.test.debug2; + +import java.io.IOException; + +import fdl.MetaEngine; + +/** +* ConfigurationManager +* +* @author Kazuki Akamine +* +* FDLindaNode を管理する Server +* +*/ + +public class ConfigurationManager { + private static int nodeNum = 2; + private static int relayNum = 1; + private static int relaySize = 1024; + private static String usageString + = "ConfigurationManager -nodes NODENUM -relay RELAYNUM -size RELAYSIZE"; + + public static void main(String[] args) { + for (int i = 0; i < args.length; i++) { + if ("-nodes".equals(args[i])) { + nodeNum = Integer.parseInt(args[++i]); + } else if ("-relay".equals(args[i])) { + relayNum = Integer.parseInt(args[++i]); + } else if ("-size".equals(args[i])) { + relaySize = Integer.parseInt(args[++i]); + } else { + System.err.println(usageString); + } + } + try { + FDLindaNode manager = new FDLindaNode(FDLindaNode.DEFAULTPORT); + MetaEngine me = new ConfigurationManagerEngine(manager.getMetaLinda(), nodeNum, relayNum, relaySize); + manager.setMetaEngine(me); + manager.mainLoop(); + } catch (IOException e) { + e.printStackTrace(); + } + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/fdl/test/debug2/ConfigurationManagerEngine.java Thu Apr 22 16:13:03 2010 +0900 @@ -0,0 +1,173 @@ +package fdl.test.debug2; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import fdl.*; + +public class ConfigurationManagerEngine implements MetaEngine { + public static final int DEFAULTPORT = 10000; + + private int nodeNum = 0; + private int relayNum = 0; + private int relaySize = 1; + private MetaLinda ml; + private NodeInfo[] nodes; + private boolean running = true; + + private class AcceptNewNode implements PSXCallback { + int count = 0; + public void callback(ByteBuffer reply) { + String[] hostData = new String(reply.array()).split(":"); + String hostName = hostData[0]; + int port = DEFAULTPORT; + if (hostData.length > 1) + port = Integer.parseInt(hostData[1]); + nodes[count] = new NodeInfo(hostName, port); + try { + nodes[count].linda = ml.open(hostName, port); + } catch (IOException e) { + e.printStackTrace(); + } + if (++count < nodeNum) { + ml.in(TupleId.MANAGE.id, this); + } else { + linkNodes(); + ml.in(TupleId.MANAGE.id, new ConfirmConnectionNode()); + } + } + } + + private class ConfirmConnectionNode implements PSXCallback { + int count = 0; + public void callback(ByteBuffer reply) { + if (++count < nodeNum) { + ml.in(TupleId.MANAGE.id, this); + } else { + routingNodes(); + ml.in(TupleId.MANAGE.id, new ConfirmRoutingNode()); + } + } + + } + + private class ConfirmRoutingNode implements PSXCallback { + int count = 0; + public void callback(ByteBuffer reply) { + if (++count < nodeNum) { + ml.in(TupleId.MANAGE.id, this); + } else { + print("All link configured!"); + // TREE実験開始を通知 + nodes[0].linda.out(TupleId.START.id, ByteBuffer.wrap("0".getBytes())); + // DebugRing 開始を通知 + //nodes[0].linda.out(DEBUGSTART, ByteBuffer.wrap(("print," + BODY + ",").getBytes())); + ml.in(TupleId.START.id, new ConfirmFinish()); + } + } + + } + + private class ConfirmFinish implements PSXCallback { + public void callback(ByteBuffer reply) { + System.out.println(new String(reply.array())); + nodes[0].linda.out(TupleId.DEBUGSTART.id, ByteBuffer.wrap("shutdown".getBytes())); + print("Finish token"); + try { + nodes[0].linda.sync(1); + } catch (IOException e) { + e.printStackTrace(); + } + running = false; + } + + } + + private class RingLoop implements PSXCallback { + int counter = 0; + public void callback(ByteBuffer reply) { + nodes[0].linda.out(TupleId.DEBUGSTART.id, ByteBuffer.wrap(("print," + TupleId.BODY.id + ",").getBytes())); + print("ring=" + counter + ": \n" + new String(reply.array())); + counter++; + ml.in(TupleId.DEBUG.id, this); + } + + } + + public ConfigurationManagerEngine(MetaLinda metaLinda, int nodeNum, int relayNum, int relaySize) { + // initialize + this.ml = metaLinda; + this.nodeNum = nodeNum; + this.relayNum = relayNum; + this.relaySize = relaySize; + nodes = new NodeInfo[nodeNum]; + } + + public void mainLoop() { + // resist poll tuple id + ml.in(TupleId.MANAGE.id, new AcceptNewNode()); + ml.in(TupleId.DEBUG.id, new RingLoop()); + while (running) + ml.sync(); + } + + private void linkNodes() { + for (int i = 0; i < nodes.length; i++) { + // XML for Tree Topology + ConnectionXMLBuilder tree = new ConnectionXMLBuilder(i); + int k; + if (i != 0) { // TOP + k = (i-1)/2; + tree.appendConnection(TupleId.TREETOP, nodes[k].host, nodes[k].port, (i%2 == 0) ? TupleId.TREERIGHT : TupleId.TREELEFT); + } + if ((k = 2*i+1) < nodes.length) // LEFT + tree.appendConnection(TupleId.TREELEFT, nodes[k].host, nodes[k].port, TupleId.TREETOP); + if ((k = 2*i+2) < nodes.length) // RIGHT + tree.appendConnection(TupleId.TREERIGHT, nodes[k].host, nodes[k].port, TupleId.TREETOP); + nodes[i].connectionXML = tree.createXML(); + nodes[i].linda.out(TupleId.MANAGE.id, ByteBuffer.wrap(nodes[i].connectionXML.getBytes())); + print(nodes[i].connectionXML); + + // XML for Ring Debug Topology + ConnectionXMLBuilder debug = new ConnectionXMLBuilder(i); + int left = (nodes.length + i - 1) % nodes.length; + int right = (i + 1) % nodes.length; + debug.appendConnection(TupleId.RINGLEFT, nodes[left].host, nodes[left].port, TupleId.RINGRIGHT); + debug.appendConnection(TupleId.RINGRIGHT, nodes[right].host, nodes[right].port, TupleId.RINGLEFT); + nodes[i].debugConnectionXML = debug.createXML(); + nodes[i].linda.out(TupleId.DEBUG.id, ByteBuffer.wrap(nodes[i].debugConnectionXML.getBytes())); + print(nodes[i].debugConnectionXML); + } + } + + private void routingNodes() { + for (int i = 0; i < nodes.length; i++) { + RoutingXMLBuilder tree = new RoutingXMLBuilder(); + if (i != 0) { // TOP + if (2*i+1 < nodes.length) { // LEFT + tree.appendRoutingTable(TupleId.TREETOP, TupleId.TREELEFT); + tree.appendRoutingTable(TupleId.TREELEFT, TupleId.TREETOP); + } + if (2*i+2 < nodes.length) { // RIGHT + tree.appendRoutingTable(TupleId.TREETOP, TupleId.TREERIGHT); + tree.appendRoutingTable(TupleId.TREERIGHT, TupleId.TREETOP); + } + } + nodes[i].routingXML = tree.createXML(); + nodes[i].linda.out(TupleId.MANAGE.id, ByteBuffer.wrap(nodes[i].routingXML.getBytes())); + print(nodes[i].routingXML); + + RoutingXMLBuilder debug = new RoutingXMLBuilder(); + debug.appendRoutingTable(TupleId.RINGLEFT, TupleId.RINGRIGHT); + debug.appendRoutingTable(TupleId.RINGRIGHT, TupleId.RINGLEFT); + nodes[i].debugRoutingXML = debug.createXML(); + nodes[i].linda.out(TupleId.DEBUG.id, ByteBuffer.wrap(nodes[i].debugRoutingXML.getBytes())); + print(nodes[i].debugRoutingXML); + } + } + + void print(String str) { + System.err.println("[DEBUG] ConfigurationManager: " + str); + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/fdl/test/debug2/ConnectionXMLBuilder.java Thu Apr 22 16:13:03 2010 +0900 @@ -0,0 +1,31 @@ +package fdl.test.debug2; + +import org.w3c.dom.*; + +public class ConnectionXMLBuilder extends XMLBuilder { + private Element connections; + + public ConnectionXMLBuilder(int id) { + super(); + connections = document.createElement("connections"); + connections.setAttribute("id", new Integer(id).toString()); + document.appendChild(connections); + } + + public void appendConnection(TupleId mytid, String host, int port, TupleId tid){ + Element connection = document.createElement("connection"); + connection.setAttribute("id", new Integer(mytid.id).toString()); + Element h = document.createElement("host"); + h.setTextContent(host); + Element p = document.createElement("port"); + p.setAttribute("id", new Integer(port).toString()); + Element t = document.createElement("tid"); + t.setAttribute("id", new Integer(tid.id).toString()); + + connections.appendChild(connection); + connection.appendChild(h); + connection.appendChild(p); + connection.appendChild(t); + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/fdl/test/debug2/Debug.java Thu Apr 22 16:13:03 2010 +0900 @@ -0,0 +1,21 @@ +package fdl.test.debug2; + +import java.net.InetAddress; +import java.net.UnknownHostException; + +public class Debug { + private static Debug d = new Debug(); // Singleton + private String localHostName; + private Debug() { + try { + localHostName = InetAddress.getLocalHost().getHostName(); + } catch (UnknownHostException e) { + e.printStackTrace(); + } + } + static void print(String str) { + System.err.println("[DEBUG] " + d.localHostName + ": " + str); + System.err.flush(); + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/fdl/test/debug2/DebugProperty.java Thu Apr 22 16:13:03 2010 +0900 @@ -0,0 +1,5 @@ +package fdl.test.debug2; + +public class DebugProperty { + public int relayNum, relaySize, relayCounter; +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/fdl/test/debug2/DebugStartCallback.java Thu Apr 22 16:13:03 2010 +0900 @@ -0,0 +1,39 @@ +package fdl.test.debug2; + +import java.nio.ByteBuffer; +//import java.util.Date; + +import fdl.PSXCallback; + +class DebugStartCallback implements PSXCallback { + public NodeProperty np; + public DebugProperty dp; + public DebugStartCallback(NodeProperty np, DebugProperty dp) { + this.np = np; + this.dp = dp; + } + public void callback(ByteBuffer reply) { + String[] commands = new String(reply.array()).split(","); + String command = commands[0]; + if (command.equals("relay")) { + dp.relayNum = Integer.parseInt(commands[1]); + dp.relaySize = Integer.parseInt(commands[2]); + dp.relayCounter = 0; + Debug.print("relay num=" + dp.relayNum + " size=" + dp.relaySize); + Routing r = np.nodes.get(TupleId.RINGRIGHT); + // 実験開始 + // TODO: startTime? + //startTime = new Date(); + r.linda.out(r.dstId, ByteBuffer.wrap(new byte[dp.relaySize])); + np.ml.in(TupleId.DEBUGSTART.id, this); + } else if (command.equals("print")) { + Routing r = np.nodes.get(TupleId.RINGRIGHT); + r.linda.out(r.dstId, reply); + Debug.print("Send Debug Message: print"); + np.ml.in(TupleId.DEBUGSTART.id, this); + } else if (command.equals("shutdown")) { + Routing r = np.nodes.get(TupleId.RINGRIGHT); + r.linda.out(r.dstId, ByteBuffer.wrap("shutdown".getBytes())); + } + } +} \ No newline at end of file
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/fdl/test/debug2/FDLindaNode.java Thu Apr 22 16:13:03 2010 +0900 @@ -0,0 +1,75 @@ +package fdl.test.debug2; + +import java.io.IOException; + +import fdl.FDLindaServ; +import fdl.MetaLinda; +import fdl.MetaEngine; + +/** +* FDLindaNode +* +* @author Kazuki Akamine +* +* Federated Linda の Topology Node +* 接続する機能までを実装 MetaEngine は外部から提供される +* +*/ + +public class FDLindaNode extends FDLindaServ { + // Fields + private MetaLinda ml; + public static final int DEFAULTPORT = 10000; + private static String manager; + private final static String usageString + = "Usage: FDLindaNode -manager SERVERNAME:PORT"; + + // Constructor + public FDLindaNode(int port) throws IOException { + super(port); + this.ml = new MetaLinda(tupleSpace, this); + } + + @Override public void mainLoop() { + me.mainLoop(); + } + public void setMetaEngine(MetaEngine me) { + this.me = me; + } + public MetaLinda getMetaLinda() { + return ml; + } + + // main routine + public static void main(String[] args) { + int port = DEFAULTPORT; + for (int i = 0; i < args.length; i++) { + if ("-manager".equals(args[i])) { + manager = args[++i]; + } else if ("-p".equals(args[i])) { + port = new Integer(args[++i]).intValue(); + } else { + System.err.println(usageString); + } + } + + String[] managerData = manager.split(":"); + String managerHostName = managerData[0]; + int managerPort; + if (managerData.length > 1) { + managerPort = new Integer(managerData[1]).intValue(); + } else { + managerPort = DEFAULTPORT; + } + + try { + FDLindaNode node = new FDLindaNode(DEFAULTPORT); + MetaEngine me = new MetaProtocolEngine(port, node.getMetaLinda(), managerHostName, managerPort); + node.setMetaEngine(me); + node.mainLoop(); + } catch (IOException e) { + e.printStackTrace(); + } + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/fdl/test/debug2/MetaProtocolEngine.java Thu Apr 22 16:13:03 2010 +0900 @@ -0,0 +1,42 @@ +package fdl.test.debug2; + +import java.nio.ByteBuffer; + +import fdl.*; + +/** +* MetaProtocolEngine +* +* @author Kazuki Akamine +* +* 接続する機能までを実装した MetaEngine +* これを継承して、具体的な処理を書く +* +*/ + +public class MetaProtocolEngine implements MetaEngine { + // Fields + private NodeProperty np; + + // Constructor + public MetaProtocolEngine(int port, MetaLinda ml, String managerHostName, int managerPort) { + this.np = new NodeProperty(port, ml, managerHostName, managerPort); + ml.out(TupleId.BODY.id, ByteBuffer.wrap("dummy".getBytes())); + } + + public void mainLoop() { + initPoller(); + while (np.running) { + np.ml.sync(); + } + Debug.print("Terminated" + np.nodeId + + " replies=" + np.ml.replies.size() + + " qsize=" + np.ml.fdl.qsize); + } + + protected void initPoller() { + np.ml.in(TupleId.MANAGE.id, new AcceptXMLCallback(TupleId.MANAGE, np)); + np.ml.in(TupleId.DEBUG.id, new AcceptXMLCallback(TupleId.DEBUG, np)); + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/fdl/test/debug2/NodeInfo.java Thu Apr 22 16:13:03 2010 +0900 @@ -0,0 +1,17 @@ +package fdl.test.debug2; + +import fdl.PSXLinda; + +public class NodeInfo { + public String host; + public int port; + public PSXLinda linda; + public String connectionXML, debugConnectionXML; + public String routingXML, debugRoutingXML; + + public NodeInfo(String host, int port) { + this.host = host; + this.port = port; + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/fdl/test/debug2/NodeProperty.java Thu Apr 22 16:13:03 2010 +0900 @@ -0,0 +1,66 @@ +package fdl.test.debug2; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.nio.ByteBuffer; +import java.util.HashMap; + +import fdl.MetaLinda; +import fdl.PSXLinda; + +public class NodeProperty { + public static int DEFAULTPORT = 10000; + + public String localHostName; + public int localPort; + public PSXLinda manager; + public String managerHostName; + public int managerPort = DEFAULTPORT; + + public int nodeId; + public boolean running = true; + public boolean connected = false; + public boolean debugConnected = false; + public HashMap<TupleId, Routing> nodes = new HashMap<TupleId, Routing>(); + public MetaLinda ml; + + public NodeProperty(int port, MetaLinda ml, String managerHostName, int managerPort) { + this.ml = ml; + this.localPort = port; + this.managerHostName = managerHostName; + this.managerPort = managerPort; + try { + this.localHostName = InetAddress.getLocalHost().getHostName(); + } catch (UnknownHostException e) { + e.printStackTrace(); + } + manager = connectServer(this.managerHostName, this.managerPort); + sendLocalHostName(); + } + + protected void sendLocalHostName() { + // TopologyManager に自分のホストネームを送信して、起動を伝える + ByteBuffer local = ByteBuffer.wrap((localHostName + ":" + localPort).getBytes()); + manager.out(TupleId.MANAGE.id, local); + } + + protected PSXLinda connectServer(String hostName, int port) { + PSXLinda linda = null; + boolean connectPSX = true; + while (connectPSX) { + try { + linda = ml.open(hostName, port); + connectPSX = false; + } catch (IOException e) { + try { + Thread.sleep(40); + } catch (InterruptedException e1) { + } + } + } + Debug.print("Connect to " + hostName); + return linda; + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/fdl/test/debug2/Routing.java Thu Apr 22 16:13:03 2010 +0900 @@ -0,0 +1,17 @@ +package fdl.test.debug2; + +import java.util.LinkedList; + +import fdl.*; + +public class Routing { + PSXLinda linda; + int dstId; + LinkedList<Integer> route; + + public Routing(PSXLinda linda, int dstId) { + this.linda = linda; + this.dstId = dstId; + this.route = new LinkedList<Integer>(); + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/fdl/test/debug2/RoutingCallback.java Thu Apr 22 16:13:03 2010 +0900 @@ -0,0 +1,141 @@ +package fdl.test.debug2; + +import java.nio.ByteBuffer; +import java.util.Date; + +import fdl.PSXCallback; +import fdl.PSXReply; + +class RoutingCallback implements PSXCallback { + TupleId tid; + Routing routing; + NodeProperty np; + TreeProperty tp; + DebugProperty dp; + + public RoutingCallback(TupleId tid, Routing routing, NodeProperty np, TreeProperty tp, DebugProperty dp) { + this.tid = tid; + this.routing = routing; + this.np = np; + this.tp = tp; + this.dp = dp; + } + + public void callback(ByteBuffer reply) { + Debug.print("get: " + new String(reply.array())); + String str = new String(reply.array()); + String[] commands= str.split(","); + switch (tid) { + case TREETOP: + case TREELEFT: + case TREERIGHT: + np.ml.in(TupleId.BODY.id); + np.ml.out(TupleId.BODY.id, reply); + Debug.print("Update body: " + new String(reply.array())); + switch (tid) { + case TREETOP: + Routing r = np.nodes.get(tid); + if (r.route.isEmpty()) { + // TREE末端の処理 + r.linda.out(r.dstId, reply); // 送ってきた方に送り返す + np.ml.in(tid.id, this); + Debug.print("UP! " + np.nodeId); + return; + } + break; + default: + switch (tid) { + case TREELEFT: + tp.leftFlag = true; + break; + case TREERIGHT: + tp.rightFlag = true; + break; + } + if (tp.leftFlag && tp.rightFlag) { + Debug.print("UP"); + if (np.nodeId == 0) { + if (tp.treeCounter++ < tp.treeLoopNum) { + // int num = Integer.parseInt(new String(reply.array())); + tp.startTree(ByteBuffer.wrap((""+tp.treeCounter).getBytes())); + } else { + tp.endTime = new Date(); + Double resultTime = new Double(((tp.endTime.getTime() - tp.startTime.getTime()) / (double)tp.treeLoopNum)); + ByteBuffer data = ByteBuffer.wrap(resultTime.toString().getBytes()); + np.manager.out(TupleId.START.id, data); + Debug.print("Finish Tree"); + } + } else { + Routing r1 = np.nodes.get(TupleId.TREETOP); + r1.linda.out(r1.dstId, reply); + } + } + np.ml.in(tid.id, this); + return; + } + break; + case RINGLEFT: + case RINGRIGHT: + if (str.equals("shutdown")) { + Debug.print("get shutdown command id: " + np.nodeId); + if (np.nodeId != 0) { + Routing r = null; + r = np.nodes.get(tid.getMirrorId()); + r.linda.out(r.dstId, ByteBuffer.wrap("shutdown".getBytes())); + np.ml.fdl.queueExec(); + } else { + Debug.print("shutdown reaches last node!"); + } + np.running = false; + return; + } else if (commands[0].equals("print")) { + Debug.print("Get Debug Message: print"); + int printId = Integer.parseInt(commands[1]); + PSXReply rep = np.ml.rd(printId); + np.ml.sync(1); + str += "<debug id=\"" + + np.nodeId + "\"><host>" + + np.localHostName + ":" + np.localPort + + "</host><data id=\"" + + printId + "\">"; + if (rep.ready()) + str += new String(rep.data.array()); + str += "</data></debug>\n"; + np.ml.in(tid.id, this); + if (np.nodeId == 0) { + // 実験終了 + np.manager.out(TupleId.DEBUG.id, ByteBuffer.wrap(str.getBytes())); + } else { + Routing r = np.nodes.get(tid.getMirrorId()); + r.linda.out(r.dstId, ByteBuffer.wrap(str.getBytes())); + } + return; + } + break; + default: + if (np.nodeId == 0 && tid == TupleId.RINGLEFT) { + dp.relayCounter++; + Debug.print("" + dp.relayCounter + " relay"); + if (dp.relayCounter >= dp.relayNum) { + // 実験終了 + tp.endTime = new Date(); + Double resultTime = new Double(((tp.endTime.getTime() - tp.startTime.getTime()) / (double)dp.relayNum)); + ByteBuffer data = ByteBuffer.wrap(resultTime.toString().getBytes()); + np.manager.out(TupleId.DEBUG.id, data); + np.ml.in(tid.id, this); + return; + } + } + } + for (Integer dstId : routing.route) { + if (dstId == TupleId.TREELEFT.id) + tp.leftFlag = false; + else if (dstId == TupleId.TREERIGHT.id) + tp.rightFlag = false; + Routing r = np.nodes.get(TupleId.getTupleIdFromId(dstId)); + r.linda.out(r.dstId, reply); + } + np.ml.in(tid.id, this); + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/fdl/test/debug2/RoutingXMLBuilder.java Thu Apr 22 16:13:03 2010 +0900 @@ -0,0 +1,36 @@ +package fdl.test.debug2; + +import java.util.*; + +import org.w3c.dom.Element; + +public class RoutingXMLBuilder extends XMLBuilder { + private Element routing; + private HashMap<Integer, Element> routingTable; + + public RoutingXMLBuilder() { + super(); + routing = document.createElement("routing"); + document.appendChild(routing); + routingTable = new HashMap<Integer, Element>(); + } + + private Element createRoutingTable(TupleId src) { + Integer tupleId = new Integer(src.id); + Element source = document.createElement("source"); + source.setAttribute("id", tupleId.toString()); + routing.appendChild(source); + routingTable.put(tupleId, source); + return source; + } + + public void appendRoutingTable(TupleId src, TupleId dst) { + Element source = routingTable.get(new Integer(src.id)); + if (source == null) + source = createRoutingTable(src); + Element dest = document.createElement("dest"); + dest.setAttribute("id", new Integer(dst.id).toString()); + source.appendChild(dest); + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/fdl/test/debug2/StartCallback.java Thu Apr 22 16:13:03 2010 +0900 @@ -0,0 +1,27 @@ +package fdl.test.debug2; + +import java.nio.ByteBuffer; +import java.util.Date; + +import fdl.PSXCallback; + +class StartCallback implements PSXCallback { + NodeProperty np; + TreeProperty tp; + + public StartCallback(NodeProperty np, TreeProperty tp) { + this.np = np; + this.tp = tp; + } + + public void callback(ByteBuffer reply) { + Debug.print("Start Tree"); + tp.startTime = new Date(); + tp = new TreeProperty(np); + tp.startTree(reply); + np.ml.in(TupleId.BODY.id); + np.ml.out(TupleId.BODY.id, reply); + np.ml.in(TupleId.START.id, this); + } + +} \ No newline at end of file
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/fdl/test/debug2/TreeProperty.java Thu Apr 22 16:13:03 2010 +0900 @@ -0,0 +1,30 @@ +package fdl.test.debug2; + +import java.nio.ByteBuffer; +import java.util.Date; + +public class TreeProperty { + public boolean rightFlag = true, leftFlag = true; + public int treeCounter = 0, treeLoopNum = 100; + public Date startTime, endTime; + public NodeProperty np; + + public TreeProperty(NodeProperty np) { + this.np = np; + } + + public void startTree(ByteBuffer reply) { + Routing r; + // 子があるならば、子にタプルを伝搬 + if (np.nodes.containsKey(TupleId.TREERIGHT)) { + r = np.nodes.get(TupleId.TREERIGHT); + r.linda.out(r.dstId, reply); + rightFlag = false; + } + if (np.nodes.containsKey(TupleId.TREELEFT)) { + r = np.nodes.get(TupleId.TREELEFT); + r.linda.out(r.dstId, reply); + leftFlag = false; + } + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/fdl/test/debug2/TupleId.java Thu Apr 22 16:13:03 2010 +0900 @@ -0,0 +1,41 @@ +package fdl.test.debug2; + +import java.util.HashMap; + +public enum TupleId { + BODY (100), + START (BODY.id+1), + FINISH (BODY.id+2), + MANAGE (60000), + TREETOP (MANAGE.id+1), + TREELEFT (MANAGE.id+2) { public TupleId getMirrorId() { return TREERIGHT; } }, + TREERIGHT (MANAGE.id+3) { public TupleId getMirrorId() { return TREELEFT; } }, + DEBUG (61000), + RINGLEFT (DEBUG.id+1) { public TupleId getMirrorId() { return RINGRIGHT; } }, + RINGRIGHT (DEBUG.id+2) { public TupleId getMirrorId() { return RINGLEFT; } }, + DEBUGSTART (DEBUG.id+3); + + public int id; + public static HashMap<Integer, TupleId> hash = new HashMap<Integer, TupleId>(); + + private TupleId(int id) { + this.id = id; + } + public TupleId getMirrorId() { + return this; + } + public static TupleId getTupleIdFromId(int id) { +// for (TupleId tid : TupleId.values()) { +// if (tid.id == id) { +// return tid; +// } +// } + return hash.get(id); + } + static { + for (TupleId tid : TupleId.values()) { + hash.put(tid.id, tid); + } + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/fdl/test/debug2/XMLBuilder.java Thu Apr 22 16:13:03 2010 +0900 @@ -0,0 +1,40 @@ +package fdl.test.debug2; + +import java.io.*; +import javax.xml.parsers.*; +import javax.xml.transform.*; +import javax.xml.transform.dom.DOMSource; +import javax.xml.transform.stream.StreamResult; +import org.w3c.dom.*; + +public class XMLBuilder { + protected Document document; + protected Transformer transformer; + public XMLBuilder() { + DocumentBuilderFactory dbFactory = DocumentBuilderFactory.newInstance(); + DocumentBuilder docBuilder = null; + try { + docBuilder = dbFactory.newDocumentBuilder(); + } catch (ParserConfigurationException e) { + e.printStackTrace(); + } + document = docBuilder.newDocument(); + TransformerFactory tFactory = TransformerFactory.newInstance(); + try { + transformer = tFactory.newTransformer(); + transformer.setOutputProperty(OutputKeys.INDENT, "yes"); + } catch (TransformerConfigurationException e) { + e.printStackTrace(); + } + } + + public String createXML() { + StringWriter sw = new StringWriter(); + try { + transformer.transform(new DOMSource(document), new StreamResult(sw)); + } catch (TransformerException e) { + e.printStackTrace(); + } + return sw.toString(); + } +}