Mercurial > hg > Members > kono > Cerium
view Renderer/Engine/lindaapi.cc @ 639:70c5c2d2eb24
fix
author | Shinji KONO <kono@ie.u-ryukyu.ac.jp> |
---|---|
date | Thu, 19 Nov 2009 18:45:24 +0900 |
parents | 0decff4e867b |
children | 2432c7fe291c |
line wrap: on
line source
// $Id$ // /*---------------------------------------------------------------------- インクルードファイル読み込み ----------------------------------------------------------------------*/ #include <stdio.h> #include <string.h> #include <stdlib.h> #include <sys/time.h> #include <unistd.h> #include <netinet/in.h> #include <sys/select.h> #include <sys/types.h> #include <sys/socket.h> #include <netdb.h> #include <netinet/tcp.h> #include <sys/un.h> #include <errno.h> #include <arpa/inet.h> #include "lindaapi.h" #if 0 #define PSX_Debug(deb) (putchar(PS_DEB)),\ (printf deb ),\ (putchar(PS_DEB)) #define DEB(a) #else #define PSX_Debug(deb) #define DEB(a) /* a */ #endif /* Global Variables */ static COMMAND *q_top, *q_end; /* コマンドキュー */ static REPLY *reply, *r_end; /* 受け取り用キュー */ static int qsize; /* コマンドキューのサイズ */ static fd_set g_fds; /* 接続しているタプルスペース群のFD(FileDiscripter)を保持 */ static int g_max_fds = 0; /* 監視するFDの最大値 */ /* Static Functions */ static void unix_chkserv(int ps); void psx_free(void *); static long psx_queue(unsigned int tspace_id, unsigned int id, unsigned int size, unsigned char *data, char mode, void(*callback)(unsigned char *,void *),void * obj); #ifdef COUNT_PACKET // print packet count message per PRINT_INTERVAL sec #define PRINT_INTERVAL 4 static void count_packet(char type); /*-------------------------------------------------------------------/ static void count_packet (char type): パケットの送受信カウントする 引き数: type - 送信、受信 (char型: s,r) /-------------------------------------------------------------------*/ static void count_packet(char type) { static int send_packet=-1,receive_packet=0; static struct timeval start,now,previous; if (out_packet == -1) { gettimeofday(&start,NULL); gettimeofday(&previous,NULL); send_packet = 0; printf("packet\tout\tread\t\ttime\n"); } if (type == 's') { send_packet++; } else if (type == 'r') { receive_packet++; } else { fprintf(stderr,"No type in count_packet function\n"); return; } gettimeofday(&now,NULL); if ((now.tv_sec-previous.tv_sec) > PRINT_INTERVAL) { printf("log\t%d\t%d\t%ld\n", send_packet,receive_packet,now.tv_sec-start.tv_sec); fflush(stdout); previous.tv_sec = now.tv_sec; send_packet = receive_packet = 0; } } #endif #define unix_read_w read static int unix_write(int fd,unsigned char *buf,unsigned int size) { unsigned int count=0; uint32_t nsize; /* これから送信するデータのサイズをまず送信 */ nsize = htonl(size); write(fd, &nsize, INT_SIZE); /* 目的のデータを送信 */ while (count < size) { count += write(fd, buf+count, size-count); } #ifdef COUNT_PACKET count_packet('s'); #endif return count+INT_SIZE; } #define unix_write_w unix_write #define SERV_NAME unix_port #define PROTO_NAME "tcp" #define SERVER_NAME hostname #define MAX_REQ 16 /*-------------------------------------------------------------------/ void init_linda(): 大域変数の初期化等を行なう /-------------------------------------------------------------------*/ void init_linda() { FD_ZERO(&g_fds); /* 大域変数はゼロクリアされる g_max_fds = 0; q_end = q_top = NULL; r_end = reply = NULL; qsize = 0; */ } /*-------------------------------------------------------------------/ int open_linda (char * hostname, int port): Lindaサーバとのコネクションを確立し、タプルスペースのIDを返す。 現在はファイルディスクリプタを返している。 引き数: hostname - サーバのホスト名 port - サーバのポート番号 返り値: コネクション確立が成功するとそのファイルディスクリプタを返す。 失敗すると -1 を返す。 /-------------------------------------------------------------------*/ int open_linda(const char * hostname, int port){ int fd; struct hostent *hoste; struct sockaddr_in serv_addr; struct sockaddr_un serv_addr_un; if (hostname[0]=='/') { /* Unix domain */ if ((fd = socket(AF_UNIX, SOCK_STREAM, 0)) == FAIL){ perror("socket"); return(-1); } serv_addr_un.sun_family = AF_UNIX; strcpy(serv_addr_un.sun_path, hostname); fprintf(stdout,"connecting ... %d\n", serv_addr.sin_port); if (connect(fd, (struct sockaddr *)&serv_addr_un,sizeof(serv_addr_un)) == FAIL){ perror("connect"); close(fd); return(-1); } } else { /* INET domain */ if ((fd = socket(AF_INET, SOCK_STREAM, 0)) == FAIL){ perror("socket"); return(-1); } if ((hoste = gethostbyname(SERVER_NAME)) == NULL){ fprintf(stderr,"hostname error\n"); close(fd); return(-1); } serv_addr.sin_family = AF_INET; serv_addr.sin_port = port; serv_addr.sin_addr.s_addr = ((struct in_addr *)(hoste->h_addr))->s_addr; if (serv_addr.sin_family == AF_INET) { int tmp = 1; setsockopt (fd, IPPROTO_TCP, TCP_NODELAY, (char *) &tmp, sizeof (int)); } fprintf(stdout,"connecting ... %d\n", serv_addr.sin_port); if (connect(fd, (struct sockaddr *)&serv_addr,sizeof(serv_addr)) == FAIL){ fprintf(stderr,"connection error! errno :%d %s\n", errno, strerror(errno)); close(fd); return(-1); } } FD_SET(fd, &g_fds); if (g_max_fds < fd) g_max_fds = fd; fprintf(stdout," connect middle server %d\n", fd); return fd; } int open_linda_java(const char * hostname, int port){ int fd; struct hostent *hoste; struct sockaddr_in serv_addr; struct sockaddr_un serv_addr_un; if (hostname[0]=='/') { /* Unix domain */ if ((fd = socket(AF_UNIX, SOCK_STREAM, 0)) == FAIL){ perror("socket"); return(-1); } serv_addr_un.sun_family = AF_UNIX; strcpy(serv_addr_un.sun_path, hostname); DEB(fprintf(stdout,"connecting ... %d\n", serv_addr.sin_port)); if (connect(fd, (struct sockaddr *)&serv_addr_un,sizeof(serv_addr_un)) == FAIL){ perror("connect"); close(fd); return(-1); } } else { /* INET domain */ if ((fd = socket(AF_INET, SOCK_STREAM, 0)) == FAIL){ perror("socket"); return(-2); } serv_addr.sin_family = AF_INET; serv_addr.sin_port = htons(port); serv_addr.sin_addr.s_addr = inet_addr(hostname); if (serv_addr.sin_addr.s_addr == 0xffffffff) { if ((hoste = gethostbyname(hostname)) == NULL){ fprintf(stdout, "hostname error\n"); close(fd); return(-1); } serv_addr.sin_addr.s_addr = *(unsigned int *)hoste->h_addr_list[0]; } if (serv_addr.sin_family == AF_INET) { int tmp = 1; setsockopt (fd, IPPROTO_TCP, TCP_NODELAY, (char *) &tmp, sizeof (int)); } DEB(fprintf(stdout,"connecting ... %d \n", ntohs(serv_addr.sin_port))); DEB(fprintf(stdout," serv_addr.sin_port ... %d \n", ntohs(serv_addr.sin_port))); //fprintf(stdout," serv_addr.sin_addr.s_addr... %s\n", serv_addr.sin_addr.s_addr); if (connect(fd, (struct sockaddr *)&serv_addr,sizeof(serv_addr)) == FAIL){ perror("connect"); close(fd); return(-4); } } FD_SET(fd, &g_fds); if (g_max_fds < fd) g_max_fds = fd; DEB(fprintf(stdout," connect middle server %d\n", fd)); return fd; } /*-------------------------------------------------------------------/ int close_linda(int tspace_id): 接続しているタプルスペースへの接続を切る。 ソケットを閉じ、g_fds から外す。 引数: tspace_id - 閉じるタプルスペースのID 返り値: close の値 /-------------------------------------------------------------------*/ int close_linda(int tspace_id){ int retval; int i; if ((retval = close(tspace_id)) == 0) { FD_CLR(tspace_id, &g_fds); if (g_max_fds == tspace_id) { for (i = g_max_fds-1; FD_ISSET(i, &g_fds) && i; i--); g_max_fds = i; } } return retval; } /*-------------------------------------------------------------------/ int psx_out (unsigned int tspace_id, unsigned int id, unsigned char *data, unsigned int size): outコマンドをCOMMANDキューへ溜める。 引き数: tspace_id - タプルスペースのID id - タプルのID data - 送信するデータ size - dataのサイズ 返り値: シーケンス番号 /-------------------------------------------------------------------*/ long psx_out(unsigned int tspace_id, unsigned int id, unsigned char *data, unsigned int size){ long r; if ((r = psx_queue(tspace_id, id, size, data, 'o', NULL, NULL)) == FAIL) { return(FAIL); } DEB( fprintf(stdout, "psx_out: size = %d, command = %s\n", q_end->size, q_end->command+LINDA_HEADER_SIZE)); return(r); } /*-------------------------------------------------------------------/ int psx_ld (unsigned tspace_id, unsigned int id, char mode, void(*callback)(char*,void*), void * obj): in,read,waitなどの受信コマンドをCOMMANDキューへ溜める。 psx_in,psx_rd,psx_wait_rdなどに置き換えられている。 引き数: tspace_id- タプルスペースのID id - タプルのID mode - i,r,w の文字を取り、各々in,read,waitを表している。 callback - コールバックを使用する場合の関数へのポインタ。 使用しない場合はNULLをいれる。 obj - コールバックで用いる関数の引き数。 返り値: psx_queue内でmallocされたREPLY構造体へのポインタ /-------------------------------------------------------------------*/ long psx_ld(unsigned int tspace_id, unsigned int id, char mode, void(*callback)(unsigned char *,void *), void * obj){ long r; if ((r = psx_queue(tspace_id, id, 0, NULL, mode, callback, obj)) == FAIL) { return(FAIL); } return(r); } /*-------------------------------------------------------------------/ unsigned char * psx_reply (int seq): サーバから答えが来たデータを返す。 引き数: seq - psx_ld()が返した値。 返り値: seqに対応したデータを返す。データをまだ受信していない場合は NULLを返す。 /-------------------------------------------------------------------*/ unsigned char * psx_reply(unsigned int seq){ REPLY *p, *q; unsigned char *ans; DEB(fprintf(stdout, "psx_reply: search of seq = %d\n", seq)); PSX_Debug(("psx_reply: seq %d", seq)); for(q = NULL,p = reply; p; q = p,p = p->next){ if (p->seq == seq){ DEB(fprintf(stdout, "psx_reply: match of seq = %d\n", seq)); if (p->mode == '!'){ ans = p->answer; if (q == NULL){ reply = p->next; if(p==r_end) { r_end = p->next; } } else { q->next = p->next; if(p==r_end) { r_end = q; } } PSX_Debug(("psx_reply: reply %x r_end %x p %x q %x",reply,r_end,p,q)); psx_free(p); DEB( for(p=reply;p;p=p->next) { PSX_Debug(("psx_queue dump: seq %d mode %c %x %x",p->seq,p->mode,p,p->next))}); DEB( fprintf(stdout, "psx_reply: returned answer = %s\n", ans)); PSX_Debug(("psx_reply: answer %s",ans)); return(ans); } else { if (p->mode == '?'){ DEB(fprintf(stdout, "psx_reply: don't accept anser\n")); return(NULL); } } } } PSX_Debug(("psx_reply: no match seq %d",seq)); DEB(fprintf(stdout, "psx_reply: no match of seq\n")); return(NULL); } /*-------------------------------------------------------------------/ void psx_sync_n (): サーバとデータの送受信をする。COMMANDキューに溜まったデータを 送信し、サーバから送られて来たデータを対応するREPLYへいれる。 /-------------------------------------------------------------------*/ #define TIMEDELTA 10 void psx_sync_n(){ int acount; int i; COMMAND *c, *t; if (g_max_fds==0) return; fd_set tmp; struct timeval timeout; timeout.tv_sec=0; timeout.tv_usec=TIMEDELTA * 1000; acount = 0; while (q_top != NULL){ c = q_top; unix_write_w(c->tspace_id, c->command, c->size); psx_free(c->command); t = c->next; psx_free(c); q_top = c = t; qsize--; } tmp = g_fds; while(select(g_max_fds+1, &tmp, NULL, NULL, &timeout) > 0) { for (i = 0; i < g_max_fds+1; i++) { if (FD_ISSET(i, &tmp)) { unix_chkserv(i); } } } } /*-------------------------------------------------------------------/ static int psx_queue (unsigned int tspace_id, unsigned int id, unsigned int size, unsigned char *data, char mode, void(*callback)(char*,void*), void * obj): out,in,read,waitなどのコマンドをCOMMANDキューに溜める。データを 受信するコマンド(in,read,wait)のときは受け取ったときにデータを 格納するREPLY構造体を作る。 引き数: tspace_id- 送信先タプルスペースのID id - アクセスするTUPLE SpaceのID size - dataのサイズ data - 送信するデータ。受信時はNULL。 mode - コマンドのモード(out,in,read,wait は各々char型: o,i,r,w) callback - コールバックを使用する場合の関数へのポインタ。 使用しない場合はNULL。 obj - コールバックで用いる関数に引き渡すデータ。 返り値: 成功した場合 - mallocしたREPLY構造体へのポインタ。outの場合は 0が返る。 失敗した場合 - FAIL(-1)が返る。 /-------------------------------------------------------------------*/ static long psx_queue(unsigned int tspace_id, unsigned int id, unsigned int size, unsigned char *data, char mode, void(*callback)(unsigned char *,void *), void * obj){ REPLY *p; COMMAND *c; if (qsize >= MAX_QUEUE) { // PSX_Debug(("max queue: qsize=%d",qsize)); psx_sync_n(); } if (q_top == NULL) { if ((q_top = (COMMAND *) malloc (sizeof(COMMAND))) == NULL){ return(FAIL); } c = q_end = q_top; } else { if ((q_end->next = (COMMAND *) malloc (sizeof(COMMAND))) == NULL){ return(FAIL); } c = q_end; q_end = q_end->next; } /* size は DATASIZE */ if ((q_end->command = (unsigned char *) malloc(size+LINDA_HEADER_SIZE)) == NULL) { psx_free(q_end); c->next = NULL; return(FAIL); } /* データ受け取り要求(in,rd,wait)なら受け取り用の箱を用意 */ if (mode != 'o') { if (reply == NULL){ if ((reply = (REPLY *) malloc (sizeof(REPLY))) == NULL){ return(FAIL); } p = r_end = reply; p->next = NULL; } else { if ((r_end->next = (REPLY *) malloc (sizeof(REPLY))) == NULL){ return(FAIL); } p = r_end->next; r_end = p; p->next = NULL; } p->mode = '?'; p->seq = (long)p; // 構造体のアドレスで識別 p->callback = callback; p->obj = obj; PSX_Debug(("psx_queue: seq %d reply %x p %x r_end %x",seq,reply,p,r_end)); } else { p = 0; } q_end->command[LINDA_MODE_OFFSET] = mode; q_end->command[LINDA_ID_OFFSET] = id >> 8; q_end->command[LINDA_ID_OFFSET+1] = id & 0xff; q_end->command[LINDA_SEQ_OFFSET] = ((long)p>>24) & 0xff; q_end->command[LINDA_SEQ_OFFSET+1] = ((long)p>>16) & 0xff; q_end->command[LINDA_SEQ_OFFSET+2] = ((long)p>>8) & 0xff; q_end->command[LINDA_SEQ_OFFSET+3] = ((long)p) & 0xff; q_end->command[LINDA_DATA_LENGTH_OFFSET] = (size>>24) & 0xff; q_end->command[LINDA_DATA_LENGTH_OFFSET+1] = (size>>16) & 0xff; q_end->command[LINDA_DATA_LENGTH_OFFSET+2] = (size>>8) & 0xff; q_end->command[LINDA_DATA_LENGTH_OFFSET+3] = (size) & 0xff; q_end->size = size+LINDA_HEADER_SIZE; /* command size */ q_end->tspace_id = tspace_id; /* destination id */ q_end->next = NULL; qsize++; if (data && size>0) memcpy(q_end->command+LINDA_HEADER_SIZE, data, size); return((long)p); } /*-------------------------------------------------------------------/ static void unix_chkserv (int ps): サーバからデータ(TUPLE)を受け取る。REPLY構造体にコールバック関数 が指定されていればその関数を実行し、REPLY構造体をキューから取り 除く。コールバック関数が指定されていなければREPLY構造体にデータ を引き渡す。 引数: ps - 接続しているタプルスペースのソケット /-------------------------------------------------------------------*/ static void unix_chkserv(int ps){ int i,pkt,npkt,mode; unsigned int k; REPLY *r,*prev; int a; unsigned char * tuple = 0; if((i=read(ps,&npkt,INT_SIZE))<0) { perror("read"); exit(1); } pkt = ntohl(npkt); DEB(printf("pkt: %d\n",pkt)); DEB(fprintf(stdout, "psx_chkserv: queue number: %d , size = %d\n", i, pkt)); if((tuple = (unsigned char *)malloc(pkt))==NULL){ perror("malloc"); exit(1); } for(a=0;a<pkt;a+=i) { if((i=unix_read_w(ps,tuple+a,pkt-a))<0) { fprintf(stderr, "psx_chkserv: read error! on i=%d pkt=%d %s\n", i, pkt, strerror(errno)); exit(1);//close(ps); } } #ifdef COUNT_PACKET count_packet('r'); #endif mode = psx_get_mode(tuple); i = psx_get_id(tuple); k = psx_get_seq(tuple); PSX_Debug(("psx_chkserv: anser packet size = %d id %d seq %d", pkt,i,k)); DEB(fprintf(stdout, "psx_chkserv: data from server: %s id=%d seq = %d\n", tuple, i, k)); DEB ( for(p=reply;p;p=p->next) { PSX_Debug(printf("psx_queue dump: seq %d mode %c %x %x",p->seq,p->mode,p,p->next)); }) for(prev = NULL,r = reply; r; prev = r,r = r->next){ DEB(fprintf(stdout,"seq: %d\n",r->seq);) if (r->seq == k){ if(r->callback){ // call callback function (*r->callback)(tuple,r->obj); if (prev == NULL){ reply = r->next; if(r == r_end) { r_end = r->next; } } else { prev->next = r->next; if(r == r_end) { r_end = prev; } } psx_free(r); }else{ // normal reply PSX_Debug(("psx_chkserv: copy answer r %x seq %d",r,k)); if(mode == 'a'){ r->answer = tuple; }else{ r->answer = NULL; } r->mode = '!'; } break; } } tuple = 0; if (!r){ DEB(fprintf(stdout, "unix_chkserv: accepted seq %d does not match. \n",k)); } } void psx_free(void *tuple) { free(tuple); } /*-------------------------------------------------------------------/ static unsigned int get_int(unsigned char * tuple, int offset): TUPLEのヘッダに格納された int型 のデータを得るための関数 psx_get_datalength() と psx_get_seq() から呼ばれる。 引き数: tuple - ヘッダ情報も含んだTUPLE。psx_reply()で得たものでもいい。 offset - 取りだすデータのオフセット。LINDA_DATA_LENGTH_OFFSET か LINDA_SEQ_OFFSET。 返り値: 指定したオフセットに格納されていた数値(int型) /-------------------------------------------------------------------*/ static unsigned int get_int(unsigned char * tuple, int offset){ unsigned int i; i = (tuple[offset] <<24) + (tuple[offset+1]<<16) + (tuple[offset+2]<<8) + (tuple[offset+3]); return i; } unsigned int psx_get_datalength(unsigned char * tuple){ return get_int(tuple,LINDA_DATA_LENGTH_OFFSET); } unsigned char * psx_get_data(unsigned char * tuple) { return tuple + LINDA_HEADER_SIZE; } unsigned int psx_get_seq(unsigned char * tuple){ return get_int(tuple,LINDA_SEQ_OFFSET); } unsigned short psx_get_id(unsigned char * tuple){ return (tuple[LINDA_ID_OFFSET] * 256 + tuple[LINDA_ID_OFFSET+1]); } unsigned char psx_get_mode(unsigned char * tuple){ return tuple[LINDA_MODE_OFFSET]; } static void set_int_to_char(unsigned char * tuple, int i, int offset){ tuple[offset] = (i>>24) & 0xff; tuple[offset+1] = (i>>16) & 0xff; tuple[offset+2] = (i>>8) & 0xff; tuple[offset+3] = (i) & 0xff; } void psx_set_datalength(unsigned char * tuple, int length){ set_int_to_char(tuple,length,LINDA_DATA_LENGTH_OFFSET); } void psx_set_seq(unsigned char * tuple, int seq){ set_int_to_char(tuple,seq,LINDA_SEQ_OFFSET); } void psx_set_id(unsigned char * tuple, short id){ tuple[LINDA_ID_OFFSET] = id >> 8; tuple[LINDA_ID_OFFSET+1] = id & 0xff; } void psx_set_mode(unsigned char * tuple, char mode){ tuple[LINDA_MODE_OFFSET] = mode; } /* end */