view rep/SessionManager.java @ 363:1a8856580d38

*** empty log message ***
author kono
date Mon, 20 Oct 2008 03:03:28 +0900
parents f0bd158dace6
children c965ef2b5fd6
line wrap: on
line source


package rep;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import org.xml.sax.SAXException;



import rep.channel.REPLogger;
import rep.channel.REPServerSocketChannel;
import rep.channel.REPSocketChannel;
import rep.handler.PacketSet;
import rep.handler.REPHandler;
import rep.channel.REPSelector;
import rep.xml.SessionXMLDecoder;
import rep.xml.SessionXMLEncoder;
import rep.channel.REPSelectionKey;

/*
	+-------+--------+--------+-------+--------+---------+------+
	| cmd   | session| editor | seqid | lineno | textsiz | text |
	|       | id     | id     |       |        |         |      |
	+-------+--------+--------+-------+--------+---------+------+
	o---------- header section (network order) ----------o
	
	int cmd;		kind of command
	int sid;		session ID : uniqu to editing file
	int eid;		editor ID : owner editor ID = 1。Session に対して unique
	                -1 session manager command
	                -2 merge command
	int seqno;		Sequence number : sequence number はエディタごとに管理
	int lineno;		line number
	int textsize;   textsize : bytesize
	byte[] text;
*/

public class SessionManager implements SessionManagerEventListener{
	static public REPLogger logger = REPLogger.singleton();

	SessionList sessionList;           
	private SessionManagerGUI gui;
	// Main nio.Selector of this server
	private REPSelector<REPCommand> selector;
	// Known Session Manager List, At most one parent. No parent means master.
	SessionManagerList smList;
	// Known Editor list. Connected Editor has a channel. 
	// Session Manager Channel may have dummy editors.
	EditorList editorList;
	// Commands for busy editor are kept in this queue.
	private List<PacketSet> waitingCommandInMerge;
	// Command from gui. Synchronization is required.
	private BlockingQueue<SessionManagerEvent> waitingEventQueue 
		= new LinkedBlockingQueue<SessionManagerEvent>();;
	// host name of this server. One of connecting SocketChannel's hostname
	String myHost;
	// Single threaded write queueu. To avoid dead lock with too many writes.
	private LinkedList<PacketSet> writeQueue = new LinkedList<PacketSet>();
	private int receive_port;
	private int parent_port;
	static final int DEFAULT_PORT = 8766;
	// Queue limit for debugging purpose.
	private static final int packetLimit = 200;

	// globalSessionID = SessionManagerID * MAXID + localSessionID
	private static final int MAXID = 10000;
	SessionXMLDecoder decoder = new SessionXMLDecoder();
	SessionXMLEncoder encoder = new SessionXMLEncoder();
	// SocketChannel for our parent. At most one parent is allowed.
	private Forwarder sm_join_channel;
	// Routing table for session and session manager.
	private RoutingTable routingTable = new RoutingTable();

	public static void main(String[] args) throws InterruptedException, IOException {
		
		int port = DEFAULT_PORT;
		int port_s = DEFAULT_PORT;
		//System.setProperty("file.encoding", "UTF-8");
		if(args.length > 0){
			if (args.length!=2) {
				logger.writeLog("Usage: sessionManager our_port parent_port");
				return;
			}
			port = Integer.parseInt(args[0]);
			port_s = Integer.parseInt(args[1]);
		}
		SessionManager sm = new SessionManager();
		sm.setReceivePort(port);
		sm.setParentPort(port_s);
		// Ok start main loop
		sm.init(port,new SessionManagerGUIimpl(sm));
	}

	public void setReceivePort(int port) {
		receive_port = port;
	}

	public void init(int port, SessionManagerGUI gui) throws IOException, InterruptedException {
		this.gui = gui;
		init(port);
		mainLoop(); 
	}

