Mercurial > hg > FederatedLinda
view src/fdl/test/topology/TopologyManagerEngine.java @ 93:a1d796c0e975 fuchita
Wait Read Tester
author | one |
---|---|
date | Tue, 25 May 2010 22:57:54 +0900 |
parents | 04bd4ae97e7c |
children | 0ea086f0e96f |
line wrap: on
line source
package fdl.test.topology;

import java.io.IOException;
import java.nio.ByteBuffer;

import fdl.MetaEngine;
import fdl.MetaLinda;
import fdl.PSXLinda;
import fdl.PSXReply;

/**
 * TopologyManagerEngine
 *
 * @author Kazuki Akamine
 *
 * MetaEngine of the server that manages the topology of FDLindaNodes.
 * This is the concrete implementation of the TopologyManager.
 *
 */
public class TopologyManagerEngine implements MetaEngine {
    /** Payload sent to every node by {@link #sendLastToken()}. */
    protected static String lastToken = "null";
    /** Port handed to every {@link NodeManager} created by the constructor. */
    protected static int port = 10000;
    /** Tuple id used for all management traffic (in/out) in this class. */
    protected static int manageId = 60000;

    /** Linda handle used for receiving; assigned from {@link #fdlMeta} in {@link #mainLoop()}. */
    protected PSXLinda manager;
    protected MetaLinda fdlMeta;
    /** Number of managed nodes. Eventually this should be obtained automatically from the node list. */
    protected int nodeNum;
    protected NodeManager[] nodes;

    /**
     * Creates the engine and one {@link NodeManager} per expected node.
     *
     * @param ml      the MetaLinda shared by this engine and every NodeManager
     * @param nodeNum the number of nodes to manage
     */
    public TopologyManagerEngine(MetaLinda ml, int nodeNum) {
        this.fdlMeta = ml;
        this.nodeNum = nodeNum;
        this.nodes = new NodeManager[this.nodeNum];
        for (int i = 0; i < nodes.length; i++) {
            this.nodes[i] = new NodeManager(fdlMeta, port, manageId);
        }
    }

    /** Entry point: binds {@link #manager} to the MetaLinda and builds the topology. */
    public void mainLoop() {
        manager = fdlMeta;
        makeTopology();
    }

    /**
     * Runs the four phases in order: define connections, accept the joining
     * nodes, send the last token, then collect one reply per node.
     */
    protected void makeTopology() {
        makeConnection();
        acceptNewNode();
        sendLastToken();
        checkConnection();
    }

    /**
     * Defines the connections between nodes. Subclasses override this to form
     * a different topology. Eventually the connections should be read from an
     * external XML file.
     *
     * This default implementation links the first two nodes to each other.
     */
    protected void makeConnection() {
        // The default wiring needs two nodes; with fewer, indexing nodes[1]
        // (or nodes[0]) would throw ArrayIndexOutOfBoundsException.
        if (nodes.length < 2) {
            System.out.println("[DEBUG] makeConnection skipped: need 2 nodes, have " + nodes.length);
            return;
        }
        // Define the connections.
        nodes[0].addConnection(nodes[1]);
        nodes[1].addConnection(nodes[0]);
    }

    /**
     * Sends connection commands while checking the startup status: waits for
     * as many newly joining nodes as there are entries in {@link #nodes},
     * and passes each received host name to the matching NodeManager's
     * {@code startUp}.
     */
    protected void acceptNewNode() {
        for (int i = 0; i < nodes.length; i++) {
            String hostName = receiveManageMessage();
            System.out.println("[DEBUG] GetNodeName: " + hostName);
            nodes[i].startUp(hostName);
        }
    }

    /**
     * Announces the start of the experiment to every participating node by
     * writing {@link #lastToken} ("null") to it and syncing once.
     */
    protected void sendLastToken() {
        byte[] tokenBytes = lastToken.getBytes();
        for (int i = 0; i < nodes.length; i++) {
            // Wrap a fresh buffer for every node: a ByteBuffer carries its own
            // position, so one shared instance could already be drained by the
            // time it is handed to the second node.
            ByteBuffer lastTokenBB = ByteBuffer.wrap(tokenBytes);
            nodes[i].linda.out(manageId, lastTokenBB);
            System.out.println("[DEBUG] SendMsg: " + lastToken + " to " + nodes[i].hostName);
            try {
                nodes[i].linda.sync(1);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Waits for one management message per node and prints each one, then
     * reports that all nodes are connected.
     */
    protected void checkConnection() {
        for (int i = 0; i < nodes.length; i++) {
            String ack = receiveManageMessage();
            System.out.println("[DEBUG] ConnectedHost: " + ack);
        }
        System.out.println("[DEBUG] AllConnected");
    }

    /**
     * Takes one tuple with id {@link #manageId} from {@link #manager},
     * syncing repeatedly until the reply is ready, and returns its payload
     * as a String.
     *
     * @return the reply payload decoded with the platform default charset
     */
    private String receiveManageMessage() {
        PSXReply reply = manager.in(manageId);
        do {
            try {
                manager.sync(1);
            } catch (IOException e) {
                // NOTE(review): the error is only printed and the loop retries;
                // if sync keeps failing this never terminates.
                e.printStackTrace();
            }
        } while (!reply.ready());
        ByteBuffer data = reply.getData();
        // NOTE(review): decodes the whole backing array, ignoring the buffer's
        // position/limit; presumably the reply buffer is exactly sized - verify.
        return new String(data.array());
    }
}