Mercurial > hg > RemoteEditor > REPSessionManager
changeset 248:e72e0eae1261
*** empty log message ***
author | kent |
---|---|
date | Wed, 03 Sep 2008 18:44:08 +0900 |
parents | 2a185042dcd0 |
children | e44c1773d121 |
files | rep/channel/ChannelSimulator.java rep/channel/NetworkSimulator.java rep/channel/ServerChannelSimulator.java test/channeltest/testNetworkSimulator.java test/channeltest/testSeMa.java test/channeltest/testSeMaSlave.java |
diffstat | 6 files changed, 156 insertions(+), 145 deletions(-) [+] |
line wrap: on
line diff
--- a/rep/channel/ChannelSimulator.java Wed Sep 03 17:03:41 2008 +0900 +++ b/rep/channel/ChannelSimulator.java Wed Sep 03 18:44:08 2008 +0900 @@ -3,91 +3,71 @@ import java.io.IOException; import java.net.SocketAddress; import java.nio.channels.ClosedChannelException; +import java.nio.channels.NotYetConnectedException; import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; +import java.util.LinkedList; +import java.util.Queue; public class ChannelSimulator<P> extends REPSocketChannel<P>{ protected NetworkSimulator<P> ns; - protected BlockingQueue<P> qread; - protected BlockingQueue<P> qwrite; - protected SelectorSimulator<P> writeSelector; - protected SelectorSimulator<P> readSelector; + protected Queue<P> readQ; + protected Object lock; + protected boolean isBlocking; + protected ChannelSimulator<P> otherEnd; + protected SelectionKeySimulator<P> key; + private String ownerName; + //protected BlockingQueue<P> qread; + //protected BlockingQueue<P> qwrite; + //protected SelectorSimulator<P> writeSelector; + //protected SelectorSimulator<P> readSelector; /** Constructors. */ public ChannelSimulator(){ super(null, null); ns = NetworkSimulator.<P>singleton(); + readQ = new LinkedList<P>(); + lock = new Object(); + ownerName = Thread.currentThread().getName(); } - public ChannelSimulator<P> createConjugatedChannel() { - ChannelSimulator<P> ret = new ChannelSimulator<P>(); - ret.qread=qwrite; - ret.qwrite=qread; - ret.readSelector=writeSelector; - ret.writeSelector=readSelector; - return ret; - } - - /* read from Queue. */ - public P read(){ - try { - if(readSelector!=null) - synchronized (readSelector){ - return qread.take(); + /** read from Queue. */ + public P read() throws IOException{ + P tmp; + synchronized (lock){ + while ( (tmp=readQ.poll())==null && isBlocking ) { + try { + lock.wait(); + } catch (InterruptedException e) { + throw new IOException(); } - else{ - return qread.take(); } - } catch (InterruptedException e) { - e.printStackTrace(); - return null; } + return tmp; } - /* write to Queue. */ + /** write packet to other end. */ public boolean write(P p){ - try { - if (writeSelector!=null) - synchronized (writeSelector){ - qwrite.put(p); - writeSelector.notifyAll(); - } - else { - qwrite.put(p); - } - return true; - } catch (InterruptedException e) { - e.printStackTrace(); - return false; + if (otherEnd==null) throw new NotYetConnectedException(); + return otherEnd.enQ(p); + } + /** otherEnd Channel enqueue p to own queue using this method. */ + protected boolean enQ(P p){ + synchronized (lock){ + readQ.offer(p); + lock.notifyAll(); } + return true; } - /** Connecting methods */ // for clients. public boolean connect(SocketAddress ip){ - return ns.connect(ip, this); - } - - /* accessor methods. */ - public BlockingQueue<P> getReadQ(){ - return qread; - } - public BlockingQueue<P> getWriteQ(){ - return qwrite; + return ns.connect(ip, this); // ns.connectはotherEndを返した方がよいか? } - public void createReadQ(){ - qread = new LinkedBlockingQueue<P>(); + void setOtherEnd(ChannelSimulator<P> other){ + otherEnd = other; } - public void createWriteQ(){ - qwrite = new LinkedBlockingQueue<P>(); - } - public void setWriteSelector(SelectorSimulator<P> _selector){ - writeSelector = _selector; - } - public ChannelSimulator<P> accept(){ return null; @@ -97,8 +77,8 @@ return false; } public boolean isReadable() { - synchronized (qread){ - return !qread.isEmpty(); + synchronized (lock){ + return !readQ.isEmpty(); } } public boolean isWritable() { @@ -106,7 +86,8 @@ } @Override public SelectableChannel configureBlocking(boolean block) throws IOException { - return null; + isBlocking = block; + return this; } @@ -119,12 +100,25 @@ @Override public SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException { SelectorSimulator<P> selector = (SelectorSimulator<P>) sel; - return selector.register(this, ops, att); + synchronized (lock){ + lock = selector; + } + key = selector.register(this, ops, att); + return key; } + @SuppressWarnings("unchecked") public SelectionKey register(REPSelector<P> sel, int ops, Object att) throws ClosedChannelException { - return sel.register(this, ops, att); + synchronized (lock){ + lock = sel; + } + key = (SelectionKeySimulator<P>) sel.register(this, ops, att); + return key; } + public String toString(){ + return "ChSim("+ownerName+")"; + } + }
--- a/rep/channel/NetworkSimulator.java Wed Sep 03 17:03:41 2008 +0900 +++ b/rep/channel/NetworkSimulator.java Wed Sep 03 18:44:08 2008 +0900 @@ -4,7 +4,6 @@ import java.util.HashMap; import java.util.LinkedList; - public class NetworkSimulator<P> { public static NetworkSimulator<?> ns; @@ -37,72 +36,51 @@ } /* */ - synchronized public void listen(SocketAddress ip, SelectorSimulator<P> selector) { - serverList.add(new ServerData<P>(ip, selector)); + synchronized public void listen(SocketAddress ip, ServerChannelSimulator<P> scs) { + serverList.add(new ServerData<P>(ip, scs)); logger.writeLog("listen", 1); printAllState(); } - synchronized public ChannelSimulator<P> accept(SocketAddress ip) { - for (ServerData<P> sd: serverList){ - if (!sd.IP.equals(ip)) continue; - logger.writeLog("accepting..", 1); - - ChannelSimulator<P> serverCH = sd.acceptWaitingList.remove(); - sd.establishedList.add(serverCH); + synchronized public boolean connect(SocketAddress ip, ChannelSimulator<P> CHclient) { + logger.writeLog("connecting..", 1); + for (ServerData<P> sd0: serverList){ + if (!sd0.IP.equals(ip)) continue; - logger.writeLog("accepted", 1); + ChannelSimulator<P> CHserver = new ChannelSimulator<P>(); + CHserver.setOtherEnd(CHclient); + CHclient.setOtherEnd(CHserver); + + sd0.connectedListS.add(CHserver); + sd0.connectedListC.add(CHclient); + sd0.scs.enQ(CHserver); + + logger.writeLog("connected", 1); printAllState(); - return serverCH; - } - return null; - } - synchronized public boolean canAccept(SocketAddress ip){ - for (ServerData<P> sd: serverList){ - if (!sd.IP.equals(ip)) continue; - return !sd.acceptWaitingList.isEmpty(); + return true; } return false; } - public boolean connect(SocketAddress ip, ChannelSimulator<P> clientCH) { - ServerData<P> sd = null; - logger.writeLog("connecting..", 1); - synchronized (this){ - for (ServerData<P> sd0: serverList){ - if (sd0.IP.equals(ip)){ - sd=sd0; - break; - } - } - if (sd==null) return false; - - //ChannelSimulator<P> channel = new ChannelSimulator<P>(sd.selector); - clientCH.createReadQ(); - clientCH.createWriteQ(); - clientCH.setWriteSelector(sd.selector); - - ChannelSimulator<P> serverCH = clientCH.createConjugatedChannel(); - sd.acceptWaitingList.add(serverCH); - } - - synchronized (sd.selector) { - sd.selector.notifyAll(); - } - logger.writeLog("connected", 1); - printAllState(); - return true; - } - /** for DEBUG methods. */ synchronized void printAllState(){ + synchronized (logger){ logger.writeLog("NetworkSimulator State:"); for (ServerData<P> sd: serverList){ logger.writeLog("\tSessionManager(ip="+sd.IP.toString()+"): "); - logger.writeLog("\tacceptWaitingList="+sd.acceptWaitingList.size()); - logger.writeLog("\testablishedList="+sd.establishedList.size()); + //writeLog("\tacceptWaitingList="+sd.acceptWaitingList.size()); + printChannelList(sd.connectedListC); + //writeLog("\testablishedList="+sd.establishedList.size()); + } } } + synchronized void printChannelList(LinkedList<ChannelSimulator<P>> list){ + String tmp = ""; + for (ChannelSimulator<P> ch: list){ + tmp += ch.toString()+" "; + } + logger.writeLog("\t"+tmp); + } @@ -120,14 +98,16 @@ class ServerData<P> { //int virtualIP; SocketAddress IP; - SelectorSimulator<P> selector; - LinkedList<ChannelSimulator<P>> acceptWaitingList; - LinkedList<ChannelSimulator<P>> establishedList; + //SelectorSimulator<P> selector; + ServerChannelSimulator<P> scs; + LinkedList<ChannelSimulator<P>> connectedListS; + LinkedList<ChannelSimulator<P>> connectedListC; - ServerData(SocketAddress ip, SelectorSimulator<P> _selector){ + ServerData(SocketAddress ip, ServerChannelSimulator<P> _scs){ IP = ip; - selector = _selector; - acceptWaitingList = new LinkedList<ChannelSimulator<P>>(); - establishedList = new LinkedList<ChannelSimulator<P>>(); + //selector = _selector; + scs = _scs; + connectedListS = new LinkedList<ChannelSimulator<P>>(); + connectedListC = new LinkedList<ChannelSimulator<P>>(); } } \ No newline at end of file
--- a/rep/channel/ServerChannelSimulator.java Wed Sep 03 17:03:41 2008 +0900 +++ b/rep/channel/ServerChannelSimulator.java Wed Sep 03 18:44:08 2008 +0900 @@ -8,26 +8,52 @@ import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.spi.SelectorProvider; +import java.util.LinkedList; +import java.util.Queue; /* シミュレーションの際にコンストラクトされる REPServerSocketChannel の実装 */ public class ServerChannelSimulator<P>extends REPServerSocketChannel<P> { protected NetworkSimulator<P> ns; //public REPServerSocket<REPSocketChannel<P>> socket; protected SocketAddress IP; + protected Queue<ChannelSimulator<P>> acceptQ; + protected Object lock; + protected boolean isBlocking; + private SelectionKeySimulator<P> key; /** Constructors. * @throws IOException */ public ServerChannelSimulator() throws IOException { //socket = REPServerSocket.<REPSocketChannel<P>>create(); ns = NetworkSimulator.<P>singleton(); + lock = new Object(); + acceptQ = new LinkedList<ChannelSimulator<P>>(); } public void bind(SocketAddress ip){ IP = ip; } - public REPSocketChannel<P> accept1() /*throws IOException*/ { - return ns.accept(IP); + public REPSocketChannel<P> accept1() throws IOException { + ChannelSimulator<P> tmp; + synchronized (lock) { + while ( (tmp=acceptQ.poll())==null && isBlocking ) { + try { + lock.wait(); + } catch (InterruptedException e) { + throw new IOException(); + } + } + } + //return ns.accept(IP); + return tmp; + } + protected boolean enQ(ChannelSimulator<P> ch){ + synchronized (lock){ + acceptQ.offer(ch); + lock.notifyAll(); + } + return true; } public ServerSocket socket() { @@ -41,32 +67,48 @@ + @SuppressWarnings("unchecked") public SelectionKey register(REPSelector<P> sel, int ops, Object att) throws ClosedChannelException { + synchronized (lock) { + lock = sel; + } REPSelector<P> selector = sel; - ns.listen(IP, (SelectorSimulator<P>) selector); - return selector.register(this, ops, att); + ns.listen(IP, this); + key = (SelectionKeySimulator<P>) selector.register(this, ops, att); + return key; + } + @SuppressWarnings("unchecked") + @Override + public SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException { + // TODO + synchronized (lock) { + lock = sel; + } + REPSelector<P> selector = (REPSelector<P>)sel; + ns.listen(IP, this); // bindに移動してもいいよ + key = (SelectionKeySimulator<P>) selector.register(this, ops, att); + return key; } public boolean isAcceptable() { - return ns.canAccept(IP); + synchronized (lock){ + return !acceptQ.isEmpty(); + } } @Override public Object blockingLock() { - // TODO Auto-generated method stub - return null; + return lock; } - public SelectableChannel configureBlocking(boolean block) - throws IOException { - // TODO Auto-generated method stub - return null; + public SelectableChannel configureBlocking(boolean block) throws IOException { + isBlocking = block; + return this; } @Override public boolean isBlocking() { - // TODO Auto-generated method stub - return false; + return isBlocking; } @Override @@ -87,12 +129,6 @@ return null; } - @Override - public SelectionKey register(Selector sel, int ops, Object att) - throws ClosedChannelException { - // TODO Auto-generated method stub - return null; - } @Override public int validOps() {
--- a/test/channeltest/testNetworkSimulator.java Wed Sep 03 17:03:41 2008 +0900 +++ b/test/channeltest/testNetworkSimulator.java Wed Sep 03 18:44:08 2008 +0900 @@ -15,8 +15,8 @@ static public REPLogger logger = REPLogger.singleton(); public static void main(String[] args){ - REPServerSocketChannel.isSimulation = true; - testNetworkSimulator testns = new testNetworkSimulator(3, 12, 90); + REPServerSocketChannel.isSimulation = false; + testNetworkSimulator testns = new testNetworkSimulator(3, 10, 90); logger.setLogLevel(5); testns.startTest();
--- a/test/channeltest/testSeMa.java Wed Sep 03 17:03:41 2008 +0900 +++ b/test/channeltest/testSeMa.java Wed Sep 03 18:44:08 2008 +0900 @@ -58,7 +58,7 @@ while(running){ try { - selector.select(2000); + selector.select(); Set<REPSelectionKey<String>> set = selector.selectedKeys1(); for(REPSelectionKey<String> key : set) {
--- a/test/channeltest/testSeMaSlave.java Wed Sep 03 17:03:41 2008 +0900 +++ b/test/channeltest/testSeMaSlave.java Wed Sep 03 18:44:08 2008 +0900 @@ -54,7 +54,7 @@ /* Main Loop */ while(running){ - selector.select(2000); + selector.select(); for(REPSelectionKey<String> key : selector.selectedKeys1()){ @@ -62,7 +62,8 @@ REPSocketChannel<String> channel = key.accept(pack); if(channel==null) continue; channel.configureBlocking(false); - selector.register(channel, SelectionKey.OP_READ,null); + //selector.register(channel, SelectionKey.OP_READ,null); + channel.register(selector, SelectionKey.OP_READ, null); ns.writeLog("accepts a client.", 1); }else if(key.isReadable()){