Mercurial > hg > RemoteEditor > REPSessionManager
changeset 315:20fb70068089
*** empty log message ***
author | kono |
---|---|
date | Mon, 06 Oct 2008 18:58:49 +0900 |
parents | edb373aa421e |
children | 77f443f6dc9f |
files | Todo rep/Editor.java rep/Session.java rep/SessionManager.java rep/translater/Translater.java rep/translater/TranslaterImp1.java test/sematest/TestEditor.java |
diffstat | 7 files changed, 162 insertions(+), 176 deletions(-) [+] |
line wrap: on
line diff
--- a/Todo Mon Oct 06 10:34:37 2008 +0900 +++ b/Todo Mon Oct 06 18:58:49 2008 +0900 @@ -1,26 +1,40 @@ +Mon Oct 6 16:39:57 JST 2008 + +Todo: translator にある5つのqueueが、Editor にもある。merge のアルゴリズムの +実装を見直す必要がある。(kono) + +Todo: +SessionManager の向うにあるeditorにREPCommandを送るコードがない。Editor 扱いしても良いが、Editor が複雑すぎるので、それは好ましくない。Editor に nextChannelを持たせるのが良いか? (kono) + +Todo: +SessionManger のeditor がmerge 中のeditor commandをblockするのは良いが、 +sessionManger コマンドをblockされるのは困る。(kono) Wed Oct 1 20:58:51 JST 2008 Todo: Session ring 廻るcommand packetは、基本的に書き換えられるべきではない eid, seq の組でuniqueになる。現状では、そここで書き換えが起きているらしい。 eid = -1 (Session Manager), eid = -2 (MergeCommand) あたりが - 特殊らしい。 でも、実際には生成されてないっぽい。 + 特殊らしい。 でも、実際には生成されてないっぽい。(kono) + Done: Mon Oct 6 16:40:14 JST 2008 (kono) Todo: SessionManagerのprotocolのswitch文で、そこら中でgetEditor/getSessionが - 呼ばれている。これらは、for loopで探しているので、繰り返し行うのは変。 + 呼ばれている。これらは、for loopで探しているので、繰り返し行うのは変。(kono) -Todo: REPCMD_INSERTが止まらない... +Todo: REPCMD_INSERTが止まらない... (kono) + Done: Mon Oct 6 16:40:38 JST 2008 (kono) -Todo: SessionMnager のmessageをREPLogger baseに書き換える。 +Todo: SessionMnager のmessageをREPLogger baseに書き換える。 (kono) Wed Oct 1 15:35:44 JST 2008 -Todo: SessionManager 複数のコマンドをまとめてeditorに送るとdead lockする可能性がある。 - 送信キューを作り、select loop しながら、ひとつずつコマンドを送信する - Done: +Todo: SessionManager 複数のコマンドをまとめてeditorに送るとdead lockする + 可能性がある。送信キューを作り、select loop しながら、ひとつずつコマンドを + 送信する (kono) + Done: (kono) Todo: Editor quit, quit2 の実装 quit2 では、自分の送信したコマンドが戻ってくるまで待つ必要がある。 - editor 毎の状態となる。 - Done: - \ No newline at end of file + editor 毎の状態となる。(kono) + Done: (kono) +
--- a/rep/Editor.java Mon Oct 06 10:34:37 2008 +0900 +++ b/rep/Editor.java Mon Oct 06 18:58:49 2008 +0900 @@ -1,6 +1,5 @@ package rep; -import java.util.ArrayList; import java.util.LinkedList; import java.util.List; @@ -18,9 +17,10 @@ private String file; private TranslaterImp1 translater; private List<REPCommand> sentList; - private List<REPCommand> sentMergedList; + // REPCommands we sent to the next editor + private List<REPCommand> writeQueue; + // REPCommands we are going to send to the next editor private REPCommandOptimizer optimizer; - private List<REPCommand> writeQueue; private REPCommand quit2 = null; private REPLogger ns = REPLogger.singleton(); private final int limit=100; @@ -32,7 +32,6 @@ setHostAndPort(myChannel); translater = new TranslaterImp1(eid); sentList = new LinkedList<REPCommand>(); - sentMergedList = new LinkedList<REPCommand>(); writeQueue = new LinkedList<REPCommand>(); if (doOptimize) optimizer = new DeleteInsertOptimizer(); //タカノがつくったおぷてぃまいざ @@ -45,7 +44,6 @@ this.myChannel = channel; translater = new TranslaterImp1(eid); sentList = new LinkedList<REPCommand>(); - sentMergedList = new LinkedList<REPCommand>(); writeQueue = new LinkedList<REPCommand>(); setHostAndPort(myChannel); } @@ -58,64 +56,50 @@ writeQueue = new LinkedList<REPCommand>(); } - public List<REPCommand> translate(REPCommand command){ - List<REPCommand> list = new LinkedList<REPCommand>(); - if(command.eid == eid){ + enum TranslatorResult { + START_MERGE, NEW_COMMAND, MERGE_RETURN, MERGE_AGAIN, INCOMMING_COMMAND, MERGE_END + } + + public TranslatorResult translate(Editor nextEditor, REPCommand command){ + if(command.eid == nextEditor.getEID()){ if(checkReturnedCommand(command)){ //エディタからのコマンドが元のエディタに戻ってきた // START_MERGE を送る REPCommand cmd = new REPCommand(REP.SMCMD_START_MERGE,command.sid,REP.SM_EID.id,seq(),0,""); - list.add(cmd); - return list; - }else{ + nextEditor.send(cmd); + return TranslatorResult.START_MERGE; + } else assert(false); + } else if(command.eid == eid){ //エディタからの新たな編集コマンド sentList.add(command); assert(sentList.size()<limit); translater.transSendCmd(command); - list.add(command); - } - }else if(eid == REP.MERGE_EID.id){ + nextEditor.send(command); + return TranslatorResult.NEW_COMMAND; + }else if(command.eid == REP.MERGE_EID.id){ //マージコマンドが返ってきた if(translater.checkMergeConflict(command)){ //マージ中にエディタからの割り込みがあった場合 - List<REPCommand> mergeAgainList = translater.getMergeAgain(); - - mergeAgainList = optimizer.optimize(mergeAgainList); - writeQueue.addAll(mergeAgainList); - assert(writeQueue.size()<limit); + if (optimizedSend(translater.getMergeAgain())) { + return TranslatorResult.MERGE_AGAIN; + } } + return isMerging()?TranslatorResult.MERGE_RETURN: + TranslatorResult.MERGE_END; }else{ //他のエディタからの編集コマンド - REPCommand[] cmds = translater.transReceiveCmd(command); - for(REPCommand cmd : cmds){ - list.add(cmd); - } + translater.transReceiveCmd(nextEditor,command); } - return list; + return TranslatorResult.INCOMMING_COMMAND; } - boolean merge(REPCommand command) { + boolean merge(Editor editor,REPCommand command) { REPCommand prev = translater.prev(); if(prev==null) return false; + assert(writeQueue.size()==0); assert(prev.eid==command.eid); //マージして送信 - ArrayList<REPCommand> cmds = translater.catchOwnCommand(); - //optimizer - if (cmds.size()==0) - return false; // no merge phase is necessary - //マージ中のエディタからの割り込み検知に使う - sentMergedList.addAll(cmds); - sendMergedCommand(cmds); - return true; - } - - private void sendMergedCommand(ArrayList<REPCommand> cmds) { - for(REPCommand mergeCommand : cmds){ - mergeCommand.setEID(REP.MERGE_EID.id); - mergeCommand.setSEQID(seq()); - writeQueue.add(mergeCommand); - assert(writeQueue.size()<limit); - } + return translater.catchOwnCommand(editor); } boolean checkReturnedCommand(REPCommand command) { @@ -125,7 +109,7 @@ return true; }else{ System.err.println("Editor.checkReturnedCommand() : command = " + command); - //assert(false); + assert(false); } } return false; @@ -145,7 +129,7 @@ myChannel.write(cmd); return true; } else if (quit2!=null && sentList.size()==0) { - //myChannel.write(quit2); + myChannel.write(quit2); quit2 = null; return true; } @@ -216,5 +200,23 @@ public int seq() { return seq++; } + + /** + * Sent optimized merged command list + * @param output + * @return if any sent commands output + */ + public boolean optimizedSend(LinkedList<REPCommand> output) { + List<REPCommand> output1 = optimizer.optimize(output); + if (output1.size()==0) return false; + for(REPCommand c:output1) { + REPCommand m = new REPCommand(c); + m.setEID(REP.MERGE_EID.id); + m.setSEQID(seq()); + writeQueue.add(m); + } + assert(writeQueue.size()<limit); + return true; + } }
--- a/rep/Session.java Mon Oct 06 10:34:37 2008 +0900 +++ b/rep/Session.java Mon Oct 06 18:58:49 2008 +0900 @@ -2,7 +2,6 @@ import java.util.Iterator; import java.util.LinkedList; -import java.util.List; import rep.channel.REPSocketChannel; public class Session { @@ -87,16 +86,9 @@ } return null; } - public void translate(REPSocketChannel<REPCommand> channel, REPCommand command) { - Editor editor = getEditor(channel); - List<REPCommand> commandList = editor.translate(command); - Editor nextEditor = getNextEditor(editor); - - for(REPCommand cmd: commandList){ - nextEditor.send(cmd); - } - } - private Editor getNextEditor(Editor editor) { + + + Editor getNextEditor(Editor editor) { int eid = editor.getEID(); int neid = (eid+1)%editorList.size(); Editor nextEditor = editorList.get(neid);
--- a/rep/SessionManager.java Mon Oct 06 10:34:37 2008 +0900 +++ b/rep/SessionManager.java Mon Oct 06 18:58:49 2008 +0900 @@ -407,14 +407,15 @@ if (session==null) throw new IOException(); // 次のエディタへコマンドを送信する処理 Editor editor = session.getEditor(channel); - boolean old = editor.isMerging(); - session.translate(channel, receivedCommand); - if(editor.isMerging()!=old){ - assert(old==false); - REPCommand mergeEnd = new REPCommand(REP.SMCMD_END_MERGE,receivedCommand.sid,editor.getEID(),editor.seq(),0,""); - editor.send(mergeEnd); + Editor.TranslatorResult r = editor.translate(session.getNextEditor(editor), receivedCommand); + if(r==Editor.TranslatorResult.MERGE_END) { + endMerge(receivedCommand, session, editor); + } else if (r==Editor.TranslatorResult.START_MERGE) { + // マージ中のエディタはコマンドを受け取らない + // ここで止めることによって、SMCMD_START_MERGE_ACK + // が来た時には、editor.writeQueue はemptyになる Editor prevEditor = session.getPrevEditor(editor); - setNormalState(prevEditor.getChannel(), session.getSID()); + setMergeState(prevEditor.getChannel(), session.getSID()); } break; } @@ -425,10 +426,9 @@ if (session==null) throw new IOException(); // マージの処理と次のエディタへコマンドを送信する処理 Editor editor = session.getEditor(channel); - if (editor.merge(receivedCommand)) { - //マージ中のエディタはコマンドを受け取らない - Editor prevEditor = session.getPrevEditor(editor); - setMergeState(prevEditor.getChannel(), session.getSID()); + if (!editor.merge(editor,receivedCommand)) { + // nothing to do, send END_MERGE + endMerge(receivedCommand, session, editor); } break; } @@ -454,6 +454,14 @@ } } + private void endMerge(REPCommand receivedCommand, Session session, + Editor editor) { + REPCommand mergeEnd = new REPCommand(REP.SMCMD_END_MERGE,receivedCommand.sid,editor.getEID(),editor.seq(),0,""); + editor.send(mergeEnd); + Editor prevEditor = session.getPrevEditor(editor); + setNormalState(prevEditor.getChannel(), session.getSID()); + } + private void updateGUI() { //リストのコピーをGUIに渡す LinkedList<Session> sList = new LinkedList<Session>(sessionList);
--- a/rep/translater/Translater.java Mon Oct 06 10:34:37 2008 +0900 +++ b/rep/translater/Translater.java Mon Oct 06 18:58:49 2008 +0900 @@ -1,7 +1,6 @@ package rep.translater; -import java.util.ArrayList; - +import rep.Editor; import rep.REPCommand; public interface Translater { @@ -18,14 +17,14 @@ * but in this case, you can use also transReceiveCmd() * @param command which the editor sent. */ - abstract public ArrayList<REPCommand> catchOwnCommand(); + abstract public boolean catchOwnCommand(Editor editor); /** * Translate Command cmd that was received from SeMa. * @param cmd the command to be translated. * @return translated command. */ - abstract public REPCommand[] transReceiveCmd(REPCommand cmd); + abstract public void transReceiveCmd(Editor nextEditor,REPCommand cmd); /** * set the editor's id.
--- a/rep/translater/TranslaterImp1.java Mon Oct 06 10:34:37 2008 +0900 +++ b/rep/translater/TranslaterImp1.java Mon Oct 06 18:58:49 2008 +0900 @@ -1,23 +1,23 @@ package rep.translater; -import java.util.ArrayList; import java.util.Collection; import java.util.Comparator; import java.util.LinkedList; import java.util.Stack; import java.util.TreeSet; +import rep.Editor; import rep.REPCommand; import rep.REP; public class TranslaterImp1 implements Translater{ - //List <REPCommand> userList; - //List <REPCommand> tokenList; - private LinkedList<REPCommand> sentCmds; - //private LinkedList<REPCommand> unMergedCmds; - private Stack<REPCommand> unMergedCmds; public int eid; private int seq; + /* + * queue が5つもいるって、あまりに馬鹿げてる。 + */ + private LinkedList<REPCommand> sentCmds; + private Stack<REPCommand> unMergedCmds; private LinkedList<REPCommand> undoReplaceList; private LinkedList<REPCommand> sentMergedList; private LinkedList<REPCommand> mergeAgainList; @@ -32,13 +32,12 @@ } /** - * Translate cmd When the editor send REPCommand. - * but now, Only adding cmd to the queue is available. + * New command from an editor + * The command is sent to the next editor * @param cmd * @return translated command. */ public REPCommand transSendCmd(REPCommand cmd){ - //setCmdState(cmd); sentCmds.add(cmd); unMergedCmds.push(cmd); @@ -50,31 +49,30 @@ return cmd; } /** - * Dequeue command cmd that was returned. + * My command is returned from the session ring. * @param cmd */ - public ArrayList<REPCommand> catchOwnCommand(){ - ArrayList<REPCommand> returnCmds = new ArrayList<REPCommand>(); - ArrayList<REPCommand> cmds = new ArrayList<REPCommand>(); + public boolean catchOwnCommand(Editor editor){ + LinkedList<REPCommand> output = new LinkedList<REPCommand>(); + LinkedList<REPCommand> cmds = new LinkedList<REPCommand>(); prev(); //スタック上にあるコマンドを全部undoコマンドにする while ( !unMergedCmds.isEmpty() ){ REPCommand cmd0 = unMergedCmds.pop(); - returnCmds.add( createUndo(cmd0) ); + output.add( createUndo(cmd0) ); cmds.add(cmd0); } /* 必要な分だけソートして返却用のリストに追加 */ - //if (cmds.size()==0) return null; - returnCmds.addAll( sortCmds(cmds) ); + output.addAll( sortCmds(cmds) ); /* 残ったコマンドも再び実行させるが、まだマージされてないのでunMergedにも入れる */ - for(int i=0; i<cmds.size(); i++){ - returnCmds.add( cmds.get(i)); - unMergedCmds.push( cmds.get(i)); + output.addAll(cmds); + for(REPCommand c: cmds) { + output.add(c); + unMergedCmds.push(c); } - - return returnCmds; + return editor.optimizedSend(output); } public REPCommand prev() { @@ -82,11 +80,7 @@ } private REPCommand createUndo(REPCommand cmd){ - String str = new String(cmd.string); - REPCommand retCmd = new REPCommand(cmd.cmd, cmd.sid, cmd.eid, cmd.seq, cmd.lineno, str); - - retCmd.eid = REP.MERGE_EID.id; - + REPCommand retCmd = new REPCommand(cmd); if (cmd.cmd==REP.REPCMD_INSERT) retCmd.cmd=REP.REPCMD_DELETE; else if (cmd.cmd==REP.REPCMD_DELETE) retCmd.cmd=REP.REPCMD_INSERT; return retCmd; @@ -106,7 +100,7 @@ } - private Collection<REPCommand> sortCmds(ArrayList<REPCommand> cmds) { + private Collection<REPCommand> sortCmds(LinkedList<REPCommand> cmds) { TreeSet<REPCommand> sortedCmds1 = new TreeSet<REPCommand>(new REPCommandComparator()); int top; int prevEid=-1; @@ -120,21 +114,21 @@ } /* search cmd. ordering by min EID that is lower lowEid and min SEQ. */ - private int getPrecedence(ArrayList<REPCommand> cmds, int lowEid) { + private int getPrecedence(LinkedList<REPCommand> cmds, int lowEid) { int cEid, cSeq; cEid=cSeq=Integer.MAX_VALUE; int ret=-1; for (int i=0; i<cmds.size(); i++){ - REPCommand tmp = cmds.get(i); - if ( tmp.eid<lowEid ) continue; - else if ( tmp.eid>cEid ) continue; - else if ( tmp.eid==cEid ) { - if ( tmp.seq>cSeq ) continue; - cSeq=tmp.seq; + REPCommand c = cmds.get(i); + if ( c.eid<lowEid ) continue; + else if ( c.eid>cEid ) continue; + else if ( c.eid==cEid ) { + if ( c.seq>cSeq ) continue; + cSeq=c.seq; ret = i; } else { /* tmp.eid<cEid */ - cEid = tmp.eid; - cSeq = tmp.seq; + cEid = c.eid; + cSeq = c.seq; ret = i; } } @@ -146,42 +140,13 @@ * @param cmd the command to be translated. * @return translated commannd. */ - public REPCommand[] transReceiveCmd(REPCommand cmd){ - int i=0; - REPCommand cmds[]; + public void transReceiveCmd(Editor nextEditor,REPCommand cmd){ assert (cmd.eid != eid); -// if (cmd.eid==eid){ -// return catchOwnCommand(cmd); -// } - - for (REPCommand cmd0 : unMergedCmds){ - if (cmd0.eid==cmd.eid) i++; - } - - if ( sentCmds.size()<i ){ - //自分のエディタでされた編集コマンドより他のエディタでの編集コマンドのほうが多かった場合NOPを挿入 - //自分のコマンドがないとマージできないのでNOPを挿入 - cmds = new REPCommand[2]; - String str = "NO OPERATION"; - cmds[0] = setCmdState( new REPCommand(REP.REPCMD_NOP, 0, eid, 0, 0, str) ); - cmds[1] = cmd; - //unMergedCmds.push(cmds[0]); - unMergedCmds.push(cmd); - sentCmds.add(cmds[0]); - return cmds; - } - - unMergedCmds.push(cmd); /* but.. */ - cmds = new REPCommand[1]; - cmds[0] = cmd; - return cmds; + // nop command の挿入は Editor 側で行って、こちら側ではやらない + unMergedCmds.push(cmd); + nextEditor.send(cmd); } - private REPCommand setCmdState(REPCommand cmd){ - cmd.seq = seq++; - cmd.eid = eid; - return cmd; - } public void setEid(int _eid){ eid = _eid; }
--- a/test/sematest/TestEditor.java Mon Oct 06 10:34:37 2008 +0900 +++ b/test/sematest/TestEditor.java Mon Oct 06 18:58:49 2008 +0900 @@ -32,6 +32,7 @@ private int eid = 0; private int sid = 0; REPSocketChannel<REPCommand> channel; + REPCommand nop = new REPCommand(REP.REPCMD_NOP, 0, 0, 0, 0, ""); boolean running = true; long timeout = 1; private String name; @@ -58,7 +59,7 @@ cmds.add(new REPCommand(REP.SMCMD_PUT,0,0,0,0,name+"-file")); cmds.add(new REPCommand(REP.REPCMD_INSERT,0,0,0,0,"m0")); //cmds.add(new REPCommand(REP.REPCMD_DELETE,0,0,0,0,"m0")); - //cmds.add(new REPCommand(REP.SMCMD_QUIT,0,0,0,0,"")); + cmds.add(new REPCommand(REP.SMCMD_QUIT,0,0,0,0,"")); } else { text = new Text(new String[0]); cmds.add(new REPCommand(REP.SMCMD_JOIN,0,0,0,0,name)); @@ -145,8 +146,8 @@ int i=syncCounter-1; REPCommand del = new REPCommand(REP.REPCMD_DELETE,sid,eid,0,i, text.get(i)); REPCommand ins = new REPCommand(REP.REPCMD_INSERT,sid,eid,0,i, text.get(i)); - sendCommand(del,seq++); - sendCommand(ins,seq++); + sendCommand(del); + sendCommand(ins); syncCounter++; } } @@ -160,12 +161,12 @@ switch(cmd.cmd) { case REPCMD_INSERT: text.insert(cmd.lineno, cmd.string); - sendCommand(cmd,seq++); + sendCommand(cmd); break; case REPCMD_DELETE: String del = text.delete(cmd.lineno); cmd.setString(del); - sendCommand(cmd,seq++); + sendCommand(cmd); break; case SMCMD_QUIT: /* @@ -174,11 +175,11 @@ * clients simply disconnect from the session manager. */ cmds.clear(); - sendCommand(cmd,seq++); + sendCommand(cmd); break; case SMCMD_JOIN: case SMCMD_PUT: - sendCommand(cmd,seq++); + sendCommand(cmd); prevSeq = seq; /* * To prevent confusion, stop user input until the ack @@ -195,42 +196,48 @@ } - private void sendCommand(REPCommand cmd1,int seq) { + private void sendCommand(REPCommand cmd1) { REPCommand cmd = new REPCommand(cmd1); - cmd.setSEQID(seq); + cmd.setSEQID(seq++); cmd.setEID(eid); cmd.setSID(sid); ns.writeLog(name +" send "+cmd); channel.write(cmd); } + private void forwardCommand(REPCommand cmd1) { + REPCommand cmd = new REPCommand(cmd1); + ns.writeLog(name +" forward "+cmd); + channel.write(cmd); + } + private void handle(REPCommand cmd) { if (cmd==null) return; ns.writeLog(name +": read "+cmd); switch(cmd.cmd) { case REPCMD_INSERT : text.insert(cmd.lineno, cmd.string); - if (cmd.eid==REP.MERGE_EID.id) break; - addNop(); - sendCommand(cmd,cmd.seq); + if (cmd.eid!=REP.MERGE_EID.id) + addNop(); + forwardCommand(cmd); break; case REPCMD_INSERT_ACK : assert(false); break; case REPCMD_DELETE : String del = text.delete(cmd.lineno); - if (cmd.eid==REP.MERGE_EID.id) break; - addNop(); + if (cmd.eid!=REP.MERGE_EID.id) + addNop(); cmd.setString(del); - sendCommand(cmd,cmd.seq); + forwardCommand(cmd); break; case REPCMD_DELETE_ACK : assert(false); break; case REPCMD_NOP : - if (cmd.eid==REP.MERGE_EID.id) break; - addNop(); - sendCommand(cmd,cmd.seq); + if (cmd.eid!=REP.MERGE_EID.id) + addNop(); + forwardCommand(cmd); break; case REPCMD_CLOSE : case REPCMD_CLOSE_2 : assert(false); @@ -247,13 +254,12 @@ inputLock = false; break; case SMCMD_QUIT : - if (false) { + if(true) return; if (cmd.eid!=eid) - sendCommand(cmd,cmd.seq); + forwardCommand(cmd); else sendCommand(new REPCommand(REP.SMCMD_QUIT_2, - sid, eid, seq, 0, ""),seq++); - } + sid, eid, seq, 0, "")); cmds.clear(); break; case SMCMD_QUIT_ACK : @@ -263,7 +269,7 @@ // lock user input during merge (optional) inputLock = hasInputLock; cmd.cmd = REP.SMCMD_START_MERGE_ACK; - sendCommand(cmd,seq++); + sendCommand(cmd); break; case SMCMD_START_MERGE_ACK : assert(false); @@ -273,13 +279,14 @@ break; case SMCMD_QUIT_2 : if (cmd.eid!=eid) { - sendCommand(cmd,cmd.seq); + forwardCommand(cmd); } running = false; break; case SMCMD_SYNC: // start contents sync with newly joined editor - cmd.cmd = REP.SMCMD_SYNC_ACK; sendCommand(cmd,cmd.seq); + cmd.cmd = REP.SMCMD_SYNC_ACK; + forwardCommand(cmd); if (cmd.eid==eid) syncCounter = 1; break; @@ -292,8 +299,7 @@ private void addNop() { if (seq!=prevSeq) return; // We haven't send any command, add nop before retransmition. - REPCommand nop = new REPCommand(REP.REPCMD_NOP, sid, eid, seq, 0, ""); - sendCommand(nop,seq++); + sendCommand(nop); prevSeq = seq; } }