Mercurial > hg > RemoteEditor > REPSessionManager
changeset 316:77f443f6dc9f
add session manager channel handler
author | kono |
---|---|
date | Tue, 07 Oct 2008 18:15:33 +0900 |
parents | 20fb70068089 |
children | c83a3faec487 |
files | rep/Editor.java rep/EditorList.java rep/Forwarder.java rep/RPanel.java rep/SelectButtonEvent.java rep/Session.java rep/SessionList.java rep/SessionManager.java rep/SessionManagerEventListener.java rep/handler/REPEditorHandler.java rep/handler/REPHandlerDoWaiting.java rep/handler/REPHandlerEditorInMerge.java rep/handler/REPHandlerImpl.java rep/handler/REPHandlerInMerge.java rep/handler/REPSessionManagerHandler.java rep/translater/Translater.java rep/translater/TranslaterImp1.java |
diffstat | 17 files changed, 304 insertions(+), 243 deletions(-) [+] |
line wrap: on
line diff
--- a/rep/Editor.java Mon Oct 06 18:58:49 2008 +0900 +++ b/rep/Editor.java Tue Oct 07 18:15:33 2008 +0900 @@ -3,70 +3,64 @@ import java.util.LinkedList; import java.util.List; -import rep.channel.REPLogger; import rep.channel.REPSocketChannel; import rep.optimizers.*; import rep.translater.TranslaterImp1; -public class Editor { - private int eid; // unique id in a session +public class Editor extends Forwarder { private int sid = -1 ; // globally unique session id private int seq = 0; - private REPSocketChannel<REPCommand> myChannel; + + //private Forwarder next; private String host; private String file; private TranslaterImp1 translater; - private List<REPCommand> sentList; - // REPCommands we sent to the next editor - private List<REPCommand> writeQueue; + private List<REPCommand> sentList = new LinkedList<REPCommand>(); // REPCommands we are going to send to the next editor private REPCommandOptimizer optimizer; private REPCommand quit2 = null; - private REPLogger ns = REPLogger.singleton(); - private final int limit=100; + + public Editor(){ - this(true); + this(true,-1); } - public Editor(boolean doOptimize){ + + public Editor(boolean doOptimize,int editorNo){ setHostAndPort(myChannel); + eid = editorNo; translater = new TranslaterImp1(eid); - sentList = new LinkedList<REPCommand>(); - writeQueue = new LinkedList<REPCommand>(); - if (doOptimize) optimizer = new DeleteInsertOptimizer(); //タカノがつくったおぷてぃまいざ else optimizer = new NullOptimizer(); //なにもしないけどOptimizer. } public Editor(int editorNo, REPSocketChannel<REPCommand> channel){ - this.eid = editorNo; + this(false,editorNo); this.myChannel = channel; - translater = new TranslaterImp1(eid); - sentList = new LinkedList<REPCommand>(); - writeQueue = new LinkedList<REPCommand>(); setHostAndPort(myChannel); } public Editor(REPSocketChannel<REPCommand> channel) { + this(false,-1); this.myChannel = channel; setHostAndPort(myChannel); - translater = new TranslaterImp1(eid); - sentList = new LinkedList<REPCommand>(); - writeQueue = new LinkedList<REPCommand>(); } enum TranslatorResult { START_MERGE, NEW_COMMAND, MERGE_RETURN, MERGE_AGAIN, INCOMMING_COMMAND, MERGE_END } - public TranslatorResult translate(Editor nextEditor, REPCommand command){ + public TranslatorResult translate(Forwarder nextEditor, REPCommand command){ if(command.eid == nextEditor.getEID()){ if(checkReturnedCommand(command)){ - //エディタからのコマンドが元のエディタに戻ってきた + // エディタからのコマンドが元のエディタに戻ってきた // START_MERGE を送る REPCommand cmd = new REPCommand(REP.SMCMD_START_MERGE,command.sid,REP.SM_EID.id,seq(),0,""); nextEditor.send(cmd); + // Session Manager 側で、このeditorへの他のeditorからの + // 入力を止めて、merge にそなえる。merge は、eidtor 側から + // ACKが来てから始まる。 return TranslatorResult.START_MERGE; } else assert(false); } else if(command.eid == eid){ @@ -116,25 +110,7 @@ } - public boolean doWaitingWrite() { - // 一気に送ると、向こう側(Editor)で、dead lock する可能性がある。 - // select loop の中で一つ一つ送るしかない。Editor側から割り込まれる可能性も - // ある。その時に複数のコマンドを送っていると、どこに割り込まれたかを判断する - // ことが出来ない。そこで、一つ一つReturnを確認する必要がある。つまり、 - // select loop で送るしかない。 - REPCommand cmd; - if (writeQueue.size()>0) { - cmd = new REPCommand(writeQueue.remove(0)); - ns.writeLog("SessionManager write to "+myChannel+" cmd="+cmd); - myChannel.write(cmd); - return true; - } else if (quit2!=null && sentList.size()==0) { - myChannel.write(quit2); - quit2 = null; - return true; - } - return false; - } + public void setQuit2(REPCommand cmd) { quit2 = cmd; @@ -145,10 +121,6 @@ } - public REPSocketChannel<REPCommand> getChannel() { - return myChannel; - } - public void setHost(String host){ this.host = host; } @@ -159,14 +131,13 @@ } - public int getEID() { - return eid; - } + @Override public void setEID(int eid) { this.eid = eid; translater.setEid(eid); } + public String toString(){ return ("Editor eid="+eid+" sid="+sid+" " + host + ":" + file); } @@ -179,14 +150,6 @@ file = string; } - public void send(REPCommand command) { - writeQueue.add(command); - assert(writeQueue.size()<limit); - } - - public void setChannel(REPSocketChannel<REPCommand> channel) { - myChannel = channel; - } public boolean isMerging() { return translater.isMerging(); @@ -200,6 +163,17 @@ public int seq() { return seq++; } + + @Override + public boolean doWaitingWrite() { + if(super.doWaitingWrite()) return true; + if (quit2!=null && sentList.size()==0) { + myChannel.write(quit2); + quit2 = null; + return true; + } + return false; + } /** * Sent optimized merged command list
--- a/rep/EditorList.java Mon Oct 06 18:58:49 2008 +0900 +++ b/rep/EditorList.java Tue Oct 07 18:15:33 2008 +0900 @@ -10,8 +10,8 @@ private LinkedList<Editor> editorList = new LinkedList<Editor>(); public void sendJoinAck(REPCommand repCmd) { - Editor editor = null; - for(Editor editor2 : editorList){ + Forwarder editor = null; + for(Forwarder editor2 : editorList){ error(String.valueOf(editor2.getEID()), String.valueOf(repCmd.eid)); if(editor2.getEID() == repCmd.eid){ editor = editor2; @@ -41,7 +41,7 @@ } public void setEID(REPCommand repCmd) { - for(Editor editor : editorList){ + for(Forwarder editor : editorList){ if(editor.getEID() == 0){ editor.setEID(repCmd.eid); break; @@ -80,7 +80,7 @@ } - public Editor getEditor(String hostport) { + public Forwarder getEditor(String hostport) { // TODO Auto-generated method stub for(Editor editor : editorList){ String[] splited = hostport.split(":"); @@ -105,10 +105,10 @@ } - public Editor getEditor(REPSocketChannel<REPCommand> channel) { + public Forwarder getEditor(REPSocketChannel<REPCommand> channel) { // TODO Auto-generated method stub - Editor editor1 = null; - for(Editor editor: editorList){ + Forwarder editor1 = null; + for(Forwarder editor: editorList){ if(channel == editor.getChannel()){ editor1 = editor; }
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/rep/Forwarder.java Tue Oct 07 18:15:33 2008 +0900 @@ -0,0 +1,52 @@ +package rep; + +import java.util.LinkedList; + +import rep.channel.REPLogger; +import rep.channel.REPSocketChannel; + +public class Forwarder { + int eid; // unique id in a session + REPSocketChannel<REPCommand> myChannel; + // REPCommands we sent to the next editor + LinkedList<REPCommand> writeQueue = new LinkedList<REPCommand>(); + final int limit=100; + REPLogger ns = REPLogger.singleton(); + + public int getEID() { + return eid; + } + + public void setEID(int eid) { + this.eid = eid; + } + + public void send(REPCommand command) { + writeQueue.add(command); + assert(writeQueue.size()<limit); + } + + public REPSocketChannel<REPCommand> getChannel() { + return myChannel; + } + + public void setChannel(REPSocketChannel<REPCommand> channel) { + myChannel = channel; + } + + public boolean doWaitingWrite() { + // 一気に送ると、向こう側(Editor)で、dead lock する可能性がある。 + // select loop の中で一つ一つ送るしかない。Editor側から割り込まれる可能性も + // ある。その時に複数のコマンドを送っていると、どこに割り込まれたかを判断する + // ことが出来ない。そこで、一つ一つReturnを確認する必要がある。つまり、 + // select loop で送るしかない。 + REPCommand cmd; + if (writeQueue.size()>0) { + cmd = new REPCommand(writeQueue.remove(0)); + ns.writeLog("SessionManager write to "+myChannel+" cmd="+cmd); + myChannel.write(cmd); + return true; + } + return false; + } +} \ No newline at end of file
--- a/rep/RPanel.java Mon Oct 06 18:58:49 2008 +0900 +++ b/rep/RPanel.java Tue Oct 07 18:15:33 2008 +0900 @@ -153,7 +153,7 @@ protected void setTableEditor(LinkedList<Editor> list) { e_tableModel.setRowCount(0); editorList = list; - for(Editor editor : list){ + for(Forwarder editor : list){ setTableEditor(editor.getEID(), editor.getChannel()); } }
--- a/rep/SelectButtonEvent.java Mon Oct 06 18:58:49 2008 +0900 +++ b/rep/SelectButtonEvent.java Tue Oct 07 18:15:33 2008 +0900 @@ -1,5 +1,7 @@ package rep; +import java.io.IOException; + import rep.channel.REPSocketChannel; public class SelectButtonEvent implements SessionManagerEvent{ @@ -9,7 +11,7 @@ private int eid; private SessionManager manager; - public SelectButtonEvent(Editor editor, Session session, SessionManagerEventListener listener) { + public SelectButtonEvent(Forwarder editor, Session session, SessionManagerEventListener listener) { this.editorChannel = editor.getChannel(); this.eid = editor.getEID(); this.sid = session.getSID(); @@ -29,7 +31,10 @@ } public void exec() { - manager.selectSession(this); + try { + manager.selectSession(this); + } catch (IOException e) { + } } }
--- a/rep/Session.java Mon Oct 06 18:58:49 2008 +0900 +++ b/rep/Session.java Tue Oct 07 18:15:33 2008 +0900 @@ -5,7 +5,8 @@ import rep.channel.REPSocketChannel; public class Session { - private Editor masterEditor; + private Forwarder masterEditor; + private Forwarder next; private int sessionID; private String sessionName; private LinkedList<Editor> editorList = new LinkedList<Editor>(); @@ -39,7 +40,7 @@ public int getSID() { return sessionID; } - public Editor getOwner() { + public Forwarder getOwner() { return masterEditor; } public String getName() { @@ -56,7 +57,7 @@ boolean flag = false; for (Iterator<Editor> it = editorList.iterator();it.hasNext(); ) { - Editor e = it.next(); + Forwarder e = it.next(); if (e.getChannel()==channel) { it.remove(); // to avoid concurrent modification flag = true; @@ -76,7 +77,7 @@ isOwner = true; } public void sendToEditor(REPCommand repCmd) { - for(Editor editor : editorList){ + for(Forwarder editor : editorList){ editor.getChannel().write(repCmd); } } @@ -88,28 +89,43 @@ } - Editor getNextEditor(Editor editor) { + Forwarder getNextEditor(Forwarder editor) { int eid = editor.getEID(); int neid = (eid+1)%editorList.size(); - Editor nextEditor = editorList.get(neid); + Forwarder nextEditor = editorList.get(neid); return nextEditor; } - public Editor getPrevEditor(Editor editor) { + public Forwarder getPrevEditor(Forwarder editor) { int eid = editor.getEID(); int peid = (eid + editorList.size() - 1)%editorList.size(); - Editor prevEditor = editorList.get(peid); + Forwarder prevEditor = editorList.get(peid); return prevEditor; } public void closeSession() { REPCommand command = new REPCommand(); command.setCMD(REP.REPCMD_CLOSE); - for(Editor editor : editorList){ + for(Forwarder editor : editorList){ command.setEID(editor.getEID()); editor.send(command); } } public void sendToNextEditor(REPSocketChannel<REPCommand>channel,REPCommand command) { - Editor next = getNextEditor(getEditor(channel)); + Forwarder next = getNextEditor(getEditor(channel)); next.send(command); } + + public void setForwarder(Forwarder next) { + this.next = next; + } + + /** + * Dispatch REPCommand in this session + * @param eid + * @return + */ + public Forwarder getForwarder(int eid) { + Forwarder f = editorList.get(eid); + if (f==null) return next; + return f; + } }
--- a/rep/SessionList.java Mon Oct 06 18:58:49 2008 +0900 +++ b/rep/SessionList.java Tue Oct 07 18:15:33 2008 +0900 @@ -67,7 +67,7 @@ public void sendCmd(REPSocketChannel<REPCommand> channel2, REPCommand repCmd) { // LinkedList <Editor> editorList = session3.get(repCmd.sid).getEditorList(); LinkedList <Editor> editorList = sessionLinkedList.get(repCmd.sid).getEditorList(); //ここもforループで検索しないといけないよ。 - for(Editor editor : editorList){ + for(Forwarder editor : editorList){ REPSocketChannel<REPCommand> channel = editor.getChannel(); if(channel.equals(channel2)) { System.out.println("equals"); @@ -80,7 +80,7 @@ public void sendCmd(REPSocketChannel<REPCommand> channel, REPCommand repCmd, boolean ring){ // LinkedList <Editor> editorList = session3.get(repCmd.sid).getEditorList(); LinkedList <Editor> editorList = sessionLinkedList.get(repCmd.sid).getEditorList(); //ここもforループで検索しないといけないよ。 - for(Editor editor : editorList){ + for(Forwarder editor : editorList){ REPSocketChannel<REPCommand> channel2 = editor.getChannel(); if(channel.equals(channel2)){ System.out.println("equals");
--- a/rep/SessionManager.java Mon Oct 06 18:58:49 2008 +0900 +++ b/rep/SessionManager.java Tue Oct 07 18:15:33 2008 +0900 @@ -17,8 +17,9 @@ import rep.channel.REPSocketChannel; import rep.handler.PacketSet; import rep.handler.REPHandler; -import rep.handler.REPHandlerImpl; -import rep.handler.REPHandlerInMerge; +import rep.handler.REPEditorHandler; +import rep.handler.REPHandlerEditorInMerge; +import rep.handler.REPSessionManagerHandler; import rep.channel.REPSelector; import rep.xml.SessionXMLDecoder; import rep.xml.SessionXMLEncoder; @@ -52,14 +53,31 @@ // editorList は、sessionList に入っているeditorとは別なeditorのlistらしい。 private String maxHost; private List<PacketSet> waitingCommandInMerge; - REPHandler normalHandler = new REPHandlerImpl(this); - REPHandler handlerInMerge =new REPHandlerInMerge(this); + REPHandler normalHandler = new REPEditorHandler(this); + REPHandler handlerInMerge =new REPHandlerEditorInMerge(this); private BlockingQueue<SessionManagerEvent> waitingEventQueue = new LinkedBlockingQueue<SessionManagerEvent>();; private String myHost; private static int receive_port; private static int parent_port; static final int DEFAULT_PORT = 8766; - + + public static void main(String[] args) throws InterruptedException, IOException { + + int port = DEFAULT_PORT; + int port_s = DEFAULT_PORT; + //System.setProperty("file.encoding", "UTF-8"); + if(args.length > 0){ + port = Integer.parseInt(args[0]); + port_s = Integer.parseInt(args[1]); + } + receive_port = port; + parent_port = port_s; + SessionManager sm = new SessionManager(); + sm.init(port,new SessionManagerGUIimpl(sm)); + + + } + public void openSelector() throws IOException{ selector = REPSelector.<REPCommand>create(); @@ -121,7 +139,7 @@ private boolean checkWaitingWrite() throws IOException { for(Session s:sessionList) { - for(Editor editor: s.getEditorList()) + for(Forwarder editor: s.getEditorList()) if (editor.doWaitingWrite()) return true; } return false; @@ -184,11 +202,26 @@ public void manage(REPSocketChannel<REPCommand> channel, REPCommand receivedCommand) throws IOException { if(receivedCommand == null) return; //Session session; - REPCommand sendCommand = new REPCommand(receivedCommand); REPSocketChannel<REPCommand> send = channel; switch(receivedCommand.cmd){ + case REPCMD_DELETE: + case REPCMD_INSERT: + case REPCMD_NOP: + { + // sid から Session を取得 + Session session = getSession(receivedCommand.sid); + if (session==null) throw new IOException(); + // 次のエディタへコマンドを送信する処理 + Editor editor = session.getEditor(channel); + if (editor.isMerging()) { + addWaitingCommand(new PacketSet(channel, editor, receivedCommand)); + break; + } + editor.translate(session.getNextEditor(editor), receivedCommand); + break; + } case SMCMD_JOIN: { //どのSessionにも属さないエディタをリストに追加 @@ -225,6 +258,7 @@ updateGUI(); //エディタにAckを送信 + REPCommand sendCommand = new REPCommand(receivedCommand); sendCommand.setCMD(REP.SMCMD_PUT_ACK); sendCommand.setEID(editor.getEID()); sendCommand.setSID(session.getSID()); @@ -252,12 +286,13 @@ if(session.hasOwner()){ //このSessionManagerがオーナーを持っている場合、Sessionにエディタを追加し、エディタへAckを返す + REPCommand sendCommand = new REPCommand(receivedCommand); sendCommand.setCMD(REP.SMCMD_SELECT_ACK); sendCommand.setEID(editor.getEID()); editor.send(sendCommand); }else{ //オーナーを持ってない場合は、オーナーを持っているSessionManagerへSELECTコマンドを中継する - Editor owner = session.getOwner(); + Forwarder owner = session.getOwner(); owner.send(receivedCommand); } } @@ -267,7 +302,7 @@ case SMCMD_SELECT_ACK: { String hostport = receivedCommand.string; - Editor editor = getEditor(hostport); + Forwarder editor = getEditor(hostport); if(editor != null) { //host, port を見て、このコマンドが自分が送信したSelectコマンドのAckかどうかを判断する @@ -288,6 +323,11 @@ case SMCMD_SM_JOIN: { + // このchannelの相手は、SessionManager なので、 + // 特別なhandlerを接続する必要がある + channel.register(selector, SelectionKey.OP_READ, + new REPSessionManagerHandler(this)); + //SessionManagerのリストへ追加 smList.add(channel); @@ -301,7 +341,7 @@ //maxHost を設定。 if(setMaxHost(channel, receivedSessionList.getMaxHost())){ - sendCommand = new REPCommand(); + REPCommand sendCommand = new REPCommand(); sendCommand.setCMD(REP.SMCMD_CH_MASTER); sendCommand.setString(maxHost); smList.sendExcept(channel, sendCommand); @@ -310,7 +350,7 @@ //SessionListからXMLを生成。 //joinしてきたSessionManagerに対してACKを送信。 SessionXMLEncoder sessionlistEncoder = new SessionXMLEncoder(sessionList); - sendCommand = new REPCommand(); + REPCommand sendCommand = new REPCommand(); sendCommand.setCMD(REP.SMCMD_SM_JOIN_ACK); sendCommand.setString(sessionlistEncoder.sessionListToXML()); send.write(sendCommand); @@ -332,7 +372,7 @@ //maxHostを決定。 if(setMaxHost(channel, receivedSessionList2.getMaxHost())){ - sendCommand = new REPCommand(); + REPCommand sendCommand = new REPCommand(); sendCommand.setCMD(REP.SMCMD_CH_MASTER); sendCommand.setString(maxHost); smList.sendExcept(channel, sendCommand); @@ -390,7 +430,7 @@ { //maxHost を設定。 if(setMaxHost(channel, receivedCommand.string)){ - sendCommand = new REPCommand(); + REPCommand sendCommand = new REPCommand(); sendCommand.setCMD(REP.SMCMD_CH_MASTER); sendCommand.setString(maxHost); smList.sendExcept(channel, sendCommand); @@ -398,27 +438,6 @@ } break; - case REPCMD_DELETE: - case REPCMD_INSERT: - case REPCMD_NOP: - { - // sid から Session を取得 - Session session = getSession(receivedCommand.sid); - if (session==null) throw new IOException(); - // 次のエディタへコマンドを送信する処理 - Editor editor = session.getEditor(channel); - Editor.TranslatorResult r = editor.translate(session.getNextEditor(editor), receivedCommand); - if(r==Editor.TranslatorResult.MERGE_END) { - endMerge(receivedCommand, session, editor); - } else if (r==Editor.TranslatorResult.START_MERGE) { - // マージ中のエディタはコマンドを受け取らない - // ここで止めることによって、SMCMD_START_MERGE_ACK - // が来た時には、editor.writeQueue はemptyになる - Editor prevEditor = session.getPrevEditor(editor); - setMergeState(prevEditor.getChannel(), session.getSID()); - } - break; - } case SMCMD_START_MERGE_ACK: { // sid から Session を取得 @@ -458,8 +477,6 @@ Editor editor) { REPCommand mergeEnd = new REPCommand(REP.SMCMD_END_MERGE,receivedCommand.sid,editor.getEID(),editor.seq(),0,""); editor.send(mergeEnd); - Editor prevEditor = session.getPrevEditor(editor); - setNormalState(prevEditor.getChannel(), session.getSID()); } private void updateGUI() { @@ -471,17 +488,7 @@ gui.invokeLater(doRun); } - private void setNormalState(REPSocketChannel<REPCommand> channel, int sid) { - SelectionKey key = channel.keyFor(selector); - key.attach(normalHandler); - } - - private void setMergeState(REPSocketChannel<REPCommand> channel, int sid) { - SelectionKey key = channel.keyFor(selector); - key.attach(handlerInMerge); - } - - private Editor getEditor(String hostport) { + private Forwarder getEditor(String hostport) { for(Editor editor : editorList){ if(editor.getHost() == hostport){ return editor; @@ -490,11 +497,11 @@ return null; } - public Session getSession(int sid) { + public Session getSession(int sid) throws IOException { for(Session session : sessionList){ if(session.getSID() == sid) return session; } - return null; + throw new IOException(); } private boolean setMaxHost(REPSocketChannel<REPCommand> channel, String maxHost2) { @@ -520,23 +527,6 @@ } } - public static void main(String[] args) throws InterruptedException, IOException { - - int port = DEFAULT_PORT; - int port_s = DEFAULT_PORT; - //System.setProperty("file.encoding", "UTF-8"); - if(args.length > 0){ - port = Integer.parseInt(args[0]); - port_s = Integer.parseInt(args[1]); - } - receive_port = port; - parent_port = port_s; - SessionManager sm = new SessionManager(); - sm.init(port,new SessionManagerGUIimpl(sm)); - - - } - public void connectSession(String host) { int port = DEFAULT_PORT; port = parent_port; @@ -584,7 +574,7 @@ return host; } - public void selectSession(SelectButtonEvent event) { + public void selectSession(SelectButtonEvent event) throws IOException { REPSocketChannel<REPCommand> channel = event.getEditorChannel(); int sid = event.getSID(); int eid = event.getEID(); @@ -613,7 +603,7 @@ session = getSession(sid); session.addEditor(editor); - Editor owner = session.getOwner(); + Forwarder owner = session.getOwner(); REPCommand command = new REPCommand(); command.setCMD(REP.SMCMD_SELECT);
--- a/rep/SessionManagerEventListener.java Mon Oct 06 18:58:49 2008 +0900 +++ b/rep/SessionManagerEventListener.java Tue Oct 07 18:15:33 2008 +0900 @@ -1,10 +1,12 @@ package rep; +import java.io.IOException; + //implemented class is SessionManager. public interface SessionManagerEventListener { public void buttonPressed(SessionManagerEvent event); - public void selectSession(SelectButtonEvent selectButtonEvent); + public void selectSession(SelectButtonEvent selectButtonEvent) throws IOException; public void closeSession(SessionManagerEvent event); }
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/rep/handler/REPEditorHandler.java Tue Oct 07 18:15:33 2008 +0900 @@ -0,0 +1,32 @@ +package rep.handler; + +import java.io.IOException; + +import rep.REPCommand; +import rep.SessionManager; +import rep.channel.REPSelectionKey; +import rep.channel.REPSocketChannel; + +public class REPEditorHandler implements REPHandler { + + private SessionManager manager; + + + public REPEditorHandler(SessionManager manager) { + this.manager = manager; + } + + public void handle(REPSelectionKey<REPCommand> key) throws IOException { + REPSocketChannel<REPCommand> channel = key.channel1(); + REPCommand command = channel.read(); + System.out.println("REPHandlerImpl.handle() read : command = " + command +" from "+channel); + + manager.manage(channel, command); + } + + public void cancel(REPSocketChannel<REPCommand> socketChannel) { + manager.remove(socketChannel); + } + + +}
--- a/rep/handler/REPHandlerDoWaiting.java Mon Oct 06 18:58:49 2008 +0900 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,24 +0,0 @@ -package rep.handler; - -import java.io.IOException; -import rep.REPCommand; -import rep.SessionManager; -import rep.channel.REPSelectionKey; -import rep.channel.REPSocketChannel; - -public class REPHandlerDoWaiting implements REPHandler { - SessionManager manager; - - public REPHandlerDoWaiting(SessionManager manager) { - this.manager = manager; - } - - public void handle(REPSelectionKey<REPCommand> key) throws IOException { - - } - - public void cancel(REPSocketChannel<REPCommand> socketChannel) { - manager.remove(socketChannel); - } - -}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/rep/handler/REPHandlerEditorInMerge.java Tue Oct 07 18:15:33 2008 +0900 @@ -0,0 +1,36 @@ +package rep.handler; + +import java.io.IOException; +import rep.Editor; +import rep.REPCommand; +import rep.Session; +import rep.SessionManager; +import rep.channel.REPSelectionKey; +import rep.channel.REPSocketChannel; + +public class REPHandlerEditorInMerge implements REPHandler { + + private SessionManager manager; + + public REPHandlerEditorInMerge(SessionManager manager) { + this.manager = manager; + } + + + public void handle(REPSelectionKey<REPCommand> key) throws IOException { + //マージ中のエディタの前のエディタのコマンドをWaitingListに追加する + REPSocketChannel<REPCommand> channel = key.channel1(); + REPCommand command = channel.read(); + System.out.println("REPHandlerImpl.handle() : command = " + command); + // if (manager.isMerging(command.sid()))... + // 同じchannelで、merge中のsessionは一つは限らない。 + // なので、sid をinstanceで持つのではだめ。 + Session s = manager.getSession(command.sid); + Editor editor = s.getEditor(channel); + manager.addWaitingCommand(new PacketSet(channel, editor, command)); + } + + public void cancel(REPSocketChannel<REPCommand> socketChannel) { + manager.remove(socketChannel); + } +}
--- a/rep/handler/REPHandlerImpl.java Mon Oct 06 18:58:49 2008 +0900 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,31 +0,0 @@ -package rep.handler; - -import java.io.IOException; -import rep.REPCommand; -import rep.SessionManager; -import rep.channel.REPSelectionKey; -import rep.channel.REPSocketChannel; - -public class REPHandlerImpl implements REPHandler { - - private SessionManager manager; - - - public REPHandlerImpl(SessionManager manager) { - this.manager = manager; - } - - public void handle(REPSelectionKey<REPCommand> key) throws IOException { - REPSocketChannel<REPCommand> channel = key.channel1(); - REPCommand command = channel.read(); - System.out.println("REPHandlerImpl.handle() read : command = " + command +" from "+channel); - - manager.manage(channel, command); - } - - public void cancel(REPSocketChannel<REPCommand> socketChannel) { - manager.remove(socketChannel); - } - - -}
--- a/rep/handler/REPHandlerInMerge.java Mon Oct 06 18:58:49 2008 +0900 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,36 +0,0 @@ -package rep.handler; - -import java.io.IOException; -import rep.Editor; -import rep.REPCommand; -import rep.Session; -import rep.SessionManager; -import rep.channel.REPSelectionKey; -import rep.channel.REPSocketChannel; - -public class REPHandlerInMerge implements REPHandler { - - private SessionManager manager; - - public REPHandlerInMerge(SessionManager manager) { - this.manager = manager; - } - - - public void handle(REPSelectionKey<REPCommand> key) throws IOException { - //マージ中のエディタの前のエディタのコマンドをWaitingListに追加する - REPSocketChannel<REPCommand> channel = key.channel1(); - REPCommand command = channel.read(); - System.out.println("REPHandlerImpl.handle() : command = " + command); - // if (manager.isMerging(command.sid()))... - // 同じchannelで、merge中のsessionは一つは限らない。 - // なので、sid をinstanceで持つのではだめ。 - Session s = manager.getSession(command.sid); - Editor editor = s.getEditor(channel); - manager.addWaitingCommand(new PacketSet(channel, editor, command)); - } - - public void cancel(REPSocketChannel<REPCommand> socketChannel) { - manager.remove(socketChannel); - } -}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/rep/handler/REPSessionManagerHandler.java Tue Oct 07 18:15:33 2008 +0900 @@ -0,0 +1,42 @@ +package rep.handler; + +import java.io.IOException; + +import rep.Forwarder; +import rep.REPCommand; +import rep.Session; +import rep.SessionManager; +import rep.channel.REPSelectionKey; +import rep.channel.REPSocketChannel; + +public class REPSessionManagerHandler implements REPHandler { + + private SessionManager manager; + + public REPSessionManagerHandler(SessionManager manager) { + this.manager = manager; + } + + public void cancel(REPSocketChannel<REPCommand> socketChannel) { + manager.remove(socketChannel); + } + + public void handle(REPSelectionKey<REPCommand> key) throws IOException { + /* + * SessionManagerから来たコマンドは、Editor関係のコマンドは、 + * sessionとeidを判定して、そのeditorにforwardしてやれば良い。 + * 残りは、manager.manage() で処理する。 + */ + REPSocketChannel<REPCommand> channel = key.channel1(); + REPCommand command = channel.read(); + System.out.println("REPHandlerImpl.handle() : command = " + command); + Session s = manager.getSession(command.sid); + Forwarder next = s.getForwarder(command.eid); + if (next!=null) { + next.send(command); + } else { + manager.manage(channel, command); + } + } + +}
--- a/rep/translater/Translater.java Mon Oct 06 18:58:49 2008 +0900 +++ b/rep/translater/Translater.java Tue Oct 07 18:15:33 2008 +0900 @@ -1,6 +1,7 @@ package rep.translater; import rep.Editor; +import rep.Forwarder; import rep.REPCommand; public interface Translater { @@ -24,7 +25,7 @@ * @param cmd the command to be translated. * @return translated command. */ - abstract public void transReceiveCmd(Editor nextEditor,REPCommand cmd); + abstract public void transReceiveCmd(Forwarder nextEditor,REPCommand cmd); /** * set the editor's id.
--- a/rep/translater/TranslaterImp1.java Mon Oct 06 18:58:49 2008 +0900 +++ b/rep/translater/TranslaterImp1.java Tue Oct 07 18:15:33 2008 +0900 @@ -7,12 +7,12 @@ import java.util.TreeSet; import rep.Editor; +import rep.Forwarder; import rep.REPCommand; import rep.REP; public class TranslaterImp1 implements Translater{ public int eid; - private int seq; /* * queue が5つもいるって、あまりに馬鹿げてる。 */ @@ -49,7 +49,9 @@ return cmd; } /** - * My command is returned from the session ring. + * My command is returned from the session ring. At this + * stage writeQueue is empty, our editor is waiting for me. + * Start merge process. * @param cmd */ public boolean catchOwnCommand(Editor editor){ @@ -140,7 +142,7 @@ * @param cmd the command to be translated. * @return translated commannd. */ - public void transReceiveCmd(Editor nextEditor,REPCommand cmd){ + public void transReceiveCmd(Forwarder nextEditor,REPCommand cmd){ assert (cmd.eid != eid); // nop command の挿入は Editor 側で行って、こちら側ではやらない unMergedCmds.push(cmd);