changeset 2:b49e593b2502

Add ComDebug_Client
author fuchita
date Thu, 07 Feb 2008 19:06:01 +0900
parents cdc08d4722ec
children ae7e0e92c651
files src/fdl/ComDebug.java src/fdl/ComDebug_Client.java src/fdl/FederatedLinda.java src/fdl/IOHandler.java src/fdl/PSXQueueInterface.java
diffstat 5 files changed, 246 insertions(+), 24 deletions(-) [+]
line wrap: on
line diff
--- a/src/fdl/ComDebug.java	Thu Feb 07 15:51:20 2008 +0900
+++ b/src/fdl/ComDebug.java	Thu Feb 07 19:06:01 2008 +0900
@@ -24,18 +24,21 @@
 		if(reportCh_list.isEmpty()) {
 			return;
 		}
-		//(ByteBuffer)dataに(String)report_txtを入れる
-		byte[] txt = report_txt.getBytes();		
-		ByteBuffer data = ByteBuffer.wrap(txt);
-		data.rewind();
-
+		//dataをセット
+		ByteBuffer data = ByteBuffer.allocateDirect(24+(report_txt).length()*2);
+		data.clear();  // position = 0 
+		for(int i=0;i<report_txt.length();i++) {
+			data.putChar(report_txt.charAt(i));
+		}
+    	data.flip();    // limit = current position, position = 0
+    	
 		//commandをセット
     	command.order(ByteOrder.BIG_ENDIAN);
-    	command.putInt(LINDA_PACKET_LENGTH_OFFSET,txt.length+LINDA_HEADER_SIZE-INT_SIZE);
+    	command.putInt(LINDA_PACKET_LENGTH_OFFSET,(report_txt).length()*2+LINDA_HEADER_SIZE-INT_SIZE);
     	command.put(LINDA_MODE_OFFSET,(byte)'D');
     	command.putShort(LINDA_ID_OFFSET,(short) 0);
     	command.putInt(LINDA_SEQ_OFFSET,0);
-    	command.putInt(LINDA_DATA_LENGTH_OFFSET,txt.length);
+    	command.putInt(LINDA_DATA_LENGTH_OFFSET,(report_txt).length()*2);
     	command.rewind();
 
 		//送信
@@ -67,4 +70,20 @@
 		reportCh_list.add(repch);
 	}
 	
+	public static void delChannel(SelectionKey key, LinkedList<SocketChannel> reportCh_list) {
+		SocketChannel repch = (SocketChannel) key.channel();
+		reportCh_list.remove(repch);
+	}
+
+	
+	public void reportCh_remove(SelectionKey key, LinkedList<SocketChannel> reportCh_list) throws IOException {
+		//レポートするチャンネルが0ならreturn
+		if(reportCh_list.isEmpty()) {
+			return;
+		}else {
+			System.out.println("ComDebug Report Channel remove :"+key.channel());
+			delChannel(key,reportCh_list);
+		}
+	}
+	
 }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/fdl/ComDebug_Client.java	Thu Feb 07 19:06:01 2008 +0900
@@ -0,0 +1,72 @@
+package fdl;
+
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+
+
+public class ComDebug_Client implements PSXQueueInterface{
+
+	static int id;
+
+	public static void main(String[] args) {
+		FederatedLinda fdl;
+		PSXLinda psx;
+		String host  = "localhost";
+		int port = 10000;
+		int connect_num = 1;
+		
+		final String usages = "usage: ComDebug_Client [-h host -p port]";
+
+		//引数判定
+		try {
+			for (int i=0; i<args.length; ++i) {
+				if("-h".equals(args[i])) {
+					host = (String)(args[++i]);					
+				} else {
+					System.err.println(usages);
+				}
+				if("-p".equals(args[i])) {
+					port = Integer.parseInt(args[++i]);					
+				} else {
+					System.err.println(usages);
+				}
+			}
+		} catch (NumberFormatException e) {
+			e.printStackTrace();
+		}
+		
+		
+		try {
+			PSXReply r;
+		    fdl = FederatedLinda.init();
+		    psx = fdl.open(host,port);
+		    r = psx.in(65535);
+			fdl.sync(1);
+
+		    System.out.println("COM_DEBUG Connected.["+host+":"+port+"]");
+		    psx.in(PRIVILEGED_ID_START+connect_num);
+		    connect_num++;
+		    while(true) {
+		    	fdl.sync_com(1000);
+		    }
+		} catch (IOException nfex) {
+			nfex.printStackTrace();
+		    System.out.println("Faild.");
+		    return;
+		}
+	}
+
+	/***private static void print_comdata(PSXReply r) {
+		String comdata ="";
+		ByteBuffer data = r.getData();
+		CharBuffer chardata = data.asCharBuffer();
+		comdata = chardata.toString();
+
+		System.out.println("Com_data =>");
+		System.out.println(data);
+		//System.out.println(comdata);
+
+	}***/
+}
--- a/src/fdl/FederatedLinda.java	Thu Feb 07 15:51:20 2008 +0900
+++ b/src/fdl/FederatedLinda.java	Thu Feb 07 19:06:01 2008 +0900
@@ -15,6 +15,7 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
+import java.nio.CharBuffer;
 import java.nio.channels.ClosedSelectorException;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
