view tools/Linda_library/ldserv.c @ 122:ad73eacf560a default tip

remove warning
author e095732
date Thu, 07 Feb 2013 22:32:26 +0900
parents 6c40056777be
children
line wrap: on
line source

/**********************************************************************/
/*                                                                    */
/*  Linda Server                                                      */
//       $Id$            
//
/*                                                                    */
/**********************************************************************/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/uio.h>
#include <sys/time.h>
#include <sys/select.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <signal.h>
#include <termios.h>
#include <netdb.h>
#include <errno.h>
#include <signal.h>
#define MAX_REQ		1
#define FAIL		(-1)
#define MAX_USER	4
#define MAX_TUPLE	65536
#define PATHNAME	"/tmp/ldserv"

#define DEBUG


#define BUFFSIZE        65535

/*----------------------------------------------------------------------
 Packet format (field type, field name, byte offset)
 char     short  int    int
 Mode +   ID   + Seq  + Data_len + Padding + Data
 0        1      3      7          11        12
----------------------------------------------------------------------*/
#define LINDA_MODE_OFFSET          0
#define LINDA_ID_OFFSET            1
#define LINDA_SEQ_OFFSET           3
#define LINDA_DATA_LENGTH_OFFSET   7
#define LINDA_HEADER_SIZE          12


#define INT_SIZE    4     // 4Byte = sizeof(int)

/* One node of a tuple-space list: either a stored tuple (mode 'o') or a
   pending client request (mode 'i', 'r' or 'w') queued under a tuple id. */
typedef struct tuple{
    unsigned char *data;   /* stored tuples: whole packet (header + payload);
                              main() sets it to 0 for pending requests */
    struct tuple *next;    /* next node queued under the same tuple id */
    unsigned int seq;      /* sequence number copied from the packet header */
    unsigned short fd;     /* descriptor of the requesting client; main() only
                              assigns it for pending 'i'/'r'/'w' requests */
    unsigned int datalen;  /* payload length in bytes, header excluded;
                              0 for pending requests */
    char mode;             /* 'o' for a stored tuple, or the request mode
                              ('i', 'r', 'w') of a pending request */
} TUPLE ;

/* Tuple space: one singly linked list of TUPLE nodes per tuple id. */
TUPLE *tuple_space[MAX_TUPLE];
/* Working cursor into tuple_space used by main(). */
TUPLE *tp;
/* Port number used for the listening socket; 11511 is the default and
   main() overrides it with the -p option. */
int unix_port = 11511;
/* errno comes from <errno.h>, which is included above.  It must not be
   redeclared here: it may be a macro, and a program that declares its own
   identifier named errno has undefined behavior. */

#ifdef COUNT_PACKET
// print packet count message per PRINT_INTERVAL sec
#define PRINT_INTERVAL 4

/*-------------------------------------------------------------------/
  void
  count_packet (char type):
      Count the commands received from clients and, whenever more than
      PRINT_INTERVAL seconds have passed since the last report, print
      the counters to stdout and reset them.

  Arguments:
      type - kind of command received: 'i', 'o', 'r' or 'w'.
             Any other value is reported on stderr and not counted.
/-------------------------------------------------------------------*/
void count_packet(char type)
{
    /* out_packet == -1 marks "not initialised yet" (first call). */
    static int out_packet=-1,read_packet=0,in_packet=0,wait_packet=0;
    static struct timeval start,previous;
    struct timeval now;

    if (out_packet == -1) {
        gettimeofday(&start,NULL);
        gettimeofday(&previous,NULL);
        out_packet = 0;
        printf("packet\tout\tread\tin\twait\ttime\n");
    }

    switch (type) {
    case 'o':
        out_packet++;
        break;
    case 'r':
        read_packet++;
        break;
    case 'i':
        in_packet++;
        break;
    case 'w':
        wait_packet++;
        break;
    default:
        fprintf(stderr,"No type in count_packet function\n");
        return;
    }

    gettimeofday(&now,NULL);
    if ((now.tv_sec-previous.tv_sec) > PRINT_INTERVAL) {
        /* time_t is not guaranteed to be long: cast to match %ld. */
        printf("log\t%d\t%d\t%d\t%d\t%ld\n",
               out_packet,read_packet,in_packet,wait_packet,
               (long)(now.tv_sec-start.tv_sec));
        fflush(stdout);

        previous.tv_sec = now.tv_sec;
        out_packet = read_packet = in_packet = 0;
        wait_packet = 0;
    }
}
#endif

