view rep/SessionManager.java @ 82:4bb583553a42

*** empty log message ***
author pin
date Tue, 11 Dec 2007 14:26:13 +0900
parents 13819571691d
children 9381b4734a0b
line wrap: on
line source

package rep;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.StringTokenizer;

import rep.xml.SessionXMLDecoder;
import rep.xml.SessionXMLEncoder;

//+-------+--------+--------+-------+--------+---------+------+
//| cmd   | session| editor | seqid | lineno | textsiz | text |
//|       | id     | id     |       |        |         |      |
//+-------+--------+--------+-------+--------+---------+------+
//o-------header section (network order)-------------o
/*int cmd;	// command
int sid;	// session ID
int eid;	// editor ID
int seqno;	// Sequence number
int lineno;	// line number
int textsize;   // textsize
byte[] text;*/

public class SessionManager implements ConnectionListener, REPActionListener{
	
	
	// Sessions known to this manager; created in sessionManagerNet() and filled by manager().
	private SessionList sessionlist;
	//SocketChannel sessionchannel;
	// GUI front end; this object registers itself on it as a listener in openWindow().
	private SessionManagerGUI sessionmanagerGUI;
	// Selector driving the network loop; opened by openSelector().
	private Selector selector;
	// Channels to other session managers (see sm_join() and setMaster()).
	private SessionManagerList smList;
	// Local host name as reported by a connected socket; set by setMyHostName().
	private String myHost;
	// True until setMaxHost() sees a host name that compares greater than maxHost.
	private boolean isMaster = true;
	// Editors that joined through this manager; created in sessionManagerNet().
	private EditorList  allEditorList;
	// Greatest host name (String.compareTo order) seen so far; seeded from myHost.
	private String maxHost;
	//private boolean addressIsGlobal;
	//private SocketChannel sessionchannel;
	//private boolean co;
	/**
	 * Creates the session manager together with its GUI object.
	 *
	 * @param port accepted for interface compatibility; the constructor does not use it
	 *             (the listening port is passed to {@link #sessionManagerNet(int)})
	 */
	public SessionManager(int port) {
		this.sessionmanagerGUI = new SessionManagerGUI();
	}
	
	/**
	 * Opens the selector used by the network loop and by {@code registerChannel}.
	 *
	 * @throws IOException if the selector cannot be opened
	 */
	public void openSelector() throws IOException{
		this.selector = Selector.open();
	}
	
	/**
	 * Binds a non-blocking server socket to {@code port}, registers it with the
	 * selector and then runs the select/dispatch loop. The loop has no exit
	 * condition; the method only returns by throwing.
	 *
	 * <p>{@link #openSelector()} must have been called first, because the
	 * selector field is used here without being created.
	 *
	 * @param port TCP port to listen on
	 * @throws InterruptedException declared for callers; nothing in this method throws it directly
	 * @throws IOException if opening, binding, selecting, accepting or registering fails
	 */
	public void sessionManagerNet(int port) throws InterruptedException, IOException {

		// Create the shared lists before the socket is bound, so they exist by the
		// time any connection (or a GUI-triggered connectSession) can reach them.
		sessionlist = new SessionList();
		smList = new SessionManagerList();
		allEditorList = new EditorList();

		ServerSocketChannel ssc = ServerSocketChannel.open();
		ssc.configureBlocking(false);
		ssc.socket().bind(new InetSocketAddress(port));
		ssc.register(selector, SelectionKey.OP_ACCEPT);

		while(true){
			selector.select();
			for(SelectionKey key : selector.selectedKeys()){
				// Handling an earlier key in this pass may have closed this key's
				// channel; querying readiness on a cancelled key would throw.
				if(!key.isValid()){
					continue;
				}
				if(key.isAcceptable()){
					/*** serverChannel is the channel of the SelectionKey that became ready ***/
					ServerSocketChannel serverChannel = (ServerSocketChannel)key.channel();
					/*** version that does not use EditorChannel ***/
					// accept() may return null on a non-blocking channel; registerChannel ignores null.
					SocketChannel channel = serverChannel.accept();
					registerChannel(selector, channel, SelectionKey.OP_READ);

					/*** version using EditorChannel ****/
					//EditorChannel echannel = (EditorChannel) ssc.accept();
					//echannel.setIO();
					//registerChannel(selector, echannel, SelectionKey.OP_READ);

					/*** SelectableEditorChannel ***/
					//SocketChannel channel = ssc.accept();
					//SelectableEditorChannel echannel2 = new SelectableEditorChannel(channel);
					//registerChannel(selector, echannel2, SelectionKey.OP_READ);

				}else if(key.isReadable()){

					/*** version that does not use EditorChannel ***/
					SocketChannel channel = (SocketChannel)key.channel();
					REPPacketReceive receive = new REPPacketReceive(channel); // TODO: turn into getPacket()/putPacket()
					receive.setkey(key);
					//REPCommand repCom = repRec.unpackUConv();
					REPCommand receivedCommand = receive.unpack();
					manager(channel, receivedCommand);

					/*** version using EditorChannel ****/
					//EditorChannel echannel = (EditorChannel) key.channel();
					//REPCommand command = echannel.getPacket();
					//manager(echannel, command);

				}else if(key.isConnectable()){
					System.out.println("Connectable");
				}
			}
			// The selector never removes keys from the selected-key set itself.
			// Without this, every key handled once would be dispatched again on
			// each later wake-up even though nothing new is pending on it.
			selector.selectedKeys().clear();
		}
	}
	
