view src/parallel_execution/examples/socketQueue/RemoteDGMQueue.cbc @ 1022:635ccc391642

Organize repository
author ichikitakahiro <e165713@ie.u-ryukyu.ac.jp>
date Thu, 31 Mar 2022 13:23:08 +0900
parents a9c630cc1c65
children 793b21a8ea12
line wrap: on
line source

#include "../../../context.h"
#interface "TQueue.h"
#interface "Atomic.h"
#interface "FileString.h"
#interface "DataTransfer.h"
#impl "TQueue.h" as "RemoteDGMQueue.h"

#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <netdb.h>
#define BUF_SIZE 1024 

/*
 * Non-blocking queue based on the paper "Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms" (https://www.research.ibm.com/people/m/michael/podc-1996.pdf).
 */

/*
 * Builds a TQueue interface object backed by a RemoteDGMQueue implementation.
 *
 * context: passed to createAtomicReference().
 * sNum:    forwarded to createSocketRemoteDGMQueue(), which uses it as the
 *          service (port) string when resolving the peer address.
 *
 * Returns the new TQueue whose take/put/isEmpty/clear/getData entries refer to
 * the RemoteDGMQueue code gears defined in this file.
 *
 * NOTE(review): the result of createSocketRemoteDGMQueue() is stored without
 * being checked; that function returns 1 when it fails to connect.
 */
TQueue* createRemoteDGMQueue(struct Context* context, char* sNum) {
    struct TQueue* tQueue = new TQueue();
    struct RemoteDGMQueue* RemoteDGMQueue = new RemoteDGMQueue();
    RemoteDGMQueue->top = new Element(); // allocate a free node
    RemoteDGMQueue->top->next = NULL;
    // an empty queue has top and last pointing at the same free node
    RemoteDGMQueue->last = RemoteDGMQueue->top;
    RemoteDGMQueue->atomic = createAtomicReference(context);
    RemoteDGMQueue->socket = createSocketRemoteDGMQueue(sNum); 

    // wire the implementation object and its code gears into the interface
    tQueue->tQueue = (union Data*)RemoteDGMQueue;
    tQueue->take  = C_takeRemoteDGMQueue;
    tQueue->put  = C_putRemoteDGMQueue;
    tQueue->isEmpty = C_isEmptyRemoteDGMQueue;
    tQueue->clear = C_clearRemoteDGMQueue;
    tQueue->getData = C_getDataRemoteDGMQueue;
    return tQueue;
}

/*
 * Opens a TCP connection to "localhost" on the service/port named by sNum.
 *
 * getaddrinfo() is asked for stream sockets of any address family, and each
 * returned address is tried in order until one connect() succeeds.
 *
 * Returns the connected socket descriptor on success, or 1 when name
 * resolution fails or no address could be connected.
 *
 * NOTE(review): the declared return type is int* although every return
 * statement yields an int, and the failure value 1 cannot be told apart from
 * a valid descriptor. Both are kept unchanged here so existing callers keep
 * working — confirm with the callers before changing either.
 */
int* createSocketRemoteDGMQueue(char *sNum){
    int sock = -1;
    char *hostname = "localhost";
    char *service = sNum;
    struct addrinfo hints, *res0, *res;
    int err;
    int connected;

    memset(&hints, 0, sizeof(hints));
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_family = PF_UNSPEC; // UNSPEC returns whichever of IPv4/IPv6 is usable
    if((err = getaddrinfo(hostname, service, &hints, &res0)) != 0){
        /* on failure getaddrinfo() hands back no list, so there is nothing to free */
        printf("error %d : %s\n", err, gai_strerror(err));
        return 1;
    }

    for (res=res0; res!=NULL; res=res->ai_next) {
        sock = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
        if (sock < 0) {
          continue;
        }

        if (connect(sock, res->ai_addr, res->ai_addrlen) != 0) {
            close(sock);
            continue;
        }
        break;
    }

    /*
     * Record the outcome first: res points into the list, so it must not be
     * inspected after the list is released. Freeing here (rather than only on
     * the success path) keeps the list from leaking when every address fails.
     */
    connected = (res != NULL);
    freeaddrinfo(res0);

    if (!connected) {
        /* no usable connection could be established */
        printf("failed\n");
        return 1;
    }

    printf("Connected!!\n");
    return sock;
}