#ifdef DEBUG
/*-------------------------------------------------------------------/
  int
  show_tuple_space ():
  TUPLE Space にあるTUPLEをID順に表示する
/-------------------------------------------------------------------*/
void show_tuple_space()
{
    int i;
    
    static int toggle = -1;
    static struct timeval start,now,previous;
    
    TUPLE * tmp;
    
    if (toggle == -1) {
        gettimeofday(&start,NULL);
        toggle = 0;
    }
    gettimeofday(&now,NULL);
    printf("time ****  %ld\n",now.tv_sec-start.tv_sec);
    fflush(stdout);
    previous.tv_sec = now.tv_sec;
    
    printf("\n******\n");
    for(i=0;i<MAX_TUPLE-1;i++){
        if(tuple_space[i]){
            printf("id: %d\t",i);
            for(tmp=tuple_space[i];tmp;tmp = tmp->next){
                printf("%c:%d\t",tmp->mode,tmp->seq);
            }
            printf("\n");
        }
    }
    if(tuple_space[i]){
        printf("id: %d\t",i);
        for(tmp=tuple_space[i];tmp;tmp = tmp->next){
            printf("%c:%c%c\t",tmp->mode,tmp->data[0],tmp->data[1]);
        }
        printf("\n");
    }
    printf("******\n\n");
    return;
}
#endif


/*-------------------------------------------------------------------/
  unsigned int
  get_int(unsigned char * tuple, int offset):
      Decode the 4-byte big-endian integer stored in a packet header.
      Called by psx_get_datalength() and psx_get_seq().

  Arguments:
      tuple  - packet including its header.
      offset - byte offset of the field to read, i.e.
               LINDA_DATA_LENGTH_OFFSET or LINDA_SEQ_OFFSET.

  Returns:
      The value stored at the given offset.
/-------------------------------------------------------------------*/
static
unsigned int
get_int(unsigned char * tuple, int offset){
    const unsigned char *field = tuple + offset;

    /* Widen each byte to unsigned before shifting: an unsigned char is
       promoted to (signed) int, and shifting a value >= 0x80 left by 24
       would overflow it. */
    return ((unsigned int)field[0] << 24) |
           ((unsigned int)field[1] << 16) |
           ((unsigned int)field[2] << 8)  |
            (unsigned int)field[3];
}

/* Return the data-length field stored in the header of packet `tuple`. */
unsigned int
psx_get_datalength(unsigned char * tuple)
{
    unsigned int length = get_int(tuple, LINDA_DATA_LENGTH_OFFSET);

    return length;
}

/* Return the sequence-number field stored in the header of packet `tuple`. */
unsigned int
psx_get_seq(unsigned char * tuple)
{
    unsigned int sequence = get_int(tuple, LINDA_SEQ_OFFSET);

    return sequence;
}

/* Return the 16-bit tuple id stored big-endian in the header of `tuple`. */
unsigned short
psx_get_id(unsigned char * tuple)
{
    const unsigned char *field = tuple + LINDA_ID_OFFSET;
    int high = field[0];
    int low = field[1];

    return (high << 8) | low;
}

/* Return the mode byte stored in the header of packet `tuple`. */
unsigned char
psx_get_mode(unsigned char * tuple)
{
    return *(tuple + LINDA_MODE_OFFSET);
}

/* Store `i` as four bytes, most significant byte first, starting at
   tuple[offset]. */
static
void
set_int_to_char(unsigned char * tuple, int i, int offset)
{
    unsigned char *dst = tuple + offset;
    int shift;

    for (shift = 24; shift >= 0; shift -= 8) {
        *dst++ = (i >> shift) & 0xff;
    }
}

