Mercurial > hg > RemoteEditor > REPSessionManager
view rep/handler/Editor.java @ 502:49b689b17d06 default tip
merged TestEditor to REPEditor
author | suika6039 |
---|---|
date | Tue, 21 Dec 2010 18:01:15 +0900 |
parents | 4bcc6b563d52 |
children |
line wrap: on
line source
package rep.handler; import java.io.IOException; import java.util.Comparator; import java.util.LinkedList; import java.util.List; import java.util.TreeSet; import rep.REP; import rep.REPCommand; import rep.ServerMainLoop; import rep.SessionManager; import rep.channel.REPLogger; import rep.channel.REPSelectionKey; import rep.channel.REPSocketChannel; import rep.optimizers.*; public class Editor extends Forwarder { // REPCommands we are going to send to the next editor private LinkedList<REPCommand> sentList = new LinkedList<REPCommand>(); // Expected acknowledge list private LinkedList<REPCommand> ackList = new LinkedList<REPCommand>(); public LinkedList<REPCommand> waitingCommandInMerge= new LinkedList<REPCommand>(); private REPCommand quit_2=null; private REPCommand preMergeCommand; public REPCommandOptimizer optimizer; private LinkedList<REPCommand> sentMergedList; private TreeSet<REPCommand> sortedEditCmds; boolean mergeAgain; public REPLogger logger = SessionManager.logger; private boolean blocking = false; private boolean merging = false; private LinkedList<REPCommand> writeQueue = new LinkedList<REPCommand>(); private REPCommand mergeMark =new REPCommand(REP.REPCMD_MERGE_MARK,0,0, REP.MERGE_EID.id, 0, ""); public enum MergeMode { NoMerge, // no merge Slow, // merge at Ack Early, // merge at returned command and Ack Direct // merge at incoming command } public static MergeMode mergeMode = MergeMode.Direct; static final boolean doOptimize = false; public Editor(SessionManager manager,int editorNo){ // no translator case super(manager, null); } public Editor(int eid, REPCommandOptimizer optimizer) { super(null, null); this.optimizer = optimizer; } public Editor(int editorNo, SessionManager manager,REPSocketChannel<REPCommand> channel){ super(editorNo,manager,channel); eid = editorNo; if (doOptimize) optimizer = new DeleteInsertOptimizer(); //タカノがつくったおぷてぃまいざ else optimizer = new NullOptimizer(); //なにもしないけどOptimizer. mergeAgain = false; sentMergedList = new LinkedList<REPCommand>(); } /* * Merge Protocol (0) Editor へのコマンドは、ack 以外は直接 Editor へ送られてしまう。(next.send(cmd)) Editor から返ってくるコマンドをtranslatorが処理する。 (1) Editor CommandをSession Ring 上に流し、それが戻って来るまでに、他のEditorから 受け取った Editor Command をキューに入れておく。 sentList 外に送り出したEditor Command MergingSentList Mergeするlist 。Mergeのやり直し用。 Slow/Early (2) 戻って来たタイミングで、キュー上のEditor Commandを、eid とCommandの 順序を基にソートする。(self merge (Early)) (3) 他のEditorにソートのタイミングを与えるために、Editor Command の ack を、もう一周させる。 (4) 他のEditorのCommandを受け取ってから、ack が来るまでのCommandをキューに 入れておき、ack が来たら、eid とCommandの順序を基にソートする。(other merge (Slow)) Direct seq = gseq*limit + lseq (5) 他のEditor Command が来た時点で、すぐにmergeする。 gseqを他コマンドのgseqよりも 大きくなるように設定。sentListに追加 (6) 自分のEditor Command はsentListに追加 (7) Ackが来たら、そのEditor Command まで確定 (sentList/unMergedListから削除) Editor には、ソートした編集結果になるように、それまで行なった編集をUndo して、ソートした編集結果を適用する。Undo が無駄な動作をしないように最適化する。 */ /* handle() セッションの処理 manage() 編集コマンドは translate() へ 一周して来た編集コマンドのACKは廃棄 (merge queue から削除) 一周して来た自分のコマンドならself merge 他のエディタの編集コマンドのACK->other merge それ以外は、そのまま実行、merge queue へ格納 merge は checkReturnedCommand() から startMerge() へ まず、接続されている Editor に START_MERGE を送る 邪魔されないように、他のcommand は block する manager() START_MERGE_ACK が来たら、translator.mergeAck() で教えて、 merge()-> translator.checkOwnCommand() へ ここで、sort されて、Merge Command をEditorへ送信 checkEndMerge()から endMerge() が呼ばれる。 自分のエディタにEND_MERGE で Merge終了を通知 自分のコマンドは、ACKに変えて送信 (3) それ以外は、そのまま送信 (一周させる) */ public void translate(REPCommand command){ switch(command.cmd) { case REPCMD_INSERT_ACK: case REPCMD_DELETE_ACK: if (command.eid==eid) { if (mergeMode==MergeMode.Slow) { checkReturnedCommand(command); checkQuit(); return; } else if (mergeMode==MergeMode.Direct) { truncateUnMergedCmds(command); } // Second Phase が終わって同期が終了。 // SessionManager.logger.writeLog("Complete "+command); checkAck(command); checkQuit(); return; } if (mergeMode==MergeMode.Direct) { checkAck(command); truncateUnMergedCmds(command); ServerMainLoop.logger.writeLog("Editor"+eid+": send ackCommand "+command+report()); next.send(command); checkQuit(); } else checkReturnedCommand(command); return; case REPCMD_INSERT_USER: command.cmd = REP.REPCMD_INSERT; userEditorCommand(command); return; case REPCMD_DELETE_USER: command.cmd = REP.REPCMD_DELETE; userEditorCommand(command); return; case REPCMD_INSERT: case REPCMD_DELETE: case REPCMD_MERGE_MARK: if (command.eid == REP.MERGE_EID.id){ //マージコマンドが返ってきた if(checkMergeConflict(command)){ //マージ中にエディタからの割り込みがあった場合 getMergeAgain(); } checkEndMerge(); return; } if (command.eid == eid){ // 編集コマンドが一周して来た if (mergeMode==MergeMode.Slow) { checkAck(command); sendAck(command); } else if (mergeMode==MergeMode.Direct) { // truncateUnMergedCmds(command); checkAck(command); sendAck(command); } else { checkReturnedCommand(command); } return; } //他のエディタからの編集コマンド transReceiveCmd(next,command); if (mergeMode==MergeMode.Direct) { sendEditorCommand(command); // Own commands may enter after here. To distinguish put mark here sentList.addLast(mergeMark ); startMerge(command); } else sendEditorCommand(command); return; default: assert(false); } } /** * エディタからの新たな編集コマンド * @param command */ private void userEditorCommand(REPCommand command) { if (next==this) return; // singleton case transSendCmd(command); sendEditorCommand(command); if (mergeMode==MergeMode.Direct) { ServerMainLoop.logger.writeLog("Editor"+eid+": User Command Before "+command+report()); truncateSentList(command,true); ServerMainLoop.logger.writeLog("Editor"+eid+": User Command After "+command+report()); } return; } // private void checkDouble(List<REPCommand> sentList) { // if (sentList.size()==0) return; // int count = 0; // REPCommand f = sentList.get(0); // for(REPCommand c:sentList) { // if (c.eid==f.eid&&c.seq==f.seq) { // count++; // } // } // assert(count==1); // if (true) return; // count = 0; // for(PacketSet c:waitingCommandInMerge) { // for(REPCommand g:sentList) { // if (c.command.eid==g.eid&&c.command.seq==g.seq) { // count++; // } // } // } // assert(count==0); // } /** * Sending to Editor and waiting Queue * +--------+ * send() --> write() -> | Editor | -> handle() -> manager() * +--------+ * waitingQueue * writeQueue * * send() は、他のEditor Node から呼ばれる * write() は、内部で優先的に送信するのに用いる * writeQueue は、waitingQueue よりも常に先に実行される必要がある * Manageの送信キューはここでは使わない * send() manage */ @Override public void send(REPCommand command) { if (blocking || isMerging() || waitingCommandInMerge.size()>0) { waitingCommandInMerge.addLast(command); ServerMainLoop.logger.writeLog("Editor eid:"+eid+" waitingCommandInMerge = "+waitingCommandInMerge); return; } if (isMergeCommand(command)) { blocking = true; ServerMainLoop.logger.writeLog("Editor"+eid+": merging=true (send)"+command); } writeQueue.add(command); } /** * Check waiting command in merge * periodically called from manager */ public void checkWaitingCommandInMerge() { if (writeQueue.size()>0) { REPCommand command =writeQueue.pollFirst(); ServerMainLoop.logger.writeLog("Editor"+eid+": write comand="+command); super.write(command); return; } if (blocking || isMerging()) return; if (waitingCommandInMerge.size()>0) { REPCommand command = waitingCommandInMerge.pollFirst(); ServerMainLoop.logger.writeLog("Editor"+eid+": send waiting comand="+command); super.write(command); if (isMergeCommand(command)) { blocking = true; } } } /** * 他のエディタへのコマンドの送信 * @param command * * sendList にキープする必要がある。 */ private void sendEditorCommand(REPCommand command) { REPCommand keep = new REPCommand(command); sentList.add(keep); ackList.add(keep); //ServerMainLoop.logger.writeLog("Editor eid:"+eid+" sentList = "+sentList); assert(ackList.size()<limit); if (command.cmd==REP.REPCMD_DELETE) { // delete のundo用の文字列は、外に出す意味はない command.string=null; } next.send(command); } /** * 一周して来たcommandの処理。 * * INSERT/DELETEを受け取った時に、sentListに登録 * INSERT_ACK/DELETE_ACKが来たら一周。そこで、Mergeする。 * * 自分が出したINSERT/DELETEが戻って来たら、ACKに変更して、Merge。 * * 途中から参加した場合、自分が受けとってないcommandのACKが先に来ることが * ある。それは、無視して良い。 * @param command */ void checkReturnedCommand(REPCommand command) { startMerge(command); } void startMerge(REPCommand command) { ServerMainLoop.logger.writeLog("Editor"+eid+": startMerge "+command); preMergeCommand = new REPCommand(command); // merge は必須だが、EditorのCommand実装をテストするには邪魔なので、off に出来るようにする。 if (mergeMode==MergeMode.NoMerge) { checkQuit(); endMerge(); return; } // START_MERGE を送る // 送らないで良い場合もある? REPCommand cmd = new REPCommand(REP.SMCMD_START_MERGE,command.sid,REP.SM_EID.id,seq(),0,""); sendToEditor(cmd); // merging = true; // Session Manager 側で、このeditorへの他のeditorからの // 入力を止めて、merge にそなえるのは、ここでは間に合わないので、 // send() で行っている。USER Command は、止められないが、問題ない。 // merge は、eidtor 側からACKが来てから始まる。 } /** * sentList と ack を見比べて、正しい順序で来たかどうかを調べる。途中参加したEditorの場合は、Ackは * 無視して良い。 * @param command * @return */ private boolean checkAck(REPCommand command) { REPCommand prev = null; try { if(mergeMode!=MergeMode.Direct && isMerging()) throw new Exception(); if(ackList.size()==0) throw new Exception(); prev=ackList.remove(0); if (prev==null || prev.seq != command.seq || prev.eid!=command.eid) throw new Exception(); } catch (Exception n) { // should be more robust to allow communication failure String err = "Editor eid="+eid+" checkReturnedCommand() : command = " + command + " prev="+ (prev==null?"null":prev)+" ackList="; err += ackList; err += "merging="+isMerging(); ServerMainLoop.logger.writeLog(err); assert(false); } return true; } @Override public void setQuit2(REPCommand cmd) { quit_2 = cmd; checkQuit(); // do not send quit2 until we received all pending // command } @Override public void setEID(int eid) { this.eid = eid; } public String toString(){ return ("Editor eid="+eid+" sid="+sid+" " + host + ":" + file); } void checkEndMerge() { if (blocking) { if (isMerging()) return; endMerge(); blocking = false; } if (quit_2!=null) checkQuit(); } private void endMerge() { REPCommand mergeEnd = new REPCommand(REP.SMCMD_END_MERGE,sid,eid,seq(),0,""); sendToEditor(mergeEnd); if (mergeMode==MergeMode.Direct) { REPCommand last = sentList.size()==0?null:sentList.getLast(); ServerMainLoop.logger.writeLog("Editor"+eid+": EndMerge Before"+report()); if (last!=null && last.eid==eid && last.sid==sid) { truncateSentList(last,false); // Own command interrupts us, trucate sentList } sentList.remove(mergeMark); preMergeCommand = null; ServerMainLoop.logger.writeLog("Editor"+eid+": EndMerge "+report()); return ; } sortedEditCmds = null; checkAck(preMergeCommand); if (preMergeCommand.eid==eid) { if (mergeMode==MergeMode.Early) { sendAck(preMergeCommand); } } else { ServerMainLoop.logger.writeLog("Editor"+eid+": send preMergeCommand "+preMergeCommand); next.send(preMergeCommand); } // sentList.clear(); preMergeCommand = null; } /** * User Editor Command * no truncate here */ private void truncateSentList(REPCommand commit, boolean mode) { if (mode && blocking) return; // merging is not enough except from endMerge() } /** * Returned command fixed command order. Remove from * sentList and unMergedCmds * @param commit */ public void truncateUnMergedCmds(REPCommand commit) { assert(!merging); boolean flag = false; LinkedList<REPCommand>s = new LinkedList<REPCommand>(); for(REPCommand command:sentList) { if (command.isSameSeq(commit)) { flag = true; continue; } if (flag) s.addLast(command); } if (flag) sentList = s; } /** * Send ack command after receiving self command * @param command */ private void sendAck(REPCommand command) { REPCommand keep = new REPCommand(command); // First Phase End, send ACK switch(keep.cmd) { case REPCMD_INSERT: keep.cmd = REP.REPCMD_INSERT_ACK;break; case REPCMD_DELETE: keep.cmd = REP.REPCMD_DELETE_ACK;break; default: assert(false); } ackList.addLast(keep); ServerMainLoop.logger.writeLog("Editor"+eid+": sendAck sentList = "+sentList); assert(ackList.size()<limit); keep.string = ""; next.send(keep); } private boolean checkQuit() { if (quit_2!=null && ackList.size()==0 &&!isMerging() && waitingCommandInMerge.size()==0) { if (emptySentList() ){ sendToEditor(quit_2); quit_2 = null; return true; } } return false; } private boolean emptySentList() { return sentList.size()==0||(sentList.size()==1 && sentList.getFirst().cmd==REP.REPCMD_MERGE_MARK); } @Override public boolean manage(REPCommand command) { switch(command.cmd){ // Editor Command case REPCMD_DELETE: case REPCMD_INSERT: case REPCMD_DELETE_USER: case REPCMD_INSERT_USER: case REPCMD_DELETE_ACK: case REPCMD_INSERT_ACK: case REPCMD_MERGE_MARK: { translate(command); break; } case SMCMD_START_MERGE_ACK: { // マージの処理と次のエディタへコマンドを送信する処理 mergeAck(); if (!merge(preMergeCommand)) { // nothing to do, send END_MERGE checkEndMerge(); } break; } case SMCMD_SYNC: if (isMaster()) sendToEditor(command); else next.send(command); case SMCMD_QUIT: { next.send(command); break; } case SMCMD_QUIT_2: { // QUIT_2 is returned. if (command.eid!=eid) { // stop this editor unless this is the start, starter will stopped // by QUIT_2_ACK manager.remove(this); } // don't send quit_2 directly to the editor until all pending // merge is processed. // this does not work in distributed case. if (next.isDirect()) next.setQuit2(command); else next.send(command); break; } case SMCMD_QUIT_2_ACK: { manager.remove(this); break; } default: assert false; return false; } return true; } private boolean isMergeCommand(REPCommand command) { if (mergeMode==MergeMode.Direct) return (command.eid!=eid&&command.cmd==REP.REPCMD_INSERT || command.cmd==REP.REPCMD_DELETE); switch(command.cmd) { case REPCMD_INSERT: case REPCMD_DELETE: return mergeMode==MergeMode.Slow?false:command.eid==eid; case REPCMD_INSERT_ACK: case REPCMD_DELETE_ACK: return mergeMode==MergeMode.Slow?true:command.eid!=eid; } return false; } public void sendToEditor(REPCommand command) { writeQueue.add(command); } @Override public void handle(REPCommand command, REPSelectionKey<REPCommand> key) throws IOException { if (command.cmd==REP.SMCMD_JOIN||command.cmd==REP.SMCMD_PUT) { // assert false; // 一つのエディタ上に複数のセッションが作られた場合。 // 若干問題があるらしい next = new Forwarder(manager,next.channel); REPNode first = new FirstConnector(manager,channel); first.handle(command, key); key.attach(new Dispatcher(manager,channel)); return; } if (manager.sessionManage(this, command)) return; // ServerMainLoop.logger.writeLog("Editor"+eid+": handle command="+command); manage(command); } @Override public void cancel(REPSocketChannel<REPCommand> socketChannel) { manager.remove(socketChannel); } public boolean isMaster() { return mode==REP.SMCMD_PUT; } /* Handle special case first, usually these cases * are handled in the next Editor in a session manager, but * it is forwarded here. */ public void forwardedCommandManage(REPCommand command) { if (command.cmd==REP.SMCMD_QUIT_2) { // we have to wait next editor's finishing before sending this. // this is odd, but the editor itself does not know it's merging // state. Only this session manager knows it. setQuit2(command); return; } send(command); } /** * New command from an editor * The command is sent to the next editor * @param cmd * @return translated command. */ public REPCommand transSendCmd(REPCommand cmd){ assert(cmd.eid==eid); cmd.seq = seq(); // renumber editor's seq //マージ中にユーザから割り込みがあった場合 if(isMerging()){ logger.writeLog("mergeAgain"+eid+":"+cmd); mergeAgain = true; } return cmd; } /** * My command is returned from the session ring, and START_MERGE_ACK * is returned. At this * stage my writeQueue is empty, our editor is waiting for me. * Start merge process. * @param cmd */ public boolean merge(REPCommand prev){ logger.writeLog("beforeMerge"+eid+":"+sentList); LinkedList<REPCommand> output = new LinkedList<REPCommand>(); // merge queue上にあるコマンドを全部undoコマンドするのと同時に // sort したコマンド列を生成する for( REPCommand cmd0 : sentList) { if (cmd0.cmd==REP.REPCMD_INSERT || cmd0.cmd==REP.REPCMD_DELETE) output.addLast( createUndo(cmd0) ); } sortedEditCmds = new TreeSet<REPCommand>(new REPCommandComparator(1)); logger.writeLog("sentList"+eid+":"+sentList); for( REPCommand cmd0 : sentList ) { if (cmd0.cmd==REP.REPCMD_INSERT || cmd0.cmd==REP.REPCMD_DELETE) { sortedEditCmds.add(cmd0); } } output.addLast(mergeMark); output.addAll(sortedEditCmds); LinkedList<REPCommand> ns = new LinkedList<REPCommand>(); ns.addAll(sortedEditCmds); sentList = ns; logger.writeLog("sortedMerge"+eid+":"+sortedEditCmds); // unMerged command のdeleteのundo string は、この時点で使えない。 // Editor 側から送り返して来たものを使う必要がある。 logger.writeLog("outputMerge"+eid+":"+output); return optimizedSend(this,output); } public boolean mergeEarly(REPCommand prev){ logger.writeLog("beforeMerge"+eid+":"+sentList); LinkedList<REPCommand> output = new LinkedList<REPCommand>(); LinkedList<REPCommand> newSentList = new LinkedList<REPCommand>(); // merge queue上にあるコマンドを全部undoコマンドするのと同時に // sort したコマンド列を生成する for( REPCommand cmd0 : sentList) { output.addLast( createUndo(cmd0) ); } sortedEditCmds = new TreeSet<REPCommand>(new REPCommandComparator(1)); logger.writeLog("sentList"+eid+":"+sentList); boolean flag = true; for( REPCommand cmd0 : sentList ) { if (cmd0.cmd==REP.REPCMD_INSERT || cmd0.cmd==REP.REPCMD_DELETE) { if (flag) sortedEditCmds.add(cmd0); else newSentList.add(cmd0); } } output.addAll(sortedEditCmds); output.addLast(mergeMark); logger.writeLog("sortedMerge"+eid+":"+sortedEditCmds); // sentList の command.string は、 // Editor 側から送り返して来たものを使う必要がある。 sentList = newSentList; logger.writeLog("outputMerge"+eid+":"+output); return optimizedSend(this,output); } /** * Sent optimized merged command list * @param editor * @param output * @return if any sent commands output */ public boolean optimizedSend(REPNode editor, LinkedList<REPCommand> output) { /* * Optimized send の場合は、command.original を意識する必要がある */ sentMergedList.clear(); List<REPCommand> output1 = optimizer.optimize(output); if (output1.size()==0) { merging = false; return false; } for(REPCommand c:output1) { REPCommand m = new REPCommand(c); m.setEID(REP.MERGE_EID.id); m.setSEQID(editor.seq()); m.original = c; sentMergedList.addLast(m); editor.sendToEditor(m); } logger.writeLog("OptimizedOutputMerge"+eid+":"+sentMergedList); return true; } private REPCommand createUndo(REPCommand cmd){ REPCommand retCmd = new REPCommand(cmd); retCmd.original = cmd; if (cmd.cmd==REP.REPCMD_INSERT) { retCmd.cmd=REP.REPCMD_DELETE; retCmd.string=""; } else if (cmd.cmd==REP.REPCMD_DELETE) retCmd.cmd=REP.REPCMD_INSERT; return retCmd; } class REPCommandComparator implements Comparator<REPCommand>{ int base; REPCommandComparator(int base) { this.base = base; } public int compare(REPCommand o1, REPCommand o2) { int eid1 = o1.eid-base; if (eid1<0) eid1 += Integer.MAX_VALUE; int eid2 = o2.eid-base; if (eid2<0) eid2 += Integer.MAX_VALUE; if ( gSeq(o1.seq)<gSeq(o2.seq) ) return -1; if ( gSeq(o1.seq)>gSeq(o2.seq) ) return 1; if ( eid1<eid2 ) return -1; if ( eid1>eid2 ) return 1; if ( o1.seq<o2.seq ) return -1; if ( o1.seq>o2.seq ) return 1; // assert(false); // this can happen in MergedAgain case return 0; } } /** * Translate Command that was received from SeMa. * @param cmd the command to be translated. * @return translated command. */ public void transReceiveCmd(REPNode nextEditor,REPCommand cmd){ assert (cmd.eid != eid); incrementGseq(cmd); } final int gseqLimit = 1000; private int gSeq(int seq) { return seq/gseqLimit; } /** * increment global sequence part * @param oseq */ private void incrementGseq( REPCommand cmd) { if (gSeq(cmd.seq) >= gSeq(seq)) { setSeq((gSeq(cmd.seq)+1)*gseqLimit); } } public void setEid(int _eid){ eid = _eid; } public boolean checkMergeConflict(REPCommand command) { REPCommand prev = sentMergedList.getFirst(); if (prev.seq==command.seq) { // logger.writeLog("Input eid="+eid+"SentMergedList = "+sentMergedList); sentMergedList.removeFirst(); if (prev.original!=null && command.string!=null && !command.string.equals("")) { prev.original.string = command.string; } } // previous merge command may be returned if(sentMergedList.size()==0 && !mergeAgain) { merging=false; } return mergeAgain; } public void getMergeAgain() { if (sentMergedList.size()>0) return; // wait for previous merge completion if (mergeMode==MergeMode.Direct) { logger.writeLog("MergeAgain "+eid); mergeAgain = false; merge(preMergeCommand); return; } LinkedList<REPCommand> returnCommand = new LinkedList<REPCommand>(); for(REPCommand command : sentList) { if (command.cmd==REP.REPCMD_INSERT||command.cmd==REP.REPCMD_DELETE) returnCommand.addLast(createUndo(command)); } returnCommand.addAll(sortedEditCmds); returnCommand.addLast(new REPCommand(REP.REPCMD_MERGE_MARK,0, sid, REP.MERGE_EID.id, seq(), "")); returnCommand.addAll(sentList); logger.writeLog("MergeAgain "+eid+" ret="+returnCommand.size()); mergeAgain = false; optimizedSend(this, returnCommand); } // // public boolean isFinished() { // if(unMergedCmds.size() > 0) return false; // if(sentMergedList.size() > 0) return false; // return true; // } public boolean isMerging() { return merging; } /** * receive SMCMD_START_MERGE_ACK */ public void mergeAck() { logger.writeLog("Editor"+eid+": START MERGE "+ sentList); // これ以降のUser command の割り込みはmergeのやり直しが必要 merging = true; } /** * Dead lock reporter */ public String report() { String s = ""; s += "\n sentList:"+sentList; s += "\n ackList:"+ackList; s += "\n mergeMode=:"+merging; return s; } }