view Renderer/Engine/lindaapi.cc @ 575:341f1f881a9b draft

Linda API worked. (slightly unreliable)
author Shinji KONO <kono@ie.u-ryukyu.ac.jp>
date Fri, 23 Oct 2009 15:53:24 +0900
parents 096a900bd9d3
children 92b0d490e839
line wrap: on
line source

//  $Id$
//

/*----------------------------------------------------------------------
 インクルードファイル読み込み
----------------------------------------------------------------------*/
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <sys/time.h>
#include <unistd.h>
#include <netinet/in.h>
#include <sys/select.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>
#include <netinet/tcp.h>
#include <sys/un.h>
#include <errno.h>
#include <arpa/inet.h>

#include "lindaapi.h"


#if 0
#define PSX_Debug(deb)    (putchar(PS_DEB)),\
            (printf deb ),\
            (putchar(PS_DEB))
#define DEB(a)         
#else
#define PSX_Debug(deb)    
#define DEB(a)        /* a */
#endif

/* Global Variables */
static COMMAND *q_top, *q_end; /* コマンドキュー */
static REPLY *reply, *r_end;   /* 受け取り用キュー */
static int qsize; /* コマンドキューのサイズ */
static fd_set g_fds;   /* 接続しているタプルスペース群のFD(FileDiscripter)を保持 */
static int g_max_fds = 0;  /* 監視するFDの最大値 */

/* Static Functions */
static void unix_chkserv(int ps);
void psx_free(void *);
static int psx_queue(unsigned int tspace_id, unsigned int id,
              unsigned int size, unsigned char *data, char mode,
              void(*callback)(unsigned char *,void *),void * obj);

#ifdef COUNT_PACKET
// print packet count message per PRINT_INTERVAL sec
#define PRINT_INTERVAL 4
static void count_packet(char type);

/*-------------------------------------------------------------------/
  static void
  count_packet (char type):
      パケットの送受信カウントする
      
  引き数:
      type - 送信、受信 (char型: s,r)
/-------------------------------------------------------------------*/
static void
count_packet(char type)
{
    static int send_packet=-1,receive_packet=0;
    static struct timeval start,now,previous;
    
    if (out_packet == -1) {
        gettimeofday(&start,NULL);
        gettimeofday(&previous,NULL);
        send_packet = 0;
        printf("packet\tout\tread\t\ttime\n");
    }
    
    if (type == 's') {
        send_packet++;
    } else if (type == 'r') {
        receive_packet++;
    } else {
        fprintf(stderr,"No type in count_packet function\n");
        return;
    }
    
    gettimeofday(&now,NULL);
    if ((now.tv_sec-previous.tv_sec) > PRINT_INTERVAL) {
        printf("log\t%d\t%d\t%ld\n",
               send_packet,receive_packet,now.tv_sec-start.tv_sec);
        fflush(stdout);
        
        previous.tv_sec = now.tv_sec;
        send_packet = receive_packet = 0;
    }
}
#endif


#define unix_read_w     read


static int
unix_write(int fd,unsigned char *buf,unsigned int size) {
    unsigned int count=0;
    uint32_t nsize;

    /* これから送信するデータのサイズをまず送信  */
    nsize = htonl(size);
    write(fd, &nsize, INT_SIZE);

    /* 目的のデータを送信  */
    while (count < size) {
	count += write(fd, buf+count, size-count);
    }
#ifdef COUNT_PACKET
    count_packet('s');
#endif
    return count+INT_SIZE;
}

#define unix_write_w   unix_write

#define SERV_NAME      unix_port
#define PROTO_NAME     "tcp"
#define SERVER_NAME    hostname
#define MAX_REQ        16



/*-------------------------------------------------------------------/
  void
  init_linda():
      大域変数の初期化等を行なう
/-------------------------------------------------------------------*/
void
init_linda() {
    FD_ZERO(&g_fds);
    /* 大域変数はゼロクリアされる
    g_max_fds = 0;
    q_end = q_top = NULL;
    r_end = reply = NULL;
    qsize = 0;
    */
}


