diff src/fdl/IOHandler.java @ 19:0243987383b7

Meta Protocol Engine and sample implementation of event logger. ComDebug_Client needs fixes.
author kono
date Tue, 19 Aug 2008 05:33:32 +0900
parents 609b288f47f9
children 56e015e8f5dc
line wrap: on
line diff
--- a/src/fdl/IOHandler.java	Mon Aug 18 09:36:13 2008 +0900
+++ b/src/fdl/IOHandler.java	Tue Aug 19 05:33:32 2008 +0900
@@ -1,33 +1,45 @@
 
 package fdl;
 import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.nio.channels.ClosedChannelException;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.SocketChannel;
-import java.util.Hashtable;
-import java.util.LinkedList;
 
-
-public class IOHandler extends TupleSpace {
+public class IOHandler {
     static final boolean debug = false;
-	public Tuple[] tuple_space;     
-	public ComDebug com_debug;
+    public TupleSpace tupleSpace;
 
-    public IOHandler(Tuple[] _tuple_space) {
-		super(_tuple_space);
-	}
+	String remoteString;
+	String localString;
+	public int cnt = 0;
     
-    public void handle(SelectionKey key)
-                    throws ClosedChannelException, IOException {        
+    public IOHandler(TupleSpace tupleSpace,SelectionKey key) {
+    	this.tupleSpace = tupleSpace;
+
+		SocketChannel ch = (SocketChannel) key.channel();
+		remoteString = getRemoteHostAndPort(ch);
+		localString =  getLocalHostAndPort(ch);
+    }
+
+    public void handle(SelectionKey key) {
         // 書き込み可であれば,読み込みを行う
         if (key.isReadable()) {
-            read(key);
+            try {
+				read(key);
+			} catch (ClosedChannelException e) {
+				tupleSpace.hook.closeHook(key);
+			} catch (IOException e) {
+				tupleSpace.hook.closeHook(key);
+			}
         }
     }
 
-    private void read(SelectionKey key)
+     void read(SelectionKey key)
                     throws ClosedChannelException, IOException {
         SocketChannel channel = (SocketChannel)key.channel();
 
@@ -45,18 +57,12 @@
         		System.out.print("reading command...");
         	}
         	count = channel.read(command);
-
-        	if(count < 0) {        		
-        		LinkedList<SocketChannel> reportCh_list = ComDebug.Report_Channellist;
-        		ClosewishComDebug(key, command, reportCh_list);
-            	readsize = -1;
-            	return;
-            }        	
+        	if(count < 0) throw new IOException();        		
         	readsize -= count;
     	}
         command.rewind();
                 
-        int len = command.getInt(PSX.LINDA_PACKET_LENGTH_OFFSET);
+        command.getInt(PSX.LINDA_PACKET_LENGTH_OFFSET);
         int datalen = command.getInt(PSX.LINDA_DATA_LENGTH_OFFSET);
         
         ByteBuffer data = ByteBuffer.allocate(datalen);
@@ -79,82 +85,81 @@
 
         if (debug) {
         	PSX.printData(command);
-        }
-    	manager_run(key, command, data, len);    	    	
-        
-        key.interestOps(key.interestOps() | SelectionKey.OP_READ );
+        }	
+        // I believe we don't need this
+        //key.interestOps(key.interestOps() | SelectionKey.OP_READ );
+        assert((key.interestOps()& SelectionKey.OP_READ) !=0);
     }
 
 	public void manager_run(SelectionKey key, ByteBuffer command, ByteBuffer data, int len) throws IOException {
     	command.order(ByteOrder.BIG_ENDIAN);
 	    int mode = command.get(PSX.LINDA_MODE_OFFSET);
-	    char idc = (char)command.getShort(PSX.LINDA_ID_OFFSET);
-	    int id = (int)idc;
-		int seq = command.getInt(PSX.LINDA_SEQ_OFFSET);
 	    command.rewind();
-	    String sendtext = "none";
-	    
-	    
-	    com_debug = new ComDebug();
-	    Hashtable<String, Integer> com_Loggingtable = ComDebug.Com_Hashtable;
-	    LinkedList<SocketChannel> reportCh_list = ComDebug.Report_Channellist;
-		
+
     	if (debug) {
         	System.out.println("data from : "+key.channel());
     	}
 		if((mode == '!') || (len == 0)) {
-    		ClosewishComDebug(key, command, reportCh_list);
-		}
-		else if(mode == PSX.PSX_CHECK) {
-            sendtext = Check(key, command);
+			tupleSpace.hook.closeHook(key); 
+		} else if(mode == PSX.PSX_CHECK) {
+            tupleSpace.Check(key, command);
 	    }
     	else if(mode == PSX.PSX_IN || mode == PSX.PSX_RD){
-    		sendtext = In_Rd(key, command, mode);
+    		tupleSpace.In_Rd(key, command, mode); 
     	} else if (mode == PSX.PSX_WAIT_RD) {	
-			Wait_Rd(key, command, mode);						
+			tupleSpace.Wait_Rd(key, command, mode); 			
     	} else if(mode == PSX.PSX_OUT) {
-	    	sendtext = Out(command, data);	    	
+	    	tupleSpace.Out(key, command, data);		
 	    } else {
-    		System.out.println("Uncorrect buffer");
+	    	tupleSpace.hook.closeHook(key);
+    		System.out.println("Incorrect tuple operation");
     		System.exit(1);
 	    }
-		
-		//COM_DEBUG
-	   	if(id > PSX.PRIVILEGED_ID_START && id < PSX.PRIVILEGED_ID_END){
-			ComDebug.addChannel(key, reportCh_list);
-		}
-		//DEBUG用カウンタ
-        String debug_rep = ComDebug.Com_inc(key, com_Loggingtable, mode, id, seq, sendtext);
- 		//DEBUG用レポート
-		ComDebug.Report(reportCh_list, command, debug_rep);
-        
-		if (key.interestOps()
-	            != (SelectionKey.OP_READ)) {
-	            // 読み込み操作に対する監視を行う
-	            key.interestOps(SelectionKey.OP_READ );
-		}
     	
     }
 
