changeset 473:71b634a5ed65

Merge
author Tatsuki IHA <innparusu@cr.ie.u-ryukyu.ac.jp>
date Thu, 28 Dec 2017 11:55:59 +0900
parents a4d94c591246
children b92898d3a630
files src/parallel_execution/CPUWorker.cbc src/parallel_execution/CUDAWorker.cbc src/parallel_execution/SingleLinkedQueue.cbc src/parallel_execution/Worker.h src/parallel_execution/context.h
diffstat 5 files changed, 79 insertions(+), 95 deletions(-) [+]
line wrap: on
line diff
--- a/src/parallel_execution/CPUWorker.cbc	Tue Dec 26 15:19:42 2017 +0900
+++ b/src/parallel_execution/CPUWorker.cbc	Thu Dec 28 11:55:59 2017 +0900
@@ -1,5 +1,9 @@
 #include "../context.h"
 #interface "TaskManager.h"
+#interface "Worker.h"
+#interface "Iterator.h"
+#interface "Queue.h"
+
 static void startWorker(Worker* worker);
 
 Worker* createCPUWorker(struct Context* context, int id, Queue* queue) {
@@ -8,138 +12,115 @@
     worker->worker = (union Data*)cpuWorker;
     worker->tasks = queue;
     cpuWorker->id = id;
-    worker->taskReceive = C_taskReceiveWorker;
-    worker->shutdown = C_shutdownWorker;
+    cpuWorker->loopCounter = 0;
+    worker->taskReceive = C_taskReceiveCPUWorker;
+    worker->shutdown = C_shutdownCPUWorker;
     pthread_create(&worker->thread, NULL, (void*)&startWorker, worker);
     return worker;
 }
 
