Mercurial > hg > RemoteEditor > REPSessionManager
view rep/SessionManager.java @ 491:5945266c970d
before unMergedCmds fix , deadlockTimer API
author | one |
---|---|
date | Sat, 23 Oct 2010 12:34:46 +0900 |
parents | cc262a519b8a |
children | ebfa3b05a8dd |
line wrap: on
line source
package rep;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.ClosedChannelException;
import java.util.LinkedList;

import org.xml.sax.SAXException;

import rep.channel.REPLogger;
import rep.channel.REPSocketChannel;
import rep.gui.CloseButtonEvent;
import rep.gui.DoGUIUpdate;
import rep.gui.SelectButtonEvent;
import rep.gui.SessionManagerEvent;
import rep.gui.SessionManagerEventListener;
import rep.gui.SessionManagerGUI;
import rep.gui.SessionManagerGUIimpl;
import rep.handler.Editor;
import rep.handler.REPNode;
import rep.handler.FirstConnector;
import rep.handler.Forwarder;
import rep.xml.SessionXMLDecoder;
import rep.xml.SessionXMLEncoder;

/*
 +-------+--------+--------+-------+--------+---------+------+
 | cmd   | session| editor | seqid | lineno | textsiz | text |
 |       | id     | id     |       |        |         |      |
 +-------+--------+--------+-------+--------+---------+------+
 o---------- header section (network order) ----------o

 int cmd;      kind of command
 int sid;      session ID : unique to editing file
 int eid;      editor ID : owner editor ID = 1. Unique within a session.
               -1 session manager command
               -2 merge command
 int seqno;    Sequence number : managed per editor
 int lineno;   line number
 int textsize; textsize : bytesize
 byte[] text;
 */

/**
 * Session manager node: keeps the known sessions, editors and peer session
 * managers, and dispatches session-manager commands (SMCMD_*) received from
 * editors and from other session managers.
 */
public class SessionManager extends ServerMainLoop implements SessionManagerEventListener {
    // Known sessions, keyed by session ID.
    SessionList sessionList = new SessionList();
    // Known Session Manager List, At most one parent. No parent means master.
    SessionManagerList smList = new SessionManagerList();
    // Known Editor list. Connected Editor has a channel.
    // Session Manager Channel may have dummy editors.
    public EditorList editorList = new EditorList();
    // Queue limit for debugging purpose.
    static final int packetLimit = 400;

    // globalSessionID = SessionManagerID * MAXID + localSessionID
    private static final int MAXID = 10000;
    SessionXMLDecoder decoder = new SessionXMLDecoder();
    SessionXMLEncoder encoder = new SessionXMLEncoder();
    // SocketChannel for our parent. At most one parent is allowed.
    // Non-null only while our own SM_JOIN is outstanding (see sm_join / cancel_sm_join).
    private REPNode sm_join_channel;
    // Routing table for session and session manager.
    private RoutingTable routingTable = new RoutingTable(this);
    // sync option
    public boolean sync = true;
    // list only local editor in GUI
    protected boolean listLocalEditorOnly = false;

    static public REPLogger logger = REPLogger.singleton();

    /**
     * Command line entry point.
     * With no arguments both ports default to {@code ServerMainLoop.DEFAULT_PORT};
     * otherwise exactly two numeric arguments are required.
     *
     * @param args either empty, or {@code our_port parent_port}
     */
    public static void main(String[] args) throws InterruptedException, IOException {
        int port = ServerMainLoop.DEFAULT_PORT;
        int port_s = ServerMainLoop.DEFAULT_PORT;
        //System.setProperty("file.encoding", "UTF-8");
        if (args.length > 0) {
            if (args.length != 2) {
                logger.writeLog("Usage: sessionManager our_port parent_port");
                return;
            }
            try {
                port = Integer.parseInt(args[0]);
                port_s = Integer.parseInt(args[1]);
            } catch (NumberFormatException e) {
                // A non-numeric port is a usage error, not a reason to die with a stack trace.
                logger.writeLog("Usage: sessionManager our_port parent_port");
                return;
            }
        }
        SessionManager sm = new SessionManager();
        sm.setReceivePort(port);
        sm.setParentPort(port_s);
        // Ok start main loop
        sm.init(port, new SessionManagerGUIimpl(sm));
    }

