diff src/CUDAWorker.cbc @ 590:9146d6017f18 default tip

hg mv parallel_execution/* ..
author anatofuz <anatofuz@cr.ie.u-ryukyu.ac.jp>
date Thu, 16 Jan 2020 15:12:06 +0900
parents src/parallel_execution/CUDAWorker.cbc@d6983ce1015d
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/CUDAWorker.cbc	Thu Jan 16 15:12:06 2020 +0900
@@ -0,0 +1,131 @@
+#include "../context.h"
+#interface "TaskManager.h"
+#interface "Worker.h"
+#interface "Iterator.h"
+#interface "Queue.h"
+
+// Host-side CUDA setup/teardown helpers defined in another translation unit.
+extern void cudaInit(struct CUDAWorker *cudaWorker,int phase, int deviceNum);
+extern void cudaShutdown(CUDAWorker *cudaWorker);
+
+// pthread entry point for this worker's dedicated thread; defined below.
+static void startCUDAWorker(Worker* worker);
+
+// Allocate and wire up a CUDA worker bound to the given task queue, then
+// start its dedicated pthread running startCUDAWorker().
+// Returns the generic Worker wrapper; the CUDAWorker implementation is stored
+// behind worker->worker as a union Data pointer.
+Worker* createCUDAWorker(struct Context* context, int id, Queue* queue, int deviceNum) {
+    struct Worker* worker = new Worker();
+    struct CUDAWorker* cudaWorker = new CUDAWorker();
+    worker->worker = (union Data*)cudaWorker; // implementation behind the Worker interface
+    worker->tasks = queue;                    // shared queue this worker takes tasks from
+    cudaWorker->id = id;
+    cudaWorker->loopCounter = 0;              // index used while committing output data gears
+    cudaWorker->deviceNum = deviceNum;        // CUDA device ordinal, forwarded to cudaInit()
+    worker->taskReceive = C_taskReceiveCUDAWorker;
+    worker->shutdown = C_shutdownCUDAWorker;
+    // NOTE(review): startCUDAWorker has signature void(Worker*), not the
+    // void*(void*) a pthread start routine requires; the (void*) cast hides
+    // this. Appears to be the project-wide convention — confirm.
+    pthread_create(&worker->thread, NULL, (void*)&startCUDAWorker, worker);
+    return worker;
+}
+
+// Thread entry: initialize the CUDA device, build this worker's private
+// Context and CUDA executor, publish the worker/queue into that context, then
+// jump into the task-receive loop via the meta level.
+static void startCUDAWorker(Worker* worker) {
+    struct CUDAWorker* cudaWorker = &worker->worker->CUDAWorker;
+    // phase 0 init; presumably also fills cudaWorker->device used below — confirm
+    cudaInit(cudaWorker, 0, cudaWorker->deviceNum);
+    cudaWorker->context  = NEW(struct Context);
+    initContext(cudaWorker->context);
+    cudaWorker->executor = createCUDAExecutor(cudaWorker->context, cudaWorker->device);
+    Gearef(cudaWorker->context, Worker)->worker = (union Data*)worker;
+    Gearef(cudaWorker->context, Worker)->tasks = worker->tasks;
+    goto meta(cudaWorker->context, worker->taskReceive); // enter taskReceiveCUDAWorker
+}
+
+// Take the next task from the shared queue; the dequeued task is delivered to
+// getTaskCUDAWorker (a NULL task is the shutdown sentinel, handled there).
+__code taskReceiveCUDAWorker(struct Worker* worker, struct Queue* tasks) {
+    goto tasks->take(getTaskCUDAWorker);
+}
+
+// Dispatch one dequeued task. A NULL task terminates this worker thread;
+// otherwise the task's own code gear is run after arranging for
+// odgCommitCUDAWorker to execute as its continuation.
+__code getTaskCUDAWorker(struct CUDAWorker* cudaWorker, struct Context* task, struct Worker* worker) {
+    if (!task) {
+        goto worker->shutdown(); // NULL task: end this worker thread
+    }
+    task->worker = worker;               // remember which worker runs this task
+    enum Code taskCg = task->next;       // the task's actual code gear
+    task->next = C_odgCommitCUDAWorker; // commit outputDG after task exec
+    goto meta(task, taskCg); // switch task context
+}
+
+// Stub: unpack arguments from the worker context — the dequeued task arrives
+// through the Queue interface's data slot — and jump to getTaskCUDAWorker.
+__code getTaskCUDAWorker_stub(struct Context* context) {
+    CUDAWorker* cudaWorker = (CUDAWorker*)GearImpl(context, Worker, worker);
+    Worker* worker = &Gearef(context,Worker)->worker->Worker;
+    struct Context* task = &Gearef(context, Queue)->data->Context;
+    goto getTaskCUDAWorker(context, cudaWorker, task, worker);
+}
+
+// Continuation run after a task finishes. For an iterated task, go through
+// the iterator's barrier first (which chooses between committing via
+// odgCommitCUDAWorker1 and skipping straight to odgCommitCUDAWorker6 — exact
+// semantics live in the Iterator implementation); a plain task commits
+// its output data gears immediately.
+__code odgCommitCUDAWorker(struct CUDAWorker* worker, struct Context* task) {
+    if (task->iterate) {
+        struct Iterator* iterator = task->iterator;
+        goto iterator->barrier(task, odgCommitCUDAWorker1, odgCommitCUDAWorker6);
+    } else {
+        goto odgCommitCUDAWorker1();
+    }
+}
+
+__code odgCommitCUDAWorker_stub(struct Context* context) {
+    // Switch from the finished task's context back to this worker's own
+    // context, so the commit chain below runs on the worker context while the
+    // task context rides along in Gearef(...)->task.
+    struct Context* workerContext = context->worker->worker->CUDAWorker.context;
+    Gearef(workerContext, Worker)->worker = (union Data*)context->worker;
+    Gearef(workerContext, Worker)->task = context;
+    CUDAWorker* cudaWorker = (CUDAWorker*)GearImpl(workerContext, Worker, worker);
+    goto odgCommitCUDAWorker(workerContext,
+                            cudaWorker,
+                            context);
+}
+
+// Loop head of the output-data-gear commit: while entries remain
+// (odg + loopCounter < maxOdg) process the next one via odgCommitCUDAWorker2;
+// once all are committed, reset the counter and tell the task manager this
+// task is done, then resume the receive loop (odgCommitCUDAWorker6).
+__code odgCommitCUDAWorker1(struct CUDAWorker* worker, struct Context* task) {
+    int i = worker->loopCounter;
+    if (task->odg+i < task->maxOdg) {
+        goto odgCommitCUDAWorker2();
+    }
+    worker->loopCounter = 0;
+    struct TaskManager* taskManager = task->taskManager;
+    goto taskManager->decrementTaskCount(odgCommitCUDAWorker6);
+}
+
+// Inspect the wait list of the current output data gear. Continuation order
+// assumed from what the targets do: non-empty -> odgCommitCUDAWorker3 (pop a
+// waiting task), empty -> odgCommitCUDAWorker5 (advance to the next odg) —
+// confirm against the Queue interface's isEmpty signature.
+__code odgCommitCUDAWorker2(struct CUDAWorker* worker, struct Context* task) {
+    int i = worker->loopCounter;
+    struct Queue* queue = GET_WAIT_LIST(task->data[task->odg+i]);
+    goto queue->isEmpty(odgCommitCUDAWorker3, odgCommitCUDAWorker5);
+}
+
+// Pop one task waiting on the current output data gear; it is delivered to
+// odgCommitCUDAWorker4 for dependency accounting.
+__code odgCommitCUDAWorker3(struct CUDAWorker* worker, struct Context* task) {
+    int i = worker->loopCounter;
+    struct Queue* queue = GET_WAIT_LIST(task->data[task->odg+i]);
+    goto queue->take(odgCommitCUDAWorker4);
+}
+
+// Atomically decrement the waiting task's input-data-gear counter.
+// __sync_fetch_and_sub returns the value BEFORE the subtraction, so a result
+// of 1 means this commit satisfied the task's last dependency and it can be
+// spawned. Either way, loop back to odgCommitCUDAWorker2 to drain any
+// remaining waiters on the same output data gear.
+__code odgCommitCUDAWorker4(struct CUDAWorker* worker, struct Context* task, struct Context* waitTask) {
+    if (__sync_fetch_and_sub(&waitTask->idgCount, 1) == 1) { // previous idgCount was 1: last dependency met
+        struct TaskManager* taskManager = waitTask->taskManager;
+        goto taskManager->spawn(waitTask, odgCommitCUDAWorker2);
+    }
+    goto odgCommitCUDAWorker2();
+}
+
+// Stub: the committing task comes from the Worker interface slot; the taken
+// waiter comes through the Queue interface's data slot.
+__code odgCommitCUDAWorker4_stub(struct Context* context) {
+    CUDAWorker* cudaWorker     = (CUDAWorker*)GearImpl(context, Worker, worker);
+    struct Context* task     = Gearef(context, Worker)->task;
+    struct Context* waitTask = &Gearef(context, Queue)->data->Context;
+    goto odgCommitCUDAWorker4(context,
+                             cudaWorker,
+                             task,
+                             waitTask);
+}
+
+// Current output data gear has no (more) waiters: advance to the next one and
+// re-enter the commit loop head.
+__code odgCommitCUDAWorker5(struct CUDAWorker* worker, struct Context* task) {
+    worker->loopCounter++;
+    goto odgCommitCUDAWorker1();
+}
+
+// Commit finished (or skipped by the iterator barrier): hand control back to
+// the task's worker so it fetches the next task from its queue.
+__code odgCommitCUDAWorker6(struct CUDAWorker* worker, struct Context* task) {
+    struct Worker* taskWorker = task->worker;
+    goto taskWorker->taskReceive(taskWorker->tasks);
+}
+
+// Tear down this worker's CUDA state, then leave via the meta-level exit gear.
+__code shutdownCUDAWorker(struct CUDAWorker* worker) {
+    cudaShutdown(worker);
+    // NOTE(review): `context` is not a parameter of this gear — it is assumed
+    // to be supplied by the generated stub; confirm.
+    goto meta(context, C_exit_code);
+}