changeset 475:fae47dc256b6

Merge
author Tatsuki IHA <innparusu@cr.ie.u-ryukyu.ac.jp>
date Thu, 28 Dec 2017 12:59:11 +0900
parents b92898d3a630 (diff) e5f0cced7d43 (current diff)
children 4b5f9884b777
files src/parallel_execution/context.h
diffstat 7 files changed, 90 insertions(+), 126 deletions(-) [+]
line wrap: on
line diff
--- a/src/parallel_execution/CPUWorker.cbc	Wed Dec 27 21:17:02 2017 +0900
+++ b/src/parallel_execution/CPUWorker.cbc	Thu Dec 28 12:59:11 2017 +0900
@@ -1,5 +1,9 @@
 #include "../context.h"
 #interface "TaskManager.h"
+#interface "Worker.h"
+#interface "Iterator.h"
+#interface "Queue.h"
+
 static void startWorker(Worker* worker);
 
 Worker* createCPUWorker(struct Context* context, int id, Queue* queue) {
@@ -8,138 +12,112 @@
     worker->worker = (union Data*)cpuWorker;
     worker->tasks = queue;
     cpuWorker->id = id;
-    worker->taskReceive = C_taskReceiveWorker;
-    worker->shutdown = C_shutdownWorker;
+    cpuWorker->loopCounter = 0;
+    worker->taskReceive = C_taskReceiveCPUWorker;
+    worker->shutdown = C_shutdownCPUWorker;
     pthread_create(&worker->thread, NULL, (void*)&startWorker, worker);
     return worker;
 }
 
