Mercurial > hg > RemoteEditor > REPSessionManager
view rep/ServerMainLoop.java @ 391:3b0a5a55e3ee
24
author | one@firefly.cr.ie.u-ryukyu.ac.jp |
---|---|
date | Mon, 10 Nov 2008 22:25:14 +0900 |
parents | aa07134fea32 |
children | 19705f4b8015 |
line wrap: on
line source
package rep; import java.io.IOException; import java.net.InetSocketAddress; import java.net.SocketException; import java.nio.channels.ClosedChannelException; import java.nio.channels.SelectionKey; import java.util.LinkedList; import java.util.List; import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import rep.channel.REPLogger; import rep.channel.REPSelectionKey; import rep.channel.REPSelector; import rep.channel.REPServerSocketChannel; import rep.channel.REPSocketChannel; import rep.gui.DoGUIUpdate; import rep.gui.SessionManagerEvent; import rep.gui.SessionManagerGUI; import rep.handler.FirstConnector; import rep.handler.REPNode; public class ServerMainLoop { public static REPLogger logger = REPLogger.singleton(); public SessionManager manager; protected SessionManagerGUI gui; protected REPSelector<REPCommand> selector; protected List<PacketSet> waitingCommandInMerge= new LinkedList<PacketSet>(); private BlockingQueue<SessionManagerEvent> waitingEventQueue = new LinkedBlockingQueue<SessionManagerEvent>(); public String myHost; private LinkedList<PacketSet> writeQueue = new LinkedList<PacketSet>(); protected int receive_port; protected int parent_port; protected static final int DEFAULT_PORT = 8766; private SessionManagerEvent execAfterConnect = null; public void setReceivePort(int port) { receive_port = port; } void mainLoop(SessionManager sessionManager, int port, SessionManagerGUI gui) throws IOException, SocketException, ClosedChannelException { this.gui = gui; manager = sessionManager; receive_port = port; serverInit(); mainLoop(); } public void mainLoop() throws IOException { while(true){ checkWaitingCommandInMerge(); if (checkInputEvent() || checkWaitingWrite()) { // try to do fair execution for waiting task if(selector.selectNow() > 0) select(); continue; } // now we can wait for input packet or event selector.select(); select(); } } void serverInit() throws IOException, SocketException, ClosedChannelException { selector = REPSelector.<REPCommand>create(); REPServerSocketChannel<REPCommand> ssc = REPServerSocketChannel.<REPCommand>open(new REPCommandPacker()); ssc.configureBlocking(false); // Selector requires this ssc.socket().setReuseAddress(true); //reuse address 必須 //getAllByNameで取れた全てのアドレスに対してbindする try { ssc.socket().bind(new InetSocketAddress("::",receive_port)); } catch (SocketException e) { // for some IPv6 implementation ssc.socket().bind(new InetSocketAddress(receive_port)); } ssc.register(selector, SelectionKey.OP_ACCEPT,null); } private boolean checkInputEvent() { SessionManagerEvent e; if((e = waitingEventQueue.poll())!=null){ e.exec(manager); return true; } return false; } private boolean checkWaitingWrite() throws IOException { PacketSet p = writeQueue.poll(); if (p!=null) { logger.writeLog("writing: "+p.command+" to: " +manager.editorList.editorByChannel(p.channel)); p.channel.write(p.command); return true; } return false; } /** * Check waiting command in merge * @return true if there is a processed waiting command * @throws IOException */ public void checkWaitingCommandInMerge() { List<PacketSet> w = waitingCommandInMerge; waitingCommandInMerge = new LinkedList<PacketSet>(); for(PacketSet p: w) { REPNode e = p.getEditor(); if(e.isMerging()) { // still merging do nothing waitingCommandInMerge.add(p); } else { try { // if (manager.sessionManage(e, p.command)) { // we don't need this // assert false; // return; // } e.manage(p.command); } catch (Exception e1) { // should be e.close()? close(p.channel); } } } } public boolean hasWaitingCommand(REPSocketChannel<REPCommand>c) { for(PacketSet p:waitingCommandInMerge) { if (p.channel==c) { return true; } } return false; } private void close(REPSocketChannel<REPCommand> channel) { REPSelectionKey<REPCommand>key = channel.keyFor1(selector); REPNode handler = (REPNode)key.attachment(); key.cancel(); handler.cancel(channel); // we have to remove session/editor } private void select() throws IOException { Set<REPSelectionKey<REPCommand>> keys = selector.selectedKeys1(); for(REPSelectionKey<REPCommand> key : keys){ if(key.isAcceptable()){ /* * Incoming connection. We don't know which, editor or * session manager. Assign FirstConnector to distinguish. */ REPSocketChannel<REPCommand> channel = key.accept(new REPCommandPacker()); logger.writeLog("SessionManager.select() : key.isAcceptable : channel = " + channel); registerChannel(channel, new FirstConnector(manager,channel)); } else if(key.isReadable()){ /* * Incoming packets are handled by a various forwarder. * A handler throw IOException() in case of a trouble to * close the channel. */ REPNode handler = (REPNode)key.attachment(); try { REPCommand command = key.channel1().read(); handler.handle(command, key); } catch (IOException e) { key.cancel(); handler.cancel(key.channel1()); } } } } public void registerChannel(REPSocketChannel<REPCommand> channel, REPNode handler) throws IOException { if(channel == null) { return; } // handler.setChannel(channel); channel.configureBlocking(false); channel.register(selector, SelectionKey.OP_READ, handler); } protected void updateGUI() { //リストのコピーをGUIに渡す LinkedList<Session> sList = new LinkedList<Session>(manager.sessionList.values()); LinkedList<REPNode> eList; if (false) { // local editor only eList = new LinkedList<REPNode>(); for(REPNode e:manager.editorList.values()) { if (manager.getSMID(e.eid)==manager.smList.sessionManagerID()) { eList.add(e); } } } else { eList = new LinkedList<REPNode>(manager.editorList.values()); } //GUIに反映 Runnable doRun = new DoGUIUpdate(sList, eList, gui); gui.invokeLater(doRun); } public void setMyHostName(String localHostName) { myHost = localHostName + receive_port; setHostToEditor(myHost); } public String myHost() { return myHost; } private void setHostToEditor(String myHost2) { for(REPNode editor : manager.editorList.values()){ if (editor.channel!=null) editor.setHost(myHost2); } } public void addWaitingCommand(PacketSet set) { waitingCommandInMerge.add(set); } public void buttonPressed(SessionManagerEvent event) { try { waitingEventQueue.put(event); } catch (InterruptedException e) {} selector.wakeup(); } public void syncExec(SessionManagerEvent event) { try { waitingEventQueue.put(event); } catch (InterruptedException e) { } } public void addWriteQueue(PacketSet packetSet) { writeQueue.addLast(packetSet); assert(writeQueue.size()<SessionManager.packetLimit) ; } public void setParentPort(int port) { parent_port = port; } public int getParentPort() { return parent_port; } public int getPort() { return receive_port; } public void execAfterConnect(SessionManagerEvent sessionManagerEvent) { execAfterConnect = sessionManagerEvent; } public void afterConnect() { SessionManagerEvent e = execAfterConnect; execAfterConnect = null; if (e!=null) e.exec(manager); } void removeChannel(SessionManager sessionManager, REPNode channel) { REPSelectionKey<REPCommand> key = channel.channel.keyFor1(selector); key.cancel(); try { channel.channel.close1(); } catch (IOException e) { } } public String toString() { return ""+myHost+":"+receive_port; } public void setGUI(SessionManagerGUI gui) { this.gui = gui; } public void setManager(SessionManager sessionManager) { manager = sessionManager; } }