view rep/SessionManager.java @ 332:36faf76a087c

*** empty log message ***
author kono
date Sun, 12 Oct 2008 10:22:44 +0900
parents ddfc786811b9
children 4fae49280699
line wrap: on
line source


package rep;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;


import rep.channel.REPServerSocketChannel;
import rep.channel.REPSocketChannel;
import rep.handler.PacketSet;
import rep.handler.REPHandler;
import rep.handler.REPEditorHandler;
import rep.handler.REPSessionManagerHandler;
import rep.channel.REPSelector;
import rep.xml.SessionXMLDecoder;
import rep.xml.SessionXMLEncoder;
import rep.channel.REPSelectionKey;

/*
	+-------+--------+--------+-------+--------+---------+------+
	| cmd   | session| editor | seqid | lineno | textsiz | text |
	|       | id     | id     |       |        |         |      |
	+-------+--------+--------+-------+--------+---------+------+
	o---------- header section (network order) ----------o
	
	int cmd;		kind of command
	int sid;		session ID : unique to the file being edited
	int eid;		editor ID : owner editor ID = 1; unique within a session
	                -1 session manager command
	                -2 merge command
	int seqno;		sequence number : managed separately for each editor
	int lineno;		line number
	int textsize;   textsize : bytesize
	byte[] text;
*/

public class SessionManager implements SessionManagerEventListener{
	
	// Sessions known to this manager (created locally or learned from peers).
	LinkedList<Session> sessionList;
	// GUI that receives refreshed session/editor lists via updateGUI().
	private SessionManagerGUI gui;
	// Selector on which the server socket and every accepted/connected channel are registered.
	private REPSelector<REPCommand> selector;
	// Channels of the other session managers this one is connected to.
	SessionManagerList smList;
	List<Editor> editorList;
	// editorList appears to be a list of editors that is separate from the
	// editors held inside the sessions of sessionList.
	// Greatest host string seen so far (compared with String.compareTo in setMaxHost).
	private String maxHost;
	// Commands deferred while their editor is merging; retried from mainLoop.
	private List<PacketSet> waitingCommandInMerge;
	// Handler attached to the server socket and to newly registered channels.
	REPHandler normalHandler = new REPEditorHandler(this);
	// Events queued by buttonPressed()/syncExec() and executed from mainLoop.
	private BlockingQueue<SessionManagerEvent> waitingEventQueue = new LinkedBlockingQueue<SessionManagerEvent>();;
	// Local host name concatenated with receive_port; null until setMyHostName() runs.
	String myHost;
	// Outgoing packets queued by addWriteQueue() and written one per mainLoop pass.
	private LinkedList<PacketSet> writeQueue = new LinkedList<PacketSet>();
	// Port this process listens on (set in main).
	private static int receive_port;
	// Port used when connecting to another session manager (set in main).
	private static int parent_port;
	static final int DEFAULT_PORT = 8766;
	// Upper bound asserted on the size of writeQueue.
	private static final int packetLimit = 200;

	/**
	 * Starts a session manager and enters its main loop.
	 *
	 * @param args optional; {@code args[0]} is the port to listen on and
	 *             {@code args[1]} is the port used when connecting to another
	 *             session manager. Each falls back to {@link #DEFAULT_PORT}
	 *             when it is not supplied.
	 * @throws NumberFormatException if a supplied argument is not an integer
	 */
	public static void main(String[] args) throws InterruptedException, IOException {
		int port = DEFAULT_PORT;
		int port_s = DEFAULT_PORT;
		if (args.length > 0) {
			port = Integer.parseInt(args[0]);
		}
		// The second argument is optional; reading it unconditionally used to
		// throw ArrayIndexOutOfBoundsException when only one port was given.
		if (args.length > 1) {
			port_s = Integer.parseInt(args[1]);
		}
		receive_port = port;
		parent_port = port_s;
		SessionManager sm = new SessionManager();
		sm.init(port, new SessionManagerGUIimpl(sm));
	}

	
	/**
	 * Creates the selector used by this manager and stores it in the
	 * {@code selector} field.
	 *
	 * @throws IOException if the selector cannot be created
	 */
	public void openSelector() throws IOException {
		this.selector = REPSelector.<REPCommand>create();
	}

