view rep/channel/SelectableChannelSimulator.java @ 225:e173411a2499

*** empty log message ***
author kent
date Sun, 31 Aug 2008 13:32:15 +0900
parents 4c0a94836357
children
line wrap: on
line source

package rep.channel;

import java.nio.channels.SocketChannel;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Base class for simulated channels that exchange packets of type {@code P}
 * through {@link BlockingQueue}s rather than through the wrapped socket.
 *
 * <p>The queues are not created by the constructor; call {@link #createReadQ()}
 * and {@link #createWriteQ()} (or assign the protected fields from a subclass)
 * before using {@link #read()} and {@link #write(Object)}.
 *
 * @param <P> type of the packets carried by the channel
 */
public abstract class SelectableChannelSimulator<P> extends REPSocketChannel<P>{

	/** Queue that {@link #read()} takes packets from. */
	protected BlockingQueue<P> qread;
	/** Queue that {@link #write(Object)} puts packets into. */
	protected BlockingQueue<P> qwrite;
	/** Selector notified after a packet has been written; may be null. */
	protected SelectorSimulator<P> writeSelector;
	/** Selector whose monitor is held while reading; may be null. */
	protected SelectorSimulator<P> readSelector;

	/**
	 * Creates the simulator on top of the given channel.
	 *
	 * @param channel channel handed to the superclass constructor; the second
	 *                superclass argument is always {@code null}
	 */
	public SelectableChannelSimulator(SocketChannel channel) {
		super(channel, null);
	}

	/**
	 * Takes the next packet from the read queue, blocking until one is available.
	 *
	 * <p>When a read selector is set, the take is performed while holding that
	 * selector's monitor.
	 * NOTE(review): {@code take()} can block while the monitor is held, so other
	 * threads synchronizing on the same selector wait until a packet arrives --
	 * confirm this is intended.
	 *
	 * @return the packet taken from the queue, or {@code null} if the calling
	 *         thread was interrupted while waiting (the interrupt status is
	 *         restored in that case)
	 */
	public P read(){
		// Snapshot the field so the null check and the lock use the same object.
		final SelectorSimulator<P> selector = readSelector;
		try {
			if (selector == null) {
				return qread.take();
			}
			synchronized (selector) {
				return qread.take();
			}
		} catch (InterruptedException e) {
			// Catching InterruptedException clears the flag; set it again so
			// callers can still observe that the thread was interrupted.
			Thread.currentThread().interrupt();
			e.printStackTrace();
			return null;
		}
	}

	/**
	 * Puts a packet into the write queue and, when a write selector is set,
	 * wakes every thread waiting on that selector.
	 *
	 * @param p packet to enqueue
	 * @return {@code true} if the packet was enqueued, {@code false} if the
	 *         calling thread was interrupted first (the interrupt status is
	 *         restored in that case)
	 */
	public boolean write(P p){
		// Snapshot the field so the null check and the lock use the same object.
		final SelectorSimulator<P> selector = writeSelector;
		try {
			if (selector == null) {
				qwrite.put(p);
			} else {
				// Enqueue and notify under the selector's monitor; notifyAll()
				// requires the monitor to be held.
				synchronized (selector) {
					qwrite.put(p);
					selector.notifyAll();
				}
			}
			return true;
		} catch (InterruptedException e) {
			// Restore the interrupt status cleared by the catch.
			Thread.currentThread().interrupt();
			e.printStackTrace();
			return false;
		}
	}

	/** Accepts a connection; the behavior is defined by the subclass. */
	public abstract ChannelSimulator<P> accept();

	/* accessor methods.  */

	/** @return the queue used by {@link #read()}; null if it has not been created */
	public BlockingQueue<P> getReadQ(){
		return qread;
	}

	/** @return the queue used by {@link #write(Object)}; null if it has not been created */
	public BlockingQueue<P> getWriteQ(){
		return qwrite;
	}

	/** Replaces the read queue with a new, empty, unbounded queue. */
	public void createReadQ(){
		qread = new LinkedBlockingQueue<P>();
	}

	/** Replaces the write queue with a new, empty, unbounded queue. */
	public void createWriteQ(){
		qwrite = new LinkedBlockingQueue<P>();
	}

	/**
	 * Sets the selector that {@link #write(Object)} locks and notifies.
	 *
	 * @param _selector selector to notify on write, or {@code null} for none
	 */
	public void setWriteSelector(SelectorSimulator<P> _selector){
		writeSelector = _selector;
	}


	/* return state of the Queue */
	abstract public boolean isReadable();
	abstract public boolean isWritable();
	abstract public boolean isAcceptable();

}