/* Write `length` into the data-length field of the header of `tuple`. */
void
psx_set_datalength(unsigned char * tuple, int length)
{
    const int field_offset = LINDA_DATA_LENGTH_OFFSET;

    set_int_to_char(tuple, length, field_offset);
}

/* Write `seq` into the sequence-number field of the header of `tuple`. */
void
psx_set_seq(unsigned char * tuple, int seq)
{
    const int field_offset = LINDA_SEQ_OFFSET;

    set_int_to_char(tuple, seq, field_offset);
}

/* Write `id` into the header of `tuple` as two bytes, high byte first. */
void
psx_set_id(unsigned char * tuple, short id)
{
    unsigned char *field = tuple + LINDA_ID_OFFSET;

    field[0] = (id >> 8) & 0xff;
    field[1] = id & 0xff;
}

/* Write `mode` into the mode byte of the header of `tuple`. */
void
psx_set_mode(unsigned char * tuple, char mode)
{
    *(tuple + LINDA_MODE_OFFSET) = mode;
}

/*-------------------------------------------------------------------/
  int
  sz_send (int fd, unsigned char *buf, unsigned int size, int flag):
      Send one packet to a client: a 4-byte length prefix holding
      `size` in network byte order, followed by `size` bytes of buf.

  Arguments:
      fd   - descriptor to send to
      buf  - data to send
      size - number of bytes in buf (callers pass data size +
             LINDA_HEADER_SIZE)
      flag - unused
  Returns:
      `size` when the whole packet was written, or -1 (FAIL) if any
      write failed; errno is then left as set by write().
/-------------------------------------------------------------------*/

/* Write all `len` bytes of `data` to `fd`, resuming after short writes and
   EINTR.  Returns 0 on success and -1 on failure. */
static int
sz_write_all(int fd, const void *data, size_t len)
{
    const unsigned char *next = data;

    while (len > 0) {
        ssize_t written = write(fd, next, len);

        if (written < 0) {
            if (errno == EINTR) {
                continue;
            }
            return -1;
        }
        if (written == 0) {
            /* No progress and no error: give up rather than spin. */
            return -1;
        }
        next += written;
        len -= (size_t)written;
    }
    return 0;
}

int
sz_send(int fd, unsigned char *buf, unsigned int size,int flag) {
    /* Length prefix: 4 bytes (INT_SIZE), most significant byte first. */
    unsigned char prefix[4];

    (void)flag;

    prefix[0] = (size >> 24) & 0xff;
    prefix[1] = (size >> 16) & 0xff;
    prefix[2] = (size >> 8)  & 0xff;
    prefix[3] = size & 0xff;

    if (sz_write_all(fd, prefix, sizeof prefix) < 0) {
        return -1;
    }
    if (sz_write_all(fd, buf, size) < 0) {
        return -1;
    }
    return (int)size;
}

/* SIGPIPE dispositions passed to sigaction() in main(): `new` carries the
   intr handler, `old` receives the previous disposition. */
struct sigaction old,new;

/*
 * Signal handler (installed for SIGPIPE by main()): report the signal
 * number on stderr as "intr: <n> \n" and return.
 *
 * Only async-signal-safe operations are used: the message is formatted
 * by hand and emitted with write(), because stdio functions such as
 * fprintf() must not be called from a signal handler.  errno is saved
 * and restored so the interrupted code does not see it change.
 */
static void
intr(int i)
{
    static const char prefix[] = "intr: ";
    char line[32];      /* prefix + sign + at most 10 digits + " \n" */
    char digits[16];
    size_t len = 0;
    size_t ndigits = 0;
    size_t k;
    unsigned int magnitude;
    int saved_errno = errno;

    for (k = 0; k < sizeof prefix - 1; k++) {
        line[len++] = prefix[k];
    }
    if (i < 0) {
        line[len++] = '-';
        magnitude = 0u - (unsigned int)i;
    } else {
        magnitude = (unsigned int)i;
    }
    /* Digits are produced least significant first, then reversed. */
    do {
        digits[ndigits++] = (char)('0' + magnitude % 10);
        magnitude /= 10;
    } while (magnitude != 0);
    while (ndigits > 0) {
        line[len++] = digits[--ndigits];
    }
    line[len++] = ' ';
    line[len++] = '\n';

    if (write(STDERR_FILENO, line, len) < 0) {
        /* Nothing safe can be done about a failed diagnostic here. */
    }
    errno = saved_errno;
}

