Mercurial > hg > Members > Moririn
changeset 474:b92898d3a630
Refactoring CPUWorker.cbc
author | Tatsuki IHA <innparusu@cr.ie.u-ryukyu.ac.jp> |
---|---|
date | Thu, 28 Dec 2017 12:58:25 +0900 |
parents | 71b634a5ed65 |
children | fae47dc256b6 |
files | src/parallel_execution/CPUWorker.cbc src/parallel_execution/MultiDimIterator.cbc src/parallel_execution/Worker.h src/parallel_execution/context.h |
diffstat | 4 files changed, 51 insertions(+), 61 deletions(-) [+] |
line wrap: on
line diff
--- a/src/parallel_execution/CPUWorker.cbc Thu Dec 28 11:55:59 2017 +0900 +++ b/src/parallel_execution/CPUWorker.cbc Thu Dec 28 12:58:25 2017 +0900 @@ -33,12 +33,11 @@ } __code getTaskCPUWorker(struct CPUWorker* cpuWorker, struct Context* task, struct Worker* worker) { - if (!task) { - goto worker->shutdown(); // end thread + if (!task) { goto worker->shutdown(); // end thread } task->worker = worker; enum Code taskCg = task->next; - task->next = C_odgCommitCPUWorker; // set CG after task exec + task->next = C_odgCommitCPUWorker; // commit outputDG after task exec goto meta(task, taskCg); // switch task context } @@ -49,76 +48,74 @@ goto getTaskCPUWorker(context, cpuWorker, task, worker); } -__code iterateCommitCPUWorker(struct CPUWorker* worker) { - struct Iterator* iterator = context->iterator; - goto iterator->barrier(context, odgCommitCPUWorker, iterateCommitCPUWorker1); -} - -__code iterateCommitCPUWorker1(struct CPUWorker* worker, struct Context* task) { - struct Worker* taskWorker = task->worker; - goto taskWorker->taskReceive(taskWorker->tasks); -} - -__code iterateCommitCPUWorker1_stub(struct Context* context) { - // switch worker context - struct Context* workerContext = context->worker->worker->CPUWorker.context; - CPUWorker* cpuWorker = &context->worker->worker->CPUWorker; - goto iterateCommitCPUWorker1(workerContext, - cpuWorker, - context); -} - __code odgCommitCPUWorker(struct CPUWorker* worker, struct Context* task) { - int i = worker->loopCounter; - if (task->odg+i < task->maxOdg) { + if (task->iterate) { + struct Iterator* iterator = task->iterator; + goto iterator->barrier(task, odgCommitCPUWorker1, odgCommitCPUWorker6); + } else { goto odgCommitCPUWorker1(); } - worker->loopCounter = 0; - struct TaskManager* taskManager = task->taskManager; - goto taskManager->decrementTaskCount(taskReceiveCPUWorker); } __code odgCommitCPUWorker_stub(struct Context* context) { // switch worker context struct Context* workerContext = context->worker->worker->CPUWorker.context; - CPUWorker* cpuWorker = &context->worker->worker->CPUWorker; + Gearef(workerContext, Worker)->worker = (union Data*)context->worker; + Gearef(workerContext, Worker)->task = context; + CPUWorker* cpuWorker = (CPUWorker*)GearImpl(workerContext, Worker, worker); goto odgCommitCPUWorker(workerContext, - cpuWorker, - context); + cpuWorker, + context); } -__code odgCommitCPUWorker1(struct CPUWorker* worker) { +__code odgCommitCPUWorker1(struct CPUWorker* worker, struct Context* task) { int i = worker->loopCounter; - struct Queue* queue = GET_WAIT_LIST(context->data[context->odg+i]); - goto queue->isEmpty(odgCommitCPUWorker2, odgCommitCPUWorker4); + if (task->odg+i < task->maxOdg) { + goto odgCommitCPUWorker2(); + } + worker->loopCounter = 0; + struct TaskManager* taskManager = task->taskManager; + goto taskManager->decrementTaskCount(odgCommitCPUWorker6); } -__code odgCommitCPUWorker2(struct CPUWorker* worker) { +__code odgCommitCPUWorker2(struct CPUWorker* worker, struct Context* task) { int i = worker->loopCounter; - struct Queue* queue = GET_WAIT_LIST(context->data[context->odg+i]); - goto queue->take(odgCommitCPUWorker3); + struct Queue* queue = GET_WAIT_LIST(task->data[task->odg+i]); + goto queue->isEmpty(odgCommitCPUWorker3, odgCommitCPUWorker5); } __code odgCommitCPUWorker3(struct CPUWorker* worker, struct Context* task) { - if (__sync_fetch_and_sub(&task->idgCount, 1) == 1) { // atomic decrement idg counter(__sync_fetch_and_sub function return initial value of task->idgCount point) - struct TaskManager* taskManager = task->taskManager; - taskManager->task = task; - goto taskManager->spawn(task, odgCommitCPUWorker1); + int i = worker->loopCounter; + struct Queue* queue = GET_WAIT_LIST(task->data[task->odg+i]); + goto queue->take(odgCommitCPUWorker4); +} + +__code odgCommitCPUWorker4(struct CPUWorker* worker, struct Context* task, struct Context* waitTask) { + if (__sync_fetch_and_sub(&waitTask->idgCount, 1) == 1) { // atomic decrement idg counter(__sync_fetch_and_sub function return initial value of waitTask->idgCount point) + struct TaskManager* taskManager = waitTask->taskManager; + goto taskManager->spawn(waitTask, odgCommitCPUWorker2); } + goto odgCommitCPUWorker2(); +} + +__code odgCommitCPUWorker4_stub(struct Context* context) { + CPUWorker* cpuWorker = (CPUWorker*)GearImpl(context, Worker, worker); + struct Context* task = Gearef(context, Worker)->task; + struct Context* waitTask = &Gearef(context, Queue)->data->Context; + goto odgCommitCPUWorker4(context, + cpuWorker, + task, + waitTask); +} + +__code odgCommitCPUWorker5(struct CPUWorker* worker, struct Context* task) { + worker->loopCounter++; goto odgCommitCPUWorker1(); } -__code odgCommitCPUWorker3_stub(struct Context* context) { - CPUWorker* cpuWorker = (CPUWorker*)GearImpl(context, Worker, worker); - struct Context* task = &Gearef(context, Queue)->data->Context; - goto odgCommitCPUWorker3(context, - cpuWorker, - task); -} - -__code odgCommitCPUWorker4(struct CPUWorker* worker) { - worker->loopCounter++; - goto odgCommitCPUWorker(); +__code odgCommitCPUWorker6(struct CPUWorker* worker, struct Context* task) { + struct Worker* taskWorker = task->worker; + goto taskWorker->taskReceive(taskWorker->tasks); } __code shutdownCPUWorker(struct CPUWorker* worker) {
--- a/src/parallel_execution/MultiDimIterator.cbc Thu Dec 28 11:55:59 2017 +0900 +++ b/src/parallel_execution/MultiDimIterator.cbc Thu Dec 28 12:58:25 2017 +0900 @@ -109,11 +109,3 @@ } goto whenWait(...); } - -__code barrierMultiDimIterator_stub(struct Context* context) { - MultiDimIterator* iterator = (MultiDimIterator*)GearImpl(context, Iterator, iterator); - Context* task = Gearef(context, Iterator)->task; - enum Code next = Gearef(context, Iterator)->next; - enum Code whenWait = Gearef(context, Iterator)->whenWait; - goto barrierMultiDimIterator(context, iterator, task, next, whenWait); -}
--- a/src/parallel_execution/Worker.h Thu Dec 28 11:55:59 2017 +0900 +++ b/src/parallel_execution/Worker.h Thu Dec 28 12:58:25 2017 +0900 @@ -1,7 +1,8 @@ typedef struct Worker<Impl>{ union Data* worker; + struct Queue* tasks; + struct Context* task; __code taskReceive(Impl* worker, struct Queue* tasks); __code shutdown(Impl* worker); __code next(...); - struct Queue* tasks; } Worker;
--- a/src/parallel_execution/context.h Thu Dec 28 11:55:59 2017 +0900 +++ b/src/parallel_execution/context.h Thu Dec 28 12:58:25 2017 +0900 @@ -183,7 +183,7 @@ struct Queue* tasks; pthread_t thread; struct TaskManager* taskManager; - struct Context* context; + struct Context* task; } Worker; struct CPUWorker { pthread_mutex_t mutex;