Mercurial > hg > Members > Moririn
changeset 475:fae47dc256b6
Merge
author | Tatsuki IHA <innparusu@cr.ie.u-ryukyu.ac.jp> |
---|---|
date | Thu, 28 Dec 2017 12:59:11 +0900 |
parents | b92898d3a630 (diff) e5f0cced7d43 (current diff) |
children | 4b5f9884b777 |
files | src/parallel_execution/context.h |
diffstat | 7 files changed, 90 insertions(+), 126 deletions(-) [+] |
line wrap: on
line diff
--- a/src/parallel_execution/CPUWorker.cbc Wed Dec 27 21:17:02 2017 +0900 +++ b/src/parallel_execution/CPUWorker.cbc Thu Dec 28 12:59:11 2017 +0900 @@ -1,5 +1,9 @@ #include "../context.h" #interface "TaskManager.h" +#interface "Worker.h" +#interface "Iterator.h" +#interface "Queue.h" + static void startWorker(Worker* worker); Worker* createCPUWorker(struct Context* context, int id, Queue* queue) { @@ -8,138 +12,112 @@ worker->worker = (union Data*)cpuWorker; worker->tasks = queue; cpuWorker->id = id; - worker->taskReceive = C_taskReceiveWorker; - worker->shutdown = C_shutdownWorker; + cpuWorker->loopCounter = 0; + worker->taskReceive = C_taskReceiveCPUWorker; + worker->shutdown = C_shutdownCPUWorker; pthread_create(&worker->thread, NULL, (void*)&startWorker, worker); return worker; } -static void startWorker(Worker* worker) { +static void startWorker(struct Worker* worker) { CPUWorker* cpuWorker = (CPUWorker*)worker->worker; cpuWorker->context = NEW(struct Context); initContext(cpuWorker->context); Gearef(cpuWorker->context, Worker)->worker = (union Data*)worker; + Gearef(cpuWorker->context, Worker)->tasks = worker->tasks; goto meta(cpuWorker->context, worker->taskReceive); } -__code taskReceiveWorker(struct Worker* worker,struct Queue* queue) { - queue->queue = (union Data*)worker->tasks; - queue->next = C_getTask; - goto meta(context, worker->tasks->take); -} - -__code taskReceiveWorker_stub(struct Context* context) { - goto taskReceiveWorker(context, &Gearef(context, Worker)->worker->Worker, Gearef(context, Queue)); +__code taskReceiveCPUWorker(struct CPUWorker* worker, struct Queue* tasks) { + goto tasks->take(getTaskCPUWorker); } -__code getTask(struct Worker* worker, struct Context* task) { - if (!task) - goto meta(context, worker->shutdown); // end thread +__code getTaskCPUWorker(struct CPUWorker* cpuWorker, struct Context* task, struct Worker* worker) { + if (!task) { goto worker->shutdown(); // end thread + } task->worker = worker; enum Code taskCg = task->next; - if (task->iterate) { - task->next = C_iterateCommit; - } else { - task->next = C_odgCommit; // set CG after task exec - } - goto meta(task, taskCg); -} - -__code getTask_stub(struct Context* context) { - Worker* worker = &Gearef(context,Worker)->worker->Worker; - struct Context* task = &Gearef(context, Queue)->data->Context; - goto getTask(context, worker, task); + task->next = C_odgCommitCPUWorker; // commit outputDG after task exec + goto meta(task, taskCg); // switch task context } -__code iterateCommit(struct Iterator* iterator) { - iterator->iterator = (union Data*)context->iterator; - iterator->task = context; - iterator->next = C_odgCommit; - iterator->whenWait = C_iterateCommit1; - goto meta(context, context->iterator->barrier); -} - -__code iterateCommit1(struct Context* task) { - goto meta(context, C_taskReceiveWorker); +__code getTaskCPUWorker_stub(struct Context* context) { + CPUWorker* cpuWorker = (CPUWorker*)GearImpl(context, Worker, worker); + Worker* worker = &Gearef(context,Worker)->worker->Worker; + struct Context* task = &Gearef(context, Queue)->data->Context; + goto getTaskCPUWorker(context, cpuWorker, task, worker); } -__code iterateCommit1_stub(struct Context* context) { - // switch worker context - struct Context* workerContext = context->worker->worker->CPUWorker.context; - goto iterateCommit1(workerContext, context); +__code odgCommitCPUWorker(struct CPUWorker* worker, struct Context* task) { + if (task->iterate) { + struct Iterator* iterator = task->iterator; + goto iterator->barrier(task, odgCommitCPUWorker1, odgCommitCPUWorker6); + } else { + goto odgCommitCPUWorker1(); + } } -__code odgCommit(struct LoopCounter* loopCounter, struct Context* task, struct TaskManager* taskManager) { - int i = loopCounter->i ; - if (task->odg+i < task->maxOdg) { - goto meta(task, C_odgCommit1); - } - loopCounter->i = 0; - taskManager->taskManager = (union Data*)task->taskManager; - taskManager->next = C_taskReceiveWorker; - goto meta(context, task->taskManager->decrementTaskCount); -} - -__code odgCommit_stub(struct Context* context) { +__code odgCommitCPUWorker_stub(struct Context* context) { // switch worker context struct Context* workerContext = context->worker->worker->CPUWorker.context; - goto odgCommit(workerContext, - Gearef(context, LoopCounter), - context, - Gearef(workerContext, TaskManager)); -} - -__code odgCommit1(struct LoopCounter* loopCounter, struct Queue* queue) { - int i = loopCounter->i ; - queue->queue = (union Data*)GET_WAIT_LIST(context->data[context->odg+i]); - queue->whenEmpty = C_odgCommit4; - queue->next = C_odgCommit2; - goto meta(context, queue->queue->Queue.isEmpty); + Gearef(workerContext, Worker)->worker = (union Data*)context->worker; + Gearef(workerContext, Worker)->task = context; + CPUWorker* cpuWorker = (CPUWorker*)GearImpl(workerContext, Worker, worker); + goto odgCommitCPUWorker(workerContext, + cpuWorker, + context); } -__code odgCommit1_stub(struct Context* context) { - goto odgCommit1(context, - Gearef(context, LoopCounter), - Gearef(context, Queue)); +__code odgCommitCPUWorker1(struct CPUWorker* worker, struct Context* task) { + int i = worker->loopCounter; + if (task->odg+i < task->maxOdg) { + goto odgCommitCPUWorker2(); + } + worker->loopCounter = 0; + struct TaskManager* taskManager = task->taskManager; + goto taskManager->decrementTaskCount(odgCommitCPUWorker6); } -__code odgCommit2(struct Queue* queue) { - queue->next = C_odgCommit3; - goto meta(context, queue->queue->Queue.take); +__code odgCommitCPUWorker2(struct CPUWorker* worker, struct Context* task) { + int i = worker->loopCounter; + struct Queue* queue = GET_WAIT_LIST(task->data[task->odg+i]); + goto queue->isEmpty(odgCommitCPUWorker3, odgCommitCPUWorker5); } -__code odgCommit2_stub(struct Context* context) { - goto odgCommit2(context, - Gearef(context, Queue)); +__code odgCommitCPUWorker3(struct CPUWorker* worker, struct Context* task) { + int i = worker->loopCounter; + struct Queue* queue = GET_WAIT_LIST(task->data[task->odg+i]); + goto queue->take(odgCommitCPUWorker4); } -__code odgCommit3(struct TaskManager* taskManager, struct Context* task) { - if (__sync_fetch_and_sub(&task->idgCount, 1) == 1) { // atomic decrement idg counter(__sync_fetch_and_sub function return initial value of task->idgCount point) - taskManager->taskManager = (union Data*)task->taskManager; - taskManager->task = task; - taskManager->next = C_odgCommit1; - goto meta(context, task->taskManager->spawn); +__code odgCommitCPUWorker4(struct CPUWorker* worker, struct Context* task, struct Context* waitTask) { + if (__sync_fetch_and_sub(&waitTask->idgCount, 1) == 1) { // atomic decrement idg counter(__sync_fetch_and_sub function return initial value of waitTask->idgCount point) + struct TaskManager* taskManager = waitTask->taskManager; + goto taskManager->spawn(waitTask, odgCommitCPUWorker2); } - goto meta(context, C_odgCommit1); + goto odgCommitCPUWorker2(); } -__code odgCommit3_stub(struct Context* context) { - struct Context* task = &Gearef(context, Queue)->data->Context; - goto odgCommit3(context, - Gearef(context, TaskManager), - task); +__code odgCommitCPUWorker4_stub(struct Context* context) { + CPUWorker* cpuWorker = (CPUWorker*)GearImpl(context, Worker, worker); + struct Context* task = Gearef(context, Worker)->task; + struct Context* waitTask = &Gearef(context, Queue)->data->Context; + goto odgCommitCPUWorker4(context, + cpuWorker, + task, + waitTask); } -__code odgCommit4(struct LoopCounter* loopCounter) { - loopCounter->i++; - goto meta(context, C_odgCommit); +__code odgCommitCPUWorker5(struct CPUWorker* worker, struct Context* task) { + worker->loopCounter++; + goto odgCommitCPUWorker1(); } -__code odgCommit4_stub(struct Context* context) { - goto odgCommit4(context, - Gearef(context, LoopCounter)); +__code odgCommitCPUWorker6(struct CPUWorker* worker, struct Context* task) { + struct Worker* taskWorker = task->worker; + goto taskWorker->taskReceive(taskWorker->tasks); } -__code shutdownWorker(struct CPUWorker* worker) { - goto meta(context, C_exit_code); +__code shutdownCPUWorker(struct CPUWorker* worker) { + goto exit_code(); }
--- a/src/parallel_execution/CUDAWorker.cbc Wed Dec 27 21:17:02 2017 +0900 +++ b/src/parallel_execution/CUDAWorker.cbc Thu Dec 28 12:59:11 2017 +0900 @@ -1,4 +1,5 @@ #include "../context.h" +#interface "TaskManager.h" #interface "Worker.h" extern void cudaInit(struct CUDAWorker *cudaWorker,int phase) ; @@ -28,9 +29,9 @@ goto meta(cudaWorker->context, worker->taskReceive); } -__code taskReceiveCUDAWorker(struct Worker* worker,struct Queue* queue) { +__code taskReceiveCUDAWorker(struct Worker* worker) { queue->queue = (union Data*)worker->tasks; - queue->next = C_getTaskCUDA; + queue->next = C_getTaskCUDAWorker; goto meta(context, worker->tasks->take); } @@ -38,7 +39,7 @@ goto taskReceiveCUDAWorker(context, &Gearef(context, Worker)->worker->Worker, Gearef(context, Queue)); } -__code getTaskCUDA(struct Worker* worker, struct Context* task) { +__code getTaskCUDAWorker(struct Worker* worker, struct Context* task) { if (!task) goto meta(context, worker->shutdown); // end thread task->worker = worker; @@ -51,10 +52,10 @@ goto meta(task, taskCg); } -__code getTaskCUDA_stub(struct Context* context) { +__code getTaskCUDAWorker_stub(struct Context* context) { Worker* worker = &Gearef(context,Worker)->worker->Worker; struct Context* task = &Gearef(context, Queue)->data->Context; - goto getTaskCUDA(context, worker, task); + goto getTaskCUDAWorker(context, worker, task); } __code iterateCommitCUDA(struct Iterator* iterator) {
--- a/src/parallel_execution/MultiDimIterator.cbc Wed Dec 27 21:17:02 2017 +0900 +++ b/src/parallel_execution/MultiDimIterator.cbc Thu Dec 28 12:59:11 2017 +0900 @@ -109,11 +109,3 @@ } goto whenWait(...); } - -__code barrierMultiDimIterator_stub(struct Context* context) { - MultiDimIterator* iterator = (MultiDimIterator*)GearImpl(context, Iterator, iterator); - Context* task = Gearef(context, Iterator)->task; - enum Code next = Gearef(context, Iterator)->next; - enum Code whenWait = Gearef(context, Iterator)->whenWait; - goto barrierMultiDimIterator(context, iterator, task, next, whenWait); -}
--- a/src/parallel_execution/SingleLinkedQueue.cbc Wed Dec 27 21:17:02 2017 +0900 +++ b/src/parallel_execution/SingleLinkedQueue.cbc Thu Dec 28 12:59:11 2017 +0900 @@ -1,6 +1,6 @@ #include "../context.h" +#include <stdio.h> #interface "Queue.h" -#include <stdio.h> Queue* createSingleLinkedQueue(struct Context* context) { struct Queue* queue = new Queue();
--- a/src/parallel_execution/TaskManagerImpl.cbc Wed Dec 27 21:17:02 2017 +0900 +++ b/src/parallel_execution/TaskManagerImpl.cbc Thu Dec 28 12:59:11 2017 +0900 @@ -103,15 +103,13 @@ } __code spawnTasksTaskManagerImpl5(struct TaskManagerImpl* taskManagerImpl, struct Context* task, struct TaskManager* taskManager) { - taskManager->task = task; - taskManager->next = C_spawnTasksTaskManagerImpl3; - goto meta(context, C_spawnTaskManagerImpl); + goto taskManager->spawn(task, spawnTasksTaskManagerImpl3); } __code spawnTasksTaskManagerImpl5_stub(struct Context* context) { TaskManagerImpl* taskManagerImpl = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager); Context* task = (struct Context*)Gearef(context, Queue)->data; - TaskManager* taskManager = Gearef(context, TaskManager); + TaskManager* taskManager = &Gearef(context, TaskManager)->taskManager->TaskManager; goto spawnTasksTaskManagerImpl5(context, taskManagerImpl, task, taskManager); } @@ -154,7 +152,7 @@ struct Iterator* iterator = task->iterator; goto iterator->exec(task, taskManager->cpu - taskManager->gpu, next(...)); } - goto meta(context, C_taskSend); + goto taskSend(); } pthread_mutex_unlock(&taskManager->mutex); goto next(...); @@ -178,18 +176,10 @@ goto queue->put(task, next(...)); } -__code taskSend_stub(struct Context* context) { - TaskManagerImpl* taskManager = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager); - goto taskSend(context, - taskManager, - Gearef(context, TaskManager)->task, - Gearef(context, TaskManager)->next); -} - __code shutdownTaskManagerImpl(struct TaskManagerImpl* taskManager, __code next(...)) { if (taskManager->taskCount != 0) { usleep(1000); - goto meta(context, C_shutdownTaskManagerImpl); + goto shutdownTaskManagerImpl(); } int i = taskManager->loopCounter->i; if (i < taskManager->numWorker) { @@ -198,12 +188,12 @@ } taskManager->loopCounter->i = 0; - goto meta(context, next); + goto next(...); } __code shutdownTaskManagerImpl1(struct TaskManagerImpl* taskManager) { int i = taskManager->loopCounter->i; pthread_join(taskManager->workers[i]->thread, NULL); taskManager->loopCounter->i++; - goto meta(context, C_shutdownTaskManagerImpl); + goto shutdownTaskManagerImpl(); }
--- a/src/parallel_execution/Worker.h Wed Dec 27 21:17:02 2017 +0900 +++ b/src/parallel_execution/Worker.h Thu Dec 28 12:59:11 2017 +0900 @@ -1,7 +1,8 @@ typedef struct Worker<Impl>{ union Data* worker; - __code taskReseive(struct Worker* worker,struct Queue* queue); + struct Queue* tasks; + struct Context* task; + __code taskReceive(Impl* worker, struct Queue* tasks); __code shutdown(Impl* worker); __code next(...); - struct Queue* queue; } Worker;
--- a/src/parallel_execution/context.h Wed Dec 27 21:17:02 2017 +0900 +++ b/src/parallel_execution/context.h Thu Dec 28 12:59:11 2017 +0900 @@ -183,12 +183,14 @@ struct Queue* tasks; pthread_t thread; struct TaskManager* taskManager; + struct Context* task; } Worker; struct CPUWorker { pthread_mutex_t mutex; pthread_cond_t cond; struct Context* context; int id; + int loopCounter; } CPUWorker; #ifdef USE_CUDAWorker struct CUDAWorker {