Mercurial > hg > RemoteEditor > REPSessionManager
changeset 314:edb373aa421e
use channel lock instead of selector lock.
remove generic type from NetworkSimulator, which simplifies
singleton pattern.
remove lock from REPLogger.
author | kono |
---|---|
date | Mon, 06 Oct 2008 10:34:37 +0900 |
parents | 0585fd2410b8 |
children | 20fb70068089 |
files | rep/channel/ChannelSimulator.java rep/channel/NetworkSimulator.java rep/channel/REPLogger.java rep/channel/SelectorSimulator.java rep/channel/ServerChannelSimulator.java test/channeltest/testNetworkSimulator.java |
diffstat | 6 files changed, 107 insertions(+), 90 deletions(-) [+] |
line wrap: on
line diff
--- a/rep/channel/ChannelSimulator.java Sun Oct 05 22:36:24 2008 +0900 +++ b/rep/channel/ChannelSimulator.java Mon Oct 06 10:34:37 2008 +0900 @@ -12,7 +12,7 @@ import java.util.Queue; public class ChannelSimulator<P> extends REPSocketChannel<P>{ - protected NetworkSimulator<P> ns; + protected NetworkSimulator ns; protected Queue<P> readQ; protected Selector selector = new NullSelector(); protected boolean isBlocking; @@ -23,20 +23,22 @@ /** Constructors. */ public ChannelSimulator(){ super(null, null); - ns = NetworkSimulator.<P>singleton(); + ns = NetworkSimulator.singleton(); readQ = new LinkedList<P>(); ownerName = Thread.currentThread().getName(); } - /** read from Queue. */ - public synchronized P read() { + /** read from Queue. + * @throws IOException */ + public synchronized P read() throws IOException { // We may lock selector instead of this, but it reduces - // concurrency. + // concurrency, but be careful of dead lock. P tmp; while ( (tmp=readQ.poll())==null && isBlocking ) { try { wait(); // queue is empty } catch (InterruptedException e) { + throw new IOException(); } } // for write wait (we don't need this) @@ -55,11 +57,21 @@ } /** otherEnd Channel enqueue p to own queue using this method. */ - protected synchronized boolean enQ(P p){ + + boolean enQ(P p) { + if (enQ1(p)) { + // don't lock this channel while calling selector.wakeup(). + // selector may lock this channel, which may cause dead lock. + selector.wakeup(); + return true; + } + return false; + } + + protected synchronized boolean enQ1(P p){ while(true) { if (readQ.offer(p)) { notify(); // other end my wait() - selector.wakeup(); // selector may wait break; } else { // this can't happen assert(false); @@ -138,5 +150,14 @@ return "ChSim("+ownerName+")"; } + public ChannelSimulator<P> newChannel() { + return new ChannelSimulator<P>(); + } + + @SuppressWarnings("unchecked") + public void setOtherEnd1(ChannelSimulator<?> other) { + otherEnd = (ChannelSimulator<P>) other; + } + }
--- a/rep/channel/NetworkSimulator.java Sun Oct 05 22:36:24 2008 +0900 +++ b/rep/channel/NetworkSimulator.java Mon Oct 06 10:34:37 2008 +0900 @@ -5,47 +5,41 @@ import java.util.HashMap; import java.util.LinkedList; -public class NetworkSimulator<P> { - public static NetworkSimulator<?> ns; - +public class NetworkSimulator { + // we don't use <P> because we need singleton. + public static NetworkSimulator ns = new NetworkSimulator(); public HashMap<SocketAddress,Integer>namedb = new HashMap<SocketAddress,Integer>(); public int ipcount = 1; public REPLogger logger; - @SuppressWarnings("unchecked") // <?> から <T> へのキャストのため. - public static <T> NetworkSimulator<T> singleton(){ - // double check singleton - if (ns==null) - synchronized (NetworkSimulator.class) { - if (ns==null) - ns = new NetworkSimulator<T>(); - } - return (NetworkSimulator<T>) ns; + public static NetworkSimulator singleton() { + // singleton pattern may used here, but it has a little cost. + return ns; } int logLevel=5; /** Listening Servers. */ - private LinkedList<ServerData<P>> serverList; + private LinkedList<ServerData> serverList; /** Constructor. */ public NetworkSimulator(){ - serverList = new LinkedList<ServerData<P>>(); + serverList = new LinkedList<ServerData>(); logger = REPLogger.singleton(); logger.writeLog("construct Networksimulator", 1); // printAllState(); } /* */ - synchronized public void listen(InetSocketAddress ip, ServerChannelSimulator<P> scs) { - serverList.add(new ServerData<P>(ip, scs)); + synchronized public void listen(InetSocketAddress ip, ServerChannelSimulator<?> scs) { + serverList.add(new ServerData(ip, scs)); logger.writeLog("listen", 1); printAllState(); } - synchronized public boolean connect(InetSocketAddress ip, ChannelSimulator<P> CHclient) { + synchronized public boolean connect(InetSocketAddress ip, ChannelSimulator<?> CHclient) { logger.writeLog("connecting..", 1); - for (ServerData<P> sd0: serverList){ + for (ServerData sd0: serverList){ // ANY address (0.0.0.0/0.0.0.0) should be considered. if (sd0.IP.getAddress().isAnyLocalAddress()) { if (sd0.IP.getPort() != ip.getPort()) continue; @@ -54,9 +48,10 @@ // use different port address. } else if (!sd0.IP.equals(ip)) continue; - ChannelSimulator<P> CHserver = new ChannelSimulator<P>(); - CHserver.setOtherEnd(CHclient); - CHclient.setOtherEnd(CHserver); + //ChannelSimulator<?> CHserver = new ChannelSimulator<?>(); + ChannelSimulator<?> CHserver = CHclient.newChannel(); + CHserver.setOtherEnd1(CHclient); + CHclient.setOtherEnd1(CHserver); sd0.connectedListS.add(CHserver); sd0.connectedListC.add(CHclient); @@ -72,16 +67,16 @@ /** for DEBUG methods. */ void printAllState(){ String log = "NetworkSimulator State:"; - for (ServerData<P> sd: serverList){ + for (ServerData sd: serverList){ log += "\tSessionManager(ip="+sd.IP.toString()+"): "; log += channelList(sd.connectedListC); } logger.writeLog(log); } - String channelList(LinkedList<ChannelSimulator<P>> list){ + private String channelList(LinkedList<ChannelSimulator<?>> list){ String tmp = ""; - for (ChannelSimulator<P> ch: list){ + for (ChannelSimulator<?> ch: list){ tmp += ch.toString()+" "; } return "\t"+tmp; @@ -89,7 +84,7 @@ - public int nslookup(SocketAddress semaIP) { + public synchronized int nslookup(SocketAddress semaIP) { Integer ip; if ((ip=namedb.get(semaIP))==null) { namedb.put(semaIP, (ip=ipcount++)); @@ -100,19 +95,19 @@ } -class ServerData<P> { +class ServerData { //int virtualIP; InetSocketAddress IP; //SelectorSimulator<P> selector; - ServerChannelSimulator<P> scs; - LinkedList<ChannelSimulator<P>> connectedListS; - LinkedList<ChannelSimulator<P>> connectedListC; + ServerChannelSimulator<?> scs; + LinkedList<ChannelSimulator<?>> connectedListS; + LinkedList<ChannelSimulator<?>> connectedListC; - ServerData(InetSocketAddress ip, ServerChannelSimulator<P> _scs){ + ServerData(InetSocketAddress ip, ServerChannelSimulator<?> _scs){ IP = ip; //selector = _selector; scs = _scs; - connectedListS = new LinkedList<ChannelSimulator<P>>(); - connectedListC = new LinkedList<ChannelSimulator<P>>(); + connectedListS = new LinkedList<ChannelSimulator<?>>(); + connectedListC = new LinkedList<ChannelSimulator<?>>(); } } \ No newline at end of file
--- a/rep/channel/REPLogger.java Sun Oct 05 22:36:24 2008 +0900 +++ b/rep/channel/REPLogger.java Mon Oct 06 10:34:37 2008 +0900 @@ -1,15 +1,15 @@ package rep.channel; public class REPLogger { - static REPLogger single; + static REPLogger single = new REPLogger(); public static REPLogger singleton(){ - if(single==null){ - synchronized(REPLogger.class){ - if(single==null) - single = new REPLogger(); - } - } +// if(single==null){ +// synchronized(REPLogger.class){ +// if(single==null) +// single = new REPLogger(); +// } +// } return single; } protected REPLogger(){ @@ -17,16 +17,16 @@ private int logLevel; /** simulation log command */ - synchronized public void writeLog(String log, int level){ + public void writeLog(String log, int level){ if ( level<=logLevel ) System.out.println(Thread.currentThread().getName()+": "+log); - System.out.flush(); + //System.out.flush(); } public void writeLog(String log){ writeLog(log, 0); } - synchronized public void setLogLevel(int logLevel) { + public void setLogLevel(int logLevel) { this.logLevel = logLevel; }
--- a/rep/channel/SelectorSimulator.java Sun Oct 05 22:36:24 2008 +0900 +++ b/rep/channel/SelectorSimulator.java Mon Oct 06 10:34:37 2008 +0900 @@ -14,6 +14,7 @@ // access to these set have to be synchronized private Set<SelectionKey> keyList; private Set<SelectionKey> selectedKeys; + private boolean isOpen=true; public SelectorSimulator() { super(null); @@ -40,8 +41,8 @@ getSelectedKeys(); if(selectedKeys.isEmpty()) { try { - this.wait(timeout); - // we cannot know if we timeouted or not + wait(timeout); + // we cannot know if we time outed or not getSelectedKeys(); } catch (InterruptedException e) { throw new IOException("Error, Selector was interrupted!"); @@ -91,7 +92,7 @@ // REPSelectionKeyを生成しないように注意 newKeys.add(new SelectionKeySimulator<P>(k)); } - return newKeys;//(Set<REPSelectionKey<P>>)newKeys; + return newKeys; } public synchronized <T> SelectionKey getKey(ChannelSimulator<T> channel){ @@ -104,13 +105,12 @@ @Override public void close() throws IOException { - // TODO Auto-generated method stub - + isOpen = false; } @Override public boolean isOpen() { - return true; + return isOpen; } @Override
--- a/rep/channel/ServerChannelSimulator.java Sun Oct 05 22:36:24 2008 +0900 +++ b/rep/channel/ServerChannelSimulator.java Mon Oct 06 10:34:37 2008 +0900 @@ -13,20 +13,19 @@ /* シミュレーションの際にコンストラクトされる REPServerSocketChannel の実装 */ public class ServerChannelSimulator<P>extends REPServerSocketChannel<P> { - protected NetworkSimulator<P> ns; + protected NetworkSimulator ns; //public REPServerSocket<REPSocketChannel<P>> socket; protected InetSocketAddress IP; protected Queue<ChannelSimulator<P>> acceptQ; - protected Object lock; + protected Selector selector; protected boolean isBlocking; private SelectionKeySimulator<P> key; /** Constructors. * @throws IOException */ public ServerChannelSimulator() throws IOException { - //socket = REPServerSocket.<REPSocketChannel<P>>create(); - ns = NetworkSimulator.<P>singleton(); - lock = new Object(); + ns = NetworkSimulator.singleton(); + selector = new NullSelector(); // new Object(); acceptQ = new LinkedList<ChannelSimulator<P>>(); } @@ -34,28 +33,36 @@ IP = ip; } - public REPSocketChannel<P> accept1() throws IOException { - ChannelSimulator<P> tmp; - synchronized (lock) { - while ( (tmp=acceptQ.poll())==null && isBlocking ) { - try { - lock.wait(); - } catch (InterruptedException e) { - throw new IOException(); - } + public synchronized REPSocketChannel<P> accept1() throws IOException { + ChannelSimulator<P> channel; + while ( (channel=acceptQ.poll())==null && isBlocking ) { + try { + wait(); + } catch (InterruptedException e) { + throw new IOException(); } } - //return ns.accept(IP); - return tmp; + return channel; } + protected boolean enQ(ChannelSimulator<P> ch){ - synchronized (lock){ + // Don't lock a selector from a locked channel, the selector may + // use channel.isAble() which locks the channel. + synchronized(this) { acceptQ.offer(ch); - lock.notifyAll(); + notify(); } + selector.wakeup(); return true; } + @SuppressWarnings("unchecked") + public void enQ(ChannelSimulator<?> hserver) { + // NetworkSimulator doesn't know P + ChannelSimulator<P>ch = (ChannelSimulator<P>) hserver; + enQ(ch); + } + public ServerSocket socket() { try { return new REPServerSocket(this); @@ -69,36 +76,29 @@ @SuppressWarnings("unchecked") public SelectionKey register(REPSelector<P> sel, int ops, Object att) throws ClosedChannelException { - synchronized (lock) { - lock = sel; - } - REPSelector<P> selector = sel; + selector = sel; + REPSelector<P> selector1 = sel; ns.listen(IP, this); - key = (SelectionKeySimulator<P>) selector.register(this, ops, att); + key = (SelectionKeySimulator<P>) selector1.register(this, ops, att); return key; } @SuppressWarnings("unchecked") @Override public SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException { - // TODO - synchronized (lock) { - lock = sel; - } - REPSelector<P> selector = (REPSelector<P>)sel; + selector = sel; + REPSelector<P> selector1 = (REPSelector<P>)sel; ns.listen(IP, this); // bindに移動してもいいよ - key = (SelectionKeySimulator<P>) selector.register(this, ops, att); + key = (SelectionKeySimulator<P>) selector1.register(this, ops, att); return key; } - public boolean isAcceptable() { - synchronized (lock){ - return !acceptQ.isEmpty(); - } + public synchronized boolean isAcceptable() { + return !acceptQ.isEmpty(); } @Override public Object blockingLock() { - return lock; + return selector; } public SelectableChannel configureBlocking(boolean block) throws IOException { @@ -142,4 +142,5 @@ } + }
--- a/test/channeltest/testNetworkSimulator.java Sun Oct 05 22:36:24 2008 +0900 +++ b/test/channeltest/testNetworkSimulator.java Mon Oct 06 10:34:37 2008 +0900 @@ -15,7 +15,7 @@ static public REPLogger logger = REPLogger.singleton(); public static void main(String[] args){ - REPServerSocketChannel.isSimulation = false; + REPServerSocketChannel.isSimulation = true; testNetworkSimulator testns = new testNetworkSimulator(3, 10, 90); logger.setLogLevel(5);