	/**
	 * Attaches the GUI, opens the selector, starts listening on {@code port}
	 * and then runs {@link #mainLoop()}. The main loop is an endless loop, so
	 * this call only ends by throwing.
	 *
	 * @param port port to accept connections on
	 * @param gui  GUI that will be told about session/editor list changes
	 */
	public void init(int port, SessionManagerGUI gui) throws IOException, InterruptedException {
		this.gui = gui;
		this.openSelector();
		this.init(port);
		this.mainLoop();
	}


	/**
	 * Opens the non-blocking server socket on {@code port}, registers it with
	 * the selector for accept events, and creates the empty bookkeeping lists.
	 *
	 * @param port port to bind the server socket to
	 */
	private void init(int port) throws InterruptedException, IOException {
		REPServerSocketChannel<REPCommand> serverChannel =
			REPServerSocketChannel.<REPCommand>open(new REPCommandPacker());
		serverChannel.configureBlocking(false);
		// Address reuse is required here.
		serverChannel.socket().setReuseAddress(true);
		// A port-only InetSocketAddress uses the wildcard address, so the
		// socket is bound on every local address.
		InetSocketAddress listenAddress = new InetSocketAddress(port);
		serverChannel.socket().bind(listenAddress);
		serverChannel.register(selector, SelectionKey.OP_ACCEPT, normalHandler);

		this.sessionList = new LinkedList<Session>();
		this.smList = new SessionManagerList();
		this.editorList = new LinkedList<Editor>();
		this.waitingCommandInMerge = new LinkedList<PacketSet>();
	}
	
	/**
	 * Single-threaded event loop. Everything is handled in this one thread,
	 * although a thread per communication channel and per GUI event would
	 * also be possible.
	 *
	 * Each pass first retries the commands that were deferred during a merge,
	 * then runs at most one queued GUI event or, if there was none, at most
	 * one queued write. When such work was done the selector is only polled,
	 * so waiting tasks get a fair turn; otherwise the loop blocks in the
	 * selector until a packet or a wakeup arrives.
	 *
	 * @throws IOException propagated from command handling or channel I/O
	 */
	public void mainLoop() throws IOException {
		for (;;) {
			checkWaitingCommandInMerge();
			boolean didWork = checkInputEvent() || checkWaitingWrite();
			if (!didWork) {
				// nothing pending: now we can wait for an input packet or event
				selector.select();
				select();
				continue;
			}
			// try to do fair execution for waiting tasks
			if (selector.selectNow() > 0) {
				select();
			}
		}
	}

	/**
	 * Executes the oldest queued event, if any.
	 *
	 * @return true if an event was taken from the queue and executed
	 */
	private boolean checkInputEvent() {
		SessionManagerEvent next = waitingEventQueue.poll();
		if (next == null) {
			return false;
		}
		next.exec();
		return true;
	}

	/**
	 * Writes the oldest packet of the write queue to its channel, if any.
	 *
	 * @return true if a packet was taken from the queue and written
	 * @throws IOException if the write fails
	 */
	private boolean checkWaitingWrite() throws IOException {
		PacketSet pending = writeQueue.poll();
		if (pending == null) {
			return false;
		}
		pending.channel.write(pending.command);
		return true;
	}

	/**
	 * Retries the commands that were deferred while their editor was merging.
	 * The waiting list is swapped for a fresh one first; commands whose editor
	 * is still merging are put back on the new list, all others are handed to
	 * {@link #manage}.
	 *
	 * @throws IOException if handling one of the commands fails
	 */
	private void checkWaitingCommandInMerge() throws IOException {
		List<PacketSet> pending = waitingCommandInMerge;
		waitingCommandInMerge = new LinkedList<PacketSet>();
		for (PacketSet packet : pending) {
			Editor owner = packet.getEditor();
			boolean stillMerging = owner != null && owner.isMerging();
			if (stillMerging) {
				// keep it waiting for a later pass
				waitingCommandInMerge.add(packet);
				continue;
			}
			manage(packet.channel, packet.command);
		}
	}
	
