Mercurial > hg > RemoteEditor > REPSessionManager
changeset 460:ef70109af810
self writeQueue and waitingQueue
author | one |
---|---|
date | Fri, 24 Sep 2010 17:05:19 +0900 |
parents | 66c4f6b29baf |
children | e7eeb8be0de1 |
files | rep/ServerMainLoop.java rep/handler/Editor.java |
diffstat | 2 files changed, 52 insertions(+), 90 deletions(-) [+] |
line wrap: on
line diff
--- a/rep/ServerMainLoop.java Fri Sep 24 03:24:06 2010 +0900 +++ b/rep/ServerMainLoop.java Fri Sep 24 17:05:19 2010 +0900 @@ -120,22 +120,6 @@ } return false; } - - /** - * Move all command to the editor from manager's writing queue to the editor's waiting queue - * @param editor - */ - public void getWriteQueue(Editor editor) { - LinkedList<PacketSet> w = new LinkedList<PacketSet>(); - for(PacketSet p:writeQueue) { - if (p.channel==editor) { - editor.waitingCommandInMerge.addLast(p); - } else { - w.addLast(p); - } - } - writeQueue = w; - } /** * Debug message * @param p
--- a/rep/handler/Editor.java Fri Sep 24 03:24:06 2010 +0900 +++ b/rep/handler/Editor.java Fri Sep 24 17:05:19 2010 +0900 @@ -18,14 +18,15 @@ private Translator translator; // REPCommands we are going to send to the next editor private LinkedList<REPCommand> sentList = new LinkedList<REPCommand>(); - public LinkedList<PacketSet> waitingCommandInMerge= new LinkedList<PacketSet>(); + public LinkedList<REPCommand> waitingCommandInMerge= new LinkedList<REPCommand>(); private REPCommand quit2=null; private REPCommand preMergeCommand; private boolean merging; private REPCommand mergeMark = new REPCommand(REP.SMCMD_START_MERGE, 0,0, 0, 0, ""); public static boolean noMergeMode=false; static final boolean doOptimize = false; - private Forwarder toEditor; + private Forwarder toEditor; + private LinkedList<REPCommand> writeQueue = new LinkedList<REPCommand>(); public Editor(SessionManager manager,int editorNo){ // no translator case @@ -162,30 +163,56 @@ // assert(count==0); // } - private boolean waitingRequired(REPCommand command, REPSocketChannel<REPCommand> channel) { - if (hasWaitingCommand()) { - // We cannot do this operation before watingCommandQueue. - addWaitingCommand(new PacketSet(this, new REPCommand(command))); - return true; - } else if (isMerging()) { - addWaitingCommand(new PacketSet(this, new REPCommand(command))); - return true; - } - //ServerMainLoop.logger.writeLog("Editor eid:"+eid+" no waiting"); - return false; + /** + * Sending to Editor and waiting Queue + * +--------+ + * send() --> write() -> | Editor | -> handle() -> manager() + * +--------+ + * waitingQueue + * writeQueue + * + * send() は、他のEditor Node から呼ばれる + * write() は、内部で優先的に送信するのに用いる + * writeQueue は、waitingQueue よりも常に先に実行される必要がある + + * Manageの送信キューはここでは使わない + * send() manage + */ + @Override + public void send(REPCommand command) { + if (merging || isMerging() || waitingCommandInMerge.size()>0) { + waitingCommandInMerge.addLast(command); + ServerMainLoop.logger.writeLog("Editor eid:"+eid+" waitingCommandInMerge = "+waitingCommandInMerge); + return; + } + if (isMergeCommand(command)) { + merging = true; + ServerMainLoop.logger.writeLog("Editor"+eid+": merging=true (send)"+command); + } + writeQueue.add(command); } - public void addWaitingCommand(PacketSet set) { -// if (preMergeCommand!=null) { -// if (preMergeCommand.eid==set.command.eid -// && preMergeCommand.seq==set.command.seq) { -// assert(false); -// } -// } - waitingCommandInMerge.addLast(set); - ServerMainLoop.logger.writeLog("Editor eid:"+eid+" waitingCommandInMerge = "+waitingCommandInMerge); + /** + * Check waiting command in merge + * periodically called from manager + */ + public void checkWaitingCommandInMerge() { + if (writeQueue.size()>0) { + REPCommand command =writeQueue.pollFirst(); + ServerMainLoop.logger.writeLog("Editor"+eid+": write comand="+command); + super.write(command); + return; + } + if (translator==null || merging || isMerging()) return; + if (waitingCommandInMerge.size()>0) { + REPCommand command = waitingCommandInMerge.pollFirst(); + ServerMainLoop.logger.writeLog("Editor"+eid+": send waiting comand="+command); + super.write(command); + if (isMergeCommand(command)) { + merging = true; + } + } } - /** * 他のエディタへのコマンドの送信 * @param command @@ -327,7 +354,7 @@ } private boolean checkQuit() { - if (quit2!=null && sentList.size()==1&&!isMerging() && !hasWaitingCommand()) { + if (quit2!=null && sentList.size()==1&&!isMerging() && waitingCommandInMerge.size()==0) { sendToEditor(quit2); quit2 = null; return true; @@ -404,24 +431,6 @@ return true; } - /** - * write command to the editor - * called from another Editor instance such as next.send(command) - */ - @Override - public void write(REPCommand command) { - if (merging || waitingCommandInMerge.size()>0) { - addWaitingCommand(new PacketSet(this, new REPCommand(command))); - return; - } - if (!waitingRequired(command,channel)) { - if (isMergeCommand(command)) { - merging = true; - ServerMainLoop.logger.writeLog("Editor"+eid+": merging=true "+command); - } - super.write(command); - } - } private boolean isMergeCommand(REPCommand command) { switch(command.cmd) { @@ -434,7 +443,7 @@ } public void sendToEditor(REPCommand command) { - toEditor.send(command); + writeQueue.add(command); } @Override @@ -480,36 +489,5 @@ send(command); } - /** - * Check waiting command in merge - * @return true if there is a processed waiting command - * @throws IOException - */ - public void checkWaitingCommandInMerge() { - if (translator==null||isMerging()) return; - while(waitingCommandInMerge.size()>0) { - if (merging||isMerging()) return; - // to preserve command order, move all elements from manager's writing queue - manager.getWriteQueue(this); - PacketSet p = waitingCommandInMerge.remove(0); - REPCommand command = p.command; - try { - ServerMainLoop.logger.writeLog("Editor"+eid+": resend after merge comand="+command); - toEditor.send(command); - if (isMergeCommand(command)) { - merging = true; - return; - } - } catch (Exception e1) { - assert false; - manager.close(p.channel.channel); - return; - } - } - } - - public boolean hasWaitingCommand() { - return waitingCommandInMerge.size()>0; - } }