comparison src/parallel_execution/examples/socketQueue/LocalDGMQueue.cbc @ 1022:635ccc391642

Organize repository
author ichikitakahiro <e165713@ie.u-ryukyu.ac.jp>
date Thu, 31 Mar 2022 13:23:08 +0900
parents a9c630cc1c65
children 793b21a8ea12
comparison
equal deleted inserted replaced
1014:a9c630cc1c65 1022:635ccc391642
23 struct LocalDGMQueue* LocalDGMQueue = new LocalDGMQueue(); 23 struct LocalDGMQueue* LocalDGMQueue = new LocalDGMQueue();
24 LocalDGMQueue->top = new Element(); // allocate a free node 24 LocalDGMQueue->top = new Element(); // allocate a free node
25 LocalDGMQueue->top->next = NULL; 25 LocalDGMQueue->top->next = NULL;
26 LocalDGMQueue->last = LocalDGMQueue->top; 26 LocalDGMQueue->last = LocalDGMQueue->top;
27 LocalDGMQueue->atomic = createAtomicReference(context); 27 LocalDGMQueue->atomic = createAtomicReference(context);
28 LocalDGMQueue->socket = createSocket(sNum); 28 LocalDGMQueue->socket = createSocketLocalDGMQueue(sNum);
29 29
30 tQueue->tQueue = (union Data*)LocalDGMQueue; 30 tQueue->tQueue = (union Data*)LocalDGMQueue;
31 tQueue->take = C_takeLocalDGMQueue; 31 tQueue->take = C_takeLocalDGMQueue;
32 tQueue->put = C_putLocalDGMQueue; 32 tQueue->put = C_putLocalDGMQueue;
33 tQueue->isEmpty = C_isEmptyLocalDGMQueue; 33 tQueue->isEmpty = C_isEmptyLocalDGMQueue;
34 tQueue->clear = C_clearLocalDGMQueue; 34 tQueue->clear = C_clearLocalDGMQueue;
35 tQueue->getData = C_getDataLocalDGMQueue; 35 tQueue->getData = C_getDataLocalDGMQueue;
36 return tQueue; 36 return tQueue;
37 } 37 }
38 38
39 int* createSocket(char* sNum){ 39 int* createSocketLocalDGMQueue(char* sNum){
40 int w_addr, c_sock; 40 int w_addr, c_sock;
41 struct sockaddr_in a_addr; 41 struct sockaddr_in a_addr;
42 char *hostname = "Localhost"; 42 char *hostname = "Localhost";
43 char *service = sNum; 43 char *service = sNum;
44 struct addrinfo hints, *res0, *res; 44 struct addrinfo hints, *res0, *res;
102 struct Element* top = tQueue->top; 102 struct Element* top = tQueue->top;
103 struct Atomic* atomic = tQueue->atomic; 103 struct Atomic* atomic = tQueue->atomic;
104 goto atomic->checkAndSet(&tQueue->top, top, NULL, next(...), clearLocalDGMQueue); 104 goto atomic->checkAndSet(&tQueue->top, top, NULL, next(...), clearLocalDGMQueue);
105 } 105 }
106 106
107 __code getTrance(struct LocalDGMQueue* tQueue, union Data* data, __code next(...)){ 107 __code getTranceLocalDGMQueue(struct LocalDGMQueue* tQueue, union Data* data, __code next(...)){
108 printf("get"); 108 printf("get");
109 goto next(...); 109 goto next(...);
110 } 110 }
111 111
112 __code putLocalDGMQueue(struct LocalDGMQueue* tQueue, union Data* data, __code next(...)) { 112 __code putLocalDGMQueue(struct LocalDGMQueue* tQueue, union Data* data, __code next(...)) {
175 goto whenEmpty(...); 175 goto whenEmpty(...);
176 } 176 }
177 goto next(...); 177 goto next(...);
178 } 178 }
179 179
180 __code getData(struct LocalDGMQueue* tQueue, __code next(...), __code whenEOF(...)){ 180 __code getDataLocalDGMQueue(struct LocalDGMQueue* tQueue, __code next(...), __code whenEOF(...)){
181 int recv_size, send_size; 181 int recv_size, send_size;
182 char recv_buf[BUF_SIZE], send_buf; 182 char recv_buf[BUF_SIZE], send_buf;
183 183
184 /* クライアントから文字列を受信 */ 184 /* クライアントから文字列を受信 */
185 union Data* recv_data; 185 union Data* recv_data;
186 recv_size = recv(tQueue->socket, recv_data, sizeof(union Data), 0); 186 recv_size = read(tQueue->socket, recv_data, sizeof(union Data));
187 //printf("[%s] [%d]\n", fileString->str, fileString->size); 187 //printf("[%s] [%d]\n", fileString->str, fileString->size);
188 printf("size is %d\n", sizeof(union Data));
189 if (recv_size == -1) { 188 if (recv_size == -1) {
190 printf("recv error\n"); 189 printf("recv error\n");
191 goto exit_code(); 190 goto exit_code();
192 } 191 }
193 if (recv_size == 0) { 192 if (recv_size == 0) {
203 FileString* fileString = NEW(FileString); 202 FileString* fileString = NEW(FileString);
204 fileString = recv_data; 203 fileString = recv_data;
205 if (strcmp(fileString->str, "finish") == 0) { 204 if (strcmp(fileString->str, "finish") == 0) {
206 /* 接続終了を表す0を送信 */ 205 /* 接続終了を表す0を送信 */
207 send_buf = 0; 206 send_buf = 0;
208 send_size = send(tQueue->socket, &send_buf, 1, 0); 207 send_size = write(tQueue->socket, &send_buf, 1);
209 if (send_size == -1) { 208 if (send_size == -1) {
210 printf("send error\n"); 209 printf("send error\n");
211 } 210 }
212 close(tQueue->buffer); 211 close(tQueue->buffer);
213 goto whenEOF(...); 212 goto whenEOF(...);
214 } else { 213 } else {
215 /* "finish"以外の場合はクライアントとの接続を継続 */ 214 /* "finish"以外の場合はクライアントとの接続を継続 */
216 send_buf = 1; 215 send_buf = 1;
217 send_size = send(tQueue->socket, &send_buf, 1, 0); 216 send_size = write(tQueue->socket, &send_buf, 1);
218 if (send_size == -1) { 217 if (send_size == -1) {
219 printf("send error\n"); 218 printf("send error\n");
220 } 219 }
221 } 220 }
222 221
223
224
225 Gearef(context, TQueue)->data = recv_data; 222 Gearef(context, TQueue)->data = recv_data;
226
227 goto putLocalDGMQueue(recv_data, next); 223 goto putLocalDGMQueue(recv_data, next);
228 } 224 }
229 225
230 __code sendData(struct RemoteDGMQueue* tQueue, struct FileString* string, __code next(...)){ 226 __code sendDataLocalDGMQueue(struct RemoteDGMQueue* tQueue, struct FileString* string, __code next(...)){
231 } 227 }
232 228