Mercurial > hg > FederatedLinda
view src/fdl/IOHandler.java @ 14:006015077e99
add 3 obj
author | axmo |
---|---|
date | Mon, 11 Aug 2008 22:24:57 +0900 |
parents | 4391c9fac885 |
children | aced4bfc15af |
line wrap: on
line source
package fdl; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.nio.channels.ClosedChannelException; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; import java.util.Hashtable; import java.util.LinkedList; public class IOHandler extends IOParam { static final boolean debug = false; public Tuple[] tuple_space; public ComDebug com_debug; public IOHandler(Tuple[] _tuple_space) { super(_tuple_space); } public void handle(SelectionKey key) throws ClosedChannelException, IOException { // 書き込み可であれば,読み込みを行う if (key.isReadable()) { read(key); } // 書き込み可であれば,書き込みを行う /*if (key.isWritable() && key.isValid()) { write(key); }*/ } private void read(SelectionKey key) throws ClosedChannelException, IOException { SocketChannel channel = (SocketChannel)key.channel(); // 読み込み用のバッファの生成 ByteBuffer command = ByteBuffer.allocate(LINDA_HEADER_SIZE); command.order(ByteOrder.BIG_ENDIAN); command.clear(); int readsize = LINDA_HEADER_SIZE; int count = 0; // 読み込み while(readsize>0) { if(debug){ System.out.print("reading command..."); } count = channel.read(command); if(count < 0) { LinkedList<SocketChannel> reportCh_list = ComDebug.Report_Channellist; ClosewishComDebug(key, command, reportCh_list); readsize = -1; return; } readsize -= count; } command.rewind(); int len = command.getInt(LINDA_PACKET_LENGTH_OFFSET); int datalen = command.getInt(LINDA_DATA_LENGTH_OFFSET); ByteBuffer data = ByteBuffer.allocate(datalen); int read = datalen; if (debug) { System.out.println("reading: " +datalen); } data.order(ByteOrder.BIG_ENDIAN); data.clear(); while(read>0) { //System.out.print("reading2..."); read -= channel.read(data); } data.rewind(); /* static final int LINDA_PACKET_LENGTH_OFFSET =0; static final int LINDA_MODE_OFFSET =0+4; static final int LINDA_ID_OFFSET =1+4; static final int LINDA_SEQ_OFFSET =3+4; static final int LINDA_DATA_LENGTH_OFFSET =7+4; static final int LINDA_HEADER_SIZE =12+4; */ command.order(ByteOrder.BIG_ENDIAN); command.rewind(); /*** print data ***/ char id = (char)command.getShort(LINDA_ID_OFFSET); System.out.println("LENGTH:"+command.getInt(LINDA_PACKET_LENGTH_OFFSET)+" "+ "MODE:"+(char)command.get(LINDA_MODE_OFFSET)+" "+ "ID:"+(int)id+" "+ "SEQ:"+command.getInt(LINDA_SEQ_OFFSET)+" "); //"SEQ:"+command.getInt(LINDA_SEQ_OFFSET)+" "+ //"DATA LENGTH:"+command.getInt(LINDA_DATA_LENGTH_OFFSET)+" "); //System.out.println("DATA:"+data); //データ処理 command.rewind(); manager_run(key, command, data, len); key.interestOps(key.interestOps() | SelectionKey.OP_READ ); } public void manager_run(SelectionKey key, ByteBuffer command, ByteBuffer data, int len) throws IOException { command.order(ByteOrder.BIG_ENDIAN); int mode = command.get(LINDA_MODE_OFFSET); char idc = (char)command.getShort(LINDA_ID_OFFSET); int id = (int)idc; int seq = command.getInt(LINDA_SEQ_OFFSET); command.rewind(); String sendtext = "none"; com_debug = new ComDebug(); Hashtable<String, Integer> com_Loggingtable = ComDebug.Com_Hashtable; LinkedList<SocketChannel> reportCh_list = ComDebug.Report_Channellist; if (debug) { System.out.println("data from : "+key.channel()); } if((mode == '!') || (len == 0)) { ClosewishComDebug(key, command, reportCh_list); } else if(mode == PSX_CHECK) { sendtext = Check(key, command); } else if(mode == PSX_IN || mode == PSX_RD){ sendtext = In_Rd(key, command, mode); } else if (mode == PSX_WAIT_RD) { Wait_Rd(key, command, mode); } else if(mode == PSX_OUT) { sendtext = Out(command, data); } else { System.out.println("Uncorrect buffer"); System.exit(1); } //COM_DEBUG if(id > PRIVILEGED_ID_START && id < PRIVILEGED_ID_END){ ComDebug.addChannel(key, reportCh_list); } //DEBUG用カウンタ String debug_rep = ComDebug.Com_inc(key, com_Loggingtable, mode, id, seq, sendtext); //DEBUG用レポート ComDebug.Report(reportCh_list, command, debug_rep); if (key.interestOps() != (SelectionKey.OP_READ)) { // 読み込み操作に対する監視を行う key.interestOps(SelectionKey.OP_READ ); } } private void ClosewishComDebug(SelectionKey key, ByteBuffer command, LinkedList<SocketChannel> reportCh_list) throws IOException { String close_Channel = key.channel().toString(); String[] split = close_Channel.split("/"); String local[] = split[1].split(" "); String remote[] = split[2].split("]"); String localAddress = local[0]; String remoteAddress = remote[0]; Connection_Close(key); com_debug.reportCh_remove(key, reportCh_list); ComDebug.Report(reportCh_list, command, "CloseInfo >"+localAddress+"--"+remoteAddress); } private void Connection_Close(SelectionKey key) throws IOException { System.out.println("Connection closed by "+key.channel()); SocketChannel channel = (SocketChannel)key.channel(); key.cancel(); channel.close(); } }