view src/pathfinder/BlockingQ/SeMaSimulator.java @ 84:5ac3df98f780

merger simulator using BlockingQ
author kent
date Mon, 12 Nov 2007 17:57:38 +0900
parents
children 0a483aa8cf71
line wrap: on
line source

package pathfinder.BlockingQ;

import java.util.ArrayList;
import java.util.List;

/**
 * Session-manager simulator thread that relays packets around a ring of channels.
 *
 * <p>The thread first collects channels from the {@link NetworkSimulator} until
 * the configured number of clients is reached. It then repeatedly takes a packet
 * from one channel and writes it to the next channel in the list (index
 * {@code (i+1) % size}), until {@link #finish()} is called, the thread is
 * interrupted, or the configured packet limit has been forwarded.
 *
 * <p>The {@code NetworkSimulator} instance doubles as the monitor this thread
 * waits on when every channel is empty; {@link #finish()} notifies that monitor.
 * NOTE(review): whoever enqueues packets is presumably expected to notify the
 * same monitor as well — confirm against ChannelSimulator/NetworkSimulator.
 *
 * @param <P> packet type carried by the channels
 */
public class SeMaSimulator<P> extends Thread {
	/** Number of packets to forward before stopping; 0 means no limit. */
	private final int maxPacket;
	/** Number of accepted channels required before forwarding starts. */
	private final int maxClient;
	/**
	 * Cleared to request shutdown. Volatile because run() also reads it
	 * outside the {@code ns} monitor (accept phase and outer loop condition).
	 */
	private volatile boolean running=true;
	private final NetworkSimulator<P> ns;
	private final List<ChannelSimulator<P>> csList;

	/**
	 * @param _ns        network simulator that supplies channels and serves as the wait/notify monitor
	 * @param max_client number of channels to accept before forwarding starts
	 * @param max_packet number of packets to forward before the thread ends; 0 for unlimited
	 */
	public SeMaSimulator(NetworkSimulator<P> _ns, int max_client, int max_packet){
		ns = _ns;
		maxClient = max_client;
		maxPacket = max_packet;
		csList = new ArrayList<ChannelSimulator<P>>();
	}

	/**
	 * Creates a simulator for two clients with no packet limit.
	 *
	 * @param _ns network simulator that supplies channels
	 */
	public SeMaSimulator(NetworkSimulator<P> _ns){
		this(_ns, 2, 0);
	}

	/**
	 * Requests the relay thread to stop and wakes it if it is waiting on the
	 * network simulator's monitor.
	 */
	public void finish(){
		synchronized(ns){
			running = false;
			// notifyAll rather than notify: if anything else waits on ns, a single
			// notify could be consumed by that waiter and leave this thread asleep.
			ns.notifyAll();
		}
	}

	/**
	 * Moves every connection currently pending in the NetworkSimulator into
	 * the channel list. Stops as soon as accept() returns null.
	 */
	private void checkAccept(){
		ChannelSimulator<P> cs;
		while((cs=ns.accept())!=null){
			csList.add(cs);
		}
	}

	/**
	 * Thread body: waits for the clients, relays packets, then reports completion.
	 */
	@Override
	public void run(){
		awaitClients();

		// Skip the relay loop when shutdown was requested during the accept
		// phase or when there is no channel to read from (max_client <= 0).
		if(running && !csList.isEmpty()){
			System.out.println("SessionManager start.");
			forwardPackets();
		}

		System.out.println("SessionManager finish.");
	}

	/**
	 * Spins (yielding between attempts) until enough channels have been
	 * accepted or shutdown has been requested.
	 */
	private void awaitClients(){
		while(running && csList.size()<maxClient){
			checkAccept();
			Thread.yield();
		}
	}

	/**
	 * Main relay loop: polls the channels round-robin for a packet and writes
	 * it to the channel following the one it was read from.
	 */
	private void forwardPackets(){
		int i=0;
		int count=0;
		ChannelSimulator<P> cs = csList.get(i);

		while(running && (maxPacket==0 || count<maxPacket)){
			P packet = null;

			synchronized(ns){
				int start=i;
				// Check running before polling so no packet is dequeued once
				// shutdown has been requested.
				while(running && (packet=cs.poll())==null){
					i = (i+1)%csList.size();
					cs = csList.get(i);        // move on to the next channel
					if(i==start){
						// A full lap found nothing: sleep until notified on ns.
						try {
							ns.wait();
						} catch (InterruptedException e) {
							// Treat interruption as a shutdown request and keep
							// the interrupt status visible to the caller.
							Thread.currentThread().interrupt();
							running = false;
						}
					}
				}
			}
			// packet is null only when the poll loop ended because of shutdown.
			if(packet==null) break;

			i = (i+1)%csList.size();
			cs = csList.get(i);            // destination: the channel after the source

			// Logged after advancing so the index names the channel written to.
			System.out.println("SeMa pass packet to "+i+":>> "+packet.toString());

			// forward the packet that was read
			try {
				cs.write(packet);
			} catch (InterruptedException e) {
				System.out.println("SeMa cannot write!!");
				e.printStackTrace();
				Thread.currentThread().interrupt();
				break;
			}
			count++;
		}
	}
}