changeset 474:b92898d3a630

Refactoring CPUWorker.cbc
author Tatsuki IHA <innparusu@cr.ie.u-ryukyu.ac.jp>
date Thu, 28 Dec 2017 12:58:25 +0900
parents 71b634a5ed65
children fae47dc256b6
files src/parallel_execution/CPUWorker.cbc src/parallel_execution/MultiDimIterator.cbc src/parallel_execution/Worker.h src/parallel_execution/context.h
diffstat 4 files changed, 51 insertions(+), 61 deletions(-) [+]
line wrap: on
line diff
--- a/src/parallel_execution/CPUWorker.cbc	Thu Dec 28 11:55:59 2017 +0900
+++ b/src/parallel_execution/CPUWorker.cbc	Thu Dec 28 12:58:25 2017 +0900
@@ -33,12 +33,11 @@
 }
 
 __code getTaskCPUWorker(struct CPUWorker* cpuWorker, struct Context* task, struct Worker* worker) {
-    if (!task) {
-        goto worker->shutdown(); // end thread
+    if (!task) { goto worker->shutdown(); // end thread
     }
     task->worker = worker;
     enum Code taskCg = task->next;
-    task->next = C_odgCommitCPUWorker; // set CG after task exec
+    task->next = C_odgCommitCPUWorker; // commit outputDG after task exec
     goto meta(task, taskCg); // switch task context
 }
 
@@ -49,76 +48,74 @@
     goto getTaskCPUWorker(context, cpuWorker, task, worker);
 }
 
-__code iterateCommitCPUWorker(struct CPUWorker* worker) {
-    struct Iterator* iterator = context->iterator;
-    goto iterator->barrier(context, odgCommitCPUWorker, iterateCommitCPUWorker1);
-}
-
-__code iterateCommitCPUWorker1(struct CPUWorker* worker, struct Context* task) {
-    struct Worker* taskWorker = task->worker;
-    goto taskWorker->taskReceive(taskWorker->tasks);
-}
-
-__code iterateCommitCPUWorker1_stub(struct Context* context) {
-    // switch worker context
-    struct Context* workerContext = context->worker->worker->CPUWorker.context;
-    CPUWorker* cpuWorker = &context->worker->worker->CPUWorker;
-    goto iterateCommitCPUWorker1(workerContext,
-            cpuWorker,
-            context);
-}
-
 __code odgCommitCPUWorker(struct CPUWorker* worker, struct Context* task) {
-    int i = worker->loopCounter;
-    if (task->odg+i < task->maxOdg) {
+    if (task->iterate) {
+        struct Iterator* iterator = task->iterator;
+        goto iterator->barrier(task, odgCommitCPUWorker1, odgCommitCPUWorker6);
+    } else {
         goto odgCommitCPUWorker1();
     }
-    worker->loopCounter = 0;
-    struct TaskManager* taskManager = task->taskManager;
-    goto taskManager->decrementTaskCount(taskReceiveCPUWorker);
 }
 
 __code odgCommitCPUWorker_stub(struct Context* context) {
     // switch worker context
     struct Context* workerContext = context->worker->worker->CPUWorker.context;
-    CPUWorker* cpuWorker = &context->worker->worker->CPUWorker;
+    Gearef(workerContext, Worker)->worker = (union Data*)context->worker;
+    Gearef(workerContext, Worker)->task = context;
+    CPUWorker* cpuWorker = (CPUWorker*)GearImpl(workerContext, Worker, worker);
     goto odgCommitCPUWorker(workerContext,
-            cpuWorker,
-            context);
+                            cpuWorker,
+                            context);
 }
 
