view src/parallel_execution/examples/boundedBuffer/SemaphoreImpl.cbc @ 493:82f0c49750f1

Add codeGear for boundedBuffer example
author Tatsuki IHA <innparusu@cr.ie.u-ryukyu.ac.jp>
date Sun, 31 Dec 2017 01:36:18 +0900
parents 9333486471b9
children d8b2036c6942
line wrap: on
line source

#include "../../../context.h"
#interface "Semaphore.h"
#interface "Queue.h"
#interface "TaskManager.h"

/*
 * Builds a Semaphore interface object backed by a fresh SemaphoreImpl.
 *
 * context: passed on to createSynchronizedQueue when creating the wait queue.
 * n:       initial value of the semaphore counter.
 *
 * Returns the Semaphore; its p and v entries refer to the
 * pOperationSemaphoreImpl and vOperationSemaphoreImpl code gears.
 */
Semaphore* createSemaphoreImpl(struct Context* context, int n) {
    struct Semaphore* iface = new Semaphore();
    struct SemaphoreImpl* impl = new SemaphoreImpl();

    // Implementation state: the counter, the queue that the P operation puts
    // blocked contexts on, and the mutex locked around both in the P/V gears.
    impl->value = n;
    impl->waitThreadQueue = createSynchronizedQueue(context);
    pthread_mutex_init(&impl->mutex, NULL);

    // Interface side: operation entry points and the pointer to the implementation.
    iface->p = C_pOperationSemaphoreImpl;
    iface->v = C_vOperationSemaphoreImpl;
    iface->semaphore = (union Data*)impl;
    return iface;
}

/*
 * Entry point of the P operation: locks the semaphore's mutex and continues
 * to pOperationSemaphoreImpl1.
 *
 * The mutex is still held when control leaves this gear; it is unlocked in
 * pOperationSemaphoreImpl1 (counter decremented) or in
 * pOperationSemaphoreImpl2 (context queued for waiting).
 */
__code pOperationSemaphoreImpl(struct SemaphoreImpl* semaphore, __code next(...)) {
    pthread_mutex_lock(&semaphore->mutex);
    goto meta(context, C_pOperationSemaphoreImpl1);
}

/*
 * Second step of the P operation. Entered from pOperationSemaphoreImpl, which
 * locked semaphore->mutex beforehand.
 *
 * Counter not positive: the current context gets its `next` field set to
 * C_pOperationSemaphoreImpl (presumably so the P operation restarts from the
 * top once the context is scheduled again -- confirm against the scheduler),
 * is put on waitThreadQueue, and control continues to pOperationSemaphoreImpl2,
 * which unlocks the mutex.
 *
 * Counter positive: the counter is decremented, the mutex is unlocked and
 * control continues to `next`.
 */
__code pOperationSemaphoreImpl1(struct SemaphoreImpl* semaphore, __code next(...)) {
    // "<= 0" instead of "== 0": a counter that is negative (e.g. a negative
    // initial value) must block as well rather than be decremented further.
    if(semaphore->value <= 0) {
        context->next= C_pOperationSemaphoreImpl;
        struct Queue* queue = semaphore->waitThreadQueue;
        goto queue->put(context, pOperationSemaphoreImpl2); // put this context(thread, process)
    }
    semaphore->value--;
    pthread_mutex_unlock(&semaphore->mutex);
    goto next(...);
}

/*
 * Continuation used after pOperationSemaphoreImpl1 has put the blocked
 * context on the wait queue: unlocks the semaphore mutex and jumps to the
 * worker's taskReceive gear.
 *
 * `next` is accepted but not used here.
 */
__code pOperationSemaphoreImpl2(struct SemaphoreImpl* semaphore, struct Worker* worker, __code next(...)) {
    pthread_mutex_unlock(&semaphore->mutex);
    goto worker->taskReceive(); // goto scheduler
}

__code pOperationSemaphoreImpl2_stub(struct Context* context) {
    // switch worker context
    struct Context* workerContext = context->worker->worker->CPUWorker.context;
    SemaphoreImpl* semaphoreImpl = (SemaphoreImpl*)GearImpl(context, Semaphore, semaphore);
    goto pOperationSemaphoreImpl2(workerContext,
                                  semaphoreImpl,
                                  context->worker,
                                  Gearef(context, Semaphore)->next);
}

/*
 * Entry point of the V operation: locks the semaphore's mutex, increments
 * the counter, then takes one entry from waitThreadQueue and continues to
 * vOperationSemaphoreImpl1.
 *
 * The mutex stays locked across the queue operation; it is unlocked in
 * vOperationSemaphoreImpl2.
 *
 * NOTE(review): take is issued unconditionally, i.e. also when the P
 * operation has not queued any context. Confirm what the queue's take does
 * on an empty queue, since the mutex is held while it runs.
 */
__code vOperationSemaphoreImpl(struct SemaphoreImpl* semaphore, __code next(...)) {
    pthread_mutex_lock(&semaphore->mutex);
    semaphore->value++;
    struct Queue* queue = semaphore->waitThreadQueue;
    goto queue->take(vOperationSemaphoreImpl1);
}

/*
 * Continuation of the V operation. Receives in `waitTask` the context that
 * vOperationSemaphoreImpl took from the wait queue and passes it to its task
 * manager's spawn gear, continuing to vOperationSemaphoreImpl2 afterwards.
 *
 * Entered with semaphore->mutex locked (by vOperationSemaphoreImpl).
 */
__code vOperationSemaphoreImpl1(struct SemaphoreImpl* semaphore, __code next(...), struct Context* waitTask) {
    // Guard against an empty take result: with no waiter there is nothing to
    // spawn, so finish the V operation here instead of dereferencing NULL.
    // NOTE(review): whether take can deliver NULL depends on the queue
    // implementation -- confirm.
    if(waitTask == NULL) {
        pthread_mutex_unlock(&semaphore->mutex);
        goto next(...);
    }
    struct TaskManager* taskManager = waitTask->taskManager;
    goto taskManager->spawn(waitTask, vOperationSemaphoreImpl2); //notify
}

/*
 * Final step of the V operation, reached after the waiting context has been
 * handed to spawn: unlocks the semaphore mutex (locked in
 * vOperationSemaphoreImpl) and continues to `next`.
 *
 * `waitTask` is accepted but not used here.
 */
__code vOperationSemaphoreImpl2(struct SemaphoreImpl* semaphore, __code next(...), struct Context* waitTask) {
    pthread_mutex_unlock(&semaphore->mutex);
    goto next(...);
}