@@ -40,7 +41,7 @@
 
     static FederatedLinda fdl;
     static int MAX_SEQUENCE = 2048;
-    static final boolean debug = true;
+    static boolean debug = true;
 
     public int tid;
     public int seq;
@@ -171,10 +172,126 @@
 
         return key_num;
     }
+    
+    public int sync_com(long mtimeout) 
+    throws IOException {
+    	int key_num = 0;
+    	Set<SelectionKey> keys;
+
+    	while (q_top != null){
+    		PSXQueue c = q_top;
+    		c.Send();
+    		q_top = c.next;
+    		// psx_free(c);
+    		// q_top = c = t;
+    		qsize--;
+    	}
+
+    	try {
+    		key_num = selector.select(mtimeout);
+    		keys = selector.selectedKeys();
+    		for (SelectionKey key : keys) {
+    			SocketChannel sock = (SocketChannel)key.channel();
+    			chkCom(sock);
+    		}
+    	} catch (IOException e) {
+    		e.printStackTrace();
+    	} catch (ClosedSelectorException e) {
+    		e.printStackTrace();
+    	}
+
+    	return key_num;
+    }
 
 // should be in PSXLinda, but sock->linda is unknown here
 
-    private void chkServe(SocketChannel sock) 
+    private void chkCom(SocketChannel sock)  throws IOException {
+
+    	int length;
+    	ByteBuffer command = ByteBuffer.allocate(LINDA_HEADER_SIZE);
+    	command.order(ByteOrder.BIG_ENDIAN);
+    	debug = false;
+
+    	sock.read(command);
+    	command.rewind();
+    	length =  command.getInt(LINDA_DATA_LENGTH_OFFSET);
+    	if (length>0) {
+    	    ByteBuffer data = ByteBuffer.allocate(length);
+    	    int read = length;
+    	    if (debug) {
+    	    	System.out.print("reading:");
+    	    	System.out.println(length);
+    	    }
+
+    	    data.order(ByteOrder.BIG_ENDIAN);
+    	    while(read>0) {
+    		read -= sock.read(data);
+    	    }
+    	    data.rewind();
+
+    	    if (debug) {
+    	    	char id = (char)command.getShort(LINDA_ID_OFFSET);
+    	        System.out.println("LENGTH:"+command.getInt(LINDA_PACKET_LENGTH_OFFSET)+" "+
+    	        		"MODE:"+(char)command.get(LINDA_MODE_OFFSET)+" "+
+    	        		"ID:"+(int)id+" "+
+    	        		"SEQ:"+command.getInt(LINDA_SEQ_OFFSET)+" "+
+    	        		"DATA LENGTH:"+command.getInt(LINDA_DATA_LENGTH_OFFSET)+" ");
+    	    	System.out.println("DATA:"+data);
+    	        command.rewind();
+    	    }
+    	    //if (debug_com) {
+    	    String comdata ="";
+    	    CharBuffer chardata = data.asCharBuffer();
+    	    comdata = chardata.toString();
+
+    	    //System.out.println("Com_data =>");
+    	    System.out.println(comdata);
+    	    //}
+    	    /***if (debug) {
+    	    	System.out.print("header:");
+    	    	for(int i=0;i<LINDA_HEADER_SIZE;i++) {
+    	    		System.out.println(command.get(i));
+    	    	}
+    	    	System.out.print("data:");
+    	    	for(int i=0;i<length;i++) {
+    	    		System.out.println(data.get(i));
+    	    	}
+    	    	data.rewind();
+    	    }***/
+    	    
+    	    int rseq = command.getInt(LINDA_SEQ_OFFSET);
+    	    int mode = command.get(LINDA_MODE_OFFSET);
+    	    Integer a;
+    	    /***
+    	    if (debug) {
+    	    	System.out.print("mode = ");
+    	    	System.out.println(mode);
+    	    	System.out.print("seq = ");
+    	    	System.out.println(rseq);
+    	    }***/
+    	    try {
+    	    	PSXReply r = seqHash.get((a = new Integer(rseq)));
+    		if (debug) {
+    			System.out.print("hash value = ");
+    			System.out.println(a.hashCode());
+    		}
+    		
+    		r.setAnswer(mode,command,data);
+
+    		if (r.callback != null ) {
+    		    r.callback.callback(data);
+    		}
+    	    } catch (NullPointerException e ) {
+    	    	if (debug) {
+    	    		System.out.println("hashed reply not found");
+    	    	}
+    		// can't happen
+    		return ;
+    	    }
+        }
+	}
+
+	private void chkServe(SocketChannel sock) 
 				    throws IOException {
 	int length;
 	ByteBuffer command = ByteBuffer.allocate(LINDA_HEADER_SIZE);
@@ -196,7 +313,18 @@
 		read -= sock.read(data);
 	    }
 	    data.rewind();
