comparison Linda/ldserv.c @ 0:0fae5658fb0b

Initial revision
author gongo
date Thu, 02 Nov 2006 08:55:19 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:0fae5658fb0b
1 /**********************************************************************/
2 /* */
3 /* Linda Server for PlayStation */
4 // $Id$
5 //
6 /* */
7 /**********************************************************************/
8 #include <stdio.h> //ヘッダーファイルの作成
9 #include <stdlib.h>
10 #include <string.h>
11 #include <unistd.h>
12 #include <fcntl.h>
13 #include <sys/uio.h>
14 #include <sys/time.h>
15 #include <sys/select.h>
16 #include <sys/stat.h>
17 #include <sys/types.h>
18 #include <sys/socket.h>
19 #include <sys/un.h>
20 #include <netinet/in.h>
21 #include <netinet/tcp.h>
22 #include <signal.h>
23 #include <termios.h>
24 #include <netdb.h>
25 #include <errno.h>
26
/* Backlog handed to listen() for the listening socket. */
#define MAX_REQ     1
/* Value the socket/system calls used here return on failure. */
#define FAIL        (-1)
/* NOTE(review): sizes a per-user table only; it is not enforced when
   connections are accepted. */
#define MAX_USER    4
/* Number of slots in the tuple space (tuple IDs are 16 bits wide). */
#define MAX_TUPLE   65536

/* Socket path used for a UNIX_DOMAIN build; replaced by the -s option. */
char *PATHNAME = "/tmp/ldserv";

/* Uncomment to enable the debug traces guarded by #ifdef DEB. */
//#define DEB


#define BUFFSIZE    65535

/*----------------------------------------------------------------------
 Packet format

   type     char   short   int    int
   field    Mode + ID    + Seq  + Data_len + Padding + Data
   offset   0      1       3      7          11        12
 ----------------------------------------------------------------------*/
#define LINDA_MODE_OFFSET           0
#define LINDA_ID_OFFSET             1
#define LINDA_SEQ_OFFSET            3
#define LINDA_DATA_LENGTH_OFFSET    7
#define LINDA_HEADER_SIZE           12


/* Width in bytes of the integers carried on the wire (= sizeof(int)). */
#define INT_SIZE    4
53
/*
 * One entry of a tuple-space list.  An entry is either a stored tuple
 * (mode 'o', data/datalen hold the payload) or a request that is still
 * waiting for a tuple (mode 'i', 'r' or 'w', data is 0 and fd names the
 * client that has to be answered).
 */
struct tuple {
    char           *data;      /* payload bytes, 0 for a waiting request   */
    struct tuple   *next;      /* next entry with the same tuple ID        */
    unsigned int    seq;       /* sequence number taken from the packet    */
    unsigned short  fd;        /* descriptor of the requesting client      */
    unsigned int    datalen;   /* number of bytes in data                  */
    char            mode;      /* 'o', 'i', 'r' or 'w'                     */
};

