/*
 * Repository view: Linda/ldserv.c @ 19:8ab79a86aa73
 * Log message:     (empty log message)
 * Author:          gongo
 * Date:            Sat, 04 Nov 2006 09:13:06 +0000
 * Parents:         0fae5658fb0b
 */

/**********************************************************************/
/*                                                                    */
/*  Linda Server for PlayStation                                      */
//       $Id$            
//
/*                                                                    */
/**********************************************************************/
#include <stdio.h>      //ヘッダーファイルの作成
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/uio.h>
#include <sys/time.h>
#include <sys/select.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <signal.h>
#include <termios.h>
#include <netdb.h>
#include <errno.h>

// Backlog passed to listen().
#define MAX_REQ		1
// Value the socket/select calls in main() are compared against to detect failure.
#define FAIL		(-1)
// Capacity of the per-user descriptor table declared in main().
#define MAX_USER	4
// Number of lists in the tuple space; tuple ids are 16 bit (0..65535).
#define MAX_TUPLE	65536

// Socket path used when built with UNIX_DOMAIN; overridden by the -s option.
char *PATHNAME	 = "/tmp/ldserv";

// Uncomment to compile in show_tuple() and the debug trace output.
//#define DEB


// NOTE(review): not referenced anywhere in the code visible here.
#define BUFFSIZE        65535

/*----------------------------------------------------------------------
 Packet format (byte offset of each field on the bottom row)
 char     short  int    int
 Mode +   ID   + Seq  + Data_len + Padding + Data
 0        1      3      7          11        12

 ID, Seq and Data_len are read and written most significant byte first.
 On the wire every packet is preceded by a 4-byte length in network
 byte order that counts the header plus the data.
----------------------------------------------------------------------*/
#define LINDA_MODE_OFFSET          0
#define LINDA_ID_OFFSET            1
#define LINDA_SEQ_OFFSET           3
#define LINDA_DATA_LENGTH_OFFSET   7
#define LINDA_HEADER_SIZE          12


// Width in bytes of the length prefix and of the Data_len field.
#define INT_SIZE    4     // Byte = sizeof(int)

/* One node of a tuple-space list: either a stored tuple (mode 'o') or a
   client request ('i', 'r' or 'w') that is waiting for a tuple. */
typedef struct tuple{     // structure declaration
    char *data;               // payload bytes; 0 for waiting requests
    struct tuple *next;       // next node of the same list, NULL at the tail
    unsigned int seq;         // sequence number taken from the packet header
    unsigned short fd;        // descriptor of the client that issued a waiting request
    unsigned int datalen;     // number of payload bytes in data
    char mode;                // 'o', 'i', 'r' or 'w'
} TUPLE ;

TUPLE *t[MAX_TUPLE];     // TUPLE Space: one linked list per tuple id.
                         // t[MAX_TUPLE-1] receives one 'o' tuple holding the
                         // assigned user number for every accepted connection.
TUPLE *tp;               // scratch cursor used by main() while walking a list
int unix_port = 11511;   // value stored into sin_port for the AF_INET socket
                         // (despite the name); set with the -p option.
/* errno comes from <errno.h>, which this file already includes.  Declaring
   it by hand ("extern int errno;") is undefined behaviour because errno may
   be a macro, so the explicit declaration has been removed. */

#ifdef COUNT_PACKET
// print packet count message per PRINT_INTERVAL sec
#define PRINT_INTERVAL 4

/*-------------------------------------------------------------------/
  void
  count_packet (char type):
      Counts the commands received from clients and prints the
      totals once more than PRINT_INTERVAL seconds have passed since
      the previous report.  The counters are reset after each report.

  Arguments:
      type - kind of command received (char: i, o, r, w).  Any other
             value is reported on stderr and not counted.
/-------------------------------------------------------------------*/
void count_packet(char type)
{
    static int started = 0;
    static int n_out = 0, n_read = 0, n_in = 0, n_wait = 0;
    static struct timeval start, previous;
    struct timeval now;

    /* First call: remember the start time and print the column header. */
    if (!started) {
        gettimeofday(&start, NULL);
        gettimeofday(&previous, NULL);
        started = 1;
        printf("packet\tout\tread\tin\twait\ttime\n");
    }

    /* Sort the command into its counter. */
    switch (type) {
    case 'o':
        n_out++;
        break;
    case 'r':
        n_read++;
        break;
    case 'i':
        n_in++;
        break;
    case 'w':
        n_wait++;
        break;
    default:
        fprintf(stderr,"No type in count_packet function\n");
        return;
    }

    gettimeofday(&now, NULL);
    if ((now.tv_sec-previous.tv_sec) <= PRINT_INTERVAL) {
        return;
    }

    printf("log\t%d\t%d\t%d\t%d\t%ld\n",
           n_out, n_read, n_in, n_wait, now.tv_sec-start.tv_sec);
    fflush(stdout);

    /* Start a new reporting interval. */
    previous.tv_sec = now.tv_sec;
    n_out = 0;
    n_read = 0;
    n_in = 0;
    n_wait = 0;
}
#endif