    /**
     * Start the main loop on the given port with the given GUI.
     */
    public void init(int port, SessionManagerGUI gui) throws IOException, InterruptedException {
        mainLoop(this, port, gui);
    }

    /*
     * After loop detection, we give up session manager join.
     */
    private void cancel_sm_join() {
        logger.writeLog("Loop detected " + this);
        removeChannel(this, sm_join_channel);
        sm_join_channel = null;
    }

    /**
     * Called from the GUI to connect to a Session Manager.
     * Sends SM_JOIN to the Session Manager on the given host. This manager
     * must not own any sessions. Joining more than one Session Manager is
     * not possible (until NAT is implemented).
     *
     * @param host host name of the session manager to join
     * @param port port of the session manager to join
     * @return error message, or null when no errors.
     */
    public String connectSessionManager(String host, int port) {
        // sm_join_channel is non-null while a previous SM_JOIN is still outstanding.
        if (sm_join_channel != null) return "No Channel of sessionManager, Network Error.";
        if (!sessionList.isEmpty()) return "Cannot connect with sessions.";
        if (!smList.isMaster()) return "Already connected to the master.";
        /*
         * For IPv6 support we would have to obtain multiple addresses and
         * try to connect to every one of them.
         */
        InetSocketAddress addr = new InetSocketAddress(host, port);
        try {
            REPSocketChannel<REPCommand> sessionchannel =
                    REPSocketChannel.<REPCommand>create(new REPCommandPacker());
            sessionchannel.connect(addr);
            // Spin until the connection is established.
            while (!sessionchannel.finishConnect());
            REPNode sm = new FirstConnector(this, sessionchannel);
            registerChannel(sessionchannel, sm);
            sm_join(sm);
        } catch (IOException e) {
            // Report the failure instead of silently returning "no error".
            String message = "Cannot connect to session manager " + host + ":" + port + " : " + e;
            logger.writeLog(message);
            return message;
        }
        return null;
    }

    /**
     * Connect to the session manager on the given host using parent_port.
     * The result of the two-argument variant is not propagated.
     */
    public void connectSessionManager(String host) {
        connectSessionManager(host, parent_port);
    }

    /**
     * Send a SMCMD_SM_JOIN command to the channel.
     * @param channel node to send the join request to; it is remembered in sm_join_channel
     */
    private void sm_join(REPNode channel) {
        sm_join_channel = channel;
        // Build the SM_JOIN command.
        REPCommand command = new REPCommand();
        command.setCMD(REP.SMCMD_SM_JOIN);
        command.setEID(-1); // request Parent SessionManagerID
        command.setSID(-1); // request SessionManagerID

        // Set the host name.
        setMyHostName(channel.getLocalHostName());
        String string = myHost;
        command.setString(string);

        // Send the SM_JOIN command.
        channel.send(command);
        // Adding to the SessionManager list happens when the ack is received, not here.
    }

    /*
     * Select Session from Manager button
     * selected editor is joined editor directly connected to this session
     * manager.
     */
    public void selectSession(SelectButtonEvent event) throws IOException {
        int sid = event.getSID();
        Session session = sessionList.get(sid);
        if (session == null) throw new IOException("No such session: sid=" + sid);
        REPNode editor = event.getEditor();
        // An editor that already has a session is left alone.
        if (editor.hasSession()) return;
        // assert(getSMID(editor.eid)==smList.sessionManagerID());
        // assert(editor.channel!=null);
        editor.setSID(sid); // mark as selected
        selectSession0(sid, session, editor.getEID(), editor);
    }

