comparison Renderer/Engine/lindaapi.cc @ 572:096a900bd9d3 draft

merge
author Shinji KONO <kono@ie.u-ryukyu.ac.jp>
date Fri, 23 Oct 2009 00:40:24 +0900
parents 3bc98f6d31ff
children 341f1f881a9b
comparison
equal deleted inserted replaced
571:fd3789d17305 572:096a900bd9d3
1 // $Id: lindaapi.c,v 1.9 2006/04/03 08:17:11 kono Exp $ 1 // $Id$
2 // 2 //
3 3
4 /*---------------------------------------------------------------------- 4 /*----------------------------------------------------------------------
5 インクルードファイル読み込み 5 ゃ潟若<ゃ茯粋昭
6 ----------------------------------------------------------------------*/ 6 ----------------------------------------------------------------------*/
7 #include <sys/file.h>
8 #include <stdio.h> 7 #include <stdio.h>
8 #include <string.h>
9 #include <stdlib.h> 9 #include <stdlib.h>
10 #include <string.h> 10 #include <sys/time.h>
11 #include <unistd.h> 11 #include <unistd.h>
12 #include <fcntl.h> 12 #include <netinet/in.h>
13 #include <sys/uio.h>
14 #include <sys/time.h>
15 #include <sys/select.h> 13 #include <sys/select.h>
16 #include <sys/stat.h>
17 #include <sys/types.h> 14 #include <sys/types.h>
18 #include <sys/socket.h> 15 #include <sys/socket.h>
16 #include <netdb.h>
17 #include <netinet/tcp.h>
19 #include <sys/un.h> 18 #include <sys/un.h>
20 #include <netinet/in.h>
21 #include <netinet/tcp.h>
22 #include <signal.h>
23 #include <termios.h>
24 #include <netdb.h>
25 #include <errno.h> 19 #include <errno.h>
26 #include <sys/select.h> 20 #include <arpa/inet.h>
27 21
28 #include "lindaapi.h" 22 #include "lindaapi.h"
29 23
30 #define TIMEDELTA 10 24
31 #if 0 25 #if 0
32 #define PSX_Debug(deb) (putchar(PS_DEB)),\ 26 #define PSX_Debug(deb) (putchar(PS_DEB)),\
33 (printf deb ),\ 27 (printf deb ),\
34 (putchar(PS_DEB)) 28 (putchar(PS_DEB))
35 #define DEB(a) 29 #define DEB(a)
36 #else 30 #else
37 #define PSX_Debug(deb) 31 #define PSX_Debug(deb)
38 #define DEB(a) /* a */ 32 #define DEB(a) /* a */
39 #endif 33 #endif
40 34
41 COMMAND *q_top, *q_end; 35 /* Global Variables */
42 REPLY *reply, *r_end; 36 static COMMAND *q_top, *q_end; /* 潟潟ャ */
43 37 static REPLY *reply, *r_end; /* ャ */
44 int qsize, ps; 38 static int qsize; /* 潟潟ャ若泣ゃ */
45 unsigned short seq; 39 static fd_set g_fds; /* ・膓帥鴻若合召FD(FileDiscripter)篆 */
40 static int g_max_fds; /* hFD紊у */
41
42 /* Static Functions */
43 static void unix_chkserv(int ps);
44 void psx_free(void *);
45 static int psx_queue(unsigned int tspace_id, unsigned int id,
46 unsigned int size, unsigned char *data, char mode,
47 void(*callback)(unsigned char *,void *),void * obj);
46 48
47 #ifdef COUNT_PACKET 49 #ifdef COUNT_PACKET
48 // print packet count message per PRINT_INTERVAL sec 50 // print packet count message per PRINT_INTERVAL sec
49 #define PRINT_INTERVAL 4 51 #define PRINT_INTERVAL 4
50 52 static void count_packet(char type);
51 /*-------------------------------------------------------------------/ 53
52 void 54 /*-------------------------------------------------------------------/
55 static void
53 count_packet (char type): 56 count_packet (char type):
54 パケットの送受信カウントする 57 宴篆<潟
55 58
56 引き数: 59 綣:
57 type - 送信、受信 (char型: s,r) 60 type - 篆<篆 (char: s,r)
58 /-------------------------------------------------------------------*/ 61 /-------------------------------------------------------------------*/
59 void count_packet(char type) 62 static void
63 count_packet(char type)
60 { 64 {
61 static int send_packet=-1,receive_packet=0; 65 static int send_packet=-1,receive_packet=0;
62 static struct timeval start,now,previous; 66 static struct timeval start,now,previous;
63 67
64 if (out_packet == -1) { 68 if (out_packet == -1) {
65 gettimeofday(&start,NULL); 69 gettimeofday(&start,NULL);
66 gettimeofday(&previous,NULL); 70 gettimeofday(&previous,NULL);
67 send_packet = 0; 71 send_packet = 0;
68 printf("packet\tout\tread\t\ttime\n"); 72 printf("packet\tout\tread\t\ttime\n");
69 } 73 }
70 74
71 if (type == 's') { 75 if (type == 's') {
72 send_packet++; 76 send_packet++;
73 } else if (type == 'r') { 77 } else if (type == 'r') {
74 receive_packet++; 78 receive_packet++;
75 } else { 79 } else {
76 fprintf(stderr,"No type in count_packet function\n"); 80 fprintf(stderr,"No type in count_packet function\n");
77 return; 81 return;
78 } 82 }
79 83
80 gettimeofday(&now,NULL); 84 gettimeofday(&now,NULL);
81 if ((now.tv_sec-previous.tv_sec) > PRINT_INTERVAL) { 85 if ((now.tv_sec-previous.tv_sec) > PRINT_INTERVAL) {
82 printf("log\t%d\t%d\t%ld\n", 86 printf("log\t%d\t%d\t%ld\n",
83 send_packet,receive_packet,now.tv_sec-start.tv_sec); 87 send_packet,receive_packet,now.tv_sec-start.tv_sec);
84 fflush(stdout); 88 fflush(stdout);
85 89
86 previous.tv_sec = now.tv_sec; 90 previous.tv_sec = now.tv_sec;
87 send_packet = receive_packet = 0; 91 send_packet = receive_packet = 0;
88 } 92 }
89 } 93 }
90 #endif 94 #endif
91 95
92 96
93 #define unix_open open
94 #define unix_read_w read 97 #define unix_read_w read
95 /*-------------------------------------------------------------------/ 98
96 int 99
97 unix_read (int fd, char *buf, unsigned int size): 100 static int
98 サーバからTUPLEを読みこむ。
99 現在は使われていない。
100
101 引き数:
102 fd - サーバのファイルディスクリプタ
103 buf - 受け取るデータの格納場所(TUPLEヘッダ含む)
104 size - bufのbyte数
105 返り値:
106 読みこんだbyte数
107 /-------------------------------------------------------------------*/
108 int
109 unix_read(int fd,char *buf,unsigned int size) {
110 int len,a,i;
111 if(read(fd,buf,INT_SIZE)!=INT_SIZE) { // INT_SIZE is sizeof(int)
112 fprintf(stderr, "read error! on fd:%d len=%d %s\n", fd,
113 *(unsigned int*)&buf[0],
114 strerror(errno));
115 exit(1);
116 }
117 len = ntohl(*(unsigned int*)&buf[0]);
118 if((unsigned int)len>size) len=(int)size;
119 for(a=0;a<len;a+=i) {
120 if((i=read(fd,buf+a,len-a))<0) {
121 fprintf(stderr, "ldserv: client read error! on i=%d len= %d %s\n",
122 i, len, strerror(errno));
123 exit(1);
124 }
125 }
126 return len;
127 }
128
129 /*-------------------------------------------------------------------/
130 int
131 unix_write (int fd, unsigned char *buf, unsigned int size):
132 サーバへTUPLEを送る。
133
134 引き数:
135 fd - サーバのファイルディスクリプタ
136 buf - サーバへ送るデータ(TUPLEヘッダ含む)
137 size - bufのbyte数
138 返り値:
139 送った(書きこんだ)データのbyte数
140 /-------------------------------------------------------------------*/
141 int
142 unix_write(int fd,unsigned char *buf,unsigned int size) { 101 unix_write(int fd,unsigned char *buf,unsigned int size) {
143 int i,nsize; 102 unsigned int count=0;
103 uint32_t nsize;
104
105 /* 篆<若帥泣ゃ冴障篆 */
144 nsize = htonl(size); 106 nsize = htonl(size);
145 i = write(fd,&nsize,INT_SIZE); 107 write(fd, &nsize, INT_SIZE);
146 i += write(fd,buf,size); // size == datasize + LINDA_HEADER_SIZE 108
109 /* 若帥篆 */
110 while (count < size) {
111 count += write(fd, buf+count, size-count);
112 }
147 #ifdef COUNT_PACKET 113 #ifdef COUNT_PACKET
148 count_packet('s'); 114 count_packet('s');
149 #endif 115 #endif
150 return(i); 116 return count+INT_SIZE;
151 } 117 }
152 118
153 #define unix_write_w unix_write 119 #define unix_write_w unix_write
154 120
155 #define SERV_NAME unix_port 121 #define SERV_NAME unix_port
156 #define PROTO_NAME "tcp" 122 #define PROTO_NAME "tcp"
157 #define SERVER_NAME hostname 123 #define SERVER_NAME hostname
158 #define MAX_REQ 16 124 #define MAX_REQ 16
159 125
160 int fd,paddrlen; 126
161 struct hostent *hoste; 127
162 struct sockaddr_in serv_addr; 128 /*-------------------------------------------------------------------/
163 struct sockaddr_un serv_addr_un; 129 void
164 unsigned char ipaddr[4]; 130 init_linda():
131 紊у紊違膈茵
132 /-------------------------------------------------------------------*/
133 void
134 init_linda() {
135 FD_ZERO(&g_fds);
136 /* 紊у紊違若≪
137 g_max_fds = 0;
138 q_end = q_top = NULL;
139 r_end = reply = NULL;
140 qsize = 0;
141 */
142 }
165 143
166 144
167 /*-------------------------------------------------------------------/ 145 /*-------------------------------------------------------------------/
168 int 146 int
169 start_linda (char * hostname): 147 open_linda (char * hostname, int port):
170 サーバとのコネクションを確立し、COMMANDキューとREPLYキューの 148 Linda泣若潟激с潟腆榊帥鴻若鴻ID菴
171 初期化を行なう。 149 憜<ゃc鴻帥菴
172 150
173 引き数: 151 綣:
174 hostname - サーバのホスト名 152 hostname - 泣若鴻
175 返り値: 153 port - 泣若若
176 コネクション確立が成功するとそのファイルディスクリプタを返す。 154 菴:
177 失敗すると -1 を返す。 155 潟激с括∈腴<ゃc鴻帥菴
156 紊掩 -1 菴
178 /-------------------------------------------------------------------*/ 157 /-------------------------------------------------------------------*/
179 int 158 int
180 start_linda(char * hostname){ 159 open_linda(char * hostname, int port){
181 char *p; 160 int fd;
182 const char *hostname0 = "/tmp/ldserv"; 161 struct hostent *hoste;
183 162 struct sockaddr_in serv_addr;
184 if (! hostname) { 163 struct sockaddr_un serv_addr_un;
185 hostname = (char *)hostname0; 164
186 }
187 if (hostname[0]=='/') { 165 if (hostname[0]=='/') {
188 /* Unix domain */ 166 /* Unix domain */
189 if ((fd = socket(AF_UNIX, SOCK_STREAM, 0)) == FAIL){ 167 if ((fd = socket(AF_UNIX, SOCK_STREAM, 0)) == FAIL){
190 fprintf(stderr, "socket open error! errno :%d %s\n", errno, 168 perror("socket");
191 strerror(errno));
192 return(-1); 169 return(-1);
193 } 170 }
194 serv_addr_un.sun_family = AF_UNIX; 171 serv_addr_un.sun_family = AF_UNIX;
195 strcpy(serv_addr_un.sun_path, hostname); 172 strcpy(serv_addr_un.sun_path, hostname);
196 fprintf(stdout,"connecting ... %d\n", ntohs(serv_addr.sin_port)); 173 fprintf(stdout,"connecting ... %d\n", serv_addr.sin_port);
197 if (connect(fd, (struct sockaddr *)&serv_addr_un,sizeof(serv_addr_un)) == FAIL){ 174 if (connect(fd, (struct sockaddr *)&serv_addr_un,sizeof(serv_addr_un)) == FAIL){
198 fprintf(stderr,"connection error! errno :%d %s\n", errno, 175 perror("connect");
199 strerror(errno));
200 close(fd); 176 close(fd);
201 return(-1); 177 return(-1);
202 } 178 }
203 179
204 } else { 180 } else {
205 /* INET domain */ 181 /* INET domain */
206 if ((fd = socket(AF_INET, SOCK_STREAM, 0)) == FAIL){ 182 if ((fd = socket(AF_INET, SOCK_STREAM, 0)) == FAIL){
207 fprintf(stderr, "socket open error! errno :%d %s\n", errno, 183 perror("socket");
208 strerror(errno));
209 return(-1); 184 return(-1);
210 } 185 }
211 /* check optional port number */ 186 if ((hoste = gethostbyname(SERVER_NAME)) == NULL){
212 serv_addr.sin_port = htons(10000);
213 p = (char *)malloc(strlen(hostname));
214 strcpy(p,hostname);
215 hostname = p;
216 while(*p) {
217 if (*p==':') {
218 serv_addr.sin_port = htons(atoi(p+1));
219 *p = 0;
220 break;
221 }
222 p++;
223 }
224 if ((hoste = gethostbyname(hostname)) == NULL){
225 fprintf(stderr,"hostname error\n"); 187 fprintf(stderr,"hostname error\n");
226 close(fd); 188 close(fd);
227 return(-1); 189 return(-1);
228 } 190 }
229 free(hostname); hostname=0;
230 serv_addr.sin_family = AF_INET; 191 serv_addr.sin_family = AF_INET;
192 serv_addr.sin_port = port;
231 serv_addr.sin_addr.s_addr = ((struct in_addr *)(hoste->h_addr))->s_addr; 193 serv_addr.sin_addr.s_addr = ((struct in_addr *)(hoste->h_addr))->s_addr;
232 if (serv_addr.sin_family == AF_INET) { 194 if (serv_addr.sin_family == AF_INET) {
233 int tmp = 1; 195 int tmp = 1;
234 setsockopt (fd, IPPROTO_TCP, TCP_NODELAY, 196 setsockopt (fd, IPPROTO_TCP, TCP_NODELAY,
235 (char *) &tmp, sizeof (int)); 197 (char *) &tmp, sizeof (int));
236 } 198 }
237 fprintf(stdout,"connecting ... %d\n", ntohs(serv_addr.sin_port)); 199 fprintf(stdout,"connecting ... %d\n", serv_addr.sin_port);
238 if (connect(fd, (struct sockaddr *)&serv_addr,sizeof(serv_addr)) == FAIL){ 200 if (connect(fd, (struct sockaddr *)&serv_addr,sizeof(serv_addr)) == FAIL){
239 fprintf(stderr,"connection error! errno :%d %s\n", errno, 201 fprintf(stderr,"connection error! errno :%d %s\n", errno,
240 strerror(errno)); 202 strerror(errno));
241 close(fd); 203 close(fd);
242 return(-1); 204 return(-1);
243 } 205 }
244 } 206 }
245 207
246 ps = fd; 208 FD_SET(fd, &g_fds);
209 if (g_max_fds < fd) g_max_fds = fd;
210
247 fprintf(stdout," connect middle server %d\n", fd); 211 fprintf(stdout," connect middle server %d\n", fd);
248 q_end = q_top = NULL; 212 return fd;
249 r_end = reply = NULL; 213 }
250 qsize = seq = 0; 214
251 seq = 120; 215 int
252 return ps; 216 open_linda_java(char * hostname, int port){
253 } 217 int fd;
218 struct hostent *hoste;
219 struct sockaddr_in serv_addr;
220 struct sockaddr_un serv_addr_un;
221
222 if (hostname[0]=='/') {
223 /* Unix domain */
224 if ((fd = socket(AF_UNIX, SOCK_STREAM, 0)) == FAIL){
225 perror("socket");
226 return(-1);
227 }
228 serv_addr_un.sun_family = AF_UNIX;
229 strcpy(serv_addr_un.sun_path, hostname);
230 DEB(fprintf(stdout,"connecting ... %d\n", serv_addr.sin_port));
231 if (connect(fd, (struct sockaddr *)&serv_addr_un,sizeof(serv_addr_un)) == FAIL){
232 perror("connect");
233 close(fd);
234 return(-1);
235 }
236
237 } else {
238 /* INET domain */
239 if ((fd = socket(AF_INET, SOCK_STREAM, 0)) == FAIL){
240 perror("socket");
241 return(-2);
242 }
243 serv_addr.sin_family = AF_INET;
244 serv_addr.sin_port = htons(port);
245
246 serv_addr.sin_addr.s_addr = inet_addr(hostname);
247 if (serv_addr.sin_addr.s_addr == 0xffffffff) {
248 if ((hoste = gethostbyname(hostname)) == NULL){
249 fprintf(stdout, "hostname error\n");
250 close(fd);
251 return(-1);
252 }
253 serv_addr.sin_addr.s_addr = *(unsigned int *)hoste->h_addr_list[0];
254 }
255
256 if (serv_addr.sin_family == AF_INET) {
257 int tmp = 1;
258 setsockopt (fd, IPPROTO_TCP, TCP_NODELAY,
259 (char *) &tmp, sizeof (int));
260 }
261 DEB(fprintf(stdout,"connecting ... %d \n", ntohs(serv_addr.sin_port)));
262 DEB(fprintf(stdout," serv_addr.sin_port ... %d \n", ntohs(serv_addr.sin_port)));
263 //fprintf(stdout," serv_addr.sin_addr.s_addr... %s\n", serv_addr.sin_addr.s_addr);
264 if (connect(fd, (struct sockaddr *)&serv_addr,sizeof(serv_addr)) == FAIL){
265 perror("connect");
266 close(fd);
267 return(-4);
268 }
269 }
270
271 FD_SET(fd, &g_fds);
272 if (g_max_fds < fd) g_max_fds = fd;
273
274 DEB(fprintf(stdout," connect middle server %d\n", fd));
275 return fd;
276 }
277
254 278
255 /*-------------------------------------------------------------------/ 279 /*-------------------------------------------------------------------/
256 int 280 int
257 psx_out (unsigned int id, unsigned char *data, unsigned int size): 281 close_linda(int tspace_id):
258 outコマンドをCOMMANDキューへ溜める。 282 ・膓帥鴻若鴻吾・膓
259 283 純宴g_fds 紊
260 引き数: 284 綣:
261 id - TUPLE SpaceのID 285 tspace_id - 帥鴻若鴻ID
262 data - 送信するデータ 286 菴:
263 size - dataのサイズ 287 close
264 返り値:
265 シーケンス番号
266 /-------------------------------------------------------------------*/ 288 /-------------------------------------------------------------------*/
267 int 289 int
268 psx_out(unsigned int id, unsigned char *data, unsigned int size){ 290 close_linda(int tspace_id){
269 if (psx_queue(id, size, data, 'o', NULL, NULL) == FAIL){ 291 int retval;
292 int i;
293 if ((retval = close(tspace_id)) == 0) {
294 FD_CLR(tspace_id, &g_fds);
295 if (g_max_fds == tspace_id) {
296 for (i = g_max_fds-1; FD_ISSET(i, &g_fds) && i; i--);
297 g_max_fds = i;
298 }
299 }
300 return retval;
301 }
302
303 /*-------------------------------------------------------------------/
304 int
305 psx_out (unsigned int tspace_id, unsigned int id,
306 unsigned char *data, unsigned int size):
307 out潟潟COMMANDャ若御
308
309 綣:
310 tspace_id - 帥鴻若鴻ID
311 id - 帥ID
312 data - 篆<若
313 size - data泣ゃ
314 菴:
315 激若宴潟合
316 /-------------------------------------------------------------------*/
317 int
318 psx_out(unsigned int tspace_id, unsigned int id,
319 unsigned char *data, unsigned int size){
320 int r;
321 if ((r = psx_queue(tspace_id, id, size, data, 'o', NULL, NULL)) == FAIL) {
270 return(FAIL); 322 return(FAIL);
271 } 323 }
272 DEB( fprintf(stdout, "psx_out: size = %d, command = %s\n", 324 DEB( fprintf(stdout, "psx_out: size = %d, command = %s\n",
273 q_end->size, q_end->command+LINDA_HEADER_SIZE)); 325 q_end->size, q_end->command+LINDA_HEADER_SIZE));
274 return(seq); 326 return(r);
275 } 327 }
276 328
277 /*-------------------------------------------------------------------/ 329 /*-------------------------------------------------------------------/
278 int 330 int
279 psx_ld (unsigned int id, char mode, void(*callback)(char*,void*), 331 psx_ld (unsigned tspace_id, unsigned int id,
280 void * obj): 332 char mode, void(*callback)(char*,void*), void * obj):
281 in,read,waitなどの受信コマンドをCOMMANDキューへ溜める。 333 in,read,wait篆<潟潟COMMANDャ若御
282 psx_in,psx_rd,psx_wait_rdなどに置き換えられている。 334 psx_in,psx_rd,psx_wait_rd臀
283 335
284 引き数: 336 綣:
285 id - TUPLE SpaceのID 337 tspace_id- 帥鴻若鴻ID
286 mode - i,r,w の文字を取り、各々in,read,waitを表している。 338 id - 帥ID
287 callback - コールバックを使用する場合の関数へのポインタ。 339 mode - i,r,w 絖in,read,wait茵
288 使用しない場合はNULLをいれる。 340 callback - 潟若篏睡翫∽違吾ゃ潟帥
289 obj - コールバックで用いる関数の引き数。 341 篏睡翫NULL
290 返り値: 342 obj - 潟若х∽違綣違
291 psx_queue内でmallocされたREPLY構造体へのポインタ 343 菴:
344 psx_queuemallocREPLY罕篏吾ゃ潟
292 /-------------------------------------------------------------------*/ 345 /-------------------------------------------------------------------*/
293 int 346 int
294 psx_ld(unsigned int id, char mode,void(*callback)(char *,void *),void * obj){ 347 psx_ld(unsigned int tspace_id, unsigned int id,
348 char mode, void(*callback)(unsigned char *,void *), void * obj){
295 int r; 349 int r;
296 if ((r=psx_queue(id, 0, NULL, mode, callback, obj)) == FAIL){ 350 if ((r = psx_queue(tspace_id, id, 0, NULL, mode, callback, obj)) == FAIL) {
297 return(FAIL); 351 return(FAIL);
298 } 352 }
299 return(r); 353 return(r);
300 } 354 }
301 355
302 /*-------------------------------------------------------------------/ 356 /*-------------------------------------------------------------------/
303 unsigned char * 357 unsigned char *
304 psx_reply (int seq): 358 psx_reply (int seq):
305 サーバから答えが来たデータを返す。 359 泣若膈ャ若帥菴
306 360
307 引き数: 361 綣:
308 seq - psx_ld()が返した値。 362 seq - psx_ld()菴ゃ
309 返り値: 363 菴:
310 seqに対応したデータを返す。データをまだ受信していない場合は 364 seq絲上若帥菴若帥障篆<翫
311 NULLを返す。 365 NULL菴
312 /-------------------------------------------------------------------*/ 366 /-------------------------------------------------------------------*/
313 unsigned char * 367 unsigned char *
314 psx_reply(int seq){ 368 psx_reply(unsigned int seq){
315 REPLY *p, *q; 369 REPLY *p, *q;
316 char *ans; 370 unsigned char *ans;
317 371
318 DEB(fprintf(stdout, "psx_reply: search of seq = %d\n", seq)); 372 DEB(fprintf(stdout, "psx_reply: search of seq = %d\n", seq));
319 PSX_Debug(("psx_reply: seq %d", seq)); 373 PSX_Debug(("psx_reply: seq %d", seq));
320 for(q = NULL,p = reply;p;q = p,p = p->next){ 374 for(q = NULL,p = reply; p; q = p,p = p->next){
321 if (p->seq == (unsigned)seq){ 375 if (p->seq == seq){
322 DEB(fprintf(stdout, "psx_reply: match of seq = %d\n", seq)); 376 DEB(fprintf(stdout, "psx_reply: match of seq = %d\n", seq));
323 if (p->mode == '!'){ 377 if (p->mode == '!'){
324 ans = (char *)p->answer; 378 ans = p->answer;
325 if (q == NULL){ 379 if (q == NULL){
326 reply = p->next; 380 reply = p->next;
327 if(p==r_end) { 381 if(p==r_end) {
328 r_end = p->next; 382 r_end = p->next;
329 } 383 }
331 q->next = p->next; 385 q->next = p->next;
332 if(p==r_end) { 386 if(p==r_end) {
333 r_end = q; 387 r_end = q;
334 } 388 }
335 } 389 }
336 PSX_Debug(("psx_reply: reply %x r_end %x p %x q %x",reply,r_end,p,q)); 390 PSX_Debug(("psx_reply: reply %x r_end %x p %x q %x",reply,r_end,p,q));
337 free(p); 391 psx_free(p);
338 DEB( for(p=reply;p;p=p->next) { PSX_Debug(("psx_queue dump: seq %d mode %c %x %x",p->seq,p->mode,p,p->next))}); 392 DEB( for(p=reply;p;p=p->next) { PSX_Debug(("psx_queue dump: seq %d mode %c %x %x",p->seq,p->mode,p,p->next))});
339 DEB( fprintf(stdout, "psx_reply: returned answer = %s\n", ans)); 393 DEB( fprintf(stdout, "psx_reply: returned answer = %s\n", ans));
340 PSX_Debug(("psx_reply: answer %s",ans)); 394 PSX_Debug(("psx_reply: answer %s",ans));
341 return((unsigned char *)ans); 395 return(ans);
342 } else { 396 } else {
343 if (p->mode == '?'){ 397 if (p->mode == '?'){
344 DEB(fprintf(stdout, "psx_reply: don't accept anser\n")); 398 DEB(fprintf(stdout, "psx_reply: don't accept anser\n"));
345 return(NULL); 399 return(NULL);
346 } 400 }
354 } 408 }
355 409
356 /*-------------------------------------------------------------------/ 410 /*-------------------------------------------------------------------/
357 void 411 void
358 psx_sync_n (): 412 psx_sync_n ():
359 サーバとデータの送受信をする。COMMANDキューに溜まったデータを 413 泣若若帥篆<COMMANDャ若羣障c若帥
360 送信し、サーバから送られて来たデータを対応するREPLYへいれる。 414 篆<泣若ャ若帥絲上REPLY吾
361 /-------------------------------------------------------------------*/ 415 /-------------------------------------------------------------------*/
416 #define TIMEDELTA 10
362 void 417 void
363 psx_sync_n(){ 418 psx_sync_n(){
364 int acount; 419 int acount;
420 int i;
365 COMMAND *c, *t; 421 COMMAND *c, *t;
366 422
367 fd_set tmp, fds; 423 fd_set tmp;
368 struct timeval timeout; 424 struct timeval timeout;
369 timeout.tv_sec=0; 425 timeout.tv_sec=0;
370 timeout.tv_usec=TIMEDELTA * 1000; 426 timeout.tv_usec=TIMEDELTA * 1000;
371 427
372 acount = 0; 428 acount = 0;
373 while (q_top != NULL){ 429 while (q_top != NULL){
374 c = q_top; 430 c = q_top;
375 unix_write_w(ps, c->command, c->size); 431 unix_write_w(c->tspace_id, c->command, c->size);
376 free(c->command); 432 psx_free(c->command);
377 t = c->next; 433 t = c->next;
378 free(c); 434 psx_free(c);
379 q_top = c = t;qsize--; 435 q_top = c = t;
380 } 436 qsize--;
381 FD_ZERO(&fds); 437 }
382 FD_SET(ps, &fds); 438
383 tmp = fds; 439 tmp = g_fds;
384 while(select(32, &tmp, NULL, NULL, &timeout) > 0) { 440 while(select(g_max_fds+1, &tmp, NULL, NULL, &timeout) > 0) {
385 if(FD_ISSET(ps, &tmp)) { 441 for (i = 0; i < g_max_fds+1; i++) {
386 unix_chkserv(); 442 if (FD_ISSET(i, &tmp)) {
387 } 443 unix_chkserv(i);
388 } 444 }
389 } 445 }
390 446 }
391 /*-------------------------------------------------------------------/ 447 }
392 int 448
393 psx_queue (unsigned int id, unsigned int size, unsigned char *data, 449 /*-------------------------------------------------------------------/
394 char mode, void(*callback)(char*,void*), void * obj): 450 static int
395 out,in,read,waitなどのコマンドをCOMMANDキューに溜める。データを 451 psx_queue (unsigned int tspace_id, unsigned int id,
396 受信するコマンド(in,read,wait)のときは受け取ったときにデータを 452 unsigned int size, unsigned char *data, char mode,
397 格納するREPLY構造体を作る。 453 void(*callback)(char*,void*), void * obj):
398 454 out,in,read,wait潟潟COMMANDャ若羣若帥
399 引き数: 455 篆<潟潟(in,read,wait)c若帥
400 id - アクセスするTUPLE SpaceのID 456 主REPLY罕篏篏
401 size - dataのサイズ 457
402 data - 送信するデータ。受信時はNULL。 458 綣:
403 mode - コマンドのモード(out,in,read,wait は各々char型: o,i,r,w) 459 tspace_id- 篆≦帥鴻若鴻ID
404 callback - コールバックを使用する場合の関数へのポインタ。 460 id - ≪祉鴻TUPLE SpaceID
405 使用しない場合はNULL。 461 size - data泣ゃ
406 obj - コールバックで用いる関数に引き渡すデータ。 462 data - 篆<若帥篆≧NULL
407 返り値: 463 mode - 潟潟≪若(out,in,read,wait char: o,i,r,w)
408 成功した場合 - mallocしたREPLY構造体へのポインタ。outの場合は 464 callback - 潟若篏睡翫∽違吾ゃ潟帥
409 0が返る。 465 篏睡翫NULL
410 失敗した場合 - FAIL(-1)が返る。 466 obj - 潟若х∽違綣羝<若帥
411 /-------------------------------------------------------------------*/ 467 菴:
412 int 468 翫 - mallocREPLY罕篏吾ゃ潟帥out翫
413 psx_queue(unsigned int id, unsigned int size, unsigned char *data, char mode, 469 0菴
414 void(*callback)(char *,void *), void * obj){ 470 紊掩翫 - FAIL(-1)菴
471 /-------------------------------------------------------------------*/
472 static int
473 psx_queue(unsigned int tspace_id, unsigned int id,
474 unsigned int size, unsigned char *data, char mode,
475 void(*callback)(unsigned char *,void *), void * obj){
415 REPLY *p; 476 REPLY *p;
416 COMMAND *c; 477 COMMAND *c;
417 478
418 seq++; 479 if (qsize >= MAX_QUEUE) {
419 if (qsize >= MAX_QUEUE){
420 // PSX_Debug(("max queue: qsize=%d",qsize)); 480 // PSX_Debug(("max queue: qsize=%d",qsize));
421 psx_sync_n(); 481 psx_sync_n();
422 } 482 }
423 483
424 for(p=reply;p;p=p->next){ 484 if (q_top == NULL) {
425 if(p->seq == seq){
426 printf("same seq number: %d\n",seq);
427 }
428 }
429
430 if (q_top == NULL){
431 if ((q_top = (COMMAND *) malloc (sizeof(COMMAND))) == NULL){ 485 if ((q_top = (COMMAND *) malloc (sizeof(COMMAND))) == NULL){
432 return(FAIL); 486 return(FAIL);
433 } 487 }
434 c = q_end = q_top; 488 c = q_end = q_top;
435 } else { 489 } else {
437 return(FAIL); 491 return(FAIL);
438 } 492 }
439 c = q_end; 493 c = q_end;
440 q_end = q_end->next; 494 q_end = q_end->next;
441 } 495 }
442 496
443 // size は DATASIZE 497 /* size DATASIZE */
444 if ((q_end->command = (unsigned char *) malloc(size+LINDA_HEADER_SIZE)) == NULL){ 498 if ((q_end->command = (unsigned char *) malloc(size+LINDA_HEADER_SIZE)) == NULL) {
445 free(q_end); 499 psx_free(q_end);
446 c->next = NULL; 500 c->next = NULL;
447 return(FAIL); 501 return(FAIL);
448 } 502 }
449 503
450 if (mode != 'o'){ 504 /* 若水荀羆(in,rd,wait)膊宴 */
505 if (mode != 'o') {
451 if (reply == NULL){ 506 if (reply == NULL){
452 if ((reply = (REPLY *) malloc (sizeof(REPLY))) == NULL){ 507 if ((reply = (REPLY *) malloc (sizeof(REPLY))) == NULL){
453 return(FAIL); 508 return(FAIL);
454 } 509 }
455 p = r_end = reply; p->next = NULL; 510 p = r_end = reply; p->next = NULL;
458 return(FAIL); 513 return(FAIL);
459 } 514 }
460 p = r_end->next; r_end = p; p->next = NULL; 515 p = r_end->next; r_end = p; p->next = NULL;
461 } 516 }
462 p->mode = '?'; 517 p->mode = '?';
463 p->seq = (int)p; // 構造体のアドレスで識別 518 p->seq = (int)p; // 罕篏≪鴻ц
464 p->callback = callback; 519 p->callback = callback;
465 p->obj = obj; 520 p->obj = obj;
466 PSX_Debug(("psx_queue: seq %d reply %x p %x r_end %x",seq,reply,p,r_end)); 521 PSX_Debug(("psx_queue: seq %d reply %x p %x r_end %x",seq,reply,p,r_end));
467 }else{ 522 } else {
468 p=0; 523 p = 0;
469 } 524 }
470 q_end->command[LINDA_MODE_OFFSET] = mode; 525 q_end->command[LINDA_MODE_OFFSET] = mode;
471 526
472 q_end->command[LINDA_ID_OFFSET] = id >> 8; 527 q_end->command[LINDA_ID_OFFSET] = id >> 8;
473 q_end->command[LINDA_ID_OFFSET+1] = id & 0xff; 528 q_end->command[LINDA_ID_OFFSET+1] = id & 0xff;
474 529
475 q_end->command[LINDA_SEQ_OFFSET] = ((int)p>>24) & 0xff; 530 q_end->command[LINDA_SEQ_OFFSET] = ((int)p>>24) & 0xff;
476 q_end->command[LINDA_SEQ_OFFSET+1] = ((int)p>>16) & 0xff; 531 q_end->command[LINDA_SEQ_OFFSET+1] = ((int)p>>16) & 0xff;
477 q_end->command[LINDA_SEQ_OFFSET+2] = ((int)p>>8) & 0xff; 532 q_end->command[LINDA_SEQ_OFFSET+2] = ((int)p>>8) & 0xff;
478 q_end->command[LINDA_SEQ_OFFSET+3] = ((int)p) & 0xff; 533 q_end->command[LINDA_SEQ_OFFSET+3] = ((int)p) & 0xff;
479 534
480 q_end->command[LINDA_DATA_LENGTH_OFFSET] = (size>>24) & 0xff; 535 q_end->command[LINDA_DATA_LENGTH_OFFSET] = (size>>24) & 0xff;
481 q_end->command[LINDA_DATA_LENGTH_OFFSET+1] = (size>>16) & 0xff; 536 q_end->command[LINDA_DATA_LENGTH_OFFSET+1] = (size>>16) & 0xff;
482 q_end->command[LINDA_DATA_LENGTH_OFFSET+2] = (size>>8) & 0xff; 537 q_end->command[LINDA_DATA_LENGTH_OFFSET+2] = (size>>8) & 0xff;
483 q_end->command[LINDA_DATA_LENGTH_OFFSET+3] = (size) & 0xff; 538 q_end->command[LINDA_DATA_LENGTH_OFFSET+3] = (size) & 0xff;
484 539
485 q_end->size = size+LINDA_HEADER_SIZE; /* command size */ 540 q_end->size = size+LINDA_HEADER_SIZE; /* command size */
486 q_end->next = NULL; qsize++; 541 q_end->tspace_id = tspace_id; /* destination id */
487 if (data && size>0) memcpy(q_end->command+LINDA_HEADER_SIZE, data, size); 542 q_end->next = NULL;
543 qsize++;
544 if (data && size>0)
545 memcpy(q_end->command+LINDA_HEADER_SIZE, data, size);
488 return((int)p); 546 return((int)p);
489 } 547 }
490 548
491 /*-------------------------------------------------------------------/ 549 /*-------------------------------------------------------------------/
492 void 550 static void
493 unix_chkserv (): 551 unix_chkserv (int ps):
494 サーバからデータ(TUPLE)を受け取る。REPLY構造体にコールバック関数 552 泣若若(TUPLE)REPLY罕篏潟若∽
495 が指定されていればその関数を実行し、REPLY構造体をキューから取り 553 絎違∽違絎茵REPLY罕篏ャ若
496 除く。コールバック関数が指定されていなければREPLY構造体にデータ 554 ゃ潟若∽違絎REPLY罕篏若
497 を引き渡す。 555 綣羝<
498 /-------------------------------------------------------------------*/ 556 綣:
499 void 557 ps - ・膓帥鴻若鴻純宴
500 unix_chkserv(){ 558 /-------------------------------------------------------------------*/
501 int i,k,pkt,npkt; 559 static void
560 unix_chkserv(int ps){
561 int i,pkt,npkt,mode;
562 unsigned int k;
502 REPLY *r,*prev; 563 REPLY *r,*prev;
503 int a; 564 int a;
504 unsigned char * tuple = 0; 565 unsigned char * tuple = 0;
505 566
506 if((i=read(ps,&npkt,INT_SIZE))<0) { 567 if((i=read(ps,&npkt,INT_SIZE))<0) {
507 fprintf(stderr, "size read error! on fd:%d %s\n", ps, 568 perror("read");
508 strerror(errno));
509 exit(1); 569 exit(1);
510 } 570 }
511 pkt = ntohl(npkt); 571 pkt = ntohl(npkt);
512 DEB(printf("pkt: %d\n",pkt)); 572 DEB(printf("pkt: %d\n",pkt));
513 DEB(fprintf(stdout, "psx_chkserv: queue number: %d , size = %d\n", i, pkt)); 573 DEB(fprintf(stdout, "psx_chkserv: queue number: %d , size = %d\n", i, pkt));
514 if((tuple = (unsigned char *)malloc(pkt))==NULL){ 574 if((tuple = (unsigned char *)malloc(pkt))==NULL){
515 fprintf(stderr,"allocate error! errno :%d %s",errno,strerror(errno)); 575 perror("malloc");
516 exit(1); 576 exit(1);
517 } 577 }
518 for(a=0;a<pkt;a+=i) { 578 for(a=0;a<pkt;a+=i) {
519 if((i=unix_read_w(ps,tuple+a,pkt-a))<0) { 579 if((i=unix_read_w(ps,tuple+a,pkt-a))<0) {
520 fprintf(stderr, "psx_chkserv: read error! on i=%d pkt=%d %s\n", 580 fprintf(stderr, "psx_chkserv: read error! on i=%d pkt=%d %s\n",
524 } 584 }
525 585
526 #ifdef COUNT_PACKET 586 #ifdef COUNT_PACKET
527 count_packet('r'); 587 count_packet('r');
528 #endif 588 #endif
529 589 mode = psx_get_mode(tuple);
530 i = tuple[LINDA_ID_OFFSET] * 256 + tuple[LINDA_ID_OFFSET+1]; /* id */ 590 i = psx_get_id(tuple);
531 k = (tuple[LINDA_SEQ_OFFSET] <<24) + 591 k = psx_get_seq(tuple);
532 (tuple[LINDA_SEQ_OFFSET+1]<<16) + 592 PSX_Debug(("psx_chkserv: anser packet size = %d id %d seq %d", pkt,i,k));
533 (tuple[LINDA_SEQ_OFFSET+2]<<8) +
534 (tuple[LINDA_SEQ_OFFSET+3]); /* seq */
535 PSX_Debug(("psx_chkserv: anser packet size = %d id %d seq %d", pkt,i,k));
536 DEB(fprintf(stdout, "psx_chkserv: data from server: %s id=%d seq = %d\n", tuple, i, k)); 593 DEB(fprintf(stdout, "psx_chkserv: data from server: %s id=%d seq = %d\n", tuple, i, k));
537 DEB ( 594 DEB (
538 for(p=reply;p;p=p->next) { 595 for(p=reply;p;p=p->next) {
539 PSX_Debug(printf("psx_queue dump: seq %d mode %c %x %x",p->seq,p->mode,p,p->next)); 596 PSX_Debug(printf("psx_queue dump: seq %d mode %c %x %x",p->seq,p->mode,p,p->next));
540 }) 597 })
541 598
542 for(prev = NULL,r = reply; r; prev = r,r = r->next){ 599 for(prev = NULL,r = reply; r; prev = r,r = r->next){
543 DEB(fprintf(stdout,"seq: %d\n",r->seq);) 600 DEB(fprintf(stdout,"seq: %d\n",r->seq);)
544 if (r->seq == (unsigned int)k){ 601 if (r->seq == k){
545 if(r->callback){ // call callback function 602 if(r->callback){ // call callback function
546 (*r->callback)((char *)tuple,r->obj); 603 (*r->callback)(tuple,r->obj);
547 if (prev == NULL){ 604 if (prev == NULL){
548 reply = r->next; 605 reply = r->next;
549 if(r == r_end) { 606 if(r == r_end) {
550 r_end = r->next; 607 r_end = r->next;
551 } 608 }
553 prev->next = r->next; 610 prev->next = r->next;
554 if(r == r_end) { 611 if(r == r_end) {
555 r_end = prev; 612 r_end = prev;
556 } 613 }
557 } 614 }
558 free(r); 615 psx_free(r);
559 }else{ // normal reply 616 }else{ // normal reply
560 PSX_Debug(("psx_chkserv: copy answer r %x seq %d",r,k)); 617 PSX_Debug(("psx_chkserv: copy answer r %x seq %d",r,k));
561 r->answer = tuple; 618 if(mode == 'a'){
562 r->mode = '!'; 619 r->answer = tuple;
620 }else{
621 r->answer = NULL;
622 }
623 r->mode = '!';
563 } 624 }
564 break; 625 break;
565 } 626 }
566 } 627 }
567 tuple = 0; 628 tuple = 0;
568 if (!r){ 629 if (!r){
569 DEB(fprintf(stdout, "unix_chkserv: accepted seq %d does not match. \n",k)); 630 DEB(fprintf(stdout, "unix_chkserv: accepted seq %d does not match. \n",k));
570 } 631 }
571 } 632 }
572 633
573 void psx_free(char * tuple) 634 void psx_free(void *tuple)
574 { 635 {
575 // free(tuple - LINDA_HEADER_SIZE);
576 free(tuple); 636 free(tuple);
577 } 637 }
578 638
579 /*-------------------------------------------------------------------/ 639 /*-------------------------------------------------------------------/
580 int 640 static unsigned int
581 get_int(unsigned char * tuple, int offset): 641 get_int(unsigned char * tuple, int offset):
582 TUPLEのヘッダに格納された int型 のデータを得るための関数 642 TUPLE主 int 若帥緇∽
583 psx_get_datalength() と psx_get_seq() から呼ばれる。 643 psx_get_datalength() psx_get_seq() 若違
584 644
585 引き数: 645 綣:
586 tuple - ヘッダ情報も含んだTUPLE。psx_reply()で得たものでもいい。 646 tuple - 宴TUPLEpsx_reply()ус
587 offset - 取りだすデータのオフセット。LINDA_DATA_LENGTH_OFFSET 647 offset - 若帥祉LINDA_DATA_LENGTH_OFFSET
588 か LINDA_SEQ_OFFSET。 648 LINDA_SEQ_OFFSET
589 649
590 返り値: 650 菴:
591 指定したオフセットに格納されていた数値(int型) 651 絎祉主医(int)
592 /-------------------------------------------------------------------*/ 652 /-------------------------------------------------------------------*/
593 static 653 static unsigned int
594 int get_int(unsigned char * tuple, int offset){ 654 get_int(unsigned char * tuple, int offset){
595 int i; 655 unsigned int i;
596 i = (tuple[offset] <<24) + 656 i = (tuple[offset] <<24) +
597 (tuple[offset+1]<<16) + 657 (tuple[offset+1]<<16) +
598 (tuple[offset+2]<<8) + 658 (tuple[offset+2]<<8) +
599 (tuple[offset+3]); 659 (tuple[offset+3]);
600 return i; 660 return i;
601 } 661 }
602 662
603 int psx_get_datalength(unsigned char * tuple){ 663 unsigned int
664 psx_get_datalength(unsigned char * tuple){
604 return get_int(tuple,LINDA_DATA_LENGTH_OFFSET); 665 return get_int(tuple,LINDA_DATA_LENGTH_OFFSET);
605 } 666 }
606 667
607 unsigned char *psx_get_data(unsigned char * tuple){ 668 unsigned char *
608 return tuple+LINDA_HEADER_SIZE; 669 psx_get_data(unsigned char * tuple) {
609 } 670 return tuple + LINDA_HEADER_SIZE;
610 671 }
611 int psx_get_seq(unsigned char * tuple){ 672
673 unsigned int
674 psx_get_seq(unsigned char * tuple){
612 return get_int(tuple,LINDA_SEQ_OFFSET); 675 return get_int(tuple,LINDA_SEQ_OFFSET);
613 } 676 }
614 677
615 short psx_get_id(unsigned char * tuple){ 678 unsigned short
616 short s; 679 psx_get_id(unsigned char * tuple){
617 s = tuple[LINDA_ID_OFFSET] * 256 + 680 return (tuple[LINDA_ID_OFFSET] * 256 + tuple[LINDA_ID_OFFSET+1]);
618 tuple[LINDA_ID_OFFSET+1]; 681 }
619 return s; 682
620 } 683 unsigned char
621 684 psx_get_mode(unsigned char * tuple){
622 char psx_get_mode(unsigned char * tuple){
623 return tuple[LINDA_MODE_OFFSET]; 685 return tuple[LINDA_MODE_OFFSET];
624 } 686 }
625 687
688 static
689 void
690 set_int_to_char(unsigned char * tuple, int i, int offset){
691 tuple[offset] = (i>>24) & 0xff;
692 tuple[offset+1] = (i>>16) & 0xff;
693 tuple[offset+2] = (i>>8) & 0xff;
694 tuple[offset+3] = (i) & 0xff;
695 }
696
697 void
698 psx_set_datalength(unsigned char * tuple, int length){
699 set_int_to_char(tuple,length,LINDA_DATA_LENGTH_OFFSET);
700 }
701
702
703 void
704 psx_set_seq(unsigned char * tuple, int seq){
705 set_int_to_char(tuple,seq,LINDA_SEQ_OFFSET);
706 }
707
708 void
709 psx_set_id(unsigned char * tuple, short id){
710 tuple[LINDA_ID_OFFSET] = id >> 8;
711 tuple[LINDA_ID_OFFSET+1] = id & 0xff;
712 }
713
714 void
715 psx_set_mode(unsigned char * tuple, char mode){
716 tuple[LINDA_MODE_OFFSET] = mode;
717 }
718
719
626 720
627 /* end */ 721 /* end */