-static void startWorker(Worker* worker) {
+static void startWorker(struct Worker* worker) {
     CPUWorker* cpuWorker = (CPUWorker*)worker->worker;
     cpuWorker->context = NEW(struct Context);
     initContext(cpuWorker->context);
     Gearef(cpuWorker->context, Worker)->worker = (union Data*)worker;
+    Gearef(cpuWorker->context, Worker)->tasks = worker->tasks;
     goto meta(cpuWorker->context, worker->taskReceive);
 }
 
-__code taskReceiveWorker(struct Worker* worker,struct Queue* queue) {
-    queue->queue = (union Data*)worker->tasks;
-    queue->next = C_getTask;
-    goto meta(context, worker->tasks->take);
-}
-
-__code taskReceiveWorker_stub(struct Context* context) {
-    goto taskReceiveWorker(context, &Gearef(context, Worker)->worker->Worker, Gearef(context, Queue));
+__code taskReceiveCPUWorker(struct CPUWorker* worker, struct Queue* tasks) {
+    goto tasks->take(getTaskCPUWorker);
 }
 
-__code getTask(struct Worker* worker, struct Context* task) {
-    if (!task)
-        goto meta(context, worker->shutdown); // end thread
+__code getTaskCPUWorker(struct CPUWorker* cpuWorker, struct Context* task, struct Worker* worker) {
+    if (!task) { goto worker->shutdown(); // end thread
+    }
     task->worker = worker;
     enum Code taskCg = task->next;
-    if (task->iterate) {
-        task->next = C_iterateCommit;
-    } else {
-        task->next = C_odgCommit; // set CG after task exec
-    }
-    goto meta(task, taskCg);
-}
-
-__code getTask_stub(struct Context* context) {
-    Worker* worker = &Gearef(context,Worker)->worker->Worker;
-    struct Context* task = &Gearef(context, Queue)->data->Context;
-    goto getTask(context, worker, task);
+    task->next = C_odgCommitCPUWorker; // commit outputDG after task exec
+    goto meta(task, taskCg); // switch task context
 }
 
-__code iterateCommit(struct Iterator* iterator) {
-    iterator->iterator = (union Data*)context->iterator;
-    iterator->task = context;
-    iterator->next = C_odgCommit;
-    iterator->whenWait = C_iterateCommit1;
-    goto meta(context, context->iterator->barrier);
-}
-
-__code iterateCommit1(struct Context* task) {
-    goto meta(context, C_taskReceiveWorker);
+__code getTaskCPUWorker_stub(struct Context* context) {
+    CPUWorker* cpuWorker = (CPUWorker*)GearImpl(context, Worker, worker);
+    Worker* worker = &Gearef(context,Worker)->worker->Worker;
+    struct Context* task = &Gearef(context, Queue)->data->Context;
+    goto getTaskCPUWorker(context, cpuWorker, task, worker);
 }
 
-__code iterateCommit1_stub(struct Context* context) {
-    // switch worker context
-    struct Context* workerContext = context->worker->worker->CPUWorker.context;
-    goto iterateCommit1(workerContext, context);
+__code odgCommitCPUWorker(struct CPUWorker* worker, struct Context* task) {
+    if (task->iterate) {
+        struct Iterator* iterator = task->iterator;
+        goto iterator->barrier(task, odgCommitCPUWorker1, odgCommitCPUWorker6);
+    } else {
+        goto odgCommitCPUWorker1();
+    }
 }
 
-__code odgCommit(struct LoopCounter* loopCounter, struct Context* task, struct TaskManager* taskManager) {
-    int i = loopCounter->i ;
-    if (task->odg+i < task->maxOdg) {
-        goto meta(task, C_odgCommit1);
-    }
-    loopCounter->i = 0;
-    taskManager->taskManager = (union Data*)task->taskManager;
-    taskManager->next = C_taskReceiveWorker;
-    goto meta(context, task->taskManager->decrementTaskCount);
-}
-
-__code odgCommit_stub(struct Context* context) {
+__code odgCommitCPUWorker_stub(struct Context* context) {
     // switch worker context
     struct Context* workerContext = context->worker->worker->CPUWorker.context;
-    goto odgCommit(workerContext,
-                   Gearef(context, LoopCounter),
-                   context,
-                   Gearef(workerContext, TaskManager));
-}
-
-__code odgCommit1(struct LoopCounter* loopCounter, struct Queue* queue) {
-    int i = loopCounter->i ;
-    queue->queue = (union Data*)GET_WAIT_LIST(context->data[context->odg+i]);
-    queue->whenEmpty = C_odgCommit4;
-    queue->next = C_odgCommit2;
-    goto meta(context, queue->queue->Queue.isEmpty);
+    Gearef(workerContext, Worker)->worker = (union Data*)context->worker;
+    Gearef(workerContext, Worker)->task = context;
+    CPUWorker* cpuWorker = (CPUWorker*)GearImpl(workerContext, Worker, worker);
+    goto odgCommitCPUWorker(workerContext,
+                            cpuWorker,
+                            context);
 }
 
-__code odgCommit1_stub(struct Context* context) {
-    goto odgCommit1(context,
-                   Gearef(context, LoopCounter),
-                   Gearef(context, Queue));
+__code odgCommitCPUWorker1(struct CPUWorker* worker, struct Context* task) {
+    int i = worker->loopCounter;
+    if (task->odg+i < task->maxOdg) {
+        goto odgCommitCPUWorker2();
+    }
+    worker->loopCounter = 0;
+    struct TaskManager* taskManager = task->taskManager;
+    goto taskManager->decrementTaskCount(odgCommitCPUWorker6);
 }
 
-__code odgCommit2(struct Queue* queue) {
-    queue->next = C_odgCommit3;
-    goto meta(context, queue->queue->Queue.take);
+__code odgCommitCPUWorker2(struct CPUWorker* worker, struct Context* task) {
+    int i = worker->loopCounter;
+    struct Queue* queue = GET_WAIT_LIST(task->data[task->odg+i]);
+    goto queue->isEmpty(odgCommitCPUWorker3, odgCommitCPUWorker5);
 }
 
-__code odgCommit2_stub(struct Context* context) {
-    goto odgCommit2(context,
-                   Gearef(context, Queue));
+__code odgCommitCPUWorker3(struct CPUWorker* worker, struct Context* task) {
+    int i = worker->loopCounter;
+    struct Queue* queue = GET_WAIT_LIST(task->data[task->odg+i]);
+    goto queue->take(odgCommitCPUWorker4);
 }
 
-__code odgCommit3(struct TaskManager* taskManager, struct Context* task) {
-    if (__sync_fetch_and_sub(&task->idgCount, 1) == 1) { // atomic decrement idg counter(__sync_fetch_and_sub function return initial value of task->idgCount point)
-        taskManager->taskManager = (union Data*)task->taskManager;
-        taskManager->task = task;
-        taskManager->next = C_odgCommit1;
-        goto meta(context, task->taskManager->spawn);
+__code odgCommitCPUWorker4(struct CPUWorker* worker, struct Context* task, struct Context* waitTask) {
+    if (__sync_fetch_and_sub(&waitTask->idgCount, 1) == 1) { // atomic decrement idg counter(__sync_fetch_and_sub function return initial value of waitTask->idgCount point)
+        struct TaskManager* taskManager = waitTask->taskManager;
+        goto taskManager->spawn(waitTask, odgCommitCPUWorker2);
     }
-    goto meta(context, C_odgCommit1);
+    goto odgCommitCPUWorker2();
 }
 
-__code odgCommit3_stub(struct Context* context) {
-    struct Context* task = &Gearef(context, Queue)->data->Context;
-    goto odgCommit3(context,
-            Gearef(context, TaskManager),
-            task);
+__code odgCommitCPUWorker4_stub(struct Context* context) {
+    CPUWorker* cpuWorker     = (CPUWorker*)GearImpl(context, Worker, worker);
+    struct Context* task     = Gearef(context, Worker)->task;
+    struct Context* waitTask = &Gearef(context, Queue)->data->Context;
+    goto odgCommitCPUWorker4(context,
+                             cpuWorker,
+                             task,
+                             waitTask);
 }
 
-__code odgCommit4(struct LoopCounter* loopCounter) {
-    loopCounter->i++;
-    goto meta(context, C_odgCommit);
+__code odgCommitCPUWorker5(struct CPUWorker* worker, struct Context* task) {
+    worker->loopCounter++;
+    goto odgCommitCPUWorker1();
 }
 
-__code odgCommit4_stub(struct Context* context) {
-    goto odgCommit4(context,
-            Gearef(context, LoopCounter));
+__code odgCommitCPUWorker6(struct CPUWorker* worker, struct Context* task) {
+    struct Worker* taskWorker = task->worker;
+    goto taskWorker->taskReceive(taskWorker->tasks);
 }
 
-__code shutdownWorker(struct CPUWorker* worker) {
-    goto meta(context, C_exit_code);
+__code shutdownCPUWorker(struct CPUWorker* worker) {
+    goto exit_code();
 }
--- a/src/parallel_execution/CUDAWorker.cbc	Wed Dec 27 21:17:02 2017 +0900
+++ b/src/parallel_execution/CUDAWorker.cbc	Thu Dec 28 12:59:11 2017 +0900
@@ -1,4 +1,5 @@
 #include "../context.h"
+#interface "TaskManager.h"
 #interface "Worker.h"
 
 extern void cudaInit(struct CUDAWorker *cudaWorker,int phase) ;
@@ -28,9 +29,9 @@
     goto meta(cudaWorker->context, worker->taskReceive);
 }
 
-__code taskReceiveCUDAWorker(struct Worker* worker,struct Queue* queue) {
+__code taskReceiveCUDAWorker(struct Worker* worker) {
     queue->queue = (union Data*)worker->tasks;
-    queue->next = C_getTaskCUDA;
+    queue->next = C_getTaskCUDAWorker;
     goto meta(context, worker->tasks->take);
 }
 
@@ -38,7 +39,7 @@
     goto taskReceiveCUDAWorker(context, &Gearef(context, Worker)->worker->Worker, Gearef(context, Queue));
 }
 
-__code getTaskCUDA(struct Worker* worker, struct Context* task) {
+__code getTaskCUDAWorker(struct Worker* worker, struct Context* task) {
     if (!task)
         goto meta(context, worker->shutdown); // end thread
     task->worker = worker;
@@ -51,10 +52,10 @@
     goto meta(task, taskCg);
 }
 
-__code getTaskCUDA_stub(struct Context* context) {
+__code getTaskCUDAWorker_stub(struct Context* context) {
     Worker* worker = &Gearef(context,Worker)->worker->Worker;
     struct Context* task = &Gearef(context, Queue)->data->Context;
-    goto getTaskCUDA(context, worker, task);
+    goto getTaskCUDAWorker(context, worker, task);
 }
 
 __code iterateCommitCUDA(struct Iterator* iterator) {
--- a/src/parallel_execution/MultiDimIterator.cbc	Wed Dec 27 21:17:02 2017 +0900
+++ b/src/parallel_execution/MultiDimIterator.cbc	Thu Dec 28 12:59:11 2017 +0900
@@ -109,11 +109,3 @@
     }
     goto whenWait(...);
 }
-
-__code barrierMultiDimIterator_stub(struct Context* context) {
-    MultiDimIterator* iterator = (MultiDimIterator*)GearImpl(context, Iterator, iterator);
-    Context* task = Gearef(context, Iterator)->task;
-    enum Code next = Gearef(context, Iterator)->next;
-    enum Code whenWait = Gearef(context, Iterator)->whenWait;
-    goto barrierMultiDimIterator(context, iterator, task, next, whenWait);
-}
--- a/src/parallel_execution/SingleLinkedQueue.cbc	Wed Dec 27 21:17:02 2017 +0900
+++ b/src/parallel_execution/SingleLinkedQueue.cbc	Thu Dec 28 12:59:11 2017 +0900
@@ -1,6 +1,6 @@
 #include "../context.h"
+#include <stdio.h>
 #interface "Queue.h"
-#include <stdio.h>
 
 Queue* createSingleLinkedQueue(struct Context* context) {
     struct Queue* queue = new Queue();
--- a/src/parallel_execution/TaskManagerImpl.cbc	Wed Dec 27 21:17:02 2017 +0900
+++ b/src/parallel_execution/TaskManagerImpl.cbc	Thu Dec 28 12:59:11 2017 +0900
@@ -103,15 +103,13 @@
 }
 
 __code spawnTasksTaskManagerImpl5(struct TaskManagerImpl* taskManagerImpl, struct Context* task, struct TaskManager* taskManager) {
-    taskManager->task = task;
-    taskManager->next = C_spawnTasksTaskManagerImpl3;
-    goto meta(context, C_spawnTaskManagerImpl);
+    goto taskManager->spawn(task, spawnTasksTaskManagerImpl3);
 }
 
 __code spawnTasksTaskManagerImpl5_stub(struct Context* context) {
     TaskManagerImpl* taskManagerImpl = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager);
     Context* task = (struct Context*)Gearef(context, Queue)->data;
-    TaskManager* taskManager = Gearef(context, TaskManager);
+    TaskManager* taskManager = &Gearef(context, TaskManager)->taskManager->TaskManager;
     goto spawnTasksTaskManagerImpl5(context, taskManagerImpl, task, taskManager);
 }
 
@@ -154,7 +152,7 @@
             struct Iterator* iterator = task->iterator;
             goto iterator->exec(task, taskManager->cpu - taskManager->gpu, next(...));
         }
-        goto meta(context, C_taskSend);
+        goto taskSend();
     }
     pthread_mutex_unlock(&taskManager->mutex);
     goto next(...);
@@ -178,18 +176,10 @@
     goto queue->put(task, next(...));
 }
 
-__code taskSend_stub(struct Context* context) {
-    TaskManagerImpl* taskManager = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager);
-    goto taskSend(context,
-                  taskManager,
-                  Gearef(context, TaskManager)->task,
-                  Gearef(context, TaskManager)->next);
-}
-
 __code shutdownTaskManagerImpl(struct TaskManagerImpl* taskManager, __code next(...)) {
     if (taskManager->taskCount != 0) {
         usleep(1000);
-        goto meta(context, C_shutdownTaskManagerImpl);
+        goto shutdownTaskManagerImpl();
     }
     int i = taskManager->loopCounter->i;
     if (i < taskManager->numWorker) {
@@ -198,12 +188,12 @@
     }
 
     taskManager->loopCounter->i = 0;
-    goto meta(context, next);
+    goto next(...);
 }
 
 __code shutdownTaskManagerImpl1(struct TaskManagerImpl* taskManager) {
     int i = taskManager->loopCounter->i;
     pthread_join(taskManager->workers[i]->thread, NULL);
     taskManager->loopCounter->i++;
-    goto meta(context, C_shutdownTaskManagerImpl);
+    goto shutdownTaskManagerImpl();
 }
--- a/src/parallel_execution/Worker.h	Wed Dec 27 21:17:02 2017 +0900
+++ b/src/parallel_execution/Worker.h	Thu Dec 28 12:59:11 2017 +0900
@@ -1,7 +1,8 @@
 typedef struct Worker<Impl>{
     union Data* worker;
-    __code taskReseive(struct Worker* worker,struct Queue* queue);
+    struct Queue* tasks;
+    struct Context* task;
+    __code taskReceive(Impl* worker, struct Queue* tasks);
     __code shutdown(Impl* worker);
     __code next(...);
-    struct Queue* queue;
 } Worker;
--- a/src/parallel_execution/context.h	Wed Dec 27 21:17:02 2017 +0900
+++ b/src/parallel_execution/context.h	Thu Dec 28 12:59:11 2017 +0900
@@ -183,12 +183,14 @@
         struct Queue* tasks;
         pthread_t thread;
         struct TaskManager* taskManager;
+        struct Context* task;
     } Worker;
     struct CPUWorker {
         pthread_mutex_t mutex;
         pthread_cond_t cond;
         struct Context* context;
         int id;
+        int loopCounter;
     } CPUWorker;
 #ifdef USE_CUDAWorker
     struct CUDAWorker {