Mercurial > hg > RemoteEditor > REPSessionManager
view rep/handler/Editor.java @ 398:7de83b6a34e7
not yet fixed....
author | one |
---|---|
date | Sun, 23 Nov 2008 18:38:52 +0900 |
parents | 149c9a53fc37 |
children | 19705f4b8015 |
line wrap: on
line source
package rep.handler;

import java.io.IOException;
import java.util.LinkedList;
import java.util.List;

import rep.PacketSet;
import rep.REP;
import rep.REPCommand;
import rep.ServerMainLoop;
import rep.SessionManager;
import rep.channel.REPSelectionKey;
import rep.channel.REPSocketChannel;
import rep.optimizers.*;

/**
 * Session-manager side handler for one editor.
 *
 * <p>An Editor forwards edit commands coming from its own editor to
 * {@code next}, runs commands coming from other editors through a
 * {@link Translator}, and starts a merge when a command it keeps in
 * {@code sentList} comes back.
 *
 * <p>NOTE(review): fields and methods such as {@code eid}, {@code sid},
 * {@code next}, {@code manager}, {@code channel}, {@code limit}, {@code mode},
 * {@code host}, {@code file}, {@code send()} and {@code seq()} are not declared
 * here; presumably they are inherited from {@link Forwarder} -- confirm there.
 */
public class Editor extends Forwarder {

    /** Command translator; stays {@code null} for editors built with the "no translator" constructor. */
    private Translator translator;
    // REPCommands we are going to send to the next editor
    private List<REPCommand> sentList = new LinkedList<REPCommand>();
    /** Pending QUIT_2 command, held back until {@link #checkQuit()} is able to send it. */
    private REPCommand quit2=null;
    /** Set when START_MERGE has been sent, cleared in {@link #checkEndMerge()}. */
    private boolean merging;
    /** Copy (with an empty string) of the command that triggered the current merge. */
    private REPCommand preMergeCommand;
    /** When true, {@link #startMerge(REPCommand)} skips the merge and ends it immediately. */
    public static boolean noMergeMode=false;
    /** Chooses between {@code DeleteInsertOptimizer} and {@code NullOptimizer}. */
    static final boolean doOptimize = true;

    /**
     * Creates an editor without a translator.
     *
     * @param manager  owning session manager
     * @param editorNo editor number (not used by this constructor)
     */
    public Editor(SessionManager manager,int editorNo){
        // no translator case
        super(manager, null);
    }

    /**
     * Creates an editor bound to a channel, with a translator and optimizer.
     *
     * @param editorNo editor id assigned to this editor
     * @param manager  owning session manager
     * @param channel  channel connected to the editor
     */
    public Editor(int editorNo, SessionManager manager,REPSocketChannel<REPCommand> channel){
        super(editorNo,manager,channel);
        eid = editorNo;
        REPCommandOptimizer optimizer;
        if (doOptimize) {
            // the optimizer written by Takano
            optimizer = new DeleteInsertOptimizer();
        } else {
            // an optimizer that does nothing
            optimizer = new NullOptimizer();
        }
        translator = new Translator(eid,optimizer);
    }