/*-------------------------------------------------------------------/
  int
  open_linda (char * hostname, int port):
      Lindaサーバとのコネクションを確立し、タプルスペースのIDを返す。
      現在はファイルディスクリプタを返している。

  引き数:
      hostname - サーバのホスト名
      port - サーバのポート番号
  返り値:
      コネクション確立が成功するとそのファイルディスクリプタを返す。
      失敗すると -1 を返す。
/-------------------------------------------------------------------*/
int
open_linda(char * hostname, int port){
    int fd;
    struct hostent *hoste;
    struct sockaddr_in serv_addr;
    struct sockaddr_un serv_addr_un;
    
    if (hostname[0]=='/') {
        /* Unix domain */
        if ((fd = socket(AF_UNIX, SOCK_STREAM, 0)) == FAIL){
	    perror("socket");
            return(-1);
        }
        serv_addr_un.sun_family = AF_UNIX;
        strcpy(serv_addr_un.sun_path, hostname);
        fprintf(stdout,"connecting ... %d\n", serv_addr.sin_port);
        if (connect(fd, (struct sockaddr *)&serv_addr_un,sizeof(serv_addr_un)) == FAIL){
	    perror("connect");
            close(fd);
            return(-1);
        }
        
    } else {
        /* INET domain */
        if ((fd = socket(AF_INET, SOCK_STREAM, 0)) == FAIL){
			perror("socket");
            return(-1);
        }
        if ((hoste = gethostbyname(SERVER_NAME)) == NULL){
            fprintf(stderr,"hostname error\n");
            close(fd);
            return(-1);
        }
        serv_addr.sin_family = AF_INET;
        serv_addr.sin_port = port;
        serv_addr.sin_addr.s_addr = ((struct in_addr *)(hoste->h_addr))->s_addr;
        if (serv_addr.sin_family == AF_INET) {
            int tmp = 1;
            setsockopt (fd, IPPROTO_TCP, TCP_NODELAY,
                        (char *) &tmp, sizeof (int));
        }
        fprintf(stdout,"connecting ... %d\n", serv_addr.sin_port);
        if (connect(fd, (struct sockaddr *)&serv_addr,sizeof(serv_addr)) == FAIL){
            fprintf(stderr,"connection error! errno :%d %s\n", errno,
                    strerror(errno));
            close(fd);
            return(-1);
        }
    }

    FD_SET(fd, &g_fds);
    if (g_max_fds < fd) g_max_fds = fd;
    
    fprintf(stdout," connect middle server %d\n", fd);
    return fd;
}

int
open_linda_java(char * hostname, int port){
    int fd;
    struct hostent *hoste;
    struct sockaddr_in serv_addr;
    struct sockaddr_un serv_addr_un;
    
    if (hostname[0]=='/') {
        /* Unix domain */
        if ((fd = socket(AF_UNIX, SOCK_STREAM, 0)) == FAIL){
			perror("socket");
            return(-1);
        }
        serv_addr_un.sun_family = AF_UNIX;
        strcpy(serv_addr_un.sun_path, hostname);
        DEB(fprintf(stdout,"connecting ... %d\n", serv_addr.sin_port));
        if (connect(fd, (struct sockaddr *)&serv_addr_un,sizeof(serv_addr_un)) == FAIL){
			perror("connect");
            close(fd);
            return(-1);
        }
        
    } else {
        /* INET domain */
        if ((fd = socket(AF_INET, SOCK_STREAM, 0)) == FAIL){
			perror("socket");
            return(-2);
        }
	serv_addr.sin_family = AF_INET;
	serv_addr.sin_port = htons(port);

	serv_addr.sin_addr.s_addr = inet_addr(hostname);
	if (serv_addr.sin_addr.s_addr == 0xffffffff) {
	  if ((hoste = gethostbyname(hostname)) == NULL){
	    fprintf(stdout, "hostname error\n");
	    close(fd);
	    return(-1);
	  }
	  serv_addr.sin_addr.s_addr = *(unsigned int *)hoste->h_addr_list[0];
	}

        if (serv_addr.sin_family == AF_INET) {
            int tmp = 1;
            setsockopt (fd, IPPROTO_TCP, TCP_NODELAY,
                        (char *) &tmp, sizeof (int));
        }
        DEB(fprintf(stdout,"connecting ... %d \n", ntohs(serv_addr.sin_port)));
        DEB(fprintf(stdout," serv_addr.sin_port ... %d \n", ntohs(serv_addr.sin_port)));
	//fprintf(stdout," serv_addr.sin_addr.s_addr... %s\n", serv_addr.sin_addr.s_addr);
        if (connect(fd, (struct sockaddr *)&serv_addr,sizeof(serv_addr)) == FAIL){
			perror("connect");
            close(fd);
            return(-4);
        }
    }

    FD_SET(fd, &g_fds);
    if (g_max_fds < fd) g_max_fds = fd;
    
    DEB(fprintf(stdout," connect middle server %d\n", fd));
    return fd;
}