/*-------------------------------------------------------------------/
  int
  main (int argc,char *argv[]):
      サーバのメイン。クライアントから送られて来た各コマンドに対応
      した処理を行う。
/-------------------------------------------------------------------*/
int
main(int argc,char *argv[])
{
    int i, a, users;
    int ls,sk,fd,maxfds;
    unsigned int id;
    unsigned int datasize;

    socklen_t paddrlen;
    
    int skfg = 1;
    short user = 0;
    unsigned char  userchar[2];
#ifdef UNIX_DOMAIN
    struct sockaddr_un my_addr, peer;
#else
    struct sockaddr_in my_addr, peer;
#endif
    struct timeval zerotime;
    unsigned char ipaddr[4];
    unsigned char *buf = 0;
    int len;
    int fduser[MAX_USER];
    fd_set readfds, suspect, tmpfd;
    char ch;
    
    extern char *optarg;
    extern int optind;


    new.sa_handler = intr;
    sigaction(SIGPIPE,&new,&old);

    
    while ((ch = getopt(argc, argv, "p:h:")) != -1){
        switch(ch) {
        case 'p':
            unix_port = atoi(optarg);
            break;
        case '?':
        default:
            fprintf(stderr,"usage: %s [-p port]\n",argv[0]);
            break;
        }
    }
    
    argc -= optind;
    argv += optind;
    
    zerotime.tv_sec = zerotime.tv_usec = 0L;
    users = 0;
    
#ifdef UNIX_DOMAIN
    if ((ls = socket(AF_UNIX, SOCK_STREAM, 0)) == FAIL){
#else
    if ((ls = socket(AF_INET, SOCK_STREAM, 0)) == FAIL){
#endif
    // }
        fprintf(stderr, "socket open error! errno :%d %s\n", errno,
                strerror(errno));
        exit(1);
    }
    if (setsockopt(ls, SOL_SOCKET, SO_REUSEADDR,(char *) &skfg, sizeof(skfg)) < 0){
        fprintf(stderr, "setsockopt error %d\n",errno);
        close(ls);
        exit(1);
    }
#ifdef UNIX_DOMAIN
    my_addr.sun_family = AF_UNIX;
    strcpy(my_addr.sun_path, PATHNAME);
#else
    my_addr.sin_family = AF_INET;
    my_addr.sin_port = unix_port;
    my_addr.sin_addr.s_addr = INADDR_ANY;
#endif
    
    if (bind(ls, (struct sockaddr *)&my_addr,sizeof(my_addr)) == FAIL){
        fprintf(stderr, "socket binded address error! errno:%d %s\n", errno,
                strerror(errno));
        close(ls);
        exit(1);
    }
    if (listen(ls, MAX_REQ) == FAIL){
        fprintf(stderr, "list creat error! errno:%d %s\n", errno,
                strerror(errno));
        close(ls);
        exit(1);
    }
    paddrlen = (socklen_t)sizeof(peer);
    maxfds = FD_SETSIZE;
    fd = 0;
#ifdef DEBUG
//    maxfds = 8;
#endif
    FD_ZERO(&readfds);

    // main loop
    while(1){
        FD_SET(ls, &readfds);
        tmpfd = readfds;
        
#ifdef DEBUG
        show_tuple_space();
#endif
        if ((sk = select(maxfds, &readfds, NULL, NULL, NULL)) == FAIL){
            if (errno == EBADF){
                for(i = 0;i < maxfds;i++){
                    FD_ZERO(&suspect);
                    if (FD_ISSET(i, &readfds)){
                        FD_SET(i, &suspect);
                        if (select(maxfds, &suspect, NULL, NULL, &zerotime) == FAIL){
                            fprintf(stdout, "%d descriptor clear",i);
                            FD_CLR(i, &readfds);
                        }
                        FD_CLR(i, &suspect);
                    }
                }
            } else {
                fprintf(stderr, "select error! errno:%d %s\n", errno,
                        strerror(errno));
                close(ls);
                exit(1);
            }
        } else {
            while(1){
                fd = (fd + 1) % maxfds;
                if (FD_ISSET(fd, &readfds)) break;
            }
            readfds = tmpfd;
            
            // 新規の接続
            if (fd == ls){
                if ((sk = accept(ls, (struct sockaddr *)&peer, &paddrlen)) == FAIL){
                    fprintf(stderr, "connection accept error! errno:%d %s\n", errno,
                            strerror(errno));
                    close(ls);
                    exit(1);
                }
#ifndef UNIX_DOMAIN
                if (peer.sin_family == AF_INET){
                    int tmp = 1;
                    
                    setsockopt (sk, IPPROTO_TCP, TCP_NODELAY,
                                (char *) &tmp, sizeof (int));
                }
#endif
                if ((tp = tuple_space[MAX_TUPLE-1]) == NULL){
                    tp = tuple_space[MAX_TUPLE-1] = (TUPLE *) malloc(sizeof(TUPLE));
                    tp->next = NULL;
                } else {
                    while(tp->next) tp = tp->next;
                    tp->next = (TUPLE *) malloc(sizeof(TUPLE));
                    tp = tp->next;
                    tp->next = NULL;
                }
                tp->mode = 'o';
                tp->seq = 0;
                tp->data = (unsigned char *) malloc(sizeof(short)+LINDA_HEADER_SIZE);
                tp->datalen = sizeof(short);
                user++;
                userchar[0] = user / 10 + '0';
                userchar[1] = user % 10 + '0';
                
                psx_set_mode(tp->data,tp->mode);
                psx_set_id(tp->data,MAX_TUPLE-1);
                psx_set_seq(tp->data,tp->seq);
                psx_set_datalength(tp->data,tp->datalen);
                memcpy(tp->data+LINDA_HEADER_SIZE, &userchar ,sizeof(userchar));
                
                fprintf(stdout, "localhost connected assign id = %d\n", user);
#ifndef UNIX_DOMAIN
                memcpy(ipaddr, &peer.sin_addr.s_addr, sizeof(ipaddr));
                fprintf(stdout, "%d.%d.%d.%d connected \n",
                        ipaddr[0],ipaddr[1],ipaddr[2],ipaddr[3]);
#endif
                FD_SET(sk,&readfds);
                
            // 接続を確立しているクライアントからの要求
            } else {
                int tempnum;
#ifdef DEBUG
                fprintf(stdout, "\n\ndata from %d descriptor\n", fd);
#endif
                if((len=read(fd,&tempnum,INT_SIZE))!=INT_SIZE) {
                    if (len==0) {
                        fprintf(stdout, "fd %d closed\n", fd);
                        close(fd);
                        continue;
                    }
                    fprintf(stderr, "read error! on fd:%d ret=%d %s\n", 
                            fd, len, strerror(errno));
                    close(fd); 
                    continue;
                }
                len = ntohl(tempnum);  // len は DATASIZE + LINDA_HEADER_SIZE
                
#ifdef DEBUG
                fprintf(stderr, "datta on fd:%d len=%d\n", fd,len);
#endif
                if((buf = (unsigned char*)malloc(len))==NULL){
                    fprintf(stderr,"allocate error! :%d %s",errno,strerror(errno));
                    exit(1);
                }
                for(a=0;a<len;a+=i) {
                    if((i=read(fd,buf+a,len-a))<0) {
                        fprintf(stderr, "ldserv: client read error! on i=%d len=%d %s\n",
                                i, len, strerror(errno));
                        close(fd);
                    }
                }
#ifdef DEBUG
                fprintf(stdout,"recv size = %d : mode = %c\n", len, *buf);
#endif
#ifdef COUNT_PACKET
                count_packet(buf[0]);
#endif
                
                if ((buf[LINDA_MODE_OFFSET] == '!') || (len == 0)){
                    FD_CLR(fd, &readfds);
                    for(i = 0;i < users; i++){
                       if (fduser[i] == fd) break;
                    }
                    fprintf(stdout, "connection closed descriptor :%d\n", fd);
                    close(fd);
                    free(buf);
                    buf=0;
                } else if (buf[LINDA_MODE_OFFSET] == 'c'){
                    id = buf[LINDA_ID_OFFSET] * 256 + buf[LINDA_ID_OFFSET+1];
                    //buf[LINDA_MODE_OFFSET] = 'a';
                    tp = tuple_space[id];
                    while(tp && tp->next && (tp->mode=='w')){
                        tp = tp->next;
                    }
                    if (tp && (tp->mode == 'o')){
                        psx_set_datalength(buf,tp->datalen);
                    } else {
                        /* means no out tuple */
                        memset(&buf[LINDA_DATA_LENGTH_OFFSET],0,INT_SIZE);
                    }
                    
                    if (sz_send(fd, buf, LINDA_HEADER_SIZE, 0) == FAIL){
                        fprintf(stderr,"recv error! errno :%d %s\n", errno,
                                strerror(errno));
                    }
                    free(buf);
                    buf = 0;
#ifdef DEBUG
                    fprintf(stdout,"send size= %d : mode = %c\n",len, *buf);
#endif
                } else if (buf[LINDA_MODE_OFFSET] == 'i' || buf[LINDA_MODE_OFFSET] == 'r'){
                    int mode = buf[LINDA_MODE_OFFSET];
                    TUPLE * temp = NULL;
                    
                    id = psx_get_id(buf);
#ifdef DEBUG
                    fprintf(stdout, "*** %c command : id = %d ***\n",mode,id);
#endif
                    tp = tuple_space[id];
                    
                    // w は無視する
                    while(tp && tp->next && (tp->mode=='w')){
                        temp = tp;
                        tp = tp->next;
                    }
                    
                    if (tp && (tp->mode == 'o')){
                        psx_set_mode(tp->data,'a');
                        psx_set_seq(tp->data, psx_get_seq(buf));
                        free(buf);
                        buf=0;
#ifdef DEBUG
                        fprintf(stdout,"send size = %d : mode = %c\n",
                                tp->datalen + LINDA_HEADER_SIZE, tp->mode);
#endif
                        if (sz_send(fd, tp->data, (tp->datalen) + LINDA_HEADER_SIZE, 0) == FAIL){
                            fprintf(stderr,"recv error! errno :%d %s\n", errno,
                                    strerror(errno));
                        }
                        if(mode == 'i') {
                            if(tp->data){
                                free(tp->data);
                                tp->data=0;
                            }
                            
                            if(temp){
                                temp->next = tp->next;
                            }else{
                                tuple_space[id] = tp->next;
                            }
                            free(tp);
                            tp = 0;
                        }
                    } else {
                        if (tp == NULL){
                            tp = tuple_space[id] = (TUPLE *) malloc(sizeof(TUPLE));
                            tp->next = NULL;
                        } else {
                            while(tp->next) tp = tp->next;
                            tp->next = (TUPLE *) malloc(sizeof(TUPLE));
                            tp = tp->next;
                            tp->next = NULL;
                        }
                        
                        tp->mode = mode;
                        tp->seq = psx_get_seq(buf);
                        tp->fd = fd;
                        tp->datalen = 0;
                        tp->data = 0;
                        free(buf);
                        buf=0;
#ifdef DEBUG
                        fprintf(stdout, "data inserted insert seq = %d, id = %d\n"
                                ,tp->seq,id);
#endif
                    }
                } else if (buf[LINDA_MODE_OFFSET] == 'w'){
                    int mode = buf[LINDA_MODE_OFFSET];
                    
                    id = buf[LINDA_ID_OFFSET] * 256 + buf[LINDA_ID_OFFSET+1];
#ifdef DEBUG
                    fprintf(stdout, "***%c command : id = %d ***\n",mode,id);
#endif
                    tp = (TUPLE *) malloc(sizeof(TUPLE));
                    tp->mode = mode;
                    tp->seq = psx_get_seq(buf);
                    tp->fd = fd;
                    tp->datalen = 0;
                    tp->data = 0;
                    tp->next = tuple_space[id];
                    tuple_space[id] = tp;
                    free(buf);
                    buf=0;
#ifdef DEBUG
                    fprintf(stdout, "data inserted insert seq = %d, id = %d\n"
                            ,tuple_space[id]->seq,id);
#endif
                } else if (buf[LINDA_MODE_OFFSET] == 'o'){
                    id = buf[LINDA_ID_OFFSET] * 256 + buf[LINDA_ID_OFFSET+1];
                    
                    datasize = psx_get_datalength(buf);
#ifdef DEBUG
                    fprintf(stdout, "*** out command : id = %d ***\n",id);
#endif
                    while((tuple_space[id]) &&
                          ((tuple_space[id]->mode=='w')||(tuple_space[id]->mode == 'r'))){
                        
//                        buf[LINDA_MODE_OFFSET] = 'a';
                        psx_set_mode(buf,'a');
                        psx_set_seq(buf,tuple_space[id]->seq);
                        
                        if (sz_send(tuple_space[id]->fd,buf,datasize+LINDA_HEADER_SIZE, 0) == FAIL){
                            fprintf(stderr,"recv error! errno :%d %s\n", errno,
                                    strerror(errno));
                        }
                        tp = tuple_space[id];
                        tuple_space[id] = tp->next;
                        free(tp);
                        tp = 0;
                    }
                    if ((tuple_space[id]) && tuple_space[id]->mode == 'i'){
                        buf[LINDA_MODE_OFFSET] = 'a'; 
                        psx_set_seq(buf,tuple_space[id]->seq);
                        
#ifdef DEBUG
                        fprintf(stdout,"sendsize = %d :mode = %c\n",
                                datasize+LINDA_HEADER_SIZE, *buf);
#endif
                        if (sz_send(tuple_space[id]->fd,buf,datasize+LINDA_HEADER_SIZE,0)==FAIL){
                            fprintf(stderr,"recv error! errno :%d %s\n", errno,
                                    strerror(errno));
                        }
                        free(buf);
                        buf=0;
#ifdef DEBUG
                        fprintf(stdout, "send dataid=%d ,mode=%c,len=%d \n"
                                ,id, tuple_space[id]->mode, tuple_space[id]->datalen);
#endif
                        // we should not free waiting in-packet's data
                        tp = tuple_space[id];
                        tuple_space[id] = tp->next;
                        free(tp);
                        tp = 0;
                    } else if ((tuple_space[id]==NULL) || (tuple_space[id]->mode == 'o')){
                        if ((tp = tuple_space[id]) == NULL){
                            tp = tuple_space[id] = (TUPLE *) malloc(sizeof(TUPLE));
                            tp->next = NULL;
                        } else {
                            while(tp->next) tp = tp->next;
                            tp->next = (TUPLE *) malloc(sizeof(TUPLE));
                            tp = tp->next;
                            tp->next = NULL;
                        }
                        tp->mode = 'o';
                        tp->seq = psx_get_seq(buf);
                        tp->data = buf;//(char *) malloc(datasize);
                        tp->datalen = datasize;
                        buf = 0;
//                        memcpy(tp->data, buf+LINDA_HEADER_SIZE,datasize);
#ifdef DEBUG
                        fprintf(stdout, "data inserted len = %d, id = %d\n" ,tp->datalen,id);
#endif
                    } else {
                        fprintf(stdout,"Uncorrect mode : %c\n",tuple_space[id]->mode);
                        free(buf);
                        buf=0;
                        return(0);
                    }
                    
                } else {
                    fprintf(stdout,"Uncorrect buffer\n");
                    if(buf) free(buf);
                    return(0);
                }
            }
        }
    }
    perror("Unexpeced end");
    close(ls);/* conection terminat */
}