    /*
     * Perform the selection locally when the editor is directly connected,
     * otherwise send SMCMD_SELECT0 toward the session manager owning the editor.
     */
    private void selectSession0(int sid, Session session, int eid, REPNode editor) {
        logger.writeLog("Select sid=" + sid + " and " + editor);
        if (editor.isDirect() && editor.getEID() == eid) {
            REPCommand command = new REPCommand();
            command.setSID(sid);
            command.setEID(eid);
            command.setString(session.getName());
            if (session.hasOwner()) {
                editor.selectSession(command, session);
            } else {
                forwardSelect(sid, session, eid, editor);
            }
        } else {
            // we don't have this editor, search the editor first.
            REPNode next = routingTable.toSessionManager(getSMID(eid));
            // pass the select command to the next path.
            REPCommand command = new REPCommand();
            command.setCMD(REP.SMCMD_SELECT0);
            command.setSID(sid);
            command.setEID(eid);
            command.setString(editor.getHost());
            next.send(command);
        }
    }

    /*
     * The session has no owner here: set up a forwarder toward the session
     * manager that the session ID routes to and pass SMCMD_SELECT along.
     */
    public void forwardSelect(int sid, Session session, int eid, REPNode editor) {
        REPNode next;
        // session searching continue...
        next = routingTable.toSessionManager(getSMID(sid));
        // make a forwarding channel here
        REPNode f = createSessionForwarder(sid, next);
        session.setFirstForwarder(f);
        session.addForwarder(editor);
        // pass the select command to the next path.
        REPCommand command = new REPCommand();
        command.setCMD(REP.SMCMD_SELECT);
        command.setSID(sid);
        command.setEID(eid);
        command.setString(editor.getHost());
        next.send(command);
    }

    /*
     * Create a Forwarder for the session that shares the given node's
     * channel and gets a fresh editor ID from this session manager.
     */
    private REPNode createSessionForwarder(int sid, REPNode editor) {
        REPNode f = new Forwarder(this, editor.channel);
        f.setEID(makeID(editorList.newEid()));
        // f.setChannel(editor.channel); // incoming channel
        f.setHost(myHost);
        f.setSID(sid);
        return f;
    }

    /*
     * Create and send UPDATE command.
     * The master sends SMCMD_UPDATE_ACK to its slaves; any other manager
     * sends SMCMD_UPDATE to the master.
     */
    public void sendUpdate(int sid) {
        REPCommand command = makeREPCommandWithSessionList(REP.SMCMD_UPDATE);
        command.setSID(sid);
        command.setEID(REP.SM_EID.id);
        if (isMaster()) {
            command.setCMD(REP.SMCMD_UPDATE_ACK);
            smList.sendToSlaves(command);
        } else {
            smList.sendToMaster(command);
        }
    }

    /*
     * Create new editor in this session manager. A dummy editor
     * is created also.
     */
    public REPNode newEditor(REPSocketChannel<REPCommand> channel) {
        int eid = makeID(editorList.newEid());
        REPNode editor = new Editor(eid, this, channel);
        editorList.add(editor);
        return editor;
    }

    /*
     * Create new session.
     */
    public Session newSession(REPNode master) {
        int sid = makeID(sessionList.newSessionID());
        Session session = new Session(sid, master);
        sessionList.put(sid, session);
        return session;
    }

    /*
     * GUI command interface for close session.
     * The event is cast to CloseButtonEvent without a check.
     */
    public void closeSession(SessionManagerEvent event) {
        Session session = ((CloseButtonEvent) event).getSession();
        session.closeSession();
        sessionList.remove(session);
        manager.updateGUI(this);
    }

    /*
     * Remove editors which has the channel.
     */
    public void remove(REPSocketChannel<REPCommand> channel) {
        int i = 0;
        for (Session s : sessionList.values()) {
            if (s.deleteEditor(channel)) {
                i++;
            }
        }
        // Exactly one session is expected to have held the channel.
        assert (i == 1);
        // can be other session manager? what should I do?
    }

    /*
     * Remove an editor. Removing the master editor of a session removes
     * the whole session; otherwise only the editor's forwarder is dropped.
     */
    public void remove(Editor editor) {
        Session s = sessionList.get(editor.getSID());
        if (s == null) {
            // Not expected; with assertions disabled the editor is just dropped.
            assert (false);
            editorList.remove(editor);
        } else if (editor.isMaster()) {
            removeSession(s);
        } else {
            s.deleteForwarder(editor);
            editorList.remove(editor);
        }
        manager.updateGUI(this);
    }

