Mercurial > hg > FederatedLinda
changeset 87:8931a3e90c2a
ring implements
author | one |
---|---|
date | Thu, 11 Feb 2010 12:05:49 +0900 |
parents | c0591636a71a |
children | 5d1189e9e420 |
files | src/fdl/MetaReply.java src/fdl/test/debug/ConfigurationManagerEngine.java src/fdl/test/debug/MetaProtocolEngine.java |
diffstat | 3 files changed, 83 insertions(+), 22 deletions(-) [+] |
line wrap: on
line diff
--- a/src/fdl/MetaReply.java Tue Feb 09 06:14:52 2010 +0900 +++ b/src/fdl/MetaReply.java Thu Feb 11 12:05:49 2010 +0900 @@ -52,7 +52,7 @@ ts.Out(null, command, data); return true; case PSX.PSX_UPDATE: - // not implemented + // TODO: not implemented break; } return mode==PSX.PSX_ANSWER;
--- a/src/fdl/test/debug/ConfigurationManagerEngine.java Tue Feb 09 06:14:52 2010 +0900 +++ b/src/fdl/test/debug/ConfigurationManagerEngine.java Thu Feb 11 12:05:49 2010 +0900 @@ -9,6 +9,10 @@ public static final int DEFAULTPORT = 10000; // tuple id + public static final int BODY = 100; + public static final int START = 101; + public static final int FINISH = 102; + public static final int MANAGE = 60000; public static final int DEBUG = 61000; @@ -19,6 +23,8 @@ public static final int RINGLEFT = DEBUG + 1; public static final int RINGRIGHT = DEBUG + 2; + public static final int DEBUGSTART = DEBUG + 1000; + private int nodeNum = 0; private MetaLinda ml; private NodeInfo[] nodes; @@ -67,6 +73,10 @@ ml.in(MANAGE, this); } else { print("All link configured!"); + // 実験開始を通知 + nodes[0].linda.out(START, ByteBuffer.wrap("test".getBytes())); + // DebugRing 開始を通知 + nodes[0].linda.out(DEBUGSTART, ByteBuffer.wrap("relay,10,1024".getBytes())); } } @@ -112,11 +122,6 @@ nodes[i].debugConnectionXML = debug.createXML(); nodes[i].linda.out(DEBUG, ByteBuffer.wrap(nodes[i].debugConnectionXML.getBytes())); print(nodes[i].debugConnectionXML); -// try { -// nodes[i].linda.sync(1); -// } catch (IOException e) { -// e.printStackTrace(); -// } } } @@ -143,11 +148,6 @@ nodes[i].debugRoutingXML = debug.createXML(); nodes[i].linda.out(DEBUG, ByteBuffer.wrap(nodes[i].debugRoutingXML.getBytes())); print(nodes[i].debugRoutingXML); -// try { -// nodes[i].linda.sync(1); -// } catch (IOException e) { -// e.printStackTrace(); -// } } }
--- a/src/fdl/test/debug/MetaProtocolEngine.java Tue Feb 09 06:14:52 2010 +0900 +++ b/src/fdl/test/debug/MetaProtocolEngine.java Thu Feb 11 12:05:49 2010 +0900 @@ -29,9 +29,23 @@ public class MetaProtocolEngine implements MetaEngine { // Fields public static final int DEFAULTPORT = 10000; + + public static final int BODY = 100; + public static final int START = 101; + public static final int FINISH = 102; + public static final int MANAGE = 60000; public static final int DEBUG = 61000; + public static final int TREETOP = MANAGE + 1; + public static final int TREELEFT = MANAGE + 2; + public static final int TREERIGHT = MANAGE + 3; + + public static final int RINGLEFT = DEBUG + 1; + public static final int RINGRIGHT = DEBUG + 2; + + public static final int DEBUGSTART = DEBUG + 1000; + private MetaLinda ml; private String localHostName; private int localPort; @@ -44,6 +58,8 @@ private int nodeId; private HashMap<Integer, Routing> nodes; + private int relayNum, relaySize, relayCounter; + // Callback class class AcceptXMLCallback implements PSXCallback { int tid; @@ -83,6 +99,10 @@ Element root = doc.getDocumentElement(); if(root.getTagName().equals("connections")) { nodeId = new Integer(root.getAttribute("id")).intValue(); + if (nodeId == 0) { + ml.in(START, new StartCallback()); + ml.in(DEBUGSTART, new DebugStartCallback()); + } NodeList connections = root.getElementsByTagName("connection"); for (int i = 0; i < connections.getLength(); i++) { Element connection = (Element)connections.item(i); @@ -133,28 +153,74 @@ private class RoutingCallback implements PSXCallback { int tid; Routing routing; + public RoutingCallback(int tid, Routing routing) { this.tid = tid; this.routing = routing; + ml.out(BODY, ByteBuffer.wrap("dummy".getBytes())); } public void callback(ByteBuffer reply) { + if (tid == TREETOP || tid == TREELEFT || tid == TREERIGHT) { + ml.in(BODY); + ml.out(BODY, reply); + print("Update body"); + } else if (nodeId == 0 && tid == RINGLEFT) { + relayCounter++; + print(new Integer(relayCounter).toString() + " relay"); + if (relayCounter >= relayNum) { + ml.in(tid, this); + return; + } + } Iterator<Integer> it = routing.route.iterator(); while (it.hasNext()) { Integer dstId = it.next(); Routing r = nodes.get(dstId); r.linda.out(r.dstId, reply); -// try { -// r.linda.sync(1); -// } catch (IOException e) { -// e.printStackTrace(); -// } ml.in(tid, this); } } } + private class StartCallback implements PSXCallback { + + public void callback(ByteBuffer reply) { + Routing r; + + // 子があるならば、子にタプルを伝搬 + if (nodes.containsKey(new Integer(TREERIGHT))) { + r = nodes.get(new Integer(TREERIGHT)); + r.linda.out(r.dstId, reply); + } + if (nodes.containsKey(new Integer(TREELEFT))) { + r = nodes.get(new Integer(TREELEFT)); + r.linda.out(r.dstId, reply); + } + ml.in(START, this); + } + + } + + private class DebugStartCallback implements PSXCallback { + public void callback(ByteBuffer reply) { + String[] commands = new String(reply.array()).split(","); + String command = commands[0]; + if (command.equals("relay")) { + relayNum = new Integer(commands[1]).intValue(); + relaySize = new Integer(commands[2]).intValue(); + relayCounter = 0; + + print("relay num=" + new Integer(relayNum).toString() + " size=" + new Integer(relaySize).toString()); + + Routing r = nodes.get(new Integer(RINGRIGHT)); + r.linda.out(r.dstId, ByteBuffer.wrap(new byte[relaySize])); + ml.in(DEBUGSTART, this); + } + } + } + // Constructor public MetaProtocolEngine(int port, MetaLinda ml, String managerHostName, int managerPort) { this.ml = ml; @@ -167,7 +233,7 @@ } catch (UnknownHostException e) { e.printStackTrace(); } - manager = connectServer(managerHostName, managerPort); + manager = connectServer(this.managerHostName, this.managerPort); sendLocalHostName(); } @@ -188,11 +254,6 @@ ByteBuffer local; local = ByteBuffer.wrap((localHostName + ":" + new Integer(localPort).toString()).getBytes()); manager.out(MANAGE, local); -// try { -// manager.sync(1); -// } catch (IOException e) { -// e.printStackTrace(); -// } } protected PSXLinda connectServer(String hostName, int port) {