Mercurial > hg > FederatedLinda
diff src/fdl/IOHandler.java @ 19:0243987383b7
Meta Protocol Engine and sample implementation of event logger.
ComDebug_Client needs fixes.
author | kono |
---|---|
date | Tue, 19 Aug 2008 05:33:32 +0900 |
parents | 609b288f47f9 |
children | 56e015e8f5dc |
line wrap: on
line diff
--- a/src/fdl/IOHandler.java Mon Aug 18 09:36:13 2008 +0900 +++ b/src/fdl/IOHandler.java Tue Aug 19 05:33:32 2008 +0900 @@ -1,33 +1,45 @@ package fdl; import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.nio.channels.ClosedChannelException; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; -import java.util.Hashtable; -import java.util.LinkedList; - -public class IOHandler extends TupleSpace { +public class IOHandler { static final boolean debug = false; - public Tuple[] tuple_space; - public ComDebug com_debug; + public TupleSpace tupleSpace; - public IOHandler(Tuple[] _tuple_space) { - super(_tuple_space); - } + String remoteString; + String localString; + public int cnt = 0; - public void handle(SelectionKey key) - throws ClosedChannelException, IOException { + public IOHandler(TupleSpace tupleSpace,SelectionKey key) { + this.tupleSpace = tupleSpace; + + SocketChannel ch = (SocketChannel) key.channel(); + remoteString = getRemoteHostAndPort(ch); + localString = getLocalHostAndPort(ch); + } + + public void handle(SelectionKey key) { // 書き込み可であれば,読み込みを行う if (key.isReadable()) { - read(key); + try { + read(key); + } catch (ClosedChannelException e) { + tupleSpace.hook.closeHook(key); + } catch (IOException e) { + tupleSpace.hook.closeHook(key); + } } } - private void read(SelectionKey key) + void read(SelectionKey key) throws ClosedChannelException, IOException { SocketChannel channel = (SocketChannel)key.channel(); @@ -45,18 +57,12 @@ System.out.print("reading command..."); } count = channel.read(command); - - if(count < 0) { - LinkedList<SocketChannel> reportCh_list = ComDebug.Report_Channellist; - ClosewishComDebug(key, command, reportCh_list); - readsize = -1; - return; - } + if(count < 0) throw new IOException(); readsize -= count; } command.rewind(); - int len = command.getInt(PSX.LINDA_PACKET_LENGTH_OFFSET); + command.getInt(PSX.LINDA_PACKET_LENGTH_OFFSET); int datalen = command.getInt(PSX.LINDA_DATA_LENGTH_OFFSET); ByteBuffer data = ByteBuffer.allocate(datalen); @@ -79,82 +85,81 @@ if (debug) { PSX.printData(command); - } - manager_run(key, command, data, len); - - key.interestOps(key.interestOps() | SelectionKey.OP_READ ); + } + // I believe we don't need this + //key.interestOps(key.interestOps() | SelectionKey.OP_READ ); + assert((key.interestOps()& SelectionKey.OP_READ) !=0); } public void manager_run(SelectionKey key, ByteBuffer command, ByteBuffer data, int len) throws IOException { command.order(ByteOrder.BIG_ENDIAN); int mode = command.get(PSX.LINDA_MODE_OFFSET); - char idc = (char)command.getShort(PSX.LINDA_ID_OFFSET); - int id = (int)idc; - int seq = command.getInt(PSX.LINDA_SEQ_OFFSET); command.rewind(); - String sendtext = "none"; - - - com_debug = new ComDebug(); - Hashtable<String, Integer> com_Loggingtable = ComDebug.Com_Hashtable; - LinkedList<SocketChannel> reportCh_list = ComDebug.Report_Channellist; - + if (debug) { System.out.println("data from : "+key.channel()); } if((mode == '!') || (len == 0)) { - ClosewishComDebug(key, command, reportCh_list); - } - else if(mode == PSX.PSX_CHECK) { - sendtext = Check(key, command); + tupleSpace.hook.closeHook(key); + } else if(mode == PSX.PSX_CHECK) { + tupleSpace.Check(key, command); } else if(mode == PSX.PSX_IN || mode == PSX.PSX_RD){ - sendtext = In_Rd(key, command, mode); + tupleSpace.In_Rd(key, command, mode); } else if (mode == PSX.PSX_WAIT_RD) { - Wait_Rd(key, command, mode); + tupleSpace.Wait_Rd(key, command, mode); } else if(mode == PSX.PSX_OUT) { - sendtext = Out(command, data); + tupleSpace.Out(key, command, data); } else { - System.out.println("Uncorrect buffer"); + tupleSpace.hook.closeHook(key); + System.out.println("Incorrect tuple operation"); System.exit(1); } - - //COM_DEBUG - if(id > PSX.PRIVILEGED_ID_START && id < PSX.PRIVILEGED_ID_END){ - ComDebug.addChannel(key, reportCh_list); - } - //DEBUG用カウンタ - String debug_rep = ComDebug.Com_inc(key, com_Loggingtable, mode, id, seq, sendtext); - //DEBUG用レポート - ComDebug.Report(reportCh_list, command, debug_rep); - - if (key.interestOps() - != (SelectionKey.OP_READ)) { - // 読み込み操作に対する監視を行う - key.interestOps(SelectionKey.OP_READ ); - } } - private void ClosewishComDebug(SelectionKey key, ByteBuffer command, - LinkedList<SocketChannel> reportCh_list) throws IOException { - String close_Channel = key.channel().toString(); - String[] split = close_Channel.split("/"); - String local[] = split[1].split(" "); - String remote[] = split[2].split("]"); - - String localAddress = local[0]; - String remoteAddress = remote[0]; - - Connection_Close(key); - com_debug.reportCh_remove(key, reportCh_list); - ComDebug.Report(reportCh_list, command, "CloseInfo >"+localAddress+"--"+remoteAddress); - } - - private void Connection_Close(SelectionKey key) throws IOException { + void Connection_Close(SelectionKey key) throws IOException { System.out.println("Connection closed by "+key.channel()); SocketChannel channel = (SocketChannel)key.channel(); key.cancel(); channel.close(); } + + private static String getRemoteHostAndPort(SocketChannel channel) { + String socketString = channel.socket().getRemoteSocketAddress().toString(); + String[] split = socketString.split("/"); + int length = split.length; + String hostAndPort = split[length-1]; + split = hostAndPort.split(":"); + String host = split[0]; + String port = split[1]; + int portnum = Integer.parseInt(port); + try { + InetSocketAddress address = new InetSocketAddress(InetAddress.getByName(host), portnum); + host = address.getHostName().toString(); + return (host +":"+port); + } + catch( UnknownHostException e ){ + return hostAndPort; + } + } + + private static String getLocalHostAndPort(SocketChannel channel) { + String socketString = channel.socket().getLocalSocketAddress().toString(); + String[] split = socketString.split("/"); + int length = split.length; + String hostAndPort = split[length-1]; + split = hostAndPort.split(":"); + String host = split[0]; + String port = split[1]; + try { + InetAddress localhost = InetAddress.getLocalHost(); + host = localhost.getHostName(); + return (host +":"+port); + } + catch( UnknownHostException e ){ + return (host +":"+port); + } + } + }