	private void init(int port) throws InterruptedException, IOException {
		selector = REPSelector.<REPCommand>create();	
		REPServerSocketChannel<REPCommand> ssc = REPServerSocketChannel.<REPCommand>open(new REPCommandPacker());
		ssc.configureBlocking(false);       // Selector requires this
		ssc.socket().setReuseAddress(true);	//reuse address 必須
		//getAllByNameで取れた全てのアドレスに対してbindする
		ssc.socket().bind(new InetSocketAddress(port));
		ssc.register(selector, SelectionKey.OP_ACCEPT, 
				new Forwarder(this));

		sessionList = new SessionList();
		smList = new SessionManagerList();
		editorList = new EditorList();
		waitingCommandInMerge = new LinkedList<PacketSet>();
		

	}
	
	/*
	 * The main loop.
	 *     Check incoming events and waiting writes.
	 *     Do select and call select() to check in coming packets.
	 * We wrote everything in one thread, but we can assign
	 * one thread for each communication channel and GUI event.
	 */
	public void mainLoop() throws IOException {
		while(true){
		    checkWaitingCommandInMerge();
			if (checkInputEvent() ||
			    checkWaitingWrite()) { 
				   // try to do fair execution for waiting task
				   if(selector.selectNow() > 0) select();
				   continue;
			}
			// now we can wait for input packet or event
			selector.select();
			select();
		}
	}

	/*
	 * Synchronize GUI event in the main loop.
	 */
	private boolean checkInputEvent() {
		SessionManagerEvent e;
		if((e = waitingEventQueue.poll())!=null){
			e.exec(this);
			return true;
		}
		return false;
	}

	/*
	 * Write a packet during the main loop.
	 */
	private boolean checkWaitingWrite() throws IOException {
		PacketSet p = writeQueue.poll();
		if (p!=null) {
			p.channel.write(p.command);
			return true;
		}
		return false;
	}

	/**
	 * Check waiting command in merge
	 * @return true if there is a processed waiting command
	 * @throws IOException
	 */
	private void checkWaitingCommandInMerge() {
		List<PacketSet> w = waitingCommandInMerge;
		waitingCommandInMerge = new LinkedList<PacketSet>();
		for(PacketSet p: w) {
			Editor e = p.getEditor();
			if(e.isMerging()) { // still merging do nothing
				waitingCommandInMerge.add(p);
			} else {
				try {
					if (sessionManage(e, p.command)) { // we don't need this
						assert false;
						return;
					}
					e.manage(p.command);
				} catch (Exception e1) {
					// should be e.close()?
					close(p.channel);
				}		
			}
		}
	}

	/*
	 * If we have waiting write commands, further sent commands also
	 * wait to avoid out of order packet sending.
	 */
	public boolean hasWaitingCommand(REPSocketChannel<REPCommand>c) {
		for(PacketSet p:waitingCommandInMerge) {
			if (p.channel==c) {
				return true;
			}
		}
		return false;
	}
	
	/*
	 * Close a channel in case of exception or close. 
	 */
	private void close(REPSocketChannel<REPCommand> channel) {
		REPSelectionKey<REPCommand>key = channel.keyFor1(selector);
		REPHandler handler = (REPHandler)key.attachment();
		key.cancel();
		handler.cancel(channel);
		// we have to remove session/enditor
	}


	/*
	 * Do select operation on the Selector. Each key has a forwarder.
	 * A forwarder can be a firstConnector, a forwarder for Session Manager
	 * or an Editor.
	 */
	private void select() throws IOException {
		
		Set<REPSelectionKey<REPCommand>> keys = selector.selectedKeys1();
		for(REPSelectionKey<REPCommand> key : keys){
			if(key.isAcceptable()){
				/*
				 * Incoming connection. We don't know which, editor or
				 * session manager. Assign FirstConnector to distinguish.
				 */
				REPSocketChannel<REPCommand> channel = key.accept(new REPCommandPacker());
				logger.writeLog("SessionManager.select() : key.isAcceptable : channel = " + channel);
				registerChannel(channel, new FirstConnector(this));
				channel = null;
			}else if(key.isReadable()){
				/*
				 * Incoming packets are handled by a various forwarder.
				 * A hadler throw IOException() in case of a trouble to
				 * close the channel.
				 */
				REPHandler handler = (REPHandler)(key.attachment());
				try {
					handler.handle(key);
				} catch (IOException e) {
					key.cancel();
					handler.cancel(key.channel1());
				}
			}
		}
	}
	
