view src/fdl/test/debug/MetaProtocolEngine.java @ 91:4df1d50df52a

Ring: fdl.test.debug
author kazz <kazz@cr.ie.u-ryukyu.ac.jp>
date Tue, 16 Feb 2010 03:58:06 +0900
parents 9cdc24bae625
children 0ea086f0e96f
line wrap: on
line source

package fdl.test.debug;

import java.io.*;
import java.net.InetAddress;
import java.net.UnknownHostException;

import java.nio.ByteBuffer;
import java.util.*;

import javax.xml.parsers.*;

import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import org.xml.sax.SAXException;

import fdl.*;

/**
* MetaProtocolEngine
*
* @author Kazuki Akamine
* 
* 接続する機能までを実装した MetaEngine
* これを継承して、具体的な処理を書く
*  
*/

public class MetaProtocolEngine implements MetaEngine {
	// Fields
	public static final int DEFAULTPORT = 10000;
	
	public static final int BODY = 100;
	public static final int START = 101;
	public static final int FINISH = 102;
	
	public static final int MANAGE = 60000;
	public static final int DEBUG = 61000;
	
	public static final int TREETOP = MANAGE + 1;
	public static final int TREELEFT = MANAGE + 2;
	public static final int TREERIGHT = MANAGE + 3;

	public static final int RINGLEFT = DEBUG + 1;
	public static final int RINGRIGHT = DEBUG + 2;
	
	public static final int DEBUGSTART = DEBUG + 1000;
	
	private MetaLinda ml;
	private String localHostName;
	private int localPort;
	private PSXLinda manager;
	private String managerHostName;
	private int managerPort = DEFAULTPORT;
	private boolean running = true;
	private boolean connected = false;
	private boolean debugConnected = false;
	private int nodeId;
	private HashMap<Integer, Routing> nodes;
	
	private int relayNum, relaySize, relayCounter;
	
	private Date startTime, endTime;
	
	// Callback class
	class AcceptXMLCallback implements PSXCallback {
		int tid;
		
		private DocumentBuilderFactory dbFactory = null;
		private DocumentBuilder docBuilder = null;
		
		public AcceptXMLCallback(int tid) {
			this.tid = tid;
			dbFactory = DocumentBuilderFactory.newInstance();
			try {
				docBuilder = dbFactory.newDocumentBuilder();
			} catch (ParserConfigurationException e) {
				e.printStackTrace();
			}

		}
		public void callback(ByteBuffer reply) {
			String xml = new String(reply.array());
			print(xml);
			parseXML(xml);
			
			ml.in(tid, this);
		}
		@SuppressWarnings("deprecation")
		protected void parseXML(String xml) {
			Document doc = null;
			try {
				doc = docBuilder.parse(new StringBufferInputStream(xml));
			} catch (SAXException e) {
				e.printStackTrace();
			} catch (IOException e) {
				e.printStackTrace();
			}
			
			Element root = doc.getDocumentElement();
			if(root.getTagName().equals("connections")) {
				nodeId = Integer.parseInt(root.getAttribute("id"));
				if (nodeId == 0) {
					ml.in(START, new StartCallback());
					ml.in(DEBUGSTART, new DebugStartCallback());
				}
				NodeList connections = root.getElementsByTagName("connection");
				for (int i = 0; i < connections.getLength(); i++) {
					Element connection = (Element)connections.item(i);
					Element host = (Element)connection.getElementsByTagName("host").item(0);
					Element port = (Element)connection.getElementsByTagName("port").item(0);
					Element t = (Element)connection.getElementsByTagName("tid").item(0);
					int srcId = Integer.parseInt(connection.getAttribute("id"));
					String dstHostName = host.getTextContent();
					int dstPort = Integer.parseInt(port.getAttribute("id"));
					int dstId = Integer.parseInt(t.getAttribute("id"));
					try {
						PSXLindaImpl linda = (PSXLindaImpl) ml.open(dstHostName, dstPort);
						Routing r = new Routing(linda, dstId);
						nodes.put(srcId, r);
						ml.in(srcId, new RoutingCallback(srcId, r));
					} catch (IOException e) {
						e.printStackTrace();
					}
				}
			} else if (root.getTagName().equals("routing")) {
				print("Routing xml received!");

				NodeList routing = root.getElementsByTagName("source");
				for (int i = 0; i < routing.getLength(); i++) {
					Element src = (Element) routing.item(i);
					Integer srcId = Integer.parseInt(src.getAttribute("id"));
					Routing r = nodes.get(srcId);
					NodeList dest = src.getElementsByTagName("dest");
					for (int j = 0; j < dest.getLength(); j++) {
						Element dst = (Element) dest.item(j);
						r.route.add(Integer.parseInt(dst.getAttribute("id")));
					}
				}
				
			}
			if (tid == MANAGE) connected = true;
			else if (tid == DEBUG) debugConnected = true;
			if (connected && debugConnected) {
				sendLocalHostName();
				print("Send local host name");
				connected = debugConnected = false;
			}
		}
		
	} 

	private class RoutingCallback implements PSXCallback {
		int tid;
		Routing routing;
		
