Mercurial > hg > RemoteEditor > REPSessionManager
changeset 355:98607350f7d1
*** empty log message ***
author | kono |
---|---|
date | Fri, 17 Oct 2008 22:11:34 +0900 |
parents | 6ea3aa6c795f |
children | b18c24dcc5d2 |
files | rep/FirstConnector.java rep/Forwarder.java rep/SessionManager.java rep/SessionManagerList.java test/sematest/TestInterManagerSession.java |
diffstat | 5 files changed, 238 insertions(+), 103 deletions(-) [+] |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/rep/FirstConnector.java Fri Oct 17 22:11:34 2008 +0900 @@ -0,0 +1,75 @@ +package rep; + +import java.io.IOException; + +import rep.channel.REPSelectionKey; +import rep.channel.REPSocketChannel; + +public class FirstConnector extends Forwarder { + + public FirstConnector(SessionManager manager) { + super(manager); + } + + public void cancel(REPSocketChannel<REPCommand> socketChannel) { + manager.remove(socketChannel); + } + + public void handle(REPSelectionKey<REPCommand> key) throws Exception { + /* + * 接続要求は、EditorかSlave Editorで、 + * join, put, sm_join + * が来る。それ以外はエラー。master もありか? + * sm_join_ack + */ + Forwarder fw; + REPSocketChannel<REPCommand> channel = key.channel1(); + REPCommand command = channel.read(); + SessionManager.logger.writeLog("FirstConnector: command = " + command); + switch(command.cmd) { + case SMCMD_JOIN: + { + //どのSessionにも属さないエディタをリストに追加 + //エディタとchannelは1対1 (ではない) + //エディタが新しくputする場合は新しくソケットを作る + // 1対1でない場合は、multiplexerを挿めば良い + // ここのeditorList はsessionのとは別物 + Editor editor = new Editor(manager,-1,channel); + editor.setHost(manager.myHost); + manager.editorList.add(editor); + fw = editor; + break; + } + case SMCMD_PUT: + { + // put の場合でも、eid は、masterまで聞きにいく必要が + // ある。 + Editor editor = new Editor(manager,0,channel); + editor.setHost(manager.myHost); + manager.editorList.add(editor); + fw = editor; + break; + } + case SMCMD_SM_JOIN: + { + fw = new Forwarder(manager); + manager.smList.addWaitingSessionManager(fw, command); + break; + } + case SMCMD_SM_JOIN_ACK: + manager.setSessionManagerID(command.sid); + fw = new Forwarder(manager); + break; + default: throw new IOException(); + } + //myHost を設定。 + //立ち上げ時にやるとlocalhostしか取れない + if(manager.myHost == null) manager.setMyHostName(getLocalHostName()); + + fw.setMode(command.cmd); + manager.registerChannel(channel, fw); + manager.sessionManage(fw, command); + + } + +}
--- a/rep/Forwarder.java Fri Oct 17 18:40:08 2008 +0900 +++ b/rep/Forwarder.java Fri Oct 17 22:11:34 2008 +0900 @@ -16,6 +16,7 @@ final int limit=100; REPLogger ns = REPLogger.singleton(); SessionManager manager; + public REP mode = null; public Forwarder(SessionManager manager) { this.manager = manager; @@ -85,4 +86,17 @@ } else throw new IOException(); } + public void setMode(REP cmd) { + mode = cmd; + } + + public boolean isEditor() { + return mode==REP.SMCMD_JOIN||mode==REP.SMCMD_PUT; + } + + public boolean isForwarder() { + return mode==REP.SMCMD_SM_JOIN||mode==REP.SMCMD_SM_JOIN_ACK; + } + + } \ No newline at end of file
--- a/rep/SessionManager.java Fri Oct 17 18:40:08 2008 +0900 +++ b/rep/SessionManager.java Fri Oct 17 22:11:34 2008 +0900 @@ -62,8 +62,7 @@ static final int DEFAULT_PORT = 8766; private static final int packetLimit = 200; SessionXMLDecoder decoder = new SessionXMLDecoder(); - - boolean smjoin_mode; + private Forwarder sm_join_channel; public static void main(String[] args) throws InterruptedException, IOException { @@ -208,7 +207,7 @@ if(key.isAcceptable()){ REPSocketChannel<REPCommand> channel = key.accept(new REPCommandPacker()); logger.writeLog("SessionManager.select() : key.isAcceptable : channel = " + channel); - registerChannel (channel, SelectionKey.OP_READ,new Forwarder(this)); + registerChannel (channel, new FirstConnector(this)); channel = null; }else if(key.isReadable()){ @@ -223,18 +222,29 @@ } } - void registerChannel(REPSocketChannel<REPCommand> channel, int ops,Forwarder handler) throws IOException { + void registerChannel(REPSocketChannel<REPCommand> channel,Forwarder handler) throws IOException { if(channel == null) { return; } handler.setChannel(channel); channel.configureBlocking(false); - channel.register(selector, ops, handler); + channel.register(selector, SelectionKey.OP_READ, handler); } void cancel_sm_join() { - smjoin_mode=false; + removeChannel(sm_join_channel); + sm_join_channel=null; + } + + + private void removeChannel(Forwarder sm_join_channel) { + REPSelectionKey<REPCommand> key = sm_join_channel.channel.keyFor1(selector); + key.cancel(); + try { + sm_join_channel.channel.close(); + } catch (IOException e) { + } } @@ -294,7 +304,16 @@ } } + + /** + * Host 名のSession Manager に SM_JOIN する。自分は、Session を持っていては + * ならない。複数のSession Managerにjoinすることは出来ない。(NATを実装するまでは)。 + * @param host + */ public void connectSession(String host) { + if (sm_join_channel!=null) return; + if (!sessionList.isEmpty()) return; + if (!smList.isMaster()) return; int port = parent_port; InetSocketAddress addr = new InetSocketAddress(host, port); try { @@ -303,14 +322,14 @@ sessionchannel.connect(addr); while(!sessionchannel.finishConnect()); Forwarder sm = new Forwarder(this); - registerChannel(sessionchannel, SelectionKey.OP_READ,sm); + registerChannel(sessionchannel, sm); sm_join(sm); }catch (IOException e) { } } private void sm_join(Forwarder channel){ - smjoin_mode = true; + sm_join_channel = channel; //SM_JOINコマンドを生成。 REPCommand command = new REPCommand(); command.setCMD(REP.SMCMD_SM_JOIN); @@ -321,8 +340,9 @@ setMyHostName(channel.getLocalHostName()); //XMLを生成。送信コマンドにセット。 - SessionXMLEncoder encoder = new SessionXMLEncoder(sessionList); - String string = encoder.sessionListToXML(); + //SessionXMLEncoder encoder = new SessionXMLEncoder(sessionList); + //String string = encoder.sessionListToXML(); + String string = myHost; command.setString(string); //SM_JOINコマンドを送信。 @@ -439,25 +459,15 @@ } - boolean sessionManage(Forwarder forwarder, REPCommand receivedCommand) throws ClosedChannelException, + boolean sessionManage(Forwarder forwarder, REPCommand command) throws ClosedChannelException, IOException, SAXException { - switch(receivedCommand.cmd){ + switch(command.cmd){ // Session Manager Command case SMCMD_JOIN: { - //どのSessionにも属さないエディタをリストに追加 - //エディタとchannelは1対1 (ではない) - //エディタが新しくputする場合は新しくソケットを作る - // ここのeditorList はsessionのとは別物 - Editor editor1 = new Editor(this,-1,forwarder.channel); - registerChannel(forwarder.channel,SelectionKey.OP_READ,editor1); - editor1.setHost(myHost); - editorList.add(editor1); - updateGUI(); - } break; @@ -473,18 +483,15 @@ // 自分の親に作ってもらう必要がある。自分が親なら自分で作って良い。 int sid = sessionList.size(); - Editor editor = new Editor(this,0, forwarder.channel); - registerChannel(forwarder.channel,SelectionKey.OP_READ,editor); - editorList.add(editor); - editor.setHost(myHost); - Session session = new Session(sid, receivedCommand.string, editor); + Editor editor = (Editor) forwarder; + Session session = new Session(sid, command.string, (Editor)forwarder); session.hasOwner(true); sessionList.add(session); updateGUI(); //エディタにAckを送信 - REPCommand sendCommand = new REPCommand(receivedCommand); + REPCommand sendCommand = new REPCommand(command); sendCommand.setCMD(REP.SMCMD_PUT_ACK); sendCommand.setEID(editor.getEID()); sendCommand.setSID(session.getSID()); @@ -493,10 +500,10 @@ //他のSessionManagerへSessionの追加を報告 //親に送って、親から子へ SessionXMLEncoder sessionEncoder = new SessionXMLEncoder(session); - REPCommand command = new REPCommand(); - command.setSID(session.getSID()); - command.setString(sessionEncoder.sessionListToXML()); - command.setCMD(REP.SMCMD_UPDATE); + REPCommand command1 = new REPCommand(); + command1.setSID(session.getSID()); + command1.setString(sessionEncoder.sessionListToXML()); + command1.setCMD(REP.SMCMD_UPDATE); smList.sendToSlaves( command); } @@ -510,19 +517,19 @@ //他のSessionManagerをエディタとしてSessionに追加 Forwarder next = new Forwarder(this); next.setChannel(forwarder.channel); - Session session = getSession(receivedCommand.sid); + Session session = getSession(command.sid); session.addForwarder(next); if(session.hasOwner()){ //このSessionManagerがオーナーを持っている場合、Sessionにエディタを追加し、エディタへAckを返す - REPCommand sendCommand = new REPCommand(receivedCommand); + REPCommand sendCommand = new REPCommand(command); sendCommand.setCMD(REP.SMCMD_SELECT_ACK); sendCommand.setEID(next.getEID()); next.send(sendCommand); }else{ //オーナーを持ってない場合は、オーナーを持っているSessionManagerへSELECTコマンドを中継する Forwarder owner = session.getOwner(); - owner.send(receivedCommand); + owner.send(command); } } @@ -530,20 +537,20 @@ case SMCMD_SELECT_ACK: { - String hostport = receivedCommand.string; + String hostport = command.string; Forwarder editor1 = getEditor(hostport); if(editor1 != null) { //host, port を見て、このコマンドが自分が送信したSelectコマンドのAckかどうかを判断する - REPCommand command = new REPCommand(); - command.setCMD(REP.SMCMD_JOIN_ACK); - command.setSID(receivedCommand.sid); - command.setEID(receivedCommand.eid); + REPCommand command1 = new REPCommand(); + command1.setCMD(REP.SMCMD_JOIN_ACK); + command1.setSID(command.sid); + command1.setEID(command.eid); editor1.send(command); }else{ //自分が送信したコマンドでなければ、次のSessionManagerへ中継する - smList.sendToSlaves(receivedCommand); + smList.sendToSlaves(command); } } @@ -551,68 +558,51 @@ case SMCMD_SM_JOIN: { - // SM_JOIN中にSMCMD_SM_JOINが来たら、自分のSM_JOINを - // 取り消す。 - if (smjoin_mode) cancel_sm_join(); + // SM_JOIN中にSMCMD_SM_JOINが来たら、これはループなので、 + ///自分のSM_JOINを取り消す。 + if (sm_join_channel!=null) cancel_sm_join(); // SMCMD_SM_JOIN は、master まで上昇する。 // masterでなければ、自分のparentに転送する。 - if(smList.isMaster()) { + if(isMaster()) { // master であれば、SessionManager IDを決めて、 // 自分のsmList に登録 - int sid = smList.addNewSessionManager(receivedCommand); - //SessionListからXMLを生成。 - //joinしてきたSessionManagerに対してACKを送信。 - SessionXMLEncoder sessionlistEncoder = new SessionXMLEncoder(sessionList); - REPCommand sendCommand = new REPCommand(); + Forwarder sm; + int psid = command.eid; + if (forwarder.sid!=-1) { + // すでに channelはSessionManager Idを持っていて、 + // direct link ではないので、 + // channel を持たないForwarderとして登録する + sm = new Forwarder(this); + } else { + sm = forwarder; + } + int sid = smList.addNewSessionManager(sm,command); + REPCommand sendCommand = makeREPCommandWithSessionList(REP.SMCMD_SM_JOIN_ACK); + // command.eid==smList.sesionManagerID() の場合は、 + // 待っている自分の下のsessionManagerにsidをassignする必要がある。 sendCommand.setSID(sid); // new Session manager ID // 複数のSM_JOIN_ACKを識別するには、最初にSM_JOINを受け付けた // Session manager IDを使う。 - sendCommand.setEID(receivedCommand.eid); - sendCommand.setCMD(REP.SMCMD_SM_JOIN_ACK); - sendCommand.setString(sessionlistEncoder.sessionListToXML()); - smList.sendToSlaves(sendCommand); - break; + sendCommand.setEID(psid); + send_sm_join_ack(psid, sid, sendCommand); + } else { + if (forwarder.sid==-1) { + // direct link の場合は、識別のために、EIDに直上の + // smid を入れておく。 + command.setEID(smList.sessionManagerID()); + } + smList.sendToMaster(command); } - // - - //XMLからSessionListオブジェクトを生成する。 - //SessionList receivedSessionList = decoder.decode(receivedCommand.string); - - //myHost を設定。 - //立ち上げ時にやるとlocalhostしか取れない - if(myHost == null) setMyHostName(forwarder.getLocalHostName()); - - //maxHost を設定。 - // if(setMaxHost(channel, receivedSessionList.getMaxHost())){ - // REPCommand sendCommand = new REPCommand(); - // sendCommand.setCMD(REP.SMCMD_CH_MASTER); - // sendCommand.setString(maxHost); - // smList.sendExcept(channel, sendCommand); - // } - - } break; - case SMCMD_SM_JOIN_ACK: - - //XMLからSessionListオブジェクトを生成。 - //SessionList receivedSessionList2 = decoder.decode(receivedCommand.string); - - //maxHostを決定。 - // if(setMaxHost(channel, receivedSessionList2.getMaxHost())){ - // REPCommand sendCommand = new REPCommand(); - // sendCommand.setCMD(REP.SMCMD_CH_MASTER); - // sendCommand.setString(maxHost); - // smList.sendExcept(channel, sendCommand); - // } - - + case SMCMD_SM_JOIN_ACK: + send_sm_join_ack(command.eid, command.sid, command); break; case SMCMD_UPDATE: { - SessionList receivedSessionList3 = decoder.decode(receivedCommand.string); + SessionList receivedSessionList3 = decoder.decode(command.string); //UPDATEコマンドにより送られてきたSessionの情報を追加する LinkedList<Session> list = receivedSessionList3.getList(); @@ -622,7 +612,7 @@ } //他のSessionManagerへ中継する - smList.sendToSlaves(receivedCommand); + smList.sendToSlaves(command); updateGUI(); } @@ -630,20 +620,20 @@ case SMCMD_UPDATE_ACK: { - if(!hasSession(receivedCommand.sid)) { + if(!hasSession(command.sid)) { // accept new Session // ここで初めてsession id が決まる。 // このコマンドは、master session manager が出すはず Forwarder sm = new Forwarder(this); sm.setChannel(forwarder.channel); - Session session = new Session(receivedCommand.sid,receivedCommand.string,null); + Session session = new Session(command.sid,command.string,null); session.addForwarder(sm); sessionList.add(session); updateGUI(); } - smList.sendToSlaves(receivedCommand); + smList.sendToSlaves(command); } break; @@ -666,4 +656,36 @@ return true; } + + void send_sm_join_ack(int psid, int sid,REPCommand sendCommand) { + if (psid==smList.sessionManagerID()) { + // 直下のsessionManagerにIDを割り振る必要がある。 + smList.assignSessionManagerIDtoWaitingSM(sid); + // ここで smList に一つだけ追加されるので + // 待っている最初のsm一つにだけ、sm_joinが新たに送られる。 + } + smList.sendToSlaves(sendCommand); + } + + + private REPCommand makeREPCommandWithSessionList(REP cmd) { + //SessionListからXMLを生成。 + //joinしてきたSessionManagerに対してACKを送信。 + SessionXMLEncoder sessionlistEncoder = new SessionXMLEncoder(sessionList); + REPCommand sendCommand = new REPCommand(); + sendCommand.setCMD(cmd); + sendCommand.setString(sessionlistEncoder.sessionListToXML()); + return sendCommand; + } + + + public boolean isMaster() { + return smList.isMaster(); + } + + + public void setSessionManagerID(int sid) { + smList.setSessionManagerID(sid); + } + }
--- a/rep/SessionManagerList.java Fri Oct 17 18:40:08 2008 +0900 +++ b/rep/SessionManagerList.java Fri Oct 17 22:11:34 2008 +0900 @@ -8,17 +8,12 @@ private LinkedList<Forwarder> list = new LinkedList<Forwarder>(); private int mySMID; private Forwarder parent=null; + private LinkedList<Forwarder> waiting= new LinkedList<Forwarder>(); public void add(Forwarder channel) { list.add(channel); } - public void sendUpdate(int sessionID, String string) { - for(Forwarder f : list){ - f.send(new REPCommand(REP.SMCMD_UPDATE, 0, mySMID, 0, 0, string)); - } - } - public void setMaster(Forwarder f) { this.parent = f; } @@ -37,10 +32,12 @@ return parent==null; } - public int addNewSessionManager(REPCommand receivedCommand) { - return mySMID; - // TODO Auto-generated method stub - + public int addNewSessionManager(Forwarder fw,REPCommand receivedCommand) { + list.add(fw); + int sid = list.size(); + fw.setSID(sid); + fw.setName(receivedCommand.string); + return sid; } public boolean isSessionManagerChannel(REPSocketChannel<REPCommand> channel) { @@ -50,6 +47,32 @@ return false; } + public void setSessionManagerID(int sid) { + mySMID = sid; + } + + public int sessionManagerID() { + return mySMID; + } + + public void addWaitingSessionManager(Forwarder fw, REPCommand command) { + // SID assign 待ちのSessionManager Channelを登録する + waiting.add(fw); + + } + + public void assignSessionManagerIDtoWaitingSM(int sid) { + // 待っていたSession Manager ChannelにSession IDを登録し,Session Manager List + // に登録する。この次のsm_join_ackでSIDが確定する。 + Forwarder waiter; + if ((waiter=waiting.poll())!=null) { + waiter.setSID(sid); + list.add(waiter); + return; + } + assert false; + } +
--- a/test/sematest/TestInterManagerSession.java Fri Oct 17 18:40:08 2008 +0900 +++ b/test/sematest/TestInterManagerSession.java Fri Oct 17 22:11:34 2008 +0900 @@ -103,7 +103,8 @@ * Define pending command and set null command for now. */ LinkedList<REPCommand>cmds = new LinkedList<REPCommand>(); - cmds.add(new REPCommand(REP.SMCMD_PUT,0,0,0,0,"Editor0-file")); + cmds.add(new REPCommand(REP.SMCMD_JOIN,0,0,0,0,"Editor0-file")); + //cmds.add(new REPCommand(REP.SMCMD_PUT,0,0,0,0,"Editor0-file")); cmds.add(new REPCommand(REP.REPCMD_INSERT,0,0,0,0,"m0")); cmds.add(new REPCommand(REP.REPCMD_DELETE,0,0,0,0,"m0")); editorStartCmds = cmds;