0
|
1 package rep;
|
|
2
|
|
3 import java.io.IOException;
|
|
4 import java.net.InetSocketAddress;
|
11
|
5 import java.net.SocketAddress;
|
111
|
6 import java.nio.ByteBuffer;
|
|
7 import java.nio.CharBuffer;
|
2
|
8 import java.nio.channels.SelectableChannel;
|
0
|
9 import java.nio.channels.SelectionKey;
|
|
10 import java.nio.channels.Selector;
|
|
11 import java.nio.channels.ServerSocketChannel;
|
|
12 import java.nio.channels.SocketChannel;
|
111
|
13 import java.nio.charset.CharacterCodingException;
|
|
14 import java.nio.charset.Charset;
|
|
15 import java.nio.charset.CharsetEncoder;
|
83
|
16 import java.util.LinkedList;
|
144
|
17 import java.util.List;
|
|
18 import java.util.Set;
|
15
|
19 import java.util.StringTokenizer;
|
0
|
20
|
123
|
21 import rep.channel.REPServerSocketChannel;
|
133
|
22 import rep.channel.REPSocketChannel;
|
146
|
23 import rep.channel.SelectionKeySimulator;
|
|
24 import rep.channel.SelectorSimulator;
|
144
|
25 import rep.handler.PacketSet;
|
146
|
26 import rep.handler.REPHandler;
|
148
|
27 import rep.handler.REPHandlerImpl;
|
164
|
28 import rep.handler.REPHandlerInMerge;
|
158
|
29 import rep.channel.REPSelector;
|
56
|
30 import rep.xml.SessionXMLDecoder;
|
45
|
31 import rep.xml.SessionXMLEncoder;
|
|
32
|
1
|
33 //+-------+--------+--------+-------+--------+---------+------+
|
|
34 //| cmd | session| editor | seqid | lineno | textsiz | text |
|
|
35 //| | id | id | | | | |
|
|
36 //+-------+--------+--------+-------+--------+---------+------+
|
|
37 //o-------header section (network order)-------------o
|
|
38 /*int cmd; // command
|
101
|
39 int sid; // session ID : uniqu to editing file
|
123
|
40 int eid; // editor ID : owner editor ID = 1。Session に対して unique
|
122
|
41 int seqno; // Sequence number : sequence number はエディタごとに管理
|
1
|
42 int lineno; // line number
|
101
|
43 int textsize; // textsize : bytesize
|
1
|
44 byte[] text;*/
|
|
45
|
8
|
46 public class SessionManager implements ConnectionListener, REPActionListener{
|
0
|
47
|
|
48
|
163
|
49 //private SessionList sessionlist;
|
164
|
50 private LinkedList<Session> sessionList;
|
83
|
51 private SessionManagerGUI gui;
|
2
|
52 private Selector selector;
|
7
|
53 private SessionManagerList smList;
|
17
|
54 private String myHost;
|
21
|
55 private boolean isMaster = true;
|
160
|
56 //private EditorList ownEditorList;
|
144
|
57 private List<Editor> editorList;
|
78
|
58 private String maxHost;
|
148
|
59 private boolean isSimulation;
|
155
|
60 private List<PacketSet> packetSetList;
|
163
|
61 //private List<SessionManagerNode> managerList;
|
95
|
62 private static int temp_port;
|
|
63 private static int send_port;
|
101
|
64
|
|
65 static final int DEFAULT_PORT = 8766;
|
|
66
|
2
|
67 public SessionManager(int port) {
|
83
|
68 gui = new SessionManagerGUI();
|
2
|
69 }
|
|
70
|
|
71 public void openSelector() throws IOException{
|
123
|
72 selector = REPSelector.open();
|
2
|
73 }
|
0
|
74
|
155
|
75 public void init(int port) throws InterruptedException, IOException {
|
2
|
76
|
148
|
77 REPServerSocketChannel<REPCommand> ssc = REPServerSocketChannel.<REPCommand>open();
|
122
|
78 ssc.configureBlocking(false); //reuse address 必須
|
101
|
79 ssc.socket().setReuseAddress(true);
|
0
|
80 ssc.socket().bind(new InetSocketAddress(port));
|
|
81 ssc.register(selector, SelectionKey.OP_ACCEPT);
|
6
|
82
|
|
83
|
163
|
84 //sessionlist = new SessionList();
|
144
|
85 sessionList = new LinkedList<Session>();
|
7
|
86 smList = new SessionManagerList();
|
162
|
87 //ownEditorList = new EditorList();
|
144
|
88 editorList = new LinkedList<Editor>();
|
155
|
89 packetSetList = new LinkedList<PacketSet>();
|
0
|
90
|
155
|
91 // main loop
|
|
92 mainLoop();
|
|
93 }
|
|
94
|
|
95 private void mainLoop() throws IOException {
|
0
|
96 while(true){
|
|
97 selector.select();
|
144
|
98 select();
|
|
99 }
|
|
100 }
|
|
101
|
|
102 private void select() throws IOException {
|
148
|
103 for(SelectionKey key : selector.selectedKeys()){
|
144
|
104 if(key.isAcceptable()){
|
|
105 /*** serverChannelはenableになったSelectionKeyのchannel ***/
|
146
|
106 REPServerSocketChannel serverChannel = (REPServerSocketChannel)key.channel();
|
|
107 REPSocketChannel channel = serverChannel.accept(); //keyからchannelを取って、accept
|
144
|
108 registerChannel (selector, channel, SelectionKey.OP_READ);
|
|
109 channel = null;
|
123
|
110
|
144
|
111 }else if(key.isReadable()){
|
|
112
|
146
|
113 // REPSocketChannel<REPCommand> channel = (REPSocketChannel<REPCommand>)key.channel();
|
|
114 // REPPacketReceive receive = new REPPacketReceive(channel);
|
|
115 // receive.setkey(key);
|
|
116 // REPCommand receivedCommand = receive.unpackUConv();
|
|
117 // manage(channel, receivedCommand);
|
144
|
118
|
146
|
119 REPHandler handler = (REPHandler)key.attachment();
|
|
120 handler.handle(key);
|
144
|
121
|
|
122 }else if(key.isConnectable()){
|
|
123 System.out.println("Connectable");
|
0
|
124 }
|
|
125 }
|
|
126 }
|
1
|
127
|
2
|
128 private synchronized void registerChannel(Selector selector, SelectableChannel channel, int ops) throws IOException {
|
|
129 if(channel == null) {
|
|
130 return;
|
|
131 }
|
|
132 channel.configureBlocking(false);
|
6
|
133 selector.wakeup();
|
148
|
134 REPHandler handler = new REPHandlerImpl(this);
|
|
135 channel.register(selector, ops, handler);
|
2
|
136 }
|
|
137
|
144
|
138 public void manage(REPSocketChannel<REPCommand> channel, REPCommand receivedCommand) {
|
75
|
139 if(receivedCommand == null) return;
|
158
|
140 //Session session;
|
141
|
141 REPCommand sendCommand = new REPCommand(receivedCommand);
|
78
|
142 REPPacketSend send = new REPPacketSend(channel);
|
144
|
143
|
75
|
144 switch(receivedCommand.cmd){
|
144
|
145
|
0
|
146 case REP.SMCMD_JOIN:
|
164
|
147 {
|
|
148 //どのSessionにも属さないエディタをリストに追加
|
|
149 Editor editor = new Editor(editorList.size(), channel);
|
|
150 editor.setHost(myHost);
|
|
151 editorList.add(editor);
|
144
|
152
|
164
|
153 //GUIに反映
|
|
154 gui.setComboEditor(editor.getEID(), channel);
|
|
155 }
|
|
156
|
|
157
|
|
158 break;
|
144
|
159
|
1
|
160 case REP.SMCMD_JOIN_ACK:
|
144
|
161
|
1
|
162 break;
|
144
|
163
|
0
|
164 case REP.SMCMD_PUT:
|
164
|
165 {
|
|
166 //エディタのリストに追加
|
|
167 Editor editor = new Editor(editorList.size(), channel);
|
|
168 editorList.add(editor);
|
|
169
|
|
170 //Sessionを生成
|
|
171 int sid = sessionList.size();
|
|
172 editor = new Editor(0, channel);
|
|
173 editor.setHost(myHost);
|
|
174 Session session = new Session(sid, editor);
|
|
175 session.hasOwner(true);
|
|
176 sessionList.add(new Session(sid, editor));
|
|
177
|
|
178 //GUIに反映
|
|
179 gui.setComboSession(session.getSID(), session.getName());
|
|
180 gui.setComboEditor(editor.getEID(), editor.getChannel());
|
158
|
181
|
164
|
182 //エディタにAckを送信
|
|
183 sendCommand.setCMD(REP.SMCMD_PUT_ACK);
|
|
184 sendCommand.setEID(editor.getEID());
|
|
185 sendCommand.setSID(session.getSID());
|
|
186 editor.send(sendCommand);
|
144
|
187
|
164
|
188 //他のSessionManagerへSessionの追加を報告
|
|
189 SessionXMLEncoder sessionEncoder = new SessionXMLEncoder(session);
|
|
190 REPCommand command = new REPCommand();
|
|
191 command.setSID(session.getSID());
|
|
192 command.setString(sessionEncoder.sessionListToXML());
|
|
193 command.setCMD(REP.SMCMD_UPDATE);
|
|
194 smList.sendExcept(channel, command);
|
|
195
|
|
196 }
|
|
197
|
|
198 break;
|
133
|
199
|
0
|
200 case REP.SMCMD_SELECT:
|
164
|
201 {
|
|
202 //エディタをSessionに追加
|
|
203 Editor editor = new Editor(channel);
|
|
204 Session session = getSession(receivedCommand.sid);
|
|
205 session.addEditor(editor);
|
|
206
|
|
207 if(session.hasOwner()){
|
|
208 //このSessionManagerがオーナーを持っている場合、Sessionにエディタを追加し、エディタへAckを返す
|
|
209 sendCommand.setCMD(REP.SMCMD_SELECT_ACK);
|
|
210 sendCommand.setEID(editor.getEID());
|
|
211 editor.send(sendCommand);
|
|
212 }else{
|
|
213 //オーナーを持ってない場合は、オーナーを持っているSessionManagerへSELECTコマンドを中継する
|
|
214 Editor owner = session.getOwner();
|
|
215 owner.send(receivedCommand);
|
148
|
216 }
|
164
|
217 }
|
144
|
218
|
164
|
219 break;
|
144
|
220
|
8
|
221 case REP.SMCMD_SELECT_ACK:
|
160
|
222 {
|
85
|
223 String hostport = receivedCommand.string;
|
160
|
224 Editor editor = getEditor(hostport);
|
164
|
225
|
160
|
226 if(editor != null) {
|
|
227 //host, port を見て、このコマンドが自分が送信したSelectコマンドのAckかどうかを判断する
|
|
228 REPCommand command = new REPCommand();
|
|
229 command.setCMD(REP.SMCMD_JOIN_ACK);
|
|
230 command.setSID(receivedCommand.sid);
|
|
231 command.setEID(receivedCommand.eid);
|
|
232 editor.send(command);
|
164
|
233
|
85
|
234 }else{
|
160
|
235 //自分が送信したコマンドでなければ、次のSessionManagerへ中継する
|
85
|
236 smList.sendExcept(channel, receivedCommand);
|
|
237 }
|
160
|
238 }
|
144
|
239
|
164
|
240 break;
|
144
|
241
|
8
|
242 case REP.SMCMD_SM_JOIN:
|
164
|
243
|
160
|
244 {
|
122
|
245 //SessionManagerのリストへ追加
|
83
|
246 smList.add(channel);
|
144
|
247
|
122
|
248 //XMLからSessionListオブジェクトを生成する。
|
77
|
249 SessionXMLDecoder decoder = new SessionXMLDecoder();
|
79
|
250 SessionList receivedSessionList = decoder.decode(receivedCommand.string);
|
144
|
251
|
122
|
252 //SessionListへ追加し変換テーブルを生成する。
|
163
|
253 //sessionlist.update(channel, receivedSessionList);
|
144
|
254
|
122
|
255 //myHost を設定。
|
76
|
256 if(myHost == null) setMyHostName(getLocalHostName(channel));
|
144
|
257
|
122
|
258 //maxHost を設定。
|
95
|
259 if(setMaxHost(channel, receivedSessionList.getMaxHost())){
|
|
260 sendCommand = new REPCommand();
|
|
261 sendCommand.setCMD(REP.SMCMD_CH_MASTER);
|
|
262 sendCommand.setString(maxHost);
|
|
263 smList.sendExcept(channel, sendCommand);
|
|
264 }
|
144
|
265
|
122
|
266 //SessionListからXMLを生成。
|
|
267 //joinしてきたSessionManagerに対してACKを送信。
|
164
|
268 SessionXMLEncoder sessionlistEncoder = new SessionXMLEncoder(sessionList);
|
78
|
269 sendCommand = new REPCommand();
|
|
270 sendCommand.setCMD(REP.SMCMD_SM_JOIN_ACK);
|
|
271 sendCommand.setString(sessionlistEncoder.sessionListToXML());
|
|
272 send.send(sendCommand);
|
144
|
273
|
122
|
274 //その他の SessionManager に対して SMCMD_UPDATEを 送信。
|
78
|
275 sendCommand = new REPCommand();
|
83
|
276 sendCommand.setCMD(REP.SMCMD_UPDATE);
|
78
|
277 sendCommand.setString(receivedCommand.string);
|
|
278 smList.sendExcept(channel, sendCommand);
|
144
|
279
|
160
|
280 }
|
164
|
281 break;
|
144
|
282
|
8
|
283 case REP.SMCMD_SM_JOIN_ACK:
|
144
|
284
|
122
|
285 //XMLからSessionListオブジェクトを生成。
|
82
|
286 SessionXMLDecoder decoder2 = new SessionXMLDecoder();
|
|
287 SessionList receivedSessionList2 = decoder2.decode(receivedCommand.string);
|
144
|
288
|
122
|
289 //maxHostを決定。
|
95
|
290 if(setMaxHost(channel, receivedSessionList2.getMaxHost())){
|
|
291 sendCommand = new REPCommand();
|
|
292 sendCommand.setCMD(REP.SMCMD_CH_MASTER);
|
|
293 sendCommand.setString(maxHost);
|
|
294 smList.sendExcept(channel, sendCommand);
|
|
295 }
|
144
|
296
|
6
|
297 break;
|
144
|
298
|
8
|
299 case REP.SMCMD_UPDATE:
|
144
|
300
|
99
|
301 SessionXMLDecoder decoder3 = new SessionXMLDecoder();
|
|
302 SessionList receivedSessionList3 = decoder3.decode(receivedCommand.string);
|
144
|
303
|
122
|
304 //SessionListへ追加し変換テーブルを生成する。
|
163
|
305 //sessionlist.update(channel, receivedSessionList3);
|
144
|
306
|
99
|
307 smList.sendExcept(channel, receivedCommand);
|
144
|
308
|
100
|
309 for(Session session3 : receivedSessionList3.getList()){
|
|
310 gui.setComboSession(session3.getSID(), session3.getName());
|
|
311 }
|
144
|
312
|
9
|
313 break;
|
144
|
314
|
9
|
315 case REP.SMCMD_UPDATE_ACK:
|
164
|
316 if(receivedCommand.sid > sessionList.size()){
|
148
|
317 Editor editor = new Editor(channel);
|
75
|
318 editor.setName(receivedCommand.string);
|
144
|
319
|
158
|
320 Session session = new Session(editor);
|
73
|
321 session.addEditor(editor);
|
144
|
322
|
164
|
323 sessionList.add(session);
|
144
|
324
|
83
|
325 gui.setComboSession(session.getSID(), session.getName());
|
73
|
326 }
|
75
|
327 smList.sendToSlave(receivedCommand);
|
1
|
328 break;
|
144
|
329
|
|
330 // case REP.REPCMD_READ:
|
108
|
331 // //sessionlist.sendCmd(channel, repCmd);
|
|
332 // break;
|
144
|
333
|
95
|
334 case REP.SMCMD_CH_MASTER:
|
122
|
335 //maxHost を設定。
|
95
|
336 if(setMaxHost(channel, receivedCommand.string)){
|
|
337 sendCommand = new REPCommand();
|
|
338 sendCommand.setCMD(REP.SMCMD_CH_MASTER);
|
|
339 sendCommand.setString(maxHost);
|
|
340 smList.sendExcept(channel, sendCommand);
|
|
341 }
|
|
342 break;
|
144
|
343
|
0
|
344 default:
|
164
|
345 {
|
144
|
346 //sid から Session を取得
|
158
|
347 Session session = getSession(receivedCommand.sid);
|
144
|
348 //マージの処理と次のエディタへコマンドを送信する処理
|
|
349 session.translate(channel, receivedCommand);
|
164
|
350
|
|
351 Editor editor = getEditor(channel);
|
|
352 if(editor.isMerging()){
|
|
353 //Handlerを切り替える
|
|
354 setMergeState(channel, selector);
|
|
355 }
|
|
356 }
|
144
|
357 break;
|
|
358 }
|
|
359 }
|
|
360
|
164
|
361 private void setMergeState(REPSocketChannel<REPCommand> channel, Selector selector2) {
|
|
362 SelectionKey key = channel.keyFor(selector2);
|
|
363 key.attach(new REPHandlerInMerge(this));
|
|
364 }
|
|
365
|
160
|
366 private Editor getEditor(String hostport) {
|
|
367 return null;
|
|
368 }
|
|
369
|
144
|
370 private Editor getEditor(REPSocketChannel<REPCommand> channel) {
|
|
371 // TODO Auto-generated method stub
|
|
372 for(Editor editor : editorList){
|
|
373 if(editor.getChannel() == channel){
|
|
374 return editor;
|
122
|
375 }
|
0
|
376 }
|
144
|
377 return null;
|
|
378 }
|
|
379
|
|
380 private Session getSession(int sid) {
|
|
381 for(Session session : sessionList){
|
|
382 if(session.getSID() == sid) return session;
|
|
383 }
|
|
384 return null;
|
0
|
385 }
|
83
|
386
|
139
|
387 private boolean setMaxHost(REPSocketChannel channel, String maxHost2) {
|
|
388 // TODO Auto-generated method stub
|
|
389 return false;
|
|
390 }
|
|
391
|
76
|
392 private void setMyHostName(String localHostName) {
|
95
|
393 myHost = localHostName + temp_port;
|
81
|
394 if(maxHost == null) {
|
|
395 maxHost = myHost;
|
|
396 }
|
164
|
397 setHostToEditor(myHost);
|
|
398 }
|
|
399
|
|
400 private void setHostToEditor(String myHost2) {
|
|
401 for(Editor editor : editorList){
|
|
402 editor.setHost(myHost2);
|
|
403 }
|
76
|
404 }
|
0
|
405
|
|
406 public static void main(String[] args) throws InterruptedException, IOException {
|
101
|
407 int port = DEFAULT_PORT;
|
|
408 int port_s = DEFAULT_PORT;
|
113
|
409 //System.setProperty("file.encoding", "UTF-8");
|
82
|
410 if(args.length > 0){
|
39
|
411 port = Integer.parseInt(args[0]);
|
95
|
412 port_s = Integer.parseInt(args[1]);
|
0
|
413 }
|
95
|
414 temp_port = port;
|
|
415 send_port = port_s;
|
0
|
416 SessionManager sm = new SessionManager(port);
|
2
|
417 sm.openSelector();
|
|
418 sm.openWindow();
|
155
|
419 sm.init(port);
|
|
420 sm.mainLoop();
|
0
|
421 }
|
|
422
|
2
|
423 private void openWindow() {
|
83
|
424 Thread th = new Thread( gui );
|
2
|
425 th.start();
|
75
|
426 //System.out.println(sessionmanagerGUI.toString());
|
83
|
427 gui.addConnectionListener(this);
|
|
428 gui.addREPActionListener(this);
|
2
|
429 }
|
|
430
|
|
431 private void connectSession(String host) {
|
101
|
432 int port = DEFAULT_PORT;
|
95
|
433 port = send_port;
|
1
|
434 InetSocketAddress addr = new InetSocketAddress(host, port);
|
|
435 try {
|
164
|
436 REPSocketChannel<REPCommand> sessionchannel = REPSocketChannel.<REPCommand>create();
|
1
|
437 sessionchannel.configureBlocking(true);
|
|
438 sessionchannel.connect(addr);
|
6
|
439 while(!sessionchannel.finishConnect()){
|
77
|
440 System.out.print("test afro");
|
6
|
441 }
|
|
442 System.out.println("");
|
2
|
443 registerChannel(selector, sessionchannel, SelectionKey.OP_READ);
|
45
|
444
|
77
|
445 sm_join(sessionchannel);
|
45
|
446
|
1
|
447 }catch (IOException e) {
|
|
448 e.printStackTrace();
|
|
449 }
|
|
450 }
|
77
|
451
|
164
|
452 private void sm_join(REPSocketChannel<REPCommand> channel){
|
79
|
453
|
122
|
454 //SM_JOINコマンドを生成。
|
77
|
455 REPCommand command = new REPCommand();
|
|
456 command.setCMD(REP.SMCMD_SM_JOIN);
|
79
|
457
|
122
|
458 //hostnameをセット。
|
82
|
459 setMyHostName(getLocalHostName(channel));
|
|
460
|
122
|
461 //XMLを生成。送信コマンドにセット。
|
164
|
462 SessionXMLEncoder encoder = new SessionXMLEncoder(sessionList);
|
77
|
463 String string = encoder.sessionListToXML();
|
|
464 command.setString(string);
|
|
465
|
122
|
466 //SM_JOINコマンドを送信。
|
77
|
467 REPPacketSend send = new REPPacketSend(channel);
|
|
468 send.send(command);
|
|
469
|
122
|
470 //SessionManagerのListに追加。
|
77
|
471 smList.add(channel);
|
|
472 }
|
2
|
473
|
139
|
474 private String getLocalHostName(REPSocketChannel channel) {
|
74
|
475 String host = null;
|
|
476 host = channel.socket().getLocalAddress().getHostName();
|
|
477 return host;
|
|
478 }
|
|
479
|
2
|
480 public void connectionOccured(ConnectionEvent event) {
|
|
481 connectSession(event.getHost());
|
|
482 }
|
8
|
483
|
|
484 public void ActionOccured(REPActionEvent event) {
|
104
|
485
|
163
|
486 REPSocketChannel<REPCommand> channel = event.getEditorChannel();
|
107
|
487 int sid = event.getSID();
|
164
|
488 Session session = getSession(sid);
|
158
|
489 if(session.hasOwner()){
|
148
|
490 Editor editor = new Editor(channel);
|
|
491 session.addEditor(new Editor(channel));
|
107
|
492 REPCommand sendCommand = new REPCommand();
|
|
493 sendCommand.setCMD(REP.SMCMD_JOIN_ACK);
|
148
|
494 sendCommand.setEID(editor.getEID());
|
107
|
495 sendCommand.setSID(sid);
|
|
496 REPPacketSend sender = new REPPacketSend(channel);
|
|
497 sender.send(sendCommand);
|
|
498 }else {
|
164
|
499 REPSocketChannel<REPCommand> editorChannel = event.getEditorChannel();
|
107
|
500 sid = event.getSID();
|
|
501 Editor editor = new Editor(editorChannel);
|
|
502 editor.setHost(myHost);
|
164
|
503 session = getSession(sid);
|
107
|
504 session.addEditor(editor);
|
|
505
|
158
|
506 Editor owner = session.getOwner();
|
107
|
507
|
|
508 REPCommand command = new REPCommand();
|
|
509 command.setCMD(REP.SMCMD_SELECT);
|
|
510 command.setSID(sid);
|
|
511 command.setString(editor.getHost() + ":" + editor.getPort());
|
|
512 owner.send(command);
|
|
513 }
|
72
|
514
|
|
515
|
8
|
516 }
|
122
|
517
|
144
|
518 public void addWaitingCommand(PacketSet set) {
|
|
519 // TODO Auto-generated method stub
|
155
|
520 packetSetList.add(set);
|
144
|
521 }
|
148
|
522
|
|
523 public void undo() {
|
|
524 // TODO Auto-generated method stub
|
|
525
|
|
526 }
|
0
|
527 }
|