		public RoutingCallback(int tid, Routing routing) {
			this.tid = tid;
			this.routing = routing;
			ml.out(BODY, ByteBuffer.wrap("dummy".getBytes()));
		} 
		
		public void callback(ByteBuffer reply) {
			String str = new String(reply.array());
			print("get message");
			if (tid == TREETOP || tid == TREELEFT || tid == TREERIGHT) {
				ml.in(BODY);
				ml.out(BODY, reply);
				print("Update body");
			} else if (str.equals("shutdown") && (tid == RINGLEFT || tid == RINGRIGHT)) {
				print("get shutdown command id: " + nodeId);
				if (nodeId != 0) {
					Routing r = null;
					if (tid == RINGLEFT) { 
						r = nodes.get(new Integer(RINGRIGHT));
					} else if (tid == RINGRIGHT) { 
						r = nodes.get(new Integer(RINGLEFT));
					}
					r.linda.out(r.dstId, ByteBuffer.wrap("shutdown".getBytes()));
					print("out");
					ml.fdl.queueExec();
					print("sync");
				} else {
					print("shutdown reaches last node!");
				}
				running = false;
				return;
			} else if (nodeId == 0 && tid == RINGLEFT) {
				relayCounter++;
				print("" + relayCounter + " relay");
				if (relayCounter >= relayNum) {
					// 実験終了
					endTime = new Date();
					Double resultTime = new Double(((endTime.getTime() - startTime.getTime()) / (double)relayNum)); 
					ByteBuffer data = ByteBuffer.wrap(resultTime.toString().getBytes());
					manager.out(MANAGE, data);
					ml.in(tid, this);
					return;
				}
			}
				
			Iterator<Integer> it = routing.route.iterator();
			while (it.hasNext()) {
				Integer dstId = it.next();
				Routing r = nodes.get(dstId);
				r.linda.out(r.dstId, reply);
				ml.in(tid, this);
			}
		}
		
	}
	
	private class StartCallback implements PSXCallback {
		public void callback(ByteBuffer reply) {
			Routing r;
			
			// 子があるならば、子にタプルを伝搬
			if (nodes.containsKey(TREERIGHT)) {
				r = nodes.get(TREERIGHT);
				r.linda.out(r.dstId, reply);
			}
			if (nodes.containsKey(TREELEFT)) {
				r = nodes.get(TREELEFT);
				r.linda.out(r.dstId, reply);
			}
			ml.in(START, this);
		}
		
	}
	
	private class DebugStartCallback implements PSXCallback {
		public void callback(ByteBuffer reply) {
			String[] commands = new String(reply.array()).split(",");
			String command = commands[0];
			if (command.equals("relay")) {
				relayNum = Integer.parseInt(commands[1]);
				relaySize = Integer.parseInt(commands[2]);
				relayCounter = 0;
				print("relay num=" + relayNum + " size=" + relaySize);
				Routing r = nodes.get(RINGRIGHT);
				// 実験開始
				startTime = new Date();
				r.linda.out(r.dstId, ByteBuffer.wrap(new byte[relaySize]));
				ml.in(DEBUGSTART, this);
			} else if (command.equals("shutdown")) {
				Routing r = nodes.get(RINGRIGHT);
				r.linda.out(r.dstId, ByteBuffer.wrap("shutdown".getBytes()));
				//running = false;
			}
		}
	}
	
	// Constructor
	public MetaProtocolEngine(int port, MetaLinda ml, String managerHostName, int managerPort) {
		this.ml = ml;
		this.localPort = port;
		this.managerHostName = managerHostName;
		this.managerPort = managerPort;
		this.nodes = new HashMap<Integer, Routing>();
		try {
			this.localHostName = InetAddress.getLocalHost().getHostName();
		} catch (UnknownHostException e) {
			e.printStackTrace();
		}
		manager = connectServer(this.managerHostName, this.managerPort);
		sendLocalHostName();
	}

	public void mainLoop() {
		initPoller();
		while (running) {
			ml.sync();
		}
		print("Terminated" + nodeId
				+ " replies=" + ml.replies.size()
				+ " qsize=" + ml.fdl.qsize);
	}
	
	protected void initPoller() {
		ml.in(MANAGE, new AcceptXMLCallback(MANAGE));
		ml.in(DEBUG, new AcceptXMLCallback(DEBUG));
	}
	
	protected void sendLocalHostName() {
		// TopologyManager に自分のホストネームを送信して、起動を伝える
		ByteBuffer local = ByteBuffer.wrap((localHostName + ":" + localPort).getBytes());
		manager.out(MANAGE, local);
	}

	protected PSXLinda connectServer(String hostName, int port) {
		PSXLinda linda = null;
		boolean connectPSX = true;
		while (connectPSX) {
			try {
				linda = ml.open(hostName, port);
				connectPSX = false;
			} catch (IOException e) {
				try {
					Thread.sleep(40);
				} catch (InterruptedException e1) {
				}
			}
		}
		print("Connect to " + hostName);
		return linda;
	}
	
	void print(String str) {
		System.err.println("[DEBUG] " + localHostName + ": " + str);
		System.err.flush();
	}
	
}