	void registerChannel(REPSocketChannel<REPCommand> channel,Forwarder handler) throws IOException {
		if(channel == null) {
			return;
		}
		handler.setChannel(channel);
		channel.configureBlocking(false);
		channel.register(selector, SelectionKey.OP_READ, handler);
	}

	/*
	 * After loop detection, we give up session manager join.
	 */
	private void cancel_sm_join() {
		removeChannel(sm_join_channel);
		sm_join_channel=null;
	}


	private void removeChannel(Forwarder channel) {
		REPSelectionKey<REPCommand> key = channel.channel.keyFor1(selector);
		key.cancel();
		try {
			channel.channel.close();
		} catch (IOException e) {
		}
	}


	void updateGUI() {
		//リストのコピーをGUIに渡す
		LinkedList<Session> sList = new LinkedList<Session>(sessionList.values());
		LinkedList<Editor> eList = new LinkedList<Editor>(editorList.values());
		//GUIに反映
		Runnable doRun = new DoGUIUpdate(sList, eList, gui);
		gui.invokeLater(doRun);
	}



	void setMyHostName(String localHostName) {
		myHost = localHostName + receive_port;
		setHostToEditor(myHost);
	}

	private void setHostToEditor(String myHost2) {
		for(Editor editor : editorList.values()){
			if (editor.channel!=null)
				editor.setHost(myHost2);
		}
	}


	/**
	 * GUI から、呼ばれて、Session Managerに接続する。
	 * Host 名のSession Manager に SM_JOIN する。自分は、Session を持っていては
	 * ならない。複数のSession Managerにjoinすることは出来ない。(NATを実装するまでは)。
	 * @param host
	 */
	public void connectSession(String host) {
		if (sm_join_channel!=null) return;
		if (!sessionList.isEmpty()) return;
		if (!smList.isMaster()) return;
		int port = parent_port;
		/*
		 * IPv6 対応では、複数のアドレスを取って、それのすべてに接続を試す必要が
		 * ある。
		 */
		InetSocketAddress addr = new InetSocketAddress(host, port);
		try {
			REPSocketChannel<REPCommand> sessionchannel = REPSocketChannel.<REPCommand>create(new REPCommandPacker());
			sessionchannel.connect(addr);
			while(!sessionchannel.finishConnect());
			Forwarder sm = new Forwarder(this);
			registerChannel(sessionchannel, sm);
			sm_join(sm);
		}catch (IOException e) {
		}
	}
	
	/**
	 * channel に SMCMD_SM_JOIN command を送る。
	 * @param channel
	 */
	private void sm_join(Forwarder channel){
		sm_join_channel = channel;
		//SM_JOINコマンドを生成。
		REPCommand command = new REPCommand();
		command.setCMD(REP.SMCMD_SM_JOIN);
		command.setEID(-1);   // request Parent SessionManagerID
		command.setSID(-1);   // request SessionManagerID
		
		//hostnameをセット。
		setMyHostName(channel.getLocalHostName());
		
		String string = myHost;
		command.setString(string);
		
		//SM_JOINコマンドを送信。
		channel.send(command);
		// ack を受け取ったら、SessionManagerのListに追加。ここではやらない。
	}
	
	/* 
	 * Select Session from Manager button
	 *    selected editor is joined editor directly connected to this session
	 *    manager.
	 */
	public void selectSession(SelectButtonEvent event) throws IOException {
		int sid = event.getSID();
		Session session = sessionList.get(sid);
		
		Editor editor = (Editor)event.getEditor();
		if(editor == null){
			logger.writeLog("Error SessionManager.selectSession(): editor = " + editor);
			return;
		}
		if (editor.hasSession()) return;

		selectSession(sid, session, editor.getEID(), editor);
	}

