Mercurial > hg > RemoteEditor > REPSessionManager
view rep/SessionManager.java @ 82:4bb583553a42
*** empty log message ***
author | pin |
---|---|
date | Tue, 11 Dec 2007 14:26:13 +0900 |
parents | 13819571691d |
children | 9381b4734a0b |
line wrap: on
line source
package rep; import java.io.IOException; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.StringTokenizer; import rep.xml.SessionXMLDecoder; import rep.xml.SessionXMLEncoder; //+-------+--------+--------+-------+--------+---------+------+ //| cmd | session| editor | seqid | lineno | textsiz | text | //| | id | id | | | | | //+-------+--------+--------+-------+--------+---------+------+ //o-------header section (network order)-------------o /*int cmd; // command int sid; // session ID int eid; // editor ID int seqno; // Sequence number int lineno; // line number int textsize; // textsize byte[] text;*/ public class SessionManager implements ConnectionListener, REPActionListener{ private SessionList sessionlist; //SocketChannel sessionchannel; private SessionManagerGUI sessionmanagerGUI; private Selector selector; private SessionManagerList smList; private String myHost; private boolean isMaster = true; private EditorList allEditorList; private String maxHost; //private boolean addressIsGlobal; //private SocketChannel sessionchannel; //private boolean co; public SessionManager(int port) { sessionmanagerGUI = new SessionManagerGUI(); } public void openSelector() throws IOException{ selector = Selector.open(); } public void sessionManagerNet(int port) throws InterruptedException, IOException { ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.configureBlocking(false); ssc.socket().bind(new InetSocketAddress(port)); ssc.register(selector, SelectionKey.OP_ACCEPT); sessionlist = new SessionList(); smList = new SessionManagerList(); allEditorList = new EditorList(); while(true){ selector.select(); for(SelectionKey key : selector.selectedKeys()){ if(key.isAcceptable()){ /*** serverChannelはenableになったSelectionKeyのchannel ***/ ServerSocketChannel serverChannel = (ServerSocketChannel)key.channel(); /*** EditorChannel を用いない記述 ***/ SocketChannel channel = serverChannel.accept(); //keyからchannelを取って、accept registerChannel (selector, channel, SelectionKey.OP_READ); channel = null; /*** EditorChannel を用いた記述 ****/ //EditorChannel echannel = (EditorChannel) ssc.accept(); //echannel.setIO(); //registerChannel(selector, echannel, SelectionKey.OP_READ); //echannel = null; /*** SelectableEditorChannel ***/ //SocketChannel channel = ssc.accept(); //SelectableEditorChannel echannel2 = new SelectableEditorChannel(channel); //registerChannel(selector, echannel2, SelectionKey.OP_READ); //channel = null; //echannel2 = null; }else if(key.isReadable()){ /*** EditorChannel を用いない記述 ***/ SocketChannel channel = (SocketChannel)key.channel(); REPPacketReceive receive = new REPPacketReceive(channel); //getPacket(), putPacket() にする。 receive.setkey(key); //REPCommand repCom = repRec.unpackUConv(); REPCommand receivedCommand = receive.unpack(); manager(channel, receivedCommand); /*** EditorChannel を用いた記述 ****/ //EditorChannel echannel = (EditorChannel) key.channel(); //REPCommand command = echannel.getPacket(); //manager(echannel, command); }else if(key.isConnectable()){ System.out.println("Connectable"); } } } } private synchronized void registerChannel(Selector selector, SelectableChannel channel, int ops) throws IOException { if(channel == null) { return; } //System.out.println("registerChannel()"); channel.configureBlocking(false); selector.wakeup(); channel.register(selector, ops); } private void manager(SocketChannel channel, REPCommand receivedCommand) { if(receivedCommand == null) return; Editor editor; Session session; REPCommand sendCommand = receivedCommand.clone(); REPPacketSend send = new REPPacketSend(channel); //SessionXMLEncoder encoder = new SessionXMLEncoder(); switch(receivedCommand.cmd){ case REP.SMCMD_JOIN: if(isMaster){ int eid = allEditorList.addEditor(channel, receivedCommand); receivedCommand.setEID(eid); allEditorList.sendJoinAck(channel, receivedCommand); sessionmanagerGUI.setComboEditor(eid, channel); }else{ allEditorList.addEditor(channel); smList.sendJoin(receivedCommand); //sessionmanagerGUI.setComboEditor(repCmd.eid, channel); } break; case REP.SMCMD_JOIN_ACK: // editorList.setEID(repCmd); // editorList.sendJoinAck(repCmd); // sessionmanagerGUI.setComboEditor(repCmd.eid, channel); break; case REP.SMCMD_PUT: editor = new Editor(channel); editor.setEID(1); editor.setName(receivedCommand.string); session = new Session(editor); session.setOwner(true); session.addEditor(editor); sessionlist.addSession(session); sessionmanagerGUI.setComboSession(session.getSID(), session.getName()); sessionmanagerGUI.setComboEditor(editor.getEID(), editor.getChannel()); session.addToRoutingTable(editor); receivedCommand.setCMD(REP.SMCMD_PUT_ACK); receivedCommand.setEID(1); receivedCommand.setSID(session.getSID()); editor.send(receivedCommand); //if(isMaster){ SessionXMLEncoder sessionEncoder = new SessionXMLEncoder(session); REPCommand command = new REPCommand(); command.setSID(session.getSID()); command.setString(sessionEncoder.sessionListToXML()); if(isMaster){ command.setCMD(REP.SMCMD_UPDATE_ACK); smList.sendToSlave(command); }else{ command.setCMD(REP.SMCMD_UPDATE); smList.sendToMaster(command); } break; // case REP.SMCMD_PUT_ACK: // break; case REP.SMCMD_SELECT: // sessionlist.addEditor(channel, repCmd.sid, repCmd); //sessionlistへ追加 editor = new Editor(channel); session = sessionlist.getSession(receivedCommand.sid); if(session.isOwner()){ int eid = session.addEditor(editor); editor.setEID(eid); //REPPacketSend send = new REPPacketSend(channel); receivedCommand.setCMD(REP.SMCMD_SELECT_ACK); receivedCommand.setEID(eid); send.send(receivedCommand); }else { } break; case REP.SMCMD_SELECT_ACK: receivedCommand.setCMD(REP.SMCMD_JOIN_ACK); receivedCommand.setEID(receivedCommand.eid); session = sessionlist.getSession(receivedCommand.sid); session.sendToEditor(receivedCommand); //Editor editor3 = session3.getEditorList().get(0); //REPPacketSend send = new REPPacketSend(editor3.getChannel()); //send.send(repCmd); break; case REP.SMCMD_SM_JOIN: //XMLからSessionListオブジェクトを生成する。 SessionXMLDecoder decoder = new SessionXMLDecoder(); SessionList receivedSessionList = decoder.decode(receivedCommand.string); //myHost を設定。 if(myHost == null) setMyHostName(getLocalHostName(channel)); //maxHost を設定。 setMaxHost(channel, receivedSessionList.getMaxHost()); //SessionListからXMLを生成。 //joinしてきたSessionManagerに対してACKを送信。 SessionXMLEncoder sessionlistEncoder = new SessionXMLEncoder(sessionlist); sendCommand = new REPCommand(); sendCommand.setCMD(REP.SMCMD_SM_JOIN_ACK); sendCommand.setString(sessionlistEncoder.sessionListToXML()); send.send(sendCommand); //その他のSessionManagerに対してSMCMD_SM_JOINを送信。 sendCommand = new REPCommand(); sendCommand.setCMD(REP.SMCMD_SM_JOIN); sendCommand.setString(receivedCommand.string); smList.sendExcept(channel, sendCommand); if(isMaster){ }else { } break; case REP.SMCMD_SM_JOIN_ACK: //XMLからSessionListオブジェクトを生成。 SessionXMLDecoder decoder2 = new SessionXMLDecoder(); SessionList receivedSessionList2 = decoder2.decode(receivedCommand.string); //maxHostを決定。 setMaxHost(channel, receivedSessionList2.getMaxHost()); if(isMaster){ }else{ } break; case REP.SMCMD_UPDATE: //SessionXMLDecoder decoder = new SessionXMLDecoder(receivedCommand.string); editor = new Editor(channel); editor.setName(receivedCommand.string); session = new Session(editor); session.addEditor(editor); sessionlist.addSession(session); sessionmanagerGUI.setComboSession(session.getSID(), session.getName()); if(isMaster){ receivedCommand.setCMD(REP.SMCMD_UPDATE_ACK); smList.sendToSlave(receivedCommand); }else{ receivedCommand.setCMD(REP.SMCMD_UPDATE); smList.sendToMaster(receivedCommand); } break; case REP.SMCMD_UPDATE_ACK: if(receivedCommand.sid > sessionlist.getList().size()){ editor = new Editor(channel); editor.setName(receivedCommand.string); session = new Session(editor); session.addEditor(editor); sessionlist.addSession(session); sessionmanagerGUI.setComboSession(session.getSID(), session.getName()); } smList.sendToSlave(receivedCommand); break; case REP.REPCMD_READ: //sessionlist.sendCmd(channel, repCmd); break; default: //sessionlist.sendCmd(channel, repCmd); sessionlist.sendToNextEditor(channel, receivedCommand); break; } } private boolean setMaxHost(SocketChannel channel, String host) { if(maxHost == null) { maxHost = myHost; sessionlist.setMaxHost(maxHost); } if(host.compareTo(maxHost) > 0){ //host > MaxHost なら maxHost = host //masterを設定する。 maxHost = host; sessionlist.setMaxHost(maxHost); setMaster(false, channel); return true; }else{ return false; } } private void setMyHostName(String localHostName) { myHost = localHostName; if(maxHost == null) { maxHost = myHost; sessionlist.setMaxHost(maxHost); } allEditorList.setHost(myHost); } private void setMaster(boolean b, SocketChannel channel) { isMaster = b; System.out.println("isMaster = " + b); smList.setMaster(channel); } public static void main(String[] args) throws InterruptedException, IOException { int port = 8766; if(args.length > 0){ port = Integer.parseInt(args[0]); } SessionManager sm = new SessionManager(port); sm.openSelector(); sm.openWindow(); sm.sessionManagerNet(port); } private void openWindow() { Thread th = new Thread( sessionmanagerGUI ); th.start(); //System.out.println(sessionmanagerGUI.toString()); sessionmanagerGUI.addConnectionListener(this); sessionmanagerGUI.addREPActionListener(this); } private void connectSession(String host) { int port = 8766; InetSocketAddress addr = new InetSocketAddress(host, port); try { SocketChannel sessionchannel = SocketChannel.open(); sessionchannel.configureBlocking(true); sessionchannel.connect(addr); while(!sessionchannel.finishConnect()){ System.out.print("test afro"); } System.out.println(""); registerChannel(selector, sessionchannel, SelectionKey.OP_READ); sm_join(sessionchannel); }catch (IOException e) { e.printStackTrace(); } } private void sm_join(SocketChannel channel){ //SM_JOINコマンドを生成。 REPCommand command = new REPCommand(); command.setCMD(REP.SMCMD_SM_JOIN); //hostnameをセット。 setMyHostName(getLocalHostName(channel)); //XMLを生成。送信コマンドにセット。 SessionXMLEncoder encoder = new SessionXMLEncoder(sessionlist); String string = encoder.sessionListToXML(); command.setString(string); //SM_JOINコマンドを送信。 REPPacketSend send = new REPPacketSend(channel); send.send(command); //SessionManagerのListに追加。 smList.add(channel); } private String getLocalHostName(SocketChannel channel) { String host = null; host = channel.socket().getLocalAddress().getHostName(); return host; } // private String getSocketString(SocketChannel sessionchannel) { // SocketAddress socket = sessionchannel.socket().getRemoteSocketAddress(); // //String inetAddressString = sessionchannel.socket().getInetAddress().toString(); // StringTokenizer stn = new StringTokenizer(socket.toString(), "/"); // String socketString = null; // while(stn.hasMoreTokens()){ // socketString = stn.nextToken(); // //System.out.println(socketString); // } // return socketString; // } public void connectionOccured(ConnectionEvent event) { connectSession(event.getHost()); } public void ActionOccured(REPActionEvent event) { System.out.println("Action!"); SocketChannel editorChannel = event.getEditorChannel(); int sid = event.getSID(); int eid = 0; //int eid = event.getEID(); //sessionlist.addEditor(editorChannel, sid, eid); Editor editor = new Editor(editorChannel); Session session = sessionlist.getSession(sid); session.addEditor(editor); Editor master = session.getMaster(); REPCommand command = new REPCommand(); command.setCMD(REP.SMCMD_SELECT); command.setSID(sid); master.send(command); REPPacketSend send = new REPPacketSend(editorChannel); send.send(new REPCommand(REP.SMCMD_SELECT_ACK, sid, eid, 0,0,0,"")); //sessionlist.sendSelect(sid); } }