Mercurial > hg > RemoteEditor > REPSessionManager
view test/channeltest/testSeMaSlave.java @ 382:4b87f89b3afd
REP Session Manager (Java version)
new structure
author | one@firefly.cr.ie.u-ryukyu.ac.jp |
---|---|
date | Mon, 10 Nov 2008 22:07:45 +0900 |
parents | c5be84d53c7f |
children |
line wrap: on
line source
package test.channeltest; import java.io.IOException; import java.net.SocketAddress; import java.util.LinkedList; import java.net.InetSocketAddress; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import rep.channel.REPLogger; import rep.channel.REPPack; import rep.channel.REPSelectionKey; import rep.channel.REPSelector; import rep.channel.REPServerSocketChannel; import rep.channel.REPSocketChannel; public class testSeMaSlave extends Thread{ SocketAddress ownIP; SocketAddress masterIP; boolean running=true; REPLogger ns; LinkedList<ClientInfo> cis; public testSeMaSlave(String name, String oname,int oport, String mname,int mport){ super(name); ownIP = new InetSocketAddress(oname,oport); masterIP = new InetSocketAddress(mname,mport); cis = new LinkedList<ClientInfo>(); ns = REPLogger.singleton(); } public void init(){ } public void run() { REPSelector<String> selector; REPSocketChannel<String> masterCH ; try { REPPack<String> pack = new StringPacker(); REPServerSocketChannel<String> scs = REPServerSocketChannel.<String>open(pack); selector = REPSelector.create(); masterCH = connectToMaster(selector); scs.socket().bind(ownIP); scs.configureBlocking(false); scs.register(selector, SelectionKey.OP_ACCEPT,null); masterCH.configureBlocking(false); masterCH.register(selector, SelectionKey.OP_READ,null); ns.writeLog("Slave SessionManager starts main routin.", 1); /* Main Loop */ while(running){ selector.select(); for(REPSelectionKey<String> key : selector.selectedKeys1()){ if(key.isAcceptable()){ REPSocketChannel<String> channel = key.accept(pack); if(channel==null) continue; channel.configureBlocking(false); //selector.register(channel, SelectionKey.OP_READ,null); channel.register(selector, SelectionKey.OP_READ, null); ns.writeLog("accepts a client.", 1); }else if(key.isReadable()){ try { REPSocketChannel<String> channel = key.channel1(); String packet = channel.read(); if (packet==null) continue; //if (channel==masterCH){ if (packet.matches("^SeMa\\d.*")){ ns.writeLog("receives String from master ==> `"+packet+"\'", 1); for (ClientInfo ci: cis){ if (!packet.matches(".*"+ci.str+".*")) continue; ci.channel.write(this.getName()+": loopback master ==>`"+packet+"\'"); } }else{ ns.writeLog("receives String==> `"+packet+"\'", 1); //channlel.write("from "+this.getName()+": save"); masterCH.write(this.getName()+": pass packet`"+packet+"\'"); cis.add(new ClientInfo(packet, channel)); } }catch (IOException e1) { ns.writeLog("channel "+ns+" closed."); key.cancel(); } } } } } catch (IOException e1) { e1.printStackTrace(); } } private REPSocketChannel<String> connectToMaster(Selector _selector) throws IOException { REPSocketChannel<String> channel = REPSocketChannel.<String>create(new StringPacker()); ns.writeLog("is connecting to masterSeMa whose ip is"+masterIP, 1); while(!channel.connect(masterIP)){ ns.writeLog("SeMa not listen to socket yet, wait", 1); Thread.yield(); } ns.writeLog("connecting was successful.", 1); return channel; } } class ClientInfo{ String str; REPSocketChannel<String> channel; ClientInfo(String _str, REPSocketChannel<String> _channel){ str = _str; channel = _channel; } }