	/*
	 * Select Session Protocol handler
	 *    called from GUI or incoming SMCMD_SELECT command.
	 */
	private void selectSession(int sid, Session session, int eid, Forwarder editor) {
		if(session.hasOwner()){
			// we have selected session.
			REPCommand sendCommand = new REPCommand();
			if (editor.isDirect()&&editor.getEID()==eid) {
				// Found directly connected joined editor. Send join_ack().
				session.addForwarder(editor);
				sendUpdate(session.getSID());
				sendCommand.setCMD(REP.SMCMD_JOIN_ACK);
			} else {
				// We have a session, but joined editor is on the other sm.
				// SELECT_ACK is sent to the session ring to
				// find out the joined editor.
				sendCommand.setCMD(REP.SMCMD_SELECT_ACK);
				// Do not directly addForwarder(forwarder). It may be
				// shared among sessions.
				Forwarder f = new Editor(this, false, makeID(editorList.newEid()));
				f.setChannel(editor.channel); // incoming channel
				f.setHost(myHost);
				f.setSID(sid);
				session.addForwarder(f); // f.next is set up here.
			}
			sendCommand.setEID(editor.getEID());
			sendCommand.setSID(sid);
			sendCommand.string = session.getName();
			editor.send(sendCommand);
		}else {
			// session searching continue...
			Forwarder next = routingTable.toSession(sid);
			
			// create dummy editor for this session
			Forwarder f = new Editor(this, false, makeID(editorList.newEid()));
			f.setChannel(editor.channel); // incoming channel
			f.setNext(next);
			f.setHost(myHost);
			f.setSID(sid);
			session.setFirstForwarder(f);
			
			// pass the select command to the next path.
			REPCommand command = new REPCommand();
			command.setCMD(REP.SMCMD_SELECT);
			command.setSID(sid);
			command.setEID(eid);
			command.setString(editor.getHost());
			next.send(command);
		}
	}

	/*
	 * Create and send UPDATE command.
	 */
	private void sendUpdate(int sid) {
		REPCommand command = makeREPCommandWithSessionList(REP.SMCMD_UPDATE);
		command.setSID(sid);
		command.setEID(REP.SM_EID.id);
		smList.sendToMaster(command);
	}

	/*
	 * Create new editor in this sessin manager. A dummy editor
	 * is created also.
	 */
	public Editor newEditor(REPSocketChannel<REPCommand> channel) {
		int eid =  makeID(editorList.newEid());
		Editor editor = new Editor(this, eid, channel);
		editorList.add(editor);
		return editor;
	}

	/*
	 * Create new session.
	 */
	public Session newSession(Forwarder master) {
		int sid= makeID(sessionList.newSessionID());
		Session session = new Session(sid, master);
		sessionList.put(sid, session);
		return session;
	}

	public void addWaitingCommand(PacketSet set) {
		waitingCommandInMerge.add(set);
	}

	/*
	 * Synchronize GUI command in this session manager.
	 */
	public void buttonPressed(SessionManagerEvent event) {
		try {
			waitingEventQueue.put(event);
		} catch (InterruptedException e) {}
		selector.wakeup();
	}
	
	/*
	 * Execute incoming event during the initialization for
	 * testing purpose.
	 */
	public void syncExec(SessionManagerEvent event) {
		try {
			waitingEventQueue.put(event);
		} catch (InterruptedException e) {
		}
	}

	/*
	 * GUI command interface for close session.
	 */
	public void closeSession(SessionManagerEvent event) {
		Session session = ((CloseButtonEvent) event).getSession();
		session.closeSession();
		sessionList.remove(session);
		updateGUI();
	}

	/*
	 * Remove editors which has the cannel.
	 */
	public void remove(REPSocketChannel<REPCommand> channel) {
		int i = 0;
		for(Session s:sessionList.values()) {
			if (s.deleteEditor(channel)) {
				i++;
			}
		}
		assert(i==1);
		// can be other session manager? what should I do?
	}


	public void addWriteQueue(PacketSet packetSet) {
		writeQueue.addLast(packetSet);
		assert(writeQueue.size()<packetLimit) ;
	}


