comparison rep/SessionManager.java @ 212:e8f716498caf

*** empty log message ***
author pin
date Sat, 30 Aug 2008 15:49:42 +0900
parents 0beb2bcfefe2
children 4d9b32666ed2
comparison
equal deleted inserted replaced
211:44d502851c9e 212:e8f716498caf
48 private String myHost; 48 private String myHost;
49 private boolean isMaster = true; 49 private boolean isMaster = true;
50 private List<Editor> editorList; 50 private List<Editor> editorList;
51 private String maxHost; 51 private String maxHost;
52 private boolean isSimulation; 52 private boolean isSimulation;
53 private List<PacketSet> packetSetList; 53 private List<PacketSet> waitingCommandInMerge;
54 private BlockingQueue<SessionManagerEvent> waitingQueue; 54 private BlockingQueue<SessionManagerEvent> waitingQueue;
55 private static int temp_port; 55 private static int temp_port;
56 private static int send_port; 56 private static int send_port;
57 static final int DEFAULT_PORT = 8766; 57 static final int DEFAULT_PORT = 8766;
58 58
67 public void init(int port) throws InterruptedException, IOException { 67 public void init(int port) throws InterruptedException, IOException {
68 68
69 REPServerSocketChannel<REPCommand> ssc = REPServerSocketChannel.<REPCommand>open(new REPCommandPacker()); 69 REPServerSocketChannel<REPCommand> ssc = REPServerSocketChannel.<REPCommand>open(new REPCommandPacker());
70 ssc.configureBlocking(false); //reuse address 必須 70 ssc.configureBlocking(false); //reuse address 必須
71 ssc.socket().setReuseAddress(true); 71 ssc.socket().setReuseAddress(true);
72 ssc.socket().bind(new InetSocketAddress("localhost", port)); 72 //getAllByNameで取れた全てのアドレスに対してbindする
73 ssc.socket().bind(new InetSocketAddress(port));
73 ssc.register(selector, SelectionKey.OP_ACCEPT, new REPHandlerImpl(-1, this)); 74 ssc.register(selector, SelectionKey.OP_ACCEPT, new REPHandlerImpl(-1, this));
74 //registerChannel(selector, ssc, SelectionKey.OP_ACCEPT); 75 //registerChannel(selector, ssc, SelectionKey.OP_ACCEPT);
75 76
76 77
77 //sessionlist = new SessionList(); 78 //sessionlist = new SessionList();
78 sessionList = new LinkedList<Session>(); 79 sessionList = new LinkedList<Session>();
79 smList = new SessionManagerList(); 80 smList = new SessionManagerList();
80 //ownEditorList = new EditorList(); 81 //ownEditorList = new EditorList();
81 editorList = new LinkedList<Editor>(); 82 editorList = new LinkedList<Editor>();
82 packetSetList = new LinkedList<PacketSet>(); 83 waitingCommandInMerge = new LinkedList<PacketSet>();
83 84
84 waitingQueue = new LinkedBlockingQueue<SessionManagerEvent>(); 85 waitingQueue = new LinkedBlockingQueue<SessionManagerEvent>();
85 // main loop 86 // main loop
86 //mainLoop(); 87 //mainLoop();
87 } 88 }
92 if(selector.selectNow() > 0){ 93 if(selector.selectNow() > 0){
93 select(); 94 select();
94 } 95 }
95 continue; 96 continue;
96 } 97 }
97 selector.select(); 98 System.out.println("selected number : " + selector.select());
98 select(); 99 select();
99 } 100 }
100 } 101 }
101 102
102 private boolean checkSend() { 103 private boolean checkSend() {
103 for(Iterator<PacketSet> it = packetSetList.iterator(); it.hasNext();){ 104 for(Iterator<PacketSet> it = waitingCommandInMerge.iterator(); it.hasNext();){
104 PacketSet p = it.next(); 105 PacketSet p = it.next();
105 if(p.getEditor().isMerging()) { 106 if(p.getEditor().isMerging()) {
106 continue; 107 continue;
107 }else{ 108 }else{
108 manage(p.channel, p.command); 109 manage(p.channel, p.command);
109 it.remove(); 110 it.remove();
111 return true;
110 } 112 }
111 } 113 }
112 return false; 114 return false;
113 } 115 }
114 116
115 private void select() throws IOException { 117 private void select() throws IOException {
116 SessionManagerEvent e = waitingQueue.poll(); 118 SessionManagerEvent e;
117 if(e != null) { 119 while((e = waitingQueue.poll())!=null){
118 e.exec(); 120 e.exec();
119 } 121 }
120 for(REPSelectionKey<REPCommand> key : selector.selectedKeys1()){ 122 for(REPSelectionKey<REPCommand> key : selector.selectedKeys1()){
121 if(key.isAcceptable()){ 123 if(key.isAcceptable()){
122 /*** serverChannelはenableになったSelectionKeyのchannel ***/ 124 /*** serverChannelはenableになったSelectionKeyのchannel ***/
123 // REPServerSocketChannel serverChannel = (REPServerSocketChannel)key.channel();
124 REPSocketChannel<REPCommand> channel = key.accept(new REPCommandPacker()); 125 REPSocketChannel<REPCommand> channel = key.accept(new REPCommandPacker());
125 //REPSocketChannel channel = serverChannel.accept1(); //keyからchannelを取って、accept
126 registerChannel (selector, channel, SelectionKey.OP_READ); 126 registerChannel (selector, channel, SelectionKey.OP_READ);
127 channel = null; 127 channel = null;
128 128
129 }else if(key.isReadable()){ 129 }else if(key.isReadable()){
130 130 REPHandler handler = (REPHandler)(key.attachment());
131 //REPHandler handler = (REPHandler)key.attachment();
132
133 //いんちき
134 REPHandler handler = new REPHandlerImpl(-1, this);
135 handler.handle(key); 131 handler.handle(key);
136 132
137 }else if(key.isConnectable()){ 133 }
138 System.out.println("Connectable"); 134 }
139 } 135 }
140 } 136
141 } 137 private void registerChannel(REPSelector selector, SelectableChannel channel, int ops) throws IOException {
142
143 private void registerChannel(Selector selector, SelectableChannel channel, int ops) throws IOException {
144 if(channel == null) { 138 if(channel == null) {
145 return; 139 return;
146 } 140 }
147 channel.configureBlocking(false); 141 channel.configureBlocking(false);
148 REPHandler handler = new REPHandlerImpl(-1, this); 142 REPHandler handler = new REPHandlerImpl(-1, this);
158 switch(receivedCommand.cmd){ 152 switch(receivedCommand.cmd){
159 153
160 case REP.SMCMD_JOIN: 154 case REP.SMCMD_JOIN:
161 { 155 {
162 //どのSessionにも属さないエディタをリストに追加 156 //どのSessionにも属さないエディタをリストに追加
157 //エディタとchannelは1対1
158 //エディタが新しくputする場合は新しくソケットを作る
163 Editor editor = new Editor(editorList.size(), channel); 159 Editor editor = new Editor(editorList.size(), channel);
164 editor.setHost(myHost); 160 editor.setHost(myHost);
165 editorList.add(editor); 161 editorList.add(editor);
166 162
167 //リストのコピーをGUIに渡す 163 guiUpdate();
168 LinkedList<Session> sList = new LinkedList<Session>(sessionList);
169 LinkedList<Editor> eList = new LinkedList<Editor>(editorList);
170 //GUIに反映
171 Runnable doRun = new DoGUIUpdate(sList, eList, gui);
172 SwingUtilities.invokeLater(doRun);
173 164
174 } 165 }
175 166
176 break; 167 break;
177 168
178 case REP.SMCMD_JOIN_ACK: 169 case REP.SMCMD_JOIN_ACK:
179 170 assert (false);
180 break; 171 break;
181 172
182 case REP.SMCMD_PUT: 173 case REP.SMCMD_PUT:
183 { 174 {
184 //エディタのリストに追加 175 //エディタのリストに追加
191 editor.setHost(myHost); 182 editor.setHost(myHost);
192 Session session = new Session(sid, editor); 183 Session session = new Session(sid, editor);
193 session.hasOwner(true); 184 session.hasOwner(true);
194 sessionList.add(new Session(sid, editor)); 185 sessionList.add(new Session(sid, editor));
195 186
196 //リストのコピーをGUIに渡す 187 guiUpdate();
197 LinkedList<Session> sList = new LinkedList<Session>(sessionList);
198 LinkedList<Editor> eList = new LinkedList<Editor>(editorList);
199 //GUIに反映
200 Runnable doRun = new DoGUIUpdate(sList, eList, gui);
201 SwingUtilities.invokeLater(doRun);
202 188
203 //エディタにAckを送信 189 //エディタにAckを送信
204 sendCommand.setCMD(REP.SMCMD_PUT_ACK); 190 sendCommand.setCMD(REP.SMCMD_PUT_ACK);
205 sendCommand.setEID(editor.getEID()); 191 sendCommand.setEID(editor.getEID());
206 sendCommand.setSID(session.getSID()); 192 sendCommand.setSID(session.getSID());
207 editor.send(sendCommand); 193 editor.send(sendCommand);
208 194
209 //他のSessionManagerへSessionの追加を報告 195 //他のSessionManagerへSessionの追加を報告
196 //親に送って、親から子へ
210 SessionXMLEncoder sessionEncoder = new SessionXMLEncoder(session); 197 SessionXMLEncoder sessionEncoder = new SessionXMLEncoder(session);
211 REPCommand command = new REPCommand(); 198 REPCommand command = new REPCommand();
212 command.setSID(session.getSID()); 199 command.setSID(session.getSID());
213 command.setString(sessionEncoder.sessionListToXML()); 200 command.setString(sessionEncoder.sessionListToXML());
214 command.setCMD(REP.SMCMD_UPDATE); 201 command.setCMD(REP.SMCMD_UPDATE);
268 255
269 //XMLからSessionListオブジェクトを生成する。 256 //XMLからSessionListオブジェクトを生成する。
270 SessionXMLDecoder decoder = new SessionXMLDecoder(); 257 SessionXMLDecoder decoder = new SessionXMLDecoder();
271 SessionList receivedSessionList = decoder.decode(receivedCommand.string); 258 SessionList receivedSessionList = decoder.decode(receivedCommand.string);
272 259
273 //SessionListへ追加し変換テーブルを生成する。
274 //sessionlist.update(channel, receivedSessionList);
275
276 //myHost を設定。 260 //myHost を設定。
277 //立ち上げ時にやるとlocalhostしか取れない 261 //立ち上げ時にやるとlocalhostしか取れない
278 if(myHost == null) setMyHostName(getLocalHostName(channel)); 262 if(myHost == null) setMyHostName(getLocalHostName(channel));
279 263
280 //maxHost を設定。 264 //maxHost を設定。
397 } 381 }
398 break; 382 break;
399 } 383 }
400 } 384 }
401 385
386 private void guiUpdate() {
387 //リストのコピーをGUIに渡す
388 LinkedList<Session> sList = new LinkedList<Session>(sessionList);
389 LinkedList<Editor> eList = new LinkedList<Editor>(editorList);
390 //GUIに反映
391 Runnable doRun = new DoGUIUpdate(sList, eList, gui);
392 SwingUtilities.invokeLater(doRun);
393 }
394
402 private void setNormalState(REPSocketChannel<REPCommand> channel, int sid) { 395 private void setNormalState(REPSocketChannel<REPCommand> channel, int sid) {
403 SelectionKey key = channel.keyFor(selector); 396 SelectionKey key = channel.keyFor(selector);
404 key.attach(new REPHandlerImpl(sid, this)); 397 key.attach(new REPHandlerImpl(sid, this));
405 } 398 }
406 399
456 editor.setHost(myHost2); 449 editor.setHost(myHost2);
457 } 450 }
458 } 451 }
459 452
460 public static void main(String[] args) throws InterruptedException, IOException { 453 public static void main(String[] args) throws InterruptedException, IOException {
461
462 REPServerSocketChannel.isSimulation = true;
463 454
464 int port = DEFAULT_PORT; 455 int port = DEFAULT_PORT;
465 int port_s = DEFAULT_PORT; 456 int port_s = DEFAULT_PORT;
466 //System.setProperty("file.encoding", "UTF-8"); 457 //System.setProperty("file.encoding", "UTF-8");
467 if(args.length > 0){ 458 if(args.length > 0){
538 } 529 }
539 selector.wakeup(); 530 selector.wakeup();
540 } 531 }
541 532
542 public void ActionOccured(REPActionEvent event) { 533 public void ActionOccured(REPActionEvent event) {
543 534 //selectSession(event);
535 try {
536 waitingQueue.put(event);
537 } catch (InterruptedException e) {
538 e.printStackTrace();
539 }
540 }
541
542 public void selectSession(REPActionEvent event) {
544 REPSocketChannel<REPCommand> channel = event.getEditorChannel(); 543 REPSocketChannel<REPCommand> channel = event.getEditorChannel();
545 int sid = event.getSID(); 544 int sid = event.getSID();
546 Session session = getSession(sid); 545 Session session = getSession(sid);
547 if(session.hasOwner()){ 546 if(session.hasOwner()){
548 Editor editor = new Editor(channel); 547 Editor editor = new Editor(channel);
566 command.setCMD(REP.SMCMD_SELECT); 565 command.setCMD(REP.SMCMD_SELECT);
567 command.setSID(sid); 566 command.setSID(sid);
568 command.setString(editor.getHost()); 567 command.setString(editor.getHost());
569 owner.send(command); 568 owner.send(command);
570 } 569 }
571
572
573 } 570 }
574 571
575 public void addWaitingCommand(PacketSet set) { 572 public void addWaitingCommand(PacketSet set) {
576 packetSetList.add(set); 573 waitingCommandInMerge.add(set);
577 } 574 }
578 575
579 public List<Session> getSessionList() { 576 public List<Session> getSessionList() {
580 // TODO Auto-generated method stub 577 // TODO Auto-generated method stub
581 return sessionList; 578 return sessionList;