view test/channeltest/testSeMaSlave.java @ 207:9aeade335af0

*** empty log message ***
author kono
date Sat, 30 Aug 2008 11:51:42 +0900
parents a2aaf8af7bcc
children 302c4a0afab6
line wrap: on
line source

package test.channeltest;

import java.io.IOException;
import java.net.SocketAddress;
import java.util.LinkedList;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

import rep.channel.REPLogger;
import rep.channel.REPPack;
import rep.channel.REPSelectionKey;
import rep.channel.REPSelector;
import rep.channel.REPServerSocketChannel;
import rep.channel.REPSocketChannel;

public class testSeMaSlave extends Thread{

	SocketAddress ownIP;
	SocketAddress masterIP;
	boolean running=true;
	REPLogger ns;
	LinkedList<ClientInfo> cis;
	
	public testSeMaSlave(String name, String oname,int oport, String mname,int  mport){
		super(name);
		ownIP = new InetSocketAddress(oname,oport);
		masterIP = new InetSocketAddress(mname,mport);
		cis = new LinkedList<ClientInfo>();
		ns = REPLogger.singleton();
	}
	public void init(){
		
	}

	@SuppressWarnings("unchecked")
	public void run() {
		REPSelector<String> selector;
		REPSocketChannel<String> masterCH ;
		try {
			REPPack<String> pack = new StringPacker();
			REPServerSocketChannel<String> scs = REPServerSocketChannel.<String>open(pack);
			selector = REPSelector.create();
			masterCH = connectToMaster(selector);
			scs.socket().bind(ownIP);
			scs.configureBlocking(false);
			scs.register(selector, SelectionKey.OP_ACCEPT,null);
			masterCH.configureBlocking(false);
			masterCH.register(selector, SelectionKey.OP_READ,null);


			ns.writeLog("SessionManager starts main routin.", 1);

			/* Main Loop */
			while(running){

				selector.select(); 

				for(REPSelectionKey<String> key : selector.selectedKeys1()){

					if(key.isAcceptable()){
						REPSocketChannel<String> channel = key.accept(pack);
						if(channel==null) continue;
						channel.configureBlocking(false);
						selector.register(channel, SelectionKey.OP_READ,null);
						ns.writeLog("accepts a client.", 1);

					}else if(key.isReadable()){
						ns.writeLog("gets readable channel", 1);
						REPSocketChannel<String> channel = (REPSocketChannel<String>) key.channel(pack);
						String packet = channel.read();
						if (channel==masterCH){
							ns.writeLog("receives String from master ==> `"+packet+"\'", 1);
							for (ClientInfo ci: cis){
								if (!packet.matches(".*"+ci.str+".*")) continue;
								ci.channel.write(this.getName()+": loopback master ==>`"+packet+"\'");
							}
						}else{
							ns.writeLog("receives String==> `"+packet+"\'", 1);
							//channlel.write("from "+this.getName()+": save");
							masterCH.write(this.getName()+": pass packet`"+packet+"\'");
							cis.add(new ClientInfo(packet, channel));
						}
					}
				}
			}
		} catch (IOException e1) {
			e1.printStackTrace();
		}

	}
	
	private REPSocketChannel<String> connectToMaster(Selector _selector) throws IOException {
		REPSocketChannel<String> channel = REPSocketChannel.<String>create(new StringPacker());
		ns.writeLog("is connecting to masterSeMa whose ip is"+masterIP, 1);
		while(!channel.connect(masterIP)){
			ns.writeLog("SeMa not listen to socket yet, wait", 1);
			Thread.yield();
		}
		ns.writeLog("connecting was successful.", 1);

		return channel;
	}
}
class ClientInfo{
	String str;
	REPSocketChannel<String> channel;
	ClientInfo(String _str, REPSocketChannel<String> _channel){
		str = _str;
		channel = _channel;
	}
}