    /**
     * Dispatches an edit command according to its kind and to the editor id
     * it carries.
     *
     * @param command the edit command to process
     */
    public void translate(REPCommand command){
        switch(command.cmd) {
        case REPCMD_INSERT_ACK:
        case REPCMD_DELETE_ACK:
            if (command.eid==eid) {
                // The second phase has finished; synchronization is complete.
                return;
            }
            if (waitingRequired(command)) return;
            checkReturnedCommand(command);
            return;
        }
        if (command.eid == eid){
            // a new edit command from our own editor
            if (next==this) return; // singleton case
            translator.transSendCmd(command);
            sendEditorCommand(command);
            return;
        } else if (command.eid == REP.MERGE_EID.id){
            // a merge command has come back
            if(translator.checkMergeConflict(command)){
                // the editor interrupted with its own edit during the merge
                translator.getMergeAgain(this);
            }
            checkEndMerge();
        } else if (command.eid == next.getEID()){
            // A command that completes its round at the next Editor has arrived.
            // This approach is not very good...
            if (next==this) return; // singleton case
            // This does not work in the distributed case, so there it is
            // handled by the destination forwarder.
            if (next.isDirect()) {
                REPCommand keep;
                if (waitingRequired(command)) return;
                keep = new REPCommand(command);
                sentList.add(keep);
                ServerMainLoop.logger.writeLog("Editor eid:"+eid+" sentList = "+sentList);
                ((Editor) next).checkReturnedCommand(command);
            } else {
                next.send(command);
            }
        } else {
            // an edit command from another editor
            if (waitingRequired(command)) return;
            translator.transReceiveCmd(next,command);
            if(command.cmd==REP.REPCMD_DELETE) {
                // the string kept for undoing a delete is pointless to send outside
                command.string=null;
            }
            sendEditorCommand(command);
        }
        return;
    }

    /**
     * Queues the command on the manager when it cannot be processed now,
     * either because earlier commands are already waiting for this channel
     * or because this editor is merging.
     *
     * @param command the command about to be processed
     * @return true if the command was queued and must not be processed now
     */
    private boolean waitingRequired(REPCommand command) {
        if (manager.hasWaitingCommand(channel)) {
            // We cannot do this operation before watingCommandQueue.
            manager.addWaitingCommand(new PacketSet(channel, this, command));
            return true;
        } else if (isMerging()) {
            manager.addWaitingCommand(new PacketSet(getChannel(), this, new REPCommand(command)));
            return true;
        }
        ServerMainLoop.logger.writeLog("Editor eid:"+eid+" no waiting");
        return false;
    }

    /**
     * Records a copy of the command in {@code sentList} and sends the
     * command to {@code next}.
     *
     * @param command the command to forward
     */
    private void sendEditorCommand(REPCommand command) {
        REPCommand keep = new REPCommand(command);
        sentList.add(keep);
        ServerMainLoop.logger.writeLog("Editor eid:"+eid+" sentList = "+sentList);
        assert(sentList.size()<limit);
        next.send(command);
    }

    /**
     * Merge and send; delegates to {@code translator.catchOwnCommand(this)}.
     *
     * @param command not used by this method
     * @return the value returned by the translator
     */
    boolean merge(REPCommand command) {
        // merge and send
        return translator.catchOwnCommand(this);
    }

    /**
     * Handles a command that has gone all the way around: removes the oldest
     * entry of {@code sentList}, checks that it matches the returned command
     * (same seq and eid), and starts the merge.
     *
     * <p>An empty {@code sentList} is reported through the same
     * log-and-assert path as a mismatch. Previously
     * {@code sentList.remove(0)} threw {@code IndexOutOfBoundsException}
     * before the {@code prev==null} check could run.
     *
     * @param command the returned command
     */
    void checkReturnedCommand(REPCommand command) {
        REPCommand prev = sentList.isEmpty() ? null : sentList.remove(0);
        ServerMainLoop.logger.writeLog("Editor eid="+eid+" remove sentList:"+(prev==null?"null":prev));
        if (prev==null || prev.seq != command.seq || prev.eid!=command.eid) {
            String err = "Editor eid="+eid+" checkReturnedCommand() : command = " + command + " prev="+
                (prev==null?"null":prev)+" sentList=";
            err += sentList;
            ServerMainLoop.logger.writeLog(err);
            assert(false);
        }
        startMerge(command);
        return;
    }

