changeset 481:a517b11c37f7

Refactoring CUDAWorker.cbc
author Tatsuki IHA <innparusu@cr.ie.u-ryukyu.ac.jp>
date Thu, 28 Dec 2017 23:31:23 +0900
parents 39b5df2d1c93
children 5859bed4edff
files src/parallel_execution/CPUWorker.cbc src/parallel_execution/CUDAWorker.cbc src/parallel_execution/context.h
diffstat 3 files changed, 70 insertions(+), 96 deletions(-) [+]
line wrap: on
line diff
--- a/src/parallel_execution/CPUWorker.cbc	Thu Dec 28 19:54:28 2017 +0900
+++ b/src/parallel_execution/CPUWorker.cbc	Thu Dec 28 23:31:23 2017 +0900
@@ -20,7 +20,7 @@
 }
 
 static void startWorker(struct Worker* worker) {
-    CPUWorker* cpuWorker = (CPUWorker*)worker->worker;
+    struct CPUWorker* cpuWorker = &worker->worker->CPUWorker;
     cpuWorker->context = NEW(struct Context);
     initContext(cpuWorker->context);
     Gearef(cpuWorker->context, Worker)->worker = (union Data*)worker;
@@ -33,7 +33,8 @@
 }
 
 __code getTaskCPUWorker(struct CPUWorker* cpuWorker, struct Context* task, struct Worker* worker) {
-    if (!task) { goto worker->shutdown(); // end thread
+    if (!task) {
+        goto worker->shutdown(); // end thread
     }
     task->worker = worker;
     enum Code taskCg = task->next;
--- a/src/parallel_execution/CUDAWorker.cbc	Thu Dec 28 19:54:28 2017 +0900
+++ b/src/parallel_execution/CUDAWorker.cbc	Thu Dec 28 23:31:23 2017 +0900
@@ -1,8 +1,11 @@
 #include "../context.h"
 #interface "TaskManager.h"
 #interface "Worker.h"
+#interface "Iterator.h"
+#interface "Queue.h"
 
 extern void cudaInit(struct CUDAWorker *cudaWorker,int phase) ;
+extern void cudaShutdown(CUDAWorker *cudaWorker);
 
 static void startCUDAWorker(Worker* worker);
 
@@ -12,9 +15,10 @@
     worker->worker = (union Data*)cudaWorker;
     worker->tasks = queue;
     cudaWorker->id = id;
+    cudaWorker->loopCounter = 0;
     worker->taskReceive = C_taskReceiveCUDAWorker;
     worker->shutdown = C_shutdownCUDAWorker;
-    pthread_create(&worker->worker->CUDAWorker.thread, NULL, (void*)&startCUDAWorker, worker);
+    pthread_create(&worker->thread, NULL, (void*)&startCUDAWorker, worker);
     return worker;
 }
 
@@ -26,133 +30,102 @@
     initContext(cudaWorker->context);
     cudaWorker->executor = createCUDAExecutor(cudaWorker->context, cudaWorker->device);
     Gearef(cudaWorker->context, Worker)->worker = (union Data*)worker;
+    Gearef(cudaWorker->context, Worker)->tasks = worker->tasks;
     goto meta(cudaWorker->context, worker->taskReceive);
 }
 
