view src/parallel_execution/CUDAWorker.cbc @ 481:a517b11c37f7

Refactoring CUDAWorker.cbc
author Tatsuki IHA <innparusu@cr.ie.u-ryukyu.ac.jp>
date Thu, 28 Dec 2017 23:31:23 +0900
parents 71b634a5ed65
children d6983ce1015d
line wrap: on
line source

#include "../context.h"
#interface "TaskManager.h"
#interface "Worker.h"
#interface "Iterator.h"
#interface "Queue.h"

extern void cudaInit(struct CUDAWorker *cudaWorker,int phase) ;
extern void cudaShutdown(CUDAWorker *cudaWorker);

static void startCUDAWorker(Worker* worker);

Worker* createCUDAWorker(struct Context* context, int id, Queue* queue, TaskManagerImpl *im) {
    struct Worker* worker = new Worker();
    struct CUDAWorker* cudaWorker = new CUDAWorker();
    worker->worker = (union Data*)cudaWorker;
    worker->tasks = queue;
    cudaWorker->id = id;
    cudaWorker->loopCounter = 0;
    worker->taskReceive = C_taskReceiveCUDAWorker;
    worker->shutdown = C_shutdownCUDAWorker;
    pthread_create(&worker->thread, NULL, (void*)&startCUDAWorker, worker);
    return worker;
}

static void startCUDAWorker(Worker* worker) {
    struct CUDAWorker* cudaWorker = &worker->worker->CUDAWorker;
    int deviceNum = 0;
    cudaInit(cudaWorker, deviceNum);
    cudaWorker->context  = NEW(struct Context);
    initContext(cudaWorker->context);
    cudaWorker->executor = createCUDAExecutor(cudaWorker->context, cudaWorker->device);
    Gearef(cudaWorker->context, Worker)->worker = (union Data*)worker;
    Gearef(cudaWorker->context, Worker)->tasks = worker->tasks;
    goto meta(cudaWorker->context, worker->taskReceive);
}

__code taskReceiveCUDAWorker(struct Worker* worker, struct Queue* tasks) {
    goto tasks->take(getTaskCUDAWorker);
}

__code getTaskCUDAWorker(struct CUDAWorker* cudaWorker, struct Context* task, struct Worker* worker) {
    if (!task) {
        goto worker->shutdown(); // end thread
    }
    task->worker = worker;
    enum Code taskCg = task->next;
    task->next = C_odgCommitCUDAWorker; // commit outputDG after task exec
    goto meta(task, taskCg); // switch task context
}

__code getTaskCUDAWorker_stub(struct Context* context) {
    CUDAWorker* cudaWorker = (CUDAWorker*)GearImpl(context, Worker, worker);
    Worker* worker = &Gearef(context,Worker)->worker->Worker;
    struct Context* task = &Gearef(context, Queue)->data->Context;
    goto getTaskCUDAWorker(context, cudaWorker, task, worker);
}

__code odgCommitCUDAWorker(struct CUDAWorker* worker, struct Context* task) {
    if (task->iterate) {
        struct Iterator* iterator = task->iterator;
        goto iterator->barrier(task, odgCommitCUDAWorker1, odgCommitCUDAWorker6);
    } else {
        goto odgCommitCUDAWorker1();
    }
}

__code odgCommitCUDAWorker_stub(struct Context* context) {
    // switch worker context
    struct Context* workerContext = context->worker->worker->CUDAWorker.context;
    Gearef(workerContext, Worker)->worker = (union Data*)context->worker;
    Gearef(workerContext, Worker)->task = context;
    CUDAWorker* cudaWorker = (CUDAWorker*)GearImpl(workerContext, Worker, worker);
    goto odgCommitCUDAWorker(workerContext,
                            cudaWorker,
                            context);
}

__code odgCommitCUDAWorker1(struct CUDAWorker* worker, struct Context* task) {
    int i = worker->loopCounter;
    if (task->odg+i < task->maxOdg) {
        goto odgCommitCUDAWorker2();
    }
    worker->loopCounter = 0;
    struct TaskManager* taskManager = task->taskManager;
    goto taskManager->decrementTaskCount(odgCommitCUDAWorker6);
}

__code odgCommitCUDAWorker2(struct CUDAWorker* worker, struct Context* task) {
    int i = worker->loopCounter;
    struct Queue* queue = GET_WAIT_LIST(task->data[task->odg+i]);
    goto queue->isEmpty(odgCommitCUDAWorker3, odgCommitCUDAWorker5);
}

__code odgCommitCUDAWorker3(struct CUDAWorker* worker, struct Context* task) {
    int i = worker->loopCounter;
    struct Queue* queue = GET_WAIT_LIST(task->data[task->odg+i]);
    goto queue->take(odgCommitCUDAWorker4);
}

__code odgCommitCUDAWorker4(struct CUDAWorker* worker, struct Context* task, struct Context* waitTask) {
    if (__sync_fetch_and_sub(&waitTask->idgCount, 1) == 1) { // atomic decrement idg counter(__sync_fetch_and_sub function return initial value of waitTask->idgCount point)
        struct TaskManager* taskManager = waitTask->taskManager;
        goto taskManager->spawn(waitTask, odgCommitCUDAWorker2);
    }
    goto odgCommitCUDAWorker2();
}

__code odgCommitCUDAWorker4_stub(struct Context* context) {
    CUDAWorker* cudaWorker     = (CUDAWorker*)GearImpl(context, Worker, worker);
    struct Context* task     = Gearef(context, Worker)->task;
    struct Context* waitTask = &Gearef(context, Queue)->data->Context;
    goto odgCommitCUDAWorker4(context,
                             cudaWorker,
                             task,
                             waitTask);
}

__code odgCommitCUDAWorker5(struct CUDAWorker* worker, struct Context* task) {
    worker->loopCounter++;
    goto odgCommitCUDAWorker1();
}

__code odgCommitCUDAWorker6(struct CUDAWorker* worker, struct Context* task) {
    struct Worker* taskWorker = task->worker;
    goto taskWorker->taskReceive(taskWorker->tasks);
}

__code shutdownCUDAWorker(struct CUDAWorker* worker) {
    cudaShutdown(worker);
    goto meta(context, C_exit_code);
}