    /**
     * Remembers the triggering command and sends START_MERGE to the editor,
     * or ends the merge immediately when {@link #noMergeMode} is set.
     *
     * @param command the command that triggers the merge
     */
    private void startMerge(REPCommand command) {
        preMergeCommand = new REPCommand(command);
        preMergeCommand.string = "";
        // Merging is mandatory, but it gets in the way when testing an
        // editor's command implementation, so it can be switched off.
        if (noMergeMode) {
            endMerge();
            return;
        }
        // Send START_MERGE.
        // Are there cases where it does not need to be sent?
        REPCommand cmd = new REPCommand(REP.SMCMD_START_MERGE,command.sid,REP.SM_EID.id,seq(),0,"");
        send(cmd);
        merging = true;
        // On the session manager side, input from other editors to this
        // editor is stopped in preparation for the merge. The merge starts
        // after the ACK arrives from the editor side.
        translator.startMerge(cmd);
    }

    /**
     * Stores the QUIT_2 command and tries to send it right away.
     *
     * @param cmd the QUIT_2 command to hold
     */
    @Override
    public void setQuit2(REPCommand cmd) {
        quit2 = cmd;
        checkQuit();
        // do not send quit2 until we received all pending
        // command
    }

    /**
     * Sets the editor id on this editor and, if present, on its translator.
     *
     * @param eid the new editor id
     */
    @Override
    public void setEID(int eid) {
        this.eid = eid;
        if (translator!=null) translator.setEid(eid);
    }

    /** Returns a description containing eid, sid, host and file. */
    public String toString(){
        return ("Editor eid="+eid+" sid="+sid+" " + host + ":" + file);
    }

    /**
     * Reports whether the translator is merging.
     *
     * <p>Returns false when there is no translator (the "no translator"
     * constructor), mirroring the null guard in {@link #setEID(int)}, so
     * that callers such as {@link #checkQuit()} do not fail with a
     * NullPointerException.
     *
     * @return true if a translator exists and reports that it is merging
     */
    public boolean isMerging() {
        return translator!=null && translator.isMerging();
    }

    /**
     * Ends the merge once the translator is no longer merging, then retries
     * a pending QUIT_2 if there is one.
     */
    void checkEndMerge() {
        if (merging) {
            if(translator.isMerging()) return;
            endMerge();
            merging = false;
        }
        if (quit2!=null) checkQuit();
    }

    /**
     * Sends END_MERGE to the editor and passes the pre-merge command on to
     * {@code next}. When the command is our own, it is converted to the
     * matching ACK and recorded in {@code sentList} first.
     */
    private void endMerge() {
        REPCommand mergeEnd = new REPCommand(REP.SMCMD_END_MERGE,sid,eid,seq(),0,"");
        send(mergeEnd);
        if (preMergeCommand.eid==eid) {
            // First Phase End, send ACK
            REPCommand keep = new REPCommand(preMergeCommand);
            switch(keep.cmd) {
            case REPCMD_INSERT: keep.cmd = REP.REPCMD_INSERT_ACK;break;
            case REPCMD_DELETE: keep.cmd = REP.REPCMD_DELETE_ACK;break;
            }
            sentList.add(keep);
            ServerMainLoop.logger.writeLog("Editor eid:"+eid+" sentList = "+sentList);
            assert(sentList.size()<limit);
            next.send(keep);
        } else {
            next.send(preMergeCommand);
        }
        preMergeCommand = null;
    }

    /**
     * Sends the held QUIT_2 when nothing is pending in {@code sentList} and
     * no merge is in progress.
     *
     * @return true if QUIT_2 was sent
     */
    private boolean checkQuit() {
        if (sentList.size()==0&&!isMerging()) {
            send(quit2);
            quit2 = null;
            return true;
        }
        return false;
    }