typedef struct tuple TUPLE;
62
63 TUPLE *t[MAX_TUPLE]; // TUPLE Space
64 TUPLE *tp;
65 int unix_port = 11511; // unix_portを指定。
66 extern int errno;
67
68 #ifdef COUNT_PACKET
69 // print packet count message per PRINT_INTERVAL sec
70 #define PRINT_INTERVAL 4
71
72 /*-------------------------------------------------------------------/
73 void
74 count_packet (char type):
75 クライアントより受け取ったコマンドをカウントする関数
76
77 引き数:
78 type - 受け取ったコマンドの種類(char型: i,o,r,w)
79 /-------------------------------------------------------------------*/
80 void count_packet(char type)
81 {
82 static int out_packet=-1,read_packet=0,in_packet=0,wait_packet=0;
83 static struct timeval start,now,previous;
84
85 if (out_packet == -1) {
86 gettimeofday(&start,NULL);
87 gettimeofday(&previous,NULL);
88 out_packet = 0;
89 printf("packet\tout\tread\tin\twait\ttime\n");
90 }
91
92 if (type == 'o') { //受け取ったコマンドの種類によって分別。
93 out_packet++;
94 } else if (type == 'r') {
95 read_packet++;
96 } else if (type == 'i') {
97 in_packet++;
98 } else if (type == 'w') {
99 wait_packet++;
100 } else {
101 fprintf(stderr,"No type in count_packet function\n");
102 return;
103 }
104
105 gettimeofday(&now,NULL);
106 if ((now.tv_sec-previous.tv_sec) > PRINT_INTERVAL) {
107 printf("log\t%d\t%d\t%d\t%d\t%ld\n",
108 out_packet,read_packet,in_packet,wait_packet,now.tv_sec-start.tv_sec);
109 fflush(stdout);
110
111 previous.tv_sec = now.tv_sec;
112 out_packet = read_packet = in_packet = 0;
113 wait_packet = 0;
114 }
115 }
116 #endif
117
118 #ifdef DEB
119 /*-------------------------------------------------------------------/
120 int
121 show_tuple ():
122 TUPLE Space にあるTUPLEをID順に表示する
123 /-------------------------------------------------------------------*/
124 void show_tuple()
125 {
126 int i;
127
128 static int toggle = -1;
129 static struct timeval start,now,previous;
130
131 TUPLE * tmp;
132
133 if (toggle == -1) {
134 gettimeofday(&start,NULL);
135 toggle = 0;
136 }
137 gettimeofday(&now,NULL);
138 printf("time **** %ld\n",now.tv_sec-start.tv_sec);
139 fflush(stdout);
140 previous.tv_sec = now.tv_sec;
141
142 printf("\n******\n");
143 for(i=0;i<MAX_TUPLE-1;i++){
144 if(t[i]){
145 printf("id: %d\t",i);
146 for(tmp=t[i];tmp;tmp = tmp->next){
147 printf("%c:%d\t",tmp->mode,tmp->seq);
148 }
149 printf("\n");
150 }
151 }
152 if(t[i]){
153 printf("id: %d\t",i);
154 for(tmp=t[i];tmp;tmp = tmp->next){
155 printf("%c:%c%c\t",tmp->mode,tmp->data[0],tmp->data[1]);
156 }
157 printf("\n");
158 }
159 printf("******\n\n");
160 return;
161 }
162 #endif
163
/* Write all len bytes of src to fd, retrying after short writes and
   EINTR.  Returns 0 on success, -1 on failure (errno set by write()). */
static int
sz_write_all(int fd, const unsigned char *src, size_t len)
{
    size_t done = 0;

    while (done < len) {
        ssize_t n = write(fd, src + done, len - done);

        if (n < 0) {
            if (errno == EINTR) {
                continue;
            }
            return -1;
        }
        if (n == 0) {
            /* no progress on a non-empty write: give up instead of spinning */
            return -1;
        }
        done += (size_t)n;
    }
    return 0;
}

