Mercurial > hg > RemoteEditor > REPSessionManager
view test/channeltest/testSeMaSlave.java @ 207:9aeade335af0
*** empty log message ***
author | kono |
---|---|
date | Sat, 30 Aug 2008 11:51:42 +0900 |
parents | a2aaf8af7bcc |
children | 302c4a0afab6 |
line wrap: on
line source
package test.channeltest; import java.io.IOException; import java.net.SocketAddress; import java.util.LinkedList; import java.net.InetSocketAddress; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import rep.channel.REPLogger; import rep.channel.REPPack; import rep.channel.REPSelectionKey; import rep.channel.REPSelector; import rep.channel.REPServerSocketChannel; import rep.channel.REPSocketChannel; public class testSeMaSlave extends Thread{ SocketAddress ownIP; SocketAddress masterIP; boolean running=true; REPLogger ns; LinkedList<ClientInfo> cis; public testSeMaSlave(String name, String oname,int oport, String mname,int mport){ super(name); ownIP = new InetSocketAddress(oname,oport); masterIP = new InetSocketAddress(mname,mport); cis = new LinkedList<ClientInfo>(); ns = REPLogger.singleton(); } public void init(){ } @SuppressWarnings("unchecked") public void run() { REPSelector<String> selector; REPSocketChannel<String> masterCH ; try { REPPack<String> pack = new StringPacker(); REPServerSocketChannel<String> scs = REPServerSocketChannel.<String>open(pack); selector = REPSelector.create(); masterCH = connectToMaster(selector); scs.socket().bind(ownIP); scs.configureBlocking(false); scs.register(selector, SelectionKey.OP_ACCEPT,null); masterCH.configureBlocking(false); masterCH.register(selector, SelectionKey.OP_READ,null); ns.writeLog("SessionManager starts main routin.", 1); /* Main Loop */ while(running){ selector.select(); for(REPSelectionKey<String> key : selector.selectedKeys1()){ if(key.isAcceptable()){ REPSocketChannel<String> channel = key.accept(pack); if(channel==null) continue; channel.configureBlocking(false); selector.register(channel, SelectionKey.OP_READ,null); ns.writeLog("accepts a client.", 1); }else if(key.isReadable()){ ns.writeLog("gets readable channel", 1); REPSocketChannel<String> channel = (REPSocketChannel<String>) key.channel(pack); String packet = channel.read(); if (channel==masterCH){ ns.writeLog("receives String from master ==> `"+packet+"\'", 1); for (ClientInfo ci: cis){ if (!packet.matches(".*"+ci.str+".*")) continue; ci.channel.write(this.getName()+": loopback master ==>`"+packet+"\'"); } }else{ ns.writeLog("receives String==> `"+packet+"\'", 1); //channlel.write("from "+this.getName()+": save"); masterCH.write(this.getName()+": pass packet`"+packet+"\'"); cis.add(new ClientInfo(packet, channel)); } } } } } catch (IOException e1) { e1.printStackTrace(); } } private REPSocketChannel<String> connectToMaster(Selector _selector) throws IOException { REPSocketChannel<String> channel = REPSocketChannel.<String>create(new StringPacker()); ns.writeLog("is connecting to masterSeMa whose ip is"+masterIP, 1); while(!channel.connect(masterIP)){ ns.writeLog("SeMa not listen to socket yet, wait", 1); Thread.yield(); } ns.writeLog("connecting was successful.", 1); return channel; } } class ClientInfo{ String str; REPSocketChannel<String> channel; ClientInfo(String _str, REPSocketChannel<String> _channel){ str = _str; channel = _channel; } }