/*-------------------------------------------------------------------/
  int
  close_linda(int tspace_id):
      接続しているタプルスペースへの接続を切る。
      ソケットを閉じ、g_fds から外す。
  引数:
      tspace_id - 閉じるタプルスペースのID
  返り値:
      close の値
/-------------------------------------------------------------------*/
int
close_linda(int tspace_id){
    int retval;
    int i;
    if ((retval = close(tspace_id)) == 0) {
        FD_CLR(tspace_id, &g_fds);
        if (g_max_fds == tspace_id) {
            for (i = g_max_fds-1; FD_ISSET(i, &g_fds) && i; i--);
            g_max_fds = i;
        }
    }
    return retval;
}

/*-------------------------------------------------------------------/
  int
  psx_out (unsigned int tspace_id, unsigned int id,
           unsigned char *data, unsigned int size):
      outコマンドをCOMMANDキューへ溜める。

  引き数:
      tspace_id - タプルスペースのID
      id        - タプルのID
      data      - 送信するデータ
      size      - dataのサイズ
  返り値:
      シーケンス番号
/-------------------------------------------------------------------*/
int
psx_out(unsigned int tspace_id, unsigned int id,
        unsigned char *data, unsigned int size){
    int r;
    if ((r = psx_queue(tspace_id, id, size, data, 'o', NULL, NULL)) == FAIL) {
        return(FAIL);
    }
    DEB( fprintf(stdout, "psx_out: size = %d, command = %s\n",
                 q_end->size, q_end->command+LINDA_HEADER_SIZE));
    return(r);
}

/*-------------------------------------------------------------------/
  int
  psx_ld (unsigned tspace_id, unsigned int id,
          char mode, void(*callback)(char*,void*), void * obj):
      in,read,waitなどの受信コマンドをCOMMANDキューへ溜める。
      psx_in,psx_rd,psx_wait_rdなどに置き換えられている。
      
  引き数:
      tspace_id- タプルスペースのID
      id       - タプルのID
      mode     - i,r,w の文字を取り、各々in,read,waitを表している。
      callback - コールバックを使用する場合の関数へのポインタ。
                 使用しない場合はNULLをいれる。
      obj      - コールバックで用いる関数の引き数。
  返り値:
      psx_queue内でmallocされたREPLY構造体へのポインタ
/-------------------------------------------------------------------*/
int
psx_ld(unsigned int tspace_id, unsigned int id,
       char mode, void(*callback)(unsigned char *,void *), void * obj){
    int r;
    if ((r = psx_queue(tspace_id, id, 0, NULL, mode, callback, obj)) == FAIL) {
        return(FAIL);
    }
    return(r);
}