#ifdef DEB
/*-------------------------------------------------------------------/
  int
  show_tuple ():
      TUPLE Space にあるTUPLEをID順に表示する
/-------------------------------------------------------------------*/
void show_tuple()
{
    int i;

    static int toggle = -1;
    static struct timeval start,now,previous;
    
    TUPLE * tmp;

    if (toggle == -1) {
        gettimeofday(&start,NULL);
        toggle = 0;
    }
    gettimeofday(&now,NULL);
    printf("time ****  %ld\n",now.tv_sec-start.tv_sec);
    fflush(stdout);
    previous.tv_sec = now.tv_sec;
    
    printf("\n******\n");
    for(i=0;i<MAX_TUPLE-1;i++){
        if(t[i]){
            printf("id: %d\t",i);
            for(tmp=t[i];tmp;tmp = tmp->next){
                printf("%c:%d\t",tmp->mode,tmp->seq);
            }
            printf("\n");
        }
    }
    if(t[i]){
        printf("id: %d\t",i);
        for(tmp=t[i];tmp;tmp = tmp->next){
            printf("%c:%c%c\t",tmp->mode,tmp->data[0],tmp->data[1]);
        }
        printf("\n");
    }
    printf("******\n\n");
    return;
}
#endif

/* Write exactly len bytes from data to fd, retrying after short writes
   and after EINTR.  Returns 0 on success, -1 on failure (errno is set by
   write(), or to EIO when write() makes no progress). */
static int
sz_write_all(int fd, const void *data, size_t len)
{
    const unsigned char *p = data;

    while (len > 0) {
        ssize_t n = write(fd, p, len);
        if (n < 0) {
            if (errno == EINTR)
                continue;
            return -1;
        }
        if (n == 0) {
            /* no progress: give up instead of spinning forever */
            errno = EIO;
            return -1;
        }
        p   += n;
        len -= (size_t)n;
    }
    return 0;
}

/*-------------------------------------------------------------------/
  int
  sz_send (int fd, unsigned char *buf, int size, int flag):
      Sends one packet to a client: a 4-byte length prefix in network
      byte order followed by size bytes taken from buf.

  Arguments:
      fd   - descriptor to send to
      buf  - data to send
      size - number of bytes in buf (data size + LINDA_HEADER_SIZE)
      flag - unused
  Return value:
      Number of bytes sent (4 + size) on success.
      -1 (the value of FAIL) when size is negative or a write fails, so
      the "== FAIL" checks made by the callers actually fire.  The
      previous version added up the two write() results and could
      return -2 or size-1 on failure instead.
/-------------------------------------------------------------------*/
int
sz_send(int fd,unsigned char *buf,int size,int flag) {
    uint32_t nsize;

    (void)flag;
    if (size < 0) {
        errno = EINVAL;
        return(-1);
    }
    nsize = htonl((uint32_t)size);
    if (sz_write_all(fd, &nsize, sizeof nsize) < 0)
        return(-1);
    if (sz_write_all(fd, buf, (size_t)size) < 0)
        return(-1);
    return((int)sizeof nsize + size);
}