	/**
	 * Tells whether a deferred command from the given channel is waiting.
	 *
	 * @param c channel to look for (compared by identity)
	 * @return true if at least one waiting command came from {@code c}
	 */
	public boolean hasWaitingCommand(REPSocketChannel<REPCommand>c) {
		for (PacketSet waiting : waitingCommandInMerge) {
			if (waiting.channel != c) {
				continue;
			}
			return true;
		}
		return false;
	}

	/**
	 * Processes the keys currently selected by the selector: acceptable keys
	 * have their new connection accepted and registered for reading, readable
	 * keys are passed to the handler attached to them. If the handler fails
	 * with an I/O error the key is cancelled and the handler is told to drop
	 * the channel.
	 *
	 * @throws IOException if accepting or registering a connection fails
	 */
	private void select() throws IOException {
		for (REPSelectionKey<REPCommand> key : selector.selectedKeys1()) {
			if (key.isAcceptable()) {
				REPSocketChannel<REPCommand> accepted = key.accept(new REPCommandPacker());
				System.out.println("SessionManager.select() : key.isAcceptable : channel = " + accepted);
				registerChannel(accepted, SelectionKey.OP_READ);
				continue;
			}
			if (!key.isReadable()) {
				continue;
			}
			REPHandler handler = (REPHandler) key.attachment();
			try {
				handler.handle(key);
			} catch (IOException x) {
				// also covers ClosedChannelException, which is an IOException
				key.cancel();
				handler.cancel(key.channel1());
			}
		}
	}
	
	/**
	 * Switches the channel to non-blocking mode and registers it with the
	 * selector using the normal handler. A null channel is ignored.
	 *
	 * @param channel channel to register, may be null
	 * @param ops     interest set to register with
	 */
	private void registerChannel(REPSocketChannel<REPCommand> channel, int ops) throws IOException {
		if (channel != null) {
			channel.configureBlocking(false);
			channel.register(selector, ops, normalHandler);
		}
	}

	/**
	 * Dispatches a received command. Session manager commands are handled
	 * here; anything else is forwarded to the editor that is bound to the
	 * channel within the session named by the command.
	 *
	 * @param channel         channel the command arrived on
	 * @param receivedCommand the command to process
	 * @throws IOException if the session is unknown or handling fails
	 */
	public void manage(REPSocketChannel<REPCommand> channel, REPCommand receivedCommand) throws IOException {
		boolean handledHere = sessionManagerCommand(channel, receivedCommand);
		if (!handledHere) {
			Session target = getSession(receivedCommand.sid);
			Editor recipient = target.getEditor(channel);
			recipient.manage(receivedCommand);
		}
	}