/*-------------------------------------------------------------------/
  unsigned char *
  psx_reply (int seq):
      サーバから答えが来たデータを返す。

  引き数:
      seq - psx_ld()が返した値。
  返り値:
      seqに対応したデータを返す。データをまだ受信していない場合は
      NULLを返す。
/-------------------------------------------------------------------*/
unsigned char *
psx_reply(unsigned int seq){
    REPLY *p, *q;
    unsigned char *ans;
    
    DEB(fprintf(stdout, "psx_reply: search of seq = %d\n", seq));
    PSX_Debug(("psx_reply: seq %d", seq)); 
    for(q = NULL,p = reply; p; q = p,p = p->next){
        if (p->seq == seq){
            DEB(fprintf(stdout, "psx_reply: match of seq = %d\n", seq));
            if (p->mode == '!'){
                ans = p->answer;
                if (q == NULL){
                    reply = p->next;
                    if(p==r_end) {
                        r_end = p->next;
                    }
                } else {
                    q->next = p->next;
                    if(p==r_end) {
                        r_end = q;
                    }
                }
                PSX_Debug(("psx_reply: reply %x r_end %x p %x q %x",reply,r_end,p,q)); 
                psx_free(p);
                DEB( for(p=reply;p;p=p->next) { PSX_Debug(("psx_queue dump: seq %d mode %c %x %x",p->seq,p->mode,p,p->next))});
                DEB( fprintf(stdout, "psx_reply: returned answer = %s\n", ans));
                PSX_Debug(("psx_reply: answer %s",ans));
                return(ans);
            } else {
                if (p->mode == '?'){
                    DEB(fprintf(stdout, "psx_reply: don't accept anser\n"));
                    return(NULL);
                }
            }
        }

    }
    PSX_Debug(("psx_reply: no match seq %d",seq));
    DEB(fprintf(stdout, "psx_reply: no match of seq\n"));
    return(NULL);
}

/*-------------------------------------------------------------------/
  void
  psx_sync_n ():
      サーバとデータの送受信をする。COMMANDキューに溜まったデータを
      送信し、サーバから送られて来たデータを対応するREPLYへいれる。
/-------------------------------------------------------------------*/
#define TIMEDELTA       10
void
psx_sync_n(){
    int acount;
    int i;
    COMMAND *c, *t;

    if (g_max_fds==0) return;

    fd_set tmp;
    struct timeval timeout;
    timeout.tv_sec=0;
    timeout.tv_usec=TIMEDELTA * 1000;

    acount = 0;
    while (q_top != NULL){
        c = q_top;
        unix_write_w(c->tspace_id, c->command, c->size);
        psx_free(c->command);
        t = c->next;
        psx_free(c);
        q_top = c = t;
        qsize--;
    }

    tmp = g_fds;
    while(select(g_max_fds+1, &tmp, NULL, NULL, &timeout) > 0) {
        for (i = 0; i < g_max_fds+1; i++) {
            if (FD_ISSET(i, &tmp)) {
                unix_chkserv(i);
            }
        }
    }    
}