/*-------------------------------------------------------------------/
  int
  main (int argc,char *argv[]):
      サーバのメイン。クライアントから送られて来た各コマンドに対応
      した処理を行う。
/-------------------------------------------------------------------*/
int
main(int argc,char *argv[])
{
    int i, a, users;
    int ls,sk,fd,paddrlen,maxfds,id;
    int datasize;
    int skfg = 1;
    short user = 0;
    unsigned char  userchar[2];
#ifdef UNIX_DOMAIN
    struct sockaddr_un my_addr, peer;
#else
    struct sockaddr_in my_addr, peer;
#endif
    struct timeval zerotime;
    unsigned char ipaddr[4];
    unsigned char *buf = 0;
    int len;
    int fduser[MAX_USER];
    fd_set readfds, suspect, tmpfd;
    char ch;
    
    extern char *optarg;
    extern int optind;
    
    while ((ch = getopt(argc, argv, "p:s:?")) != -1){
        switch(ch) {
        case 'p':
            unix_port = atoi(optarg);
            break;
        case 's':
            PATHNAME = optarg;
            break;
        case '?':
        default:
            fprintf(stderr,"usage: %s [-p port]\n",argv[0]);
            break;
        }
    }
    
    argc -= optind;
    argv += optind;
    
    zerotime.tv_sec = zerotime.tv_usec = 0L;
    users = 0;
    
#ifdef UNIX_DOMAIN
    if ((ls = socket(AF_UNIX, SOCK_STREAM, 0)) == FAIL){
#else
    if ((ls = socket(AF_INET, SOCK_STREAM, 0)) == FAIL){
#endif
        fprintf(stderr, "socket open error! errno :%d %s\n", errno,
                strerror(errno));
        exit(1);
    }
    if (setsockopt(ls, SOL_SOCKET, SO_REUSEADDR,(char *) &skfg, sizeof(skfg)) < 0){
        fprintf(stderr, "setsockopt error %d\n",errno);
        close(ls);
        exit(1);
    }
#ifdef UNIX_DOMAIN
    my_addr.sun_family = AF_UNIX;
    strcpy(my_addr.sun_path, PATHNAME);
#else
    my_addr.sin_family = AF_INET;
    my_addr.sin_port = unix_port;
    my_addr.sin_addr.s_addr = INADDR_ANY;
#endif

    if (bind(ls, (struct sockaddr *)&my_addr,sizeof(my_addr)) == FAIL){
        fprintf(stderr, "socket binded address error! errno:%d %s\n", errno,
            strerror(errno));
        close(ls);
        exit(1);
    }
    if (listen(ls, MAX_REQ) == FAIL){
        fprintf(stderr, "list creat error! errno:%d %s\n", errno,
                strerror(errno));
        close(ls);
        exit(1);
    }
    paddrlen = sizeof(peer);
    maxfds = FD_SETSIZE;
    fd = 0;
#ifdef DEB
    maxfds = 8;
#endif
    FD_ZERO(&readfds);

    // main loop
    while(1){
        FD_SET(ls, &readfds);
        tmpfd = readfds;

#ifdef DEB
        show_tuple();
#endif
        if ((sk = select(maxfds, &readfds, NULL, NULL, NULL)) == FAIL){
            if (errno == EBADF){
                for(i = 0;i < maxfds;i++){
                    FD_ZERO(&suspect);
                    if (FD_ISSET(i, &readfds)){
                        FD_SET(i, &suspect);
                        if (select(maxfds, &suspect, NULL, NULL, &zerotime) == FAIL){
                            fprintf(stdout, "%d descriptor clear",i);
                            FD_CLR(i, &readfds);
                        }
                        FD_CLR(i, &suspect);
                    }
                }
            } else {
                fprintf(stderr, "select error! errno:%d %s\n", errno,
                        strerror(errno));
                close(ls);
                exit(1);
            }
        } else {
            while(1){
                fd = (fd + 1) % maxfds;
                if (FD_ISSET(fd, &readfds)) break;
            }
            readfds = tmpfd;
            
            // 新規の接続
            if (fd == ls){
                if ((sk = accept(ls, (struct sockaddr *)&peer, &paddrlen)) == FAIL){
                    fprintf(stderr, "connection accept error! errno:%d %s\n", errno,
                            strerror(errno));
                    close(ls);
                    exit(1);
                }
#ifndef UNIX_DOMAIN
                if (peer.sin_family == AF_INET)
                {
                    int tmp = 1;

                    setsockopt (sk, IPPROTO_TCP, TCP_NODELAY,
                        (char *) &tmp, sizeof (int));
                }
#endif
                if ((tp = t[MAX_TUPLE-1]) == NULL){
                    tp = t[MAX_TUPLE-1] = (TUPLE *) malloc(sizeof(TUPLE));
                    tp->next = NULL;
                } else {
                    while(tp->next) tp = tp->next;
                    tp->next = (TUPLE *) malloc(sizeof(TUPLE));
                    tp = tp->next;
                    tp->next = NULL;
                }
                tp->mode = 'o'; 
                tp->seq = 0;
                tp->data = (char *) malloc(sizeof(short));
                tp->datalen = sizeof(short); user++;
                userchar[0] = user / 10 + '0';
                userchar[1] = user % 10 + '0';
                memcpy(tp->data, &userchar ,sizeof(userchar));
                
                fprintf(stdout, "localhost connected assign id = %d\n", user);
#ifndef UNIX_DOMAIN
                memcpy(ipaddr, &peer.sin_addr.s_addr, sizeof(ipaddr));
                fprintf(stdout, "%d.%d.%d.%d connected \n",
                        ipaddr[0],ipaddr[1],ipaddr[2],ipaddr[3]);
#endif
                FD_SET(sk,&readfds);
                
            // 通信を確立しているクライアントからの要求
            } else {
                int tempnum;
#ifdef DEB
                fprintf(stdout, "\n\ndata from %d descriptor\n", fd);
#endif
                if((len=read(fd,&tempnum,INT_SIZE))!=INT_SIZE) {
                    if (len==0) {
                        fprintf(stdout, "fd %d closed\n", fd);
                        close(fd);
                        continue;
                    }
                    fprintf(stderr, "read error! on fd:%d ret=%d %s\n", 
                            fd,len,
                        strerror(errno));
                    close(fd); 
                    continue;
                }
                len = ntohl(tempnum);  // len は DATASIZE + LINDA_HEADER_SIZE
#ifdef DEB
                fprintf(stderr, "datta on fd:%d len=%d\n", fd,len);
#endif
                if((buf = (unsigned char*)malloc(len))==NULL){
                    fprintf(stderr,"allocate error! :%d %s",errno,strerror(errno));
                    exit(1);
                }
                for(a=0;a<len;a+=i) {
                    if((i=read(fd,buf+a,len-a))<0) {
                        fprintf(stderr, "ldserv: client read error! on i=%d len=%d %s\n",
                                i, len, strerror(errno));
                        close(fd);
                    }
                }
#ifdef DEB
                fprintf(stdout,"recv size = %d : mode = %c : strings = %s\n",
                        len, *buf, buf + LINDA_HEADER_SIZE);
#endif

#ifdef COUNT_PACKET
                    count_packet(buf[0]);
#endif

                if ((buf[LINDA_MODE_OFFSET] == '!') || (len == 0)){
                    FD_CLR(fd, &readfds);
                    for(i = 0;i < users; i++){
                        if (fduser[i] == fd) break;
                    }
                    fprintf(stdout, "connection closed descriptor :%d\n", fd);
                    close(fd);
                } else if (buf[LINDA_MODE_OFFSET] == 'c'){
                    id = buf[LINDA_ID_OFFSET] * 256 + buf[LINDA_ID_OFFSET+1];
                    //buf[LINDA_MODE_OFFSET] = 'a';
                    tp = t[id];
                    while(tp && tp->next && (tp->mode=='w')){
                        tp = tp->next;
                    }
                    if (tp && (tp->mode == 'o')){                        
                        buf[LINDA_DATA_LENGTH_OFFSET]   = (tp->datalen>>24) & 0xff;
                        buf[LINDA_DATA_LENGTH_OFFSET+1] = (tp->datalen>>16) & 0xff;
                        buf[LINDA_DATA_LENGTH_OFFSET+2] = (tp->datalen>>8)  & 0xff;
                        buf[LINDA_DATA_LENGTH_OFFSET+3] = (tp->datalen)     & 0xff;
                        
                    } else {
                        /* means no out tuple */
                        memset(&buf[LINDA_DATA_LENGTH_OFFSET],0,INT_SIZE);
                    }

                    if (sz_send(fd, buf, LINDA_HEADER_SIZE, 0) == FAIL){
                        fprintf(stderr,"recv error! errno :%d %s\n", errno,
                                strerror(errno));
                    }
#ifdef DEB
                    fprintf(stdout,"send size= %d : mode = %c\n",len, *buf);
#endif
                } else if (buf[LINDA_MODE_OFFSET] == 'i' || buf[LINDA_MODE_OFFSET] == 'r'){
                    
                    int mode = buf[LINDA_MODE_OFFSET];

                    TUPLE * temp = NULL;
                    id = buf[LINDA_ID_OFFSET] * 256 + buf[LINDA_ID_OFFSET+1];
#ifdef DEB
                    fprintf(stdout, "***%c command : id = %d ***\n",mode,id);
#endif
                    tp = t[id];

                    // w は無視する
                    while(tp && tp->next && (tp->mode=='w')){
                        temp = tp;
                        tp = tp->next;
                    }
                    
                    if (tp && (tp->mode == 'o')){
                        if((buf = (unsigned char*)realloc(buf,tp->datalen+LINDA_HEADER_SIZE)) == NULL){
                            fprintf(stderr,"2 reallocate error! errno :%d %s",errno,strerror(errno));
                            exit(1);
                        }
                        
                        buf[LINDA_MODE_OFFSET] = 'a'; 
                        /* seq and id are already set */
                        buf[LINDA_DATA_LENGTH_OFFSET]   = (tp->datalen>>24) & 0xff;
                        buf[LINDA_DATA_LENGTH_OFFSET+1] = (tp->datalen>>16) & 0xff;
                        buf[LINDA_DATA_LENGTH_OFFSET+2] = (tp->datalen>>8)  & 0xff;
                        buf[LINDA_DATA_LENGTH_OFFSET+3] = (tp->datalen)     & 0xff;
                        
                        memcpy(buf+LINDA_HEADER_SIZE, tp->data, tp->datalen);
#ifdef DEB
                        fprintf(stdout,"send size = %d : mode = %c : strings = %s\n",
                                len, *buf, buf + LINDA_HEADER_SIZE);
#endif
                        if (sz_send(fd, buf, (tp->datalen) + LINDA_HEADER_SIZE, 0) == FAIL){
                            fprintf(stderr,"recv error! errno :%d %s\n", errno,
                                    strerror(errno));
                        }
                        if(mode == 'i') {
                            free(tp->data);
                            tp->data=0;
                            if(temp){
                                temp->next = tp->next;
                            }else{
                                t[id] = tp->next;
                            }
                            free(tp);
                            
                        }
                    } else {
                        if (tp == NULL){
                            tp = t[id] = (TUPLE *) malloc(sizeof(TUPLE));
                            tp->next = NULL;
                        } else {
                            while(tp->next) tp = tp->next;
                            tp->next = (TUPLE *) malloc(sizeof(TUPLE));
                            tp = tp->next;
                            tp->next = NULL;
                        }
                        
                        tp->mode = mode;
                        tp->seq = (buf[LINDA_SEQ_OFFSET]  <<24) +
                            (buf[LINDA_SEQ_OFFSET+1]<<16) +
                            (buf[LINDA_SEQ_OFFSET+2]<<8)  +
                            (buf[LINDA_SEQ_OFFSET+3]);  /* seq */
                        tp->fd = fd;
                        tp->datalen = 0;
                        tp->data = 0;
#ifdef DEB
                        fprintf(stdout, "data inserted insert seq = %d, id = %d\n"
                                ,tp->seq,id);
#endif
                    }
                } else if (buf[LINDA_MODE_OFFSET] == 'w'){
                    int mode = buf[LINDA_MODE_OFFSET];

                    id = buf[LINDA_ID_OFFSET] * 256 + buf[LINDA_ID_OFFSET+1];
#ifdef DEB
                    fprintf(stdout, "***%c command : id = %d ***\n",mode,id);
#endif
                    tp = (TUPLE *) malloc(sizeof(TUPLE));
                    tp->mode = mode;
                    tp->seq = (buf[LINDA_SEQ_OFFSET]  <<24) +
                        (buf[LINDA_SEQ_OFFSET+1]<<16) +
                        (buf[LINDA_SEQ_OFFSET+2]<<8)  +
                        (buf[LINDA_SEQ_OFFSET+3]);  /* seq */
                    tp->fd = fd;
                    tp->datalen = 0;
                    tp->data = 0;
                    tp->next = t[id];
                    t[id] = tp;
#ifdef DEB
                    fprintf(stdout, "data inserted insert seq = %d, id = %d\n"
                            ,t[id]->seq,id);
#endif
                } else if (buf[LINDA_MODE_OFFSET] == 'o'){
                    id = buf[LINDA_ID_OFFSET] * 256 + buf[LINDA_ID_OFFSET+1];
                    
                    datasize = (buf[LINDA_DATA_LENGTH_OFFSET]  <<24) +
                               (buf[LINDA_DATA_LENGTH_OFFSET+1]<<16) +
                               (buf[LINDA_DATA_LENGTH_OFFSET+2]<<8)  +
                               (buf[LINDA_DATA_LENGTH_OFFSET+3]);
#ifdef DEB
                    fprintf(stdout, "*** out command : seq = %d ***\n",id);
#endif
                    while((t[id]) && ((t[id]->mode=='w')||(t[id]->mode == 'r'))){
                        
                        buf[LINDA_MODE_OFFSET] = 'a';
                        buf[LINDA_SEQ_OFFSET]   = (t[id]->seq>>24) & 0xff;
                        buf[LINDA_SEQ_OFFSET+1] = (t[id]->seq>>16) & 0xff;
                        buf[LINDA_SEQ_OFFSET+2] = (t[id]->seq>>8)  & 0xff;
                        buf[LINDA_SEQ_OFFSET+3] = (t[id]->seq)     & 0xff;
                        if (sz_send(t[id]->fd,buf,datasize+LINDA_HEADER_SIZE, 0) == FAIL){
                            fprintf(stderr,"recv error! errno :%d %s\n", errno,
                                    strerror(errno));
                        }
                        tp = t[id];
                        t[id] = tp->next;
                        free(tp);
                    }
                    if ((t[id])&&t[id]->mode == 'i'){
                        buf[LINDA_MODE_OFFSET] = 'a'; 
                        buf[LINDA_SEQ_OFFSET]   = (t[id]->seq>>24) & 0xff;
                        buf[LINDA_SEQ_OFFSET+1] = (t[id]->seq>>16) & 0xff;
                        buf[LINDA_SEQ_OFFSET+2] = (t[id]->seq>>8)  & 0xff;
                        buf[LINDA_SEQ_OFFSET+3] = (t[id]->seq)     & 0xff;
                        if (sz_send(t[id]->fd,buf,datasize+LINDA_HEADER_SIZE,0)==FAIL){
                                fprintf(stderr,"recv error! errno :%d %s\n", errno,
                                        strerror(errno));
                        }
#ifdef DEB
                        fprintf(stdout,"sendsize = %d :mode = %c :strings=%s\n",
                                len, *buf, buf + LINDA_HEADER_SIZE);
#endif
#ifdef DEB
                        fprintf(stdout, "data senddata =%s,id=%d ,mode=%c,len=%d \n"
                                ,tp->data, id, t[id]->mode, t[id]->datalen);
#endif
                        // we should not free waiting in-packet's data
                        tp = t[id]; t[id] = tp->next; free(tp);
                    } else if ((t[id]==NULL) || (t[id]->mode == 'o')){
                        if ((tp = t[id]) == NULL){
                            tp = t[id] = (TUPLE *) malloc(sizeof(TUPLE));
                            tp->next = NULL;
                        } else {
                            while(tp->next) tp = tp->next;
                            tp->next = (TUPLE *) malloc(sizeof(TUPLE));
                            tp = tp->next;
                            tp->next = NULL;
                        }
                        tp->mode = 'o';
                        tp->seq =(buf[LINDA_SEQ_OFFSET]  <<24) +
                            (buf[LINDA_SEQ_OFFSET+1]<<16) +
                            (buf[LINDA_SEQ_OFFSET+2]<<8)  +
                            (buf[LINDA_SEQ_OFFSET+3]);
                        tp->data = (char *) malloc(datasize);
                        tp->datalen = datasize;
                        
                        memcpy(tp->data, buf+LINDA_HEADER_SIZE,datasize);
#ifdef DEB
                        fprintf(stdout, "data inserted data = %s, len = %d, id = %d\n"
                                ,tp->data,tp->datalen,id);
#endif
                    } else {
                        fprintf(stdout,"Uncorrect mode : %c\n",t[id]->mode);
                        return(0);
                    }
                } else {
                    fprintf(stdout,"Uncorrect buffer\n");
                    if(buf) free(buf);
                    return(0);
                }
                free(buf);
                buf = 0;
            }
        }
    }
    close(ls);/* conection terminat */
}