	/**
	 * Handles the commands that are addressed to the session manager itself
	 * (join, put, select, session manager join, update, master change).
	 *
	 * @param channel         channel the command arrived on
	 * @param receivedCommand the command to process
	 * @return true if the command was one of the session manager commands and
	 *         has been handled; false if the caller must route it elsewhere
	 * @throws IOException if a referenced session is unknown or I/O fails
	 */
	private boolean sessionManagerCommand(REPSocketChannel<REPCommand> channel,
			REPCommand receivedCommand) throws ClosedChannelException,
			IOException {
		switch(receivedCommand.cmd){

		// Session Manager Command

		case SMCMD_JOIN:
		{
			// Add an editor that does not belong to any session yet.
			// Editors and channels are not strictly one-to-one: an editor
			// opens a new socket when it puts a new file.
			// This editorList is distinct from the one kept by a session.
			Editor editor1 = new Editor(this,-1,channel);
			editor1.setHost(myHost);
			editorList.add(editor1);

			updateGUI();

		}

		break;

		case SMCMD_JOIN_ACK:
			assert (false);
			break;

		case SMCMD_PUT:
		{
			// Create a session.
			// Session IDs are meant to be globally unique, so properly the
			// parent should allocate them; if we are the parent we may
			// allocate them ourselves.

			int sid = sessionList.size();
			// Sessions are removed by closeSession() and added with ids chosen
			// by peers, so the list size can equal the id of a live session.
			// Skip ahead to the first id that is not in use locally.
			while (hasSession(sid)) {
				sid++;
			}
			Editor editor = new Editor(this,0, channel);
			editorList.add(editor);
			editor.setHost(myHost);
			Session session = new Session(sid, receivedCommand.string, editor);
			session.hasOwner(true);
			sessionList.add(session);

			updateGUI();

			// Send the ack to the editor.
			REPCommand sendCommand = new REPCommand(receivedCommand);
			sendCommand.setCMD(REP.SMCMD_PUT_ACK);
			sendCommand.setEID(editor.getEID());
			sendCommand.setSID(session.getSID());
			editor.send(sendCommand);

			// Report the new session to the other session managers
			// (sent to the parent, which passes it on to its children).
			SessionXMLEncoder sessionEncoder = new SessionXMLEncoder(session);
			REPCommand command = new REPCommand();
			command.setSID(session.getSID());
			command.setString(sessionEncoder.sessionListToXML());
			command.setCMD(REP.SMCMD_UPDATE);
			smList.sendExcept(channel, command);

		}

		break;

		// SELECT is no longer used in a editor. Select
		// operation is handled in Session Manager Only
		case SMCMD_SELECT:
		{
			// Add the other session manager to the session as if it were an editor.
			Forwarder next = new Forwarder(this);
			next.setChannel(channel);
			Session session = getSession(receivedCommand.sid);
			session.addForwarder(next);

			if(session.hasOwner()){
				// This session manager holds the owner: the forwarder has been
				// added to the session, so answer with the ack.
				REPCommand sendCommand = new REPCommand(receivedCommand);
				sendCommand.setCMD(REP.SMCMD_SELECT_ACK);
				sendCommand.setEID(next.getEID());
				next.send(sendCommand);
			}else{
				// No owner here: relay the SELECT towards the session manager
				// that holds the owner.
				Forwarder owner = session.getOwner();
				owner.send(receivedCommand);
			}
		}

		break;

		case SMCMD_SELECT_ACK:
		{
			String hostport = receivedCommand.string;
			Forwarder editor1 = getEditor(hostport);

			if(editor1 != null) {
				// The host/port string identifies one of our editors, so this
				// is the ack for a SELECT that we sent ourselves.
				REPCommand command = new REPCommand();
				command.setCMD(REP.SMCMD_JOIN_ACK);
				command.setSID(receivedCommand.sid);
				command.setEID(receivedCommand.eid);
				editor1.send(command);

			}else{
				// Not ours: relay it to the next session manager.
				smList.sendExcept(channel, receivedCommand);
			}
		}

		break;
		case SMCMD_SM_JOIN:

		{
			// The peer on this channel is a session manager, so a dedicated
			// handler has to be attached to it.
			channel.register(selector, SelectionKey.OP_READ,
					new REPSessionManagerHandler(this));

			// Add it to the list of session managers.
			smList.add(channel);

			// Build a SessionList object from the received XML.
			SessionXMLDecoder decoder = new SessionXMLDecoder();
			SessionList receivedSessionList = decoder.decode(receivedCommand.string);

			// Set myHost. Doing this at start-up would only yield localhost.
			if(myHost == null) setMyHostName(getLocalHostName(channel));

			// Update maxHost and announce it when it changed.
			if(setMaxHost(channel, receivedSessionList.getMaxHost())){
				REPCommand sendCommand = new REPCommand();
				sendCommand.setCMD(REP.SMCMD_CH_MASTER);
				sendCommand.setString(maxHost);
				smList.sendExcept(channel, sendCommand);
			}

			// Encode our session list as XML and send it as the ack to the
			// session manager that joined.
			SessionXMLEncoder sessionlistEncoder = new SessionXMLEncoder(sessionList);
			REPCommand sendCommand = new REPCommand();
			sendCommand.setCMD(REP.SMCMD_SM_JOIN_ACK);
			sendCommand.setString(sessionlistEncoder.sessionListToXML());
			channel.write(sendCommand);

			// Send SMCMD_UPDATE to the remaining session managers.
			sendCommand = new REPCommand();
			sendCommand.setCMD(REP.SMCMD_UPDATE);
			sendCommand.setString(receivedCommand.string);
			smList.sendExcept(channel, sendCommand);

		}
		break;

		case SMCMD_SM_JOIN_ACK:
		{
			// Build a SessionList object from the received XML.
			SessionXMLDecoder decoder2 = new SessionXMLDecoder();
			SessionList receivedSessionList2 = decoder2.decode(receivedCommand.string);

			// Decide maxHost and announce it when it changed.
			if(setMaxHost(channel, receivedSessionList2.getMaxHost())){
				REPCommand sendCommand = new REPCommand();
				sendCommand.setCMD(REP.SMCMD_CH_MASTER);
				sendCommand.setString(maxHost);
				smList.sendExcept(channel, sendCommand);
			}
		}
			break;

		case SMCMD_UPDATE:
		{
			SessionXMLDecoder decoder3 = new SessionXMLDecoder();
			SessionList receivedSessionList3 = decoder3.decode(receivedCommand.string);

			// Add the sessions that were delivered by the UPDATE command.
			// NOTE(review): get(0) assumes every decoded session carries at
			// least one editor - confirm against SessionXMLDecoder.
			LinkedList<Session> list = receivedSessionList3.getList();
			for(Session session : list){
				session.getEditorList().get(0).setChannel(channel);
				sessionList.add(session);
			}

			// Relay to the other session managers.
			smList.sendExcept(channel, receivedCommand);

			updateGUI();
		}
			break;

		case SMCMD_UPDATE_ACK:
		{
			if(!hasSession(receivedCommand.sid)) {
				// accept new Session
				// The session id is fixed here for the first time.
				// This command is expected to come from the master session manager.
				Forwarder sm = new Forwarder(this);
				sm.setChannel(channel);
				Session session = new Session(receivedCommand.sid,receivedCommand.string,null);
				session.addForwarder(sm);

				sessionList.add(session);

				updateGUI();
			}
			smList.sendToSlave(receivedCommand);
		}
			break;

		case SMCMD_CH_MASTER:
		{
			// Update maxHost and pass the change on.
			if(setMaxHost(channel, receivedCommand.string)){
				REPCommand sendCommand = new REPCommand();
				sendCommand.setCMD(REP.SMCMD_CH_MASTER);
				sendCommand.setString(maxHost);
				smList.sendExcept(channel, sendCommand);
			}
		}
			break;


		default:
			return false;
		}
		return true;
	}