	/**
	 * Switches {@code channel} to non-blocking mode and registers it with
	 * {@code selector} for {@code ops}. A {@code null} channel is ignored.
	 *
	 * @param selector selector to register with; it is woken up before registration
	 * @param channel  channel to register, or {@code null} to do nothing
	 * @param ops      interest set (e.g. {@code SelectionKey.OP_READ})
	 * @throws IOException if the channel cannot be reconfigured or registered
	 */
	private synchronized void registerChannel(Selector selector, SelectableChannel channel, int ops) throws IOException {
		if (channel != null) {
			channel.configureBlocking(false);
			// Wake the selector first so a select() in progress on another thread returns.
			selector.wakeup();
			channel.register(selector, ops);
		}
	}

	/**
	 * Dispatches one received command according to {@code receivedCommand.cmd}.
	 * A {@code null} command is ignored.
	 *
	 * @param channel         channel the command arrived on; replies built with
	 *                        {@code send} go back to this channel
	 * @param receivedCommand decoded command, or {@code null}
	 */
	private void manager(SocketChannel channel, REPCommand receivedCommand) {
		if(receivedCommand == null) return;
		Editor editor;
		Session session;
		// NOTE(review): this clone is never read; the only case that uses sendCommand
		// (SMCMD_SM_JOIN) replaces it with a fresh REPCommand first.
		REPCommand sendCommand = receivedCommand.clone();
		REPPacketSend send = new REPPacketSend(channel);
		//SessionXMLEncoder encoder = new SessionXMLEncoder();
		
		switch(receivedCommand.cmd){
		
		case REP.SMCMD_JOIN:
			// Master: allocate an editor id, ack the join and show the editor in the GUI.
			// Non-master: remember the channel and pass the join on via smList.sendJoin().
			if(isMaster){
				int eid = allEditorList.addEditor(channel, receivedCommand);
				receivedCommand.setEID(eid);
				allEditorList.sendJoinAck(channel, receivedCommand);
				sessionmanagerGUI.setComboEditor(eid, channel);
			}else{
				allEditorList.addEditor(channel);
				smList.sendJoin(receivedCommand);
				//sessionmanagerGUI.setComboEditor(repCmd.eid, channel);
			}
			break;
			
		case REP.SMCMD_JOIN_ACK:
			// Currently a no-op; the handling below is disabled.
//				editorList.setEID(repCmd);
//				editorList.sendJoinAck(repCmd);
//				sessionmanagerGUI.setComboEditor(repCmd.eid, channel);
			break;
			
		case REP.SMCMD_PUT:
			// Create a new session owned here, with the sender as editor 1,
			// and reply with SMCMD_PUT_ACK carrying the new session id.
			editor = new Editor(channel);
			editor.setEID(1);
			editor.setName(receivedCommand.string);
			session = new Session(editor);
			session.setOwner(true);
			session.addEditor(editor);
			sessionlist.addSession(session);
			sessionmanagerGUI.setComboSession(session.getSID(), session.getName());
			sessionmanagerGUI.setComboEditor(editor.getEID(), editor.getChannel());
			session.addToRoutingTable(editor);
			receivedCommand.setCMD(REP.SMCMD_PUT_ACK);
			receivedCommand.setEID(1);
			receivedCommand.setSID(session.getSID());
			editor.send(receivedCommand);
			
			// Announce the new session as XML: SMCMD_UPDATE_ACK via sendToSlave()
			// when master, otherwise SMCMD_UPDATE via sendToMaster().
			//if(isMaster){
			SessionXMLEncoder sessionEncoder = new SessionXMLEncoder(session);
			REPCommand command = new REPCommand();
			command.setSID(session.getSID());
			command.setString(sessionEncoder.sessionListToXML());
			if(isMaster){
				command.setCMD(REP.SMCMD_UPDATE_ACK);
				smList.sendToSlave(command);
			}else{
				command.setCMD(REP.SMCMD_UPDATE);
				smList.sendToMaster(command);
			}
			break;
			
//		case REP.SMCMD_PUT_ACK:
//			break;
			
		case REP.SMCMD_SELECT:
//			sessionlist.addEditor(channel, repCmd.sid, repCmd);	// add to sessionlist
			editor = new Editor(channel);
			// NOTE(review): the result of getSession() is dereferenced without a null
			// check - confirm it cannot return null for an unknown sid.
			session = sessionlist.getSession(receivedCommand.sid);
			if(session.isOwner()){
				// Session is owned here: add the editor and ack with its new id.
				int eid = session.addEditor(editor);
				editor.setEID(eid);
				//REPPacketSend send = new REPPacketSend(channel);
				receivedCommand.setCMD(REP.SMCMD_SELECT_ACK);
				receivedCommand.setEID(eid);
				send.send(receivedCommand);
			}else {
				// NOTE(review): nothing is done when the session is not owned here.
				
			}
			
			break;
			
		case REP.SMCMD_SELECT_ACK:
			// Rewrite the ack as SMCMD_JOIN_ACK and hand it to the session's editor.
			receivedCommand.setCMD(REP.SMCMD_JOIN_ACK);
			// NOTE(review): sets the eid to the value it already holds - presumably a no-op.
			receivedCommand.setEID(receivedCommand.eid);
			session = sessionlist.getSession(receivedCommand.sid);
			session.sendToEditor(receivedCommand);
			//Editor editor3 = session3.getEditorList().get(0);
			//REPPacketSend send = new REPPacketSend(editor3.getChannel());
			//send.send(repCmd);
			break;
			
		case REP.SMCMD_SM_JOIN:
			
			// Build a SessionList object from the received XML.
			SessionXMLDecoder decoder = new SessionXMLDecoder();
			SessionList receivedSessionList = decoder.decode(receivedCommand.string);
			
			// Set myHost (only if it is not known yet).
			if(myHost == null) setMyHostName(getLocalHostName(channel));
			
			// Update maxHost from the peer's value.
			setMaxHost(channel, receivedSessionList.getMaxHost());
			
			// Generate XML from our SessionList and
			// send an ACK to the SessionManager that joined.
			SessionXMLEncoder sessionlistEncoder = new SessionXMLEncoder(sessionlist);
			sendCommand = new REPCommand();
			sendCommand.setCMD(REP.SMCMD_SM_JOIN_ACK);
			sendCommand.setString(sessionlistEncoder.sessionListToXML());
			send.send(sendCommand);
			
			// Send SMCMD_SM_JOIN to the other SessionManagers.
			sendCommand = new REPCommand();
			sendCommand.setCMD(REP.SMCMD_SM_JOIN);
			sendCommand.setString(receivedCommand.string);
			smList.sendExcept(channel, sendCommand);
			
			// NOTE(review): both branches are empty placeholders.
			if(isMaster){
			}else {
			}
			
			break;
			
		case REP.SMCMD_SM_JOIN_ACK:
			
			// Build a SessionList object from the received XML.
			SessionXMLDecoder decoder2 = new SessionXMLDecoder();
			SessionList receivedSessionList2 = decoder2.decode(receivedCommand.string);
			
			// Determine maxHost.
			setMaxHost(channel, receivedSessionList2.getMaxHost());
			
			// NOTE(review): both branches are empty placeholders.
			if(isMaster){
			}else{
			}
			
			break;
			
		case REP.SMCMD_UPDATE:
			//SessionXMLDecoder decoder = new SessionXMLDecoder(receivedCommand.string);
			
			// Record the announced session locally (the command string is used as the
			// editor name), then pass the command on: as SMCMD_UPDATE_ACK via
			// sendToSlave() when master, otherwise as SMCMD_UPDATE via sendToMaster().
			editor = new Editor(channel);
			editor.setName(receivedCommand.string);
			
			session = new Session(editor);
			session.addEditor(editor);
			
			sessionlist.addSession(session);
			
			sessionmanagerGUI.setComboSession(session.getSID(), session.getName());
			
			if(isMaster){
				receivedCommand.setCMD(REP.SMCMD_UPDATE_ACK);
				smList.sendToSlave(receivedCommand);
			}else{
				receivedCommand.setCMD(REP.SMCMD_UPDATE);
				smList.sendToMaster(receivedCommand);
			}
			break;
			
		case REP.SMCMD_UPDATE_ACK:
			// Only add the session when its id is larger than the number of sessions
			// already held; the command is passed to sendToSlave() either way.
			if(receivedCommand.sid > sessionlist.getList().size()){
				editor = new Editor(channel);
				editor.setName(receivedCommand.string);
				
				session = new Session(editor);
				session.addEditor(editor);
				
				sessionlist.addSession(session);
				
				sessionmanagerGUI.setComboSession(session.getSID(), session.getName());
			}
			smList.sendToSlave(receivedCommand);
			break;
			
		case REP.REPCMD_READ:
			// Read commands are currently dropped.
			//sessionlist.sendCmd(channel, repCmd);
			break;
			
		default:
			// Every other command is handed to sessionlist.sendToNextEditor().
			//sessionlist.sendCmd(channel, repCmd);
			sessionlist.sendToNextEditor(channel, receivedCommand);
			break;
		}
	}
	
