Mercurial > hg > RemoteEditor > REPSessionManager
changeset 308:c5be84d53c7f channel-simulator-update **INVALID**
*** empty log message ***
author | kono |
---|---|
date | Sat, 04 Oct 2008 22:12:34 +0900 |
parents | e4b7af3fdf99 |
children | f27c8551e877 |
files | rep/Editor.java rep/SessionManager.java rep/channel/ChannelSimulator.java rep/channel/NetworkSimulator.java rep/channel/NullSelector.java rep/channel/REPSelectionKey.java rep/channel/SelectableChannelSimulator.java rep/channel/SelectionKeySimulator.java rep/channel/SelectorSimulator.java rep/handler/REPHandlerImpl.java rep/handler/REPHandlerInMerge.java test/channeltest/testSeMa.java test/channeltest/testSeMaSlave.java test/sematest/TestEditor.java |
diffstat | 14 files changed, 197 insertions(+), 200 deletions(-) [+] |
line wrap: on
line diff
--- a/rep/Editor.java Sat Oct 04 22:12:16 2008 +0900 +++ b/rep/Editor.java Sat Oct 04 22:12:34 2008 +0900 @@ -22,6 +22,7 @@ private List<REPCommand> writeQueue; private REPCommand quit2 = null; private REPLogger ns = REPLogger.singleton(); + private final int limit=100; public Editor(){ this(true); @@ -71,6 +72,7 @@ }else{ //エディタからの新たな編集コマンド sentList.add(command); + assert(sentList.size()<limit); translater.transSendCmd(command); list.add(command); } @@ -82,6 +84,7 @@ mergeAgainList = optimizer.optimize(mergeAgainList); writeQueue.addAll(mergeAgainList); + assert(writeQueue.size()<limit); } }else{ //他のエディタからの編集コマンド @@ -97,6 +100,7 @@ for(REPCommand mergeCommand : cmds){ mergeCommand.setEID(REP.MERGE_EID.id); writeQueue.add(mergeCommand); + assert(writeQueue.size()<limit); } } @@ -179,6 +183,7 @@ public void send(REPCommand command) { writeQueue.add(command); + assert(writeQueue.size()<limit); } public void setChannel(REPSocketChannel<REPCommand> channel) {
--- a/rep/SessionManager.java Sat Oct 04 22:12:16 2008 +0900 +++ b/rep/SessionManager.java Sat Oct 04 22:12:34 2008 +0900 @@ -34,6 +34,8 @@ int cmd; kind of command int sid; session ID : uniqu to editing file int eid; editor ID : owner editor ID = 1。Session に対して unique + -1 session manager command + -2 merge command int seqno; Sequence number : sequence number はエディタごとに管理 int lineno; line number int textsize; textsize : bytesize @@ -46,14 +48,16 @@ private SessionManagerGUI gui; private REPSelector<REPCommand> selector; private SessionManagerList smList; - private String myHost; private List<Editor> editorList; // editorList は、sessionList に入っているeditorとは別なeditorのlistらしい。 private String maxHost; private List<PacketSet> waitingCommandInMerge; - private BlockingQueue<SessionManagerEvent> waitingQueue = new LinkedBlockingQueue<SessionManagerEvent>();; - private static int temp_port; - private static int send_port; + REPHandler normalHandler = new REPHandlerImpl(this); + REPHandler handlerInMerge =new REPHandlerInMerge(this); + private BlockingQueue<SessionManagerEvent> waitingEventQueue = new LinkedBlockingQueue<SessionManagerEvent>();; + private String myHost; + private static int receive_port; + private static int parent_port; static final int DEFAULT_PORT = 8766; @@ -76,31 +80,28 @@ ssc.socket().setReuseAddress(true); //getAllByNameで取れた全てのアドレスに対してbindする ssc.socket().bind(new InetSocketAddress(port)); - ssc.register(selector, SelectionKey.OP_ACCEPT, new REPHandlerImpl(-1, this)); + ssc.register(selector, SelectionKey.OP_ACCEPT, normalHandler); sessionList = new LinkedList<Session>(); smList = new SessionManagerList(); editorList = new LinkedList<Editor>(); waitingCommandInMerge = new LinkedList<PacketSet>(); - //デフォルトのSessionを作っておく(テスト用に?) - //if(sessionList.size() > 0) System.out.println("Error : SessionManager.init():"); - //Session defaultSession = new Session(sessionList.size(), "DefaultSession.txt", new Editor(0,null)); - //sessionList.add(defaultSession); } public void mainLoop() throws IOException { while(true){ SessionManagerEvent e; - while((e = waitingQueue.poll())!=null){ + while((e = waitingEventQueue.poll())!=null){ e.exec(); } for(Session s:sessionList) { for(Editor editor: s.getEditorList()) if (editor.doWaitingWrite()) break; } - if(checkSend()){ + // if there are waiting command during merge operation, do process it + if(checkWaitingCommandInMerge()){ if(selector.selectNow() > 0){ select(); } @@ -111,12 +112,18 @@ } } - private boolean checkSend() throws IOException { + /** + * Check waiting command in merge + * @return true if there is a processed waiting command + * @throws IOException + */ + private boolean checkWaitingCommandInMerge() throws IOException { for(Iterator<PacketSet> it = waitingCommandInMerge.iterator(); it.hasNext();){ PacketSet p = it.next(); - if(p.getEditor().isMerging()) { + if(p.getEditor().isMerging()) { // still merging do nothing continue; }else{ + // process one command and return true manage(p.channel, p.command); it.remove(); return true; @@ -125,7 +132,6 @@ return false; } - @SuppressWarnings("unchecked") private void select() throws IOException { Set<REPSelectionKey<REPCommand>> keys = selector.selectedKeys1(); @@ -142,10 +148,10 @@ handler.handle(key); } catch (ClosedChannelException x) { key.cancel(); - handler.cancel((REPSocketChannel<REPCommand>)key.channel()); + handler.cancel(key.channel1()); } catch (IOException x) { key.cancel(); - handler.cancel((REPSocketChannel<REPCommand>)key.channel()); + handler.cancel( key.channel1()); } } } @@ -156,7 +162,7 @@ return; } channel.configureBlocking(false); - REPHandler handler = new REPHandlerImpl(-1, this); + REPHandler handler = normalHandler; channel.register(selector, ops, handler); } @@ -390,6 +396,7 @@ session.translate(channel, receivedCommand); boolean newState = editor.isMerging(); if (old!=newState) { + // prevEditor なのは変だと思うが... Editor prevEditor = session.getPrevEditor(editor); //マージ中のエディタはコマンドを受け取らない // この代入は状態が変わったときだけ行えば良い。毎回、new するのは変 @@ -435,12 +442,12 @@ private void setNormalState(REPSocketChannel<REPCommand> channel, int sid) { SelectionKey key = channel.keyFor(selector); - key.attach(new REPHandlerImpl(sid, this)); + key.attach(normalHandler); } private void setMergeState(REPSocketChannel<REPCommand> channel, int sid) { SelectionKey key = channel.keyFor(selector); - key.attach(new REPHandlerInMerge(sid, this)); + key.attach(handlerInMerge); } private Editor getEditor(String hostport) { @@ -478,7 +485,7 @@ } private void setMyHostName(String localHostName) { - myHost = localHostName + temp_port; + myHost = localHostName + receive_port; if(maxHost == null) { maxHost = myHost; } @@ -500,8 +507,8 @@ port = Integer.parseInt(args[0]); port_s = Integer.parseInt(args[1]); } - temp_port = port; - send_port = port_s; + receive_port = port; + parent_port = port_s; SessionManager sm = new SessionManager(); sm.init(port,new SessionManagerGUIimpl(sm)); @@ -510,7 +517,7 @@ public void connectSession(String host) { int port = DEFAULT_PORT; - port = send_port; + port = parent_port; InetSocketAddress addr = new InetSocketAddress(host, port); try { REPSocketChannel<REPCommand> sessionchannel = REPSocketChannel.<REPCommand>create(new REPCommandPacker()); @@ -600,14 +607,14 @@ public void buttonPressed(SessionManagerEvent event) { try { - waitingQueue.put(event); + waitingEventQueue.put(event); } catch (InterruptedException e) {} selector.wakeup(); } public void syncExec(SessionManagerEvent event) { try { - waitingQueue.put(event); + waitingEventQueue.put(event); } catch (InterruptedException e) { } }
--- a/rep/channel/ChannelSimulator.java Sat Oct 04 22:12:16 2008 +0900 +++ b/rep/channel/ChannelSimulator.java Sat Oct 04 22:12:34 2008 +0900 @@ -35,7 +35,7 @@ P tmp; while ( (tmp=readQ.poll())==null && isBlocking ) { try { - wait(); + wait(); // queue is empty } catch (InterruptedException e) { } } @@ -63,7 +63,7 @@ break; } else { try { - wait(); + wait(); // queue is full, we have to wait here } catch (InterruptedException e) { } }
--- a/rep/channel/NetworkSimulator.java Sat Oct 04 22:12:16 2008 +0900 +++ b/rep/channel/NetworkSimulator.java Sat Oct 04 22:12:34 2008 +0900 @@ -63,7 +63,7 @@ sd0.scs.enQ(CHserver); logger.writeLog("connected", 1); - printAllState(); + //printAllState(); return true; } return false;
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/rep/channel/NullSelector.java Sat Oct 04 22:12:34 2008 +0900 @@ -0,0 +1,56 @@ +package rep.channel; + +import java.io.IOException; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.spi.SelectorProvider; +import java.util.Set; + +public class NullSelector extends Selector { + + @Override + public void close() throws IOException { + + } + + @Override + public boolean isOpen() { + return false; + } + + @Override + public Set<SelectionKey> keys() { + return null; + } + + @Override + public SelectorProvider provider() { + return null; + } + + @Override + public int select() throws IOException { + return 0; + } + + @Override + public int select(long timeout) throws IOException { + return 0; + } + + @Override + public int selectNow() throws IOException { + return 0; + } + + @Override + public Set<SelectionKey> selectedKeys() { + return null; + } + + @Override + public Selector wakeup() { + return this; + } + +}
--- a/rep/channel/REPSelectionKey.java Sat Oct 04 22:12:16 2008 +0900 +++ b/rep/channel/REPSelectionKey.java Sat Oct 04 22:12:34 2008 +0900 @@ -35,6 +35,22 @@ return rsc; } + @SuppressWarnings("unchecked") + public REPSocketChannel<P> channel1() { + assert (!REPServerSocketChannel.isSimulation); + SelectableChannel sc = key.channel(); + REPSocketChannel<P> rsc = (REPSocketChannel<P>) REPSocketChannel.channels.get(sc); + return rsc; + } + + @SuppressWarnings("unchecked") + public REPServerSocketChannel<P> serverSocketChannel() { + assert (!REPServerSocketChannel.isSimulation); + SelectableChannel sc = key.channel(); + REPServerSocketChannel<P> rsc = (REPServerSocketChannel<P>) REPSocketChannel.channels.get(sc); + return rsc; + } + @Override public int interestOps() { return key.interestOps();
--- a/rep/channel/SelectableChannelSimulator.java Sat Oct 04 22:12:16 2008 +0900 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,75 +0,0 @@ -package rep.channel; - -import java.nio.channels.SocketChannel; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; - -public abstract class SelectableChannelSimulator<P> extends REPSocketChannel<P>{ - - protected BlockingQueue<P> qread; - protected BlockingQueue<P> qwrite; - protected SelectorSimulator<P> writeSelector; - protected SelectorSimulator<P> readSelector; - - public SelectableChannelSimulator(SocketChannel channel) { - super(channel, null); - } - - /* read from Queue. */ - public P read(){ - try { - if(readSelector!=null) - synchronized (readSelector){ - return qread.take(); - } - else{ - return qread.take(); - } - } catch (InterruptedException e) { - e.printStackTrace(); - return null; - } - } - /* write to Queue. */ - public boolean write(P p){ - try { - if (writeSelector!=null) - synchronized (writeSelector){ - qwrite.put(p); - writeSelector.notifyAll(); - } - else { - qwrite.put(p); - } - return true; - } catch (InterruptedException e) { - e.printStackTrace(); - return false; - } - } - public abstract ChannelSimulator<P> accept(); - - /* accessor methods. */ - public BlockingQueue<P> getReadQ(){ - return qread; - } - public BlockingQueue<P> getWriteQ(){ - return qwrite; - } - public void createReadQ(){ - qread = new LinkedBlockingQueue<P>(); - } - public void createWriteQ(){ - qwrite = new LinkedBlockingQueue<P>(); - } - public void setWriteSelector(SelectorSimulator<P> _selector){ - writeSelector = _selector; - } - - - /* return state of the Queue */ - abstract public boolean isReadable(); - abstract public boolean isWritable(); - abstract public boolean isAcceptable(); - -}
--- a/rep/channel/SelectionKeySimulator.java Sat Oct 04 22:12:16 2008 +0900 +++ b/rep/channel/SelectionKeySimulator.java Sat Oct 04 22:12:34 2008 +0900 @@ -49,6 +49,18 @@ return channel; } + @SuppressWarnings("unchecked") + @Override + public REPSocketChannel<P> channel1() { + return (REPSocketChannel<P>)channel; + } + + @SuppressWarnings("unchecked") + @Override + public REPServerSocketChannel<P> serverSocketChannel() { + return (REPServerSocketChannel<P>)channel; + } + public SelectableChannel channel(REPPack<P> packer) { return channel; } @@ -93,12 +105,10 @@ @Override public int readyOps() { int ops=0; - //if ( channel instanceof SelectableChannelSimulator){ if ( channel instanceof ServerChannelSimulator ){ ServerChannelSimulator<?> scs = (ServerChannelSimulator<?>) channel; ops = ( OP_ACCEPT * (scs.isAcceptable()? 1:0) ); - } - if ( channel instanceof ChannelSimulator ){ + } else if ( channel instanceof ChannelSimulator ){ ChannelSimulator<?> scs = (ChannelSimulator<?>) channel; ops = ( OP_READ * (scs.isReadable()? 1:0) ) | ( OP_WRITE * (scs.isWritable()? 1:0) );
--- a/rep/channel/SelectorSimulator.java Sat Oct 04 22:12:16 2008 +0900 +++ b/rep/channel/SelectorSimulator.java Sat Oct 04 22:12:34 2008 +0900 @@ -10,72 +10,57 @@ import java.util.Set; public class SelectorSimulator<P> extends REPSelector<P>{ - + + // access to these set have to be synchronized private Set<SelectionKey> keyList; private Set<SelectionKey> selectedKeys; - private boolean wakeFlag=false; - + public SelectorSimulator() { super(null); keyList = new HashSet<SelectionKey>(); } - public int select() throws IOException { - selectedKeys = new HashSet<SelectionKey>(); - - synchronized(this) { - - while(selectedKeys.isEmpty() && !wakeFlag){ - for(SelectionKey key : keyList){ - if(((SelectionKeySimulator<?>) key).isAble()) - selectedKeys.add(key); + public synchronized int select() throws IOException { + while(true) { + getSelectedKeys(); + if(selectedKeys.isEmpty()) { + try { + this.wait(); + } catch (InterruptedException e) { + throw new IOException("Error, Selector was interrupted!"); } - - if(selectedKeys.isEmpty()) - try { - this.wait(); - } catch (InterruptedException e) { - throw new IOException("Error, Selector was interrupted!"); - } - } - wakeFlag=false; - } - return selectedKeys.size(); - } - - @Override - public int select(long timeout) throws IOException { - selectedKeys = new HashSet<SelectionKey>(); - - synchronized(this) { - if (!wakeFlag){ - for(SelectionKey key : keyList){ - if(((SelectionKeySimulator<?>) key).isAble()) - selectedKeys.add(key); - } - - if(selectedKeys.isEmpty()) - try { - this.wait(timeout); - } catch (InterruptedException e) { - throw new IOException("Error, Selector was interrupted!"); - } - } - wakeFlag=false; + } else + break; } return selectedKeys.size(); } @Override - public int selectNow() throws IOException { + public synchronized int select(long timeout) throws IOException { + getSelectedKeys(); + if(selectedKeys.isEmpty()) { + try { + this.wait(timeout); + // we cannot know if we timeouted or not + getSelectedKeys(); + } catch (InterruptedException e) { + throw new IOException("Error, Selector was interrupted!"); + } + } + return selectedKeys.size(); + } + + private void getSelectedKeys() { selectedKeys = new HashSet<SelectionKey>(); - - synchronized(this) { - for(SelectionKey key : keyList){ - if(((SelectionKeySimulator<?>) key).isAble()) - selectedKeys.add(key); - } + for(SelectionKey key : keyList){ + if(((SelectionKeySimulator<?>) key).isAble()) + selectedKeys.add(key); } + } + + @Override + public synchronized int selectNow() throws IOException { + getSelectedKeys(); return selectedKeys.size(); } @@ -85,29 +70,31 @@ public SelectionKeySimulator<P> register(SelectableChannel cs, int opt, Object handler){ SelectionKeySimulator<P> key = new SelectionKeySimulator<P>(cs, opt, this); key.attach(handler); - keyList.add(key); + synchronized(this) { + keyList.add(key); + } return key; } - - public void deregister(SelectableChannel channel) { + + public synchronized void deregister(SelectableChannel channel) { for(Iterator<SelectionKey> it = keyList.iterator();it.hasNext();) { if(it.next().channel() == channel) it.remove(); } } - + - public Set<REPSelectionKey<P>> selectedKeys1() { - Set<SelectionKey> keys = keyList; - Set<REPSelectionKey<P>> newKeys = new HashSet<REPSelectionKey<P>>(); - for(SelectionKey k: keys) { - // REPSelectionKeyを生成しないように注意 - newKeys.add(new SelectionKeySimulator<P>(k)); - } - return newKeys;//(Set<REPSelectionKey<P>>)newKeys; + public synchronized Set<REPSelectionKey<P>> selectedKeys1() { + Set<SelectionKey> keys = keyList; + Set<REPSelectionKey<P>> newKeys = new HashSet<REPSelectionKey<P>>(); + for(SelectionKey k: keys) { + // REPSelectionKeyを生成しないように注意 + newKeys.add(new SelectionKeySimulator<P>(k)); + } + return newKeys;//(Set<REPSelectionKey<P>>)newKeys; } - - public <T> SelectionKey getKey(ChannelSimulator<T> channel){ + + public synchronized <T> SelectionKey getKey(ChannelSimulator<T> channel){ for(SelectionKey key : keyList){ if(key.channel() == channel) return key; @@ -118,7 +105,7 @@ @Override public void close() throws IOException { // TODO Auto-generated method stub - + } @Override @@ -139,16 +126,13 @@ @Override - public Selector wakeup() { - synchronized(this){ - this.notifyAll(); - } + public synchronized Selector wakeup() { + notifyAll(); return this; } @Override - public Set<SelectionKey> selectedKeys() { - // TODO Auto-generated method stub + public synchronized Set<SelectionKey> selectedKeys() { return (Set<SelectionKey>)selectedKeys; }
--- a/rep/handler/REPHandlerImpl.java Sat Oct 04 22:12:16 2008 +0900 +++ b/rep/handler/REPHandlerImpl.java Sat Oct 04 22:12:34 2008 +0900 @@ -17,10 +17,8 @@ public void handle(REPSelectionKey<REPCommand> key) throws IOException { REPSocketChannel<REPCommand> channel = key.channel1(); - System.out.println("REPHandlerImpl.handle() : channel = " + channel); - REPCommand command = channel.read(); - System.out.println("REPHandlerImpl.handle() : command = " + command); + System.out.println("REPHandlerImpl.handle() read : command = " + command +" from "+channel); manager.manage(channel, command); }
--- a/rep/handler/REPHandlerInMerge.java Sat Oct 04 22:12:16 2008 +0900 +++ b/rep/handler/REPHandlerInMerge.java Sat Oct 04 22:12:34 2008 +0900 @@ -10,29 +10,22 @@ public class REPHandlerInMerge implements REPHandler { private SessionManager manager; - private int sid; public REPHandlerInMerge(SessionManager manager) { this.manager = manager; } - public REPHandlerInMerge(int sid, SessionManager manager2) { - this.manager = manager2; - this.sid = sid; - } - @SuppressWarnings("unchecked") public void handle(REPSelectionKey<REPCommand> key) throws IOException { //マージ中のエディタの前のエディタのコマンドをWaitingListに追加する - REPSocketChannel<REPCommand> channel = (REPSocketChannel<REPCommand>) key.channel(); + REPSocketChannel<REPCommand> channel = key.channel1(); REPCommand command = channel.read(); System.out.println("REPHandlerImpl.handle() : command = " + command); - if(command.sid == sid){ - Editor editor = manager.getEditor(channel); - manager.addWaitingCommand(new PacketSet(channel, editor, command)); - }else{ - manager.manage(channel, command); - } + // if (manager.isMerging(command.sid()))... + // 同じchannelで、merge中のsessionは一つは限らない。 + // なので、sid をinstanceで持つのではだめ。 + Editor editor = manager.getEditor(channel); + manager.addWaitingCommand(new PacketSet(channel, editor, command)); } public void cancel(REPSocketChannel<REPCommand> socketChannel) {
--- a/test/channeltest/testSeMa.java Sat Oct 04 22:12:16 2008 +0900 +++ b/test/channeltest/testSeMa.java Sat Oct 04 22:12:34 2008 +0900 @@ -31,7 +31,6 @@ } - @SuppressWarnings("unchecked") public void run() { REPSelector<String> selector=null; @@ -71,7 +70,7 @@ }else if(key.isReadable()){ try { - REPSocketChannel<String> channel = (REPSocketChannel<String>) key.channel(); + REPSocketChannel<String> channel = key.channel1(); String packet; packet = channel.read(); if (packet==null) continue;
--- a/test/channeltest/testSeMaSlave.java Sat Oct 04 22:12:16 2008 +0900 +++ b/test/channeltest/testSeMaSlave.java Sat Oct 04 22:12:34 2008 +0900 @@ -33,7 +33,6 @@ } - @SuppressWarnings("unchecked") public void run() { REPSelector<String> selector; REPSocketChannel<String> masterCH ; @@ -68,7 +67,7 @@ }else if(key.isReadable()){ try { - REPSocketChannel<String> channel = (REPSocketChannel<String>) key.channel(); + REPSocketChannel<String> channel = key.channel1(); String packet = channel.read(); if (packet==null) continue; //if (channel==masterCH){
--- a/test/sematest/TestEditor.java Sat Oct 04 22:12:16 2008 +0900 +++ b/test/sematest/TestEditor.java Sat Oct 04 22:12:34 2008 +0900 @@ -9,6 +9,7 @@ import rep.REPCommand; import rep.REPCommandPacker; import rep.channel.REPLogger; +import rep.channel.REPSelectionKey; import rep.channel.REPSelector; import rep.channel.REPSocketChannel; import test.Text; @@ -124,8 +125,11 @@ syncText(); // send the master editor buffer to clients. } userInput(); - } else { - handle(channel.read()); + } + // selector(timeout) returns 0, but it may contain readable channel.. + for(REPSelectionKey<REPCommand> key : selector.selectedKeys1()) { + REPSocketChannel<REPCommand> ch = key.channel1(); + handle(ch.read()); } } } @@ -199,6 +203,7 @@ } private void handle(REPCommand cmd) { + if (cmd==null) return; ns.writeLog(name +": read "+cmd); switch(cmd.cmd) { case REPCMD_INSERT :