Mercurial > hg > Gears > Gears
comparison src/parallel_execution/examples/socketQueue/LocalDGMQueue.cbc @ 1022:635ccc391642
Organize repository
author | ichikitakahiro <e165713@ie.u-ryukyu.ac.jp> |
---|---|
date | Thu, 31 Mar 2022 13:23:08 +0900 |
parents | a9c630cc1c65 |
children | 793b21a8ea12 |
comparison
Legend: equal, deleted, inserted, replaced
1014:a9c630cc1c65 | 1022:635ccc391642 |
---|---|
23 struct LocalDGMQueue* LocalDGMQueue = new LocalDGMQueue(); | 23 struct LocalDGMQueue* LocalDGMQueue = new LocalDGMQueue(); |
24 LocalDGMQueue->top = new Element(); // allocate a free node | 24 LocalDGMQueue->top = new Element(); // allocate a free node |
25 LocalDGMQueue->top->next = NULL; | 25 LocalDGMQueue->top->next = NULL; |
26 LocalDGMQueue->last = LocalDGMQueue->top; | 26 LocalDGMQueue->last = LocalDGMQueue->top; |
27 LocalDGMQueue->atomic = createAtomicReference(context); | 27 LocalDGMQueue->atomic = createAtomicReference(context); |
28 LocalDGMQueue->socket = createSocket(sNum); | 28 LocalDGMQueue->socket = createSocketLocalDGMQueue(sNum); |
29 | 29 |
30 tQueue->tQueue = (union Data*)LocalDGMQueue; | 30 tQueue->tQueue = (union Data*)LocalDGMQueue; |
31 tQueue->take = C_takeLocalDGMQueue; | 31 tQueue->take = C_takeLocalDGMQueue; |
32 tQueue->put = C_putLocalDGMQueue; | 32 tQueue->put = C_putLocalDGMQueue; |
33 tQueue->isEmpty = C_isEmptyLocalDGMQueue; | 33 tQueue->isEmpty = C_isEmptyLocalDGMQueue; |
34 tQueue->clear = C_clearLocalDGMQueue; | 34 tQueue->clear = C_clearLocalDGMQueue; |
35 tQueue->getData = C_getDataLocalDGMQueue; | 35 tQueue->getData = C_getDataLocalDGMQueue; |
36 return tQueue; | 36 return tQueue; |
37 } | 37 } |
38 | 38 |
39 int* createSocket(char* sNum){ | 39 int* createSocketLocalDGMQueue(char* sNum){ |
40 int w_addr, c_sock; | 40 int w_addr, c_sock; |
41 struct sockaddr_in a_addr; | 41 struct sockaddr_in a_addr; |
42 char *hostname = "Localhost"; | 42 char *hostname = "Localhost"; |
43 char *service = sNum; | 43 char *service = sNum; |
44 struct addrinfo hints, *res0, *res; | 44 struct addrinfo hints, *res0, *res; |
102 struct Element* top = tQueue->top; | 102 struct Element* top = tQueue->top; |
103 struct Atomic* atomic = tQueue->atomic; | 103 struct Atomic* atomic = tQueue->atomic; |
104 goto atomic->checkAndSet(&tQueue->top, top, NULL, next(...), clearLocalDGMQueue); | 104 goto atomic->checkAndSet(&tQueue->top, top, NULL, next(...), clearLocalDGMQueue); |
105 } | 105 } |
106 | 106 |
107 __code getTrance(struct LocalDGMQueue* tQueue, union Data* data, __code next(...)){ | 107 __code getTranceLocalDGMQueue(struct LocalDGMQueue* tQueue, union Data* data, __code next(...)){ |
108 printf("get"); | 108 printf("get"); |
109 goto next(...); | 109 goto next(...); |
110 } | 110 } |
111 | 111 |
112 __code putLocalDGMQueue(struct LocalDGMQueue* tQueue, union Data* data, __code next(...)) { | 112 __code putLocalDGMQueue(struct LocalDGMQueue* tQueue, union Data* data, __code next(...)) { |
175 goto whenEmpty(...); | 175 goto whenEmpty(...); |
176 } | 176 } |
177 goto next(...); | 177 goto next(...); |
178 } | 178 } |
179 | 179 |
180 __code getData(struct LocalDGMQueue* tQueue, __code next(...), __code whenEOF(...)){ | 180 __code getDataLocalDGMQueue(struct LocalDGMQueue* tQueue, __code next(...), __code whenEOF(...)){ |
181 int recv_size, send_size; | 181 int recv_size, send_size; |
182 char recv_buf[BUF_SIZE], send_buf; | 182 char recv_buf[BUF_SIZE], send_buf; |
183 | 183 |
184 /* クライアントから文字列を受信 */ | 184 /* クライアントから文字列を受信 */ |
185 union Data* recv_data; | 185 union Data* recv_data; |
186 recv_size = recv(tQueue->socket, recv_data, sizeof(union Data), 0); | 186 recv_size = read(tQueue->socket, recv_data, sizeof(union Data)); |
187 //printf("[%s] [%d]\n", fileString->str, fileString->size); | 187 //printf("[%s] [%d]\n", fileString->str, fileString->size); |
188 printf("size is %d\n", sizeof(union Data)); | |
189 if (recv_size == -1) { | 188 if (recv_size == -1) { |
190 printf("recv error\n"); | 189 printf("recv error\n"); |
191 goto exit_code(); | 190 goto exit_code(); |
192 } | 191 } |
193 if (recv_size == 0) { | 192 if (recv_size == 0) { |
203 FileString* fileString = NEW(FileString); | 202 FileString* fileString = NEW(FileString); |
204 fileString = recv_data; | 203 fileString = recv_data; |
205 if (strcmp(fileString->str, "finish") == 0) { | 204 if (strcmp(fileString->str, "finish") == 0) { |
206 /* 接続終了を表す0を送信 */ | 205 /* 接続終了を表す0を送信 */ |
207 send_buf = 0; | 206 send_buf = 0; |
208 send_size = send(tQueue->socket, &send_buf, 1, 0); | 207 send_size = write(tQueue->socket, &send_buf, 1); |
209 if (send_size == -1) { | 208 if (send_size == -1) { |
210 printf("send error\n"); | 209 printf("send error\n"); |
211 } | 210 } |
212 close(tQueue->buffer); | 211 close(tQueue->buffer); |
213 goto whenEOF(...); | 212 goto whenEOF(...); |
214 } else { | 213 } else { |
215 /* "finish"以外の場合はクライアントとの接続を継続 */ | 214 /* "finish"以外の場合はクライアントとの接続を継続 */ |
216 send_buf = 1; | 215 send_buf = 1; |
217 send_size = send(tQueue->socket, &send_buf, 1, 0); | 216 send_size = write(tQueue->socket, &send_buf, 1); |
218 if (send_size == -1) { | 217 if (send_size == -1) { |
219 printf("send error\n"); | 218 printf("send error\n"); |
220 } | 219 } |
221 } | 220 } |
222 | 221 |
223 | |
224 | |
225 Gearef(context, TQueue)->data = recv_data; | 222 Gearef(context, TQueue)->data = recv_data; |
226 | |
227 goto putLocalDGMQueue(recv_data, next); | 223 goto putLocalDGMQueue(recv_data, next); |
228 } | 224 } |
229 | 225 |
230 __code sendData(struct RemoteDGMQueue* tQueue, struct FileString* string, __code next(...)){ | 226 __code sendDataLocalDGMQueue(struct RemoteDGMQueue* tQueue, struct FileString* string, __code next(...)){ |
231 } | 227 } |
232 | 228 |