	/**
	 * Compares a peer's host name against the current maximum and, if the peer's
	 * name is greater (by {@code String.compareTo}), adopts it and gives up the
	 * master role in favour of {@code channel}.
	 *
	 * <p>If no maximum is known yet it is first seeded from {@code myHost}.
	 * Neither {@code host} nor the seeded maximum is null-checked.
	 *
	 * @param channel channel passed on to {@code setMaster} when the peer wins
	 * @param host    host name reported by the peer
	 * @return {@code true} if {@code host} replaced the current maximum
	 */
	private boolean setMaxHost(SocketChannel channel, String host) {
		if (maxHost == null) {
			maxHost = myHost;
			sessionlist.setMaxHost(maxHost);
		}

		boolean peerIsGreater = host.compareTo(maxHost) > 0;
		if (!peerIsGreater) {
			return false;
		}

		// host > maxHost: adopt it as the new maximum and record the new master.
		maxHost = host;
		sessionlist.setMaxHost(maxHost);
		setMaster(false, channel);
		return true;
	}

	/**
	 * Records the local host name, seeds {@code maxHost} with it when no maximum
	 * is known yet, and passes the name to the editor list.
	 *
	 * @param localHostName host name of this machine
	 */
	private void setMyHostName(String localHostName) {
		this.myHost = localHostName;

		boolean noMaximumYet = (maxHost == null);
		if (noMaximumYet) {
			maxHost = localHostName;
			sessionlist.setMaxHost(localHostName);
		}

		allEditorList.setHost(localHostName);
	}

