Mercurial > hg > RemoteEditor > REPSessionManager
view rep/SessionManager.java @ 358:034acadc0cdc
*** empty log message ***
author | kono |
---|---|
date | Sun, 19 Oct 2008 16:54:37 +0900 |
parents | b18c24dcc5d2 |
children | fa041bae35f1 |
line wrap: on
line source
package rep; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.ClosedChannelException; import java.nio.channels.SelectionKey; import java.util.LinkedList; import java.util.List; import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import org.xml.sax.SAXException; import rep.channel.REPLogger; import rep.channel.REPServerSocketChannel; import rep.channel.REPSocketChannel; import rep.handler.PacketSet; import rep.handler.REPHandler; import rep.channel.REPSelector; import rep.xml.SessionXMLDecoder; import rep.xml.SessionXMLEncoder; import rep.channel.REPSelectionKey; /* +-------+--------+--------+-------+--------+---------+------+ | cmd | session| editor | seqid | lineno | textsiz | text | | | id | id | | | | | +-------+--------+--------+-------+--------+---------+------+ o---------- header section (network order) ----------o int cmd; kind of command int sid; session ID : uniqu to editing file int eid; editor ID : owner editor ID = 1。Session に対して unique -1 session manager command -2 merge command int seqno; Sequence number : sequence number はエディタごとに管理 int lineno; line number int textsize; textsize : bytesize byte[] text; */ public class SessionManager implements SessionManagerEventListener{ static public REPLogger logger = REPLogger.singleton(); SessionList sessionList; private SessionManagerGUI gui; private REPSelector<REPCommand> selector; SessionManagerList smList; EditorList editorList; // editorList は、sessionList に入っているeditorとは別なeditorのlistらしい。 // private String maxHost; private List<PacketSet> waitingCommandInMerge; private BlockingQueue<SessionManagerEvent> waitingEventQueue = new LinkedBlockingQueue<SessionManagerEvent>();; String myHost; private LinkedList<PacketSet> writeQueue = new LinkedList<PacketSet>(); private int receive_port; private int parent_port; static final int DEFAULT_PORT = 8766; private static final int packetLimit = 200; private static final int MAXID = 10000; SessionXMLDecoder decoder = new SessionXMLDecoder(); SessionXMLEncoder encoder = new SessionXMLEncoder(); private Forwarder sm_join_channel; private RoutingTable routingTable = new RoutingTable(); public static void main(String[] args) throws InterruptedException, IOException { int port = DEFAULT_PORT; int port_s = DEFAULT_PORT; //System.setProperty("file.encoding", "UTF-8"); if(args.length > 0){ port = Integer.parseInt(args[0]); port_s = Integer.parseInt(args[1]); } SessionManager sm = new SessionManager(); sm.setReceivePort(port); sm.setParentPort(port_s); sm.init(port,new SessionManagerGUIimpl(sm)); } public void setReceivePort(int port) { receive_port = port; } public void openSelector() throws IOException{ selector = REPSelector.<REPCommand>create(); } public void init(int port, SessionManagerGUI gui) throws IOException, InterruptedException { this.gui = gui; openSelector(); init(port); mainLoop(); } private void init(int port) throws InterruptedException, IOException { REPServerSocketChannel<REPCommand> ssc = REPServerSocketChannel.<REPCommand>open(new REPCommandPacker()); ssc.configureBlocking(false); //reuse address 必須 ssc.socket().setReuseAddress(true); //getAllByNameで取れた全てのアドレスに対してbindする ssc.socket().bind(new InetSocketAddress(port)); ssc.register(selector, SelectionKey.OP_ACCEPT, new Forwarder(this)); sessionList = new SessionList(); smList = new SessionManagerList(); editorList = new EditorList(); waitingCommandInMerge = new LinkedList<PacketSet>(); } /* * We wrote everything in one thread, but we can assign * one thread for each communication channel and GUI event. */ public void mainLoop() throws IOException { while(true){ checkWaitingCommandInMerge(); if (checkInputEvent() || checkWaitingWrite()) { // try to do fair execution for waiting task if(selector.selectNow() > 0) select(); continue; } // now we can wait for input packet or event selector.select(); select(); } } private boolean checkInputEvent() { SessionManagerEvent e; if((e = waitingEventQueue.poll())!=null){ e.exec(this); return true; } return false; } private boolean checkWaitingWrite() throws IOException { PacketSet p = writeQueue.poll(); if (p!=null) { p.channel.write(p.command); return true; } return false; } /** * Check waiting command in merge * @return true if there is a processed waiting command * @throws IOException */ private void checkWaitingCommandInMerge() { List<PacketSet> w = waitingCommandInMerge; waitingCommandInMerge = new LinkedList<PacketSet>(); for(PacketSet p: w) { Editor e = p.getEditor(); if(e.isMerging()) { // still merging do nothing waitingCommandInMerge.add(p); } else { try { if (sessionManage(e, p.command)) { // we don't need this assert false; return; } e.manage(p.command); } catch (Exception e1) { // should be e.close()? close(p.channel); } } } } private void close(REPSocketChannel<REPCommand> channel) { REPSelectionKey<REPCommand>key = channel.keyFor1(selector); REPHandler handler = (REPHandler)key.attachment(); key.cancel(); handler.cancel(channel); // we have to remove session/enditor } public boolean hasWaitingCommand(REPSocketChannel<REPCommand>c) { for(PacketSet p:waitingCommandInMerge) { if (p.channel==c) { return true; } } return false; } private void select() throws IOException { Set<REPSelectionKey<REPCommand>> keys = selector.selectedKeys1(); for(REPSelectionKey<REPCommand> key : keys){ if(key.isAcceptable()){ REPSocketChannel<REPCommand> channel = key.accept(new REPCommandPacker()); logger.writeLog("SessionManager.select() : key.isAcceptable : channel = " + channel); registerChannel (channel, new FirstConnector(this)); channel = null; }else if(key.isReadable()){ REPHandler handler = (REPHandler)(key.attachment()); try { handler.handle(key); } catch (IOException e) { key.cancel(); handler.cancel(key.channel1()); } } } } void registerChannel(REPSocketChannel<REPCommand> channel,Forwarder handler) throws IOException { if(channel == null) { return; } handler.setChannel(channel); channel.configureBlocking(false); channel.register(selector, SelectionKey.OP_READ, handler); } void cancel_sm_join() { removeChannel(sm_join_channel); sm_join_channel=null; } private void removeChannel(Forwarder sm_join_channel) { REPSelectionKey<REPCommand> key = sm_join_channel.channel.keyFor1(selector); key.cancel(); try { sm_join_channel.channel.close(); } catch (IOException e) { } } void updateGUI() { //リストのコピーをGUIに渡す LinkedList<Session> sList = new LinkedList<Session>(sessionList.values()); LinkedList<Editor> eList = new LinkedList<Editor>(editorList.values()); //GUIに反映 Runnable doRun = new DoGUIUpdate(sList, eList, gui); gui.invokeLater(doRun); } void setMyHostName(String localHostName) { myHost = localHostName + receive_port; // if(maxHost == null) { // maxHost = myHost; // } setHostToEditor(myHost); } private void setHostToEditor(String myHost2) { for(Editor editor : editorList.values()){ if (editor.channel!=null) editor.setHost(myHost2); } } /** * Host 名のSession Manager に SM_JOIN する。自分は、Session を持っていては * ならない。複数のSession Managerにjoinすることは出来ない。(NATを実装するまでは)。 * @param host */ public void connectSession(String host) { if (sm_join_channel!=null) return; if (!sessionList.isEmpty()) return; if (!smList.isMaster()) return; int port = parent_port; InetSocketAddress addr = new InetSocketAddress(host, port); try { REPSocketChannel<REPCommand> sessionchannel = REPSocketChannel.<REPCommand>create(new REPCommandPacker()); sessionchannel.connect(addr); while(!sessionchannel.finishConnect()); Forwarder sm = new Forwarder(this); registerChannel(sessionchannel, sm); sm_join(sm); }catch (IOException e) { } } private void sm_join(Forwarder channel){ sm_join_channel = channel; //SM_JOINコマンドを生成。 REPCommand command = new REPCommand(); command.setCMD(REP.SMCMD_SM_JOIN); command.setEID(-1); // request Parent SessionManagerID command.setSID(-1); // request SessionManagerID //hostnameをセット。 setMyHostName(channel.getLocalHostName()); String string = myHost; command.setString(string); //SM_JOINコマンドを送信。 channel.send(command); //SessionManagerのListに追加。 } public void selectSession(SelectButtonEvent event) throws IOException { int sid = event.getSID(); Session session = sessionList.get(sid); Editor editor = (Editor)event.getEditor(); if(editor == null){ logger.writeLog("SessionManager.selectSession():editor = " + editor); return; } if (editor.hasSession()) return; selectSession(sid, session, editor.getEID(), editor); } private void selectSession(int sid, Session session, int eid, Forwarder editor) { if(session.hasOwner()){ session.addForwarder(editor); REPCommand sendCommand = new REPCommand(); if (editor.isDirect()&&editor.getEID()==eid) { sendUpdate(); sendCommand.setCMD(REP.SMCMD_JOIN_ACK); } else { // SELECT_ACK is sent to the session ring to // find out joined editor sendCommand.setCMD(REP.SMCMD_SELECT_ACK); } sendCommand.setEID(editor.getEID()); sendCommand.setSID(sid); sendCommand.string = ""; editor.send(sendCommand); }else { session.addForwarder(editor); editor.setHost(myHost); editor.setSID(sid); Forwarder next = routingTable.toSession(sid); REPCommand command = new REPCommand(); command.setCMD(REP.SMCMD_SELECT); command.setSID(sid); command.setEID(eid); command.setString(editor.getHost()); next.send(command); } } private void sendUpdate() { // TODO Auto-generated method stub } public void addWaitingCommand(PacketSet set) { waitingCommandInMerge.add(set); } public void buttonPressed(SessionManagerEvent event) { try { waitingEventQueue.put(event); } catch (InterruptedException e) {} selector.wakeup(); } public void syncExec(SessionManagerEvent event) { try { waitingEventQueue.put(event); } catch (InterruptedException e) { } } public void closeSession(SessionManagerEvent event) { Session session = ((CloseButtonEvent) event).getSession(); session.closeSession(); sessionList.remove(session); updateGUI(); } public void remove(REPSocketChannel<REPCommand> channel) { int i = 0; for(Session s:sessionList.values()) { if (s.deleteEditor(channel)) { i++; } } assert(i==1); // can be other session manager? what should I do? } public void addWriteQueue(PacketSet packetSet) { writeQueue.addLast(packetSet); assert(writeQueue.size()<packetLimit) ; } public void remove(Editor editor) { Session s = sessionList.get(editor.getSID()); if (editor.isMaster()) { removeSession(s); } else { s.deleteForwarder(editor); editorList.remove(editor); } updateGUI(); } private void removeSession(Session s0) { s0.remove(this); sessionList.remove(s0); sendUpdate(); } public void setParentPort(int port) { parent_port = port; } public int getParentPort() { return parent_port; } public int getPort() { return receive_port; } boolean sessionManage(Forwarder forwarder, REPCommand command) throws ClosedChannelException, IOException { switch(command.cmd){ // Session Manager Command case SMCMD_JOIN: { // first connection or forwarded command if (forwarder.isDirect()) { // direct linked editor なので、ここでIDを作成する command.setEID(makeID(editorList.newEid())); } if(isMaster()) { REPCommand ackCommand = new REPCommand(); ackCommand.setCMD(REP.SMCMD_JOIN_ACK); ackCommand.setEID(command.eid); ackCommand.string = command.string; smList.sendToSlaves(ackCommand); registEditor(forwarder,ackCommand); } else { routingTable.add(forwarder,getSMID(command.eid),command.sid); smList.sendToMaster(command); } } break; case SMCMD_JOIN_ACK: case SMCMD_PUT_ACK: registEditor(forwarder,command); break; case SMCMD_PUT: { // first connection or forwarded command if (forwarder.isDirect()) { // direct link, make new ID int eid = makeID(editorList.newEid()); int sid = makeID(sessionList.newSessionID()); command.setEID(eid); command.setSID(sid); } if(isMaster()) { command.setCMD(REP.SMCMD_PUT_ACK); command.string = command.string; command.setEID(command.eid); command.setSID(command.sid); smList.sendToSlaves(command); registEditor(forwarder,command); } else { routingTable.add(forwarder,getSMID(command.eid),command.sid); smList.sendToMaster(command); } } break; case SMCMD_SELECT: case SMCMD_SELECT_ACK: { Session session = sessionList.get(command.sid); selectSession(command.sid, session, command.eid, forwarder); } break; case SMCMD_SM_JOIN: { // SM_JOIN中にSMCMD_SM_JOINが来たら、これはループなので、 ///自分のSM_JOINを取り消す。 if (sm_join_channel!=null) cancel_sm_join(); // SMCMD_SM_JOIN は、master まで上昇する。 // masterでなければ、自分のparentに転送する。 if(isMaster()) { // master であれば、SessionManager IDを決めて、 // 自分のsmList に登録 Forwarder sm; int psid = command.eid; if (forwarder.sid!=-1) { // すでに channelはSessionManager Idを持っていて、 // direct link ではないので、 // channel を持たないForwarderとして登録する sm = new Forwarder(this); } else { sm = forwarder; } int sid = smList.addNewSessionManager(sm,command); REPCommand sendCommand = makeREPCommandWithSessionList(REP.SMCMD_SM_JOIN_ACK); // command.eid==smList.sesionManagerID() の場合は、 // 待っている自分の下のsessionManagerにsidをassignする必要がある。 sendCommand.setSID(sid); // new Session manager ID // 複数のSM_JOIN_ACKを識別するには、最初にSM_JOINを受け付けた // Session manager IDを使う。 sendCommand.setEID(psid); send_sm_join_ack(psid, sid, sendCommand); } else { if (forwarder.sid==-1) { // direct link の場合は、識別のために、EIDに直上の // smid を入れておく。 command.setEID(smList.sessionManagerID()); } smList.sendToMaster(command); } } break; case SMCMD_SM_JOIN_ACK: send_sm_join_ack(command.eid, command.sid, command); break; case SMCMD_UPDATE: command.setString(mergeUpdate(command)); // 上に知らせる smList.sendToMaster(command); break; case SMCMD_UPDATE_ACK: command.setString(mergeUpdate(command)); // 下に知らせる smList.sendToSlaves(command); updateGUI(); break; default: return false; } return true; } private String mergeUpdate(REPCommand command) throws IOException { SessionList receivedSessionList; try { receivedSessionList = decoder.decode(command.string); } catch (SAXException e) { throw new IOException(); } // UPDATE/UPDATE_ACKにより送られてきたSessionの情報を追加する //XMLを生成。送信コマンドにセット。 sessionList.merge(receivedSessionList); return encoder.sessionListToXML(sessionList); } /* * id has SessionManager ID part */ private int makeID(int newid) { return newid+smList.sessionManagerID()*MAXID; } private int getSessionID(int id) { return id%MAXID; } private int getSMID(int id) { return id/MAXID; } private void registEditor(Forwarder forwarder,REPCommand command) { // make ack for PUT/JOIN. Do not send this to the editor, // before select. After select, ack is sent to the editor. routingTable.add(forwarder,getSMID(command.eid),command.sid); Editor editor; if (getSessionID(command.sid)==smList.sessionManagerID() && forwarder.isDirect()) { // direct link だった editor = (Editor)forwarder; } else { editor = new Editor(this, command.cmd==REP.SMCMD_PUT_ACK, command.eid); } editor.setName(command.string); editor.setSID(command.sid); if (!editorList.hasEid(command.eid)) { editorList.add(editor); updateGUI(); } // we don't join ack to the direct linked editor. We // have to wait select command } void send_sm_join_ack(int psid, int sid,REPCommand sendCommand) { if (psid==smList.sessionManagerID()) { // 直下のsessionManagerにIDを割り振る必要がある。 smList.assignSessionManagerIDtoWaitingSM(sid); // ここで smList に一つだけ追加されるので // 待っている最初のsm一つにだけ、sm_join_ackが新たに送られる。 } smList.sendToSlaves(sendCommand); } private REPCommand makeREPCommandWithSessionList(REP cmd) { //SessionListからXMLを生成。 //joinしてきたSessionManagerに対してACKを送信。 REPCommand sendCommand = new REPCommand(); sendCommand.setCMD(cmd); sendCommand.setString(encoder.sessionListToXML(sessionList)); return sendCommand; } public boolean isMaster() { return smList.isMaster(); } public void setSessionManagerID(int sid) { smList.setSessionManagerID(sid); } public Session getSession(int sid) { return sessionList.get(sid); } }