Mercurial > hg > FederatedLinda
changeset 3:ae7e0e92c651
*** empty log message ***
author | fuchita |
---|---|
date | Mon, 11 Feb 2008 11:54:15 +0900 |
parents | b49e593b2502 |
children | 2023d9b31af9 |
files | src/fdl/AcceptHandler.java src/fdl/ComDebug.java src/fdl/ComDebug_Client.java src/fdl/FDLindaServ.java src/fdl/IOHandler.java src/fdl/PSXLinda.java src/fdl/TestEtc.java |
diffstat | 7 files changed, 113 insertions(+), 62 deletions(-) [+] |
line wrap: on
line diff
--- a/src/fdl/AcceptHandler.java Thu Feb 07 19:06:01 2008 +0900 +++ b/src/fdl/AcceptHandler.java Mon Feb 11 11:54:15 2008 +0900 @@ -1,11 +1,15 @@ package fdl; import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import java.nio.channels.SelectionKey; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; +import java.util.LinkedList; public class AcceptHandler implements TupleHandler, PSXQueueInterface { //public Hashtable<Integer, Tuple> tuple_space; @@ -29,7 +33,11 @@ // アクセプト処理 SocketChannel channel = serverChannel.accept(); channel.configureBlocking(false); - System.out.println("Server: accepted "+channel.socket().getInetAddress()); + System.out.println("Server: accepted "+channel.socket()); + + //ByteBuffer command = ByteBuffer.allocate(LINDA_HEADER_SIZE); + //LinkedList<SocketChannel> reportCh_list = ComDebug.Report_Channellist; + //AcceptwishComDebug(key, command, reportCh_list); //初期生成 @@ -73,5 +81,5 @@ channel.register(key.selector(), SelectionKey.OP_READ, handler); - } + } }
--- a/src/fdl/ComDebug.java Thu Feb 07 19:06:01 2008 +0900 +++ b/src/fdl/ComDebug.java Mon Feb 11 11:54:15 2008 +0900 @@ -1,6 +1,9 @@ package fdl; import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.nio.channels.SelectionKey; @@ -12,9 +15,10 @@ public class ComDebug implements PSXQueueInterface{ static final boolean debug = true; + //public static int seq = 0; public static Hashtable<String, Integer> Com_Hashtable = new Hashtable<String, Integer>(); public static LinkedList<SocketChannel> Report_Channellist = new LinkedList<SocketChannel>(); - + ComDebug(){ } @@ -48,21 +52,59 @@ io.send(it.next(), command, data); } } + private static String getRemoteHostAndPort(SocketChannel channel) { + String socketString = channel.socket().getRemoteSocketAddress().toString(); + String[] split = socketString.split("/"); + int length = split.length; + String hostAndPort = split[length-1]; + split = hostAndPort.split(":"); + String host = split[0]; + String port = split[1]; + int portnum = Integer.parseInt(port); + try { + InetSocketAddress address = new InetSocketAddress(InetAddress.getByName(host), portnum); + host = address.getHostName().toString(); + return (host +":"+port); + } + catch( UnknownHostException e ){ + return hostAndPort; + } + } - public static void Com_inc(SelectionKey key, Hashtable<String, Integer> comlist, int mode) { + private static String getLocalHostAndPort(SocketChannel channel) { + String socketString = channel.socket().getLocalSocketAddress().toString(); + String[] split = socketString.split("/"); + int length = split.length; + String hostAndPort = split[length-1]; + split = hostAndPort.split(":"); + String host = split[0]; + String port = split[1]; + try { + InetAddress localhost = InetAddress.getLocalHost(); + host = localhost.getHostName(); + return (host +":"+port); + } + catch( UnknownHostException e ){ + return (host +":"+port); + } + } + + public static String Com_inc(SelectionKey key, Hashtable<String, Integer> comlist, int mode) { //通信ログ Hostname:port 'mode' =number 形式でインクリメント int cnt = 0; SocketChannel ch = (SocketChannel) key.channel(); - String socketString = ch.socket().getRemoteSocketAddress().toString(); - String[] split = socketString.split("/"); - int length = split.length; - String ComKey = split[length-1]; - ComKey += " "+(char)mode+ " "; + + String remoteString = getRemoteHostAndPort(ch); + String localString = getLocalHostAndPort(ch); + + String ComKey = localString + "--" + remoteString + " " + (char)mode; if(comlist.containsKey(ComKey)){ cnt = comlist.get(ComKey); } cnt++; comlist.put(ComKey, cnt); + long seq = System.currentTimeMillis(); + return (seq+" "+ComKey+"="+cnt); } public static void addChannel(SelectionKey key, LinkedList<SocketChannel> reportCh_list) {
--- a/src/fdl/ComDebug_Client.java Thu Feb 07 19:06:01 2008 +0900 +++ b/src/fdl/ComDebug_Client.java Mon Feb 11 11:54:15 2008 +0900 @@ -13,7 +13,7 @@ public static void main(String[] args) { FederatedLinda fdl; PSXLinda psx; - String host = "localhost"; + String host = "ged.cr.ie.u-ryukyu.ac.jp"; int port = 10000; int connect_num = 1; @@ -23,12 +23,14 @@ try { for (int i=0; i<args.length; ++i) { if("-h".equals(args[i])) { - host = (String)(args[++i]); + host = (String)(args[++i]); + System.err.println("host = "+host); } else { System.err.println(usages); } if("-p".equals(args[i])) { - port = Integer.parseInt(args[++i]); + port = Integer.parseInt(args[++i]); + System.err.println("port = "+port); } else { System.err.println(usages); }
--- a/src/fdl/FDLindaServ.java Thu Feb 07 19:06:01 2008 +0900 +++ b/src/fdl/FDLindaServ.java Mon Feb 11 11:54:15 2008 +0900 @@ -2,11 +2,11 @@ package fdl; import java.io.IOException; -import java.net.InetAddress; import java.net.InetSocketAddress; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; +import java.nio.channels.spi.SelectorProvider; import java.util.Iterator; public class FDLindaServ implements PSXQueueInterface { @@ -16,16 +16,17 @@ static final int MAX_TUPLE = 65536; static final int DEF_PORT = 10000; static final String PATHNAME = "/tmp/ldserv"; - + public static final int TIMEOUT = 5*1000; public static Tuple[] tuple_space; + @SuppressWarnings("unchecked") public static void main(final String[] args) throws IOException { @SuppressWarnings("unused") final String usages = "usage: FDLindaServ [-p port]"; int port = DEF_PORT; tuple_space = new Tuple[MAX_TUPLE]; - + Selector selector = SelectorProvider.provider().openSelector(); //引数判定 try { for (int i=0; i<args.length; ++i) { @@ -39,34 +40,34 @@ e.printStackTrace(); } - //セレクタを生成 - Selector selector = Selector.open(); + try { + //セレクタを生成 + - try { //ソケット・チャネルを生成・設定 - ServerSocketChannel ssChannel = ServerSocketChannel.open(); + ServerSocketChannel ssChannel = SelectorProvider.provider().openServerSocketChannel(); + ssChannel.socket().bind(new InetSocketAddress(port)); ssChannel.configureBlocking(false); ssChannel.socket().setReuseAddress(true); - InetSocketAddress address = new InetSocketAddress(InetAddress.getLocalHost(), port); - ssChannel.socket().bind(address); System.out.println("Server: litening at "+ssChannel); - //セレクタにチャンネルを登録 - ssChannel.register(selector, SelectionKey.OP_ACCEPT, new AcceptHandler(tuple_space)); + //ssChannel.register(selector, SelectionKey.OP_ACCEPT, new AcceptHandler(tuple_space)); + //ssChannel.register(selector, ssChannel.validOps(), new AcceptHandler(tuple_space)); + ssChannel.register(selector, SelectionKey.OP_ACCEPT, new AcceptHandler(tuple_space)); // セレクタによる監視 - while (selector.select() > 0) { - + while (selector.keys().size() > 0) { + @SuppressWarnings("unused") + int KeyCount = selector.select(TIMEOUT); // Iteratorを用意 - Iterator <SelectionKey>it = selector.selectedKeys().iterator(); - + Iterator it = selector.selectedKeys().iterator(); while (it.hasNext()) { // SelectionKeyを取り出す SelectionKey selKey = (SelectionKey)it.next(); // 操作に対する処理が行われていると認識させるためにremoveする it.remove(); - + TupleHandler handler = (TupleHandler)selKey.attachment(); handler.handle(selKey); } @@ -82,6 +83,6 @@ } catch(IOException ex) { ex.printStackTrace(); } - } + } } }
--- a/src/fdl/IOHandler.java Thu Feb 07 19:06:01 2008 +0900 +++ b/src/fdl/IOHandler.java Mon Feb 11 11:54:15 2008 +0900 @@ -51,10 +51,9 @@ } count = channel.read(command); - if(count < 0) { - Connection_Close(key); + if(count < 0) { LinkedList<SocketChannel> reportCh_list = ComDebug.Report_Channellist; - com_debug.reportCh_remove(key, reportCh_list); + ClosewishComDebug(key, command, reportCh_list); readsize = -1; return; } @@ -123,8 +122,7 @@ System.out.println("data from : "+key.channel()); } if((mode == '!') || (len == 0)) { - Connection_Close(key); - com_debug.reportCh_remove(key, reportCh_list); + ClosewishComDebug(key, command, reportCh_list); } else if(mode == PSX_CHECK) { Check(key, command); @@ -139,16 +137,17 @@ System.out.println("Uncorrect buffer"); System.exit(1); } + //COM_DEBUG if(id > PRIVILEGED_ID_START && id < PRIVILEGED_ID_END){ ComDebug.addChannel(key, reportCh_list); } //DEBUG用カウンタ - ComDebug.Com_inc(key, com_Loggingtable, mode); + String debug_rep = ComDebug.Com_inc(key, com_Loggingtable, mode); //System.out.println("Com_Debug:"); //System.out.println(com_Loggingtable.toString()); //DEBUG用レポート - ComDebug.Report(reportCh_list, command, com_Loggingtable.toString()); + ComDebug.Report(reportCh_list, command, debug_rep); if (key.interestOps() != (SelectionKey.OP_READ)) { @@ -158,6 +157,21 @@ } + private void ClosewishComDebug(SelectionKey key, ByteBuffer command, + LinkedList<SocketChannel> reportCh_list) throws IOException { + String close_Channel = key.channel().toString(); + String[] split = close_Channel.split("/"); + String local[] = split[1].split(" "); + String remote[] = split[2].split("]"); + + String localAddress = local[0]; + String remoteAddress = remote[0]; + + Connection_Close(key); + com_debug.reportCh_remove(key, reportCh_list); + ComDebug.Report(reportCh_list, command, "CloseInfo >"+localAddress+"--"+remoteAddress); + } + private void Connection_Close(SelectionKey key) throws IOException { System.out.println("Connection closed by "+key.channel()); SocketChannel channel = (SocketChannel)key.channel();
--- a/src/fdl/PSXLinda.java Thu Feb 07 19:06:01 2008 +0900 +++ b/src/fdl/PSXLinda.java Mon Feb 11 11:54:15 2008 +0900 @@ -59,7 +59,8 @@ socket.setTcpNoDelay(true); // can be blocked (thread required?) - socketChannel.connect(new InetSocketAddress(InetAddress.getLocalHost(), _port)); + //socketChannel.connect(new InetSocketAddress(InetAddress.getLocalHost(), _port)); + socketChannel.connect(new InetSocketAddress(_host, _port)); while (! socketChannel.finishConnect()) { if (debug) { System.out.println("waiting for connect");
--- a/src/fdl/TestEtc.java Thu Feb 07 19:06:01 2008 +0900 +++ b/src/fdl/TestEtc.java Mon Feb 11 11:54:15 2008 +0900 @@ -25,33 +25,16 @@ public PSXQueue tp; public static void main(String[] args) throws IOException { - Tuple[] testTS = new Tuple[MAX_TUPLE]; - ByteBuffer buf = ByteBuffer.allocate(4); - buf.putInt(10); - buf.rewind(); + String socketString = "java.nio.channels.SocketChannel[connected local=/133.13.57.210:10001 remote=/133.13.57.210:50634]"; + String[] split = socketString.split("/"); + String local[] = split[1].split(" "); + String remote[] = split[2].split("]"); - Tuple tmpTuple; - - tmpTuple = testTS[MAX_TUPLE-1] = new Tuple(); - - if(tmpTuple != null) { - tmpTuple.setTuple('o', 1, 1, 1, buf); - } + String localAddress = local[0]; + String remoteAddress = remote[0]; - if(testTS[1] == null) { - System.out.println("testTS[1] == null"); - } - tmpTuple.next = testTS[1] = new Tuple(); - - tmpTuple = tmpTuple.next; - if(tmpTuple != null){ - tmpTuple.setTuple('o', 2, 2, 2, buf); - } - for(Tuple element: testTS){ - if(element != null){ - System.out.println(element.toString()); - } - } + System.out.println(local[0]); + System.out.println(remote[0]); // System.out.println("tmpTuple "+tmpTuple.toString()); // System.out.println("testTS "+testTS[MAX_TUPLE-1].toString()); }