    /*
     * Remove a session from the list and announce the change with an update.
     */
    private void removeSession(Session s0) {
        s0.remove(this);
        sessionList.remove(s0);
        sendUpdate(s0.getSID());
    }

    /**
     * Handle a session manager command.
     *
     * @param forwarder node the command arrived on
     * @param command received command
     * @return true when the command was a session manager command handled
     *         here, false for any other command
     */
    public boolean sessionManage(REPNode forwarder, REPCommand command) throws ClosedChannelException,
            IOException {
        switch (command.cmd) {
        // Session Manager Command
        case SMCMD_JOIN:
        {
            // first connection or forwarded command
            routingTable.add(forwarder, getSMID(command.eid));
            if (isMaster()) {
                REPCommand ackCommand = new REPCommand();
                ackCommand.setCMD(REP.SMCMD_JOIN_ACK);
                ackCommand.setEID(command.eid);
                ackCommand.setSID(command.sid);
                ackCommand.string = command.string;
                smList.sendToSlaves(ackCommand);
                registEditor(forwarder, ackCommand);
            } else {
                smList.sendToMaster(command);
            }
            manager.updateGUI(this);
        }
            break;
        case SMCMD_PUT_ACK:
            if (forwarder.isDirect()) {
                // send put_ack to the editor now.
                command.setCMD(REP.SMCMD_PUT_ACK);
                command.setEID(command.eid);
                command.setSID(command.sid);
                forwarder.send(command);
            }
            // Intentional fall through: a PUT_ACK is registered like a JOIN_ACK
            // (registEditor creates the session for SMCMD_PUT_ACK).
        case SMCMD_JOIN_ACK:
            registEditor(forwarder, command);
            manager.updateGUI(this);
            break;
        case SMCMD_PUT:
        {
            // first connection or forwarded command
            routingTable.add(forwarder, getSMID(command.eid));
            REPCommand ack = new REPCommand(command);
            ack.setCMD(REP.SMCMD_PUT_ACK);
            if (isMaster()) {
                // Reached to the top of the tree, multicast the ack.
                smList.sendToSlaves(ack);
                registEditor(forwarder, ack);
                if (forwarder.isDirect()) {
                    // If put editor on the master, no SMCMD_PUT_ACK is
                    // generated. Send ack to the editor now.
                    forwarder.send(ack);
                }
            } else {
                // Pass this to the master.
                smList.sendToMaster(command);
                // registEditor will be done by SMCMD_PUT_ACK
            }
            manager.updateGUI(this);
        }
            break;
        case SMCMD_SELECT0:
            /*
             * finding joining editor, do not make the path.
             */
            REPNode editor = editorList.get(command.eid);
            if (editor == null || !editor.isDirect()) {
                REPNode next = routingTable.toSessionManager(getSMID(command.eid));
                next.send(command);
                break;
            }
            // we've found the editor, fall thru.
        case SMCMD_SELECT:
        {
            /*
             * finding active session ring from joined editor.
             */
            Session session = sessionList.get(command.sid);
            if (session == null) {
                session = new Session(command.sid, command.string, null);
                sessionList.put(command.sid, session);
            }
            // Do not directly addForwarder(forwarder). It may be
            // shared among sessions.
            REPNode f = createSessionForwarder(command.sid, forwarder);
            session.addForwarder(f); // f.next is set up here.
            if (session.hasOwner()) {
                forwarder.selectSession(command, session);
            } else {
                forwardSelect(command.sid, session, command.eid, forwarder);
            }
        }
            break;
        case SMCMD_SELECT_ACK:
        {
            // The session has been found, so tell the editor that selected it.
            // NOTE(review): session is dereferenced without a null check; confirm
            // that a SELECT_ACK can never arrive for an unknown session ID.
            Session session = sessionList.get(command.sid);
            searchSelectedEditor(command, session.getForwarder(forwarder.channel));
        }
            break;
        case SMCMD_SM_JOIN:
        {
            // Receiving SMCMD_SM_JOIN while our own SM_JOIN is pending means
            // there is a loop, so cancel our own SM_JOIN.
            if (sm_join_channel != null) cancel_sm_join();
            // SMCMD_SM_JOIN travels up to the master.
            // If we are not the master, forward it to our parent.
            if (isMaster()) {
                // As the master, decide the SessionManager ID and
                // register it in our smList.
                registSessionManager(forwarder, command);
            } else {
                if (forwarder.sid == -1) {
                    // For a direct link, store the smid of the manager directly
                    // above (this one) in EID for identification.
                    command.setEID(smList.sessionManagerID());
                }
                smList.sendToMaster(command);
            }
        }
            break;
        case SMCMD_SYNC_ACK:
            break;
        case SMCMD_SM_JOIN_ACK:
            send_sm_join_ack(command.eid, command.sid, command);
            break;
        case SMCMD_UPDATE:
            sendUpdate(command.sid);
            break;
        case SMCMD_UPDATE_ACK:
            command.setString(mergeUpdate(command));
            // Notify the managers below us.
            smList.sendToSlaves(command);
            manager.updateGUI(this);
            break;
        default:
            return false;
        }
        return true;
    }

