diff Linda/ldserv.c @ 0:0fae5658fb0b

Initial revision
author gongo
date Thu, 02 Nov 2006 08:55:19 +0000
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/Linda/ldserv.c	Thu Nov 02 08:55:19 2006 +0000
@@ -0,0 +1,611 @@
+/**********************************************************************/
+/*                                                                    */
+/*  Linda Server for PlayStation                                      */
+//       $Id$            
+//
+/*                                                                    */
+/**********************************************************************/
+#include <stdio.h>      //ヘッダーファイルの作成
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <sys/uio.h>
+#include <sys/time.h>
+#include <sys/select.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+#include <signal.h>
+#include <termios.h>
+#include <netdb.h>
+#include <errno.h>
+
+#define MAX_REQ		1
+#define FAIL		(-1)
+#define MAX_USER	4
+#define MAX_TUPLE	65536
+
+char *PATHNAME	 = "/tmp/ldserv";
+
+//#define DEB
+
+
+#define BUFFSIZE        65535
+
+/*----------------------------------------------------------------------
+ パケットフォーマット
+ char     short  int    int
+ Mode +   ID   + Seq  + Data_len + Padding + Data
+ 0        1      3      7          11        12
+----------------------------------------------------------------------*/
+#define LINDA_MODE_OFFSET          0
+#define LINDA_ID_OFFSET            1
+#define LINDA_SEQ_OFFSET           3
+#define LINDA_DATA_LENGTH_OFFSET   7
+#define LINDA_HEADER_SIZE          12
+
+
+#define INT_SIZE    4     // Byte = sizeof(int)
+
+typedef struct tuple{     //構造体宣言
+    char *data;
+    struct tuple *next;
+    unsigned int seq;
+    unsigned short fd;
+    unsigned int datalen;
+    char mode;
+} TUPLE ;
+
+TUPLE *t[MAX_TUPLE];     // TUPLE Space
+TUPLE *tp;
+int unix_port = 11511;   // unix_portを指定。
+extern int errno;
+
+#ifdef COUNT_PACKET
+// print packet count message per PRINT_INTERVAL sec
+#define PRINT_INTERVAL 4
+
+/*-------------------------------------------------------------------/
+  void
+  count_packet (char type):
+      クライアントより受け取ったコマンドをカウントする関数
+      
+  引き数:
+      type - 受け取ったコマンドの種類(char型: i,o,r,w)
+/-------------------------------------------------------------------*/
+void count_packet(char type)
+{
+    static int out_packet=-1,read_packet=0,in_packet=0,wait_packet=0;
+    static struct timeval start,now,previous;
+    
+    if (out_packet == -1) {
+        gettimeofday(&start,NULL);
+        gettimeofday(&previous,NULL);
+        out_packet = 0;
+        printf("packet\tout\tread\tin\twait\ttime\n");
+    }
+    
+    if (type == 'o') {                   //受け取ったコマンドの種類によって分別。
+        out_packet++;
+    } else if (type == 'r') {
+        read_packet++;
+    } else if (type == 'i') {
+        in_packet++;
+    } else if (type == 'w') {
+        wait_packet++;
+    } else {
+        fprintf(stderr,"No type in count_packet function\n");
+        return;
+    }
+    
+    gettimeofday(&now,NULL);
+    if ((now.tv_sec-previous.tv_sec) > PRINT_INTERVAL) {
+        printf("log\t%d\t%d\t%d\t%d\t%ld\n",
+               out_packet,read_packet,in_packet,wait_packet,now.tv_sec-start.tv_sec);
+        fflush(stdout);
+        
+        previous.tv_sec = now.tv_sec;
+        out_packet = read_packet = in_packet = 0;
+        wait_packet = 0;
+    }
+}
+#endif
+
+#ifdef DEB
+/*-------------------------------------------------------------------/
+  int
+  show_tuple ():
+      TUPLE Space にあるTUPLEをID順に表示する
+/-------------------------------------------------------------------*/
+void show_tuple()
+{
+    int i;
+
+    static int toggle = -1;
+    static struct timeval start,now,previous;
+    
+    TUPLE * tmp;
+
+    if (toggle == -1) {
+        gettimeofday(&start,NULL);
+        toggle = 0;
+    }
+    gettimeofday(&now,NULL);
+    printf("time ****  %ld\n",now.tv_sec-start.tv_sec);
+    fflush(stdout);
+    previous.tv_sec = now.tv_sec;
+    
+    printf("\n******\n");
+    for(i=0;i<MAX_TUPLE-1;i++){
+        if(t[i]){
+            printf("id: %d\t",i);
+            for(tmp=t[i];tmp;tmp = tmp->next){
+                printf("%c:%d\t",tmp->mode,tmp->seq);
+            }
+            printf("\n");
+        }
+    }
+    if(t[i]){
+        printf("id: %d\t",i);
+        for(tmp=t[i];tmp;tmp = tmp->next){
+            printf("%c:%c%c\t",tmp->mode,tmp->data[0],tmp->data[1]);
+        }
+        printf("\n");
+    }
+    printf("******\n\n");
+    return;
+}
+#endif
+
+/*-------------------------------------------------------------------/
+  int
+  sz_send (int fd, unsigned char *buf, int size, int flag):
+      クライアントへTUPLEを送る。
+
+  引き数:
+      fd   - 送信先のファイルディスクリプタ
+      buf  - 送るデータ
+      size - bufのサイズ(byte)
+      flag - 使用していない
+  返り値:
+      送ったデータのbyte数
+/-------------------------------------------------------------------*/
+int
+sz_send(int fd,unsigned char *buf,int size,int flag) {
+    int i,nsize;
+    nsize = htonl(size);
+    i  = write(fd,&nsize,INT_SIZE);
+    i += write(fd,buf,size);  // size は datasize + LINDA_HEADER_SIZE
+    return(i);
+}
+
+/*-------------------------------------------------------------------/
+  int
+  main (int argc,char *argv[]):
+      サーバのメイン。クライアントから送られて来た各コマンドに対応
+      した処理を行う。
+/-------------------------------------------------------------------*/
+int
+main(int argc,char *argv[])
+{
+    int i, a, users;
+    int ls,sk,fd,paddrlen,maxfds,id;
+    int datasize;
+    int skfg = 1;
+    short user = 0;
+    unsigned char  userchar[2];
+#ifdef UNIX_DOMAIN
+    struct sockaddr_un my_addr, peer;
+#else
+    struct sockaddr_in my_addr, peer;
+#endif
+    struct timeval zerotime;
+    unsigned char ipaddr[4];
+    unsigned char *buf = 0;
+    int len;
+    int fduser[MAX_USER];
+    fd_set readfds, suspect, tmpfd;
+    char ch;
+    
+    extern char *optarg;
+    extern int optind;
+    
+    while ((ch = getopt(argc, argv, "p:s:?")) != -1){
+        switch(ch) {
+        case 'p':
+            unix_port = atoi(optarg);
+            break;
+        case 's':
+            PATHNAME = optarg;
+            break;
+        case '?':
+        default:
+            fprintf(stderr,"usage: %s [-p port]\n",argv[0]);
+            break;
+        }
+    }
+    
+    argc -= optind;
+    argv += optind;
+    
+    zerotime.tv_sec = zerotime.tv_usec = 0L;
+    users = 0;
+    
+#ifdef UNIX_DOMAIN
+    if ((ls = socket(AF_UNIX, SOCK_STREAM, 0)) == FAIL){
+#else
+    if ((ls = socket(AF_INET, SOCK_STREAM, 0)) == FAIL){
+#endif
+        fprintf(stderr, "socket open error! errno :%d %s\n", errno,
+                strerror(errno));
+        exit(1);
+    }
+    if (setsockopt(ls, SOL_SOCKET, SO_REUSEADDR,(char *) &skfg, sizeof(skfg)) < 0){
+        fprintf(stderr, "setsockopt error %d\n",errno);
+        close(ls);
+        exit(1);
+    }
+#ifdef UNIX_DOMAIN
+    my_addr.sun_family = AF_UNIX;
+    strcpy(my_addr.sun_path, PATHNAME);
+#else
+    my_addr.sin_family = AF_INET;
+    my_addr.sin_port = unix_port;
+    my_addr.sin_addr.s_addr = INADDR_ANY;
+#endif
+
+    if (bind(ls, (struct sockaddr *)&my_addr,sizeof(my_addr)) == FAIL){
+        fprintf(stderr, "socket binded address error! errno:%d %s\n", errno,
+            strerror(errno));
+        close(ls);
+        exit(1);
+    }
+    if (listen(ls, MAX_REQ) == FAIL){
+        fprintf(stderr, "list creat error! errno:%d %s\n", errno,
+                strerror(errno));
+        close(ls);
+        exit(1);
+    }
+    paddrlen = sizeof(peer);
+    maxfds = FD_SETSIZE;
+    fd = 0;
+#ifdef DEB
+    maxfds = 8;
+#endif
+    FD_ZERO(&readfds);
+
+    // main loop
+    while(1){
+        FD_SET(ls, &readfds);
+        tmpfd = readfds;
+
+#ifdef DEB
+        show_tuple();
+#endif
+        if ((sk = select(maxfds, &readfds, NULL, NULL, NULL)) == FAIL){
+            if (errno == EBADF){
+                for(i = 0;i < maxfds;i++){
+                    FD_ZERO(&suspect);
+                    if (FD_ISSET(i, &readfds)){
+                        FD_SET(i, &suspect);
+                        if (select(maxfds, &suspect, NULL, NULL, &zerotime) == FAIL){
+                            fprintf(stdout, "%d descriptor clear",i);
+                            FD_CLR(i, &readfds);
+                        }
+                        FD_CLR(i, &suspect);
+                    }
+                }
+            } else {
+                fprintf(stderr, "select error! errno:%d %s\n", errno,
+                        strerror(errno));
+                close(ls);
+                exit(1);
+            }
+        } else {
+            while(1){
+                fd = (fd + 1) % maxfds;
+                if (FD_ISSET(fd, &readfds)) break;
+            }
+            readfds = tmpfd;
+            
+            // 新規の接続
+            if (fd == ls){
+                if ((sk = accept(ls, (struct sockaddr *)&peer, &paddrlen)) == FAIL){
+                    fprintf(stderr, "connection accept error! errno:%d %s\n", errno,
+                            strerror(errno));
+                    close(ls);
+                    exit(1);
+                }
+#ifndef UNIX_DOMAIN
+                if (peer.sin_family == AF_INET)
+                {
+                    int tmp = 1;
+
+                    setsockopt (sk, IPPROTO_TCP, TCP_NODELAY,
+                        (char *) &tmp, sizeof (int));
+                }
+#endif
+                if ((tp = t[MAX_TUPLE-1]) == NULL){
+                    tp = t[MAX_TUPLE-1] = (TUPLE *) malloc(sizeof(TUPLE));
+                    tp->next = NULL;
+                } else {
+                    while(tp->next) tp = tp->next;
+                    tp->next = (TUPLE *) malloc(sizeof(TUPLE));
+                    tp = tp->next;
+                    tp->next = NULL;
+                }
+                tp->mode = 'o'; 
+                tp->seq = 0;
+                tp->data = (char *) malloc(sizeof(short));
+                tp->datalen = sizeof(short); user++;
+                userchar[0] = user / 10 + '0';
+                userchar[1] = user % 10 + '0';
+                memcpy(tp->data, &userchar ,sizeof(userchar));
+                
+                fprintf(stdout, "localhost connected assign id = %d\n", user);
+#ifndef UNIX_DOMAIN
+                memcpy(ipaddr, &peer.sin_addr.s_addr, sizeof(ipaddr));
+                fprintf(stdout, "%d.%d.%d.%d connected \n",
+                        ipaddr[0],ipaddr[1],ipaddr[2],ipaddr[3]);
+#endif
+                FD_SET(sk,&readfds);
+                
+            // 通信を確立しているクライアントからの要求
+            } else {
+                int tempnum;
+#ifdef DEB
+                fprintf(stdout, "\n\ndata from %d descriptor\n", fd);
+#endif
+                if((len=read(fd,&tempnum,INT_SIZE))!=INT_SIZE) {
+                    if (len==0) {
+                        fprintf(stdout, "fd %d closed\n", fd);
+                        close(fd);
+                        continue;
+                    }
+                    fprintf(stderr, "read error! on fd:%d ret=%d %s\n", 
+                            fd,len,
+                        strerror(errno));
+                    close(fd); 
+                    continue;
+                }
+                len = ntohl(tempnum);  // len は DATASIZE + LINDA_HEADER_SIZE
+#ifdef DEB
+                fprintf(stderr, "datta on fd:%d len=%d\n", fd,len);
+#endif
+                if((buf = (unsigned char*)malloc(len))==NULL){
+                    fprintf(stderr,"allocate error! :%d %s",errno,strerror(errno));
+                    exit(1);
+                }
+                for(a=0;a<len;a+=i) {
+                    if((i=read(fd,buf+a,len-a))<0) {
+                        fprintf(stderr, "ldserv: client read error! on i=%d len=%d %s\n",
+                                i, len, strerror(errno));
+                        close(fd);
+                    }
+                }
+#ifdef DEB
+                fprintf(stdout,"recv size = %d : mode = %c : strings = %s\n",
+                        len, *buf, buf + LINDA_HEADER_SIZE);
+#endif
+
+#ifdef COUNT_PACKET
+                    count_packet(buf[0]);
+#endif
+
+                if ((buf[LINDA_MODE_OFFSET] == '!') || (len == 0)){
+                    FD_CLR(fd, &readfds);
+                    for(i = 0;i < users; i++){
+                        if (fduser[i] == fd) break;
+                    }
+                    fprintf(stdout, "connection closed descriptor :%d\n", fd);
+                    close(fd);
+                } else if (buf[LINDA_MODE_OFFSET] == 'c'){
+                    id = buf[LINDA_ID_OFFSET] * 256 + buf[LINDA_ID_OFFSET+1];
+                    //buf[LINDA_MODE_OFFSET] = 'a';
+                    tp = t[id];
+                    while(tp && tp->next && (tp->mode=='w')){
+                        tp = tp->next;
+                    }
+                    if (tp && (tp->mode == 'o')){                        
+                        buf[LINDA_DATA_LENGTH_OFFSET]   = (tp->datalen>>24) & 0xff;
+                        buf[LINDA_DATA_LENGTH_OFFSET+1] = (tp->datalen>>16) & 0xff;
+                        buf[LINDA_DATA_LENGTH_OFFSET+2] = (tp->datalen>>8)  & 0xff;
+                        buf[LINDA_DATA_LENGTH_OFFSET+3] = (tp->datalen)     & 0xff;
+                        
+                    } else {
+                        /* means no out tuple */
+                        memset(&buf[LINDA_DATA_LENGTH_OFFSET],0,INT_SIZE);
+                    }
+
+                    if (sz_send(fd, buf, LINDA_HEADER_SIZE, 0) == FAIL){
+                        fprintf(stderr,"recv error! errno :%d %s\n", errno,
+                                strerror(errno));
+                    }
+#ifdef DEB
+                    fprintf(stdout,"send size= %d : mode = %c\n",len, *buf);
+#endif
+                } else if (buf[LINDA_MODE_OFFSET] == 'i' || buf[LINDA_MODE_OFFSET] == 'r'){
+                    
+                    int mode = buf[LINDA_MODE_OFFSET];
+
+                    TUPLE * temp = NULL;
+                    id = buf[LINDA_ID_OFFSET] * 256 + buf[LINDA_ID_OFFSET+1];
+#ifdef DEB
+                    fprintf(stdout, "***%c command : id = %d ***\n",mode,id);
+#endif
+                    tp = t[id];
+
+                    // w は無視する
+                    while(tp && tp->next && (tp->mode=='w')){
+                        temp = tp;
+                        tp = tp->next;
+                    }
+                    
+                    if (tp && (tp->mode == 'o')){
+                        if((buf = (unsigned char*)realloc(buf,tp->datalen+LINDA_HEADER_SIZE)) == NULL){
+                            fprintf(stderr,"2 reallocate error! errno :%d %s",errno,strerror(errno));
+                            exit(1);
+                        }
+                        
+                        buf[LINDA_MODE_OFFSET] = 'a'; 
+                        /* seq and id are already set */
+                        buf[LINDA_DATA_LENGTH_OFFSET]   = (tp->datalen>>24) & 0xff;
+                        buf[LINDA_DATA_LENGTH_OFFSET+1] = (tp->datalen>>16) & 0xff;
+                        buf[LINDA_DATA_LENGTH_OFFSET+2] = (tp->datalen>>8)  & 0xff;
+                        buf[LINDA_DATA_LENGTH_OFFSET+3] = (tp->datalen)     & 0xff;
+                        
+                        memcpy(buf+LINDA_HEADER_SIZE, tp->data, tp->datalen);
+#ifdef DEB
+                        fprintf(stdout,"send size = %d : mode = %c : strings = %s\n",
+                                len, *buf, buf + LINDA_HEADER_SIZE);
+#endif
+                        if (sz_send(fd, buf, (tp->datalen) + LINDA_HEADER_SIZE, 0) == FAIL){
+                            fprintf(stderr,"recv error! errno :%d %s\n", errno,
+                                    strerror(errno));
+                        }
+                        if(mode == 'i') {
+                            free(tp->data);
+                            tp->data=0;
+                            if(temp){
+                                temp->next = tp->next;
+                            }else{
+                                t[id] = tp->next;
+                            }
+                            free(tp);
+                            
+                        }
+                    } else {
+                        if (tp == NULL){
+                            tp = t[id] = (TUPLE *) malloc(sizeof(TUPLE));
+                            tp->next = NULL;
+                        } else {
+                            while(tp->next) tp = tp->next;
+                            tp->next = (TUPLE *) malloc(sizeof(TUPLE));
+                            tp = tp->next;
+                            tp->next = NULL;
+                        }
+                        
+                        tp->mode = mode;
+                        tp->seq = (buf[LINDA_SEQ_OFFSET]  <<24) +
+                            (buf[LINDA_SEQ_OFFSET+1]<<16) +
+                            (buf[LINDA_SEQ_OFFSET+2]<<8)  +
+                            (buf[LINDA_SEQ_OFFSET+3]);  /* seq */
+                        tp->fd = fd;
+                        tp->datalen = 0;
+                        tp->data = 0;
+#ifdef DEB
+                        fprintf(stdout, "data inserted insert seq = %d, id = %d\n"
+                                ,tp->seq,id);
+#endif
+                    }
+                } else if (buf[LINDA_MODE_OFFSET] == 'w'){
+                    int mode = buf[LINDA_MODE_OFFSET];
+
+                    id = buf[LINDA_ID_OFFSET] * 256 + buf[LINDA_ID_OFFSET+1];
+#ifdef DEB
+                    fprintf(stdout, "***%c command : id = %d ***\n",mode,id);
+#endif
+                    tp = (TUPLE *) malloc(sizeof(TUPLE));
+                    tp->mode = mode;
+                    tp->seq = (buf[LINDA_SEQ_OFFSET]  <<24) +
+                        (buf[LINDA_SEQ_OFFSET+1]<<16) +
+                        (buf[LINDA_SEQ_OFFSET+2]<<8)  +
+                        (buf[LINDA_SEQ_OFFSET+3]);  /* seq */
+                    tp->fd = fd;
+                    tp->datalen = 0;
+                    tp->data = 0;
+                    tp->next = t[id];
+                    t[id] = tp;
+#ifdef DEB
+                    fprintf(stdout, "data inserted insert seq = %d, id = %d\n"
+                            ,t[id]->seq,id);
+#endif
+                } else if (buf[LINDA_MODE_OFFSET] == 'o'){
+                    id = buf[LINDA_ID_OFFSET] * 256 + buf[LINDA_ID_OFFSET+1];
+                    
+                    datasize = (buf[LINDA_DATA_LENGTH_OFFSET]  <<24) +
+                               (buf[LINDA_DATA_LENGTH_OFFSET+1]<<16) +
+                               (buf[LINDA_DATA_LENGTH_OFFSET+2]<<8)  +
+                               (buf[LINDA_DATA_LENGTH_OFFSET+3]);
+#ifdef DEB
+                    fprintf(stdout, "*** out command : seq = %d ***\n",id);
+#endif
+                    while((t[id]) && ((t[id]->mode=='w')||(t[id]->mode == 'r'))){
+                        
+                        buf[LINDA_MODE_OFFSET] = 'a';
+                        buf[LINDA_SEQ_OFFSET]   = (t[id]->seq>>24) & 0xff;
+                        buf[LINDA_SEQ_OFFSET+1] = (t[id]->seq>>16) & 0xff;
+                        buf[LINDA_SEQ_OFFSET+2] = (t[id]->seq>>8)  & 0xff;
+                        buf[LINDA_SEQ_OFFSET+3] = (t[id]->seq)     & 0xff;
+                        if (sz_send(t[id]->fd,buf,datasize+LINDA_HEADER_SIZE, 0) == FAIL){
+                            fprintf(stderr,"recv error! errno :%d %s\n", errno,
+                                    strerror(errno));
+                        }
+                        tp = t[id];
+                        t[id] = tp->next;
+                        free(tp);
+                    }
+                    if ((t[id])&&t[id]->mode == 'i'){
+                        buf[LINDA_MODE_OFFSET] = 'a'; 
+                        buf[LINDA_SEQ_OFFSET]   = (t[id]->seq>>24) & 0xff;
+                        buf[LINDA_SEQ_OFFSET+1] = (t[id]->seq>>16) & 0xff;
+                        buf[LINDA_SEQ_OFFSET+2] = (t[id]->seq>>8)  & 0xff;
+                        buf[LINDA_SEQ_OFFSET+3] = (t[id]->seq)     & 0xff;
+                        if (sz_send(t[id]->fd,buf,datasize+LINDA_HEADER_SIZE,0)==FAIL){
+                                fprintf(stderr,"recv error! errno :%d %s\n", errno,
+                                        strerror(errno));
+                        }
+#ifdef DEB
+                        fprintf(stdout,"sendsize = %d :mode = %c :strings=%s\n",
+                                len, *buf, buf + LINDA_HEADER_SIZE);
+#endif
+#ifdef DEB
+                        fprintf(stdout, "data senddata =%s,id=%d ,mode=%c,len=%d \n"
+                                ,tp->data, id, t[id]->mode, t[id]->datalen);
+#endif
+                        // we should not free waiting in-packet's data
+                        tp = t[id]; t[id] = tp->next; free(tp);
+                    } else if ((t[id]==NULL) || (t[id]->mode == 'o')){
+                        if ((tp = t[id]) == NULL){
+                            tp = t[id] = (TUPLE *) malloc(sizeof(TUPLE));
+                            tp->next = NULL;
+                        } else {
+                            while(tp->next) tp = tp->next;
+                            tp->next = (TUPLE *) malloc(sizeof(TUPLE));
+                            tp = tp->next;
+                            tp->next = NULL;
+                        }
+                        tp->mode = 'o';
+                        tp->seq =(buf[LINDA_SEQ_OFFSET]  <<24) +
+                            (buf[LINDA_SEQ_OFFSET+1]<<16) +
+                            (buf[LINDA_SEQ_OFFSET+2]<<8)  +
+                            (buf[LINDA_SEQ_OFFSET+3]);
+                        tp->data = (char *) malloc(datasize);
+                        tp->datalen = datasize;
+                        
+                        memcpy(tp->data, buf+LINDA_HEADER_SIZE,datasize);
+#ifdef DEB
+                        fprintf(stdout, "data inserted data = %s, len = %d, id = %d\n"
+                                ,tp->data,tp->datalen,id);
+#endif
+                    } else {
+                        fprintf(stdout,"Uncorrect mode : %c\n",t[id]->mode);
+                        return(0);
+                    }
+                } else {
+                    fprintf(stdout,"Uncorrect buffer\n");
+                    if(buf) free(buf);
+                    return(0);
+                }
+                free(buf);
+                buf = 0;
+            }
+        }
+    }
+    close(ls);/* conection terminat */
+}
+