Mercurial > hg > RemoteEditor > REPSessionManager
changeset 234:0498425202a4
made ChannelSimulator no extends SelectableChannelSimulator
but REPSocketChannel directly
author | kent |
---|---|
date | Sun, 31 Aug 2008 19:02:34 +0900 |
parents | dae90ded1bcd |
children | a8302aa5a495 |
files | rep/channel/ChannelSimulator.java rep/channel/REPSocketChannel.java rep/channel/SelectionKeySimulator.java rep/channel/SelectorSimulator.java |
diffstat | 4 files changed, 84 insertions(+), 12 deletions(-) [+] |
line wrap: on
line diff
--- a/rep/channel/ChannelSimulator.java Sun Aug 31 19:01:03 2008 +0900 +++ b/rep/channel/ChannelSimulator.java Sun Aug 31 19:02:34 2008 +0900 @@ -6,13 +6,19 @@ import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; -public class ChannelSimulator<P> extends SelectableChannelSimulator<P>{ +public class ChannelSimulator<P> extends REPSocketChannel<P>{ protected NetworkSimulator<P> ns; + protected BlockingQueue<P> qread; + protected BlockingQueue<P> qwrite; + protected SelectorSimulator<P> writeSelector; + protected SelectorSimulator<P> readSelector; /** Constructors. */ public ChannelSimulator(){ - super(null); + super(null, null); ns = NetworkSimulator.<P>singleton(); } @@ -24,6 +30,40 @@ ret.writeSelector=readSelector; return ret; } + + /* read from Queue. */ + public P read(){ + try { + if(readSelector!=null) + synchronized (readSelector){ + return qread.take(); + } + else{ + return qread.take(); + } + } catch (InterruptedException e) { + e.printStackTrace(); + return null; + } + } + /* write to Queue. */ + public boolean write(P p){ + try { + if (writeSelector!=null) + synchronized (writeSelector){ + qwrite.put(p); + writeSelector.notifyAll(); + } + else { + qwrite.put(p); + } + return true; + } catch (InterruptedException e) { + e.printStackTrace(); + return false; + } + } + /** Connecting methods */ // for clients. @@ -31,22 +71,36 @@ return ns.connect(ip, this); } + /* accessor methods. */ + public BlockingQueue<P> getReadQ(){ + return qread; + } + public BlockingQueue<P> getWriteQ(){ + return qwrite; + } + public void createReadQ(){ + qread = new LinkedBlockingQueue<P>(); + } + public void createWriteQ(){ + qwrite = new LinkedBlockingQueue<P>(); + } + public void setWriteSelector(SelectorSimulator<P> _selector){ + writeSelector = _selector; + } + public ChannelSimulator<P> accept(){ return null; } - @Override public boolean isAcceptable() { return false; } - @Override public boolean isReadable() { synchronized (qread){ return !qread.isEmpty(); } } - @Override public boolean isWritable() { return true; }
--- a/rep/channel/REPSocketChannel.java Sun Aug 31 19:01:03 2008 +0900 +++ b/rep/channel/REPSocketChannel.java Sun Aug 31 19:02:34 2008 +0900 @@ -139,6 +139,7 @@ return sc.connect(semaIP); } + public SelectionKey register1(REPSelector<P> sel, int ops, Object att) throws ClosedChannelException { if(sel instanceof REPSelector) { @@ -159,5 +160,4 @@ return sc.register(sel, ops,att); } - } \ No newline at end of file
--- a/rep/channel/SelectionKeySimulator.java Sun Aug 31 19:01:03 2008 +0900 +++ b/rep/channel/SelectionKeySimulator.java Sun Aug 31 19:02:34 2008 +0900 @@ -36,7 +36,7 @@ } public void setFlag() { - SelectableChannelSimulator<?> scs = (SelectableChannelSimulator<?>) channel; + ChannelSimulator<?> scs = (ChannelSimulator<?>) channel; ready = 0; if(scs.isAcceptable()) ready |= OP_ACCEPT; if(scs.isReadable()) ready |= OP_READ;
--- a/rep/channel/SelectorSimulator.java Sun Aug 31 19:01:03 2008 +0900 +++ b/rep/channel/SelectorSimulator.java Sun Aug 31 19:02:34 2008 +0900 @@ -41,6 +41,29 @@ } return selectedKeys.size(); } + + @Override + public int select(long timeout) throws IOException { + selectedKeys = new HashSet<SelectionKey>(); + + synchronized(this) { + if (!wakeFlag){ + for(SelectionKey key : keyList){ + if(((SelectionKeySimulator<?>) key).isAble()) + selectedKeys.add(key); + } + + if(selectedKeys.isEmpty()) + try { + this.wait(timeout); + } catch (InterruptedException e) { + throw new IOException("Error, Selector was interrupted!"); + } + } + wakeFlag=false; + } + return selectedKeys.size(); + } @Override public int selectNow() throws IOException { @@ -108,11 +131,6 @@ return null; } - @Override - public int select(long timeout) throws IOException { - // TODO Auto-generated method stub - return 0; - } @Override public Selector wakeup() {