    /*
     * Master side handling of SMCMD_SM_JOIN: assign a session manager ID,
     * record the route and answer with SMCMD_SM_JOIN_ACK.
     */
    private void registSessionManager(REPNode forwarder, REPCommand command) {
        REPNode sm;
        int psid = command.eid;
        if (forwarder.sid != -1) {
            // The channel already has a SessionManager ID, so this is
            // not a direct link;
            // register it as a Forwarder that has no channel.
            sm = new Forwarder(this, null);
        } else {
            sm = forwarder;
        }
        int sid = smList.addNewSessionManager(sm, command);
        routingTable.add(forwarder, sid);

        REPCommand sendCommand = makeREPCommandWithSessionList(REP.SMCMD_SM_JOIN_ACK);
        // When command.eid==smList.sessionManagerID(), the sid has to be
        // assigned to the session manager waiting directly below us.
        sendCommand.setSID(sid); // new Session manager ID
        // To tell multiple SM_JOIN_ACKs apart, the ID of the session manager
        // that first accepted the SM_JOIN is used.
        sendCommand.setEID(psid);
        send_sm_join_ack(psid, sid, sendCommand);
    }

    /*
     * Send SM_JOIN_ACK to the slaves; when psid is our own ID, first assign
     * the new ID to the session manager waiting below us.
     */
    void send_sm_join_ack(int psid, int sid, REPCommand sendCommand) {
        if (psid == smList.sessionManagerID()) {
            // An ID has to be assigned to the session manager directly below us.
            smList.assignSessionManagerIDtoWaitingSM(sid);
            // Only one entry is added to smList here, so sm_join_ack is
            // newly sent to just the first waiting sm.
        }
        smList.sendToSlaves(sendCommand);
    }

    /*
     * Check whether the specified editor is local. If not, send the command
     * onward. The search may come back and be repeated many times, which is
     * not good.
     */
    private void searchSelectedEditor(REPCommand command, REPNode editor) {
        for (; editor.isDirect(); editor = editor.getNextForwarder()) {
            if (editor.getEID() == command.eid) {
                // Found the editor that issued the select.
                editor.joinAck(command, command.sid);
                return;
            }
        }
        // It was not here.
        editor.send(command);
    }

    /**
     * Add the session information sent by UPDATE/UPDATE_ACK to our own list.
     * @param command command whose string holds the session list XML
     * @return XML of the merged session list
     * @throws IOException when the received XML cannot be decoded; the
     *         SAXException is attached as the cause
     */
    private String mergeUpdate(REPCommand command) throws IOException {
        SessionList receivedSessionList;
        try {
            receivedSessionList = decoder.decode(command.string);
        } catch (SAXException e) {
            // Keep the parser failure as the cause so that it can be diagnosed.
            IOException failure = new IOException("Cannot decode session list XML: " + e.getMessage());
            failure.initCause(e);
            throw failure;
        }
        // Merge the received information with our own.
        sessionList.merge(receivedSessionList);
        // Generate the XML to be set into the outgoing command.
        return encoder.sessionListToXML(sessionList);
    }

