view src/fdl/test/topology/TopologyManagerEngine.java @ 97:0ea086f0e96f fuchita

main loop modification, for easy meta engine addition. add comments.
author one
date Wed, 26 May 2010 10:49:50 +0900
parents 04bd4ae97e7c
children
line wrap: on
line source

package fdl.test.topology;

import java.io.IOException;
import java.nio.ByteBuffer;

import fdl.MetaEngine;
import fdl.MetaLinda;
import fdl.PSXLinda;
import fdl.PSXReply;

/**
 * TopologyManagerEngine
 *
 * MetaEngine for the server that manages the topology of FDLindaNodes;
 * the concrete implementation of the TopologyManager.
 *
 * The default flow (see makeTopology()) is: define the connections between
 * the nodes, wait for every node to register, send each node the start
 * token, and finally wait for one acknowledgement per node.
 *
 * @author Kazuki Akamine
 */

public class TopologyManagerEngine implements MetaEngine {
	/** Token sent to every node by sendLastToken(). */
	protected static String lastToken = "null";
	/** Port passed to every NodeManager created by the constructor. */
	protected static int port = 10000;
	/** Tuple id used for all management traffic in this class. */
	protected static int manageId = 60000;
	/** Linda handle used to receive management tuples; assigned in mainLoop(). */
	protected PSXLinda manager;
	/** MetaLinda supplied at construction time. */
	protected MetaLinda fdlMeta;
	/** Number of managed nodes. TODO: obtain this automatically from the node list. */
	protected int nodeNum;
	/** One NodeManager per managed node, filled in registration order. */
	protected NodeManager[] nodes;

	/**
	 * Creates the engine and one NodeManager per expected node.
	 *
	 * @param ml      MetaLinda used for all management communication
	 * @param nodeNum number of nodes that are expected to join
	 */
	public TopologyManagerEngine(MetaLinda ml, int nodeNum) {
		this.fdlMeta = ml;
		this.nodeNum = nodeNum;
		this.nodes = new NodeManager[this.nodeNum];
		for (int i = 0; i < nodes.length; i++) {
			this.nodes[i] = new NodeManager(fdlMeta, port, manageId);
		}
	}

	/**
	 * No-op in this engine; the topology is built by {@link #mainLoop()}.
	 *
	 * @param ml unused
	 */
	public void mainLoop(MetaLinda ml) {}

	/** Binds the manager handle to the MetaLinda and builds the topology. */
	public void mainLoop() {
		manager = fdlMeta;
		makeTopology();
	}

	/** Runs the four topology phases in order. */
	protected void makeTopology() {
		makeConnection();
		acceptNewNode();
		sendLastToken();
		checkConnection();
	}

	/**
	 * Defines the connections between the nodes. Override this method to form
	 * a different topology. Eventually the connections should be read from an
	 * external XML file.
	 *
	 * The default implementation links the first two nodes to each other.
	 */
	protected void makeConnection() {
		// The default pairing needs two nodes; with fewer there is nothing to
		// connect, so skip instead of failing with an index error.
		if (nodes.length < 2) {
			return;
		}
		nodes[0].addConnection(nodes[1]);
		nodes[1].addConnection(nodes[0]);
	}

	/**
	 * Waits for as many newly joining nodes as there are entries in
	 * {@code nodes}, and starts up each NodeManager with the name the node
	 * reported.
	 */
	protected void acceptNewNode() {
		for (int i = 0; i < nodes.length; i++) {
			String hostName = decode(receive(manageId));
			System.out.println("[DEBUG] GetNodeName: " + hostName);
			nodes[i].startUp(hostName);
		}
	}

	/**
	 * Announces the start of the experiment to every participating node by
	 * sending it {@code lastToken} ("null" by default).
	 */
	protected void sendLastToken() {
		byte[] tokenBytes = lastToken.getBytes();
		for (int i = 0; i < nodes.length; i++) {
			// Wrap a fresh buffer per node: whether out()/sync() advance the
			// buffer position is not visible here, and a shared buffer would
			// then be exhausted after the first send.
			nodes[i].linda.out(manageId, ByteBuffer.wrap(tokenBytes));
			System.out.println("[DEBUG] SendMsg: " + lastToken + " to " + nodes[i].hostName);

			try {
				nodes[i].linda.sync(1);
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}

	/** Waits for one acknowledgement tuple per node and logs each of them. */
	protected void checkConnection() {
		for (int i = 0; i < nodes.length; i++) {
			String ack = decode(receive(manageId));
			System.out.println("[DEBUG] ConnectedHost: " + ack);
		}
		System.out.println("[DEBUG] AllConnected");
	}

	/**
	 * Requests the tuple with the given id from the manager and keeps
	 * syncing until the reply is ready.
	 *
	 * @param id tuple id to take
	 * @return the data carried by the reply
	 */
	private ByteBuffer receive(int id) {
		PSXReply reply = manager.in(id);
		do {
			try {
				manager.sync(1);
			} catch (IOException e) {
				// Same policy as before the extraction: report and keep polling.
				e.printStackTrace();
			}
		} while (!reply.ready());
		return reply.getData();
	}

	/**
	 * Decodes the bytes of {@code data} from index 0 up to its limit with the
	 * platform default charset (the same charset sendLastToken() encodes with).
	 *
	 * Reading through a duplicate honours the buffer's limit and array offset
	 * and leaves the caller's position untouched; decoding the raw backing
	 * array would also pick up bytes outside the buffer's window.
	 *
	 * @param data buffer to decode
	 * @return the decoded text
	 */
	private static String decode(ByteBuffer data) {
		ByteBuffer view = data.duplicate();
		view.rewind();
		byte[] bytes = new byte[view.remaining()];
		view.get(bytes);
		return new String(bytes);
	}

}