/*-------------------------------------------------------------------/
  static int
  psx_queue (unsigned int tspace_id, unsigned int id,
             unsigned int size, unsigned char *data, char mode,
             void(*callback)(char*,void*), void * obj):
      out,in,read,waitなどのコマンドをCOMMANDキューに溜める。データを
      受信するコマンド(in,read,wait)のときは受け取ったときにデータを
      格納するREPLY構造体を作る。

  引き数:
      tspace_id- 送信先タプルスペースのID
      id       - アクセスするTUPLE SpaceのID
      size     - dataのサイズ
      data     - 送信するデータ。受信時はNULL。
      mode     - コマンドのモード(out,in,read,wait は各々char型: o,i,r,w)
      callback - コールバックを使用する場合の関数へのポインタ。
                 使用しない場合はNULL。
      obj      - コールバックで用いる関数に引き渡すデータ。
  返り値:
      成功した場合 - mallocしたREPLY構造体へのポインタ。outの場合は
                     0が返る。
      失敗した場合 - FAIL(-1)が返る。
/-------------------------------------------------------------------*/
static int
psx_queue(unsigned int tspace_id, unsigned int id,
          unsigned int size, unsigned char *data, char mode,
          void(*callback)(unsigned char *,void *), void * obj){
    REPLY *p;
    COMMAND *c;
    
    if (qsize >= MAX_QUEUE) {
//        PSX_Debug(("max queue: qsize=%d",qsize));
        psx_sync_n();
    }
    
    if (q_top == NULL) {
        if ((q_top = (COMMAND *) malloc (sizeof(COMMAND))) == NULL){
            return(FAIL);
        }
        c = q_end = q_top;
    } else {
        if ((q_end->next = (COMMAND *) malloc (sizeof(COMMAND))) == NULL){
            return(FAIL);
        }
        c = q_end;
        q_end = q_end->next;
    }
    
    /* size は DATASIZE */
    if ((q_end->command = (unsigned char *) malloc(size+LINDA_HEADER_SIZE)) == NULL) {
        psx_free(q_end);
        c->next = NULL;
        return(FAIL);
    }

    /* データ受け取り要求(in,rd,wait)なら受け取り用の箱を用意 */
    if (mode != 'o') {  
        if (reply == NULL){
            if ((reply = (REPLY *) malloc (sizeof(REPLY))) == NULL){
                return(FAIL);
            }
            p = r_end = reply; p->next = NULL;
        } else {
            if ((r_end->next = (REPLY *) malloc (sizeof(REPLY))) == NULL){
                return(FAIL);
            }
            p = r_end->next; r_end = p; p->next = NULL;
        }
        p->mode = '?';
        p->seq = (int)p;  // 構造体のアドレスで識別
        p->callback = callback;
        p->obj = obj;
        PSX_Debug(("psx_queue: seq %d reply %x p %x r_end %x",seq,reply,p,r_end));
    } else {
        p = 0;
    }
    q_end->command[LINDA_MODE_OFFSET] = mode;

    q_end->command[LINDA_ID_OFFSET]   = id >> 8;
    q_end->command[LINDA_ID_OFFSET+1] = id & 0xff;

    q_end->command[LINDA_SEQ_OFFSET]   = ((int)p>>24) & 0xff;
    q_end->command[LINDA_SEQ_OFFSET+1] = ((int)p>>16) & 0xff;
    q_end->command[LINDA_SEQ_OFFSET+2] = ((int)p>>8)  & 0xff;
    q_end->command[LINDA_SEQ_OFFSET+3] = ((int)p)     & 0xff;
    
    q_end->command[LINDA_DATA_LENGTH_OFFSET]   = (size>>24) & 0xff;
    q_end->command[LINDA_DATA_LENGTH_OFFSET+1] = (size>>16) & 0xff;
    q_end->command[LINDA_DATA_LENGTH_OFFSET+2] = (size>>8)  & 0xff;
    q_end->command[LINDA_DATA_LENGTH_OFFSET+3] = (size)     & 0xff;
    
    q_end->size = size+LINDA_HEADER_SIZE;       /* command size */
    q_end->tspace_id = tspace_id;  /* destination id */
    q_end->next = NULL;
    qsize++;
    if (data && size>0)
        memcpy(q_end->command+LINDA_HEADER_SIZE, data, size);
    return((int)p);
}

/*-------------------------------------------------------------------/
  static void
  unix_chkserv (int ps):
      サーバからデータ(TUPLE)を受け取る。REPLY構造体にコールバック関数
      が指定されていればその関数を実行し、REPLY構造体をキューから取り
      除く。コールバック関数が指定されていなければREPLY構造体にデータ
      を引き渡す。
  引数:
      ps - 接続しているタプルスペースのソケット
/-------------------------------------------------------------------*/
static void
unix_chkserv(int ps){
    int i,pkt,npkt,mode;
    unsigned int k;
    REPLY *r,*prev;
    int a;
    unsigned char * tuple = 0;

    if((i=read(ps,&npkt,INT_SIZE))<0) {
		perror("read");
        exit(1);
    }
    pkt = ntohl(npkt);
    DEB(printf("pkt: %d\n",pkt));
    DEB(fprintf(stdout, "psx_chkserv: queue number: %d , size = %d\n", i, pkt));
    if((tuple = (unsigned char *)malloc(pkt))==NULL){
		perror("malloc");
        exit(1);
    }
    for(a=0;a<pkt;a+=i) {
        if((i=unix_read_w(ps,tuple+a,pkt-a))<0) {
            fprintf(stderr, "psx_chkserv: read error! on i=%d pkt=%d %s\n",
                    i, pkt, strerror(errno));
            exit(1);//close(ps);
        }
    }

#ifdef COUNT_PACKET
    count_packet('r');
#endif
    mode = psx_get_mode(tuple);
    i = psx_get_id(tuple);
    k = psx_get_seq(tuple);
    PSX_Debug(("psx_chkserv: anser packet size = %d id %d seq %d", pkt,i,k)); 
    DEB(fprintf(stdout, "psx_chkserv: data from server: %s id=%d seq = %d\n", tuple, i, k));
    DEB (
        for(p=reply;p;p=p->next) {
        PSX_Debug(printf("psx_queue dump: seq %d mode %c %x %x",p->seq,p->mode,p,p->next));
        })
        
    for(prev = NULL,r = reply; r; prev = r,r = r->next){
        DEB(fprintf(stdout,"seq: %d\n",r->seq);)
        if (r->seq == k){
            if(r->callback){     // call callback function
                (*r->callback)(tuple,r->obj);
                if (prev == NULL){
                    reply = r->next;
                    if(r == r_end) {
                        r_end = r->next;
                    }
                } else {
                    prev->next = r->next;
                    if(r == r_end) {
                        r_end = prev;
                    }
                }
                psx_free(r);
            }else{                // normal reply
                PSX_Debug(("psx_chkserv: copy answer r %x seq %d",r,k));
                if(mode == 'a'){
		    r->answer = tuple;
		}else{
		    r->answer = NULL;
		}
		r->mode = '!';
            }
            break;
        }
    }
    tuple = 0;
    if (!r){
        DEB(fprintf(stdout, "unix_chkserv: accepted seq %d does not match. \n",k));
    }
}

