view rep/ServerMainLoop.java @ 450:21cb16b7f3df

block message in Editor.write()
author one
date Thu, 23 Sep 2010 18:15:37 +0900
parents 3819dec4641e
children c22f6833d736
line wrap: on
line source

package rep;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.util.LinkedList;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import rep.channel.REPLogger;
import rep.channel.REPSelectionKey;
import rep.channel.REPSelector;
import rep.channel.REPServerSocketChannel;
import rep.channel.REPSocketChannel;
import rep.gui.SessionManagerEvent;
import rep.gui.SessionManagerGUI;
import rep.handler.FirstConnector;
import rep.handler.REPNode;

/**
 * @author kono
 *  Single Threaded Server Main Loop
 *    maintain multiple connections
 *    gui interface is provided.
 *  Protocols are handled by our manager.
 *  We believe this is an protocol independent server.
 */
public class ServerMainLoop  {

	public static REPLogger logger = REPLogger.singleton();
	public SessionManager manager;
	protected SessionManagerGUI gui;
	protected REPSelector<REPCommand> selector;
	private BlockingQueue<SessionManagerEvent> waitingEventQueue 
								= new LinkedBlockingQueue<SessionManagerEvent>();
	public String myHost;
	private LinkedList<PacketSet> writeQueue = new LinkedList<PacketSet>();
	protected int receive_port;
	protected int parent_port;
	protected static final int DEFAULT_PORT = 8766;
	private SessionManagerEvent execAfterConnect = null;
	private boolean running = true;


	public void setReceivePort(int port) {
		receive_port = port;
	}

	void mainLoop(SessionManager sessionManager, int port, SessionManagerGUI gui) throws IOException,
			SocketException, ClosedChannelException {
		this.gui = gui;
		manager = sessionManager;
		receive_port = port;
		serverInit();
		mainLoop();
	}
	
	public void mainLoop() throws IOException {
		while(running){
			manager.checkWaitingCommandInMerge();
			if (checkInputEvent() ||
			    checkWaitingWrite()) { 
				continue;
				   // try to do fair execution for waiting task
				   //if(selector.selectNow() > 0) select();
				   //continue;
			}
			// now we can wait for input packet or event
			selector.select();
			select();
		}
	}

	void serverInit() throws IOException, SocketException,
			ClosedChannelException {
		selector = REPSelector.<REPCommand>create();	
		REPServerSocketChannel<REPCommand> ssc = REPServerSocketChannel.<REPCommand>open(new REPCommandPacker());
		ssc.configureBlocking(false);       // Selector requires this
		ssc.socket().setReuseAddress(true);	//reuse address 必須
		//getAllByNameで取れた全てのアドレスに対してbindする
		try {
			ssc.socket().bind(new InetSocketAddress("::",receive_port));
		} catch (SocketException e) {
			// for some IPv6 implementation
			ssc.socket().bind(new InetSocketAddress(receive_port));
		}
		ssc.register(selector, SelectionKey.OP_ACCEPT,null); 
	}

	private boolean checkInputEvent() {
		SessionManagerEvent e;
		if((e = waitingEventQueue.poll())!=null){
			e.exec(manager);
			return true;
		}
		return false;
	}
	
	public void serverStop() {
		running = false;
		selector.wakeup();
	}

	/**
	 * To avoid dead locks, we write a command one at a time
	 * during select().
	 * @return
	 * @throws IOException
	 */
	private boolean checkWaitingWrite() throws IOException {
		PacketSet p = writeQueue.poll();
		if (p!=null) {
			// sendLog(p);
			p.channel.write(p.command);
			return true;
		}
		return false;
	}

	/**
	 * Debug message
	 * @param p
	 */
	@SuppressWarnings("unused")
	private void sendLog(PacketSet p) {
		REPNode to;
		String s;
		to = manager.editorList.editorByChannel(p.channel.channel);
		if (to==null)
			s = p.channel.toString();
		else
			s = to.toString();
		logger.writeLog("writing: "+p.command+" to: " + s);
	}


	public void close(REPSocketChannel<REPCommand> channel) {
		REPSelectionKey<REPCommand>key = channel.keyFor1(selector);
		REPNode handler = (REPNode)key.attachment();
		key.cancel();
		handler.cancel(channel);
		// we have to remove session/editor
	}

	/**
	 * Main Select routing
	 *     check incoming connection request and incoming packet
	 *     A request is handled by a handler object which is attached
	 *     to the SelectionKey.  
	 * @throws IOException
	 */
	private void select() throws IOException {
		
		Set<REPSelectionKey<REPCommand>> keys = selector.selectedKeys1();
		for(REPSelectionKey<REPCommand> key : keys){
			if(key.isAcceptable()){
				/*
				 * Incoming connection. We don't know which, editor or
				 * session manager. Assign FirstConnector to distinguish.
				 */
				REPSocketChannel<REPCommand> channel = key.accept(new REPCommandPacker());
				logger.writeLog("SessionManager.select() : key.isAcceptable : channel = " + channel);
				registerChannel(channel, new FirstConnector(manager,channel));
			} else if(key.isReadable()){
				/*
				 * Incoming packets are handled by a various forwarder.
				 * A handler throw IOException() in case of a trouble to
				 * close the channel.
				 */
				REPNode handler = (REPNode)key.attachment();
				try {
					REPCommand command = key.channel1().read();
					handler.handle(command, key);
				} catch (IOException e) {
					key.cancel();
					handler.cancel(key.channel1());
				}
			}
		}
	}

	public void registerChannel(REPSocketChannel<REPCommand> channel, REPNode handler) throws IOException {
		if(channel == null) {
			return;
		}
		// handler.setChannel(channel);
		channel.configureBlocking(false);
		channel.register(selector, SelectionKey.OP_READ, handler);
	}

	public void setMyHostName(String localHostName) {
		myHost = localHostName + receive_port;
		setHostToEditor(myHost);
	}

	public String myHost() {
		return myHost;
	}
	
	private void setHostToEditor(String myHost2) {
		for(REPNode editor : manager.editorList.values()){
			if (editor.channel!=null)
				editor.setHost(myHost2);
		}
	}


	public void buttonPressed(SessionManagerEvent event) {
		try {
			waitingEventQueue.put(event);
		} catch (InterruptedException e) {}
		selector.wakeup();
	}

	public void syncExec(SessionManagerEvent event) {
		try {
			waitingEventQueue.put(event);
		} catch (InterruptedException e) {
		}
	}

	public void addWriteQueue(PacketSet packetSet) {
		writeQueue.addLast(packetSet);
		assert(writeQueue.size()<SessionManager.packetLimit) ;
	}

	public void setParentPort(int port) {
		parent_port = port;
	}

	public int getParentPort() {
		return parent_port;
	}

	public int getPort() {
		return receive_port;
	}

	public void execAfterConnect(SessionManagerEvent sessionManagerEvent) {
		execAfterConnect  = sessionManagerEvent;
	}

	public void afterConnect() {
		SessionManagerEvent e = execAfterConnect;
		execAfterConnect = null;
		if (e!=null) e.exec(manager);
	}

	void removeChannel(SessionManager sessionManager, REPNode channel) {
		REPSelectionKey<REPCommand> key = channel.channel.keyFor1(selector);
		key.cancel();
		try {
			channel.channel.close1();
		} catch (IOException e) {
		}
	}

	public String toString() {
		return ""+myHost+":"+receive_port;
	}


	public void setGUI(SessionManagerGUI gui) {
		this.gui = gui;
	}

	public void setManager(SessionManager sessionManager) {
		manager = sessionManager;
	}

}