Mercurial > hg > FederatedLinda
changeset 79:805645cf5ec0
topology modifired
author | one |
---|---|
date | Tue, 17 Nov 2009 18:19:39 +0900 |
parents | 4fd2d1094bb9 |
children | 04bd4ae97e7c |
files | src/fdl/test/topology/MetaProtocolEngine.java src/fdl/test/topology/NodeManager.java src/fdl/test/topology/RingTopologyManager.java src/fdl/test/topology/TopologyManagerEngine.java src/fdl/test/topology/ring/FDLindaRingNode.java src/fdl/test/topology/ring/RingMetaProtocolEngine.java |
diffstat | 6 files changed, 110 insertions(+), 46 deletions(-) [+] |
line wrap: on
line diff
--- a/src/fdl/test/topology/MetaProtocolEngine.java Tue Nov 17 18:18:37 2009 +0900 +++ b/src/fdl/test/topology/MetaProtocolEngine.java Tue Nov 17 18:19:39 2009 +0900 @@ -24,14 +24,14 @@ public class MetaProtocolEngine implements MetaEngine { // Fields - private static String lastToken = "null"; - private static int port = 10000; - private static int manageId = 60000; - private PSXLinda manager, psxLocal; - private ArrayList<PSXLinda> psxSendServers; - private MetaLinda fdlMeta; - private String managerHostName; - private String localHostName; + protected static String lastToken = "null"; + protected static int port = 10000; + protected static int manageId = 60000; + protected PSXLinda manager, psxLocal; + protected ArrayList<PSXLinda> psxSendServers; + protected MetaLinda fdlMeta; + protected String managerHostName; + protected String localHostName; // Constructor public MetaProtocolEngine(MetaLinda ml, String managerHostName) { @@ -51,12 +51,12 @@ initSendServer(); } - private void initSendServer() { + protected void initSendServer() { sendLocalHostName(); connectSendServers(); } - private void initTopologyManager() { + protected void initTopologyManager() { // Connect to TopologyManager Server try { manager = fdlMeta.open(managerHostName, port); @@ -67,7 +67,7 @@ } } - private void sendLocalHostName() { + protected void sendLocalHostName() { // TopologyManager に自分のホストネームを送信して、起動を伝える ByteBuffer local; local = ByteBuffer.wrap(localHostName.getBytes()); @@ -79,7 +79,7 @@ } } - private void connectSendServers() { + protected void connectSendServers() { // TopologyManager から、送られてくる ConnectServer の hostName を取得して接続 System.out.println("[DEBUG] MethodCall connectSendServers()"); while (true) { @@ -101,7 +101,7 @@ } } - private void connectSendServer(String hostName) { + protected void connectSendServer(String hostName) { PSXLinda linda; try { linda = fdlMeta.open(hostName, port);
--- a/src/fdl/test/topology/NodeManager.java Tue Nov 17 18:18:37 2009 +0900 +++ b/src/fdl/test/topology/NodeManager.java Tue Nov 17 18:19:39 2009 +0900 @@ -15,7 +15,7 @@ private int manageId; private ArrayList<NodeManager> waitingNodes; private ArrayList<NodeManager> waitedNodes; - private ArrayList<NodeManager> sendNodes; +// private ArrayList<NodeManager> sendNodes; public NodeManager(MetaLinda ml, int port, int manageId) { this.port = port; @@ -24,7 +24,7 @@ hostName = null; waitingNodes = new ArrayList<NodeManager>(); waitedNodes = new ArrayList<NodeManager>(); - sendNodes = new ArrayList<NodeManager>(); +// sendNodes = new ArrayList<NodeManager>(); } public void addConnection(NodeManager node) { @@ -35,7 +35,7 @@ public void finishConnection(NodeManager node) { waitingNodes.remove(node); node.waitedNodes.remove(this); - sendNodes.add(node); +// sendNodes.add(node); } public void startUp(String hostName) {
--- a/src/fdl/test/topology/RingTopologyManager.java Tue Nov 17 18:18:37 2009 +0900 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,15 +0,0 @@ -package fdl.test.topology; - -import java.io.IOException; - -public class RingTopologyManager extends TopologyManager { - public RingTopologyManager(int port) throws IOException { - //super(port); - } - - void makeTopology() { - - - } - -}
--- a/src/fdl/test/topology/TopologyManagerEngine.java Tue Nov 17 18:18:37 2009 +0900 +++ b/src/fdl/test/topology/TopologyManagerEngine.java Tue Nov 17 18:19:39 2009 +0900 @@ -19,13 +19,13 @@ */ public class TopologyManagerEngine implements MetaEngine { - private static String lastToken = "null"; - private static int port = 10000; - private static int manageId = 60000; - private PSXLinda manager; - private MetaLinda fdlMeta; - private int nodeNum; - private NodeManager[] nodes; + protected static String lastToken = "null"; + protected static int port = 10000; + protected static int manageId = 60000; + protected PSXLinda manager; + protected MetaLinda fdlMeta; + protected int nodeNum; // 後々は、nodelist から自動で取得したい + protected NodeManager[] nodes; // Constructor public TopologyManagerEngine(MetaLinda ml, int nodeNum) { @@ -42,11 +42,23 @@ makeTopology(); } - private void makeTopology() { + protected void makeTopology() { + makeConnection(); + acceptNewNode(); + sendLastToken(); + } + + // これを継承して、 Topology を形成 + // 最終的には外部XMLを読み込んで接続するようにする + protected void makeConnection() { // 接続を定義 nodes[0].addConnection(nodes[1]); nodes[1].addConnection(nodes[0]); + } + + protected void acceptNewNode() { // 起動状況を確認しつつ、接続命令を送信 + // nodes の数だけ新規参入 node を待つ。 for (int i = 0; i < nodes.length; i++) { PSXReply reply; reply = manager.in(manageId); @@ -61,10 +73,13 @@ String hostName = new String(data.array()); System.out.println("[DEBUG] GetNodeName: " + hostName); nodes[i].startUp(hostName); - } + } + } + + protected void sendLastToken() { ByteBuffer lastTokenBB = ByteBuffer.wrap(lastToken.getBytes()); for (int i = 0; i < nodes.length; i++) { - // TODO: 参加ノードに実験開始を告知する ("null"を送る) + // 参加ノードに実験開始を告知する ("null"を送る) nodes[i].linda.out(manageId, lastTokenBB); System.out.println("[DEBUG] SendMsg: " + lastToken + " to " + nodes[i].hostName); @@ -75,5 +90,5 @@ } } } - + }
--- a/src/fdl/test/topology/ring/FDLindaRingNode.java Tue Nov 17 18:18:37 2009 +0900 +++ b/src/fdl/test/topology/ring/FDLindaRingNode.java Tue Nov 17 18:19:39 2009 +0900 @@ -18,6 +18,8 @@ // Fields private static int localPort = 10000; private static String managerHostName; + private static String relayNumString; + private static int relayNum; private final static String usageString = "Usage: FDLindaRingNode -manager SERVERNAME"; @@ -26,13 +28,17 @@ for (int i = 0; i < args.length; i++) { if ("-manager".equals(args[i])) { managerHostName = args[++i]; + } else if ("-relay".equals(args[i])) { + relayNumString = args[++i]; } else { System.err.println(usageString); } } + relayNum = Integer.parseInt(relayNumString); + try { FDLindaNode node = new FDLindaNode(localPort); - MetaEngine me = new MetaProtocolEngine(node.getMetaLinda(), managerHostName); + MetaEngine me = new RingMetaProtocolEngine(node.getMetaLinda(), managerHostName, relayNum); node.setMetaEngine(me); node.mainLoop(); } catch (IOException e) {
--- a/src/fdl/test/topology/ring/RingMetaProtocolEngine.java Tue Nov 17 18:18:37 2009 +0900 +++ b/src/fdl/test/topology/ring/RingMetaProtocolEngine.java Tue Nov 17 18:19:39 2009 +0900 @@ -1,10 +1,15 @@ package fdl.test.topology.ring; +import java.io.IOException; +import java.nio.ByteBuffer; + import fdl.MetaLinda; +import fdl.PSXLinda; +import fdl.PSXReply; import fdl.test.topology.MetaProtocolEngine; /** -* MetaProtocolEngine +* RingMetaProtocolEngine * * @author Kazuki Akamine * @@ -14,9 +19,62 @@ */ public class RingMetaProtocolEngine extends MetaProtocolEngine { - - public RingMetaProtocolEngine(MetaLinda ml, String managerHostName) { + private int relayNum; + private static int relayId = 10; + + public RingMetaProtocolEngine(MetaLinda ml, String managerHostName, int relayNum) { super(ml, managerHostName); + this.relayNum = relayNum; + } + + @Override public void mainLoop() { + // 接続処理 + super.mainLoop(); + // Ring 実験開始 + relayTuple(relayId); + } + + private void relayTuple(int tupleId) { + while (true) { + ByteBuffer data = receiveToken(tupleId); + if (relayNum == 0) { + String token = new String(data.array()); + if (!token.equals(lastToken)) { + // TODO: 実験結果をManagerに伝えるなどの処理 + System.out.println("[Ring] relay finished: " + token); + // 実験終了を各ノードにリレーで伝える + sendToken(tupleId, ByteBuffer.wrap(lastToken.getBytes())); + } + return; + } + sendToken(tupleId, data); + relayNum--; + } + } + + private ByteBuffer receiveToken(int tupleId) { + PSXReply reply = psxLocal.in(tupleId); + do { + try { + psxLocal.sync(1); + } catch (IOException e) { + e.printStackTrace(); + } + } while (!reply.ready()); + ByteBuffer data = reply.getData(); + return data; + } + + private void sendToken(int tupleId, ByteBuffer data) { + for (int i = 0; i < psxSendServers.size(); i++) { + PSXLinda linda = psxSendServers.get(i); + linda.out(tupleId, data); + try { + linda.sync(1); + } catch (IOException e) { + e.printStackTrace(); + } + } } }