diff src/CUDAWorker.cbc @ 590:9146d6017f18 default tip
hg mv parallel_execution/* ..
| author | anatofuz <anatofuz@cr.ie.u-ryukyu.ac.jp> |
|---|---|
| date | Thu, 16 Jan 2020 15:12:06 +0900 |
| parents | src/parallel_execution/CUDAWorker.cbc@d6983ce1015d |
| children | |
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/CUDAWorker.cbc	Thu Jan 16 15:12:06 2020 +0900
@@ -0,0 +1,131 @@
+#include "../context.h"
+#interface "TaskManager.h"
+#interface "Worker.h"
+#interface "Iterator.h"
+#interface "Queue.h"
+
+extern void cudaInit(struct CUDAWorker *cudaWorker, int phase, int deviceNum);
+extern void cudaShutdown(CUDAWorker *cudaWorker);
+
+static void startCUDAWorker(Worker* worker);
+
+Worker* createCUDAWorker(struct Context* context, int id, Queue* queue, int deviceNum) {
+    struct Worker* worker = new Worker();
+    struct CUDAWorker* cudaWorker = new CUDAWorker();
+    worker->worker = (union Data*)cudaWorker;
+    worker->tasks = queue;
+    cudaWorker->id = id;
+    cudaWorker->loopCounter = 0;
+    cudaWorker->deviceNum = deviceNum;
+    worker->taskReceive = C_taskReceiveCUDAWorker;
+    worker->shutdown = C_shutdownCUDAWorker;
+    pthread_create(&worker->thread, NULL, (void*)&startCUDAWorker, worker);
+    return worker;
+}
+
+static void startCUDAWorker(Worker* worker) {
+    struct CUDAWorker* cudaWorker = &worker->worker->CUDAWorker;
+    cudaInit(cudaWorker, 0, cudaWorker->deviceNum);
+    cudaWorker->context = NEW(struct Context);
+    initContext(cudaWorker->context);
+    cudaWorker->executor = createCUDAExecutor(cudaWorker->context, cudaWorker->device);
+    Gearef(cudaWorker->context, Worker)->worker = (union Data*)worker;
+    Gearef(cudaWorker->context, Worker)->tasks = worker->tasks;
+    goto meta(cudaWorker->context, worker->taskReceive);
+}
+
+__code taskReceiveCUDAWorker(struct Worker* worker, struct Queue* tasks) {
+    goto tasks->take(getTaskCUDAWorker);
+}
+
+__code getTaskCUDAWorker(struct CUDAWorker* cudaWorker, struct Context* task, struct Worker* worker) {
+    if (!task) {
+        goto worker->shutdown(); // end thread
+    }
+    task->worker = worker;
+    enum Code taskCg = task->next;
+    task->next = C_odgCommitCUDAWorker; // commit outputDG after task exec
+    goto meta(task, taskCg); // switch task context
+}
+
+__code getTaskCUDAWorker_stub(struct Context* context) {
+    CUDAWorker* cudaWorker = (CUDAWorker*)GearImpl(context, Worker, worker);
+    Worker* worker = &Gearef(context, Worker)->worker->Worker;
+    struct Context* task = &Gearef(context, Queue)->data->Context;
+    goto getTaskCUDAWorker(context, cudaWorker, task, worker);
+}
+
+__code odgCommitCUDAWorker(struct CUDAWorker* worker, struct Context* task) {
+    if (task->iterate) {
+        struct Iterator* iterator = task->iterator;
+        goto iterator->barrier(task, odgCommitCUDAWorker1, odgCommitCUDAWorker6);
+    } else {
+        goto odgCommitCUDAWorker1();
+    }
+}
+
+__code odgCommitCUDAWorker_stub(struct Context* context) {
+    // switch worker context
+    struct Context* workerContext = context->worker->worker->CUDAWorker.context;
+    Gearef(workerContext, Worker)->worker = (union Data*)context->worker;
+    Gearef(workerContext, Worker)->task = context;
+    CUDAWorker* cudaWorker = (CUDAWorker*)GearImpl(workerContext, Worker, worker);
+    goto odgCommitCUDAWorker(workerContext,
+                             cudaWorker,
+                             context);
+}
+
+__code odgCommitCUDAWorker1(struct CUDAWorker* worker, struct Context* task) {
+    int i = worker->loopCounter;
+    if (task->odg + i < task->maxOdg) {
+        goto odgCommitCUDAWorker2();
+    }
+    worker->loopCounter = 0;
+    struct TaskManager* taskManager = task->taskManager;
+    goto taskManager->decrementTaskCount(odgCommitCUDAWorker6);
+}
+
+__code odgCommitCUDAWorker2(struct CUDAWorker* worker, struct Context* task) {
+    int i = worker->loopCounter;
+    struct Queue* queue = GET_WAIT_LIST(task->data[task->odg + i]);
+    goto queue->isEmpty(odgCommitCUDAWorker3, odgCommitCUDAWorker5);
+}
+
+__code odgCommitCUDAWorker3(struct CUDAWorker* worker, struct Context* task) {
+    int i = worker->loopCounter;
+    struct Queue* queue = GET_WAIT_LIST(task->data[task->odg + i]);
+    goto queue->take(odgCommitCUDAWorker4);
+}
+
+__code odgCommitCUDAWorker4(struct CUDAWorker* worker, struct Context* task, struct Context* waitTask) {
+    if (__sync_fetch_and_sub(&waitTask->idgCount, 1) == 1) { // atomically decrement the idg counter (__sync_fetch_and_sub returns the previous value of waitTask->idgCount)
+        struct TaskManager* taskManager = waitTask->taskManager;
+        goto taskManager->spawn(waitTask, odgCommitCUDAWorker2);
+    }
+    goto odgCommitCUDAWorker2();
+}
+
+__code odgCommitCUDAWorker4_stub(struct Context* context) {
+    CUDAWorker* cudaWorker = (CUDAWorker*)GearImpl(context, Worker, worker);
+    struct Context* task = Gearef(context, Worker)->task;
+    struct Context* waitTask = &Gearef(context, Queue)->data->Context;
+    goto odgCommitCUDAWorker4(context,
+                              cudaWorker,
+                              task,
+                              waitTask);
+}
+
+__code odgCommitCUDAWorker5(struct CUDAWorker* worker, struct Context* task) {
+    worker->loopCounter++;
+    goto odgCommitCUDAWorker1();
+}
+
+__code odgCommitCUDAWorker6(struct CUDAWorker* worker, struct Context* task) {
+    struct Worker* taskWorker = task->worker;
+    goto taskWorker->taskReceive(taskWorker->tasks);
+}
+
+__code shutdownCUDAWorker(struct CUDAWorker* worker) {
+    cudaShutdown(worker);
+    goto meta(context, C_exit_code);
+}
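The dependency-release test in odgCommitCUDAWorker4 hinges on the GCC builtin __sync_fetch_and_sub returning the counter's value before the subtraction: only the worker that observes the old value 1 has released the last remaining input data gear and may spawn the waiting task. Below is a minimal plain-C sketch of just that pattern, not part of the commit above; the names WaitTask, spawn, and commit_one_output are hypothetical stand-ins for the Context/TaskManager machinery.

```c
#include <stdio.h>

struct WaitTask {
    int idgCount;   /* number of input data gears still outstanding */
};

/* stand-in for taskManager->spawn(waitTask, ...) */
static void spawn(struct WaitTask* task) {
    (void)task;
    printf("spawning task: all inputs are ready\n");
}

static void commit_one_output(struct WaitTask* waitTask) {
    /* atomic decrement; the builtin returns the previous value, so a
     * return value of 1 means the counter has just reached 0 and this
     * caller is the one that must make the waiting task runnable */
    if (__sync_fetch_and_sub(&waitTask->idgCount, 1) == 1) {
        spawn(waitTask);
    }
}

int main(void) {
    struct WaitTask task = { .idgCount = 2 };
    commit_one_output(&task);   /* first input committed: not runnable yet */
    commit_one_output(&task);   /* second input committed: spawns the task */
    return 0;
}
```

Compiled with gcc, only the second commit_one_output call prints, mirroring how only the final output-data-gear commit in the worker loop spawns the waiting task exactly once even when several workers decrement concurrently.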