view src/parallel_execution/CUDAWorker.cbc @ 473:71b634a5ed65

Merge
author Tatsuki IHA <innparusu@cr.ie.u-ryukyu.ac.jp>
date Thu, 28 Dec 2017 11:55:59 +0900 (2017-12-28)
parents ac244346c85d
children a517b11c37f7
line wrap: on
line source
#include "../context.h"
#interface "TaskManager.h"
#interface "Worker.h"

// Defined outside this file. startCUDAWorker calls it with the worker's
// CUDAWorker state and the value 0 as the second argument.
extern void cudaInit(struct CUDAWorker *cudaWorker,int phase) ;

// Thread entry routine handed to pthread_create by createCUDAWorker;
// the definition follows createCUDAWorker in this file.
static void startCUDAWorker(Worker* worker);

/*
 * Build a Worker whose implementation is a CUDAWorker and start its thread.
 *
 * context : not referenced by name in this body.
 * id      : stored in the CUDAWorker as its id.
 * queue   : task queue the worker will read from (stored in worker->tasks).
 * im      : not referenced in this body.
 *
 * Returns the new Worker. The thread is started with startCUDAWorker and
 * receives the Worker as its argument.
 */
Worker* createCUDAWorker(struct Context* context, int id, Queue* queue, TaskManagerImpl *im) {
    struct Worker* worker = new Worker();
    struct CUDAWorker* cudaWorker = new CUDAWorker();

    // Implementation-side state.
    cudaWorker->id = id;

    // Interface-side state: input queue, implementation pointer and the
    // two entry points other code reaches through the Worker.
    worker->tasks = queue;
    worker->worker = (union Data*)cudaWorker;
    worker->shutdown = C_shutdownCUDAWorker;
    worker->taskReceive = C_taskReceiveCUDAWorker;

    // worker->worker was just set to cudaWorker, so this is the same thread
    // slot as worker->worker->CUDAWorker.thread.
    // NOTE(review): the pthread_create result is ignored, and the entry
    // routine (which returns void) is passed through a (void*) cast rather
    // than as a void *(*)(void *) -- confirm both are intentional.
    pthread_create(&cudaWorker->thread, NULL, (void*)&startCUDAWorker, worker);
    return worker;
}

/*
 * Entry routine of the worker thread created in createCUDAWorker.
 *
 * Calls cudaInit, gives the worker a freshly initialised Context and a CUDA
 * executor, records the Worker inside that Context, and then continues into
 * the worker's taskReceive code segment on the new Context.
 */
static void startCUDAWorker(Worker* worker) {
    struct CUDAWorker* self = &worker->worker->CUDAWorker;

    // Second argument is 0; the extern prototype in this file names that
    // parameter `phase`.
    cudaInit(self, 0);

    // Context owned by this worker; later code segments run on it.
    struct Context* workerContext = NEW(struct Context);
    self->context = workerContext;
    initContext(workerContext);

    self->executor = createCUDAExecutor(workerContext, self->device);

    // Let code segments running on workerContext find this Worker again.
    Gearef(workerContext, Worker)->worker = (union Data*)worker;

    goto meta(workerContext, worker->taskReceive);
}

/*
 * Ask the worker's task queue for the next task.
 *
 * worker : the Worker whose `tasks` queue is read.
 * queue  : Queue argument area of the current context (the stub passes
 *          Gearef(context, Queue)); filled in here for the take operation.
 *
 * Continues into the queue's `take` code segment, with getTaskCUDAWorker
 * registered as the segment to run afterwards.
 *
 * The `queue` parameter was missing from the parameter list although the
 * body writes through it and the stub already supplies it.
 */
__code taskReceiveCUDAWorker(struct Worker* worker, struct Queue* queue) {
    queue->queue = (union Data*)worker->tasks;  // queue to operate on
    queue->next = C_getTaskCUDAWorker;          // where to continue after take
    goto meta(context, worker->tasks->take);
}

/*
 * Stub for taskReceiveCUDAWorker: fetches the Worker and the Queue argument
 * area from the context and forwards them.
 */
__code taskReceiveCUDAWorker_stub(struct Context* context) {
    struct Worker* worker = &Gearef(context, Worker)->worker->Worker;
    struct Queue* queue = Gearef(context, Queue);
    goto taskReceiveCUDAWorker(context, worker, queue);
}

