Mercurial > hg > RemoteEditor > REPSessionManager
view rep/SessionManager.java @ 318:dc57e24ea3df
*** empty log message ***
author | kono |
---|---|
date | Wed, 08 Oct 2008 10:09:02 +0900 |
parents | c83a3faec487 |
children | dfed28488274 |
line wrap: on
line source
package rep; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.ClosedChannelException; import java.nio.channels.SelectionKey; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import rep.channel.REPServerSocketChannel; import rep.channel.REPSocketChannel; import rep.handler.PacketSet; import rep.handler.REPHandler; import rep.handler.REPEditorHandler; import rep.handler.REPSessionManagerHandler; import rep.channel.REPSelector; import rep.xml.SessionXMLDecoder; import rep.xml.SessionXMLEncoder; import rep.channel.REPSelectionKey; /* +-------+--------+--------+-------+--------+---------+------+ | cmd | session| editor | seqid | lineno | textsiz | text | | | id | id | | | | | +-------+--------+--------+-------+--------+---------+------+ o---------- header section (network order) ----------o int cmd; kind of command int sid; session ID : uniqu to editing file int eid; editor ID : owner editor ID = 1。Session に対して unique -1 session manager command -2 merge command int seqno; Sequence number : sequence number はエディタごとに管理 int lineno; line number int textsize; textsize : bytesize byte[] text; */ public class SessionManager implements SessionManagerEventListener{ private LinkedList<Session> sessionList; private SessionManagerGUI gui; private REPSelector<REPCommand> selector; private SessionManagerList smList; private List<Editor> editorList; // editorList は、sessionList に入っているeditorとは別なeditorのlistらしい。 private String maxHost; private List<PacketSet> waitingCommandInMerge; REPHandler normalHandler = new REPEditorHandler(this); private BlockingQueue<SessionManagerEvent> waitingEventQueue = new LinkedBlockingQueue<SessionManagerEvent>();; private String myHost; private LinkedList<PacketSet> writeQueue = new LinkedList<PacketSet>(); private static int receive_port; private static int parent_port; static final int DEFAULT_PORT = 8766; public static void main(String[] args) throws InterruptedException, IOException { int port = DEFAULT_PORT; int port_s = DEFAULT_PORT; //System.setProperty("file.encoding", "UTF-8"); if(args.length > 0){ port = Integer.parseInt(args[0]); port_s = Integer.parseInt(args[1]); } receive_port = port; parent_port = port_s; SessionManager sm = new SessionManager(); sm.init(port,new SessionManagerGUIimpl(sm)); } public void openSelector() throws IOException{ selector = REPSelector.<REPCommand>create(); } public void init(int port, SessionManagerGUI gui) throws IOException, InterruptedException { this.gui = gui; openSelector(); init(port); mainLoop(); } private void init(int port) throws InterruptedException, IOException { REPServerSocketChannel<REPCommand> ssc = REPServerSocketChannel.<REPCommand>open(new REPCommandPacker()); ssc.configureBlocking(false); //reuse address 必須 ssc.socket().setReuseAddress(true); //getAllByNameで取れた全てのアドレスに対してbindする ssc.socket().bind(new InetSocketAddress(port)); ssc.register(selector, SelectionKey.OP_ACCEPT, normalHandler); sessionList = new LinkedList<Session>(); smList = new SessionManagerList(); editorList = new LinkedList<Editor>(); waitingCommandInMerge = new LinkedList<PacketSet>(); } /* * We wrote everything in one thread, but we can assign * one thread for each communication channel and GUI event. */ public void mainLoop() throws IOException { while(true){ if (checkInputEvent() || checkWaitingWrite() || checkWaitingCommandInMerge()) { // try to do fair execution for waiting task if(selector.selectNow() > 0) select(); continue; } // now we can wait for input packet or event selector.select(); select(); } } private boolean checkInputEvent() { SessionManagerEvent e; if((e = waitingEventQueue.poll())!=null){ e.exec(); return true; } return false; } private boolean checkWaitingWrite() throws IOException { PacketSet p = writeQueue.poll(); if (p!=null) { REPCommand cmd = new REPCommand(p.command); p.channel.write(cmd); return true; } return false; } /** * Check waiting command in merge * @return true if there is a processed waiting command * @throws IOException */ private boolean checkWaitingCommandInMerge() throws IOException { for(Iterator<PacketSet> it = waitingCommandInMerge.iterator(); it.hasNext();){ PacketSet p = it.next(); if(p.getEditor().isMerging()) { // still merging do nothing continue; }else{ // process one command and return true manage(p.channel, p.command); it.remove(); return true; } } return false; } private void select() throws IOException { Set<REPSelectionKey<REPCommand>> keys = selector.selectedKeys1(); for(REPSelectionKey<REPCommand> key : keys){ if(key.isAcceptable()){ REPSocketChannel<REPCommand> channel = key.accept(new REPCommandPacker()); System.out.println("SessionManager.select() : key.isAcceptable : channel = " + channel); registerChannel (channel, SelectionKey.OP_READ); channel = null; }else if(key.isReadable()){ REPHandler handler = (REPHandler)(key.attachment()); try { handler.handle(key); } catch (ClosedChannelException x) { key.cancel(); handler.cancel(key.channel1()); } catch (IOException x) { key.cancel(); handler.cancel( key.channel1()); } } } } private void registerChannel(REPSocketChannel<REPCommand> channel, int ops) throws IOException { if(channel == null) { return; } channel.configureBlocking(false); REPHandler handler = normalHandler; channel.register(selector, ops, handler); } public void manage(REPSocketChannel<REPCommand> channel, REPCommand receivedCommand) throws IOException { if(receivedCommand == null) return; //Session session; REPSocketChannel<REPCommand> send = channel; switch(receivedCommand.cmd){ // Editor Command case REPCMD_DELETE: case REPCMD_INSERT: case REPCMD_NOP: { // sid から Session を取得 Session session = getSession(receivedCommand.sid); if (session==null) throw new IOException(); // 次のエディタへコマンドを送信する処理 Editor editor = session.getEditor(channel); editor.translate(session.getNextEditor(editor), receivedCommand); break; } case SMCMD_JOIN: { //どのSessionにも属さないエディタをリストに追加 //エディタとchannelは1対1 (ではない) //エディタが新しくputする場合は新しくソケットを作る // ここのeditorList はsessionのとは別物 Editor editor = new Editor(this,editorList.size(),channel); editor.setHost(myHost); editorList.add(editor); updateGUI(); } break; case SMCMD_JOIN_ACK: assert (false); break; case SMCMD_PUT: { //エディタのリストに追加 Editor editor = new Editor(this,editorList.size(), channel); //editorList.add(editor); //Sessionを生成 int sid = sessionList.size(); editor = new Editor(this,0, channel); editor.setHost(myHost); Session session = new Session(sid, receivedCommand.string, editor); session.hasOwner(true); sessionList.add(session); updateGUI(); //エディタにAckを送信 REPCommand sendCommand = new REPCommand(receivedCommand); sendCommand.setCMD(REP.SMCMD_PUT_ACK); sendCommand.setEID(editor.getEID()); sendCommand.setSID(session.getSID()); editor.send(sendCommand); //他のSessionManagerへSessionの追加を報告 //親に送って、親から子へ SessionXMLEncoder sessionEncoder = new SessionXMLEncoder(session); REPCommand command = new REPCommand(); command.setSID(session.getSID()); command.setString(sessionEncoder.sessionListToXML()); command.setCMD(REP.SMCMD_UPDATE); smList.sendExcept(channel, command); } break; // SELECT is no longer used in a editor. Select // operation is handled in Session Manager Only case SMCMD_SELECT: { //他のSessionManagerをエディタとしてSessionに追加 Editor editor = new Editor(this,0,channel); Session session = getSession(receivedCommand.sid); session.addEditor(editor); if(session.hasOwner()){ //このSessionManagerがオーナーを持っている場合、Sessionにエディタを追加し、エディタへAckを返す REPCommand sendCommand = new REPCommand(receivedCommand); sendCommand.setCMD(REP.SMCMD_SELECT_ACK); sendCommand.setEID(editor.getEID()); editor.send(sendCommand); }else{ //オーナーを持ってない場合は、オーナーを持っているSessionManagerへSELECTコマンドを中継する Forwarder owner = session.getOwner(); owner.send(receivedCommand); } } break; case SMCMD_SELECT_ACK: { String hostport = receivedCommand.string; Forwarder editor = getEditor(hostport); if(editor != null) { //host, port を見て、このコマンドが自分が送信したSelectコマンドのAckかどうかを判断する REPCommand command = new REPCommand(); command.setCMD(REP.SMCMD_JOIN_ACK); command.setSID(receivedCommand.sid); command.setEID(receivedCommand.eid); editor.send(command); }else{ //自分が送信したコマンドでなければ、次のSessionManagerへ中継する smList.sendExcept(channel, receivedCommand); } } break; // Session Manager Command case SMCMD_SM_JOIN: { // このchannelの相手は、SessionManager なので、 // 特別なhandlerを接続する必要がある channel.register(selector, SelectionKey.OP_READ, new REPSessionManagerHandler(this)); //SessionManagerのリストへ追加 smList.add(channel); //XMLからSessionListオブジェクトを生成する。 SessionXMLDecoder decoder = new SessionXMLDecoder(); SessionList receivedSessionList = decoder.decode(receivedCommand.string); //myHost を設定。 //立ち上げ時にやるとlocalhostしか取れない if(myHost == null) setMyHostName(getLocalHostName(channel)); //maxHost を設定。 if(setMaxHost(channel, receivedSessionList.getMaxHost())){ REPCommand sendCommand = new REPCommand(); sendCommand.setCMD(REP.SMCMD_CH_MASTER); sendCommand.setString(maxHost); smList.sendExcept(channel, sendCommand); } //SessionListからXMLを生成。 //joinしてきたSessionManagerに対してACKを送信。 SessionXMLEncoder sessionlistEncoder = new SessionXMLEncoder(sessionList); REPCommand sendCommand = new REPCommand(); sendCommand.setCMD(REP.SMCMD_SM_JOIN_ACK); sendCommand.setString(sessionlistEncoder.sessionListToXML()); send.write(sendCommand); //その他の SessionManager に対して SMCMD_UPDATEを 送信。 sendCommand = new REPCommand(); sendCommand.setCMD(REP.SMCMD_UPDATE); sendCommand.setString(receivedCommand.string); smList.sendExcept(channel, sendCommand); } break; case SMCMD_SM_JOIN_ACK: //XMLからSessionListオブジェクトを生成。 SessionXMLDecoder decoder2 = new SessionXMLDecoder(); SessionList receivedSessionList2 = decoder2.decode(receivedCommand.string); //maxHostを決定。 if(setMaxHost(channel, receivedSessionList2.getMaxHost())){ REPCommand sendCommand = new REPCommand(); sendCommand.setCMD(REP.SMCMD_CH_MASTER); sendCommand.setString(maxHost); smList.sendExcept(channel, sendCommand); } break; case SMCMD_UPDATE: { SessionXMLDecoder decoder3 = new SessionXMLDecoder(); SessionList receivedSessionList3 = decoder3.decode(receivedCommand.string); //UPDATEコマンドにより送られてきたSessionの情報を追加する LinkedList<Session> list = receivedSessionList3.getList(); for(Session session : list){ session.getEditorList().get(0).setChannel(channel); sessionList.add(session); } //他のSessionManagerへ中継する smList.sendExcept(channel, receivedCommand); //リストのコピーをGUIに渡す LinkedList<Session> sList = new LinkedList<Session>(sessionList); LinkedList<Editor> eList = new LinkedList<Editor>(editorList); //GUIに反映 Runnable doRun = new DoGUIUpdate(sList, eList, gui); gui.invokeLater(doRun); } break; case SMCMD_UPDATE_ACK: { if(receivedCommand.sid > sessionList.size()){ Editor editor = new Editor(this,0,channel); editor.setName(receivedCommand.string); Session session = new Session(editor); session.addEditor(editor); sessionList.add(session); //リストのコピーをGUIに渡す LinkedList<Session> sList = new LinkedList<Session>(sessionList); LinkedList<Editor> eList = new LinkedList<Editor>(editorList); //GUIに反映 Runnable doRun = new DoGUIUpdate(sList, eList, gui); gui.invokeLater(doRun); } smList.sendToSlave(receivedCommand); } break; case SMCMD_CH_MASTER: { //maxHost を設定。 if(setMaxHost(channel, receivedCommand.string)){ REPCommand sendCommand = new REPCommand(); sendCommand.setCMD(REP.SMCMD_CH_MASTER); sendCommand.setString(maxHost); smList.sendExcept(channel, sendCommand); } } break; case SMCMD_START_MERGE_ACK: { // sid から Session を取得 Session session = getSession(receivedCommand.sid); if (session==null) throw new IOException(); // マージの処理と次のエディタへコマンドを送信する処理 Editor editor = session.getEditor(channel); if (!editor.merge(editor,receivedCommand)) { // nothing to do, send END_MERGE editor.endMerge(); } break; } case SMCMD_QUIT: { Session session = getSession(receivedCommand.sid); if (session==null) throw new IOException(); session.sendToNextEditor(channel,receivedCommand); break; } case SMCMD_QUIT_2: { Session session = getSession(receivedCommand.sid); if (session==null) throw new IOException(); Forwarder me = session.getEditor(channel); if (me==null) break; // already removed. Forwarder editor = session.getNextEditor(me); // don't send quit2 to the editor until all pending // merge is processed. editor.setQuit2(receivedCommand); break; } default: assert(false); break; } } private void updateGUI() { //リストのコピーをGUIに渡す LinkedList<Session> sList = new LinkedList<Session>(sessionList); LinkedList<Editor> eList = new LinkedList<Editor>(editorList); //GUIに反映 Runnable doRun = new DoGUIUpdate(sList, eList, gui); gui.invokeLater(doRun); } private Forwarder getEditor(String hostport) { for(Editor editor : editorList){ if(editor.getHost() == hostport){ return editor; } } return null; } public Session getSession(int sid) throws IOException { for(Session session : sessionList){ if(session.getSID() == sid) return session; } throw new IOException(); } private boolean setMaxHost(REPSocketChannel<REPCommand> channel, String maxHost2) { if(maxHost.compareTo(maxHost2) > 0){ return false; }else{ maxHost = maxHost2; return true; } } private void setMyHostName(String localHostName) { myHost = localHostName + receive_port; if(maxHost == null) { maxHost = myHost; } setHostToEditor(myHost); } private void setHostToEditor(String myHost2) { for(Editor editor : editorList){ editor.setHost(myHost2); } } public void connectSession(String host) { int port = DEFAULT_PORT; port = parent_port; InetSocketAddress addr = new InetSocketAddress(host, port); try { REPSocketChannel<REPCommand> sessionchannel = REPSocketChannel.<REPCommand>create(new REPCommandPacker()); sessionchannel.configureBlocking(true); sessionchannel.connect(addr); while(!sessionchannel.finishConnect()){ System.out.print("test afro"); } System.out.println(""); registerChannel(sessionchannel, SelectionKey.OP_READ); sm_join(sessionchannel); }catch (IOException e) { e.printStackTrace(); } } private void sm_join(REPSocketChannel<REPCommand> channel){ //SM_JOINコマンドを生成。 REPCommand command = new REPCommand(); command.setCMD(REP.SMCMD_SM_JOIN); //hostnameをセット。 setMyHostName(getLocalHostName(channel)); //XMLを生成。送信コマンドにセット。 SessionXMLEncoder encoder = new SessionXMLEncoder(sessionList); String string = encoder.sessionListToXML(); command.setString(string); //SM_JOINコマンドを送信。 channel.write(command); //SessionManagerのListに追加。 smList.add(channel); } private String getLocalHostName(REPSocketChannel<?> channel) { String host = null; host = channel.socket().getLocalAddress().getHostName(); return host; } public void selectSession(SelectButtonEvent event) throws IOException { REPSocketChannel<REPCommand> channel = event.getEditorChannel(); int sid = event.getSID(); int eid = event.getEID(); Session session = getSession(sid); Editor editor = editorList.get(eid); if(editor == null){ System.out.println("SessionManager.selectSession():editor = " + editor); return; } session.addEditor(editor); System.out.println("SessionManager.session.hasOnwer="+session.hasOwner()); if(session.hasOwner()){ REPCommand sendCommand = new REPCommand(); sendCommand.setCMD(REP.SMCMD_JOIN_ACK); sendCommand.setEID(editor.getEID()); sendCommand.setSID(sid); sendCommand.string = ""; channel.write(sendCommand); }else { sid = event.getSID(); editor = new Editor(this,0,channel); editor.setHost(myHost); session = getSession(sid); session.addEditor(editor); Forwarder owner = session.getOwner(); REPCommand command = new REPCommand(); command.setCMD(REP.SMCMD_SELECT); command.setSID(sid); command.setString(editor.getHost()); owner.send(command); } } public void addWaitingCommand(PacketSet set) { waitingCommandInMerge.add(set); } public void buttonPressed(SessionManagerEvent event) { try { waitingEventQueue.put(event); } catch (InterruptedException e) {} selector.wakeup(); } public void syncExec(SessionManagerEvent event) { try { waitingEventQueue.put(event); } catch (InterruptedException e) { } } public void closeSession(SessionManagerEvent event) { Session session = ((CloseButtonEvent) event).getSession(); session.closeSession(); sessionList.remove(session); updateGUI(); } public void remove(REPSocketChannel<REPCommand> channel) { for(Session s:sessionList) { if (s.deleteEditor(channel)) { return ; } } assert(false); // can be other session manager? what should I do? } public void addWriteQueue(PacketSet packetSet) { writeQueue.add(packetSet); } public void remove(Editor editor) { for(Session s:sessionList) { s.deleteForwarder(editor); } //assert(false); } }