Mercurial > hg > RemoteEditor > REPSessionManager
changeset 384:bcdf5476b8e4
restructured-version
author | one@firefly.cr.ie.u-ryukyu.ac.jp |
---|---|
date | Mon, 10 Nov 2008 22:16:37 +0900 |
parents | 6d48db302b07 |
children | 1fca50ce3508 |
files | rep/Session.java rep/SessionManager.java rep/handler/Dispatcher.java rep/handler/Editor.java rep/handler/FirstConnector.java rep/handler/NullForwarder.java rep/handler/REPNode.java test/sematest/TestSessionManager.java |
diffstat | 8 files changed, 46 insertions(+), 322 deletions(-) [+] |
line wrap: on
line diff
--- a/rep/Session.java Mon Nov 10 22:13:40 2008 +0900 +++ b/rep/Session.java Mon Nov 10 22:16:37 2008 +0900 @@ -144,7 +144,7 @@ public REPNode getForwarder(REPSocketChannel<REPCommand> channel) { REPNode f = first; while(f.channel!=channel) f = f.next; - SessionManager.logger.writeLog("getFirstForwarder="+f.next+"=>"+f.next.channel); + ServerMainLoop.logger.writeLog("getFirstForwarder="+f.next+"=>"+f.next.channel); return f.next; } @@ -201,7 +201,7 @@ break; } } - SessionManager.logger.writeLog(log); + ServerMainLoop.logger.writeLog(log); } }
--- a/rep/SessionManager.java Mon Nov 10 22:13:40 2008 +0900 +++ b/rep/SessionManager.java Mon Nov 10 22:16:37 2008 +0900 @@ -4,36 +4,25 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.ClosedChannelException; -import java.nio.channels.SelectionKey; -import java.util.LinkedList; -import java.util.List; -import java.util.Set; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; import org.xml.sax.SAXException; import rep.channel.REPLogger; -import rep.channel.REPServerSocketChannel; import rep.channel.REPSocketChannel; import rep.gui.CloseButtonEvent; -import rep.gui.DoGUIUpdate; import rep.gui.SelectButtonEvent; import rep.gui.SessionManagerEvent; import rep.gui.SessionManagerEventListener; import rep.gui.SessionManagerGUI; import rep.gui.SessionManagerGUIimpl; -import rep.handler.Dispatcher; import rep.handler.Editor; import rep.handler.REPNode; import rep.handler.FirstConnector; import rep.handler.Forwarder; -import rep.channel.REPSelector; import rep.xml.SessionXMLDecoder; import rep.xml.SessionXMLEncoder; -import rep.channel.REPSelectionKey; /* +-------+--------+--------+-------+--------+---------+------+ @@ -53,32 +42,16 @@ byte[] text; */ -public class SessionManager implements SessionManagerEventListener{ - static public REPLogger logger = REPLogger.singleton(); - - SessionList sessionList; - private SessionManagerGUI gui; - // Main nio.Selector of this server - private REPSelector<REPCommand> selector; +public class SessionManager extends ServerMainLoop + implements SessionManagerEventListener { + SessionList sessionList = new SessionList(); // Known Session Manager List, At most one parent. No parent means master. - SessionManagerList smList; + SessionManagerList smList = new SessionManagerList(); // Known Editor list. Connected Editor has a channel. // Session Manager Channel may have dummy editors. - EditorList editorList; - // Commands for busy editor are kept in this queue. - private List<PacketSet> waitingCommandInMerge; - // Command from gui. Synchronization is required. - private BlockingQueue<SessionManagerEvent> waitingEventQueue - = new LinkedBlockingQueue<SessionManagerEvent>();; - // host name of this server. One of connecting SocketChannel's hostname - public String myHost; - // Single threaded write queueu. To avoid dead lock with too many writes. - private LinkedList<PacketSet> writeQueue = new LinkedList<PacketSet>(); - private int receive_port; - private int parent_port; - static final int DEFAULT_PORT = 8766; + EditorList editorList = new EditorList(); // Queue limit for debugging purpose. - private static final int packetLimit = 200; + static final int packetLimit = 200; // globalSessionID = SessionManagerID * MAXID + localSessionID private static final int MAXID = 10000; @@ -88,12 +61,12 @@ private REPNode sm_join_channel; // Routing table for session and session manager. private RoutingTable routingTable = new RoutingTable(this); - private SessionManagerEvent execAfterConnect = null;; - + + static public REPLogger logger = REPLogger.singleton(); + public static void main(String[] args) throws InterruptedException, IOException { - - int port = DEFAULT_PORT; - int port_s = DEFAULT_PORT; + int port =ServerMainLoop.DEFAULT_PORT; + int port_s = ServerMainLoop.DEFAULT_PORT; //System.setProperty("file.encoding", "UTF-8"); if(args.length > 0){ if (args.length!=2) { @@ -110,174 +83,8 @@ sm.init(port,new SessionManagerGUIimpl(sm)); } - public void setReceivePort(int port) { - receive_port = port; - } - public void init(int port, SessionManagerGUI gui) throws IOException, InterruptedException { - this.gui = gui; - init(port); - mainLoop(); - } - - private void init(int port) throws InterruptedException, IOException { - selector = REPSelector.<REPCommand>create(); - REPServerSocketChannel<REPCommand> ssc = REPServerSocketChannel.<REPCommand>open(new REPCommandPacker()); - ssc.configureBlocking(false); // Selector requires this - ssc.socket().setReuseAddress(true); //reuse address 必須 - //getAllByNameで取れた全てのアドレスに対してbindする - ssc.socket().bind(new InetSocketAddress(port)); - ssc.register(selector, SelectionKey.OP_ACCEPT, - new Dispatcher(this)); // FirstConnector? - - sessionList = new SessionList(); - smList = new SessionManagerList(); - editorList = new EditorList(); - waitingCommandInMerge = new LinkedList<PacketSet>(); - - - } - - /* - * The main loop. - * Check incoming events and waiting writes. - * Do select and call select() to check in coming packets. - * We wrote everything in one thread, but we can assign - * one thread for each communication channel and GUI event. - */ - public void mainLoop() throws IOException { - while(true){ - checkWaitingCommandInMerge(); - if (checkInputEvent() || - checkWaitingWrite()) { - // try to do fair execution for waiting task - if(selector.selectNow() > 0) select(); - continue; - } - // now we can wait for input packet or event - selector.select(); - select(); - } - } - - /* - * Synchronize GUI event in the main loop. - */ - private boolean checkInputEvent() { - SessionManagerEvent e; - if((e = waitingEventQueue.poll())!=null){ - e.exec(this); - return true; - } - return false; - } - - /* - * Write a packet during the main loop. - */ - private boolean checkWaitingWrite() throws IOException { - PacketSet p = writeQueue.poll(); - if (p!=null) { - p.channel.write(p.command); - return true; - } - return false; - } - - /** - * Check waiting command in merge - * @return true if there is a processed waiting command - * @throws IOException - */ - private void checkWaitingCommandInMerge() { - List<PacketSet> w = waitingCommandInMerge; - waitingCommandInMerge = new LinkedList<PacketSet>(); - for(PacketSet p: w) { - REPNode e = p.getEditor(); - if(e.isMerging()) { // still merging do nothing - waitingCommandInMerge.add(p); - } else { - try { - if (sessionManage(e, p.command)) { // we don't need this - assert false; - return; - } - e.manage(p.command); - } catch (Exception e1) { - // should be e.close()? - close(p.channel); - } - } - } - } - - /* - * If we have waiting write commands, further sent commands also - * wait to avoid out of order packet sending. - */ - public boolean hasWaitingCommand(REPSocketChannel<REPCommand>c) { - for(PacketSet p:waitingCommandInMerge) { - if (p.channel==c) { - return true; - } - } - return false; - } - - /* - * Close a channel in case of exception or close. - */ - private void close(REPSocketChannel<REPCommand> channel) { - REPSelectionKey<REPCommand>key = channel.keyFor1(selector); - REPNode handler = (REPNode)key.attachment(); - key.cancel(); - handler.cancel(channel); - // we have to remove session/enditor - } - - - /* - * Do select operation on the Selector. Each key has a forwarder. - * A forwarder can be a firstConnector, a forwarder for Session Manager - * or an Editor. - */ - private void select() throws IOException { - - Set<REPSelectionKey<REPCommand>> keys = selector.selectedKeys1(); - for(REPSelectionKey<REPCommand> key : keys){ - if(key.isAcceptable()){ - /* - * Incoming connection. We don't know which, editor or - * session manager. Assign FirstConnector to distinguish. - */ - REPSocketChannel<REPCommand> channel = key.accept(new REPCommandPacker()); - logger.writeLog("SessionManager.select() : key.isAcceptable : channel = " + channel); - registerChannel(channel, new FirstConnector(this)); - channel = null; - }else if(key.isReadable()){ - /* - * Incoming packets are handled by a various forwarder. - * A hadler throw IOException() in case of a trouble to - * close the channel. - */ - REPNode handler = (REPNode)key.attachment(); - try { - handler.handle(key); - } catch (IOException e) { - key.cancel(); - handler.cancel(key.channel1()); - } - } - } - } - - public void registerChannel(REPSocketChannel<REPCommand> channel,REPNode handler) throws IOException { - if(channel == null) { - return; - } - handler.setChannel(channel); - channel.configureBlocking(false); - channel.register(selector, SelectionKey.OP_READ, handler); + mainLoop(this, port, gui); } /* @@ -285,56 +92,11 @@ */ private void cancel_sm_join() { logger.writeLog("Loop detected "+this); - removeChannel(sm_join_channel); + removeChannel(this, sm_join_channel); sm_join_channel=null; } - private void removeChannel(REPNode channel) { - REPSelectionKey<REPCommand> key = channel.channel.keyFor1(selector); - key.cancel(); - try { - channel.channel.close1(); - } catch (IOException e) { - } - } - - - void updateGUI() { - //リストのコピーをGUIに渡す - LinkedList<Session> sList = new LinkedList<Session>(sessionList.values()); - LinkedList<REPNode> eList; - if (false) { - // local editor only - eList = new LinkedList<REPNode>(); - for(REPNode e:editorList.values()) { - if (getSMID(e.eid)==smList.sessionManagerID()) { - eList.add(e); - } - } - } else { - eList = new LinkedList<REPNode>(editorList.values()); - } - //GUIに反映 - Runnable doRun = new DoGUIUpdate(sList, eList, gui); - gui.invokeLater(doRun); - } - - - - public void setMyHostName(String localHostName) { - myHost = localHostName + receive_port; - setHostToEditor(myHost); - } - - private void setHostToEditor(String myHost2) { - for(REPNode editor : editorList.values()){ - if (editor.channel!=null) - editor.setHost(myHost2); - } - } - - /** * GUI から、呼ばれて、Session Managerに接続する。 * Host 名のSession Manager に SM_JOIN する。自分は、Session を持っていては @@ -507,31 +269,6 @@ return session; } - public void addWaitingCommand(PacketSet set) { - waitingCommandInMerge.add(set); - } - - /* - * Synchronize GUI command in this session manager. - */ - public void buttonPressed(SessionManagerEvent event) { - try { - waitingEventQueue.put(event); - } catch (InterruptedException e) {} - selector.wakeup(); - } - - /* - * Execute incoming event during the initialization for - * testing purpose. - */ - public void syncExec(SessionManagerEvent event) { - try { - waitingEventQueue.put(event); - } catch (InterruptedException e) { - } - } - /* * GUI command interface for close session. */ @@ -557,12 +294,6 @@ } - public void addWriteQueue(PacketSet packetSet) { - writeQueue.addLast(packetSet); - assert(writeQueue.size()<packetLimit) ; - } - - public void remove(Editor editor) { Session s = sessionList.get(editor.getSID()); if (s==null) { @@ -583,18 +314,6 @@ sendUpdate(s0.getSID()); } - public void setParentPort(int port) { - parent_port = port; - } - public int getParentPort() { - return parent_port; - } - - public int getPort() { - return receive_port; - } - - public boolean sessionManage(REPNode forwarder, REPCommand command) throws ClosedChannelException, IOException { switch(command.cmd){ @@ -822,7 +541,7 @@ return newid+smList.sessionManagerID()*MAXID; } - private int getSMID(int id) { + int getSMID(int id) { return id/MAXID; } @@ -882,15 +601,6 @@ return sessionList.get(sid); } - public void execAfterConnect(SessionManagerEvent sessionManagerEvent) { - execAfterConnect = sessionManagerEvent; - } - - public void afterConnect() { - if (execAfterConnect!=null) execAfterConnect.exec(this); - execAfterConnect = null; - } - public void setParent(REPNode fw) { smList.setParent(fw); } @@ -898,12 +608,6 @@ public String toString() { int myId = 0; if (smList!=null) myId = smList.sessionManagerID(); - return "rep.SessionManager-"+myId+"@"+myHost+":"+receive_port; + return "rep.SessionManager-"+myId+"@"+super.toString(); } - - public void addWaitingSessionManager(REPNode fw, REPCommand command) { - smList.addWaitingSessionManager(fw, command) ; - } - - }
--- a/rep/handler/Dispatcher.java Mon Nov 10 22:13:40 2008 +0900 +++ b/rep/handler/Dispatcher.java Mon Nov 10 22:16:37 2008 +0900 @@ -3,6 +3,7 @@ import java.io.IOException; import rep.REPCommand; +import rep.ServerMainLoop; import rep.Session; import rep.SessionManager; import rep.channel.REPSelectionKey; @@ -42,7 +43,7 @@ */ REPSocketChannel<REPCommand> channel = key.channel1(); REPCommand command = channel.read(); - SessionManager.logger.writeLog("REPHandlerImpl.handle() : command = " + command); + ServerMainLoop.logger.writeLog("REPHandlerImpl.handle() : command = " + command); if (manager.sessionManage(this, command)) return; distpatchToEditor(channel, command);
--- a/rep/handler/Editor.java Mon Nov 10 22:13:40 2008 +0900 +++ b/rep/handler/Editor.java Mon Nov 10 22:16:37 2008 +0900 @@ -7,6 +7,7 @@ import rep.PacketSet; import rep.REP; import rep.REPCommand; +import rep.ServerMainLoop; import rep.SessionManager; import rep.channel.REPSelectionKey; import rep.channel.REPSocketChannel; @@ -89,7 +90,7 @@ if (prev==null || prev.seq != command.seq || prev.eid!=command.eid) { String err = "Editor.checkReturnedCommand() : command = " + command + " prev="; err += prev==null?"null":prev.toString(); - SessionManager.logger.writeLog(err); + ServerMainLoop.logger.writeLog(err); assert(false); } @@ -207,7 +208,7 @@ public void handle(REPSelectionKey<REPCommand> key) throws IOException { REPSocketChannel<REPCommand> channel = key.channel1(); REPCommand command = channel.read(); - SessionManager.logger.writeLog("REPHandlerImpl.handle() read : command = " + command +" from "+channel); + ServerMainLoop.logger.writeLog("REPHandlerImpl.handle() read : command = " + command +" from "+channel); if (manager.sessionManage(this, command)) return; manage(command); }
--- a/rep/handler/FirstConnector.java Mon Nov 10 22:13:40 2008 +0900 +++ b/rep/handler/FirstConnector.java Mon Nov 10 22:16:37 2008 +0900 @@ -3,6 +3,7 @@ import java.io.IOException; import rep.REPCommand; +import rep.ServerMainLoop; import rep.Session; import rep.SessionManager; import rep.channel.REPSelectionKey; @@ -28,7 +29,7 @@ REPNode fw; REPSocketChannel<REPCommand> channel = key.channel1(); REPCommand command = channel.read(); - SessionManager.logger.writeLog("FirstConnector: command = " + command); + ServerMainLoop.logger.writeLog("FirstConnector: command = " + command); switch(command.cmd) { case SMCMD_JOIN: { @@ -37,7 +38,7 @@ //エディタが新しくputする場合は新しくソケットを作る // 1対1でない場合は、multiplexerを挿めば良い REPNode editor = manager.newEditor(channel); - editor.setHost(manager.myHost); + editor.setHost(manager.myHost()); command.eid = editor.eid; command.sid = -1; editor.setSID(-1); @@ -74,9 +75,9 @@ } //myHost を設定。 //立ち上げ時にやるとlocalhostしか取れない - if(manager.myHost == null) manager.setMyHostName(getLocalHostName()); + if(manager.myHost() == null) manager.setMyHostName(getLocalHostName()); fw.setMode(command.cmd); - fw.setHost(manager.myHost); + fw.setHost(manager.myHost()); manager.registerChannel(channel, fw); manager.sessionManage(fw, command);
--- a/rep/handler/NullForwarder.java Mon Nov 10 22:13:40 2008 +0900 +++ b/rep/handler/NullForwarder.java Mon Nov 10 22:16:37 2008 +0900 @@ -7,6 +7,11 @@ import rep.channel.REPSelectionKey; import rep.channel.REPSocketChannel; +/** + * @author kono + * No connection + * if a manager.parent is set to this, the manager is a master. + */ public class NullForwarder extends Forwarder { public NullForwarder(SessionManager manager) {
--- a/rep/handler/REPNode.java Mon Nov 10 22:13:40 2008 +0900 +++ b/rep/handler/REPNode.java Mon Nov 10 22:16:37 2008 +0900 @@ -7,10 +7,21 @@ import rep.channel.REPSelectionKey; import rep.channel.REPSocketChannel; +/** + * @author kono + * Abstract class for all REP node + * Sub classes: + * FirstConnector waiting first connection and determines its type. + * Editor editor direct connect or no connection (master/slave) + * Forwarder send command to the other session manager + * base class for other communication node + * Dispatcher Session Manager entry, dispatch commands to editors. + * NullForwarder REP node with no connection + */ public abstract class REPNode { - public int eid; // globally unique - public int sid=-1; // globally unique + public int eid; // globally unique (contains SessionManagerID) + public int sid=-1; // globally unique public String host; public String file; public REP mode;
--- a/test/sematest/TestSessionManager.java Mon Nov 10 22:13:40 2008 +0900 +++ b/test/sematest/TestSessionManager.java Mon Nov 10 22:16:37 2008 +0900 @@ -1,6 +1,7 @@ package test.sematest; import java.io.IOException; + import rep.SessionManager; import rep.channel.REPLogger; import rep.channel.REPServerSocketChannel;