view rep/SessionManager.java @ 491:5945266c970d

before unMergedCmds fix , deadlockTimer API
author one
date Sat, 23 Oct 2010 12:34:46 +0900
parents cc262a519b8a
children ebfa3b05a8dd
line wrap: on
line source


package rep;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.ClosedChannelException;
import java.util.LinkedList;

import org.xml.sax.SAXException;



import rep.channel.REPLogger;
import rep.channel.REPSocketChannel;
import rep.gui.CloseButtonEvent;
import rep.gui.DoGUIUpdate;
import rep.gui.SelectButtonEvent;
import rep.gui.SessionManagerEvent;
import rep.gui.SessionManagerEventListener;
import rep.gui.SessionManagerGUI;
import rep.gui.SessionManagerGUIimpl;
import rep.handler.Editor;
import rep.handler.REPNode;
import rep.handler.FirstConnector;
import rep.handler.Forwarder;
import rep.xml.SessionXMLDecoder;
import rep.xml.SessionXMLEncoder;

/*
	+-------+--------+--------+-------+--------+---------+------+
	| cmd   | session| editor | seqid | lineno | textsiz | text |
	|       | id     | id     |       |        |         |      |
	+-------+--------+--------+-------+--------+---------+------+
	o---------- header section (network order) ----------o
	
	int cmd;		kind of command
	int sid;		session ID : uniqu to editing file
	int eid;		editor ID : owner editor ID = 1。Session に対して unique
	                -1 session manager command
	                -2 merge command
	int seqno;		Sequence number : sequence number はエディタごとに管理
	int lineno;		line number
	int textsize;   textsize : bytesize
	byte[] text;
*/