	public void remove(Editor editor) {
		Session s = sessionList.get(editor.getSID());
		if (s==null) {
			assert(false);
			editorList.remove(editor);
		} else if (editor.isMaster()) {
			removeSession(s);
		} else {
			s.deleteForwarder(editor);
			editorList.remove(editor);
		}
		updateGUI();
	}

	private void removeSession(Session s0) {
		s0.remove(this);
		sessionList.remove(s0);
		sendUpdate(s0.getSID());
	}

	public void setParentPort(int port) {
		parent_port = port;
	}
	public int getParentPort() {
		return parent_port;
	}
	
	public int getPort() {
		return receive_port;
	}


	boolean sessionManage(Forwarder forwarder, REPCommand command) throws ClosedChannelException,
			IOException {
		switch(command.cmd){
	
		// Session Manager Command
	
		case SMCMD_JOIN:
		{
			// first connection or forwarded command
			if(isMaster()) {
				REPCommand ackCommand = new REPCommand();
				ackCommand.setCMD(REP.SMCMD_JOIN_ACK);
				ackCommand.setEID(command.eid);
				ackCommand.setSID(command.sid);
				ackCommand.string = command.string;
				smList.sendToSlaves(ackCommand);
				registEditor(forwarder,ackCommand);
			} else {
				routingTable.add(forwarder,getSMID(command.eid),command.sid);
				smList.sendToMaster(command);
			}
			updateGUI();
		}
	
		break;
	
		case SMCMD_PUT_ACK:
		case SMCMD_JOIN_ACK:
			registEditor(forwarder,command);
			updateGUI();
			break;
	
		case SMCMD_PUT:
		{
			// first connection or forwarded command
			if(isMaster()) {
				command.setCMD(REP.SMCMD_PUT_ACK);
				command.string = command.string;
				command.setEID(command.eid);
				command.setSID(command.sid);
				smList.sendToSlaves(command);
				registEditor(forwarder,command);
			} else {
				routingTable.add(forwarder,getSMID(command.eid),command.sid);
				smList.sendToMaster(command);
				// registEditor will be done by SMCMD_PUT_ACK
			}
			if (forwarder.isDirect()) {
				// send put_ack to the editor now.
				command.setCMD(REP.SMCMD_PUT_ACK);
				command.string = command.string;
				command.setEID(command.eid);
				command.setSID(command.sid);
				forwarder.send(command);
			}
			updateGUI();
	
		}
		break;

		case SMCMD_SELECT:
		{
			Session session = sessionList.get(command.sid);
			if (session==null) {
				sessionList.put(command.sid,
						new Session(command.sid, command.string,null));
			}
			selectSession(command.sid, session, command.eid, forwarder);
		}
			break;
		case SMCMD_SELECT_ACK:
		{
			Session session = sessionList.get(command.sid);
			selectSession(command.sid, session, command.eid, 
					session.getFirstForwarder());
		}
			break;
			
		case SMCMD_SM_JOIN:
		{
			// SM_JOIN中にSMCMD_SM_JOINが来たら、これはループなので、
			///自分のSM_JOINを取り消す。
			if (sm_join_channel!=null) cancel_sm_join();
			// SMCMD_SM_JOIN は、master まで上昇する。
			//    masterでなければ、自分のparentに転送する。
			if(isMaster()) {
				//    master であれば、SessionManager IDを決めて、
				//    自分のsmList に登録
				Forwarder sm;
				int psid = command.eid;
				if (forwarder.sid!=-1) {
				// すでに channelはSessionManager Idを持っていて、
				// direct link ではないので、
				// channel を持たないForwarderとして登録する
					sm = new Forwarder(this);
				} else {
					sm = forwarder;
				}
				int sid = smList.addNewSessionManager(sm,command);		
				REPCommand sendCommand = makeREPCommandWithSessionList(REP.SMCMD_SM_JOIN_ACK);
				// command.eid==smList.sesionManagerID() の場合は、
				// 待っている自分の下のsessionManagerにsidをassignする必要がある。
				sendCommand.setSID(sid); // new Session manager ID
				// 複数のSM_JOIN_ACKを識別するには、最初にSM_JOINを受け付けた
				// Session manager IDを使う。
				sendCommand.setEID(psid);
				send_sm_join_ack(psid, sid, sendCommand);
			} else {
				if (forwarder.sid==-1) {
					// direct link の場合は、識別のために、EIDに直上の
					// smid を入れておく。
					command.setEID(smList.sessionManagerID());
				}
				smList.sendToMaster(command);
			}
		}
		break;
	
		case SMCMD_SM_JOIN_ACK:				
			send_sm_join_ack(command.eid, command.sid, command);
			break;
	
		case SMCMD_UPDATE:
			if (!isMaster()) {
				command.setString(mergeUpdate(command));
				// 上に知らせる
				smList.sendToMaster(command);
				break;
			}
			// fall thru
			command.setCMD(REP.SMCMD_UPDATE_ACK);
		case SMCMD_UPDATE_ACK:
			command.setString(mergeUpdate(command));
			// 下に知らせる
			smList.sendToSlaves(command);
			updateGUI();
			break;
		default:
			return false;
		}
		return true;
	}


