Mercurial > hg > Gears > GearsAgda
changeset 473:71b634a5ed65
Merge
author | Tatsuki IHA <innparusu@cr.ie.u-ryukyu.ac.jp> |
---|---|
date | Thu, 28 Dec 2017 11:55:59 +0900 |
parents | a4d94c591246 |
children | b92898d3a630 |
files | src/parallel_execution/CPUWorker.cbc src/parallel_execution/CUDAWorker.cbc src/parallel_execution/SingleLinkedQueue.cbc src/parallel_execution/Worker.h src/parallel_execution/context.h |
diffstat | 5 files changed, 79 insertions(+), 95 deletions(-) [+] |
line wrap: on
line diff
--- a/src/parallel_execution/CPUWorker.cbc Tue Dec 26 15:19:42 2017 +0900 +++ b/src/parallel_execution/CPUWorker.cbc Thu Dec 28 11:55:59 2017 +0900 @@ -1,5 +1,9 @@ #include "../context.h" #interface "TaskManager.h" +#interface "Worker.h" +#interface "Iterator.h" +#interface "Queue.h" + static void startWorker(Worker* worker); Worker* createCPUWorker(struct Context* context, int id, Queue* queue) { @@ -8,138 +12,115 @@ worker->worker = (union Data*)cpuWorker; worker->tasks = queue; cpuWorker->id = id; - worker->taskReceive = C_taskReceiveWorker; - worker->shutdown = C_shutdownWorker; + cpuWorker->loopCounter = 0; + worker->taskReceive = C_taskReceiveCPUWorker; + worker->shutdown = C_shutdownCPUWorker; pthread_create(&worker->thread, NULL, (void*)&startWorker, worker); return worker; } -static void startWorker(Worker* worker) { +static void startWorker(struct Worker* worker) { CPUWorker* cpuWorker = (CPUWorker*)worker->worker; cpuWorker->context = NEW(struct Context); initContext(cpuWorker->context); Gearef(cpuWorker->context, Worker)->worker = (union Data*)worker; + Gearef(cpuWorker->context, Worker)->tasks = worker->tasks; goto meta(cpuWorker->context, worker->taskReceive); } -__code taskReceiveWorker(struct Worker* worker,struct Queue* queue) { - queue->queue = (union Data*)worker->tasks; - queue->next = C_getTask; - goto meta(context, worker->tasks->take); -} - -__code taskReceiveWorker_stub(struct Context* context) { - goto taskReceiveWorker(context, &Gearef(context, Worker)->worker->Worker, Gearef(context, Queue)); +__code taskReceiveCPUWorker(struct CPUWorker* worker, struct Queue* tasks) { + goto tasks->take(getTaskCPUWorker); } -__code getTask(struct Worker* worker, struct Context* task) { - if (!task) - goto meta(context, worker->shutdown); // end thread +__code getTaskCPUWorker(struct CPUWorker* cpuWorker, struct Context* task, struct Worker* worker) { + if (!task) { + goto worker->shutdown(); // end thread + } task->worker = worker; enum Code taskCg = task->next; - if (task->iterate) { - task->next = C_iterateCommit; - } else { - task->next = C_odgCommit; // set CG after task exec - } - goto meta(task, taskCg); + task->next = C_odgCommitCPUWorker; // set CG after task exec + goto meta(task, taskCg); // switch task context } -__code getTask_stub(struct Context* context) { +__code getTaskCPUWorker_stub(struct Context* context) { + CPUWorker* cpuWorker = (CPUWorker*)GearImpl(context, Worker, worker); Worker* worker = &Gearef(context,Worker)->worker->Worker; struct Context* task = &Gearef(context, Queue)->data->Context; - goto getTask(context, worker, task); + goto getTaskCPUWorker(context, cpuWorker, task, worker); +} + +__code iterateCommitCPUWorker(struct CPUWorker* worker) { + struct Iterator* iterator = context->iterator; + goto iterator->barrier(context, odgCommitCPUWorker, iterateCommitCPUWorker1); } -__code iterateCommit(struct Iterator* iterator) { - iterator->iterator = (union Data*)context->iterator; - iterator->task = context; - iterator->next = C_odgCommit; - iterator->whenWait = C_iterateCommit1; - goto meta(context, context->iterator->barrier); +__code iterateCommitCPUWorker1(struct CPUWorker* worker, struct Context* task) { + struct Worker* taskWorker = task->worker; + goto taskWorker->taskReceive(taskWorker->tasks); } -__code iterateCommit1(struct Context* task) { - goto meta(context, C_taskReceiveWorker); -} - -__code iterateCommit1_stub(struct Context* context) { +__code iterateCommitCPUWorker1_stub(struct Context* context) { // switch worker context struct Context* workerContext = context->worker->worker->CPUWorker.context; - goto iterateCommit1(workerContext, context); + CPUWorker* cpuWorker = &context->worker->worker->CPUWorker; + goto iterateCommitCPUWorker1(workerContext, + cpuWorker, + context); } -__code odgCommit(struct LoopCounter* loopCounter, struct Context* task, struct TaskManager* taskManager) { - int i = loopCounter->i ; +__code odgCommitCPUWorker(struct CPUWorker* worker, struct Context* task) { + int i = worker->loopCounter; if (task->odg+i < task->maxOdg) { - goto meta(task, C_odgCommit1); + goto odgCommitCPUWorker1(); } - loopCounter->i = 0; - taskManager->taskManager = (union Data*)task->taskManager; - taskManager->next = C_taskReceiveWorker; - goto meta(context, task->taskManager->decrementTaskCount); + worker->loopCounter = 0; + struct TaskManager* taskManager = task->taskManager; + goto taskManager->decrementTaskCount(taskReceiveCPUWorker); } -__code odgCommit_stub(struct Context* context) { +__code odgCommitCPUWorker_stub(struct Context* context) { // switch worker context struct Context* workerContext = context->worker->worker->CPUWorker.context; - goto odgCommit(workerContext, - Gearef(context, LoopCounter), - context, - Gearef(workerContext, TaskManager)); -} - -__code odgCommit1(struct LoopCounter* loopCounter, struct Queue* queue) { - int i = loopCounter->i ; - queue->queue = (union Data*)GET_WAIT_LIST(context->data[context->odg+i]); - queue->whenEmpty = C_odgCommit4; - queue->next = C_odgCommit2; - goto meta(context, queue->queue->Queue.isEmpty); + CPUWorker* cpuWorker = &context->worker->worker->CPUWorker; + goto odgCommitCPUWorker(workerContext, + cpuWorker, + context); } -__code odgCommit1_stub(struct Context* context) { - goto odgCommit1(context, - Gearef(context, LoopCounter), - Gearef(context, Queue)); +__code odgCommitCPUWorker1(struct CPUWorker* worker) { + int i = worker->loopCounter; + struct Queue* queue = GET_WAIT_LIST(context->data[context->odg+i]); + goto queue->isEmpty(odgCommitCPUWorker2, odgCommitCPUWorker4); } -__code odgCommit2(struct Queue* queue) { - queue->next = C_odgCommit3; - goto meta(context, queue->queue->Queue.take); +__code odgCommitCPUWorker2(struct CPUWorker* worker) { + int i = worker->loopCounter; + struct Queue* queue = GET_WAIT_LIST(context->data[context->odg+i]); + goto queue->take(odgCommitCPUWorker3); } -__code odgCommit2_stub(struct Context* context) { - goto odgCommit2(context, - Gearef(context, Queue)); +__code odgCommitCPUWorker3(struct CPUWorker* worker, struct Context* task) { + if (__sync_fetch_and_sub(&task->idgCount, 1) == 1) { // atomic decrement idg counter(__sync_fetch_and_sub function return initial value of task->idgCount point) + struct TaskManager* taskManager = task->taskManager; + taskManager->task = task; + goto taskManager->spawn(task, odgCommitCPUWorker1); + } + goto odgCommitCPUWorker1(); } -__code odgCommit3(struct TaskManager* taskManager, struct Context* task) { - if (__sync_fetch_and_sub(&task->idgCount, 1) == 1) { // atomic decrement idg counter(__sync_fetch_and_sub function return initial value of task->idgCount point) - taskManager->taskManager = (union Data*)task->taskManager; - taskManager->task = task; - taskManager->next = C_odgCommit1; - goto meta(context, task->taskManager->spawn); - } - goto meta(context, C_odgCommit1); -} - -__code odgCommit3_stub(struct Context* context) { +__code odgCommitCPUWorker3_stub(struct Context* context) { + CPUWorker* cpuWorker = (CPUWorker*)GearImpl(context, Worker, worker); struct Context* task = &Gearef(context, Queue)->data->Context; - goto odgCommit3(context, - Gearef(context, TaskManager), + goto odgCommitCPUWorker3(context, + cpuWorker, task); } -__code odgCommit4(struct LoopCounter* loopCounter) { - loopCounter->i++; - goto meta(context, C_odgCommit); +__code odgCommitCPUWorker4(struct CPUWorker* worker) { + worker->loopCounter++; + goto odgCommitCPUWorker(); } -__code odgCommit4_stub(struct Context* context) { - goto odgCommit4(context, - Gearef(context, LoopCounter)); +__code shutdownCPUWorker(struct CPUWorker* worker) { + goto exit_code(); } - -__code shutdownWorker(struct CPUWorker* worker) { - goto meta(context, C_exit_code); -}
--- a/src/parallel_execution/CUDAWorker.cbc Tue Dec 26 15:19:42 2017 +0900 +++ b/src/parallel_execution/CUDAWorker.cbc Thu Dec 28 11:55:59 2017 +0900 @@ -1,4 +1,5 @@ #include "../context.h" +#interface "TaskManager.h" #interface "Worker.h" extern void cudaInit(struct CUDAWorker *cudaWorker,int phase) ; @@ -28,9 +29,9 @@ goto meta(cudaWorker->context, worker->taskReceive); } -__code taskReceiveCUDAWorker(struct Worker* worker,struct Queue* queue) { +__code taskReceiveCUDAWorker(struct Worker* worker) { queue->queue = (union Data*)worker->tasks; - queue->next = C_getTaskCUDA; + queue->next = C_getTaskCUDAWorker; goto meta(context, worker->tasks->take); } @@ -38,7 +39,7 @@ goto taskReceiveCUDAWorker(context, &Gearef(context, Worker)->worker->Worker, Gearef(context, Queue)); } -__code getTaskCUDA(struct Worker* worker, struct Context* task) { +__code getTaskCUDAWorker(struct Worker* worker, struct Context* task) { if (!task) goto meta(context, worker->shutdown); // end thread task->worker = worker; @@ -51,10 +52,10 @@ goto meta(task, taskCg); } -__code getTaskCUDA_stub(struct Context* context) { +__code getTaskCUDAWorker_stub(struct Context* context) { Worker* worker = &Gearef(context,Worker)->worker->Worker; struct Context* task = &Gearef(context, Queue)->data->Context; - goto getTaskCUDA(context, worker, task); + goto getTaskCUDAWorker(context, worker, task); } __code iterateCommitCUDA(struct Iterator* iterator) {
--- a/src/parallel_execution/SingleLinkedQueue.cbc Tue Dec 26 15:19:42 2017 +0900 +++ b/src/parallel_execution/SingleLinkedQueue.cbc Thu Dec 28 11:55:59 2017 +0900 @@ -1,6 +1,6 @@ #include "../context.h" +#include <stdio.h> #interface "Queue.h" -#include <stdio.h> Queue* createSingleLinkedQueue(struct Context* context) { struct Queue* queue = new Queue();
--- a/src/parallel_execution/Worker.h Tue Dec 26 15:19:42 2017 +0900 +++ b/src/parallel_execution/Worker.h Thu Dec 28 11:55:59 2017 +0900 @@ -1,7 +1,7 @@ typedef struct Worker<Impl>{ union Data* worker; - __code taskReseive(struct Worker* worker,struct Queue* queue); + __code taskReceive(Impl* worker, struct Queue* tasks); __code shutdown(Impl* worker); __code next(...); - struct Queue* queue; + struct Queue* tasks; } Worker;
--- a/src/parallel_execution/context.h Tue Dec 26 15:19:42 2017 +0900 +++ b/src/parallel_execution/context.h Thu Dec 28 11:55:59 2017 +0900 @@ -183,12 +183,14 @@ struct Queue* tasks; pthread_t thread; struct TaskManager* taskManager; + struct Context* context; } Worker; struct CPUWorker { pthread_mutex_t mutex; pthread_cond_t cond; struct Context* context; int id; + int loopCounter; } CPUWorker; #ifdef USE_CUDAWorker struct CUDAWorker {