    /*
     * id has SessionManager ID part
     */
    private int makeID(int newid) {
        return newid + smList.sessionManagerID() * MAXID;
    }

    /*
     * Extract the SessionManager ID part of an id.
     */
    int getSMID(int id) {
        return id / MAXID;
    }

    /**
     * Register Editor to our editorList. No connection is made.
     * @param forwarder Editor to be add
     * @param command ack command carrying eid, sid and name of the editor
     */
    public void registEditor(REPNode forwarder, REPCommand command) {
        // make ack for PUT/JOIN. Do not send this to the editor,
        // before select. After select, ack is sent to the editor.
        REPNode editor;
        if (getSMID(command.eid) == smList.sessionManagerID()) {
            // The editor ID belongs to this session manager.
            if (forwarder.isDirect()) {
                editor = (Editor) forwarder;
            } else
                return;
        } else {
            // Editor of another session manager: keep an entry built from the ID only.
            editor = new Editor(manager, command.eid);
        }
        editor.setName(command.string);
        editor.setSID(command.sid);
        if (!editorList.hasEid(command.eid)) {
            editorList.add(editor);
        }
        if (command.cmd == REP.SMCMD_PUT_ACK) {
            Session session = new Session(command.sid, command.string, editor);
            sessionList.put(command.sid, session);
        }
        // we don't join ack to the direct linked editor. We
        // have to wait select command
    }

    /*
     * Build a command of the given kind whose string is the XML generated
     * from our session list (used e.g. for the ACK sent to a joining
     * SessionManager).
     */
    private REPCommand makeREPCommandWithSessionList(REP cmd) {
        REPCommand sendCommand = new REPCommand();
        sendCommand.setCMD(cmd);
        sendCommand.setString(encoder.sessionListToXML(sessionList));
        return sendCommand;
    }

    public boolean isMaster() {
        return smList.isMaster();
    }

    public void setSessionManagerID(int sid) {
        smList.setSessionManagerID(sid);
    }

    public Session getSession(int sid) {
        return sessionList.get(sid);
    }

    public void setParent(REPNode fw) {
        smList.setParent(fw);
    }

    @Override
    public String toString() {
        // smList is null-checked before use; the ID is reported as 0 in that case.
        int myId = 0;
        if (smList != null) myId = smList.sessionManagerID();
        return "rep.SessionManager-" + myId + "@" + super.toString();
    }

    public void addWaitingSessionManager(REPNode fw, REPCommand command) {
        smList.addWaitingSessionManager(fw, command);
    }

    public int getId() {
        return smList.sessionManagerID();
    }

    /*
     * Ask every known editor to check its waiting commands.
     */
    public void checkWaitingCommandInMerge() {
        for (REPNode e : editorList.values()) {
            e.checkWaitingCommandInMerge();
        }
    }

    /**
     * Notify status change to our GUI
     */
    protected void updateGUI(ServerMainLoop serverMainLoop) {
        // Hand copies of the lists to the GUI.
        LinkedList<Session> sList = new LinkedList<Session>(sessionList.values());
        LinkedList<REPNode> eList;
        if (listLocalEditorOnly) {
            // Do not list an editor or session outside of this session manager to
            // avoid confusion. Actually we can joinSession to an editor outside of
            // this manager, but is not secure to do this.
            eList = new LinkedList<REPNode>();
            for (REPNode e : editorList.values()) {
                if (getSMID(e.eid) == smList.sessionManagerID()) {
                    eList.add(e);
                }
            }
        } else {
            eList = new LinkedList<REPNode>(editorList.values());
        }
        // Reflect the lists in the GUI.
        Runnable doRun = new DoGUIUpdate(sList, eList, serverMainLoop.gui);
        serverMainLoop.gui.invokeLater(doRun);
    }

    /**
     * Log the state of every known editor and abort with an IOException.
     * @throws IOException always
     */
    @Override
    public void deadlockDetected() throws IOException {
        logger.writeLog("Deadlock detected");
        for (REPNode e : editorList.values()) {
            logger.writeLog(" " + e + "\n" + e.report());
        }
        throw new IOException("Deadlock detected in " + this);
    }
}