	/**
     * UPDATE/UPDATE_ACKにより送られてきたSessionの情報を追加する
	 * @param command
	 * @return
	 * @throws IOException
	 */
	private String mergeUpdate(REPCommand command) throws IOException {
		SessionList receivedSessionList;
		try {
			receivedSessionList = decoder.decode(command.string);
		} catch (SAXException e) {
			throw new IOException();
		}
		// 受け取った情報と自分の情報を混ぜる。
		sessionList.merge(receivedSessionList);
		//XMLを生成。送信コマンドにセット。
		return encoder.sessionListToXML(sessionList);

	}

	/*
	 * id has SessionManager ID part
	 */
	private int makeID(int newid) {
		return newid+smList.sessionManagerID()*MAXID;
	}
	
	private int getSMID(int id) {
		return id/MAXID;
	}


	/**
	 * Register Editor to our editorList. No connection is made.
	 * @param forwarder     Editor to be add
	 * @param command
	 */
	private void registEditor(Forwarder forwarder,REPCommand command) {
		// make ack for PUT/JOIN. Do not send this to the editor,
		// before select. After select, ack is sent to the editor. 
		routingTable.add(forwarder,getSMID(command.eid),command.sid);
		Editor editor;
		if (getSMID(command.sid)==smList.sessionManagerID()
				&& forwarder.isDirect()) {
			// direct link だった
			editor = (Editor)forwarder;
		} else {
			editor = new Editor(this, command.cmd==REP.SMCMD_PUT_ACK, command.eid);
		}
		editor.setName(command.string);
		editor.setSID(command.sid);
		if (!editorList.hasEid(command.eid)) {
			editorList.add(editor);
		}
		// we don't join ack to the direct linked editor. We
		// have to wait select command
	}


	void send_sm_join_ack(int psid, int sid,REPCommand sendCommand) {
		if (psid==smList.sessionManagerID()) {
			// 直下のsessionManagerにIDを割り振る必要がある。
			smList.assignSessionManagerIDtoWaitingSM(sid);
			// ここで smList に一つだけ追加されるので
			// 待っている最初のsm一つにだけ、sm_join_ackが新たに送られる。
		}
		smList.sendToSlaves(sendCommand);
	}


	private REPCommand makeREPCommandWithSessionList(REP cmd) {
		//SessionListからXMLを生成。
		//joinしてきたSessionManagerに対してACKを送信。
		REPCommand sendCommand = new REPCommand();
		sendCommand.setCMD(cmd);
		sendCommand.setString(encoder.sessionListToXML(sessionList));
		return sendCommand;
	}


	public boolean isMaster() {
		return smList.isMaster();
	}


	public void setSessionManagerID(int sid) {
		smList.setSessionManagerID(sid);		
	}


	public Session getSession(int sid) {
		return sessionList.get(sid);
	}

}