	/**
	 * Updates the master flag, logs the new value to stdout and records
	 * {@code channel} as the master channel in the session-manager list.
	 *
	 * @param b       new value of {@code isMaster}
	 * @param channel channel handed to {@code smList.setMaster}
	 */
	private void setMaster(boolean b, SocketChannel channel) {
		this.isMaster = b;
		String logLine = "isMaster = " + b;
		System.out.println(logLine);
		this.smList.setMaster(channel);
	}

	/**
	 * Entry point. Uses the first argument as the listening port (default 8766),
	 * then opens the selector, starts the GUI and enters the network loop.
	 *
	 * @param args optional: {@code args[0]} is the port number
	 * @throws InterruptedException propagated from {@code sessionManagerNet}
	 * @throws IOException          if the selector or server socket cannot be set up
	 */
	public static void main(String[] args) throws InterruptedException, IOException {
		final int port = (args.length > 0) ? Integer.parseInt(args[0]) : 8766;

		SessionManager manager = new SessionManager(port);
		manager.openSelector();
		manager.openWindow();
		manager.sessionManagerNet(port);
	}

	/**
	 * Starts the GUI on its own thread and then registers this object as its
	 * connection listener and action listener.
	 */
	private void openWindow() {
		new Thread(sessionmanagerGUI).start();
		//System.out.println(sessionmanagerGUI.toString());
		sessionmanagerGUI.addConnectionListener(this);
		sessionmanagerGUI.addREPActionListener(this);
	}

