Mercurial > hg > Gears > Gears
view src/parallel_execution/examples/socketQueue/RemoteDGMQueue.cbc @ 1022:635ccc391642
Organize repository
author | ichikitakahiro <e165713@ie.u-ryukyu.ac.jp> |
---|---|
date | Thu, 31 Mar 2022 13:23:08 +0900 |
parents | a9c630cc1c65 |
children | 793b21a8ea12 |
line wrap: on
line source
#include "../../../context.h"
#interface "TQueue.h"
#interface "Atomic.h"
#interface "FileString.h"
#interface "DataTransfer.h"
#impl "TQueue.h" as "RemoteDGMQueue.h"
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <netdb.h>

#define BUF_SIZE 1024

/*
 * Non-blocking queue of Paper: Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms(https://www.research.ibm.com/people/m/michael/podc-1996.pdf).
 */

/*
 * Creates a TQueue backed by a RemoteDGMQueue.
 *
 * The queue starts with a single free (dummy) node that both `top` and
 * `last` point to. A connection to localhost on service/port `sNum` is
 * opened during construction and stored in the `socket` field.
 *
 * context : Gears context, passed on to createAtomicReference()
 * sNum    : service name or port number string handed to getaddrinfo()
 *
 * Returns the interface object with its take/put/isEmpty/clear/getData
 * code-gear ids filled in.
 *
 * NOTE(review): the result of createSocketRemoteDGMQueue() is not checked
 * here, so a failed connection is stored as-is.
 */
TQueue* createRemoteDGMQueue(struct Context* context, char* sNum) {
    struct TQueue* tQueue = new TQueue();
    struct RemoteDGMQueue* RemoteDGMQueue = new RemoteDGMQueue();
    RemoteDGMQueue->top = new Element(); // allocate a free node
    RemoteDGMQueue->top->next = NULL;
    RemoteDGMQueue->last = RemoteDGMQueue->top;
    RemoteDGMQueue->atomic = createAtomicReference(context);
    RemoteDGMQueue->socket = createSocketRemoteDGMQueue(sNum);
    tQueue->tQueue = (union Data*)RemoteDGMQueue;
    tQueue->take = C_takeRemoteDGMQueue;
    tQueue->put = C_putRemoteDGMQueue;
    tQueue->isEmpty = C_isEmptyRemoteDGMQueue;
    tQueue->clear = C_clearRemoteDGMQueue;
    tQueue->getData = C_getDataRemoteDGMQueue;
    return tQueue;
}

/*
 * Opens a TCP (SOCK_STREAM) connection to "localhost" on service `sNum`.
 *
 * Every address returned by getaddrinfo() is tried in order until one
 * connects. The address list is released on both the success and the
 * failure path.
 *
 * Returns the connected socket descriptor, or 1 when name resolution
 * fails or no address could be connected.
 *
 * NOTE(review): the declared return type is int* although the values
 * returned are plain ints; the type of the `socket` field is declared
 * outside this file, so the signature is left untouched. Also, the error
 * value 1 is itself a valid descriptor number, so callers cannot tell a
 * failure apart from descriptor 1 - confirm and consider -1.
 */