/*
 * Code gear: clears the queue by asking the Atomic interface to replace
 * tQueue->top with NULL.
 *
 * checkAndSet is given the address of tQueue->top, the value read here, the
 * replacement NULL, and two continuations: next(...) and this gear itself.
 * Presumably the first continuation runs when the swap succeeds and the
 * second (a retry) when it fails — confirm against the Atomic interface.
 *
 * NOTE(review): the other gears in this file read tQueue->top->next, which
 * would dereference NULL after a successful clear — verify intended usage.
 */
__code clearRemoteDGMQueue(struct RemoteDGMQueue* tQueue, __code next(...)) {
    struct Element* top = tQueue->top;
    struct Atomic* atomic = tQueue->atomic;
    goto atomic->checkAndSet(&tQueue->top, top, NULL, next(...), clearRemoteDGMQueue);
}

/*
 * Code gear: prints "get" to stdout and continues to next.
 * Neither tQueue nor data is used by the body.
 */
__code getTranceRemoteDGMQueue(struct RemoteDGMQueue* tQueue, union Data* data, __code next(...)){
    printf("get");
    goto next(...);
}

/*
 * Code gear: appends data to the tail of the queue, then continues to
 * sendDataRemoteDGMQueue so the element is also written to the socket.
 *
 * A new Element holding data is allocated on every entry to this gear,
 * including re-entries caused by the retry gotos below.
 */
__code putRemoteDGMQueue(struct RemoteDGMQueue* tQueue, union Data* data, __code next(...)) {
    Element* element = new Element();
    element->data = data;
    element->next = NULL;
    // snapshot the tail and its successor
    Element* last = tQueue->last;
    Element* nextElement = last->next;
    // the tail moved between the two reads: start over
    if (last != tQueue->last) {
        goto putRemoteDGMQueue();
    }
    if (nextElement == NULL) {
        // last really is the final node: try to link the new element after it
        struct Atomic* atomic = tQueue->atomic;
        // store the payload in the TQueue data gear, where sendDataRemoteDGMQueue_stub reads it
        Gearef(context, TQueue)->data = element->data;
        goto atomic->checkAndSet(&last->next, nextElement, element, sendDataRemoteDGMQueue, putRemoteDGMQueue); // rewritten: continues to sendDataRemoteDGMQueue
    } else {
        // tQueue->last is lagging behind: try to advance it to its successor, then retry the put
        struct Atomic* atomic = tQueue->atomic;
        goto atomic->checkAndSet(&tQueue->last, last, nextElement, putRemoteDGMQueue, putRemoteDGMQueue);
    }
}


/*
 * Code gear: removes the element at the head of the queue.
 *
 * When the head can be advanced, control is handed to takeRemoteDGMQueue1,
 * which passes the element's data to next. In every other case (inconsistent
 * snapshot, lagging tail, or empty queue) this gear jumps back to itself, so
 * an empty queue is polled repeatedly until an element appears.
 */
__code takeRemoteDGMQueue(struct RemoteDGMQueue* tQueue, __code next(union Data* data, ...)) {
    // snapshot head, tail and the head's successor
    struct Element* top = tQueue->top;
    struct Element* last = tQueue->last;
    struct Element* nextElement = top->next;
    // the head moved between the reads: start over
    if (top != tQueue->top) {
        goto takeRemoteDGMQueue();
    }
    if (top == last) {
        if (nextElement != NULL) {
            // head == tail but a successor exists: tQueue->last is lagging, try to advance it
            struct Atomic* atomic = tQueue->atomic;
            goto atomic->checkAndSet(&tQueue->last, last, nextElement, takeRemoteDGMQueue, takeRemoteDGMQueue);
        }
    } else {
        // try to move the head to its successor; takeRemoteDGMQueue1 then delivers that node's data
        struct Atomic* atomic = tQueue->atomic;
        goto atomic->checkAndSet(&tQueue->top, top, nextElement, takeRemoteDGMQueue1, takeRemoteDGMQueue);
    }
    // reached when head == tail and there is no successor (queue empty): retry
    goto takeRemoteDGMQueue();
}

/*
 * Code gear: second half of take. Reads the data held by nextElement, stores
 * it in the TQueue data gear of the context, and passes it to next.
 */
__code takeRemoteDGMQueue1(struct RemoteDGMQueue* tQueue, __code next(union Data* data, ...), struct Element* nextElement) {
    data = nextElement->data;
    Gearef(context, TQueue)->data = data;
    goto next(data, ...);
}

/*
 * Hand-written stub for takeRemoteDGMQueue1: gathers the gear's arguments
 * from the context and jumps to it.
 *
 * nextElement is taken from the Atomic data gear's newData field rather than
 * from the TQueue data gear.
 */