-static void startWorker(Worker* worker) {
+static void startWorker(struct Worker* worker) {
     CPUWorker* cpuWorker = (CPUWorker*)worker->worker;
     cpuWorker->context = NEW(struct Context);
     initContext(cpuWorker->context);
     Gearef(cpuWorker->context, Worker)->worker = (union Data*)worker;
+    Gearef(cpuWorker->context, Worker)->tasks = worker->tasks;
     goto meta(cpuWorker->context, worker->taskReceive);
 }
 
-__code taskReceiveWorker(struct Worker* worker,struct Queue* queue) {
-    queue->queue = (union Data*)worker->tasks;
-    queue->next = C_getTask;
-    goto meta(context, worker->tasks->take);
-}
-
-__code taskReceiveWorker_stub(struct Context* context) {
-    goto taskReceiveWorker(context, &Gearef(context, Worker)->worker->Worker, Gearef(context, Queue));
+__code taskReceiveCPUWorker(struct CPUWorker* worker, struct Queue* tasks) {
+    goto tasks->take(getTaskCPUWorker);
 }
 
-__code getTask(struct Worker* worker, struct Context* task) {
-    if (!task)
-        goto meta(context, worker->shutdown); // end thread
+__code getTaskCPUWorker(struct CPUWorker* cpuWorker, struct Context* task, struct Worker* worker) {
+    if (!task) {
+        goto worker->shutdown(); // end thread
+    }
     task->worker = worker;
     enum Code taskCg = task->next;
-    if (task->iterate) {
-        task->next = C_iterateCommit;
-    } else {
-        task->next = C_odgCommit; // set CG after task exec
-    }
-    goto meta(task, taskCg);
+    task->next = C_odgCommitCPUWorker; // set CG after task exec
+    goto meta(task, taskCg); // switch task context
 }
 
-__code getTask_stub(struct Context* context) {
+__code getTaskCPUWorker_stub(struct Context* context) {
+    CPUWorker* cpuWorker = (CPUWorker*)GearImpl(context, Worker, worker);
     Worker* worker = &Gearef(context,Worker)->worker->Worker;
     struct Context* task = &Gearef(context, Queue)->data->Context;
-    goto getTask(context, worker, task);
+    goto getTaskCPUWorker(context, cpuWorker, task, worker);
+}
+
+__code iterateCommitCPUWorker(struct CPUWorker* worker) {
+    struct Iterator* iterator = context->iterator;
+    goto iterator->barrier(context, odgCommitCPUWorker, iterateCommitCPUWorker1);
 }
 
-__code iterateCommit(struct Iterator* iterator) {
-    iterator->iterator = (union Data*)context->iterator;
-    iterator->task = context;
-    iterator->next = C_odgCommit;
-    iterator->whenWait = C_iterateCommit1;
-    goto meta(context, context->iterator->barrier);
+__code iterateCommitCPUWorker1(struct CPUWorker* worker, struct Context* task) {
+    struct Worker* taskWorker = task->worker;
+    goto taskWorker->taskReceive(taskWorker->tasks);
 }
 
-__code iterateCommit1(struct Context* task) {
-    goto meta(context, C_taskReceiveWorker);
-}
-
-__code iterateCommit1_stub(struct Context* context) {
+__code iterateCommitCPUWorker1_stub(struct Context* context) {
     // switch worker context
     struct Context* workerContext = context->worker->worker->CPUWorker.context;
-    goto iterateCommit1(workerContext, context);
+    CPUWorker* cpuWorker = &context->worker->worker->CPUWorker;
+    goto iterateCommitCPUWorker1(workerContext,
+            cpuWorker,
+            context);
 }
 
-__code odgCommit(struct LoopCounter* loopCounter, struct Context* task, struct TaskManager* taskManager) {
-    int i = loopCounter->i ;
+__code odgCommitCPUWorker(struct CPUWorker* worker, struct Context* task) {
+    int i = worker->loopCounter;
     if (task->odg+i < task->maxOdg) {
-        goto meta(task, C_odgCommit1);
+        goto odgCommitCPUWorker1();
     }
-    loopCounter->i = 0;
-    taskManager->taskManager = (union Data*)task->taskManager;
-    taskManager->next = C_taskReceiveWorker;
-    goto meta(context, task->taskManager->decrementTaskCount);
+    worker->loopCounter = 0;
+    struct TaskManager* taskManager = task->taskManager;
+    goto taskManager->decrementTaskCount(taskReceiveCPUWorker);
 }
 
-__code odgCommit_stub(struct Context* context) {
+__code odgCommitCPUWorker_stub(struct Context* context) {
     // switch worker context
     struct Context* workerContext = context->worker->worker->CPUWorker.context;
-    goto odgCommit(workerContext,
-                   Gearef(context, LoopCounter),
-                   context,
-                   Gearef(workerContext, TaskManager));
-}
-
-__code odgCommit1(struct LoopCounter* loopCounter, struct Queue* queue) {
-    int i = loopCounter->i ;
-    queue->queue = (union Data*)GET_WAIT_LIST(context->data[context->odg+i]);
-    queue->whenEmpty = C_odgCommit4;
-    queue->next = C_odgCommit2;
-    goto meta(context, queue->queue->Queue.isEmpty);
+    CPUWorker* cpuWorker = &context->worker->worker->CPUWorker;
+    goto odgCommitCPUWorker(workerContext,
+            cpuWorker,
+            context);
 }
 
-__code odgCommit1_stub(struct Context* context) {
-    goto odgCommit1(context,
-                   Gearef(context, LoopCounter),
-                   Gearef(context, Queue));
+__code odgCommitCPUWorker1(struct CPUWorker* worker) {
+    int i = worker->loopCounter;
+    struct Queue* queue = GET_WAIT_LIST(context->data[context->odg+i]);
+    goto queue->isEmpty(odgCommitCPUWorker2, odgCommitCPUWorker4);
 }
 
-__code odgCommit2(struct Queue* queue) {
-    queue->next = C_odgCommit3;
-    goto meta(context, queue->queue->Queue.take);
+__code odgCommitCPUWorker2(struct CPUWorker* worker) {
+    int i = worker->loopCounter;
+    struct Queue* queue = GET_WAIT_LIST(context->data[context->odg+i]);
+    goto queue->take(odgCommitCPUWorker3);
 }
 
-__code odgCommit2_stub(struct Context* context) {
-    goto odgCommit2(context,
-                   Gearef(context, Queue));
+__code odgCommitCPUWorker3(struct CPUWorker* worker, struct Context* task) {
+    if (__sync_fetch_and_sub(&task->idgCount, 1) == 1) { // atomic decrement idg counter(__sync_fetch_and_sub function return initial value of task->idgCount point)
+        struct TaskManager* taskManager = task->taskManager;
+        taskManager->task = task;
+        goto taskManager->spawn(task, odgCommitCPUWorker1);
+    }
+    goto odgCommitCPUWorker1();
 }
 
-__code odgCommit3(struct TaskManager* taskManager, struct Context* task) {
-    if (__sync_fetch_and_sub(&task->idgCount, 1) == 1) { // atomic decrement idg counter(__sync_fetch_and_sub function return initial value of task->idgCount point)
-        taskManager->taskManager = (union Data*)task->taskManager;
-        taskManager->task = task;
-        taskManager->next = C_odgCommit1;
-        goto meta(context, task->taskManager->spawn);
-    }
-    goto meta(context, C_odgCommit1);
-}
-
-__code odgCommit3_stub(struct Context* context) {
+__code odgCommitCPUWorker3_stub(struct Context* context) {
+    CPUWorker* cpuWorker = (CPUWorker*)GearImpl(context, Worker, worker);
     struct Context* task = &Gearef(context, Queue)->data->Context;
-    goto odgCommit3(context,
-            Gearef(context, TaskManager),
+    goto odgCommitCPUWorker3(context,
+            cpuWorker,
             task);
 }
 
-__code odgCommit4(struct LoopCounter* loopCounter) {
-    loopCounter->i++;
-    goto meta(context, C_odgCommit);
+__code odgCommitCPUWorker4(struct CPUWorker* worker) {
+    worker->loopCounter++;
+    goto odgCommitCPUWorker();
 }
 
-__code odgCommit4_stub(struct Context* context) {
-    goto odgCommit4(context,
-            Gearef(context, LoopCounter));
+__code shutdownCPUWorker(struct CPUWorker* worker) {
+    goto exit_code();
 }
-
-__code shutdownWorker(struct CPUWorker* worker) {
-    goto meta(context, C_exit_code);
-}
--- a/src/parallel_execution/CUDAWorker.cbc	Tue Dec 26 15:19:42 2017 +0900
+++ b/src/parallel_execution/CUDAWorker.cbc	Thu Dec 28 11:55:59 2017 +0900
@@ -1,4 +1,5 @@
 #include "../context.h"
+#interface "TaskManager.h"
 #interface "Worker.h"
 
 extern void cudaInit(struct CUDAWorker *cudaWorker,int phase) ;
@@ -28,9 +29,9 @@
     goto meta(cudaWorker->context, worker->taskReceive);
 }
 
-__code taskReceiveCUDAWorker(struct Worker* worker,struct Queue* queue) {
+__code taskReceiveCUDAWorker(struct Worker* worker) {
     queue->queue = (union Data*)worker->tasks;
-    queue->next = C_getTaskCUDA;
+    queue->next = C_getTaskCUDAWorker;
     goto meta(context, worker->tasks->take);
 }
 
@@ -38,7 +39,7 @@
     goto taskReceiveCUDAWorker(context, &Gearef(context, Worker)->worker->Worker, Gearef(context, Queue));
 }
 
-__code getTaskCUDA(struct Worker* worker, struct Context* task) {
+__code getTaskCUDAWorker(struct Worker* worker, struct Context* task) {
     if (!task)
         goto meta(context, worker->shutdown); // end thread
     task->worker = worker;
@@ -51,10 +52,10 @@
     goto meta(task, taskCg);
 }
 
-__code getTaskCUDA_stub(struct Context* context) {
+__code getTaskCUDAWorker_stub(struct Context* context) {
     Worker* worker = &Gearef(context,Worker)->worker->Worker;
     struct Context* task = &Gearef(context, Queue)->data->Context;
-    goto getTaskCUDA(context, worker, task);
+    goto getTaskCUDAWorker(context, worker, task);
 }
 
 __code iterateCommitCUDA(struct Iterator* iterator) {
--- a/src/parallel_execution/SingleLinkedQueue.cbc	Tue Dec 26 15:19:42 2017 +0900
+++ b/src/parallel_execution/SingleLinkedQueue.cbc	Thu Dec 28 11:55:59 2017 +0900
@@ -1,6 +1,6 @@
 #include "../context.h"
+#include <stdio.h>
 #interface "Queue.h"
-#include <stdio.h>
 
 Queue* createSingleLinkedQueue(struct Context* context) {
     struct Queue* queue = new Queue();
--- a/src/parallel_execution/Worker.h	Tue Dec 26 15:19:42 2017 +0900
+++ b/src/parallel_execution/Worker.h	Thu Dec 28 11:55:59 2017 +0900
@@ -1,7 +1,7 @@
 typedef struct Worker<Impl>{
     union Data* worker;
-    __code taskReseive(struct Worker* worker,struct Queue* queue);
+    __code taskReceive(Impl* worker, struct Queue* tasks);
     __code shutdown(Impl* worker);
     __code next(...);
-    struct Queue* queue;
+    struct Queue* tasks;
 } Worker;
--- a/src/parallel_execution/context.h	Tue Dec 26 15:19:42 2017 +0900
+++ b/src/parallel_execution/context.h	Thu Dec 28 11:55:59 2017 +0900
@@ -183,12 +183,14 @@
         struct Queue* tasks;
         pthread_t thread;
         struct TaskManager* taskManager;
+        struct Context* context;
     } Worker;
     struct CPUWorker {
         pthread_mutex_t mutex;
         pthread_cond_t cond;
         struct Context* context;
         int id;
+        int loopCounter;
     } CPUWorker;
 #ifdef USE_CUDAWorker
     struct CUDAWorker {