int* createSocketRemoteDGMQueue(char *sNum){
    int sock;
    int connected;
    char *hostname = "localhost";
    char *service = sNum;
    struct addrinfo hints, *res0, *res;
    int err;

    memset(&hints, 0, sizeof(hints));
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_family = PF_UNSPEC; // UNSPEC: take whichever of IPv4 / IPv6 is usable

    if ((err = getaddrinfo(hostname, service, &hints, &res0)) != 0) {
        printf("error %d : %s\n", err, gai_strerror(err));
        return 1;
    }

    for (res = res0; res != NULL; res = res->ai_next) {
        sock = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
        if (sock < 0) {
            continue;
        }
        if (connect(sock, res->ai_addr, res->ai_addrlen) != 0) {
            close(sock);
            continue;
        }
        break;
    }

    /*
     * Remember the outcome before freeing: `res` points into the list
     * owned by `res0` and must not be inspected afterwards.
     */
    connected = (res != NULL);
    freeaddrinfo(res0);

    if (!connected) {
        /* no usable connection could be established */
        printf("failed\n");
        return 1;
    }

    printf("Connected!!\n");
    return sock;
}
__code clearRemoteDGMQueue(struct RemoteDGMQueue* tQueue, __code next(...)) { struct Element* top = tQueue->top; struct Atomic* atomic = tQueue->atomic; goto atomic->checkAndSet(&tQueue->top, top, NULL, next(...), clearRemoteDGMQueue); } __code getTranceRemoteDGMQueue(struct RemoteDGMQueue* tQueue, union Data* data, __code next(...)){ printf("get"); goto next(...); } __code putRemoteDGMQueue(struct RemoteDGMQueue* tQueue, union Data* data, __code next(...)) { Element* element = new Element(); element->data = data; element->next = NULL; Element* last = tQueue->last; Element* nextElement = last->next; if (last != tQueue->last) { goto putRemoteDGMQueue(); } if (nextElement == NULL) { struct Atomic* atomic = tQueue->atomic; Gearef(context, TQueue)->data = element->data; goto atomic->checkAndSet(&last->next, nextElement, element, sendDataRemoteDGMQueue, putRemoteDGMQueue); //書き換え } else { struct Atomic* atomic = tQueue->atomic; goto atomic->checkAndSet(&tQueue->last, last, nextElement, putRemoteDGMQueue, putRemoteDGMQueue); } } __code takeRemoteDGMQueue(struct RemoteDGMQueue* tQueue, __code next(union Data* data, ...)) { struct Element* top = tQueue->top; struct Element* last = tQueue->last; struct Element* nextElement = top->next; if (top != tQueue->top) { goto takeRemoteDGMQueue(); } if (top == last) { if (nextElement != NULL) { struct Atomic* atomic = tQueue->atomic; goto atomic->checkAndSet(&tQueue->last, last, nextElement, takeRemoteDGMQueue, takeRemoteDGMQueue); } } else { struct Atomic* atomic = tQueue->atomic; goto atomic->checkAndSet(&tQueue->top, top, nextElement, takeRemoteDGMQueue1, takeRemoteDGMQueue); } goto takeRemoteDGMQueue(); } __code takeRemoteDGMQueue1(struct RemoteDGMQueue* tQueue, __code next(union Data* data, ...), struct Element* nextElement) { data = nextElement->data; Gearef(context, TQueue)->data = data; goto next(data, ...); } __code takeRemoteDGMQueue1_stub(struct Context* context) { RemoteDGMQueue* tQueue = 
(RemoteDGMQueue*)GearImpl(context, TQueue, tQueue); enum Code next = Gearef(context, TQueue)->next; Data** O_data = &Gearef(context, TQueue)->data; goto takeRemoteDGMQueue1(context, tQueue, next, O_data, (struct Element*)Gearef(context, Atomic)->newData); } __code isEmptyRemoteDGMQueue(struct RemoteDGMQueue* tQueue, __code next(...), __code whenEmpty(...)) { struct Element* top = tQueue->top; struct Element* last = tQueue->last; struct Element* nextElement = top->next; if (top != tQueue->top) { goto isEmptyRemoteDGMQueue(); } if (top == last && nextElement == NULL) { goto whenEmpty(...); } goto next(...); } __code sendDataRemoteDGMQueue(struct RemoteDGMQueue* tQueue, union Data* data, __code next(...)){ char recv_buf; int send_size, recv_size; /* サーバーに送る文字列を取得 */ //memcpy(send_buf, string->str, sizeof(send_buf)); /* 文字列を送信 */ send_size = write(tQueue->socket, data, sizeof(union Data)); if (send_size == -1) { printf("send error\n"); close(tQueue->socket); goto exit_code(); } /* サーバーからの応答を受信 */ recv_size = read(tQueue->socket, &recv_buf, 1); if (recv_size == -1) { printf("recv error\n"); close(tQueue->socket); goto exit_code(); } if (recv_size == 0) { /* 受信サイズが0の場合は相手が接続閉じていると判断 */ printf("connection ended\n"); close(tQueue->socket); goto exit_code(); } /* 応答が0の場合はデータ送信終了 */ if (recv_buf == 0) { printf("Finish connection\n"); close(tQueue->socket); goto exit_code(); } goto next(...); } __code sendDataRemoteDGMQueue_stub(struct Context* context) { RemoteDGMQueue* tQueue = (RemoteDGMQueue*)GearImpl(context, TQueue, tQueue); union Data* data = Gearef(context, TQueue)->data; enum Code next = Gearef(context, TQueue)->next; goto sendDataRemoteDGMQueue(context, tQueue, data, next); } __code getDataRemoteDGMQueue(struct RemoteDGMQueue* tQueue, __code next(...), __code whenEOF(...)){ } __code closeSocketRemoteDGMQueue(struct RemoteDGMQueue* tQueue, __code next(...)){ }