Mercurial > hg > RemoteEditor > REPSessionManager
changeset 399:19705f4b8015
waitingCommandInMerge
author | one |
---|---|
date | Mon, 24 Nov 2008 23:11:51 +0900 |
parents | 7de83b6a34e7 |
children | 29f01a7ce71f |
files | Todo rep/ServerMainLoop.java rep/SessionManager.java rep/handler/Editor.java rep/handler/Forwarder.java rep/handler/REPNode.java test/sematest/TestEditor.java |
diffstat | 7 files changed, 97 insertions(+), 63 deletions(-) [+] |
line wrap: on
line diff
--- a/Todo Sun Nov 23 18:38:52 2008 +0900 +++ b/Todo Mon Nov 24 23:11:51 2008 +0900 @@ -1,3 +1,8 @@ +Mon Nov 24 22:51:45 JST 2008 + +watingCommandInMerge のqueueを一旦0にしてから、manageを +呼ぶと、queueが既にあるのに、lockが外れた状態になってしまう。 + Wed Nov 19 19:21:47 JST 2008 ACK base に書き換えるのは良いが、途中でjoinして
--- a/rep/ServerMainLoop.java Sun Nov 23 18:38:52 2008 +0900 +++ b/rep/ServerMainLoop.java Mon Nov 24 23:11:51 2008 +0900 @@ -6,7 +6,6 @@ import java.nio.channels.ClosedChannelException; import java.nio.channels.SelectionKey; import java.util.LinkedList; -import java.util.List; import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -28,7 +27,6 @@ public SessionManager manager; protected SessionManagerGUI gui; protected REPSelector<REPCommand> selector; - protected List<PacketSet> waitingCommandInMerge= new LinkedList<PacketSet>(); private BlockingQueue<SessionManagerEvent> waitingEventQueue = new LinkedBlockingQueue<SessionManagerEvent>(); public String myHost; @@ -54,7 +52,7 @@ public void mainLoop() throws IOException { while(true){ - checkWaitingCommandInMerge(); + manager.checkWaitingCommandInMerge(); if (checkInputEvent() || checkWaitingWrite()) { // try to do fair execution for waiting task @@ -103,44 +101,8 @@ return false; } - /** - * Check waiting command in merge - * @return true if there is a processed waiting command - * @throws IOException - */ - public void checkWaitingCommandInMerge() { - List<PacketSet> w = waitingCommandInMerge; - waitingCommandInMerge = new LinkedList<PacketSet>(); - for(PacketSet p: w) { - REPNode e = p.getEditor(); - if(e.isMerging()) { // still merging do nothing - waitingCommandInMerge.add(p); - } else { - try { -// if (manager.sessionManage(e, p.command)) { // we don't need this -// assert false; -// return; -// } - e.manage(p.command); - } catch (Exception e1) { - // should be e.close()? - close(p.channel); - } - } - } - } - - public boolean hasWaitingCommand(REPSocketChannel<REPCommand>c) { - for(PacketSet p:waitingCommandInMerge) { - if (p.channel==c) { - return true; - } - } - return false; - } - - private void close(REPSocketChannel<REPCommand> channel) { + public void close(REPSocketChannel<REPCommand> channel) { REPSelectionKey<REPCommand>key = channel.keyFor1(selector); REPNode handler = (REPNode)key.attachment(); key.cancel(); @@ -223,9 +185,6 @@ } } - public void addWaitingCommand(PacketSet set) { - waitingCommandInMerge.add(set); - } public void buttonPressed(SessionManagerEvent event) { try {
--- a/rep/SessionManager.java Sun Nov 23 18:38:52 2008 +0900 +++ b/rep/SessionManager.java Mon Nov 24 23:11:51 2008 +0900 @@ -607,4 +607,11 @@ return smList.sessionManagerID(); } + public void checkWaitingCommandInMerge() { + for(REPNode e:editorList.values()) { + e.checkWaitingCommandInMerge(); + } + + } + }
--- a/rep/handler/Editor.java Sun Nov 23 18:38:52 2008 +0900 +++ b/rep/handler/Editor.java Mon Nov 24 23:11:51 2008 +0900 @@ -18,6 +18,7 @@ private Translator translator; // REPCommands we are going to send to the next editor private List<REPCommand> sentList = new LinkedList<REPCommand>(); + protected List<PacketSet> waitingCommandInMerge= new LinkedList<PacketSet>(); private REPCommand quit2=null; private boolean merging; private REPCommand preMergeCommand; @@ -74,8 +75,9 @@ if (waitingRequired(command)) return; keep = new REPCommand(command); sentList.add(keep); - ServerMainLoop.logger.writeLog("Editor eid:"+eid+" sentList = "+sentList); - ((Editor) next).checkReturnedCommand(command); + checkDouble(sentList); + next.forwardedCommandManage(command); + // ((Editor) next).checkReturnedCommand(command); } else next.send(command); } else { @@ -91,23 +93,40 @@ return; } + private void checkDouble(List<REPCommand> sentList) { + if (sentList.size()==0) return; + int count = 0; + REPCommand f = sentList.get(0); + for(REPCommand c:sentList) { + if (c.eid==f.eid&&c.seq==f.seq) { + count++; + } + } + assert(count==1); + } + private boolean waitingRequired(REPCommand command) { - if (manager.hasWaitingCommand(channel)) { + if (hasWaitingCommand()) { // We cannot do this operation before watingCommandQueue. - manager.addWaitingCommand(new PacketSet(channel, this, command)); + addWaitingCommand(new PacketSet(channel, this, command)); return true; } else if (isMerging()) { - manager.addWaitingCommand(new PacketSet(getChannel(), this, new REPCommand(command))); + addWaitingCommand(new PacketSet(getChannel(), this, new REPCommand(command))); return true; } ServerMainLoop.logger.writeLog("Editor eid:"+eid+" no waiting"); return false; } + public void addWaitingCommand(PacketSet set) { + waitingCommandInMerge.add(set); + } + private void sendEditorCommand(REPCommand command) { REPCommand keep = new REPCommand(command); sentList.add(keep); ServerMainLoop.logger.writeLog("Editor eid:"+eid+" sentList = "+sentList); + checkDouble(sentList); assert(sentList.size()<limit); next.send(command); } @@ -122,6 +141,11 @@ * @param command */ void checkReturnedCommand(REPCommand command) { + assert(!merging); + if (sentList.size()==0) { + ServerMainLoop.logger.writeLog("Editor eid="+eid+" looped command not registered: "+command); + assert(false); + } REPCommand prev = sentList.remove(0); ServerMainLoop.logger.writeLog("Editor eid="+eid+" remove sentList:"+(prev==null?"null":prev)); if (prev==null || prev.seq != command.seq || prev.eid!=command.eid) { @@ -201,6 +225,7 @@ sentList.add(keep); ServerMainLoop.logger.writeLog("Editor eid:"+eid+" sentList = "+sentList); assert(sentList.size()<limit); + checkDouble(sentList); next.send(keep); } else { next.send(preMergeCommand); @@ -318,6 +343,7 @@ * it is forwarded here. */ public void forwardedCommandManage(REPCommand command) { + if (waitingRequired(command)) return; if (command.cmd==REP.SMCMD_QUIT_2) { // we have to wait next editor's finishing before sending this. // this is odd, but the editor itself does not know it's merging @@ -326,22 +352,16 @@ } else if (command.eid==eid) { // if we handle in editor.manage(), this editor cannot distinguish this // and user input command from the editor. - REPCommand keep; +// REPCommand keep; switch(command.cmd) { - case REPCMD_DELETE_ACK: - case REPCMD_INSERT_ACK: - checkReturnedCommand(command); - return ; case REPCMD_INSERT: - keep = new REPCommand(command); - keep.cmd = REP.REPCMD_INSERT_ACK; - sentList.add(keep); - checkReturnedCommand(command); - return; case REPCMD_DELETE: - keep = new REPCommand(command); - keep.cmd = REP.REPCMD_DELETE_ACK; - sentList.add(keep); +// keep = new REPCommand(command); +// sentList.add(keep); +// ServerMainLoop.logger.writeLog("Editor eid:"+eid+" sentList = "+sentList); +// checkDouble(sentList); + case REPCMD_INSERT_ACK: + case REPCMD_DELETE_ACK: checkReturnedCommand(command); return; } @@ -349,4 +369,32 @@ send(command); } + /** + * Check waiting command in merge + * @return true if there is a processed waiting command + * @throws IOException + */ + public void checkWaitingCommandInMerge() { + int count = waitingCommandInMerge.size(); + if (count==0) return; + if (isMerging()) return; + while(count++>0) { + PacketSet p = waitingCommandInMerge.remove(0); + try { + // if (manager.sessionManage(e, p.command)) { // we don't need this + // assert false; + // return; + // } + manage(p.command); + } catch (Exception e1) { + // should be e.close()? + manager.close(p.channel); + } + } + } + + + public boolean hasWaitingCommand() { + return waitingCommandInMerge.size()>0; + } }
--- a/rep/handler/Forwarder.java Sun Nov 23 18:38:52 2008 +0900 +++ b/rep/handler/Forwarder.java Mon Nov 24 23:11:51 2008 +0900 @@ -133,6 +133,16 @@ } } + @Override + public void forwardedCommandManage(REPCommand command) { + + } + + @Override + public void checkWaitingCommandInMerge() { + + } +
--- a/rep/handler/REPNode.java Sun Nov 23 18:38:52 2008 +0900 +++ b/rep/handler/REPNode.java Mon Nov 24 23:11:51 2008 +0900 @@ -156,5 +156,10 @@ public abstract void joinAck(REPCommand sendCommand, int sid) ; + public abstract void forwardedCommandManage(REPCommand command) ; + + public abstract void checkWaitingCommandInMerge(); + + }
--- a/test/sematest/TestEditor.java Sun Nov 23 18:38:52 2008 +0900 +++ b/test/sematest/TestEditor.java Mon Nov 24 23:11:51 2008 +0900 @@ -61,13 +61,13 @@ this.master=true; text = new Text(txts); cmds.add(new REPCommand(REP.SMCMD_PUT,0,0,0,0,name+"-file")); - //cmds.add(new REPCommand(REP.REPCMD_INSERT,0,0,0,0,"m0")); + cmds.add(new REPCommand(REP.REPCMD_INSERT,0,0,0,0,"m0")); cmds.add(new REPCommand(REP.REPCMD_DELETE,0,0,0,0,"m0")); //cmds.add(new REPCommand(REP.SMCMD_QUIT,0,0,0,0,"")); } else { text = new Text(new String[0]); cmds.add(new REPCommand(REP.SMCMD_JOIN,0,0,0,0,name)); - cmds.add(new REPCommand(REP.REPCMD_INSERT,0,0,0,0,"c0")); + //cmds.add(new REPCommand(REP.REPCMD_INSERT,0,0,0,0,"c0")); //cmds.add(new REPCommand(REP.REPCMD_DELETE,0,0,0,0,"c0")); } }