	/**
	 * Connects to another session manager on {@code host} (fixed port 8766),
	 * registers the new channel for reading and sends it an SM_JOIN.
	 *
	 * <p>Failures are reported on stderr and do not propagate: a null or
	 * unresolvable host is rejected before any socket is opened, and a channel
	 * whose connect or registration fails is closed instead of being leaked.
	 *
	 * @param host host name or address of the session manager to join
	 */
	private void connectSession(String host) {
		int port = 8766;
		if (host == null) {
			System.err.println("Cannot connect: no host given");
			return;
		}
		InetSocketAddress addr = new InetSocketAddress(host, port);
		if (addr.isUnresolved()) {
			// connect() would throw the unchecked UnresolvedAddressException.
			System.err.println("Cannot resolve host: " + host);
			return;
		}

		SocketChannel sessionchannel = null;
		try {
			sessionchannel = SocketChannel.open();
			// In blocking mode connect() returns only once the connection is
			// established (or throws), so no finishConnect() polling is needed.
			sessionchannel.configureBlocking(true);
			sessionchannel.connect(addr);
			registerChannel(selector, sessionchannel, SelectionKey.OP_READ);

			sm_join(sessionchannel);

		}catch (IOException e) {
			e.printStackTrace();
			// Do not leak the half-set-up channel.
			if (sessionchannel != null) {
				try {
					sessionchannel.close();
				} catch (IOException closeFailure) {
					closeFailure.printStackTrace();
				}
			}
		}
	}
	
	/**
	 * Sends an SMCMD_SM_JOIN carrying this manager's session list (as XML) over
	 * {@code channel} and adds the channel to the session-manager list.
	 *
	 * @param channel connected channel to the session manager being joined
	 */
	private void sm_join(SocketChannel channel){

		// Build the SM_JOIN command.
		REPCommand joinRequest = new REPCommand();
		joinRequest.setCMD(REP.SMCMD_SM_JOIN);

		// Record our host name before encoding the session list.
		setMyHostName(getLocalHostName(channel));

		// Encode the session list as XML and attach it to the command.
		String sessionsXml = new SessionXMLEncoder(sessionlist).sessionListToXML();
		joinRequest.setString(sessionsXml);

		// Send the SM_JOIN command.
		new REPPacketSend(channel).send(joinRequest);

		// Add the channel to the list of session managers.
		smList.add(channel);
	}

	/**
	 * Returns the host name of the local address that {@code channel}'s socket
	 * is bound to.
	 *
	 * @param channel channel whose local endpoint is inspected
	 * @return host name of the socket's local address
	 */
	private String getLocalHostName(SocketChannel channel) {
		return channel.socket().getLocalAddress().getHostName();
	}

//	private String getSocketString(SocketChannel sessionchannel) {
//		SocketAddress socket = sessionchannel.socket().getRemoteSocketAddress();
//		//String inetAddressString = sessionchannel.socket().getInetAddress().toString();
//		StringTokenizer stn = new StringTokenizer(socket.toString(), "/");
//		String socketString = null;
//		while(stn.hasMoreTokens()){
//			socketString = stn.nextToken();
//			//System.out.println(socketString);
//		}
//		return socketString;
//	}

	/**
	 * Listener callback: connects to the session manager on the host named in
	 * the event.
	 *
	 * @param event event carrying the target host
	 */
	public void connectionOccured(ConnectionEvent event) {
		String targetHost = event.getHost();
		connectSession(targetHost);
	}

	/**
	 * Listener callback: adds the editor named in the event to the chosen
	 * session, sends SMCMD_SELECT to the session's master editor and
	 * SMCMD_SELECT_ACK (with editor id 0) to the editor's own channel.
	 *
	 * <p>The session and master lookups are not null-checked.
	 *
	 * @param event event carrying the editor channel and the session id
	 */
	public void ActionOccured(REPActionEvent event) {
		System.out.println("Action!");

		final SocketChannel editorChannel = event.getEditorChannel();
		final int sid = event.getSID();
		final int ackEid = 0;
		//int eid = event.getEID(); 
		//sessionlist.addEditor(editorChannel, sid, eid);

		// Attach the editor to the selected session.
		Editor joiningEditor = new Editor(editorChannel);
		Session selected = sessionlist.getSession(sid);
		selected.addEditor(joiningEditor);

		Editor sessionMaster = selected.getMaster();

		// Tell the session's master editor about the selection.
		REPCommand selectRequest = new REPCommand();
		selectRequest.setCMD(REP.SMCMD_SELECT);
		selectRequest.setSID(sid);
		sessionMaster.send(selectRequest);

		// Acknowledge directly to the editor that asked.
		REPPacketSend ackSender = new REPPacketSend(editorChannel);
		ackSender.send(new REPCommand(REP.SMCMD_SELECT_ACK, sid, ackEid, 0,0,0,""));

		//sessionlist.sendSelect(sid);
	}
}