-__code taskReceiveCUDAWorker(struct Worker* worker) {
-    queue->queue = (union Data*)worker->tasks;
-    queue->next = C_getTaskCUDAWorker;
-    goto meta(context, worker->tasks->take);
-}
-
-__code taskReceiveCUDAWorker_stub(struct Context* context) {
-    goto taskReceiveCUDAWorker(context, &Gearef(context, Worker)->worker->Worker, Gearef(context, Queue));
+__code taskReceiveCUDAWorker(struct Worker* worker, struct Queue* tasks) {
+    goto tasks->take(getTaskCUDAWorker);
 }
 
-__code getTaskCUDAWorker(struct Worker* worker, struct Context* task) {
-    if (!task)
-        goto meta(context, worker->shutdown); // end thread
+__code getTaskCUDAWorker(struct CUDAWorker* cudaWorker, struct Context* task, struct Worker* worker) {
+    if (!task) {
+        goto worker->shutdown(); // end thread
+    }
     task->worker = worker;
     enum Code taskCg = task->next;
-    if (task->iterate) {
-        task->next = C_iterateCommitCUDA;
-    } else {
-        task->next = C_odgCommitCUDA; // set CG after task exec
-    }
-    goto meta(task, taskCg);
+    task->next = C_odgCommitCUDAWorker; // commit outputDG after task exec
+    goto meta(task, taskCg); // switch task context
 }
 
 __code getTaskCUDAWorker_stub(struct Context* context) {
+    CUDAWorker* cudaWorker = (CUDAWorker*)GearImpl(context, Worker, worker);
     Worker* worker = &Gearef(context,Worker)->worker->Worker;
     struct Context* task = &Gearef(context, Queue)->data->Context;
-    goto getTaskCUDAWorker(context, worker, task);
+    goto getTaskCUDAWorker(context, cudaWorker, task, worker);
 }
 
-__code iterateCommitCUDA(struct Iterator* iterator) {
-    iterator->iterator = (union Data*)context->iterator;
-    iterator->task = context;
-    iterator->next = C_odgCommitCUDA;
-    iterator->whenWait = C_iterateCommitCUDA1;
-    goto meta(context, context->iterator->barrier);
-}
-
-__code iterateCommitCUDA1(struct Context* task) {
-    goto meta(context, C_taskReceiveCUDAWorker);
+__code odgCommitCUDAWorker(struct CUDAWorker* worker, struct Context* task) {
+    if (task->iterate) {
+        struct Iterator* iterator = task->iterator;
+        goto iterator->barrier(task, odgCommitCUDAWorker1, odgCommitCUDAWorker6);
+    } else {
+        goto odgCommitCUDAWorker1();
+    }
 }
 
-__code iterateCommitCUDA1_stub(struct Context* context) {
+__code odgCommitCUDAWorker_stub(struct Context* context) {
+    // switch worker context
     struct Context* workerContext = context->worker->worker->CUDAWorker.context;
-    goto iterateCommitCUDA1(workerContext, context);
+    Gearef(workerContext, Worker)->worker = (union Data*)context->worker;
+    Gearef(workerContext, Worker)->task = context;
+    CUDAWorker* cudaWorker = (CUDAWorker*)GearImpl(workerContext, Worker, worker);
+    goto odgCommitCUDAWorker(workerContext,
+                            cudaWorker,
+                            context);
 }
 
-__code odgCommitCUDA(struct LoopCounter* loopCounter, struct Context* task, struct TaskManager* taskManager) {
-    int i = loopCounter->i ;
+__code odgCommitCUDAWorker1(struct CUDAWorker* worker, struct Context* task) {
+    int i = worker->loopCounter;
     if (task->odg+i < task->maxOdg) {
-        goto meta(task, C_odgCommitCUDA1);
+        goto odgCommitCUDAWorker2();
     }
-    loopCounter->i = 0;
-    taskManager->taskManager = (union Data*)task->taskManager;
-    taskManager->next = C_taskReceiveCUDAWorker;
-    goto meta(context, task->taskManager->decrementTaskCount);
-}
-
-__code odgCommitCUDA_stub(struct Context* context) {
-    struct Context* workerContext = context->worker->worker->CUDAWorker.context;
-    goto odgCommitCUDA(workerContext,
-            Gearef(context, LoopCounter),
-            context,
-            Gearef(workerContext, TaskManager));
+    worker->loopCounter = 0;
+    struct TaskManager* taskManager = task->taskManager;
+    goto taskManager->decrementTaskCount(odgCommitCUDAWorker6);
 }
 
-__code odgCommitCUDA1(struct LoopCounter* loopCounter, struct Queue* queue) {
-    int i = loopCounter->i ;
-    queue->queue = (union Data*)GET_WAIT_LIST(context->data[context->odg+i]);
-    queue->whenEmpty = C_odgCommitCUDA4;
-    queue->next = C_odgCommitCUDA2;
-    goto meta(context, queue->queue->Queue.isEmpty);
+__code odgCommitCUDAWorker2(struct CUDAWorker* worker, struct Context* task) {
+    int i = worker->loopCounter;
+    struct Queue* queue = GET_WAIT_LIST(task->data[task->odg+i]);
+    goto queue->isEmpty(odgCommitCUDAWorker3, odgCommitCUDAWorker5);
 }
 
-__code odgCommitCUDA1_stub(struct Context* context) {
-    goto odgCommitCUDA1(context,
-            Gearef(context, LoopCounter),
-            Gearef(context, Queue));
+__code odgCommitCUDAWorker3(struct CUDAWorker* worker, struct Context* task) {
+    int i = worker->loopCounter;
+    struct Queue* queue = GET_WAIT_LIST(task->data[task->odg+i]);
+    goto queue->take(odgCommitCUDAWorker4);
 }
 
-__code odgCommitCUDA2(struct Queue* queue) {
-    queue->next = C_odgCommitCUDA3;
-    goto meta(context, queue->queue->Queue.take);
-}
-
-__code odgCommitCUDA2_stub(struct Context* context) {
-    goto odgCommitCUDA2(context,
-            Gearef(context, Queue));
+__code odgCommitCUDAWorker4(struct CUDAWorker* worker, struct Context* task, struct Context* waitTask) {
+    if (__sync_fetch_and_sub(&waitTask->idgCount, 1) == 1) { // atomic decrement idg counter(__sync_fetch_and_sub function return initial value of waitTask->idgCount point)
+        struct TaskManager* taskManager = waitTask->taskManager;
+        goto taskManager->spawn(waitTask, odgCommitCUDAWorker2);
+    }
+    goto odgCommitCUDAWorker2();
 }
 
-__code odgCommitCUDA3(struct TaskManager* taskManager, struct Context* task) {
-    if (__sync_fetch_and_sub(&task->idgCount, 1) == 1) { // atomic decrement idg counter(__sync_fetch_and_sub function return initial value of task->idgCount point)
-        taskManager->taskManager = (union Data*)task->taskManager;
-        taskManager->task = task;
-        taskManager->next = C_odgCommitCUDA1;
-        goto meta(context, task->taskManager->spawn);
-    }
-    goto meta(context, C_odgCommitCUDA1);
+__code odgCommitCUDAWorker4_stub(struct Context* context) {
+    CUDAWorker* cudaWorker     = (CUDAWorker*)GearImpl(context, Worker, worker);
+    struct Context* task     = Gearef(context, Worker)->task;
+    struct Context* waitTask = &Gearef(context, Queue)->data->Context;
+    goto odgCommitCUDAWorker4(context,
+                             cudaWorker,
+                             task,
+                             waitTask);
 }
 
-__code odgCommitCUDA3_stub(struct Context* context) {
-    struct Context* task = &Gearef(context, Queue)->data->Context;
-    goto odgCommitCUDA3(context,
-            Gearef(context, TaskManager),
-            task);
+__code odgCommitCUDAWorker5(struct CUDAWorker* worker, struct Context* task) {
+    worker->loopCounter++;
+    goto odgCommitCUDAWorker1();
 }
 
-__code odgCommitCUDA4(struct LoopCounter* loopCounter) {
-    loopCounter->i++;
-    goto meta(context, C_odgCommitCUDA);
+__code odgCommitCUDAWorker6(struct CUDAWorker* worker, struct Context* task) {
+    struct Worker* taskWorker = task->worker;
+    goto taskWorker->taskReceive(taskWorker->tasks);
 }
 
-__code odgCommitCUDA4_stub(struct Context* context) {
-    goto odgCommitCUDA4(context,
-            Gearef(context, LoopCounter));
-}
-
-extern void cudaShutdown( CUDAWorker *cudaWorker);
-
-__code shutdownCUDAWorker(struct Context* context, CUDAWorker* worker) {
+__code shutdownCUDAWorker(struct CUDAWorker* worker) {
     cudaShutdown(worker);
     goto meta(context, C_exit_code);
 }
-
-__code shutdownCUDAWorker_stub(struct Context* context) {
-    CUDAWorker* worker = (CUDAWorker *)GearImpl(context, Worker, worker);
-    goto shutdownCUDAWorker(context,worker);
-}
--- a/src/parallel_execution/context.h	Thu Dec 28 19:54:28 2017 +0900
+++ b/src/parallel_execution/context.h	Thu Dec 28 23:31:23 2017 +0900
@@ -196,9 +196,9 @@
     struct CUDAWorker {
         CUdevice device;
         CUcontext cuCtx;
-        pthread_t thread;
         struct Context* context;
         int id;
+        int loopCounter;
         struct Queue* tasks;
         int runFlag;
         enum Code next;