572
|
1 // $Id$
|
539
|
2 //
|
|
3
|
|
4 /*----------------------------------------------------------------------
|
572
|
5 インクルードファイル読み込み
|
539
|
6 ----------------------------------------------------------------------*/
|
|
7 #include <stdio.h>
|
|
8 #include <string.h>
|
572
|
9 #include <stdlib.h>
|
|
10 #include <sys/time.h>
|
539
|
11 #include <unistd.h>
|
572
|
12 #include <netinet/in.h>
|
539
|
13 #include <sys/select.h>
|
|
14 #include <sys/types.h>
|
|
15 #include <sys/socket.h>
|
572
|
16 #include <netdb.h>
|
539
|
17 #include <netinet/tcp.h>
|
572
|
18 #include <sys/un.h>
|
539
|
19 #include <errno.h>
|
572
|
20 #include <arpa/inet.h>
|
539
|
21
|
|
22 #include "lindaapi.h"
|
|
23
|
572
|
24
|
539
|
25 #if 0
|
|
26 #define PSX_Debug(deb) (putchar(PS_DEB)),\
|
|
27 (printf deb ),\
|
|
28 (putchar(PS_DEB))
|
572
|
29 #define DEB(a)
|
539
|
30 #else
|
572
|
31 #define PSX_Debug(deb)
|
539
|
32 #define DEB(a) /* a */
|
|
33 #endif
|
|
34
|
572
|
35 /* Global Variables */
|
|
36 static COMMAND *q_top, *q_end; /* コマンドキュー */
|
|
37 static REPLY *reply, *r_end; /* 受け取り用キュー */
|
|
38 static int qsize; /* コマンドキューのサイズ */
|
|
39 static fd_set g_fds; /* 接続しているタプルスペース群のFD(FileDiscripter)を保持 */
|
|
40 static int g_max_fds; /* 監視するFDの最大値 */
|
539
|
41
|
572
|
42 /* Static Functions */
|
|
43 static void unix_chkserv(int ps);
|
|
44 void psx_free(void *);
|
|
45 static int psx_queue(unsigned int tspace_id, unsigned int id,
|
|
46 unsigned int size, unsigned char *data, char mode,
|
|
47 void(*callback)(unsigned char *,void *),void * obj);
|
539
|
48
|
|
49 #ifdef COUNT_PACKET
|
|
50 // print packet count message per PRINT_INTERVAL sec
|
|
51 #define PRINT_INTERVAL 4
|
572
|
52 static void count_packet(char type);
|
539
|
53
|
|
54 /*-------------------------------------------------------------------/
|
572
|
55 static void
|
539
|
56 count_packet (char type):
|
572
|
57 パケットの送受信カウントする
|
|
58
|
|
59 引き数:
|
|
60 type - 送信、受信 (char型: s,r)
|
539
|
61 /-------------------------------------------------------------------*/
|
572
|
62 static void
|
|
63 count_packet(char type)
|
539
|
64 {
|
|
65 static int send_packet=-1,receive_packet=0;
|
|
66 static struct timeval start,now,previous;
|
572
|
67
|
539
|
68 if (out_packet == -1) {
|
|
69 gettimeofday(&start,NULL);
|
|
70 gettimeofday(&previous,NULL);
|
|
71 send_packet = 0;
|
|
72 printf("packet\tout\tread\t\ttime\n");
|
|
73 }
|
572
|
74
|
539
|
75 if (type == 's') {
|
|
76 send_packet++;
|
|
77 } else if (type == 'r') {
|
|
78 receive_packet++;
|
|
79 } else {
|
|
80 fprintf(stderr,"No type in count_packet function\n");
|
|
81 return;
|
|
82 }
|
572
|
83
|
539
|
84 gettimeofday(&now,NULL);
|
|
85 if ((now.tv_sec-previous.tv_sec) > PRINT_INTERVAL) {
|
|
86 printf("log\t%d\t%d\t%ld\n",
|
|
87 send_packet,receive_packet,now.tv_sec-start.tv_sec);
|
|
88 fflush(stdout);
|
572
|
89
|
539
|
90 previous.tv_sec = now.tv_sec;
|
|
91 send_packet = receive_packet = 0;
|
|
92 }
|
|
93 }
|
|
94 #endif
|
|
95
|
|
96
|
|
97 #define unix_read_w read
|
572
|
98
|
|
99
|
|
100 static int
|
|
101 unix_write(int fd,unsigned char *buf,unsigned int size) {
|
|
102 unsigned int count=0;
|
|
103 uint32_t nsize;
|
539
|
104
|
572
|
105 /* これから送信するデータのサイズをまず送信 */
|
|
106 nsize = htonl(size);
|
|
107 write(fd, &nsize, INT_SIZE);
|
|
108
|
|
109 /* 目的のデータを送信 */
|
|
110 while (count < size) {
|
|
111 count += write(fd, buf+count, size-count);
|
539
|
112 }
|
|
113 #ifdef COUNT_PACKET
|
|
114 count_packet('s');
|
|
115 #endif
|
572
|
116 return count+INT_SIZE;
|
539
|
117 }
|
|
118
|
|
119 #define unix_write_w unix_write
|
|
120
|
|
121 #define SERV_NAME unix_port
|
|
122 #define PROTO_NAME "tcp"
|
|
123 #define SERVER_NAME hostname
|
|
124 #define MAX_REQ 16
|
|
125
|
572
|
126
|
|
127
|
|
128 /*-------------------------------------------------------------------/
|
|
129 void
|
|
130 init_linda():
|
|
131 大域変数の初期化等を行なう
|
|
132 /-------------------------------------------------------------------*/
|
|
133 void
|
|
134 init_linda() {
|
|
135 FD_ZERO(&g_fds);
|
|
136 /* 大域変数はゼロクリアされる
|
|
137 g_max_fds = 0;
|
|
138 q_end = q_top = NULL;
|
|
139 r_end = reply = NULL;
|
|
140 qsize = 0;
|
|
141 */
|
|
142 }
|
539
|
143
|
|
144
|
|
145 /*-------------------------------------------------------------------/
|
|
146 int
|
572
|
147 open_linda (char * hostname, int port):
|
|
148 Lindaサーバとのコネクションを確立し、タプルスペースのIDを返す。
|
|
149 現在はファイルディスクリプタを返している。
|
539
|
150
|
572
|
151 引き数:
|
|
152 hostname - サーバのホスト名
|
|
153 port - サーバのポート番号
|
|
154 返り値:
|
|
155 コネクション確立が成功するとそのファイルディスクリプタを返す。
|
|
156 失敗すると -1 を返す。
|
539
|
157 /-------------------------------------------------------------------*/
|
|
158 int
|
572
|
159 open_linda(char * hostname, int port){
|
|
160 int fd;
|
|
161 struct hostent *hoste;
|
|
162 struct sockaddr_in serv_addr;
|
|
163 struct sockaddr_un serv_addr_un;
|
|
164
|
539
|
165 if (hostname[0]=='/') {
|
|
166 /* Unix domain */
|
|
167 if ((fd = socket(AF_UNIX, SOCK_STREAM, 0)) == FAIL){
|
572
|
168 perror("socket");
|
539
|
169 return(-1);
|
|
170 }
|
|
171 serv_addr_un.sun_family = AF_UNIX;
|
|
172 strcpy(serv_addr_un.sun_path, hostname);
|
572
|
173 fprintf(stdout,"connecting ... %d\n", serv_addr.sin_port);
|
539
|
174 if (connect(fd, (struct sockaddr *)&serv_addr_un,sizeof(serv_addr_un)) == FAIL){
|
572
|
175 perror("connect");
|
539
|
176 close(fd);
|
|
177 return(-1);
|
|
178 }
|
572
|
179
|
539
|
180 } else {
|
|
181 /* INET domain */
|
|
182 if ((fd = socket(AF_INET, SOCK_STREAM, 0)) == FAIL){
|
572
|
183 perror("socket");
|
539
|
184 return(-1);
|
|
185 }
|
572
|
186 if ((hoste = gethostbyname(SERVER_NAME)) == NULL){
|
539
|
187 fprintf(stderr,"hostname error\n");
|
|
188 close(fd);
|
|
189 return(-1);
|
|
190 }
|
|
191 serv_addr.sin_family = AF_INET;
|
572
|
192 serv_addr.sin_port = port;
|
539
|
193 serv_addr.sin_addr.s_addr = ((struct in_addr *)(hoste->h_addr))->s_addr;
|
|
194 if (serv_addr.sin_family == AF_INET) {
|
|
195 int tmp = 1;
|
|
196 setsockopt (fd, IPPROTO_TCP, TCP_NODELAY,
|
|
197 (char *) &tmp, sizeof (int));
|
|
198 }
|
572
|
199 fprintf(stdout,"connecting ... %d\n", serv_addr.sin_port);
|
539
|
200 if (connect(fd, (struct sockaddr *)&serv_addr,sizeof(serv_addr)) == FAIL){
|
|
201 fprintf(stderr,"connection error! errno :%d %s\n", errno,
|
|
202 strerror(errno));
|
|
203 close(fd);
|
|
204 return(-1);
|
|
205 }
|
|
206 }
|
|
207
|
572
|
208 FD_SET(fd, &g_fds);
|
|
209 if (g_max_fds < fd) g_max_fds = fd;
|
|
210
|
539
|
211 fprintf(stdout," connect middle server %d\n", fd);
|
572
|
212 return fd;
|
|
213 }
|
|
214
|
|
215 int
|
|
216 open_linda_java(char * hostname, int port){
|
|
217 int fd;
|
|
218 struct hostent *hoste;
|
|
219 struct sockaddr_in serv_addr;
|
|
220 struct sockaddr_un serv_addr_un;
|
|
221
|
|
222 if (hostname[0]=='/') {
|
|
223 /* Unix domain */
|
|
224 if ((fd = socket(AF_UNIX, SOCK_STREAM, 0)) == FAIL){
|
|
225 perror("socket");
|
|
226 return(-1);
|
|
227 }
|
|
228 serv_addr_un.sun_family = AF_UNIX;
|
|
229 strcpy(serv_addr_un.sun_path, hostname);
|
|
230 DEB(fprintf(stdout,"connecting ... %d\n", serv_addr.sin_port));
|
|
231 if (connect(fd, (struct sockaddr *)&serv_addr_un,sizeof(serv_addr_un)) == FAIL){
|
|
232 perror("connect");
|
|
233 close(fd);
|
|
234 return(-1);
|
|
235 }
|
|
236
|
|
237 } else {
|
|
238 /* INET domain */
|
|
239 if ((fd = socket(AF_INET, SOCK_STREAM, 0)) == FAIL){
|
|
240 perror("socket");
|
|
241 return(-2);
|
|
242 }
|
|
243 serv_addr.sin_family = AF_INET;
|
|
244 serv_addr.sin_port = htons(port);
|
|
245
|
|
246 serv_addr.sin_addr.s_addr = inet_addr(hostname);
|
|
247 if (serv_addr.sin_addr.s_addr == 0xffffffff) {
|
|
248 if ((hoste = gethostbyname(hostname)) == NULL){
|
|
249 fprintf(stdout, "hostname error\n");
|
|
250 close(fd);
|
|
251 return(-1);
|
|
252 }
|
|
253 serv_addr.sin_addr.s_addr = *(unsigned int *)hoste->h_addr_list[0];
|
|
254 }
|
|
255
|
|
256 if (serv_addr.sin_family == AF_INET) {
|
|
257 int tmp = 1;
|
|
258 setsockopt (fd, IPPROTO_TCP, TCP_NODELAY,
|
|
259 (char *) &tmp, sizeof (int));
|
|
260 }
|
|
261 DEB(fprintf(stdout,"connecting ... %d \n", ntohs(serv_addr.sin_port)));
|
|
262 DEB(fprintf(stdout," serv_addr.sin_port ... %d \n", ntohs(serv_addr.sin_port)));
|
|
263 //fprintf(stdout," serv_addr.sin_addr.s_addr... %s\n", serv_addr.sin_addr.s_addr);
|
|
264 if (connect(fd, (struct sockaddr *)&serv_addr,sizeof(serv_addr)) == FAIL){
|
|
265 perror("connect");
|
|
266 close(fd);
|
|
267 return(-4);
|
|
268 }
|
|
269 }
|
|
270
|
|
271 FD_SET(fd, &g_fds);
|
|
272 if (g_max_fds < fd) g_max_fds = fd;
|
|
273
|
|
274 DEB(fprintf(stdout," connect middle server %d\n", fd));
|
|
275 return fd;
|
|
276 }
|
|
277
|
|
278
|
|
279 /*-------------------------------------------------------------------/
|
|
280 int
|
|
281 close_linda(int tspace_id):
|
|
282 接続しているタプルスペースへの接続を切る。
|
|
283 ソケットを閉じ、g_fds から外す。
|
|
284 引数:
|
|
285 tspace_id - 閉じるタプルスペースのID
|
|
286 返り値:
|
|
287 close の値
|
|
288 /-------------------------------------------------------------------*/
|
|
289 int
|
|
290 close_linda(int tspace_id){
|
|
291 int retval;
|
|
292 int i;
|
|
293 if ((retval = close(tspace_id)) == 0) {
|
|
294 FD_CLR(tspace_id, &g_fds);
|
|
295 if (g_max_fds == tspace_id) {
|
|
296 for (i = g_max_fds-1; FD_ISSET(i, &g_fds) && i; i--);
|
|
297 g_max_fds = i;
|
|
298 }
|
|
299 }
|
|
300 return retval;
|
539
|
301 }
|
|
302
|
|
303 /*-------------------------------------------------------------------/
|
|
304 int
|
572
|
305 psx_out (unsigned int tspace_id, unsigned int id,
|
|
306 unsigned char *data, unsigned int size):
|
|
307 outコマンドをCOMMANDキューへ溜める。
|
539
|
308
|
572
|
309 引き数:
|
|
310 tspace_id - タプルスペースのID
|
|
311 id - タプルのID
|
|
312 data - 送信するデータ
|
|
313 size - dataのサイズ
|
|
314 返り値:
|
|
315 シーケンス番号
|
539
|
316 /-------------------------------------------------------------------*/
|
|
317 int
|
572
|
318 psx_out(unsigned int tspace_id, unsigned int id,
|
|
319 unsigned char *data, unsigned int size){
|
|
320 int r;
|
|
321 if ((r = psx_queue(tspace_id, id, size, data, 'o', NULL, NULL)) == FAIL) {
|
539
|
322 return(FAIL);
|
|
323 }
|
|
324 DEB( fprintf(stdout, "psx_out: size = %d, command = %s\n",
|
|
325 q_end->size, q_end->command+LINDA_HEADER_SIZE));
|
572
|
326 return(r);
|
539
|
327 }
|
|
328
|
|
329 /*-------------------------------------------------------------------/
|
|
330 int
|
572
|
331 psx_ld (unsigned tspace_id, unsigned int id,
|
|
332 char mode, void(*callback)(char*,void*), void * obj):
|
|
333 in,read,waitなどの受信コマンドをCOMMANDキューへ溜める。
|
|
334 psx_in,psx_rd,psx_wait_rdなどに置き換えられている。
|
|
335
|
|
336 引き数:
|
|
337 tspace_id- タプルスペースのID
|
|
338 id - タプルのID
|
|
339 mode - i,r,w の文字を取り、各々in,read,waitを表している。
|
|
340 callback - コールバックを使用する場合の関数へのポインタ。
|
|
341 使用しない場合はNULLをいれる。
|
|
342 obj - コールバックで用いる関数の引き数。
|
|
343 返り値:
|
|
344 psx_queue内でmallocされたREPLY構造体へのポインタ
|
539
|
345 /-------------------------------------------------------------------*/
|
|
346 int
|
572
|
347 psx_ld(unsigned int tspace_id, unsigned int id,
|
|
348 char mode, void(*callback)(unsigned char *,void *), void * obj){
|
539
|
349 int r;
|
572
|
350 if ((r = psx_queue(tspace_id, id, 0, NULL, mode, callback, obj)) == FAIL) {
|
539
|
351 return(FAIL);
|
|
352 }
|
|
353 return(r);
|
|
354 }
|
|
355
|
|
356 /*-------------------------------------------------------------------/
|
|
357 unsigned char *
|
|
358 psx_reply (int seq):
|
572
|
359 サーバから答えが来たデータを返す。
|
539
|
360
|
572
|
361 引き数:
|
|
362 seq - psx_ld()が返した値。
|
|
363 返り値:
|
|
364 seqに対応したデータを返す。データをまだ受信していない場合は
|
|
365 NULLを返す。
|
539
|
366 /-------------------------------------------------------------------*/
|
|
367 unsigned char *
|
572
|
368 psx_reply(unsigned int seq){
|
539
|
369 REPLY *p, *q;
|
572
|
370 unsigned char *ans;
|
|
371
|
539
|
372 DEB(fprintf(stdout, "psx_reply: search of seq = %d\n", seq));
|
572
|
373 PSX_Debug(("psx_reply: seq %d", seq));
|
|
374 for(q = NULL,p = reply; p; q = p,p = p->next){
|
|
375 if (p->seq == seq){
|
539
|
376 DEB(fprintf(stdout, "psx_reply: match of seq = %d\n", seq));
|
|
377 if (p->mode == '!'){
|
572
|
378 ans = p->answer;
|
539
|
379 if (q == NULL){
|
|
380 reply = p->next;
|
|
381 if(p==r_end) {
|
|
382 r_end = p->next;
|
|
383 }
|
|
384 } else {
|
|
385 q->next = p->next;
|
|
386 if(p==r_end) {
|
|
387 r_end = q;
|
|
388 }
|
|
389 }
|
572
|
390 PSX_Debug(("psx_reply: reply %x r_end %x p %x q %x",reply,r_end,p,q));
|
|
391 psx_free(p);
|
539
|
392 DEB( for(p=reply;p;p=p->next) { PSX_Debug(("psx_queue dump: seq %d mode %c %x %x",p->seq,p->mode,p,p->next))});
|
|
393 DEB( fprintf(stdout, "psx_reply: returned answer = %s\n", ans));
|
|
394 PSX_Debug(("psx_reply: answer %s",ans));
|
572
|
395 return(ans);
|
539
|
396 } else {
|
|
397 if (p->mode == '?'){
|
|
398 DEB(fprintf(stdout, "psx_reply: don't accept anser\n"));
|
|
399 return(NULL);
|
|
400 }
|
|
401 }
|
|
402 }
|
|
403
|
|
404 }
|
|
405 PSX_Debug(("psx_reply: no match seq %d",seq));
|
|
406 DEB(fprintf(stdout, "psx_reply: no match of seq\n"));
|
|
407 return(NULL);
|
|
408 }
|
|
409
|
|
410 /*-------------------------------------------------------------------/
|
|
411 void
|
|
412 psx_sync_n ():
|
572
|
413 サーバとデータの送受信をする。COMMANDキューに溜まったデータを
|
|
414 送信し、サーバから送られて来たデータを対応するREPLYへいれる。
|
539
|
415 /-------------------------------------------------------------------*/
|
572
|
416 #define TIMEDELTA 10
|
539
|
417 void
|
|
418 psx_sync_n(){
|
|
419 int acount;
|
572
|
420 int i;
|
539
|
421 COMMAND *c, *t;
|
|
422
|
572
|
423 fd_set tmp;
|
539
|
424 struct timeval timeout;
|
|
425 timeout.tv_sec=0;
|
|
426 timeout.tv_usec=TIMEDELTA * 1000;
|
|
427
|
|
428 acount = 0;
|
|
429 while (q_top != NULL){
|
|
430 c = q_top;
|
572
|
431 unix_write_w(c->tspace_id, c->command, c->size);
|
|
432 psx_free(c->command);
|
539
|
433 t = c->next;
|
572
|
434 psx_free(c);
|
|
435 q_top = c = t;
|
|
436 qsize--;
|
539
|
437 }
|
572
|
438
|
|
439 tmp = g_fds;
|
|
440 while(select(g_max_fds+1, &tmp, NULL, NULL, &timeout) > 0) {
|
|
441 for (i = 0; i < g_max_fds+1; i++) {
|
|
442 if (FD_ISSET(i, &tmp)) {
|
|
443 unix_chkserv(i);
|
|
444 }
|
539
|
445 }
|
572
|
446 }
|
539
|
447 }
|
|
448
|
|
449 /*-------------------------------------------------------------------/
|
572
|
450 static int
|
|
451 psx_queue (unsigned int tspace_id, unsigned int id,
|
|
452 unsigned int size, unsigned char *data, char mode,
|
|
453 void(*callback)(char*,void*), void * obj):
|
|
454 out,in,read,waitなどのコマンドをCOMMANDキューに溜める。データを
|
|
455 受信するコマンド(in,read,wait)のときは受け取ったときにデータを
|
|
456 格納するREPLY構造体を作る。
|
539
|
457
|
572
|
458 引き数:
|
|
459 tspace_id- 送信先タプルスペースのID
|
|
460 id - アクセスするTUPLE SpaceのID
|
|
461 size - dataのサイズ
|
|
462 data - 送信するデータ。受信時はNULL。
|
|
463 mode - コマンドのモード(out,in,read,wait は各々char型: o,i,r,w)
|
|
464 callback - コールバックを使用する場合の関数へのポインタ。
|
|
465 使用しない場合はNULL。
|
|
466 obj - コールバックで用いる関数に引き渡すデータ。
|
|
467 返り値:
|
|
468 成功した場合 - mallocしたREPLY構造体へのポインタ。outの場合は
|
|
469 0が返る。
|
|
470 失敗した場合 - FAIL(-1)が返る。
|
539
|
471 /-------------------------------------------------------------------*/
|
572
|
472 static int
|
|
473 psx_queue(unsigned int tspace_id, unsigned int id,
|
|
474 unsigned int size, unsigned char *data, char mode,
|
|
475 void(*callback)(unsigned char *,void *), void * obj){
|
539
|
476 REPLY *p;
|
|
477 COMMAND *c;
|
572
|
478
|
|
479 if (qsize >= MAX_QUEUE) {
|
539
|
480 // PSX_Debug(("max queue: qsize=%d",qsize));
|
|
481 psx_sync_n();
|
|
482 }
|
572
|
483
|
|
484 if (q_top == NULL) {
|
539
|
485 if ((q_top = (COMMAND *) malloc (sizeof(COMMAND))) == NULL){
|
|
486 return(FAIL);
|
|
487 }
|
|
488 c = q_end = q_top;
|
|
489 } else {
|
|
490 if ((q_end->next = (COMMAND *) malloc (sizeof(COMMAND))) == NULL){
|
|
491 return(FAIL);
|
|
492 }
|
|
493 c = q_end;
|
|
494 q_end = q_end->next;
|
|
495 }
|
572
|
496
|
|
497 /* size は DATASIZE */
|
|
498 if ((q_end->command = (unsigned char *) malloc(size+LINDA_HEADER_SIZE)) == NULL) {
|
|
499 psx_free(q_end);
|
539
|
500 c->next = NULL;
|
|
501 return(FAIL);
|
|
502 }
|
|
503
|
572
|
504 /* データ受け取り要求(in,rd,wait)なら受け取り用の箱を用意 */
|
|
505 if (mode != 'o') {
|
539
|
506 if (reply == NULL){
|
|
507 if ((reply = (REPLY *) malloc (sizeof(REPLY))) == NULL){
|
|
508 return(FAIL);
|
|
509 }
|
|
510 p = r_end = reply; p->next = NULL;
|
|
511 } else {
|
|
512 if ((r_end->next = (REPLY *) malloc (sizeof(REPLY))) == NULL){
|
|
513 return(FAIL);
|
|
514 }
|
|
515 p = r_end->next; r_end = p; p->next = NULL;
|
|
516 }
|
|
517 p->mode = '?';
|
572
|
518 p->seq = (int)p; // 構造体のアドレスで識別
|
539
|
519 p->callback = callback;
|
|
520 p->obj = obj;
|
|
521 PSX_Debug(("psx_queue: seq %d reply %x p %x r_end %x",seq,reply,p,r_end));
|
572
|
522 } else {
|
|
523 p = 0;
|
539
|
524 }
|
|
525 q_end->command[LINDA_MODE_OFFSET] = mode;
|
|
526
|
|
527 q_end->command[LINDA_ID_OFFSET] = id >> 8;
|
|
528 q_end->command[LINDA_ID_OFFSET+1] = id & 0xff;
|
|
529
|
|
530 q_end->command[LINDA_SEQ_OFFSET] = ((int)p>>24) & 0xff;
|
|
531 q_end->command[LINDA_SEQ_OFFSET+1] = ((int)p>>16) & 0xff;
|
|
532 q_end->command[LINDA_SEQ_OFFSET+2] = ((int)p>>8) & 0xff;
|
|
533 q_end->command[LINDA_SEQ_OFFSET+3] = ((int)p) & 0xff;
|
572
|
534
|
539
|
535 q_end->command[LINDA_DATA_LENGTH_OFFSET] = (size>>24) & 0xff;
|
|
536 q_end->command[LINDA_DATA_LENGTH_OFFSET+1] = (size>>16) & 0xff;
|
|
537 q_end->command[LINDA_DATA_LENGTH_OFFSET+2] = (size>>8) & 0xff;
|
|
538 q_end->command[LINDA_DATA_LENGTH_OFFSET+3] = (size) & 0xff;
|
572
|
539
|
539
|
540 q_end->size = size+LINDA_HEADER_SIZE; /* command size */
|
572
|
541 q_end->tspace_id = tspace_id; /* destination id */
|
|
542 q_end->next = NULL;
|
|
543 qsize++;
|
|
544 if (data && size>0)
|
|
545 memcpy(q_end->command+LINDA_HEADER_SIZE, data, size);
|
539
|
546 return((int)p);
|
|
547 }
|
|
548
|
|
549 /*-------------------------------------------------------------------/
|
572
|
550 static void
|
|
551 unix_chkserv (int ps):
|
|
552 サーバからデータ(TUPLE)を受け取る。REPLY構造体にコールバック関数
|
|
553 が指定されていればその関数を実行し、REPLY構造体をキューから取り
|
|
554 除く。コールバック関数が指定されていなければREPLY構造体にデータ
|
|
555 を引き渡す。
|
|
556 引数:
|
|
557 ps - 接続しているタプルスペースのソケット
|
539
|
558 /-------------------------------------------------------------------*/
|
572
|
559 static void
|
|
560 unix_chkserv(int ps){
|
|
561 int i,pkt,npkt,mode;
|
|
562 unsigned int k;
|
539
|
563 REPLY *r,*prev;
|
|
564 int a;
|
|
565 unsigned char * tuple = 0;
|
|
566
|
|
567 if((i=read(ps,&npkt,INT_SIZE))<0) {
|
572
|
568 perror("read");
|
539
|
569 exit(1);
|
|
570 }
|
|
571 pkt = ntohl(npkt);
|
|
572 DEB(printf("pkt: %d\n",pkt));
|
|
573 DEB(fprintf(stdout, "psx_chkserv: queue number: %d , size = %d\n", i, pkt));
|
|
574 if((tuple = (unsigned char *)malloc(pkt))==NULL){
|
572
|
575 perror("malloc");
|
539
|
576 exit(1);
|
|
577 }
|
|
578 for(a=0;a<pkt;a+=i) {
|
|
579 if((i=unix_read_w(ps,tuple+a,pkt-a))<0) {
|
|
580 fprintf(stderr, "psx_chkserv: read error! on i=%d pkt=%d %s\n",
|
|
581 i, pkt, strerror(errno));
|
|
582 exit(1);//close(ps);
|
|
583 }
|
|
584 }
|
|
585
|
|
586 #ifdef COUNT_PACKET
|
|
587 count_packet('r');
|
|
588 #endif
|
572
|
589 mode = psx_get_mode(tuple);
|
|
590 i = psx_get_id(tuple);
|
|
591 k = psx_get_seq(tuple);
|
|
592 PSX_Debug(("psx_chkserv: anser packet size = %d id %d seq %d", pkt,i,k));
|
539
|
593 DEB(fprintf(stdout, "psx_chkserv: data from server: %s id=%d seq = %d\n", tuple, i, k));
|
|
594 DEB (
|
|
595 for(p=reply;p;p=p->next) {
|
|
596 PSX_Debug(printf("psx_queue dump: seq %d mode %c %x %x",p->seq,p->mode,p,p->next));
|
|
597 })
|
572
|
598
|
539
|
599 for(prev = NULL,r = reply; r; prev = r,r = r->next){
|
|
600 DEB(fprintf(stdout,"seq: %d\n",r->seq);)
|
572
|
601 if (r->seq == k){
|
539
|
602 if(r->callback){ // call callback function
|
572
|
603 (*r->callback)(tuple,r->obj);
|
539
|
604 if (prev == NULL){
|
|
605 reply = r->next;
|
|
606 if(r == r_end) {
|
|
607 r_end = r->next;
|
|
608 }
|
|
609 } else {
|
|
610 prev->next = r->next;
|
|
611 if(r == r_end) {
|
|
612 r_end = prev;
|
|
613 }
|
|
614 }
|
572
|
615 psx_free(r);
|
539
|
616 }else{ // normal reply
|
|
617 PSX_Debug(("psx_chkserv: copy answer r %x seq %d",r,k));
|
572
|
618 if(mode == 'a'){
|
|
619 r->answer = tuple;
|
|
620 }else{
|
|
621 r->answer = NULL;
|
|
622 }
|
|
623 r->mode = '!';
|
539
|
624 }
|
|
625 break;
|
|
626 }
|
|
627 }
|
|
628 tuple = 0;
|
|
629 if (!r){
|
|
630 DEB(fprintf(stdout, "unix_chkserv: accepted seq %d does not match. \n",k));
|
|
631 }
|
|
632 }
|
|
633
|
572
|
634 void psx_free(void *tuple)
|
539
|
635 {
|
|
636 free(tuple);
|
|
637 }
|
|
638
|
|
639 /*-------------------------------------------------------------------/
|
572
|
640 static unsigned int
|
539
|
641 get_int(unsigned char * tuple, int offset):
|
572
|
642 TUPLEのヘッダに格納された int型 のデータを得るための関数
|
|
643 psx_get_datalength() と psx_get_seq() から呼ばれる。
|
539
|
644
|
572
|
645 引き数:
|
|
646 tuple - ヘッダ情報も含んだTUPLE。psx_reply()で得たものでもいい。
|
|
647 offset - 取りだすデータのオフセット。LINDA_DATA_LENGTH_OFFSET
|
|
648 か LINDA_SEQ_OFFSET。
|
539
|
649
|
572
|
650 返り値:
|
|
651 指定したオフセットに格納されていた数値(int型)
|
539
|
652 /-------------------------------------------------------------------*/
|
572
|
653 static unsigned int
|
|
654 get_int(unsigned char * tuple, int offset){
|
|
655 unsigned int i;
|
539
|
656 i = (tuple[offset] <<24) +
|
|
657 (tuple[offset+1]<<16) +
|
|
658 (tuple[offset+2]<<8) +
|
|
659 (tuple[offset+3]);
|
|
660 return i;
|
|
661 }
|
|
662
|
572
|
663 unsigned int
|
|
664 psx_get_datalength(unsigned char * tuple){
|
539
|
665 return get_int(tuple,LINDA_DATA_LENGTH_OFFSET);
|
|
666 }
|
|
667
|
572
|
668 unsigned char *
|
|
669 psx_get_data(unsigned char * tuple) {
|
|
670 return tuple + LINDA_HEADER_SIZE;
|
539
|
671 }
|
|
672
|
572
|
673 unsigned int
|
|
674 psx_get_seq(unsigned char * tuple){
|
539
|
675 return get_int(tuple,LINDA_SEQ_OFFSET);
|
|
676 }
|
|
677
|
572
|
678 unsigned short
|
|
679 psx_get_id(unsigned char * tuple){
|
|
680 return (tuple[LINDA_ID_OFFSET] * 256 + tuple[LINDA_ID_OFFSET+1]);
|
539
|
681 }
|
|
682
|
572
|
683 unsigned char
|
|
684 psx_get_mode(unsigned char * tuple){
|
539
|
685 return tuple[LINDA_MODE_OFFSET];
|
|
686 }
|
|
687
|
572
|
688 static
|
|
689 void
|
|
690 set_int_to_char(unsigned char * tuple, int i, int offset){
|
|
691 tuple[offset] = (i>>24) & 0xff;
|
|
692 tuple[offset+1] = (i>>16) & 0xff;
|
|
693 tuple[offset+2] = (i>>8) & 0xff;
|
|
694 tuple[offset+3] = (i) & 0xff;
|
|
695 }
|
|
696
|
|
697 void
|
|
698 psx_set_datalength(unsigned char * tuple, int length){
|
|
699 set_int_to_char(tuple,length,LINDA_DATA_LENGTH_OFFSET);
|
|
700 }
|
|
701
|
|
702
|
|
703 void
|
|
704 psx_set_seq(unsigned char * tuple, int seq){
|
|
705 set_int_to_char(tuple,seq,LINDA_SEQ_OFFSET);
|
|
706 }
|
|
707
|
|
708 void
|
|
709 psx_set_id(unsigned char * tuple, short id){
|
|
710 tuple[LINDA_ID_OFFSET] = id >> 8;
|
|
711 tuple[LINDA_ID_OFFSET+1] = id & 0xff;
|
|
712 }
|
|
713
|
|
714 void
|
|
715 psx_set_mode(unsigned char * tuple, char mode){
|
|
716 tuple[LINDA_MODE_OFFSET] = mode;
|
|
717 }
|
|
718
|
|
719
|
539
|
720
|
|
721 /* end */
|