	/**
	 * Tells whether a session with the given id is in {@code sessionList}.
	 *
	 * @param sid session id to look for
	 * @return true if such a session exists
	 */
	private boolean hasSession(int sid) {
		for (Session candidate : sessionList) {
			if (candidate.getSID() != sid) {
				continue;
			}
			return true;
		}
		return false;
	}


	/**
	 * Schedules a GUI refresh. The GUI task receives copies of the session
	 * and editor lists rather than the live lists.
	 */
	void updateGUI() {
		Runnable refresh = new DoGUIUpdate(
				new LinkedList<Session>(sessionList),
				new LinkedList<Editor>(editorList),
				gui);
		// let the GUI apply the snapshot
		gui.invokeLater(refresh);
	}

	/**
	 * Finds the first editor in {@code editorList} whose host string equals
	 * {@code hostport}.
	 *
	 * @param hostport host identification to look for; null never matches
	 * @return the matching editor, or null if there is none
	 */
	Forwarder getEditor(String hostport) {
		if (hostport == null) {
			return null;
		}
		for (Editor editor : editorList) {
			// Compare contents, not references: hostport normally comes from
			// a received command and is never the same String object as the
			// one stored in the editor.
			if (hostport.equals(editor.getHost())) {
				return editor;
			}
		}
		return null;
	}
	
	/**
	 * Looks up a session by id.
	 *
	 * @param sid session id to look for
	 * @return the session with that id
	 * @throws IOException if no session with that id is known
	 */
	public Session getSession(int sid) throws IOException {
		for (Session session : sessionList) {
			if (session.getSID() == sid) {
				return session;
			}
		}
		// Name the missing id so the failure can be diagnosed from the trace.
		throw new IOException("no session with sid " + sid);
	}