+
 	    if (debug) {
+	    	char id = (char)command.getShort(LINDA_ID_OFFSET);
+	        System.out.println("LENGTH:"+command.getInt(LINDA_PACKET_LENGTH_OFFSET)+" "+
+	        		"MODE:"+(char)command.get(LINDA_MODE_OFFSET)+" "+
+	        		"ID:"+(int)id+" "+
+	        		"SEQ:"+command.getInt(LINDA_SEQ_OFFSET)+" "+
+	        		"DATA LENGTH:"+command.getInt(LINDA_DATA_LENGTH_OFFSET)+" ");
+	    	System.out.println("DATA:"+data);
+	        command.rewind();
+	    }
+	    /***if (debug) {
 	    	System.out.print("header:");
 	    	for(int i=0;i<LINDA_HEADER_SIZE;i++) {
 	    		System.out.println(command.get(i));
@@ -206,16 +334,18 @@
 	    		System.out.println(data.get(i));
 	    	}
 	    	data.rewind();
-	    }
+	    }***/
+	    
 	    int rseq = command.getInt(LINDA_SEQ_OFFSET);
 	    int mode = command.get(LINDA_MODE_OFFSET);
 	    Integer a;
+	    /***
 	    if (debug) {
 	    	System.out.print("mode = ");
 	    	System.out.println(mode);
 	    	System.out.print("seq = ");
 	    	System.out.println(rseq);
-	    }
+	    }***/
 	    try {
 	    	PSXReply r = seqHash.get((a = new Integer(rseq)));
 		if (debug) {
--- a/src/fdl/IOHandler.java	Thu Feb 07 15:51:20 2008 +0900
+++ b/src/fdl/IOHandler.java	Thu Feb 07 19:06:01 2008 +0900
@@ -14,8 +14,6 @@
     static final boolean debug = false;
 	public Tuple[] tuple_space;     
 	public ComDebug com_debug;
-	//public static Hashtable<String, Integer> com_Loggingtable = new Hashtable<String, Integer>();
-	//public static LinkedList<SocketChannel> reportCh_list = new LinkedList<SocketChannel>();
 
     public IOHandler(Tuple[] _tuple_space) {
 		super(_tuple_space);
@@ -54,10 +52,9 @@
         	count = channel.read(command);
 
         	if(count < 0) {
-            	System.out.println("Connection closed by "+key.channel());
-            	//System.out.println("Close channel on EOF; channel: "+channel);
-            	key.cancel();
-            	channel.close();
+        		Connection_Close(key);
+        		LinkedList<SocketChannel> reportCh_list = ComDebug.Report_Channellist;
+        		com_debug.reportCh_remove(key, reportCh_list);
             	readsize = -1;
             	return;
             }        	
@@ -114,7 +111,8 @@
     public void manager_run(SelectionKey key, ByteBuffer command, ByteBuffer data, int len) throws IOException {
     	command.order(ByteOrder.BIG_ENDIAN);
 	    int mode = command.get(LINDA_MODE_OFFSET);
-	    int id = command.get(LINDA_ID_OFFSET);
+	    char idc = (char)command.getShort(LINDA_ID_OFFSET);
+	    int id = (int)idc;
 	    command.rewind();
 	    
 	    com_debug = new ComDebug();
@@ -126,8 +124,7 @@
     	}
 		if((mode == '!') || (len == 0)) {
 			Connection_Close(key);
-		}else if(id > PRIVILEGED_ID && id < MAX_TUPLE-1){
-			ComDebug.addChannel(key, reportCh_list);
+    		com_debug.reportCh_remove(key, reportCh_list);			
 		}
 		else if(mode == PSX_CHECK) {
             Check(key, command);
@@ -142,10 +139,14 @@
     		System.out.println("Uncorrect buffer");
     		System.exit(1);
 	    }
+		//COM_DEBUG
+	   	if(id > PRIVILEGED_ID_START && id < PRIVILEGED_ID_END){
+			ComDebug.addChannel(key, reportCh_list);
+		}
 		//DEBUG用カウンタ
         ComDebug.Com_inc(key, com_Loggingtable, mode);
-        System.out.println("Com_Debug:");
-        System.out.println(com_Loggingtable.toString());
+        //System.out.println("Com_Debug:");
+        //System.out.println(com_Loggingtable.toString());
 		//DEBUG用レポート
 		ComDebug.Report(reportCh_list, command, com_Loggingtable.toString());
         
--- a/src/fdl/PSXQueueInterface.java	Thu Feb 07 15:51:20 2008 +0900
+++ b/src/fdl/PSXQueueInterface.java	Thu Feb 07 19:06:01 2008 +0900
@@ -41,8 +41,8 @@
     static final int INT_SIZE          =4;
     static final int SHORT_SIZE          =2;
     
-    static final int PRIVILEGED_ID   = 32767;
-
+    static final int PRIVILEGED_ID_START   = 32768;
+    static final int PRIVILEGED_ID_END   = 36864;
 }
 
 /* end */