Mercurial > hg > RemoteEditor > REPSessionManager
view rep/ServerMainLoop.java @ 450:21cb16b7f3df
block message in Editor.write()
author | one |
---|---|
date | Thu, 23 Sep 2010 18:15:37 +0900 |
parents | 3819dec4641e |
children | c22f6833d736 |
line wrap: on
line source
package rep; import java.io.IOException; import java.net.InetSocketAddress; import java.net.SocketException; import java.nio.channels.ClosedChannelException; import java.nio.channels.SelectionKey; import java.util.LinkedList; import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import rep.channel.REPLogger; import rep.channel.REPSelectionKey; import rep.channel.REPSelector; import rep.channel.REPServerSocketChannel; import rep.channel.REPSocketChannel; import rep.gui.SessionManagerEvent; import rep.gui.SessionManagerGUI; import rep.handler.FirstConnector; import rep.handler.REPNode; /** * @author kono * Single Threaded Server Main Loop * maintain multiple connections * gui interface is provided. * Protocols are handled by our manager. * We believe this is an protocol independent server. */ public class ServerMainLoop { public static REPLogger logger = REPLogger.singleton(); public SessionManager manager; protected SessionManagerGUI gui; protected REPSelector<REPCommand> selector; private BlockingQueue<SessionManagerEvent> waitingEventQueue = new LinkedBlockingQueue<SessionManagerEvent>(); public String myHost; private LinkedList<PacketSet> writeQueue = new LinkedList<PacketSet>(); protected int receive_port; protected int parent_port; protected static final int DEFAULT_PORT = 8766; private SessionManagerEvent execAfterConnect = null; private boolean running = true; public void setReceivePort(int port) { receive_port = port; } void mainLoop(SessionManager sessionManager, int port, SessionManagerGUI gui) throws IOException, SocketException, ClosedChannelException { this.gui = gui; manager = sessionManager; receive_port = port; serverInit(); mainLoop(); } public void mainLoop() throws IOException { while(running){ manager.checkWaitingCommandInMerge(); if (checkInputEvent() || checkWaitingWrite()) { continue; // try to do fair execution for waiting task //if(selector.selectNow() > 0) select(); //continue; } // now we can wait for input packet or event selector.select(); select(); } } void serverInit() throws IOException, SocketException, ClosedChannelException { selector = REPSelector.<REPCommand>create(); REPServerSocketChannel<REPCommand> ssc = REPServerSocketChannel.<REPCommand>open(new REPCommandPacker()); ssc.configureBlocking(false); // Selector requires this ssc.socket().setReuseAddress(true); //reuse address 必須 //getAllByNameで取れた全てのアドレスに対してbindする try { ssc.socket().bind(new InetSocketAddress("::",receive_port)); } catch (SocketException e) { // for some IPv6 implementation ssc.socket().bind(new InetSocketAddress(receive_port)); } ssc.register(selector, SelectionKey.OP_ACCEPT,null); } private boolean checkInputEvent() { SessionManagerEvent e; if((e = waitingEventQueue.poll())!=null){ e.exec(manager); return true; } return false; } public void serverStop() { running = false; selector.wakeup(); } /** * To avoid dead locks, we write a command one at a time * during select(). * @return * @throws IOException */ private boolean checkWaitingWrite() throws IOException { PacketSet p = writeQueue.poll(); if (p!=null) { // sendLog(p); p.channel.write(p.command); return true; } return false; } /** * Debug message * @param p */ @SuppressWarnings("unused") private void sendLog(PacketSet p) { REPNode to; String s; to = manager.editorList.editorByChannel(p.channel.channel); if (to==null) s = p.channel.toString(); else s = to.toString(); logger.writeLog("writing: "+p.command+" to: " + s); } public void close(REPSocketChannel<REPCommand> channel) { REPSelectionKey<REPCommand>key = channel.keyFor1(selector); REPNode handler = (REPNode)key.attachment(); key.cancel(); handler.cancel(channel); // we have to remove session/editor } /** * Main Select routing * check incoming connection request and incoming packet * A request is handled by a handler object which is attached * to the SelectionKey. * @throws IOException */ private void select() throws IOException { Set<REPSelectionKey<REPCommand>> keys = selector.selectedKeys1(); for(REPSelectionKey<REPCommand> key : keys){ if(key.isAcceptable()){ /* * Incoming connection. We don't know which, editor or * session manager. Assign FirstConnector to distinguish. */ REPSocketChannel<REPCommand> channel = key.accept(new REPCommandPacker()); logger.writeLog("SessionManager.select() : key.isAcceptable : channel = " + channel); registerChannel(channel, new FirstConnector(manager,channel)); } else if(key.isReadable()){ /* * Incoming packets are handled by a various forwarder. * A handler throw IOException() in case of a trouble to * close the channel. */ REPNode handler = (REPNode)key.attachment(); try { REPCommand command = key.channel1().read(); handler.handle(command, key); } catch (IOException e) { key.cancel(); handler.cancel(key.channel1()); } } } } public void registerChannel(REPSocketChannel<REPCommand> channel, REPNode handler) throws IOException { if(channel == null) { return; } // handler.setChannel(channel); channel.configureBlocking(false); channel.register(selector, SelectionKey.OP_READ, handler); } public void setMyHostName(String localHostName) { myHost = localHostName + receive_port; setHostToEditor(myHost); } public String myHost() { return myHost; } private void setHostToEditor(String myHost2) { for(REPNode editor : manager.editorList.values()){ if (editor.channel!=null) editor.setHost(myHost2); } } public void buttonPressed(SessionManagerEvent event) { try { waitingEventQueue.put(event); } catch (InterruptedException e) {} selector.wakeup(); } public void syncExec(SessionManagerEvent event) { try { waitingEventQueue.put(event); } catch (InterruptedException e) { } } public void addWriteQueue(PacketSet packetSet) { writeQueue.addLast(packetSet); assert(writeQueue.size()<SessionManager.packetLimit) ; } public void setParentPort(int port) { parent_port = port; } public int getParentPort() { return parent_port; } public int getPort() { return receive_port; } public void execAfterConnect(SessionManagerEvent sessionManagerEvent) { execAfterConnect = sessionManagerEvent; } public void afterConnect() { SessionManagerEvent e = execAfterConnect; execAfterConnect = null; if (e!=null) e.exec(manager); } void removeChannel(SessionManager sessionManager, REPNode channel) { REPSelectionKey<REPCommand> key = channel.channel.keyFor1(selector); key.cancel(); try { channel.channel.close1(); } catch (IOException e) { } } public String toString() { return ""+myHost+":"+receive_port; } public void setGUI(SessionManagerGUI gui) { this.gui = gui; } public void setManager(SessionManager sessionManager) { manager = sessionManager; } }