/*
 * Runs after a task has been taken from the worker's queue.
 *
 * worker : the Worker that took the task.
 * task   : the task's Context, or a null pointer to request shutdown.
 *
 * For a real task: remember which worker runs it, replace task->next with
 * the commit segment that must run once the task body is done, and jump to
 * the task's original entry segment on the task's own Context.
 */
__code getTaskCUDAWorker(struct Worker* worker, struct Context* task) {
    if (!task) {
        goto meta(context, worker->shutdown); // end thread
    }

    // Save the task's entry segment before task->next is overwritten below.
    enum Code entry = task->next;

    task->worker = worker;
    // set CG after task exec
    task->next = task->iterate ? C_iterateCommitCUDA : C_odgCommitCUDA;

    goto meta(task, entry);
}

/*
 * Stub for getTaskCUDAWorker: the task is the Context stored in the `data`
 * field of the context's Queue argument area; the Worker comes from the
 * context's Worker area.
 */
__code getTaskCUDAWorker_stub(struct Context* context) {
    struct Context* task = &Gearef(context, Queue)->data->Context;
    goto getTaskCUDAWorker(context, &Gearef(context,Worker)->worker->Worker, task);
}

/*
 * Commit step for tasks that have `iterate` set (installed as task->next by
 * getTaskCUDAWorker).
 *
 * iterator : Iterator argument area, filled in here.
 *
 * Hands the current context to its iterator's `barrier` segment, with
 * odgCommitCUDA as the `next` continuation and iterateCommitCUDA1 as the
 * `whenWait` continuation.
 */
__code iterateCommitCUDA(struct Iterator* iterator) {
    iterator->whenWait = C_iterateCommitCUDA1;
    iterator->next = C_odgCommitCUDA;
    iterator->task = context;
    iterator->iterator = (union Data*)context->iterator;
    goto meta(context, context->iterator->barrier);
}

/*
 * Registered by iterateCommitCUDA as the iterator's `whenWait` continuation.
 * Goes back to receiving tasks. The stub supplies the worker's Context as
 * `context`; `task` is not used in the body.
 */
__code iterateCommitCUDA1(struct Context* task) {
    goto meta(context, C_taskReceiveCUDAWorker);
}

/*
 * Stub for iterateCommitCUDA1. The incoming context is the task; the
 * segment itself runs on the Context of the CUDAWorker executing the task.
 */
__code iterateCommitCUDA1_stub(struct Context* context) {
    struct CUDAWorker* cudaWorker = &context->worker->worker->CUDAWorker;
    goto iterateCommitCUDA1(cudaWorker->context, context);
}

/*
 * Head of the commit loop that runs after a task body has finished.
 *
 * loopCounter : loop state taken from the task's Context; `i` is the offset
 *               from task->odg of the entry currently being processed.
 * task        : Context of the finished task.
 * taskManager : TaskManager argument area of the worker's Context.
 *
 * While task->odg + i is below task->maxOdg, continue with odgCommitCUDA1
 * on the task's Context. Otherwise reset the counter, ask the task manager
 * to decrement its task count, and resume receiving tasks afterwards.
 */
__code odgCommitCUDA(struct LoopCounter* loopCounter, struct Context* task, struct TaskManager* taskManager) {
    if (task->odg + loopCounter->i < task->maxOdg) {
        goto meta(task, C_odgCommitCUDA1);
    }

    // Every entry has been handled: finish the task.
    taskManager->next = C_taskReceiveCUDAWorker;
    taskManager->taskManager = (union Data*)task->taskManager;
    loopCounter->i = 0;
    goto meta(context, task->taskManager->decrementTaskCount);
}

/*
 * Stub for odgCommitCUDA. The incoming context is the task. The loop
 * counter is read from the task's Context, while the TaskManager argument
 * area and the running context belong to the worker executing the task.
 */
__code odgCommitCUDA_stub(struct Context* context) {
    struct Context* task = context;
    struct Context* workerContext = task->worker->worker->CUDAWorker.context;
    struct LoopCounter* loopCounter = Gearef(task, LoopCounter);
    struct TaskManager* taskManager = Gearef(workerContext, TaskManager);
    goto odgCommitCUDA(workerContext, loopCounter, task, taskManager);
}

/*
 * Select the wait list of the current entry and test whether it is empty.
 *
 * loopCounter : `i` is the offset from context->odg of the entry to process.
 * queue       : Queue argument area, pointed at that entry's wait list.
 *
 * Continues into the wait list's `isEmpty` segment; odgCommitCUDA4 is the
 * `whenEmpty` continuation and odgCommitCUDA2 the `next` continuation.
 */