-	private void ClosewishComDebug(SelectionKey key, ByteBuffer command,
-			LinkedList<SocketChannel> reportCh_list) throws IOException {
-		String close_Channel = key.channel().toString();
-		String[] split = close_Channel.split("/");
-		String local[] = split[1].split(" ");
-		String remote[] = split[2].split("]");
-		
-		String localAddress = local[0];
-		String remoteAddress = remote[0];
-		
-		Connection_Close(key);
-		com_debug.reportCh_remove(key, reportCh_list);
-		ComDebug.Report(reportCh_list, command, "CloseInfo >"+localAddress+"--"+remoteAddress);
-	}
-
-	private void Connection_Close(SelectionKey key) throws IOException {
+	void Connection_Close(SelectionKey key) throws IOException {
 		System.out.println("Connection closed by "+key.channel());
 		SocketChannel channel = (SocketChannel)key.channel();
 		key.cancel();
 		channel.close();
 	}
+
+	private static String getRemoteHostAndPort(SocketChannel channel) {
+		String socketString = channel.socket().getRemoteSocketAddress().toString();
+		String[] split = socketString.split("/");
+		int length = split.length;
+		String hostAndPort = split[length-1];
+		split = hostAndPort.split(":");
+		String host = split[0];
+		String port = split[1];
+		int portnum = Integer.parseInt(port);
+		try {
+			InetSocketAddress address = new InetSocketAddress(InetAddress.getByName(host), portnum);
+			host = address.getHostName().toString();
+            return (host +":"+port);
+        }
+        catch( UnknownHostException e ){
+        	return hostAndPort;
+        }		
+	}
+	
+	private static String getLocalHostAndPort(SocketChannel channel) {
+		String socketString = channel.socket().getLocalSocketAddress().toString();
+		String[] split = socketString.split("/");
+		int length = split.length;
+		String hostAndPort = split[length-1];
+		split = hostAndPort.split(":");
+		String host = split[0];
+		String port = split[1];
+		try {
+            InetAddress localhost = InetAddress.getLocalHost();
+            host = localhost.getHostName();            
+            return (host +":"+port);
+        }
+        catch( UnknownHostException e ){
+        	return (host +":"+port);
+        }
+	}
+	
 }