0
|
1
|
|
2 package fdl;
|
|
3 import java.io.IOException;
|
|
4 import java.nio.ByteBuffer;
|
|
5 import java.nio.ByteOrder;
|
|
6 import java.nio.channels.ClosedChannelException;
|
|
7 import java.nio.channels.SelectionKey;
|
|
8 import java.nio.channels.SocketChannel;
|
|
9 import java.util.Hashtable;
|
|
10 import java.util.LinkedList;
|
|
11
|
|
12
|
|
13 public class IOHandler extends IOParam {
|
|
14 static final boolean debug = false;
|
|
15 public Tuple[] tuple_space;
|
|
16 public ComDebug com_debug;
|
|
17 //public static Hashtable<String, Integer> com_Loggingtable = new Hashtable<String, Integer>();
|
|
18 //public static LinkedList<SocketChannel> reportCh_list = new LinkedList<SocketChannel>();
|
|
19
|
|
20 public IOHandler(Tuple[] _tuple_space) {
|
|
21 super(_tuple_space);
|
|
22 }
|
|
23
|
|
24 public void handle(SelectionKey key)
|
|
25 throws ClosedChannelException, IOException {
|
|
26 // 書き込み可であれば,読み込みを行う
|
|
27 if (key.isReadable()) {
|
|
28 read(key);
|
|
29 }
|
|
30
|
|
31 // 書き込み可であれば,書き込みを行う
|
|
32 /*if (key.isWritable() && key.isValid()) {
|
|
33 write(key);
|
|
34 }*/
|
|
35 }
|
|
36
|
|
37 private void read(SelectionKey key)
|
|
38 throws ClosedChannelException, IOException {
|
|
39 SocketChannel channel = (SocketChannel)key.channel();
|
|
40
|
|
41 // 読み込み用のバッファの生成
|
|
42 ByteBuffer command = ByteBuffer.allocate(LINDA_HEADER_SIZE);
|
|
43 command.order(ByteOrder.BIG_ENDIAN);
|
|
44 command.clear();
|
|
45
|
|
46 int readsize = LINDA_HEADER_SIZE;
|
|
47 int count = 0;
|
|
48
|
|
49 // 読み込み
|
|
50 while(readsize>0) {
|
|
51 if(debug){
|
|
52 System.out.print("reading command...");
|
|
53 }
|
|
54 count = channel.read(command);
|
|
55
|
|
56 if(count < 0) {
|
|
57 System.out.println("Connection closed by "+key.channel());
|
|
58 //System.out.println("Close channel on EOF; channel: "+channel);
|
|
59 key.cancel();
|
|
60 channel.close();
|
|
61 readsize = -1;
|
|
62 return;
|
|
63 }
|
|
64 readsize -= count;
|
|
65 }
|
|
66 command.rewind();
|
|
67
|
|
68 int len = command.getInt(LINDA_PACKET_LENGTH_OFFSET);
|
|
69 int datalen = command.getInt(LINDA_DATA_LENGTH_OFFSET);
|
|
70
|
|
71 ByteBuffer data = ByteBuffer.allocate(datalen);
|
|
72 int read = datalen;
|
|
73
|
|
74 if (debug) {
|
|
75 System.out.println("reading: " +datalen);
|
|
76 }
|
|
77
|
|
78 data.order(ByteOrder.BIG_ENDIAN);
|
|
79 data.clear();
|
|
80 while(read>0) {
|
|
81 //System.out.print("reading2...");
|
|
82 read -= channel.read(data);
|
|
83 }
|
|
84 data.rewind();
|
|
85
|
|
86 /*
|
|
87 static final int LINDA_PACKET_LENGTH_OFFSET =0;
|
|
88 static final int LINDA_MODE_OFFSET =0+4;
|
|
89 static final int LINDA_ID_OFFSET =1+4;
|
|
90 static final int LINDA_SEQ_OFFSET =3+4;
|
|
91 static final int LINDA_DATA_LENGTH_OFFSET =7+4;
|
|
92 static final int LINDA_HEADER_SIZE =12+4;
|
|
93 */
|
|
94 command.order(ByteOrder.BIG_ENDIAN);
|
|
95 command.rewind();
|
|
96
|
|
97 /*** print data ***/
|
|
98 char id = (char)command.getShort(LINDA_ID_OFFSET);
|
|
99 System.out.println("LENGTH:"+command.getInt(LINDA_PACKET_LENGTH_OFFSET)+" "+
|
|
100 "MODE:"+(char)command.get(LINDA_MODE_OFFSET)+" "+
|
|
101 "ID:"+(int)id+" "+
|
|
102 "SEQ:"+command.getInt(LINDA_SEQ_OFFSET)+" ");
|
|
103 //"SEQ:"+command.getInt(LINDA_SEQ_OFFSET)+" "+
|
|
104 //"DATA LENGTH:"+command.getInt(LINDA_DATA_LENGTH_OFFSET)+" ");
|
|
105 //System.out.println("DATA:"+data);
|
|
106 //データ処理
|
|
107 command.rewind();
|
|
108 manager_run(key, command, data, len);
|
|
109
|
|
110 key.interestOps(key.interestOps() | SelectionKey.OP_READ );
|
|
111 }
|
|
112
|
|
113
|
|
114 public void manager_run(SelectionKey key, ByteBuffer command, ByteBuffer data, int len) throws IOException {
|
|
115 command.order(ByteOrder.BIG_ENDIAN);
|
|
116 int mode = command.get(LINDA_MODE_OFFSET);
|
|
117 int id = command.get(LINDA_ID_OFFSET);
|
|
118 command.rewind();
|
|
119
|
|
120 com_debug = new ComDebug();
|
|
121 Hashtable<String, Integer> com_Loggingtable = ComDebug.Com_Hashtable;
|
|
122 LinkedList<SocketChannel> reportCh_list = ComDebug.Report_Channellist;
|
|
123
|
|
124 if (debug) {
|
|
125 System.out.println("data from : "+key.channel());
|
|
126 }
|
|
127 if((mode == '!') || (len == 0)) {
|
|
128 Connection_Close(key);
|
|
129 }else if(id > PRIVILEGED_ID && id < MAX_TUPLE-1){
|
|
130 ComDebug.addChannel(key, reportCh_list);
|
|
131 }
|
|
132 else if(mode == PSX_CHECK) {
|
|
133 Check(key, command);
|
|
134 }
|
|
135 else if(mode == PSX_IN || mode == PSX_RD){
|
|
136 In_Rd(key, command, mode);
|
|
137 } else if (mode == PSX_WAIT_RD) {
|
|
138 Wait_Rd(key, command, mode);
|
|
139 } else if(mode == PSX_OUT) {
|
|
140 Out(command, data);
|
|
141 } else {
|
|
142 System.out.println("Uncorrect buffer");
|
|
143 System.exit(1);
|
|
144 }
|
|
145 //DEBUG用カウンタ
|
|
146 ComDebug.Com_inc(key, com_Loggingtable, mode);
|
|
147 System.out.println("Com_Debug:");
|
|
148 System.out.println(com_Loggingtable.toString());
|
|
149 //DEBUG用レポート
|
|
150 ComDebug.Report(reportCh_list, command, com_Loggingtable.toString());
|
|
151
|
|
152 if (key.interestOps()
|
|
153 != (SelectionKey.OP_READ)) {
|
|
154 // 読み込み操作に対する監視を行う
|
|
155 key.interestOps(SelectionKey.OP_READ );
|
|
156 }
|
|
157
|
|
158 }
|
|
159
|
|
160 private void Connection_Close(SelectionKey key) throws IOException {
|
|
161 System.out.println("Connection closed by "+key.channel());
|
|
162 SocketChannel channel = (SocketChannel)key.channel();
|
|
163 key.cancel();
|
|
164 channel.close();
|
|
165 }
|
|
166 }
|