-__code odgCommitCPUWorker1(struct CPUWorker* worker) {
+__code odgCommitCPUWorker1(struct CPUWorker* worker, struct Context* task) {
     int i = worker->loopCounter;
-    struct Queue* queue = GET_WAIT_LIST(context->data[context->odg+i]);
-    goto queue->isEmpty(odgCommitCPUWorker2, odgCommitCPUWorker4);
+    if (task->odg+i < task->maxOdg) {
+        goto odgCommitCPUWorker2();
+    }
+    worker->loopCounter = 0;
+    struct TaskManager* taskManager = task->taskManager;
+    goto taskManager->decrementTaskCount(odgCommitCPUWorker6);
 }
 
-__code odgCommitCPUWorker2(struct CPUWorker* worker) {
+__code odgCommitCPUWorker2(struct CPUWorker* worker, struct Context* task) {
     int i = worker->loopCounter;
-    struct Queue* queue = GET_WAIT_LIST(context->data[context->odg+i]);
-    goto queue->take(odgCommitCPUWorker3);
+    struct Queue* queue = GET_WAIT_LIST(task->data[task->odg+i]);
+    goto queue->isEmpty(odgCommitCPUWorker3, odgCommitCPUWorker5);
 }
 
 __code odgCommitCPUWorker3(struct CPUWorker* worker, struct Context* task) {
-    if (__sync_fetch_and_sub(&task->idgCount, 1) == 1) { // atomic decrement idg counter(__sync_fetch_and_sub function return initial value of task->idgCount point)
-        struct TaskManager* taskManager = task->taskManager;
-        taskManager->task = task;
-        goto taskManager->spawn(task, odgCommitCPUWorker1);
+    int i = worker->loopCounter;
+    struct Queue* queue = GET_WAIT_LIST(task->data[task->odg+i]);
+    goto queue->take(odgCommitCPUWorker4);
+}
+
+__code odgCommitCPUWorker4(struct CPUWorker* worker, struct Context* task, struct Context* waitTask) {
+    if (__sync_fetch_and_sub(&waitTask->idgCount, 1) == 1) { // atomic decrement idg counter(__sync_fetch_and_sub function return initial value of waitTask->idgCount point)
+        struct TaskManager* taskManager = waitTask->taskManager;
+        goto taskManager->spawn(waitTask, odgCommitCPUWorker2);
     }
+    goto odgCommitCPUWorker2();
+}
+
+__code odgCommitCPUWorker4_stub(struct Context* context) {
+    CPUWorker* cpuWorker     = (CPUWorker*)GearImpl(context, Worker, worker);
+    struct Context* task     = Gearef(context, Worker)->task;
+    struct Context* waitTask = &Gearef(context, Queue)->data->Context;
+    goto odgCommitCPUWorker4(context,
+                             cpuWorker,
+                             task,
+                             waitTask);
+}
+
+__code odgCommitCPUWorker5(struct CPUWorker* worker, struct Context* task) {
+    worker->loopCounter++;
     goto odgCommitCPUWorker1();
 }
 
-__code odgCommitCPUWorker3_stub(struct Context* context) {
-    CPUWorker* cpuWorker = (CPUWorker*)GearImpl(context, Worker, worker);
-    struct Context* task = &Gearef(context, Queue)->data->Context;
-    goto odgCommitCPUWorker3(context,
-            cpuWorker,
-            task);
-}
-
-__code odgCommitCPUWorker4(struct CPUWorker* worker) {
-    worker->loopCounter++;
-    goto odgCommitCPUWorker();
+__code odgCommitCPUWorker6(struct CPUWorker* worker, struct Context* task) {
+    struct Worker* taskWorker = task->worker;
+    goto taskWorker->taskReceive(taskWorker->tasks);
 }
 
 __code shutdownCPUWorker(struct CPUWorker* worker) {
--- a/src/parallel_execution/MultiDimIterator.cbc	Thu Dec 28 11:55:59 2017 +0900
+++ b/src/parallel_execution/MultiDimIterator.cbc	Thu Dec 28 12:58:25 2017 +0900
@@ -109,11 +109,3 @@
     }
     goto whenWait(...);
 }
-
-__code barrierMultiDimIterator_stub(struct Context* context) {
-    MultiDimIterator* iterator = (MultiDimIterator*)GearImpl(context, Iterator, iterator);
-    Context* task = Gearef(context, Iterator)->task;
-    enum Code next = Gearef(context, Iterator)->next;
-    enum Code whenWait = Gearef(context, Iterator)->whenWait;
-    goto barrierMultiDimIterator(context, iterator, task, next, whenWait);
-}
--- a/src/parallel_execution/Worker.h	Thu Dec 28 11:55:59 2017 +0900
+++ b/src/parallel_execution/Worker.h	Thu Dec 28 12:58:25 2017 +0900
@@ -1,7 +1,8 @@
 typedef struct Worker<Impl>{
     union Data* worker;
+    struct Queue* tasks;
+    struct Context* task;
     __code taskReceive(Impl* worker, struct Queue* tasks);
     __code shutdown(Impl* worker);
     __code next(...);
-    struct Queue* tasks;
 } Worker;
--- a/src/parallel_execution/context.h	Thu Dec 28 11:55:59 2017 +0900
+++ b/src/parallel_execution/context.h	Thu Dec 28 12:58:25 2017 +0900
@@ -183,7 +183,7 @@
         struct Queue* tasks;
         pthread_t thread;
         struct TaskManager* taskManager;
-        struct Context* context;
+        struct Context* task;
     } Worker;
     struct CPUWorker {
         pthread_mutex_t mutex;