view src/fdl/IOHandler.java @ 20:a0fd653d1121

Debug Client and Meta Engine for logging.
author kono
date Tue, 19 Aug 2008 06:26:20 +0900
parents 0243987383b7
children 56e015e8f5dc
line wrap: on
line source


package fdl;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;

public class IOHandler {
    static final boolean debug = false;
    public TupleSpace tupleSpace;

	String remoteString;
	String localString;
	public int cnt = 0;
    
    public IOHandler(TupleSpace tupleSpace,SelectionKey key) {
    	this.tupleSpace = tupleSpace;

		SocketChannel ch = (SocketChannel) key.channel();
		remoteString = getRemoteHostAndPort(ch);
		localString =  getLocalHostAndPort(ch);
    }

    public void handle(SelectionKey key) {
        // 書き込み可であれば,読み込みを行う
        if (key.isReadable()) {
            try {
				read(key);
			} catch (ClosedChannelException e) {
				tupleSpace.hook.closeHook(key);
			} catch (IOException e) {
				tupleSpace.hook.closeHook(key);
			}
        }
    }

     void read(SelectionKey key)
                    throws ClosedChannelException, IOException {
        SocketChannel channel = (SocketChannel)key.channel();

        // 読み込み用のバッファの生成
        ByteBuffer command = ByteBuffer.allocate(PSX.LINDA_HEADER_SIZE);
        command.order(ByteOrder.BIG_ENDIAN);
        command.clear();
        
        int readsize = PSX.LINDA_HEADER_SIZE;
        int count = 0;
        
        // 読み込み
        while(readsize>0) {
        	if(debug){
        		System.out.print("reading command...");
        	}
        	count = channel.read(command);
        	if(count < 0) throw new IOException();        		
        	readsize -= count;
    	}
        command.rewind();
                
        command.getInt(PSX.LINDA_PACKET_LENGTH_OFFSET);
        int datalen = command.getInt(PSX.LINDA_DATA_LENGTH_OFFSET);
        
        ByteBuffer data = ByteBuffer.allocate(datalen);
        int read = datalen;
        
        if (debug) {
        	System.out.println("reading: " +datalen);
    	}

        data.order(ByteOrder.BIG_ENDIAN);
        data.clear();
        while(read>0) {
        	//System.out.print("reading2...");
        	read -= channel.read(data);
    	}
        data.rewind();    	    
        
        command.order(ByteOrder.BIG_ENDIAN);
        command.rewind();

        if (debug) {
        	PSX.printData(command);
        }	
        // I believe we don't need this
        //key.interestOps(key.interestOps() | SelectionKey.OP_READ );
        assert((key.interestOps()& SelectionKey.OP_READ) !=0);
    }

	public void manager_run(SelectionKey key, ByteBuffer command, ByteBuffer data, int len) throws IOException {
    	command.order(ByteOrder.BIG_ENDIAN);
	    int mode = command.get(PSX.LINDA_MODE_OFFSET);
	    command.rewind();

    	if (debug) {
        	System.out.println("data from : "+key.channel());
    	}
		if((mode == '!') || (len == 0)) {
			tupleSpace.hook.closeHook(key); 
		} else if(mode == PSX.PSX_CHECK) {
            tupleSpace.Check(key, command);
	    }
    	else if(mode == PSX.PSX_IN || mode == PSX.PSX_RD){
    		tupleSpace.In_Rd(key, command, mode); 
    	} else if (mode == PSX.PSX_WAIT_RD) {	
			tupleSpace.Wait_Rd(key, command, mode); 			
    	} else if(mode == PSX.PSX_OUT) {
	    	tupleSpace.Out(key, command, data);		
	    } else {
	    	tupleSpace.hook.closeHook(key);
    		System.out.println("Incorrect tuple operation");
    		System.exit(1);
	    }
    	
    }

	void Connection_Close(SelectionKey key) throws IOException {
		System.out.println("Connection closed by "+key.channel());
		SocketChannel channel = (SocketChannel)key.channel();
		key.cancel();
		channel.close();
	}

	private static String getRemoteHostAndPort(SocketChannel channel) {
		String socketString = channel.socket().getRemoteSocketAddress().toString();
		String[] split = socketString.split("/");
		int length = split.length;
		String hostAndPort = split[length-1];
		split = hostAndPort.split(":");
		String host = split[0];
		String port = split[1];
		int portnum = Integer.parseInt(port);
		try {
			InetSocketAddress address = new InetSocketAddress(InetAddress.getByName(host), portnum);
			host = address.getHostName().toString();
            return (host +":"+port);
        }
        catch( UnknownHostException e ){
        	return hostAndPort;
        }		
	}
	
	private static String getLocalHostAndPort(SocketChannel channel) {
		String socketString = channel.socket().getLocalSocketAddress().toString();
		String[] split = socketString.split("/");
		int length = split.length;
		String hostAndPort = split[length-1];
		split = hostAndPort.split(":");
		String host = split[0];
		String port = split[1];
		try {
            InetAddress localhost = InetAddress.getLocalHost();
            host = localhost.getHostName();            
            return (host +":"+port);
        }
        catch( UnknownHostException e ){
        	return (host +":"+port);
        }
	}
	
}