view src/parallel_execution/examples/boundedBuffer/SemaphoreImpl.cbc @ 505:528c31b045de

Replace goto meta
author Tatsuki IHA <innparusu@cr.ie.u-ryukyu.ac.jp>
date Tue, 02 Jan 2018 02:23:41 +0900
parents d8b2036c6942
children a7127917c736
line wrap: on
line source

#include "../../../context.h"
#interface "Semaphore.h"
#interface "Queue.h"
#interface "TaskManager.h"

Semaphore* createSemaphoreImpl(struct Context* context, int n) {
    struct Semaphore* semaphore = new Semaphore();
    struct SemaphoreImpl* semaphoreImpl = new SemaphoreImpl();
    semaphore->semaphore = (union Data*)semaphoreImpl;
    semaphoreImpl->value =  n;
    semaphoreImpl->waitThreadQueue = createSynchronizedQueue(context);
    pthread_mutex_init(&semaphoreImpl->mutex, NULL);
    semaphore->p = C_pOperationSemaphoreImpl;
    semaphore->v = C_vOperationSemaphoreImpl;
    return semaphore;
}

__code pOperationSemaphoreImpl(struct SemaphoreImpl* semaphore, __code next(...)) {
    pthread_mutex_lock(&semaphore->mutex);
    goto pOperationSemaphoreImpl1();
}

__code pOperationSemaphoreImpl1(struct SemaphoreImpl* semaphore, __code next(...)) {
    if(semaphore->value == 0) {
        context->next= C_pOperationSemaphoreImpl;
        struct Queue* queue = semaphore->waitThreadQueue;
        goto queue->put(context, pOperationSemaphoreImpl2); // put this context(thread, process)
    }
    semaphore->value--;
    pthread_mutex_unlock(&semaphore->mutex);
    goto next(...);
}

__code pOperationSemaphoreImpl2(struct SemaphoreImpl* semaphore, struct Worker* worker, __code next(...)) {
    pthread_mutex_unlock(&semaphore->mutex);
    goto worker->taskReceive(); // goto shceduler
}

__code pOperationSemaphoreImpl2_stub(struct Context* context) {
    // switch worker context
    struct Context* workerContext = context->worker->worker->CPUWorker.context;
    SemaphoreImpl* semaphoreImpl = (SemaphoreImpl*)GearImpl(context, Semaphore, semaphore);
    goto pOperationSemaphoreImpl2(workerContext,
                                  semaphoreImpl,
                                  context->worker,
                                  Gearef(context, Semaphore)->next);
}

__code vOperationSemaphoreImpl(struct SemaphoreImpl* semaphore, __code next(...)) {
    pthread_mutex_lock(&semaphore->mutex);
    semaphore->value++;
    struct Queue* queue = semaphore->waitThreadQueue;
    goto queue->isEmpty(vOperationSemaphoreImpl1, vOperationSemaphoreImpl3);
}

__code vOperationSemaphoreImpl1(struct SemaphoreImpl* semaphore, __code next(...)) {
    struct Queue* queue = semaphore->waitThreadQueue;
    goto queue->take(vOperationSemaphoreImpl2);
}

__code vOperationSemaphoreImpl2(struct SemaphoreImpl* semaphore, struct Context* waitTask, __code next(...)) {
    struct TaskManager* taskManager = waitTask->taskManager;
    goto taskManager->spawn(waitTask, vOperationSemaphoreImpl3); //notify
}

__code vOperationSemaphoreImpl2_stub(struct Context* context) {
    SemaphoreImpl* semaphoreImpl = (SemaphoreImpl*)GearImpl(context, Semaphore, semaphore);
    struct Context* waitTask = &Gearef(context, Queue)->data->Context;
    goto vOperationSemaphoreImpl2(context,
                                  semaphoreImpl,
                                  waitTask,
                                  Gearef(context, Semaphore)->next);
}

__code vOperationSemaphoreImpl3(struct SemaphoreImpl* semaphore, __code next(...)) {
    pthread_mutex_unlock(&semaphore->mutex);
    goto next(...);
}