Mercurial > hg > RemoteEditor > REPSessionManager
changeset 307:e4b7af3fdf99
*** empty log message ***
author | kono |
---|---|
date | Sat, 04 Oct 2008 22:12:16 +0900 (2008-10-04) |
parents | ef00df38dd5d |
children | c5be84d53c7f |
files | rep/channel/ChannelSimulator.java rep/handler/REPHandlerImpl.java |
diffstat | 2 files changed, 37 insertions(+), 32 deletions(-) [+] |
line wrap: on
line diff
--- a/rep/channel/ChannelSimulator.java Sat Oct 04 18:16:45 2008 +0900 +++ b/rep/channel/ChannelSimulator.java Sat Oct 04 22:12:16 2008 +0900 @@ -14,49 +14,59 @@ public class ChannelSimulator<P> extends REPSocketChannel<P>{ protected NetworkSimulator<P> ns; protected Queue<P> readQ; - protected Object lock; + protected Selector selector = new NullSelector(); protected boolean isBlocking; protected ChannelSimulator<P> otherEnd; protected SelectionKeySimulator<P> key; private String ownerName; - //protected BlockingQueue<P> qread; - //protected BlockingQueue<P> qwrite; - //protected SelectorSimulator<P> writeSelector; - //protected SelectorSimulator<P> readSelector; /** Constructors. */ public ChannelSimulator(){ super(null, null); ns = NetworkSimulator.<P>singleton(); readQ = new LinkedList<P>(); - lock = new Object(); ownerName = Thread.currentThread().getName(); } /** read from Queue. */ - public P read() throws IOException{ + public synchronized P read() { + // We may lock selector instead of this, but it reduces + // concurrency. P tmp; - synchronized (lock){ - while ( (tmp=readQ.poll())==null && isBlocking ) { - try { - lock.wait(); - } catch (InterruptedException e) { - throw new IOException(); - } + while ( (tmp=readQ.poll())==null && isBlocking ) { + try { + wait(); + } catch (InterruptedException e) { } } + // for write wait + otherEnd.wakeup(); selector.wakeup(); return tmp; } + + private synchronized void wakeup() { + notify(); + } + /** write packet to other end. */ public boolean write(P p){ if (otherEnd==null) throw new NotYetConnectedException(); return otherEnd.enQ(p); } + /** otherEnd Channel enqueue p to own queue using this method. */ - protected boolean enQ(P p){ - synchronized (lock){ - readQ.offer(p); - lock.notifyAll(); + protected synchronized boolean enQ(P p){ + while(true) { + if (readQ.offer(p)) { + notify(); // other end my wait() + selector.wakeup(); // selector may wait + break; + } else { + try { + wait(); + } catch (InterruptedException e) { + } + } } return true; } @@ -83,7 +93,7 @@ return false; } public boolean isReadable() { - synchronized (lock){ + synchronized (selector){ return !readQ.isEmpty(); } } @@ -110,19 +120,15 @@ @SuppressWarnings("unchecked") @Override public SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException { - SelectorSimulator<P> selector = (SelectorSimulator<P>) sel; - synchronized (lock){ - lock = selector; - } - key = selector.register(this, ops, att); + SelectorSimulator<P> sel1 = (SelectorSimulator<P>) sel; + selector = sel1; + key = sel1.register(this, ops, att); return key; } @SuppressWarnings("unchecked") public SelectionKey register(REPSelector<P> sel, int ops, Object att) throws ClosedChannelException { - synchronized (lock){ - lock = sel; - } + selector = sel; key = (SelectionKeySimulator<P>) sel.register(this, ops, att); return key; }
--- a/rep/handler/REPHandlerImpl.java Sat Oct 04 18:16:45 2008 +0900 +++ b/rep/handler/REPHandlerImpl.java Sat Oct 04 22:12:16 2008 +0900 @@ -11,17 +11,16 @@ private SessionManager manager; - public REPHandlerImpl(int sid, SessionManager manager) { + public REPHandlerImpl(SessionManager manager) { this.manager = manager; } - @SuppressWarnings("unchecked") public void handle(REPSelectionKey<REPCommand> key) throws IOException { - REPSocketChannel<REPCommand> channel = (REPSocketChannel<REPCommand>) key.channel(); - //System.out.println("REPHandlerImpl.handle() : channel = " + channel); + REPSocketChannel<REPCommand> channel = key.channel1(); + System.out.println("REPHandlerImpl.handle() : channel = " + channel); REPCommand command = channel.read(); - //System.out.println("REPHandlerImpl.handle() : command = " + command); + System.out.println("REPHandlerImpl.handle() : command = " + command); manager.manage(channel, command); }