8
|
1 /**********************************************************************/
|
|
2 /* */
|
|
3 /* Linda Server */
|
|
4 // $Id$
|
|
5 //
|
|
6 /* */
|
|
7 /**********************************************************************/
|
|
8 #include <stdio.h>
|
|
9 #include <stdlib.h>
|
|
10 #include <string.h>
|
|
11 #include <unistd.h>
|
|
12 #include <fcntl.h>
|
|
13 #include <sys/uio.h>
|
|
14 #include <sys/time.h>
|
|
15 #include <sys/select.h>
|
|
16 #include <sys/stat.h>
|
|
17 #include <sys/types.h>
|
|
18 #include <sys/socket.h>
|
|
19 #include <sys/un.h>
|
|
20 #include <netinet/in.h>
|
|
21 #include <netinet/tcp.h>
|
|
22 #include <signal.h>
|
|
23 #include <termios.h>
|
|
24 #include <netdb.h>
|
|
25 #include <errno.h>
|
|
26 #include <signal.h>
|
|
27 #define MAX_REQ 1
|
|
28 #define FAIL (-1)
|
|
29 #define MAX_USER 4
|
|
30 #define MAX_TUPLE 65536
|
|
31 #define PATHNAME "/tmp/ldserv"
|
|
32
|
|
33 #define DEBUG
|
|
34
|
|
35
|
|
36 #define BUFFSIZE 65535
|
|
37
|
|
38 /*----------------------------------------------------------------------
|
|
39 パケットフォーマット
|
|
40 char short int int
|
|
41 Mode + ID + Seq + Data_len + Padding + Data
|
|
42 0 1 3 7 11 12
|
|
43 ----------------------------------------------------------------------*/
|
|
44 #define LINDA_MODE_OFFSET 0
|
|
45 #define LINDA_ID_OFFSET 1
|
|
46 #define LINDA_SEQ_OFFSET 3
|
|
47 #define LINDA_DATA_LENGTH_OFFSET 7
|
|
48 #define LINDA_HEADER_SIZE 12
|
|
49
|
|
50
|
|
51 #define INT_SIZE 4 // 4Byte = sizeof(int)
|
|
52
|
|
53 typedef struct tuple{
|
|
54 unsigned char *data;
|
|
55 struct tuple *next;
|
|
56 unsigned int seq;
|
|
57 unsigned short fd;
|
|
58 unsigned int datalen;
|
|
59 char mode;
|
|
60 } TUPLE ;
|
|
61
|
|
62 TUPLE *tuple_space[MAX_TUPLE]; // TUPLE Space
|
|
63 TUPLE *tp;
|
|
64 int unix_port = 11511; // 11511 is Default Port number
|
|
65 extern int errno;
|
|
66
|
|
67 #ifdef COUNT_PACKET
|
|
68 // print packet count message per PRINT_INTERVAL sec
|
|
69 #define PRINT_INTERVAL 4
|
|
70
|
|
71 /*-------------------------------------------------------------------/
|
|
72 void
|
|
73 count_packet (char type):
|
|
74 クライアントより受け取ったコマンドをカウントする関数
|
|
75
|
|
76 引き数:
|
|
77 type - 受け取ったコマンドの種類(char型: i,o,r,w)
|
|
78 /-------------------------------------------------------------------*/
|
|
79 void count_packet(char type)
|
|
80 {
|
|
81 static int out_packet=-1,read_packet=0,in_packet=0,wait_packet=0;
|
|
82 static struct timeval start,now,previous;
|
|
83
|
|
84 if (out_packet == -1) {
|
|
85 gettimeofday(&start,NULL);
|
|
86 gettimeofday(&previous,NULL);
|
|
87 out_packet = 0;
|
|
88 printf("packet\tout\tread\tin\twait\ttime\n");
|
|
89 }
|
|
90
|
|
91 if (type == 'o') {
|
|
92 out_packet++;
|
|
93 } else if (type == 'r') {
|
|
94 read_packet++;
|
|
95 } else if (type == 'i') {
|
|
96 in_packet++;
|
|
97 } else if (type == 'w') {
|
|
98 wait_packet++;
|
|
99 } else {
|
|
100 fprintf(stderr,"No type in count_packet function\n");
|
|
101 return;
|
|
102 }
|
|
103
|
|
104 gettimeofday(&now,NULL);
|
|
105 if ((now.tv_sec-previous.tv_sec) > PRINT_INTERVAL) {
|
|
106 printf("log\t%d\t%d\t%d\t%d\t%ld\n",
|
|
107 out_packet,read_packet,in_packet,wait_packet,now.tv_sec-start.tv_sec);
|
|
108 fflush(stdout);
|
|
109
|
|
110 previous.tv_sec = now.tv_sec;
|
|
111 out_packet = read_packet = in_packet = 0;
|
|
112 wait_packet = 0;
|
|
113 }
|
|
114 }
|
|
115 #endif
|
|
116
|
|
117 #ifdef DEBUG
|
|
118 /*-------------------------------------------------------------------/
|
|
119 int
|
|
120 show_tuple_space ():
|
|
121 TUPLE Space にあるTUPLEをID順に表示する
|
|
122 /-------------------------------------------------------------------*/
|
|
123 void show_tuple_space()
|
|
124 {
|
|
125 int i;
|
|
126
|
|
127 static int toggle = -1;
|
|
128 static struct timeval start,now,previous;
|
|
129
|
|
130 TUPLE * tmp;
|
|
131
|
|
132 if (toggle == -1) {
|
|
133 gettimeofday(&start,NULL);
|
|
134 toggle = 0;
|
|
135 }
|
|
136 gettimeofday(&now,NULL);
|
|
137 printf("time **** %ld\n",now.tv_sec-start.tv_sec);
|
|
138 fflush(stdout);
|
|
139 previous.tv_sec = now.tv_sec;
|
|
140
|
|
141 printf("\n******\n");
|
|
142 for(i=0;i<MAX_TUPLE-1;i++){
|
|
143 if(tuple_space[i]){
|
|
144 printf("id: %d\t",i);
|
|
145 for(tmp=tuple_space[i];tmp;tmp = tmp->next){
|
|
146 printf("%c:%d\t",tmp->mode,tmp->seq);
|
|
147 }
|
|
148 printf("\n");
|
|
149 }
|
|
150 }
|
|
151 if(tuple_space[i]){
|
|
152 printf("id: %d\t",i);
|
|
153 for(tmp=tuple_space[i];tmp;tmp = tmp->next){
|
|
154 printf("%c:%c%c\t",tmp->mode,tmp->data[0],tmp->data[1]);
|
|
155 }
|
|
156 printf("\n");
|
|
157 }
|
|
158 printf("******\n\n");
|
|
159 return;
|
|
160 }
|
|
161 #endif
|
|
162
|
|
163
|
|
164 /*-------------------------------------------------------------------/
|
|
165 int
|
|
166 get_int(unsigned char * tuple, int offset):
|
|
167 TUPLEのヘッダに格納された int型 のデータを得るための関数
|
|
168 psx_get_datalength() と psx_get_seq() から呼ばれる。
|
|
169
|
|
170 引き数:
|
|
171 tuple - ヘッダ情報も含んだTUPLE。psx_reply()で得たものでもいい。
|
|
172 offset - 取りだすデータのオフセット。LINDA_DATA_LENGTH_OFFSET
|
|
173 か LINDA_SEQ_OFFSET。
|
|
174
|
|
175 返り値:
|
|
176 指定したオフセットに格納されていた数値(int型)
|
|
177 /-------------------------------------------------------------------*/
|
|
178 static
|
|
179 unsigned int
|
|
180 get_int(unsigned char * tuple, int offset){
|
|
181 unsigned int i;
|
|
182
|
|
183 i = (tuple[offset] <<24)+
|
|
184 (tuple[offset+1]<<16)+
|
|
185 (tuple[offset+2]<<8) +
|
|
186 (tuple[offset+3]);
|
|
187 return i;
|
|
188 }
|
|
189
|
|
190 unsigned int
|
|
191 psx_get_datalength(unsigned char * tuple){
|
|
192 return get_int(tuple,LINDA_DATA_LENGTH_OFFSET);
|
|
193 }
|
|
194
|
|
195 unsigned int
|
|
196 psx_get_seq(unsigned char * tuple){
|
|
197 return get_int(tuple,LINDA_SEQ_OFFSET);
|
|
198 }
|
|
199
|
|
200 unsigned short
|
|
201 psx_get_id(unsigned char * tuple){
|
|
202 return (tuple[LINDA_ID_OFFSET] * 256 + tuple[LINDA_ID_OFFSET+1]);
|
|
203 }
|
|
204
|
|
205 unsigned char
|
|
206 psx_get_mode(unsigned char * tuple){
|
|
207 return tuple[LINDA_MODE_OFFSET];
|
|
208 }
|
|
209
|
|
210 static
|
|
211 void
|
|
212 set_int_to_char(unsigned char * tuple, int i, int offset){
|
|
213 tuple[offset] = (i>>24) & 0xff;
|
|
214 tuple[offset+1] = (i>>16) & 0xff;
|
|
215 tuple[offset+2] = (i>>8) & 0xff;
|
|
216 tuple[offset+3] = (i) & 0xff;
|
|
217 }
|
|
218
|
|
219 void
|
|
220 psx_set_datalength(unsigned char * tuple, int length){
|
|
221 set_int_to_char(tuple,length,LINDA_DATA_LENGTH_OFFSET);
|
|
222 }
|
|
223
|
|
224 void
|
|
225 psx_set_seq(unsigned char * tuple, int seq){
|
|
226 set_int_to_char(tuple,seq,LINDA_SEQ_OFFSET);
|
|
227 }
|
|
228
|
|
229 void
|
|
230 psx_set_id(unsigned char * tuple, short id){
|
|
231 tuple[LINDA_ID_OFFSET] = (id>>8) & 0xff;
|
|
232 tuple[LINDA_ID_OFFSET+1] = id & 0xff;
|
|
233 }
|
|
234
|
|
235 void
|
|
236 psx_set_mode(unsigned char * tuple, char mode){
|
|
237 tuple[LINDA_MODE_OFFSET] = mode;
|
|
238 }
|
|
239
|
|
240 /*-------------------------------------------------------------------/
|
|
241 int
|
|
242 sz_send (int fd, unsigned char *buf, int size, int flag):
|
|
243 クライアントへTUPLEを送る。
|
|
244
|
|
245 引き数:
|
|
246 fd - 送信先のファイルディスクリプタ
|
|
247 buf - 送るデータ
|
|
248 size - bufのサイズ(byte)
|
|
249 flag - 使用していない
|
|
250 返り値:
|
|
251 送ったデータのbyte数
|
|
252 /-------------------------------------------------------------------*/
|
|
253 int
|
|
254 sz_send(int fd, unsigned char *buf, unsigned int size,int flag) {
|
|
255 unsigned int i,nsize,writtensize;
|
|
256
|
|
257 nsize = htonl(size); // size は datasize + LINDA_HEADER_SIZE
|
|
258 write(fd,&nsize,INT_SIZE);
|
|
259 for(writtensize=0,i=0; writtensize < size; writtensize+=i){
|
|
260 i = write(fd,(char*)buf+writtensize,size-writtensize);
|
|
261 }
|
|
262
|
|
263 return(i);
|
|
264 }
|
|
265
|
|
266 struct sigaction old,new;
|
|
267
|
|
268 static void
|
|
269 intr(int i)
|
|
270 {
|
|
271 fprintf(stderr,"intr: %d \n",i);
|
|
272 // stop = 1;
|
|
273 }
|
|
274
|
|
275 /*-------------------------------------------------------------------/
|
|
276 int
|
|
277 main (int argc,char *argv[]):
|
|
278 サーバのメイン。クライアントから送られて来た各コマンドに対応
|
|
279 した処理を行う。
|
|
280 /-------------------------------------------------------------------*/
|
|
281 int
|
|
282 main(int argc,char *argv[])
|
|
283 {
|
|
284 int i, a, users;
|
|
285 int ls,sk,fd,maxfds;
|
|
286 unsigned int id;
|
|
287 unsigned int datasize;
|
|
288
|
|
289 socklen_t paddrlen;
|
|
290
|
|
291 int skfg = 1;
|
|
292 short user = 0;
|
|
293 unsigned char userchar[2];
|
|
294 #ifdef UNIX_DOMAIN
|
|
295 struct sockaddr_un my_addr, peer;
|
|
296 #else
|
|
297 struct sockaddr_in my_addr, peer;
|
|
298 #endif
|
|
299 struct timeval zerotime;
|
|
300 unsigned char ipaddr[4];
|
|
301 unsigned char *buf = 0;
|
|
302 int len;
|
|
303 int fduser[MAX_USER];
|
|
304 fd_set readfds, suspect, tmpfd;
|
|
305 char ch;
|
|
306
|
|
307 extern char *optarg;
|
|
308 extern int optind;
|
|
309
|
|
310
|
|
311 new.sa_handler = intr;
|
|
312 sigaction(SIGPIPE,&new,&old);
|
|
313
|
|
314
|
|
315 while ((ch = getopt(argc, argv, "p:h:")) != -1){
|
|
316 switch(ch) {
|
|
317 case 'p':
|
|
318 unix_port = atoi(optarg);
|
|
319 break;
|
|
320 case '?':
|
|
321 default:
|
|
322 fprintf(stderr,"usage: %s [-p port]\n",argv[0]);
|
|
323 break;
|
|
324 }
|
|
325 }
|
|
326
|
|
327 argc -= optind;
|
|
328 argv += optind;
|
|
329
|
|
330 zerotime.tv_sec = zerotime.tv_usec = 0L;
|
|
331 users = 0;
|
|
332
|
|
333 #ifdef UNIX_DOMAIN
|
|
334 if ((ls = socket(AF_UNIX, SOCK_STREAM, 0)) == FAIL){
|
|
335 #else
|
|
336 if ((ls = socket(AF_INET, SOCK_STREAM, 0)) == FAIL){
|
|
337 #endif
|
|
338 // }
|
|
339 fprintf(stderr, "socket open error! errno :%d %s\n", errno,
|
|
340 strerror(errno));
|
|
341 exit(1);
|
|
342 }
|
|
343 if (setsockopt(ls, SOL_SOCKET, SO_REUSEADDR,(char *) &skfg, sizeof(skfg)) < 0){
|
|
344 fprintf(stderr, "setsockopt error %d\n",errno);
|
|
345 close(ls);
|
|
346 exit(1);
|
|
347 }
|
|
348 #ifdef UNIX_DOMAIN
|
|
349 my_addr.sun_family = AF_UNIX;
|
|
350 strcpy(my_addr.sun_path, PATHNAME);
|
|
351 #else
|
|
352 my_addr.sin_family = AF_INET;
|
|
353 my_addr.sin_port = unix_port;
|
|
354 my_addr.sin_addr.s_addr = INADDR_ANY;
|
|
355 #endif
|
|
356
|
|
357 if (bind(ls, (struct sockaddr *)&my_addr,sizeof(my_addr)) == FAIL){
|
|
358 fprintf(stderr, "socket binded address error! errno:%d %s\n", errno,
|
|
359 strerror(errno));
|
|
360 close(ls);
|
|
361 exit(1);
|
|
362 }
|
|
363 if (listen(ls, MAX_REQ) == FAIL){
|
|
364 fprintf(stderr, "list creat error! errno:%d %s\n", errno,
|
|
365 strerror(errno));
|
|
366 close(ls);
|
|
367 exit(1);
|
|
368 }
|
|
369 paddrlen = (socklen_t)sizeof(peer);
|
|
370 maxfds = FD_SETSIZE;
|
|
371 fd = 0;
|
|
372 #ifdef DEBUG
|
|
373 // maxfds = 8;
|
|
374 #endif
|
|
375 FD_ZERO(&readfds);
|
|
376
|
|
377 // main loop
|
|
378 while(1){
|
|
379 FD_SET(ls, &readfds);
|
|
380 tmpfd = readfds;
|
|
381
|
|
382 #ifdef DEBUG
|
|
383 show_tuple_space();
|
|
384 #endif
|
|
385 if ((sk = select(maxfds, &readfds, NULL, NULL, NULL)) == FAIL){
|
|
386 if (errno == EBADF){
|
|
387 for(i = 0;i < maxfds;i++){
|
|
388 FD_ZERO(&suspect);
|
|
389 if (FD_ISSET(i, &readfds)){
|
|
390 FD_SET(i, &suspect);
|
|
391 if (select(maxfds, &suspect, NULL, NULL, &zerotime) == FAIL){
|
|
392 fprintf(stdout, "%d descriptor clear",i);
|
|
393 FD_CLR(i, &readfds);
|
|
394 }
|
|
395 FD_CLR(i, &suspect);
|
|
396 }
|
|
397 }
|
|
398 } else {
|
|
399 fprintf(stderr, "select error! errno:%d %s\n", errno,
|
|
400 strerror(errno));
|
|
401 close(ls);
|
|
402 exit(1);
|
|
403 }
|
|
404 } else {
|
|
405 while(1){
|
|
406 fd = (fd + 1) % maxfds;
|
|
407 if (FD_ISSET(fd, &readfds)) break;
|
|
408 }
|
|
409 readfds = tmpfd;
|
|
410
|
|
411 // 新規の接続
|
|
412 if (fd == ls){
|
|
413 if ((sk = accept(ls, (struct sockaddr *)&peer, &paddrlen)) == FAIL){
|
|
414 fprintf(stderr, "connection accept error! errno:%d %s\n", errno,
|
|
415 strerror(errno));
|
|
416 close(ls);
|
|
417 exit(1);
|
|
418 }
|
|
419 #ifndef UNIX_DOMAIN
|
|
420 if (peer.sin_family == AF_INET){
|
|
421 int tmp = 1;
|
|
422
|
|
423 setsockopt (sk, IPPROTO_TCP, TCP_NODELAY,
|
|
424 (char *) &tmp, sizeof (int));
|
|
425 }
|
|
426 #endif
|
|
427 if ((tp = tuple_space[MAX_TUPLE-1]) == NULL){
|
|
428 tp = tuple_space[MAX_TUPLE-1] = (TUPLE *) malloc(sizeof(TUPLE));
|
|
429 tp->next = NULL;
|
|
430 } else {
|
|
431 while(tp->next) tp = tp->next;
|
|
432 tp->next = (TUPLE *) malloc(sizeof(TUPLE));
|
|
433 tp = tp->next;
|
|
434 tp->next = NULL;
|
|
435 }
|
|
436 tp->mode = 'o';
|
|
437 tp->seq = 0;
|
|
438 tp->data = (unsigned char *) malloc(sizeof(short)+LINDA_HEADER_SIZE);
|
|
439 tp->datalen = sizeof(short);
|
|
440 user++;
|
|
441 userchar[0] = user / 10 + '0';
|
|
442 userchar[1] = user % 10 + '0';
|
|
443
|
|
444 psx_set_mode(tp->data,tp->mode);
|
|
445 psx_set_id(tp->data,MAX_TUPLE-1);
|
|
446 psx_set_seq(tp->data,tp->seq);
|
|
447 psx_set_datalength(tp->data,tp->datalen);
|
|
448 memcpy(tp->data+LINDA_HEADER_SIZE, &userchar ,sizeof(userchar));
|
|
449
|
|
450 fprintf(stdout, "localhost connected assign id = %d\n", user);
|
|
451 #ifndef UNIX_DOMAIN
|
|
452 memcpy(ipaddr, &peer.sin_addr.s_addr, sizeof(ipaddr));
|
|
453 fprintf(stdout, "%d.%d.%d.%d connected \n",
|
|
454 ipaddr[0],ipaddr[1],ipaddr[2],ipaddr[3]);
|
|
455 #endif
|
|
456 FD_SET(sk,&readfds);
|
|
457
|
|
458 // 接続を確立しているクライアントからの要求
|
|
459 } else {
|
|
460 int tempnum;
|
|
461 #ifdef DEBUG
|
|
462 fprintf(stdout, "\n\ndata from %d descriptor\n", fd);
|
|
463 #endif
|
|
464 if((len=read(fd,&tempnum,INT_SIZE))!=INT_SIZE) {
|
|
465 if (len==0) {
|
|
466 fprintf(stdout, "fd %d closed\n", fd);
|
|
467 close(fd);
|
|
468 continue;
|
|
469 }
|
|
470 fprintf(stderr, "read error! on fd:%d ret=%d %s\n",
|
|
471 fd, len, strerror(errno));
|
|
472 close(fd);
|
|
473 continue;
|
|
474 }
|
|
475 len = ntohl(tempnum); // len は DATASIZE + LINDA_HEADER_SIZE
|
|
476
|
|
477 #ifdef DEBUG
|
|
478 fprintf(stderr, "datta on fd:%d len=%d\n", fd,len);
|
|
479 #endif
|
|
480 if((buf = (unsigned char*)malloc(len))==NULL){
|
|
481 fprintf(stderr,"allocate error! :%d %s",errno,strerror(errno));
|
|
482 exit(1);
|
|
483 }
|
|
484 for(a=0;a<len;a+=i) {
|
|
485 if((i=read(fd,buf+a,len-a))<0) {
|
|
486 fprintf(stderr, "ldserv: client read error! on i=%d len=%d %s\n",
|
|
487 i, len, strerror(errno));
|
|
488 close(fd);
|
|
489 }
|
|
490 }
|
|
491 #ifdef DEBUG
|
|
492 fprintf(stdout,"recv size = %d : mode = %c\n", len, *buf);
|
|
493 #endif
|
|
494 #ifdef COUNT_PACKET
|
|
495 count_packet(buf[0]);
|
|
496 #endif
|
|
497
|
|
498 if ((buf[LINDA_MODE_OFFSET] == '!') || (len == 0)){
|
|
499 FD_CLR(fd, &readfds);
|
|
500 for(i = 0;i < users; i++){
|
|
501 if (fduser[i] == fd) break;
|
|
502 }
|
|
503 fprintf(stdout, "connection closed descriptor :%d\n", fd);
|
|
504 close(fd);
|
|
505 free(buf);
|
|
506 buf=0;
|
|
507 } else if (buf[LINDA_MODE_OFFSET] == 'c'){
|
|
508 id = buf[LINDA_ID_OFFSET] * 256 + buf[LINDA_ID_OFFSET+1];
|
|
509 //buf[LINDA_MODE_OFFSET] = 'a';
|
|
510 tp = tuple_space[id];
|
|
511 while(tp && tp->next && (tp->mode=='w')){
|
|
512 tp = tp->next;
|
|
513 }
|
|
514 if (tp && (tp->mode == 'o')){
|
|
515 psx_set_datalength(buf,tp->datalen);
|
|
516 } else {
|
|
517 /* means no out tuple */
|
|
518 memset(&buf[LINDA_DATA_LENGTH_OFFSET],0,INT_SIZE);
|
|
519 }
|
|
520
|
|
521 if (sz_send(fd, buf, LINDA_HEADER_SIZE, 0) == FAIL){
|
|
522 fprintf(stderr,"recv error! errno :%d %s\n", errno,
|
|
523 strerror(errno));
|
|
524 }
|
|
525 free(buf);
|
|
526 buf = 0;
|
|
527 #ifdef DEBUG
|
|
528 fprintf(stdout,"send size= %d : mode = %c\n",len, *buf);
|
|
529 #endif
|
|
530 } else if (buf[LINDA_MODE_OFFSET] == 'i' || buf[LINDA_MODE_OFFSET] == 'r'){
|
|
531 int mode = buf[LINDA_MODE_OFFSET];
|
|
532 TUPLE * temp = NULL;
|
|
533
|
|
534 id = psx_get_id(buf);
|
|
535 #ifdef DEBUG
|
|
536 fprintf(stdout, "*** %c command : id = %d ***\n",mode,id);
|
|
537 #endif
|
|
538 tp = tuple_space[id];
|
|
539
|
|
540 // w は無視する
|
|
541 while(tp && tp->next && (tp->mode=='w')){
|
|
542 temp = tp;
|
|
543 tp = tp->next;
|
|
544 }
|
|
545
|
|
546 if (tp && (tp->mode == 'o')){
|
|
547 psx_set_mode(tp->data,'a');
|
|
548 psx_set_seq(tp->data, psx_get_seq(buf));
|
|
549 free(buf);
|
|
550 buf=0;
|
|
551 #ifdef DEBUG
|
|
552 fprintf(stdout,"send size = %d : mode = %c\n",
|
|
553 tp->datalen + LINDA_HEADER_SIZE, tp->mode);
|
|
554 #endif
|
|
555 if (sz_send(fd, tp->data, (tp->datalen) + LINDA_HEADER_SIZE, 0) == FAIL){
|
|
556 fprintf(stderr,"recv error! errno :%d %s\n", errno,
|
|
557 strerror(errno));
|
|
558 }
|
|
559 if(mode == 'i') {
|
|
560 if(tp->data){
|
|
561 free(tp->data);
|
|
562 tp->data=0;
|
|
563 }
|
|
564
|
|
565 if(temp){
|
|
566 temp->next = tp->next;
|
|
567 }else{
|
|
568 tuple_space[id] = tp->next;
|
|
569 }
|
|
570 free(tp);
|
|
571 tp = 0;
|
|
572 }
|
|
573 } else {
|
|
574 if (tp == NULL){
|
|
575 tp = tuple_space[id] = (TUPLE *) malloc(sizeof(TUPLE));
|
|
576 tp->next = NULL;
|
|
577 } else {
|
|
578 while(tp->next) tp = tp->next;
|
|
579 tp->next = (TUPLE *) malloc(sizeof(TUPLE));
|
|
580 tp = tp->next;
|
|
581 tp->next = NULL;
|
|
582 }
|
|
583
|
|
584 tp->mode = mode;
|
|
585 tp->seq = psx_get_seq(buf);
|
|
586 tp->fd = fd;
|
|
587 tp->datalen = 0;
|
|
588 tp->data = 0;
|
|
589 free(buf);
|
|
590 buf=0;
|
|
591 #ifdef DEBUG
|
|
592 fprintf(stdout, "data inserted insert seq = %d, id = %d\n"
|
|
593 ,tp->seq,id);
|
|
594 #endif
|
|
595 }
|
|
596 } else if (buf[LINDA_MODE_OFFSET] == 'w'){
|
|
597 int mode = buf[LINDA_MODE_OFFSET];
|
|
598
|
|
599 id = buf[LINDA_ID_OFFSET] * 256 + buf[LINDA_ID_OFFSET+1];
|
|
600 #ifdef DEBUG
|
|
601 fprintf(stdout, "***%c command : id = %d ***\n",mode,id);
|
|
602 #endif
|
|
603 tp = (TUPLE *) malloc(sizeof(TUPLE));
|
|
604 tp->mode = mode;
|
|
605 tp->seq = psx_get_seq(buf);
|
|
606 tp->fd = fd;
|
|
607 tp->datalen = 0;
|
|
608 tp->data = 0;
|
|
609 tp->next = tuple_space[id];
|
|
610 tuple_space[id] = tp;
|
|
611 free(buf);
|
|
612 buf=0;
|
|
613 #ifdef DEBUG
|
|
614 fprintf(stdout, "data inserted insert seq = %d, id = %d\n"
|
|
615 ,tuple_space[id]->seq,id);
|
|
616 #endif
|
|
617 } else if (buf[LINDA_MODE_OFFSET] == 'o'){
|
|
618 id = buf[LINDA_ID_OFFSET] * 256 + buf[LINDA_ID_OFFSET+1];
|
|
619
|
|
620 datasize = psx_get_datalength(buf);
|
|
621 #ifdef DEBUG
|
|
622 fprintf(stdout, "*** out command : id = %d ***\n",id);
|
|
623 #endif
|
|
624 while((tuple_space[id]) &&
|
|
625 ((tuple_space[id]->mode=='w')||(tuple_space[id]->mode == 'r'))){
|
|
626
|
|
627 // buf[LINDA_MODE_OFFSET] = 'a';
|
|
628 psx_set_mode(buf,'a');
|
|
629 psx_set_seq(buf,tuple_space[id]->seq);
|
|
630
|
|
631 if (sz_send(tuple_space[id]->fd,buf,datasize+LINDA_HEADER_SIZE, 0) == FAIL){
|
|
632 fprintf(stderr,"recv error! errno :%d %s\n", errno,
|
|
633 strerror(errno));
|
|
634 }
|
|
635 tp = tuple_space[id];
|
|
636 tuple_space[id] = tp->next;
|
|
637 free(tp);
|
|
638 tp = 0;
|
|
639 }
|
|
640 if ((tuple_space[id]) && tuple_space[id]->mode == 'i'){
|
|
641 buf[LINDA_MODE_OFFSET] = 'a';
|
|
642 psx_set_seq(buf,tuple_space[id]->seq);
|
|
643
|
|
644 #ifdef DEBUG
|
|
645 fprintf(stdout,"sendsize = %d :mode = %c\n",
|
|
646 datasize+LINDA_HEADER_SIZE, *buf);
|
|
647 #endif
|
|
648 if (sz_send(tuple_space[id]->fd,buf,datasize+LINDA_HEADER_SIZE,0)==FAIL){
|
|
649 fprintf(stderr,"recv error! errno :%d %s\n", errno,
|
|
650 strerror(errno));
|
|
651 }
|
|
652 free(buf);
|
|
653 buf=0;
|
|
654 #ifdef DEBUG
|
|
655 fprintf(stdout, "send dataid=%d ,mode=%c,len=%d \n"
|
|
656 ,id, tuple_space[id]->mode, tuple_space[id]->datalen);
|
|
657 #endif
|
|
658 // we should not free waiting in-packet's data
|
|
659 tp = tuple_space[id];
|
|
660 tuple_space[id] = tp->next;
|
|
661 free(tp);
|
|
662 tp = 0;
|
|
663 } else if ((tuple_space[id]==NULL) || (tuple_space[id]->mode == 'o')){
|
|
664 if ((tp = tuple_space[id]) == NULL){
|
|
665 tp = tuple_space[id] = (TUPLE *) malloc(sizeof(TUPLE));
|
|
666 tp->next = NULL;
|
|
667 } else {
|
|
668 while(tp->next) tp = tp->next;
|
|
669 tp->next = (TUPLE *) malloc(sizeof(TUPLE));
|
|
670 tp = tp->next;
|
|
671 tp->next = NULL;
|
|
672 }
|
|
673 tp->mode = 'o';
|
|
674 tp->seq = psx_get_seq(buf);
|
|
675 tp->data = buf;//(char *) malloc(datasize);
|
|
676 tp->datalen = datasize;
|
|
677 buf = 0;
|
|
678 // memcpy(tp->data, buf+LINDA_HEADER_SIZE,datasize);
|
|
679 #ifdef DEBUG
|
|
680 fprintf(stdout, "data inserted len = %d, id = %d\n" ,tp->datalen,id);
|
|
681 #endif
|
|
682 } else {
|
|
683 fprintf(stdout,"Uncorrect mode : %c\n",tuple_space[id]->mode);
|
|
684 free(buf);
|
|
685 buf=0;
|
|
686 return(0);
|
|
687 }
|
|
688
|
|
689 } else {
|
|
690 fprintf(stdout,"Uncorrect buffer\n");
|
|
691 if(buf) free(buf);
|
|
692 return(0);
|
|
693 }
|
|
694 }
|
|
695 }
|
|
696 }
|
|
697 perror("Unexpeced end");
|
|
698 close(ls);/* conection terminat */
|
|
699 }
|
|
700
|