__code odgCommitCUDA1(struct LoopCounter* loopCounter, struct Queue* queue) {
    union Data* waitList = (union Data*)GET_WAIT_LIST(context->data[context->odg + loopCounter->i]);

    queue->queue = waitList;
    queue->next = C_odgCommitCUDA2;
    queue->whenEmpty = C_odgCommitCUDA4;
    goto meta(context, waitList->Queue.isEmpty);
}

/*
 * Stub for odgCommitCUDA1: both arguments come from the current context.
 */
__code odgCommitCUDA1_stub(struct Context* context) {
    struct LoopCounter* loopCounter = Gearef(context, LoopCounter);
    struct Queue* queue = Gearef(context, Queue);
    goto odgCommitCUDA1(context, loopCounter, queue);
}

/*
 * Registered by odgCommitCUDA1 as the `next` continuation of the isEmpty
 * check. Takes one element from the selected wait list and continues with
 * odgCommitCUDA3.
 */
__code odgCommitCUDA2(struct Queue* queue) {
    queue->next = C_odgCommitCUDA3;  // segment to run after the take
    goto meta(context, queue->queue->Queue.take);
}

/*
 * Stub for odgCommitCUDA2: forwards the context's Queue argument area.
 */
__code odgCommitCUDA2_stub(struct Context* context) {
    struct Queue* queue = Gearef(context, Queue);
    goto odgCommitCUDA2(context, queue);
}

/*
 * Handle one task taken from a wait list.
 *
 * taskManager : TaskManager argument area, filled in when the task is spawned.
 * task        : the waiting task's Context (taken from the wait list).
 *
 * Atomically decrements task->idgCount. If the value before the decrement
 * was 1, the task is handed to its task manager's `spawn` segment. In both
 * cases processing continues with odgCommitCUDA1.
 */
__code odgCommitCUDA3(struct TaskManager* taskManager, struct Context* task) {
    // __sync_fetch_and_sub returns the value idgCount held before the
    // subtraction, so anything other than 1 means the count has not just
    // reached zero.
    if (__sync_fetch_and_sub(&task->idgCount, 1) != 1) {
        goto meta(context, C_odgCommitCUDA1);
    }

    taskManager->next = C_odgCommitCUDA1;
    taskManager->task = task;
    taskManager->taskManager = (union Data*)task->taskManager;
    goto meta(context, task->taskManager->spawn);
}

/*
 * Stub for odgCommitCUDA3: the waiting task is the Context stored in the
 * `data` field of the context's Queue argument area.
 */
__code odgCommitCUDA3_stub(struct Context* context) {
    struct TaskManager* taskManager = Gearef(context, TaskManager);
    struct Context* task = &Gearef(context, Queue)->data->Context;
    goto odgCommitCUDA3(context, taskManager, task);
}

/*
 * Registered by odgCommitCUDA1 as the `whenEmpty` continuation: the current
 * wait list has no more elements, so advance the loop counter and return to
 * the head of the commit loop.
 */
__code odgCommitCUDA4(struct LoopCounter* loopCounter) {
    loopCounter->i += 1;
    goto meta(context, C_odgCommitCUDA);
}

/*
 * Stub for odgCommitCUDA4: forwards the context's LoopCounter.
 */
__code odgCommitCUDA4_stub(struct Context* context) {
    struct LoopCounter* loopCounter = Gearef(context, LoopCounter);
    goto odgCommitCUDA4(context, loopCounter);
}

// Defined outside this file; called from shutdownCUDAWorker.
extern void cudaShutdown( CUDAWorker *cudaWorker);

/*
 * Shutdown segment of the CUDA worker (installed as worker->shutdown by
 * createCUDAWorker and reached from getTaskCUDAWorker when no task is
 * delivered). Calls cudaShutdown on the worker and then continues to
 * C_exit_code.
 */
__code shutdownCUDAWorker(struct Context* context, CUDAWorker* worker) {
    cudaShutdown(worker);
    goto meta(context, C_exit_code);
}

/*
 * Stub for shutdownCUDAWorker: obtains the CUDAWorker implementation via
 * GearImpl from the context's Worker area and forwards it.
 */
__code shutdownCUDAWorker_stub(struct Context* context) {
    goto shutdownCUDAWorker(context, (CUDAWorker *)GearImpl(context, Worker, worker));
}