    /**
     * Processes one command addressed to this editor.
     *
     * @param command the command to process
     * @return true if the command kind was recognised, false otherwise
     */
    @Override
    public boolean manage(REPCommand command) {
        switch(command.cmd){
        // Editor Command
        case REPCMD_DELETE:
        case REPCMD_INSERT:
        case REPCMD_DELETE_ACK:
        case REPCMD_INSERT_ACK:
        case REPCMD_NOP:
        {
            translate(command);
            break;
        }
        case SMCMD_START_MERGE_ACK:
        {
            // merge processing, and sending commands to the next editor
            translator.mergeAck();
            if (!merge(command)) {
                // nothing to do, send END_MERGE
                checkEndMerge();
            }
            break;
        }
        case SMCMD_SYNC:
            if (isMaster())
                send(command);
            else
                next.send(command);
            // Without this break SYNC fell through into SMCMD_QUIT and was
            // sent to next a second time.
            break;
        case SMCMD_QUIT:
        {
            next.send(command);
            break;
        }
        case SMCMD_QUIT_2:
        {
            // QUIT_2 is returned.
            if (command.eid!=eid) {
                // stop this editor unless this is the start, starter will stopped
                // by QUIT_2_ACK
                manager.remove(this);
            }
            // don't send quit_2 directly to the editor until all pending
            // merge is processed.
            // this does not work in distributed case.
            if (next.isDirect())
                next.setQuit2(command);
            else
                next.send(command);
            break;
        }
        case SMCMD_QUIT_2_ACK:
        {
            manager.remove(this);
            break;
        }
        default:
            assert false;
            return false;
        }
        return true;
    }

    /**
     * Entry point for a command read from this editor's channel: logs it,
     * lets the session manager handle it first, and otherwise passes it to
     * {@link #manage(REPCommand)}.
     *
     * @param command the command that was read
     * @param key     the selection key the command arrived on
     * @throws IOException propagated from the nested handler
     */
    @Override
    public void handle(REPCommand command, REPSelectionKey<REPCommand> key) throws IOException {
        ServerMainLoop.logger.writeLog("Manager "+manager.getId()+"read : command = " + command
                +" from "+manager.editorList.editorByChannel(channel));
        if (command.cmd==REP.SMCMD_JOIN||command.cmd==REP.SMCMD_PUT) {
            assert false;
            // apparently this is somewhat problematic
            next = new Forwarder(manager,next.channel);
            REPNode first = new FirstConnector(manager,channel);
            first.handle(command, key);
            key.attach(new Dispatcher(manager,channel));
            return;
        }
        if (manager.sessionManage(this, command)) return;
        manage(command);
    }

    /**
     * Removes the given channel from the manager.
     *
     * @param socketChannel the channel being cancelled
     */
    @Override
    public void cancel(REPSocketChannel<REPCommand> socketChannel) {
        manager.remove(socketChannel);
    }

    /** Returns true when this editor's mode is SMCMD_PUT. */
    public boolean isMaster() {
        return mode==REP.SMCMD_PUT;
    }

    /* Handle special case first, usually these cases
     * are handled in the next Editor in a session manager, but
     * it is forwarded here.
     */
    public void forwardedCommandManage(REPCommand command) {
        if (command.cmd==REP.SMCMD_QUIT_2) {
            // we have to wait next editor's finishing before sending this.
            // this is odd, but the editor itself does not know it's merging
            // state. Only this session manager knows it.
            setQuit2(command);
        } else if (command.eid==eid) {
            // if we handle in editor.manage(), this editor cannot distinguish this
            // and user input command from the editor.
            REPCommand keep;
            switch(command.cmd) {
            case REPCMD_DELETE_ACK:
            case REPCMD_INSERT_ACK:
                checkReturnedCommand(command);
                return;
            case REPCMD_INSERT:
                keep = new REPCommand(command);
                keep.cmd = REP.REPCMD_INSERT_ACK;
                sentList.add(keep);
                checkReturnedCommand(command);
                return;
            case REPCMD_DELETE:
                keep = new REPCommand(command);
                keep.cmd = REP.REPCMD_DELETE_ACK;
                sentList.add(keep);
                checkReturnedCommand(command);
                return;
            }
        }
        send(command);
    }
}