Mercurial > hg > RemoteEditor > REPSessionManager
changeset 349:ef4afcae0c92
now Forwarder associates each client channel.
author | kono |
---|---|
date | Thu, 16 Oct 2008 01:12:34 +0900 |
parents | 1bc132a6b879 |
children | 59ef23ee73ad |
files | rep/Editor.java rep/EditorPlus.java rep/Forwarder.java rep/SessionManager.java rep/SessionManagerList.java rep/channel/SelectorSimulator.java rep/handler/REPEditorHandler.java rep/handler/REPSessionManagerHandler.java |
diffstat | 8 files changed, 197 insertions(+), 222 deletions(-) [+] |
line wrap: on
line diff
--- a/rep/Editor.java Tue Oct 14 23:03:29 2008 +0900 +++ b/rep/Editor.java Thu Oct 16 01:12:34 2008 +0900 @@ -2,6 +2,8 @@ import java.util.LinkedList; import java.util.List; + +import rep.channel.REPSelectionKey; import rep.channel.REPSocketChannel; import rep.handler.PacketSet; import rep.optimizers.*; @@ -34,10 +36,8 @@ } public void translate(REPCommand command){ - if(command.eid == next.getEID()){ - if(next==this) return; // singleton case - ((Editor) next).checkReturnedCommand(command); - } else if(command.eid == eid){ + + if(command.eid == eid){ //エディタからの新たな編集コマンド translator.transSendCmd(command); sentList.add(new REPCommand(command)); @@ -51,8 +51,18 @@ translator.getMergeAgain(this); } endMerge(); - }else{ + }else if(command.eid == next.getEID()){ + // 次のEditorで一周するコマンドが来た + if(next==this) return; // singleton case + ((Editor) next).checkReturnedCommand(command); + } else { //他のエディタからの編集コマンド + assert (command.eid!=REP.MERGE_EID.id && command.eid!=eid ); + if (manager.hasWaitingCommand(channel)) { + // We cannot do this operation before watingCommandQueue. + manager.addWaitingCommand(new PacketSet(channel, this, command)); + return; + } if(!isMerging()) { translator.transReceiveCmd(next,command); return; @@ -129,13 +139,6 @@ @Override public boolean manage(REPCommand receivedCommand) { - if (receivedCommand.eid!=REP.MERGE_EID.id && receivedCommand.eid!=eid ) { - if (manager.hasWaitingCommand(channel)) { - // We cannot do this operation before watingCommandQueue. - manager.addWaitingCommand(new PacketSet(channel, this, receivedCommand)); - return true; - } - } switch(receivedCommand.cmd){ // Editor Command @@ -187,5 +190,18 @@ return true; } - + + @Override + public void handle(REPSelectionKey<REPCommand> key) throws Exception { + REPSocketChannel<REPCommand> channel = key.channel1(); + REPCommand command = channel.read(); + SessionManager.logger.writeLog("REPHandlerImpl.handle() read : command = " + command +" from "+channel); + if (manager.manage(this, command)) return; + manage(command); + } + + @Override + public void cancel(REPSocketChannel<REPCommand> socketChannel) { + manager.remove(socketChannel); + } }
--- a/rep/EditorPlus.java Tue Oct 14 23:03:29 2008 +0900 +++ b/rep/EditorPlus.java Thu Oct 16 01:12:34 2008 +0900 @@ -53,7 +53,8 @@ } public void setHost(String host){ - this.host = host; + if (channel!=null) + this.host = host; }
--- a/rep/Forwarder.java Tue Oct 14 23:03:29 2008 +0900 +++ b/rep/Forwarder.java Thu Oct 16 01:12:34 2008 +0900 @@ -1,10 +1,14 @@ package rep; +import java.io.IOException; + import rep.channel.REPLogger; +import rep.channel.REPSelectionKey; import rep.channel.REPSocketChannel; import rep.handler.PacketSet; +import rep.handler.REPHandler; -public class Forwarder extends EditorPlus { +public class Forwarder extends EditorPlus implements REPHandler { int seq = 0; Forwarder next; // REPCommands we sent to the next editor @@ -51,4 +55,33 @@ return true; } + public String toString(){ + return ("Forwarder:" + channel); + } + + public String getLocalHostName() { + return channel.getLocalHostName(); + } + + public void cancel(REPSocketChannel<REPCommand> socketChannel) { + manager.remove(socketChannel); + } + + public void handle(REPSelectionKey<REPCommand> key) throws Exception { + /* + * SessionManagerから来たコマンドは、Editor関係のコマンドは、 + * sessionとeidを判定して、そのeditorにforwardしてやれば良い。 + * 残りは、manager.manage() で処理する。 + */ + REPSocketChannel<REPCommand> channel = key.channel1(); + REPCommand command = channel.read(); + SessionManager.logger.writeLog("REPHandlerImpl.handle() : command = " + command); + if (manager.manage(this, command)) return; + Session s = manager.getSession(command.sid); + Forwarder editor = s.getFirstForwarder(); + if (editor!=null) { + editor.manage(command); + } else throw new IOException(); + } + } \ No newline at end of file
--- a/rep/SessionManager.java Tue Oct 14 23:03:29 2008 +0900 +++ b/rep/SessionManager.java Thu Oct 16 01:12:34 2008 +0900 @@ -19,8 +19,6 @@ import rep.channel.REPSocketChannel; import rep.handler.PacketSet; import rep.handler.REPHandler; -import rep.handler.REPEditorHandler; -import rep.handler.REPSessionManagerHandler; import rep.channel.REPSelector; import rep.xml.SessionXMLDecoder; import rep.xml.SessionXMLEncoder; @@ -53,9 +51,8 @@ SessionManagerList smList; List<Editor> editorList; // editorList は、sessionList に入っているeditorとは別なeditorのlistらしい。 - private String maxHost; + // private String maxHost; private List<PacketSet> waitingCommandInMerge; - REPHandler normalHandler = new REPEditorHandler(this); private BlockingQueue<SessionManagerEvent> waitingEventQueue = new LinkedBlockingQueue<SessionManagerEvent>();; String myHost; private LinkedList<PacketSet> writeQueue = new LinkedList<PacketSet>(); @@ -65,6 +62,8 @@ private static final int packetLimit = 200; private SessionXMLDecoder decoder = new SessionXMLDecoder(); + private boolean smjoin_mode; + public static void main(String[] args) throws InterruptedException, IOException { int port = DEFAULT_PORT; @@ -107,7 +106,8 @@ ssc.socket().setReuseAddress(true); //getAllByNameで取れた全てのアドレスに対してbindする ssc.socket().bind(new InetSocketAddress(port)); - ssc.register(selector, SelectionKey.OP_ACCEPT, normalHandler); + ssc.register(selector, SelectionKey.OP_ACCEPT, + new Forwarder(this)); sessionList = new LinkedList<Session>(); smList = new SessionManagerList(); @@ -169,8 +169,9 @@ waitingCommandInMerge.add(p); } else { try { - manage(p.channel, p.command); + manage(e, p.command); } catch (Exception e1) { + // should be e.close()? close(p.channel); } } @@ -202,7 +203,7 @@ if(key.isAcceptable()){ REPSocketChannel<REPCommand> channel = key.accept(new REPCommandPacker()); logger.writeLog("SessionManager.select() : key.isAcceptable : channel = " + channel); - registerChannel (channel, SelectionKey.OP_READ); + registerChannel (channel, SelectionKey.OP_READ,new Forwarder(this)); channel = null; }else if(key.isReadable()){ @@ -217,24 +218,17 @@ } } - private void registerChannel(REPSocketChannel<REPCommand> channel, int ops) throws IOException { + private void registerChannel(REPSocketChannel<REPCommand> channel, int ops,Forwarder handler) throws IOException { if(channel == null) { return; } + handler.setChannel(channel); channel.configureBlocking(false); - REPHandler handler = normalHandler; channel.register(selector, ops, handler); } - public void manage(REPSocketChannel<REPCommand> channel, REPCommand receivedCommand) throws IOException, SAXException { - if (sessionManagerCommand(channel, receivedCommand)) return; - Session s = getSession(receivedCommand.sid); - Editor e = s.getEditor(channel); - e.manage(receivedCommand); - } - - private boolean sessionManagerCommand(REPSocketChannel<REPCommand> channel, + boolean manage(Forwarder channel, REPCommand receivedCommand) throws ClosedChannelException, IOException, SAXException { switch(receivedCommand.cmd){ @@ -247,7 +241,8 @@ //エディタとchannelは1対1 (ではない) //エディタが新しくputする場合は新しくソケットを作る // ここのeditorList はsessionのとは別物 - Editor editor1 = new Editor(this,-1,channel); + Editor editor1 = new Editor(this,-1,channel.channel); + registerChannel(channel.channel,SelectionKey.OP_READ,editor1); editor1.setHost(myHost); editorList.add(editor1); @@ -268,7 +263,8 @@ // 自分の親に作ってもらう必要がある。自分が親なら自分で作って良い。 int sid = sessionList.size(); - Editor editor = new Editor(this,0, channel); + Editor editor = new Editor(this,0, channel.channel); + registerChannel(channel.channel,SelectionKey.OP_READ,editor); editorList.add(editor); editor.setHost(myHost); Session session = new Session(sid, receivedCommand.string, editor); @@ -291,7 +287,7 @@ command.setSID(session.getSID()); command.setString(sessionEncoder.sessionListToXML()); command.setCMD(REP.SMCMD_UPDATE); - smList.sendExcept(channel, command); + smList.sendToSlaves( command); } @@ -303,7 +299,7 @@ { //他のSessionManagerをエディタとしてSessionに追加 Forwarder next = new Forwarder(this); - next.setChannel(channel); + next.setChannel(channel.channel); Session session = getSession(receivedCommand.sid); session.addForwarder(next); @@ -337,50 +333,53 @@ }else{ //自分が送信したコマンドでなければ、次のSessionManagerへ中継する - smList.sendExcept(channel, receivedCommand); + smList.sendToSlaves(receivedCommand); } } - + break; case SMCMD_SM_JOIN: { - // このchannelの相手は、SessionManager なので、 - // 特別なhandlerを接続する必要がある - channel.register(selector, SelectionKey.OP_READ, - new REPSessionManagerHandler(this)); - - //SessionManagerのリストへ追加 - smList.add(channel); + // SM_JOIN中にSMCMD_SM_JOINが来たら、自分のSM_JOINを + // 取り消す。 + if (smjoin_mode) cancel_sm_join(); + // SMCMD_SM_JOIN は、master まで上昇する。 + // masterでなければ、自分のparentに転送する。 + if(smList.isMaster()) { + // master であれば、SessionManager IDを決めて、 + // 自分のsmList に登録 + int sid = smList.addNewSessionManager(receivedCommand); + //SessionListからXMLを生成。 + //joinしてきたSessionManagerに対してACKを送信。 + SessionXMLEncoder sessionlistEncoder = new SessionXMLEncoder(sessionList); + REPCommand sendCommand = new REPCommand(); + sendCommand.setSID(sid); // new Session manager ID + // 複数のSM_JOIN_ACKを識別するには、最初にSM_JOINを受け付けた + // Session manager IDを使う。 + sendCommand.setEID(receivedCommand.eid); + sendCommand.setCMD(REP.SMCMD_SM_JOIN_ACK); + sendCommand.setString(sessionlistEncoder.sessionListToXML()); + smList.sendToSlaves(sendCommand); + break; + } + // //XMLからSessionListオブジェクトを生成する。 - SessionList receivedSessionList = decoder.decode(receivedCommand.string); + //SessionList receivedSessionList = decoder.decode(receivedCommand.string); //myHost を設定。 //立ち上げ時にやるとlocalhostしか取れない - if(myHost == null) setMyHostName(getLocalHostName(channel)); + if(myHost == null) setMyHostName(channel.getLocalHostName()); //maxHost を設定。 - if(setMaxHost(channel, receivedSessionList.getMaxHost())){ - REPCommand sendCommand = new REPCommand(); - sendCommand.setCMD(REP.SMCMD_CH_MASTER); - sendCommand.setString(maxHost); - smList.sendExcept(channel, sendCommand); - } + // if(setMaxHost(channel, receivedSessionList.getMaxHost())){ + // REPCommand sendCommand = new REPCommand(); + // sendCommand.setCMD(REP.SMCMD_CH_MASTER); + // sendCommand.setString(maxHost); + // smList.sendExcept(channel, sendCommand); + // } - //SessionListからXMLを生成。 - //joinしてきたSessionManagerに対してACKを送信。 - SessionXMLEncoder sessionlistEncoder = new SessionXMLEncoder(sessionList); - REPCommand sendCommand = new REPCommand(); - sendCommand.setCMD(REP.SMCMD_SM_JOIN_ACK); - sendCommand.setString(sessionlistEncoder.sessionListToXML()); - channel.write(sendCommand); - - //その他の SessionManager に対して SMCMD_UPDATEを 送信。 - sendCommand = new REPCommand(); - sendCommand.setCMD(REP.SMCMD_UPDATE); - sendCommand.setString(receivedCommand.string); - smList.sendExcept(channel, sendCommand); } break; @@ -388,15 +387,16 @@ case SMCMD_SM_JOIN_ACK: //XMLからSessionListオブジェクトを生成。 - SessionList receivedSessionList2 = decoder.decode(receivedCommand.string); + //SessionList receivedSessionList2 = decoder.decode(receivedCommand.string); //maxHostを決定。 - if(setMaxHost(channel, receivedSessionList2.getMaxHost())){ - REPCommand sendCommand = new REPCommand(); - sendCommand.setCMD(REP.SMCMD_CH_MASTER); - sendCommand.setString(maxHost); - smList.sendExcept(channel, sendCommand); - } + // if(setMaxHost(channel, receivedSessionList2.getMaxHost())){ + // REPCommand sendCommand = new REPCommand(); + // sendCommand.setCMD(REP.SMCMD_CH_MASTER); + // sendCommand.setString(maxHost); + // smList.sendExcept(channel, sendCommand); + // } + break; @@ -407,12 +407,12 @@ //UPDATEコマンドにより送られてきたSessionの情報を追加する LinkedList<Session> list = receivedSessionList3.getList(); for(Session session : list){ - session.getEditorList().get(0).setChannel(channel); + session.getEditorList().get(0).setChannel(channel.channel); sessionList.add(session); } //他のSessionManagerへ中継する - smList.sendExcept(channel, receivedCommand); + smList.sendToSlaves(receivedCommand); updateGUI(); } @@ -425,7 +425,7 @@ // ここで初めてsession id が決まる。 // このコマンドは、master session manager が出すはず Forwarder sm = new Forwarder(this); - sm.setChannel(channel); + sm.setChannel(channel.channel); Session session = new Session(receivedCommand.sid,receivedCommand.string,null); session.addForwarder(sm); @@ -433,21 +433,21 @@ updateGUI(); } - smList.sendToSlave(receivedCommand); + smList.sendToSlaves(receivedCommand); } break; - case SMCMD_CH_MASTER: - { - //maxHost を設定。 - if(setMaxHost(channel, receivedCommand.string)){ - REPCommand sendCommand = new REPCommand(); - sendCommand.setCMD(REP.SMCMD_CH_MASTER); - sendCommand.setString(maxHost); - smList.sendExcept(channel, sendCommand); - } - } - break; +// case SMCMD_CH_MASTER: +// { +// //maxHost を設定。 +// if(setMaxHost(channel, receivedCommand.string)){ +// REPCommand sendCommand = new REPCommand(); +// sendCommand.setCMD(REP.SMCMD_CH_MASTER); +// sendCommand.setString(maxHost); +// smList.sendExcept(channel, sendCommand); +// } +// } +// break; default: @@ -456,6 +456,10 @@ return true; } + private void cancel_sm_join() { + smjoin_mode=false; + } + private boolean hasSession(int sid) { for(Session s:sessionList) { @@ -490,20 +494,20 @@ throw new IOException(); } - private boolean setMaxHost(REPSocketChannel<REPCommand> channel, String maxHost2) { - if(maxHost.compareTo(maxHost2) > 0){ - return false; - }else{ - maxHost = maxHost2; - return true; - } - } +// private boolean setMaxHost(REPSocketChannel<REPCommand> channel, String maxHost2) { +// if(maxHost.compareTo(maxHost2) > 0){ +// return false; +// }else{ +// maxHost = maxHost2; +// return true; +// } +// } private void setMyHostName(String localHostName) { myHost = localHostName + receive_port; - if(maxHost == null) { - maxHost = myHost; - } +// if(maxHost == null) { +// maxHost = myHost; +// } setHostToEditor(myHost); } @@ -518,25 +522,26 @@ InetSocketAddress addr = new InetSocketAddress(host, port); try { REPSocketChannel<REPCommand> sessionchannel = REPSocketChannel.<REPCommand>create(new REPCommandPacker()); - sessionchannel.configureBlocking(true); + sessionchannel.connect(addr); while(!sessionchannel.finishConnect()); - registerChannel(sessionchannel, SelectionKey.OP_READ); - - sm_join(sessionchannel); - + Forwarder sm = new Forwarder(this); + registerChannel(sessionchannel, SelectionKey.OP_READ,sm); + sm_join(sm); }catch (IOException e) { } } - private void sm_join(REPSocketChannel<REPCommand> channel){ + private void sm_join(Forwarder channel){ //SM_JOINコマンドを生成。 REPCommand command = new REPCommand(); command.setCMD(REP.SMCMD_SM_JOIN); + command.setEID(-1); // request Parent SessionManagerID + command.setSID(-1); // request SessionManagerID //hostnameをセット。 - setMyHostName(getLocalHostName(channel)); + setMyHostName(channel.getLocalHostName()); //XMLを生成。送信コマンドにセット。 SessionXMLEncoder encoder = new SessionXMLEncoder(sessionList); @@ -544,16 +549,11 @@ command.setString(string); //SM_JOINコマンドを送信。 - channel.write(command); + channel.send(command); //SessionManagerのListに追加。 - smList.add(channel); + } - - private String getLocalHostName(REPSocketChannel<?> channel) { - //host = channel.socket().getLocalAddress().getHostName(); - return channel.getLocalHostName(); - } - + public void selectSession(SelectButtonEvent event) throws IOException { int sid = event.getSID(); Session session = getSession(sid); @@ -564,7 +564,7 @@ return; } if (editor.hasSession()) return; - REPSocketChannel<REPCommand> channel = editor.getChannel(); + //REPSocketChannel<REPCommand> channel = editor.getChannel(); // System.out.println("SessionManager.session.hasOnwer="+session.hasOwner()); if(session.hasOwner()){ @@ -576,7 +576,7 @@ sendCommand.setEID(editor.getEID()); sendCommand.setSID(sid); sendCommand.string = ""; - channel.write(sendCommand); + editor.send(sendCommand); }else { editor.setHost(myHost); editor.setSID(sid);
--- a/rep/SessionManagerList.java Tue Oct 14 23:03:29 2008 +0900 +++ b/rep/SessionManagerList.java Thu Oct 16 01:12:34 2008 +0900 @@ -1,62 +1,56 @@ package rep; import java.util.LinkedList; -import java.util.List; - import rep.channel.REPSocketChannel; -import rep.xml.SessionXMLEncoder; public class SessionManagerList { - private List<REPSocketChannel<REPCommand>> list = new LinkedList<REPSocketChannel<REPCommand>>(); + private LinkedList<Forwarder> list = new LinkedList<Forwarder>(); private int mySMID; - private REPSocketChannel<REPCommand> master; + private Forwarder parent=null; - public void add(REPSocketChannel<REPCommand> channel) { + public void add(Forwarder channel) { list.add(channel); } public void sendUpdate(int sessionID, String string) { - for(REPSocketChannel<REPCommand> channel : list){ - channel.write(new REPCommand(REP.SMCMD_UPDATE, 0, mySMID, 0, 0, string)); + for(Forwarder f : list){ + f.send(new REPCommand(REP.SMCMD_UPDATE, 0, mySMID, 0, 0, string)); + } + } + + public void setMaster(Forwarder f) { + this.parent = f; + } + + public void sendToMaster(REPCommand repCmd) { + parent.send(repCmd); + } + + public void sendToSlaves(REPCommand repCmd) { + for(Forwarder channel : list){ + channel.send(repCmd); } } - public void sendJoin(REPCommand command) { - for(REPSocketChannel<REPCommand> channel : list){ - channel.write(command); - } - } - - public void setMaster(REPSocketChannel<REPCommand> channel){ - this.master = channel; + public boolean isMaster() { + return parent==null; } - public void sendSessionList(SessionList sessionlist, REPCommand command) { - SessionXMLEncoder encoder = new SessionXMLEncoder(sessionlist); - command.setString(encoder.sessionListToXML()); + public int addNewSessionManager(REPCommand receivedCommand) { + return mySMID; + // TODO Auto-generated method stub - for(REPSocketChannel<REPCommand> channel : list){ - channel.write(command); - } } - public void sendToMaster(REPCommand repCmd) { - master.write(repCmd); + public boolean isSessionManagerChannel(REPSocketChannel<REPCommand> channel) { + for(Forwarder f : list){ + if (f.channel==channel) return true; + } + return false; } - public void sendToSlave(REPCommand repCmd) { - for(REPSocketChannel<REPCommand> channel : list){ - if(channel.equals(master)) continue; - channel.write(repCmd); - } - } - public void sendExcept(REPSocketChannel<REPCommand> channel2, REPCommand command) { - for(REPSocketChannel<REPCommand> channel : list){ - if(channel.equals(channel2)) continue; - channel.write(command); - } - } + }
--- a/rep/channel/SelectorSimulator.java Tue Oct 14 23:03:29 2008 +0900 +++ b/rep/channel/SelectorSimulator.java Thu Oct 16 01:12:34 2008 +0900 @@ -71,6 +71,7 @@ public SelectionKeySimulator<P> register(SelectableChannel cs, int opt, Object handler){ SelectionKeySimulator<P> key = new SelectionKeySimulator<P>(cs, opt, this); key.attach(handler); + deregister(cs); synchronized(this) { keyList.add(key); }
--- a/rep/handler/REPEditorHandler.java Tue Oct 14 23:03:29 2008 +0900 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,30 +0,0 @@ -package rep.handler; - -import rep.REPCommand; -import rep.SessionManager; -import rep.channel.REPSelectionKey; -import rep.channel.REPSocketChannel; - -public class REPEditorHandler implements REPHandler { - - private SessionManager manager; - - - public REPEditorHandler(SessionManager manager) { - this.manager = manager; - } - - public void handle(REPSelectionKey<REPCommand> key) throws Exception { - REPSocketChannel<REPCommand> channel = key.channel1(); - REPCommand command = channel.read(); - SessionManager.logger.writeLog("REPHandlerImpl.handle() read : command = " + command +" from "+channel); - - manager.manage(channel, command); - } - - public void cancel(REPSocketChannel<REPCommand> socketChannel) { - manager.remove(socketChannel); - } - - -}
--- a/rep/handler/REPSessionManagerHandler.java Tue Oct 14 23:03:29 2008 +0900 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,40 +0,0 @@ -package rep.handler; - -import rep.Forwarder; -import rep.REPCommand; -import rep.Session; -import rep.SessionManager; -import rep.channel.REPSelectionKey; -import rep.channel.REPSocketChannel; - -public class REPSessionManagerHandler implements REPHandler { - - private SessionManager manager; - - public REPSessionManagerHandler(SessionManager manager) { - this.manager = manager; - } - - public void cancel(REPSocketChannel<REPCommand> socketChannel) { - manager.remove(socketChannel); - } - - public void handle(REPSelectionKey<REPCommand> key) throws Exception { - /* - * SessionManagerから来たコマンドは、Editor関係のコマンドは、 - * sessionとeidを判定して、そのeditorにforwardしてやれば良い。 - * 残りは、manager.manage() で処理する。 - */ - REPSocketChannel<REPCommand> channel = key.channel1(); - REPCommand command = channel.read(); - SessionManager.logger.writeLog("REPHandlerImpl.handle() : command = " + command); - Session s = manager.getSession(command.sid); - Forwarder editor = s.getFirstForwarder(); - if (editor!=null) { - editor.manage(command); - } else { - manager.manage(channel, command); - } - } - -}