void psx_free(void *tuple)
{
    free(tuple);
}

/*-------------------------------------------------------------------/
  static unsigned int
  get_int(unsigned char * tuple, int offset):
      TUPLEのヘッダに格納された int型 のデータを得るための関数
      psx_get_datalength() と psx_get_seq() から呼ばれる。

  引き数:
      tuple  - ヘッダ情報も含んだTUPLE。psx_reply()で得たものでもいい。
      offset - 取りだすデータのオフセット。LINDA_DATA_LENGTH_OFFSET
               か LINDA_SEQ_OFFSET。

  返り値:
      指定したオフセットに格納されていた数値(int型)
/-------------------------------------------------------------------*/
static unsigned int
get_int(unsigned char * tuple, int offset){
    unsigned int i;
    i = (tuple[offset]  <<24) +
        (tuple[offset+1]<<16) +
        (tuple[offset+2]<<8)  +
        (tuple[offset+3]);
    return i;
}

unsigned int
psx_get_datalength(unsigned char * tuple){
    return get_int(tuple,LINDA_DATA_LENGTH_OFFSET);
}

unsigned char *
psx_get_data(unsigned char * tuple) {
    return tuple + LINDA_HEADER_SIZE;
}

unsigned int
psx_get_seq(unsigned char * tuple){
    return get_int(tuple,LINDA_SEQ_OFFSET);
}

unsigned short
psx_get_id(unsigned char * tuple){
    return (tuple[LINDA_ID_OFFSET] * 256 + tuple[LINDA_ID_OFFSET+1]);
}

unsigned char
psx_get_mode(unsigned char * tuple){
    return tuple[LINDA_MODE_OFFSET];
}

static
void
set_int_to_char(unsigned char * tuple, int i, int offset){
    tuple[offset]   = (i>>24) & 0xff;
    tuple[offset+1] = (i>>16) & 0xff;
    tuple[offset+2] = (i>>8)  & 0xff;
    tuple[offset+3] = (i)     & 0xff;
}

void
psx_set_datalength(unsigned char * tuple, int length){
    set_int_to_char(tuple,length,LINDA_DATA_LENGTH_OFFSET);
}


void
psx_set_seq(unsigned char * tuple, int seq){
    set_int_to_char(tuple,seq,LINDA_SEQ_OFFSET);
}

void
psx_set_id(unsigned char * tuple, short id){
    tuple[LINDA_ID_OFFSET]   = id >> 8;
    tuple[LINDA_ID_OFFSET+1] = id & 0xff;
}

void
psx_set_mode(unsigned char * tuple, char mode){
    tuple[LINDA_MODE_OFFSET] = mode;
}



/* end */