	/**
	 * Replaces {@code maxHost} with the candidate unless the current value is
	 * strictly greater (by {@link String#compareTo}). An equal candidate still
	 * counts as accepted, as before.
	 *
	 * @param channel  channel the candidate arrived on (not used here)
	 * @param maxHost2 candidate host string; a null candidate is ignored
	 * @return true if {@code maxHost} was set to the candidate, false if the
	 *         current value was kept
	 */
	private boolean setMaxHost(REPSocketChannel<REPCommand> channel, String maxHost2) {
		// The candidate comes from a peer (decoded XML or a command string)
		// and may be missing; there is nothing to adopt in that case.
		if (maxHost2 == null) {
			return false;
		}
		// maxHost stays null until setMyHostName() has run; any candidate
		// wins against an unset value.
		if (maxHost != null && maxHost.compareTo(maxHost2) > 0) {
			return false;
		}
		maxHost = maxHost2;
		return true;
	}

	/**
	 * Sets {@code myHost} to the local host name followed directly by the
	 * receive port, uses it as {@code maxHost} when none is set yet, and
	 * pushes it to every editor in {@code editorList}.
	 *
	 * @param localHostName host name of this machine
	 */
	private void setMyHostName(String localHostName) {
		String hostId = localHostName + receive_port;
		myHost = hostId;
		if (maxHost == null) {
			maxHost = hostId;
		}
		setHostToEditor(hostId);
	}

	/**
	 * Assigns the given host string to every editor in {@code editorList}.
	 *
	 * @param myHost2 host string to assign
	 */
	private void setHostToEditor(String myHost2) {
		for (Editor each : editorList) {
			each.setHost(myHost2);
		}
	}