__code takeRemoteDGMQueue1_stub(struct Context* context) {
	RemoteDGMQueue* tQueue = (RemoteDGMQueue*)GearImpl(context, TQueue, tQueue);
	enum Code next = Gearef(context, TQueue)->next;
	// address of the TQueue data field, passed as the output slot for data
	Data** O_data = &Gearef(context, TQueue)->data;
	goto takeRemoteDGMQueue1(context,
                                tQueue,
                                next,
                                O_data,
                                (struct Element*)Gearef(context, Atomic)->newData);
}

/*
 * Code gear: branches on whether the queue is empty.
 *
 * Continues to whenEmpty when head and tail are the same node and that node
 * has no successor; otherwise continues to next. If the head changed while
 * the snapshot was being taken, the gear restarts itself.
 */
__code isEmptyRemoteDGMQueue(struct RemoteDGMQueue* tQueue, __code next(...), __code whenEmpty(...)) {
    struct Element* top = tQueue->top;
    struct Element* last = tQueue->last;
    struct Element* nextElement = top->next;
    // the head moved between the reads: start over
    if (top != tQueue->top) {
        goto isEmptyRemoteDGMQueue();
    }
    if (top == last && nextElement == NULL) {
        goto whenEmpty(...);
    }
    goto next(...);
}

/*
 * Code gear: writes one Data element to tQueue->socket, waits for a one-byte
 * reply from the peer, and continues to next.
 *
 * The socket is closed and control goes to exit_code when:
 *   - the write fails,
 *   - the read fails,
 *   - the peer has closed the connection (read returns 0), or
 *   - the reply byte is 0, which the peer uses to signal the end of transfer.
 *
 * NOTE(review): exactly sizeof(union Data) bytes starting at data are sent;
 * this assumes the object behind data is at least that large — confirm.
 */
__code sendDataRemoteDGMQueue(struct RemoteDGMQueue* tQueue, union Data* data, __code next(...)){
    char recv_buf;
    ssize_t send_size, recv_size;
    const char* payload = (const char*)data;
    size_t remaining = sizeof(union Data);

    /*
     * Send the element. write() on a stream socket may accept fewer bytes
     * than requested, so keep writing until the whole element has gone out;
     * otherwise the peer would receive a truncated element.
     */
    while (remaining > 0) {
        send_size = write(tQueue->socket, payload, remaining);
        if (send_size <= 0) {
            printf("send error\n");
            close(tQueue->socket);
            goto exit_code();
        }
        payload += send_size;
        remaining -= (size_t)send_size;
    }

    /* receive the reply from the server */
    recv_size = read(tQueue->socket, &recv_buf, 1);
    if (recv_size == -1) {
        printf("recv error\n");
        close(tQueue->socket);
        goto exit_code();
    }

    if (recv_size == 0) {
        /* a receive size of 0 means the peer has closed the connection */
        printf("connection ended\n");
        close(tQueue->socket);
        goto exit_code();
    }

    /* a reply of 0 means data transmission is finished */
    if (recv_buf == 0) {
        printf("Finish connection\n");
        close(tQueue->socket);
        goto exit_code();
    }
    goto next(...);
}


/*
 * Hand-written stub for sendDataRemoteDGMQueue: reads the queue
 * implementation, the data pointer and the next continuation from the TQueue
 * data gear of the context, then jumps to the gear.
 */
__code sendDataRemoteDGMQueue_stub(struct Context* context) {
	RemoteDGMQueue* tQueue = (RemoteDGMQueue*)GearImpl(context, TQueue, tQueue);
	union Data* data = Gearef(context, TQueue)->data;
	enum Code next = Gearef(context, TQueue)->next;
	goto sendDataRemoteDGMQueue(context, tQueue, data, next);
}

/*
 * Code gear registered as the TQueue getData entry by createRemoteDGMQueue.
 *
 * NOTE(review): the body is empty — it continues to neither next nor whenEOF.
 * This looks like an unimplemented placeholder; confirm before relying on it.
 */
__code getDataRemoteDGMQueue(struct RemoteDGMQueue* tQueue, __code next(...), __code whenEOF(...)){

}

/*
 * Code gear intended, by its name, to close the queue's socket.
 *
 * NOTE(review): the body is empty — it neither closes tQueue->socket nor
 * continues to next. This looks like an unimplemented placeholder.
 */
__code closeSocketRemoteDGMQueue(struct RemoteDGMQueue* tQueue, __code next(...)){

}