/*-------------------------------------------------------------------/
  int
  sz_send (int fd, unsigned char *buf, int size, int flag):
      Send one packet to a client: a 4-byte length in network byte
      order followed by size bytes of buf.

  Arguments:
      fd   - file descriptor to send to
      buf  - data to send
      size - size of buf in bytes (data size + LINDA_HEADER_SIZE)
      flag - unused
  Return value:
      number of bytes sent (4 + size) on success, -1 on failure.
      The original summed the results of two write() calls, so a
      failure could come back as -2 or as a positive count and slip
      past callers that compare against -1.
/-------------------------------------------------------------------*/
int
sz_send(int fd,unsigned char *buf,int size,int flag) {
    uint32_t nsize;

    (void)flag;
    if (size < 0) {
        return -1;
    }
    nsize = htonl((uint32_t)size);
    if (sz_write_all(fd, (const unsigned char *)&nsize, sizeof nsize) < 0) {
        return -1;
    }
    if (sz_write_all(fd, buf, (size_t)size) < 0) {
        return -1;
    }
    return (int)sizeof nsize + size;
}
185
186 /*-------------------------------------------------------------------/
187 int
188 main (int argc,char *argv[]):
189 サーバのメイン。クライアントから送られて来た各コマンドに対応
190 した処理を行う。
191 /-------------------------------------------------------------------*/
192 int
193 main(int argc,char *argv[])
194 {
195 int i, a, users;
196 int ls,sk,fd,paddrlen,maxfds,id;
197 int datasize;
198 int skfg = 1;
199 short user = 0;
200 unsigned char userchar[2];
201 #ifdef UNIX_DOMAIN
202 struct sockaddr_un my_addr, peer;
203 #else
204 struct sockaddr_in my_addr, peer;
205 #endif
206 struct timeval zerotime;
207 unsigned char ipaddr[4];
208 unsigned char *buf = 0;
209 int len;
210 int fduser[MAX_USER];
211 fd_set readfds, suspect, tmpfd;
212 char ch;
213
214 extern char *optarg;
215 extern int optind;
216
217 while ((ch = getopt(argc, argv, "p:s:?")) != -1){
218 switch(ch) {
219 case 'p':
220 unix_port = atoi(optarg);
221 break;
222 case 's':
223 PATHNAME = optarg;
224 break;
225 case '?':
226 default:
227 fprintf(stderr,"usage: %s [-p port]\n",argv[0]);
228 break;
229 }
230 }
231
232 argc -= optind;
233 argv += optind;
234
235 zerotime.tv_sec = zerotime.tv_usec = 0L;
236 users = 0;
237
238 #ifdef UNIX_DOMAIN
239 if ((ls = socket(AF_UNIX, SOCK_STREAM, 0)) == FAIL){
240 #else
241 if ((ls = socket(AF_INET, SOCK_STREAM, 0)) == FAIL){
242 #endif
243 fprintf(stderr, "socket open error! errno :%d %s\n", errno,
244 strerror(errno));
245 exit(1);
246 }
247 if (setsockopt(ls, SOL_SOCKET, SO_REUSEADDR,(char *) &skfg, sizeof(skfg)) < 0){
248 fprintf(stderr, "setsockopt error %d\n",errno);
249 close(ls);
250 exit(1);
251 }
252 #ifdef UNIX_DOMAIN
253 my_addr.sun_family = AF_UNIX;
254 strcpy(my_addr.sun_path, PATHNAME);
255 #else
256 my_addr.sin_family = AF_INET;
257 my_addr.sin_port = unix_port;
258 my_addr.sin_addr.s_addr = INADDR_ANY;
259 #endif
260
261 if (bind(ls, (struct sockaddr *)&my_addr,sizeof(my_addr)) == FAIL){
262 fprintf(stderr, "socket binded address error! errno:%d %s\n", errno,
263 strerror(errno));
264 close(ls);
265 exit(1);
266 }
267 if (listen(ls, MAX_REQ) == FAIL){
268 fprintf(stderr, "list creat error! errno:%d %s\n", errno,
269 strerror(errno));
270 close(ls);
271 exit(1);
272 }
273 paddrlen = sizeof(peer);
274 maxfds = FD_SETSIZE;
275 fd = 0;
276 #ifdef DEB
277 maxfds = 8;
278 #endif
279 FD_ZERO(&readfds);
280
281 // main loop
282 while(1){
283 FD_SET(ls, &readfds);
284 tmpfd = readfds;
285
286 #ifdef DEB
287 show_tuple();
288 #endif
289 if ((sk = select(maxfds, &readfds, NULL, NULL, NULL)) == FAIL){
290 if (errno == EBADF){
291 for(i = 0;i < maxfds;i++){
292 FD_ZERO(&suspect);
293 if (FD_ISSET(i, &readfds)){
294 FD_SET(i, &suspect);
295 if (select(maxfds, &suspect, NULL, NULL, &zerotime) == FAIL){
296 fprintf(stdout, "%d descriptor clear",i);
297 FD_CLR(i, &readfds);
298 }
299 FD_CLR(i, &suspect);
300 }
301 }
302 } else {
303 fprintf(stderr, "select error! errno:%d %s\n", errno,
304 strerror(errno));
305 close(ls);
306 exit(1);
307 }
308 } else {
309 while(1){
310 fd = (fd + 1) % maxfds;
311 if (FD_ISSET(fd, &readfds)) break;
312 }
313 readfds = tmpfd;
314
315 // 新規の接続
316 if (fd == ls){
317 if ((sk = accept(ls, (struct sockaddr *)&peer, &paddrlen)) == FAIL){
318 fprintf(stderr, "connection accept error! errno:%d %s\n", errno,
319 strerror(errno));
320 close(ls);
321 exit(1);
322 }
323 #ifndef UNIX_DOMAIN
324 if (peer.sin_family == AF_INET)
325 {
326 int tmp = 1;
327
328 setsockopt (sk, IPPROTO_TCP, TCP_NODELAY,
329 (char *) &tmp, sizeof (int));
330 }
331 #endif
332 if ((tp = t[MAX_TUPLE-1]) == NULL){
333 tp = t[MAX_TUPLE-1] = (TUPLE *) malloc(sizeof(TUPLE));
334 tp->next = NULL;
335 } else {
336 while(tp->next) tp = tp->next;
337 tp->next = (TUPLE *) malloc(sizeof(TUPLE));
338 tp = tp->next;
339 tp->next = NULL;
340 }
341 tp->mode = 'o';
342 tp->seq = 0;
343 tp->data = (char *) malloc(sizeof(short));
344 tp->datalen = sizeof(short); user++;
345 userchar[0] = user / 10 + '0';
346 userchar[1] = user % 10 + '0';
347 memcpy(tp->data, &userchar ,sizeof(userchar));
348
349 fprintf(stdout, "localhost connected assign id = %d\n", user);
350 #ifndef UNIX_DOMAIN
351 memcpy(ipaddr, &peer.sin_addr.s_addr, sizeof(ipaddr));
352 fprintf(stdout, "%d.%d.%d.%d connected \n",
353 ipaddr[0],ipaddr[1],ipaddr[2],ipaddr[3]);
354 #endif
355 FD_SET(sk,&readfds);
356
357 // 通信を確立しているクライアントからの要求
358 } else {
359 int tempnum;
360 #ifdef DEB
361 fprintf(stdout, "\n\ndata from %d descriptor\n", fd);
362 #endif
363 if((len=read(fd,&tempnum,INT_SIZE))!=INT_SIZE) {
364 if (len==0) {
365 fprintf(stdout, "fd %d closed\n", fd);
366 close(fd);
367 continue;
368 }
369 fprintf(stderr, "read error! on fd:%d ret=%d %s\n",
370 fd,len,
371 strerror(errno));
372 close(fd);
373 continue;
374 }
375 len = ntohl(tempnum); // len は DATASIZE + LINDA_HEADER_SIZE
376 #ifdef DEB
377 fprintf(stderr, "datta on fd:%d len=%d\n", fd,len);
378 #endif
379 if((buf = (unsigned char*)malloc(len))==NULL){
380 fprintf(stderr,"allocate error! :%d %s",errno,strerror(errno));
381 exit(1);
382 }
383 for(a=0;a<len;a+=i) {
384 if((i=read(fd,buf+a,len-a))<0) {
385 fprintf(stderr, "ldserv: client read error! on i=%d len=%d %s\n",
386 i, len, strerror(errno));
387 close(fd);
388 }
389 }
390 #ifdef DEB
391 fprintf(stdout,"recv size = %d : mode = %c : strings = %s\n",
392 len, *buf, buf + LINDA_HEADER_SIZE);
393 #endif
394
395 #ifdef COUNT_PACKET
396 count_packet(buf[0]);
397 #endif
398
399 if ((buf[LINDA_MODE_OFFSET] == '!') || (len == 0)){
400 FD_CLR(fd, &readfds);
401 for(i = 0;i < users; i++){
402 if (fduser[i] == fd) break;
403 }
404 fprintf(stdout, "connection closed descriptor :%d\n", fd);
405 close(fd);
406 } else if (buf[LINDA_MODE_OFFSET] == 'c'){
407 id = buf[LINDA_ID_OFFSET] * 256 + buf[LINDA_ID_OFFSET+1];
408 //buf[LINDA_MODE_OFFSET] = 'a';
409 tp = t[id];
410 while(tp && tp->next && (tp->mode=='w')){
411 tp = tp->next;
412 }
413 if (tp && (tp->mode == 'o')){
414 buf[LINDA_DATA_LENGTH_OFFSET] = (tp->datalen>>24) & 0xff;
415 buf[LINDA_DATA_LENGTH_OFFSET+1] = (tp->datalen>>16) & 0xff;
416 buf[LINDA_DATA_LENGTH_OFFSET+2] = (tp->datalen>>8) & 0xff;
417 buf[LINDA_DATA_LENGTH_OFFSET+3] = (tp->datalen) & 0xff;
418
419 } else {
420 /* means no out tuple */
421 memset(&buf[LINDA_DATA_LENGTH_OFFSET],0,INT_SIZE);
422 }
423
424 if (sz_send(fd, buf, LINDA_HEADER_SIZE, 0) == FAIL){
425 fprintf(stderr,"recv error! errno :%d %s\n", errno,
426 strerror(errno));
427 }
428 #ifdef DEB
429 fprintf(stdout,"send size= %d : mode = %c\n",len, *buf);
430 #endif
431 } else if (buf[LINDA_MODE_OFFSET] == 'i' || buf[LINDA_MODE_OFFSET] == 'r'){
432
433 int mode = buf[LINDA_MODE_OFFSET];
434
435 TUPLE * temp = NULL;
436 id = buf[LINDA_ID_OFFSET] * 256 + buf[LINDA_ID_OFFSET+1];
437 #ifdef DEB
438 fprintf(stdout, "***%c command : id = %d ***\n",mode,id);
439 #endif
440 tp = t[id];
441
442 // w は無視する
443 while(tp && tp->next && (tp->mode=='w')){
444 temp = tp;
445 tp = tp->next;
446 }
447
448 if (tp && (tp->mode == 'o')){
449 if((buf = (unsigned char*)realloc(buf,tp->datalen+LINDA_HEADER_SIZE)) == NULL){
450 fprintf(stderr,"2 reallocate error! errno :%d %s",errno,strerror(errno));
451 exit(1);
452 }
453
454 buf[LINDA_MODE_OFFSET] = 'a';
455 /* seq and id are already set */
456 buf[LINDA_DATA_LENGTH_OFFSET] = (tp->datalen>>24) & 0xff;
457 buf[LINDA_DATA_LENGTH_OFFSET+1] = (tp->datalen>>16) & 0xff;
458 buf[LINDA_DATA_LENGTH_OFFSET+2] = (tp->datalen>>8) & 0xff;
459 buf[LINDA_DATA_LENGTH_OFFSET+3] = (tp->datalen) & 0xff;
460
461 memcpy(buf+LINDA_HEADER_SIZE, tp->data, tp->datalen);
462 #ifdef DEB
463 fprintf(stdout,"send size = %d : mode = %c : strings = %s\n",
464 len, *buf, buf + LINDA_HEADER_SIZE);
465 #endif
466 if (sz_send(fd, buf, (tp->datalen) + LINDA_HEADER_SIZE, 0) == FAIL){
467 fprintf(stderr,"recv error! errno :%d %s\n", errno,
468 strerror(errno));
469 }
470 if(mode == 'i') {
471 free(tp->data);
472 tp->data=0;
473 if(temp){
474 temp->next = tp->next;
475 }else{
476 t[id] = tp->next;
477 }
478 free(tp);
479
480 }
481 } else {
482 if (tp == NULL){
483 tp = t[id] = (TUPLE *) malloc(sizeof(TUPLE));
484 tp->next = NULL;
485 } else {
486 while(tp->next) tp = tp->next;
487 tp->next = (TUPLE *) malloc(sizeof(TUPLE));
488 tp = tp->next;
489 tp->next = NULL;
490 }
491
492 tp->mode = mode;
493 tp->seq = (buf[LINDA_SEQ_OFFSET] <<24) +
494 (buf[LINDA_SEQ_OFFSET+1]<<16) +
495 (buf[LINDA_SEQ_OFFSET+2]<<8) +
496 (buf[LINDA_SEQ_OFFSET+3]); /* seq */
497 tp->fd = fd;
498 tp->datalen = 0;
499 tp->data = 0;
500 #ifdef DEB
501 fprintf(stdout, "data inserted insert seq = %d, id = %d\n"
502 ,tp->seq,id);
503 #endif
504 }
505 } else if (buf[LINDA_MODE_OFFSET] == 'w'){
506 int mode = buf[LINDA_MODE_OFFSET];
507
508 id = buf[LINDA_ID_OFFSET] * 256 + buf[LINDA_ID_OFFSET+1];
509 #ifdef DEB
510 fprintf(stdout, "***%c command : id = %d ***\n",mode,id);
511 #endif
512 tp = (TUPLE *) malloc(sizeof(TUPLE));
513 tp->mode = mode;
514 tp->seq = (buf[LINDA_SEQ_OFFSET] <<24) +
515 (buf[LINDA_SEQ_OFFSET+1]<<16) +
516 (buf[LINDA_SEQ_OFFSET+2]<<8) +
517 (buf[LINDA_SEQ_OFFSET+3]); /* seq */
518 tp->fd = fd;
519 tp->datalen = 0;
520 tp->data = 0;
521 tp->next = t[id];
522 t[id] = tp;
523 #ifdef DEB
524 fprintf(stdout, "data inserted insert seq = %d, id = %d\n"
525 ,t[id]->seq,id);
526 #endif
527 } else if (buf[LINDA_MODE_OFFSET] == 'o'){
528 id = buf[LINDA_ID_OFFSET] * 256 + buf[LINDA_ID_OFFSET+1];
529
530 datasize = (buf[LINDA_DATA_LENGTH_OFFSET] <<24) +
531 (buf[LINDA_DATA_LENGTH_OFFSET+1]<<16) +
532 (buf[LINDA_DATA_LENGTH_OFFSET+2]<<8) +
533 (buf[LINDA_DATA_LENGTH_OFFSET+3]);
534 #ifdef DEB
535 fprintf(stdout, "*** out command : seq = %d ***\n",id);
536 #endif
537 while((t[id]) && ((t[id]->mode=='w')||(t[id]->mode == 'r'))){
538
539 buf[LINDA_MODE_OFFSET] = 'a';
540 buf[LINDA_SEQ_OFFSET] = (t[id]->seq>>24) & 0xff;
541 buf[LINDA_SEQ_OFFSET+1] = (t[id]->seq>>16) & 0xff;
542 buf[LINDA_SEQ_OFFSET+2] = (t[id]->seq>>8) & 0xff;
543 buf[LINDA_SEQ_OFFSET+3] = (t[id]->seq) & 0xff;
544 if (sz_send(t[id]->fd,buf,datasize+LINDA_HEADER_SIZE, 0) == FAIL){
545 fprintf(stderr,"recv error! errno :%d %s\n", errno,
546 strerror(errno));
547 }
548 tp = t[id];
549 t[id] = tp->next;
550 free(tp);
551 }
552 if ((t[id])&&t[id]->mode == 'i'){
553 buf[LINDA_MODE_OFFSET] = 'a';
554 buf[LINDA_SEQ_OFFSET] = (t[id]->seq>>24) & 0xff;
555 buf[LINDA_SEQ_OFFSET+1] = (t[id]->seq>>16) & 0xff;
556 buf[LINDA_SEQ_OFFSET+2] = (t[id]->seq>>8) & 0xff;
557 buf[LINDA_SEQ_OFFSET+3] = (t[id]->seq) & 0xff;
558 if (sz_send(t[id]->fd,buf,datasize+LINDA_HEADER_SIZE,0)==FAIL){
559 fprintf(stderr,"recv error! errno :%d %s\n", errno,
560 strerror(errno));
561 }
562 #ifdef DEB
563 fprintf(stdout,"sendsize = %d :mode = %c :strings=%s\n",
564 len, *buf, buf + LINDA_HEADER_SIZE);
565 #endif
566 #ifdef DEB
567 fprintf(stdout, "data senddata =%s,id=%d ,mode=%c,len=%d \n"
568 ,tp->data, id, t[id]->mode, t[id]->datalen);
569 #endif
570 // we should not free waiting in-packet's data
571 tp = t[id]; t[id] = tp->next; free(tp);
572 } else if ((t[id]==NULL) || (t[id]->mode == 'o')){
573 if ((tp = t[id]) == NULL){
574 tp = t[id] = (TUPLE *) malloc(sizeof(TUPLE));
575 tp->next = NULL;
576 } else {
577 while(tp->next) tp = tp->next;
578 tp->next = (TUPLE *) malloc(sizeof(TUPLE));
579 tp = tp->next;
580 tp->next = NULL;
581 }
582 tp->mode = 'o';
583 tp->seq =(buf[LINDA_SEQ_OFFSET] <<24) +
584 (buf[LINDA_SEQ_OFFSET+1]<<16) +
585 (buf[LINDA_SEQ_OFFSET+2]<<8) +
586 (buf[LINDA_SEQ_OFFSET+3]);
587 tp->data = (char *) malloc(datasize);
588 tp->datalen = datasize;
589
590 memcpy(tp->data, buf+LINDA_HEADER_SIZE,datasize);
591 #ifdef DEB
592 fprintf(stdout, "data inserted data = %s, len = %d, id = %d\n"
593 ,tp->data,tp->datalen,id);
594 #endif
595 } else {
596 fprintf(stdout,"Uncorrect mode : %c\n",t[id]->mode);
597 return(0);
598 }
599 } else {
600 fprintf(stdout,"Uncorrect buffer\n");
601 if(buf) free(buf);
602 return(0);
603 }
604 free(buf);
605 buf = 0;
606 }
607 }
608 }
609 close(ls);/* conection terminat */
610 }
611