	/**
	 * Connects to the session manager on {@code host} at the configured
	 * parent port, registers the new channel for reading and sends the
	 * SM_JOIN command over it. I/O failures are printed and otherwise ignored.
	 *
	 * @param host host name of the session manager to connect to
	 */
	public void connectSession(String host) {
		InetSocketAddress peerAddress = new InetSocketAddress(host, parent_port);
		try {
			REPSocketChannel<REPCommand> peer =
				REPSocketChannel.<REPCommand>create(new REPCommandPacker());
			peer.configureBlocking(true);
			peer.connect(peerAddress);
			// spin until the connection is established
			while (!peer.finishConnect()) {
				System.out.print("test afro");
			}
			System.out.println("");
			registerChannel(peer, SelectionKey.OP_READ);

			sm_join(peer);
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
	
	/**
	 * Sends an SM_JOIN command carrying our session list (as XML) over the
	 * given channel and adds the channel to the list of session managers.
	 * The local host name is taken from the channel beforehand.
	 *
	 * @param channel channel connected to the other session manager
	 */
	private void sm_join(REPSocketChannel<REPCommand> channel){
		// Build the SM_JOIN command.
		REPCommand joinCommand = new REPCommand();
		joinCommand.setCMD(REP.SMCMD_SM_JOIN);

		// Set our host name.
		String localName = getLocalHostName(channel);
		setMyHostName(localName);

		// Encode the session list as XML and attach it to the command.
		String xml = new SessionXMLEncoder(sessionList).sessionListToXML();
		joinCommand.setString(xml);

		// Send the command, then remember the peer as a session manager.
		channel.write(joinCommand);
		smList.add(channel);
	}

	/**
	 * Returns the host name of the local address the channel's socket is
	 * bound to.
	 *
	 * @param channel channel whose local address is queried
	 * @return host name of the local end of the channel
	 */
	private String getLocalHostName(REPSocketChannel<?> channel) {
		return channel.socket().getLocalAddress().getHostName();
	}

	/**
	 * Joins the editor named by the event to the session named by the event.
	 * Nothing happens when the event has no editor or the editor already has
	 * a session. When the session has its owner here, the editor gets a new
	 * editor id, is added to the session and receives a JOIN_ACK. Otherwise
	 * the editor is added to the session and a SELECT command carrying the
	 * editor's host string is sent to the session's owner.
	 *
	 * @param event selection event carrying the session id and the editor
	 * @throws IOException if the session id is unknown
	 */
	public void selectSession(SelectButtonEvent event) throws IOException {
		int sid = event.getSID();
		Session session = getSession(sid);

		Editor editor = (Editor)event.getEditor();
		if (editor == null) {
			System.out.println("SessionManager.selectSession():editor = " + editor);
			return;
		}
		if (editor.hasSession()) {
			return;
		}
		REPSocketChannel<REPCommand> channel = editor.getChannel();

		if (!session.hasOwner()) {
			// The owner is elsewhere: ask it through a SELECT command.
			editor.setHost(myHost);
			editor.setSID(sid);
			session.addEditor(editor);
			Forwarder owner = session.getOwner();

			REPCommand selectCommand = new REPCommand();
			selectCommand.setCMD(REP.SMCMD_SELECT);
			selectCommand.setSID(sid);
			selectCommand.setString(editor.getHost());
			owner.send(selectCommand);
			return;
		}

		// The owner is here: join directly and acknowledge to the editor.
		editor.setEID(session.newEid());
		editor.setSID(sid);
		session.addForwarder(editor);
		REPCommand ack = new REPCommand();
		ack.setCMD(REP.SMCMD_JOIN_ACK);
		ack.setEID(editor.getEID());
		ack.setSID(sid);
		ack.string = "";
		channel.write(ack);
	}

	/**
	 * Defers a command; it is retried from the main loop by
	 * {@code checkWaitingCommandInMerge()}.
	 *
	 * @param set channel/command pair to defer
	 */
	public void addWaitingCommand(PacketSet set) {
		this.waitingCommandInMerge.add(set);
	}

	/**
	 * Queues a GUI event for execution by the main loop and wakes up the
	 * selector, presumably so that a main loop blocked in select() notices
	 * the new event.
	 *
	 * @param event event to execute in the main loop
	 */
	public void buttonPressed(SessionManagerEvent event) {
		try {
			waitingEventQueue.put(event);
		} catch (InterruptedException e) {
			// Do not swallow the interrupt: restore the flag so the caller
			// can still observe that the thread was interrupted.
			Thread.currentThread().interrupt();
		}
		selector.wakeup();
	}
	
	/**
	 * Queues an event for execution by the main loop. Unlike
	 * {@code buttonPressed} this does not wake up the selector.
	 *
	 * @param event event to execute in the main loop
	 */
	public void syncExec(SessionManagerEvent event) {
		try {
			waitingEventQueue.put(event);
		} catch (InterruptedException e) {
			// Do not swallow the interrupt: restore the flag so the caller
			// can still observe that the thread was interrupted.
			Thread.currentThread().interrupt();
		}
	}

	/**
	 * Closes the session carried by the event, removes it from
	 * {@code sessionList} and refreshes the GUI.
	 *
	 * @param event must be a {@code CloseButtonEvent}
	 */
	public void closeSession(SessionManagerEvent event) {
		CloseButtonEvent closeEvent = (CloseButtonEvent) event;
		Session target = closeEvent.getSession();
		target.closeSession();
		sessionList.remove(target);
		updateGUI();
	}

	/**
	 * Asks each session in turn to delete the editor bound to the channel and
	 * stops at the first session that reports success. If no session does,
	 * an assertion fails (when assertions are enabled).
	 *
	 * @param channel channel whose editor is to be removed
	 */
	public void remove(REPSocketChannel<REPCommand> channel) {
		boolean removed = false;
		for (Session s : sessionList) {
			if (s.deleteEditor(channel)) {
				removed = true;
				break;
			}
		}
		// can be other session manager? what should I do?
		assert removed;
	}


	/**
	 * Appends a packet to the write queue; the main loop writes it out later.
	 * The queue is asserted to stay below {@code packetLimit} entries.
	 *
	 * @param packetSet channel/command pair to write
	 */
	public void addWriteQueue(PacketSet packetSet) {
		writeQueue.add(packetSet);
		int queued = writeQueue.size();
		assert queued < packetLimit;
	}


	/**
	 * Asks every session in {@code sessionList} to delete the given editor
	 * from its forwarders.
	 *
	 * @param editor editor to remove
	 */
	public void remove(Editor editor) {
		for (Session each : sessionList) {
			each.deleteForwarder(editor);
		}
	}

}