Mercurial > hg > RemoteEditor > REPSessionManager
view rep/handler/Editor.java @ 452:d0d2449000f5
checkAck
author | one |
---|---|
date | Thu, 23 Sep 2010 21:19:28 +0900 |
parents | fa7d9ec2008e |
children | 7005658aa52a |
line wrap: on
line source
package rep.handler; import java.io.IOException; import java.util.LinkedList; import java.util.List; import rep.PacketSet; import rep.REP; import rep.REPCommand; import rep.ServerMainLoop; import rep.SessionManager; import rep.channel.REPSelectionKey; import rep.channel.REPSocketChannel; import rep.optimizers.*; public class Editor extends Forwarder { private Translator translator; // REPCommands we are going to send to the next editor private LinkedList<REPCommand> sentList = new LinkedList<REPCommand>(); protected LinkedList<PacketSet> waitingCommandInMerge= new LinkedList<PacketSet>(); private REPCommand quit2=null; private REPCommand preMergeCommand; private boolean merging; private REPCommand mergeMark = new REPCommand(REP.SMCMD_START_MERGE, 0,0, 0, 0, ""); public static boolean noMergeMode=false; static final boolean doOptimize = false; public Editor(SessionManager manager,int editorNo){ // no translator case super(manager, null); } public Editor(int editorNo, SessionManager manager,REPSocketChannel<REPCommand> channel){ super(editorNo,manager,channel); eid = editorNo; sentList.add(mergeMark); // merge mark REPCommandOptimizer optimizer; if (doOptimize) optimizer = new DeleteInsertOptimizer(); //タカノがつくったおぷてぃまいざ else optimizer = new NullOptimizer(); //なにもしないけどOptimizer. translator = new Translator(eid,optimizer); } /* * Merge Protocol (0) Editor へのコマンドは、ack 以外は直接 Editor へ送られてしまう。(next.send(cmd)) Editor から返ってくるコマンドをtranslatorが処理する。 (1) Editor CommandをSession Ring 上に流し、それが戻って来るまでに、他のEditorから 受け取った Editor Command をキューに入れておく。 (2) 戻って来たタイミングで、キュー上のEditor Commandを、eid とCommandの 順序を基にソートする。(self merge) (3) 他のEditorにソートのタイミングを与えるために、Editor Command の ack を、もう一周させる。 (4) 他のEditorのCommandを受け取ってから、ack が来るまでのCommandをキューに 入れておき、ack が来たら、eid とCommandの順序を基にソートする。(other merge) Editor には、ソートした編集結果になるように、それまで行なった編集をUndo して、ソートした編集結果を適用する。Undo が無駄な動作をしないように最適化する。 handle() セッションの処理 manage() 編集コマンドは translate() へ 一周して来た編集コマンドのACKは廃棄 (merge queue から削除) 一周して来た自分のコマンドならself merge 他のエディタの編集コマンドのACK->other merge それ以外は、そのまま実行、merge queue へ格納 merge は checkReturnedCommand() から startMerge() へ まず、接続されている Editor に START_MERGE を送る 邪魔されないように、他のcommand は block する manager() START_MERGE_ACK が来たら、translator.mergeAck() で教えて、 merge()-> translator.checkOwnCommand() へ ここで、sort されて、Merge Command をEditorへ送信 checkEndMerge()から endMerge() が呼ばれる。 自分のエディタにEND_MERGE で Merge終了を通知 自分のコマンドは、ACKに変えて送信 (3) それ以外は、そのまま送信 (一周させる) */ public void translate(REPCommand command){ switch(command.cmd) { case REPCMD_INSERT_ACK: case REPCMD_DELETE_ACK: if (command.eid==eid) { // Second Phase が終わって同期が終了。 // SessionManager.logger.writeLog("Complete "+command); checkAck(command); checkQuit(); return; } checkReturnedCommand(command); return; case REPCMD_INSERT_USER: command.cmd = REP.REPCMD_INSERT; userEditorCommand(command); return; case REPCMD_DELETE_USER: command.cmd = REP.REPCMD_DELETE; userEditorCommand(command); return; case REPCMD_INSERT: case REPCMD_DELETE: if (command.eid == REP.MERGE_EID.id){ //マージコマンドが返ってきた if(translator.checkMergeConflict(command)){ //マージ中にエディタからの割り込みがあった場合 translator.getMergeAgain(this); } checkEndMerge(); return; } else if (command.eid == eid){ // 編集コマンドが一周して来た checkReturnedCommand(command); return; } //他のエディタからの編集コマンド if (waitingRequired(command,null)) return; translator.transReceiveCmd(next,command); sendEditorCommand(command); return; default: assert(false); } } private void userEditorCommand(REPCommand command) { //エディタからの新たな編集コマンド if (next==this) return; // singleton case translator.transSendCmd(command); sendEditorCommand(command); return; } // private void checkDouble(List<REPCommand> sentList) { // if (sentList.size()==0) return; // int count = 0; // REPCommand f = sentList.get(0); // for(REPCommand c:sentList) { // if (c.eid==f.eid&&c.seq==f.seq) { // count++; // } // } // assert(count==1); // if (true) return; // count = 0; // for(PacketSet c:waitingCommandInMerge) { // for(REPCommand g:sentList) { // if (c.command.eid==g.eid&&c.command.seq==g.seq) { // count++; // } // } // } // assert(count==0); // } private boolean waitingRequired(REPCommand command, REPSocketChannel<REPCommand> channel) { if (hasWaitingCommand()) { // We cannot do this operation before watingCommandQueue. addWaitingCommand(new PacketSet(this, new REPCommand(command))); return true; } else if (isMerging()) { addWaitingCommand(new PacketSet(this, new REPCommand(command))); return true; } //ServerMainLoop.logger.writeLog("Editor eid:"+eid+" no waiting"); return false; } public void addWaitingCommand(PacketSet set) { // if (preMergeCommand!=null) { // if (preMergeCommand.eid==set.command.eid // && preMergeCommand.seq==set.command.seq) { // assert(false); // } // } waitingCommandInMerge.add(set); } /** * 他のエディタへのコマンドの送信 * @param command * * sendList にキープする必要がある。 */ private void sendEditorCommand(REPCommand command) { REPCommand keep = new REPCommand(command); sentList.add(keep); //ServerMainLoop.logger.writeLog("Editor eid:"+eid+" sentList = "+sentList); assert(sentList.size()<limit); if (command.cmd==REP.REPCMD_DELETE) { // delete のundo用の文字列は、外に出す意味はない command.string=null; } next.send(command); } boolean merge(REPCommand command) { //マージして送信 return translator.catchOwnCommand(this, command); } @Override public List<REPCommand> getSentList() { return sentList; } /** * 一周して来たcommandの処理。 * * INSERT/DELETEを受け取った時に、sentListに登録 * INSERT_ACK/DELETE_ACKが来たら一周。そこで、Mergeする。 * * 自分が出したINSERT/DELETEが戻って来たら、ACKに変更して、Merge。 * * 途中から参加した場合、自分が受けとってないcommandのACKが先に来ることが * ある。それは、無視して良い。 * @param command */ void checkReturnedCommand(REPCommand command) { startMerge(command); return; } private boolean checkAck(REPCommand command) { assert(!isMerging()); REPCommand prev; if (sentList.getFirst()==mergeMark) prev=sentList.remove(1); else prev=sentList.remove(0); if (prev==null || prev.seq != command.seq || prev.eid!=command.eid) { // should be more robust to allow communication failure String err = "Editor eid="+eid+" checkReturnedCommand() : command = " + command + " prev="+ (prev==null?"null":prev)+" sentList="; err += sentList; ServerMainLoop.logger.writeLog(err); assert(false); } return true; } private void startMerge(REPCommand command) { ServerMainLoop.logger.writeLog("Editor"+eid+": startMerge "+command); preMergeCommand = new REPCommand(command); // merge は必須だが、EditorのCommand実装をテストするには邪魔なので、off に出来るようにする。 if (noMergeMode) { checkQuit(); endMerge(); return; } // START_MERGE を送る // 送らないで良い場合もある? REPCommand cmd = new REPCommand(REP.SMCMD_START_MERGE,command.sid,REP.SM_EID.id,seq(),0,""); sendToEditor(cmd); merging = true; // Session Manager 側で、このeditorへの他のeditorからの // 入力を止めて、merge にそなえる。merge は、eidtor 側から // ACKが来てから始まる。 translator.startMerge(command); } @Override public void setQuit2(REPCommand cmd) { quit2 = cmd; checkQuit(); // do not send quit2 until we received all pending // command } @Override public void setEID(int eid) { this.eid = eid; if (translator!=null) translator.setEid(eid); } public String toString(){ return ("Editor eid="+eid+" sid="+sid+" " + host + ":" + file); } public boolean isMerging() { return translator.isMerging(); } void checkEndMerge() { if (merging) { if (translator.isMerging()) return; endMerge(); merging = false; } if (quit2!=null) checkQuit(); } private void endMerge() { translator.endMerge(); REPCommand mergeEnd = new REPCommand(REP.SMCMD_END_MERGE,sid,eid,seq(),0,""); sendToEditor(mergeEnd); checkAck(preMergeCommand); if (preMergeCommand.eid==eid) { // First Phase End, send ACK REPCommand keep = new REPCommand(preMergeCommand); switch(keep.cmd) { case REPCMD_INSERT: keep.cmd = REP.REPCMD_INSERT_ACK;break; case REPCMD_DELETE: keep.cmd = REP.REPCMD_DELETE_ACK;break; default: assert(false); } sentList.addLast(preMergeCommand); //ServerMainLoop.logger.writeLog("Editor eid:"+eid+" sentList = "+sentList); assert(sentList.size()<limit); next.send(keep); } else { next.send(preMergeCommand); } sentList.remove(mergeMark); sentList.addLast(mergeMark); preMergeCommand = null; } private boolean checkQuit() { if (quit2!=null && sentList.size()==1&&!isMerging()) { sendToEditor(quit2); quit2 = null; return true; } return false; } @Override public boolean manage(REPCommand command) { switch(command.cmd){ // Editor Command case REPCMD_DELETE: case REPCMD_INSERT: case REPCMD_DELETE_USER: case REPCMD_INSERT_USER: case REPCMD_DELETE_ACK: case REPCMD_INSERT_ACK: { translate(command); break; } case SMCMD_START_MERGE_ACK: { // マージの処理と次のエディタへコマンドを送信する処理 translator.mergeAck(); if (!merge(preMergeCommand)) { // nothing to do, send END_MERGE checkEndMerge(); } break; } case SMCMD_SYNC: if (isMaster()) sendToEditor(command); else next.send(command); case SMCMD_QUIT: { next.send(command); break; } case SMCMD_QUIT_2: { // QUIT_2 is returned. if (command.eid!=eid) { // stop this editor unless this is the start, starter will stopped // by QUIT_2_ACK manager.remove(this); } // don't send quit_2 directly to the editor until all pending // merge is processed. // this does not work in distributed case. if (next.isDirect()) next.setQuit2(command); else next.send(command); break; } case SMCMD_QUIT_2_ACK: { manager.remove(this); break; } default: assert false; return false; } return true; } /** * write command to the editor * called from another Editor instance such as next.send(command) */ @Override public void write(REPCommand command) { if (merging) { addWaitingCommand(new PacketSet(this, new REPCommand(command))); return; } if (!waitingRequired(command,channel)) { if (isMergeCommand(command)) { merging = true; ServerMainLoop.logger.writeLog("Editor"+eid+": merging=true "+command); } super.write(command); } } private boolean isMergeCommand(REPCommand command) { switch(command.cmd) { case REPCMD_INSERT: case REPCMD_DELETE: return command.eid==eid; case REPCMD_INSERT_ACK: case REPCMD_DELETE_ACK: return command.eid!=eid; } return false; } public void sendToEditor(REPCommand command) { super.write(command); } @Override public void handle(REPCommand command, REPSelectionKey<REPCommand> key) throws IOException { //ServerMainLoop.logger.writeLog("Manager "+manager.getId()+" read : command = " + command // +" from "+manager.editorList.editorByChannel(channel)); if (command.cmd==REP.SMCMD_JOIN||command.cmd==REP.SMCMD_PUT) { // assert false; // 一つのエディタ上に複数のセッションが作られた場合。 // 若干問題があるらしい next = new Forwarder(manager,next.channel); REPNode first = new FirstConnector(manager,channel); first.handle(command, key); key.attach(new Dispatcher(manager,channel)); return; } if (manager.sessionManage(this, command)) return; manage(command); } @Override public void cancel(REPSocketChannel<REPCommand> socketChannel) { manager.remove(socketChannel); } public boolean isMaster() { return mode==REP.SMCMD_PUT; } /* Handle special case first, usually these cases * are handled in the next Editor in a session manager, but * it is forwarded here. */ public void forwardedCommandManage(REPCommand command) { if (command.cmd==REP.SMCMD_QUIT_2) { // we have to wait next editor's finishing before sending this. // this is odd, but the editor itself does not know it's merging // state. Only this session manager knows it. setQuit2(command); return; } send(command); } /** * Check waiting command in merge * @return true if there is a processed waiting command * @throws IOException */ public void checkWaitingCommandInMerge() { if (translator==null||isMerging()) return; LinkedList<PacketSet> w = waitingCommandInMerge; waitingCommandInMerge = new LinkedList<PacketSet>(); while(w.size()>0) { if (isMerging()) { w.addAll(waitingCommandInMerge); waitingCommandInMerge = w; return; } PacketSet p = w.remove(0); try { if (p.channel!=null) write(p.command); else manage(p.command); } catch (Exception e1) { assert false; manager.close(p.channel.channel); return; } } } public boolean hasWaitingCommand() { return waitingCommandInMerge.size()>0; } }