public class SessionManager extends ServerMainLoop 
		implements SessionManagerEventListener {
	SessionList sessionList = new SessionList();           
	// Known Session Manager List, At most one parent. No parent means master.
	SessionManagerList smList = new SessionManagerList();
	// Known Editor list. Connected Editor has a channel. 
	// Session Manager Channel may have dummy editors.
	public EditorList editorList = new EditorList();
	// Queue limit for debugging purpose.
	static final int packetLimit = 400;

	// globalSessionID = SessionManagerID * MAXID + localSessionID
	private static final int MAXID = 10000;
	SessionXMLDecoder decoder = new SessionXMLDecoder();
	SessionXMLEncoder encoder = new SessionXMLEncoder();
	// SocketChannel for our parent. At most one parent is allowed.
	private REPNode sm_join_channel;
	// Routing table for session and session manager.
	private RoutingTable routingTable = new RoutingTable(this);
	// sync option
	public boolean sync = true;
	// list olny local editor in GUI
	protected boolean listLocalEditorOnly = false;

	static public REPLogger logger = REPLogger.singleton();
	
	public static void main(String[] args) throws InterruptedException, IOException {
		int port =ServerMainLoop.DEFAULT_PORT;
		int port_s = ServerMainLoop.DEFAULT_PORT;
		//System.setProperty("file.encoding", "UTF-8");
		if(args.length > 0){
			if (args.length!=2) {
				logger.writeLog("Usage: sessionManager our_port parent_port");
				return;
			}
			port = Integer.parseInt(args[0]);
			port_s = Integer.parseInt(args[1]);
		}
		SessionManager sm = new SessionManager();
		sm.setReceivePort(port);
		sm.setParentPort(port_s);
		// Ok start main loop
		sm.init(port,new SessionManagerGUIimpl(sm));
	}

	public void init(int port, SessionManagerGUI gui) throws IOException, InterruptedException {
		mainLoop(this, port, gui); 
	}

	/*
	 * After loop detection, we give up session manager join.
	 */
	private void cancel_sm_join() {
		logger.writeLog("Loop detected "+this);
		removeChannel(this, sm_join_channel);
		sm_join_channel=null;
	}


	/**
	 * GUI から、呼ばれて、Session Managerに接続する。
	 * Host 名のSession Manager に SM_JOIN する。自分は、Session を持っていては
	 * ならない。複数のSession Managerにjoinすることは出来ない。(NATを実装するまでは)。
	 * @param host
	 * @return error massage, return null when no errors.
	 */
	public String connectSessionManager(String host, int port) {
		if (sm_join_channel!=null) return "No Channel of sessionManager, Network Error.";
		if (!sessionList.isEmpty()) return "Cannot connect with sessions.";
		if (!smList.isMaster()) return "Already connected to the master.";
		/*
		 * IPv6 対応では、複数のアドレスを取って、それのすべてに接続を試す必要が
		 * ある。
		 */
		InetSocketAddress addr = new InetSocketAddress(host, port);
		try {
			REPSocketChannel<REPCommand> sessionchannel = REPSocketChannel.<REPCommand>create(new REPCommandPacker());
			sessionchannel.connect(addr);
			while(!sessionchannel.finishConnect());
			REPNode sm = new FirstConnector(this,sessionchannel);
			registerChannel(sessionchannel, sm);
			sm_join(sm);
		}catch (IOException e) {
		}
		return null;
	}

	public void connectSessionManager(String host) {
		connectSessionManager(host,parent_port);
	}
	
	/**
	 * channel に SMCMD_SM_JOIN command を送る。
	 * @param channel
	 */
	private void sm_join(REPNode channel){
		sm_join_channel = channel;
		//SM_JOINコマンドを生成。
		REPCommand command = new REPCommand();
		command.setCMD(REP.SMCMD_SM_JOIN);
		command.setEID(-1);   // request Parent SessionManagerID
		command.setSID(-1);   // request SessionManagerID
		
		//hostnameをセット。
		setMyHostName(channel.getLocalHostName());
		
		String string = myHost;
		command.setString(string);
		
		//SM_JOINコマンドを送信。
		channel.send(command);
		// ack を受け取ったら、SessionManagerのListに追加。ここではやらない。
	}
	
	/* 
	 * Select Session from Manager button
	 *    selected editor is joined editor directly connected to this session
	 *    manager.
	 */
	public void selectSession(SelectButtonEvent event) throws IOException {
		int sid = event.getSID();
		Session session = sessionList.get(sid);
		if (session==null) throw new IOException();
		REPNode editor = event.getEditor();
		if (editor.hasSession()) return;
		// assert(getSMID(editor.eid)==smList.sessionManagerID());
		// assert(editor.channel!=null);
		editor.setSID(sid); // mark as selected
		selectSession0(sid, session, editor.getEID(), editor);
	}

	private void selectSession0(int sid, Session session, int eid, REPNode editor) {
		logger.writeLog("Select sid="+sid+" and "+editor);
		if (editor.isDirect()&&editor.getEID()==eid) {
			REPCommand command = new REPCommand();
			command.setSID(sid);
			command.setEID(eid);
			command.setString(session.getName());
			if(session.hasOwner()){
				editor.selectSession(command,session);
			}else {
				forwardSelect(sid, session, eid, editor);
			}
		} else {
			// we don't have this editor, search the editor first.
			REPNode next = routingTable.toSessionManager(getSMID(eid));
			// pass the select command to the next path.
			REPCommand command = new REPCommand();
			command.setCMD(REP.SMCMD_SELECT0);
			command.setSID(sid);
			command.setEID(eid);
			command.setString(editor.getHost());
			next.send(command);
		}
	}

	public void forwardSelect(int sid, Session session, int eid,
			REPNode editor) {
		REPNode next;
		// session searching continue...
		next = routingTable.toSessionManager(getSMID(sid));
		// make a forwarding channel here
		REPNode f = createSessionForwarder(sid, next);
		session.setFirstForwarder(f);
		session.addForwarder(editor);
		// pass the select command to the next path.
		REPCommand command = new REPCommand();
		command.setCMD(REP.SMCMD_SELECT);
		command.setSID(sid);
		command.setEID(eid);
		command.setString(editor.getHost());
		next.send(command);
	}

	private REPNode createSessionForwarder(int sid, REPNode editor) {
		REPNode f = new Forwarder(this,editor.channel);
		f.setEID(makeID(editorList.newEid()));
		// f.setChannel(editor.channel); // incoming channel
		f.setHost(myHost);
		f.setSID(sid);
		return f;
	}

	/*
	 * Create and send UPDATE command.
	 */
	public void sendUpdate(int sid) {
		REPCommand command = makeREPCommandWithSessionList(REP.SMCMD_UPDATE);
		command.setSID(sid);
		command.setEID(REP.SM_EID.id);
		if (isMaster()) {
			command.setCMD(REP.SMCMD_UPDATE_ACK);
			smList.sendToSlaves(command);
		} else {
			smList.sendToMaster(command);
		}
	}

	/*
	 * Create new editor in this sessin manager. A dummy editor
	 * is created also.
	 */
	public REPNode newEditor(REPSocketChannel<REPCommand> channel) {
		int eid =  makeID(editorList.newEid());
		REPNode editor = new Editor(eid, this, channel);
		editorList.add(editor);
		return editor;
	}

	/*
	 * Create new session.
	 */
	public Session newSession(REPNode master) {
		int sid= makeID(sessionList.newSessionID());
		Session session = new Session(sid, master);
		sessionList.put(sid, session);
		return session;
	}

	/*
	 * GUI command interface for close session.
	 */
	public void closeSession(SessionManagerEvent event) {
		Session session = ((CloseButtonEvent) event).getSession();
		session.closeSession();
		sessionList.remove(session);
		manager.updateGUI(this);
	}

	/*
	 * Remove editors which has the cannel.
	 */
	public void remove(REPSocketChannel<REPCommand> channel) {
		int i = 0;
		for(Session s:sessionList.values()) {
			if (s.deleteEditor(channel)) {
				i++;
			}
		}
		assert(i==1);
		// can be other session manager? what should I do?
	}


	public void remove(Editor editor) {
		Session s = sessionList.get(editor.getSID());
		if (s==null) {
			assert(false);
			editorList.remove(editor);
		} else if (editor.isMaster()) {
			removeSession(s);
		} else {
			s.deleteForwarder(editor);
			editorList.remove(editor);
		}
		manager.updateGUI(this);
	}

	private void removeSession(Session s0) {
		s0.remove(this);
		sessionList.remove(s0);
		sendUpdate(s0.getSID());
	}

	public boolean sessionManage(REPNode forwarder, REPCommand command) throws ClosedChannelException,
			IOException {
		switch(command.cmd){
	
		// Session Manager Command
	
		case SMCMD_JOIN:
		{
			// first connection or forwarded command
			routingTable.add(forwarder,getSMID(command.eid));
			if(isMaster()) {
				REPCommand ackCommand = new REPCommand();
				ackCommand.setCMD(REP.SMCMD_JOIN_ACK);
				ackCommand.setEID(command.eid);
				ackCommand.setSID(command.sid);
				ackCommand.string = command.string;
				smList.sendToSlaves(ackCommand);
				registEditor(forwarder,ackCommand);
			} else {
				smList.sendToMaster(command);
			}
			manager.updateGUI(this);
		}
	
		break;
	
		case SMCMD_PUT_ACK:
			if (forwarder.isDirect()) {
				// send put_ack to the editor now.
				command.setCMD(REP.SMCMD_PUT_ACK);
				command.string = command.string;
				command.setEID(command.eid);
				command.setSID(command.sid);
				forwarder.send(command);
			}
		case SMCMD_JOIN_ACK:
			registEditor(forwarder,command);
			manager.updateGUI(this);
			break;
	
		case SMCMD_PUT:
		{
			// first connection or forwarded command
			routingTable.add(forwarder,getSMID(command.eid));
			REPCommand ack = new REPCommand(command); ack.setCMD(REP.SMCMD_PUT_ACK);
 			if(isMaster()) {
 				// Reached to the top of the tree, multicast the ack. 
				smList.sendToSlaves(ack);
				registEditor(forwarder,ack);
	 			if (forwarder.isDirect()) {
	 				// If put editor on the master, no SMCMD_PUT_ACK is
	 				// generated. Send ack to the editor now.
	 				forwarder.send(ack);
	 			}
			} else {
				// Pass this to the master.
				smList.sendToMaster(command);
				// registEditor will be done by SMCMD_PUT_ACK
			}
			manager.updateGUI(this);
	
		}
		break;

		case SMCMD_SELECT0:
			/*
			 * finding joining editor, do not make the path.
			 */
			REPNode editor  = editorList.get(command.eid);
			if (editor==null|| !editor.isDirect()) {
				REPNode next = routingTable.toSessionManager(getSMID(command.eid));
				next.send(command);
				break;
			}
			// we've found the editor, fall thru.
		case SMCMD_SELECT:
		{
			/*
			 * finding active session ring from joined editor. 
			 */
			Session session = sessionList.get(command.sid);
			if (session==null) {
				session = new Session(command.sid, command.string,null);
				sessionList.put(command.sid,session);
			}
			// Do not directly addForwarder(forwarder). It may be
			// shared among sessions.
			REPNode f = createSessionForwarder(command.sid, forwarder);
			session.addForwarder(f); // f.next is set up here.
			if(session.hasOwner()){
				forwarder.selectSession(command,session);
			}else {
				forwardSelect(command.sid, session, command.eid, forwarder);
			}
		}
			break;
		case SMCMD_SELECT_ACK:
		{
			// Sessionが見つかったので、select したeditorに教える。
			Session session = sessionList.get(command.sid);
			searchSelectedEditor(command,session.getForwarder(forwarder.channel)); 
		}
			break;
			
		case SMCMD_SM_JOIN:
		{
			// SM_JOIN中にSMCMD_SM_JOINが来たら、これはループなので、
			///自分のSM_JOINを取り消す。
			if (sm_join_channel!=null) cancel_sm_join();
			// SMCMD_SM_JOIN は、master まで上昇する。
			//    masterでなければ、自分のparentに転送する。
			if(isMaster()) {
				//    master であれば、SessionManager IDを決めて、
				//    自分のsmList に登録
				registSessionManager(forwarder, command);
			} else {
				if (forwarder.sid==-1) {
					// direct link の場合は、識別のために、EIDに直上の
					// smid を入れておく。
					command.setEID(smList.sessionManagerID());
				}
				smList.sendToMaster(command);
			}
		}
		break;

		case SMCMD_SYNC_ACK:
			break;
			
		case SMCMD_SM_JOIN_ACK:				
			send_sm_join_ack(command.eid, command.sid, command);
			break;
	
		case SMCMD_UPDATE:
			sendUpdate(command.sid);
			break;
		case SMCMD_UPDATE_ACK:
			command.setString(mergeUpdate(command));
			// 下に知らせる
			smList.sendToSlaves(command);
			manager.updateGUI(this);
			break;
		default:
			return false;
		}
		return true;
	}

	private void registSessionManager(REPNode forwarder, REPCommand command) {
		REPNode sm;
		int psid = command.eid;
		if (forwarder.sid!=-1) {
		// すでに channelはSessionManager Idを持っていて、
		// direct link ではないので、
		// channel を持たないForwarderとして登録する
			sm = new Forwarder(this,null);
		} else {
			sm = forwarder;
		}
		int sid = smList.addNewSessionManager(sm,command);					
		routingTable.add(forwarder,sid);

		REPCommand sendCommand = makeREPCommandWithSessionList(REP.SMCMD_SM_JOIN_ACK);
		// command.eid==smList.sesionManagerID() の場合は、
		// 待っている自分の下のsessionManagerにsidをassignする必要がある。
		sendCommand.setSID(sid); // new Session manager ID
		// 複数のSM_JOIN_ACKを識別するには、最初にSM_JOINを受け付けた
		// Session manager IDを使う。
		sendCommand.setEID(psid);
		send_sm_join_ack(psid, sid, sendCommand);
	}


	void send_sm_join_ack(int psid, int sid,REPCommand sendCommand) {
		if (psid==smList.sessionManagerID()) {
			// 直下のsessionManagerにIDを割り振る必要がある。
			smList.assignSessionManagerIDtoWaitingSM(sid);
			// ここで smList に一つだけ追加されるので
			// 待っている最初のsm一つにだけ、sm_join_ackが新たに送られる。
		}
		smList.sendToSlaves(sendCommand);
	}

	/*
	 * 指定されたeditorがlocalにあるかどうかを調べる。なければ、他に送る。戻って何回も探すことが
	 * あり得るので、よろしくない。
	 */
	private void searchSelectedEditor(REPCommand command, REPNode editor) {
		for(;editor.isDirect();editor = editor.getNextForwarder()) {
			if (editor.getEID()==command.eid) {
				// select したeditor を見つけた
				editor.joinAck(command,command.sid);
				return;
			}
		}
		// ここにはありませんでした。
		editor.send(command);
	}


	/**
     * UPDATE/UPDATE_ACKにより送られてきたSessionの情報を追加する
	 * @param command
	 * @return
	 * @throws IOException
	 */
	private String mergeUpdate(REPCommand command) throws IOException {
		SessionList receivedSessionList;
		try {
			receivedSessionList = decoder.decode(command.string);
		} catch (SAXException e) {
			throw new IOException();
		}
		// 受け取った情報と自分の情報を混ぜる。
		sessionList.merge(receivedSessionList);
		//XMLを生成。送信コマンドにセット。
		return encoder.sessionListToXML(sessionList);

	}

	/*
	 * id has SessionManager ID part
	 */
	private int makeID(int newid) {
		return newid+smList.sessionManagerID()*MAXID;
	}
	
	int getSMID(int id) {
		return id/MAXID;
	}


	/**
	 * Register Editor to our editorList. No connection is made.
	 * @param forwarder     Editor to be add
	 * @param command
	 */
	public void registEditor(REPNode forwarder,REPCommand command) {
		// make ack for PUT/JOIN. Do not send this to the editor,
		// before select. After select, ack is sent to the editor. 
		REPNode editor;
		if (getSMID(command.eid)==smList.sessionManagerID()) {
			if (forwarder.isDirect()) {
				editor = (Editor)forwarder;
			} else 
				return;
		} else {
			editor = new Editor(manager, command.eid);
		}
		editor.setName(command.string);
		editor.setSID(command.sid);
		if (!editorList.hasEid(command.eid)) {
			editorList.add(editor);
		}
		if (command.cmd==REP.SMCMD_PUT_ACK) {
			Session session = new Session(command.sid, command.string, editor);
			sessionList.put(command.sid, session);
		}
		// we don't join ack to the direct linked editor. We
		// have to wait select command
	}


	private REPCommand makeREPCommandWithSessionList(REP cmd) {
		//SessionListからXMLを生成。
		//joinしてきたSessionManagerに対してACKを送信。
		REPCommand sendCommand = new REPCommand();
		sendCommand.setCMD(cmd);
		sendCommand.setString(encoder.sessionListToXML(sessionList));
		return sendCommand;
	}


	public boolean isMaster() {
		return smList.isMaster();
	}


	public void setSessionManagerID(int sid) {
		smList.setSessionManagerID(sid);		
	}


	public Session getSession(int sid) {
		return sessionList.get(sid);
	}

	public void setParent(REPNode fw) {
		smList.setParent(fw);
	}
	
	public String toString() {
		int myId = 0;
		if (smList!=null) myId = smList.sessionManagerID();
		return "rep.SessionManager-"+myId+"@"+super.toString(); 
	}

	public void addWaitingSessionManager(REPNode fw, REPCommand command) {
		smList.addWaitingSessionManager(fw, command);
	}

	public int getId() {
		return smList.sessionManagerID();
	}

	public void checkWaitingCommandInMerge() {
		for(REPNode e:editorList.values()) {
			e.checkWaitingCommandInMerge();
		}
		
	}

	/**
	 *   Notify status change to our GUI
	 */
	protected void updateGUI(ServerMainLoop serverMainLoop) {
		//リストのコピーをGUIに渡す
		LinkedList<Session> sList = new LinkedList<Session>(sessionList.values());
		LinkedList<REPNode> eList;
		if (listLocalEditorOnly ) { 
			// Do not list an editor or session outside of this session manager to
			// avoid confusion. Actually we can joinSession to an editor outside of
			// this manager, but is not secure to do this.
			eList = new LinkedList<REPNode>();
			for(REPNode e:editorList.values()) {
				if (getSMID(e.eid)==smList.sessionManagerID()) {
					eList.add(e);
				}
			}
		} else {
			eList = new LinkedList<REPNode>(editorList.values());
		}
		//GUIに反映
		Runnable doRun = new DoGUIUpdate(sList, eList, serverMainLoop.gui);
		serverMainLoop.gui.invokeLater(doRun);
	}

	@Override
	public void deadlockDetected() throws IOException {
		logger.writeLog("Deadlock detected");
		for(REPNode e:editorList.values()) {
			logger.writeLog("  "+e+"\n"+e.report());
		}
		throw new IOException();
	}

}