Mercurial > hg > Members > Moririn
changeset 405:8915fce522b3
Fix shutdown TaskManager
author | Tatsuki IHA <innparusu@cr.ie.u-ryukyu.ac.jp> |
---|---|
date | Tue, 05 Sep 2017 16:46:31 +0900 |
parents | c5cd9888bf2a |
children | 9b35e6581b5c |
files | src/parallel_execution/CPUWorker.cbc src/parallel_execution/TaskManager.cbc src/parallel_execution/TaskManagerImpl.cbc src/parallel_execution/context.h src/parallel_execution/examples/bitonicSort/bitonicSort.cbc src/parallel_execution/examples/twice/twice.cbc src/parallel_execution/twice.cbc |
diffstat | 7 files changed, 62 insertions(+), 226 deletions(-) [+] |
line wrap: on
line diff
--- a/src/parallel_execution/CPUWorker.cbc Sun Sep 03 00:21:16 2017 +0900 +++ b/src/parallel_execution/CPUWorker.cbc Tue Sep 05 16:46:31 2017 +0900 @@ -68,20 +68,23 @@ goto iterateCommit1(workerContext, context); } -__code odgCommit(struct LoopCounter* loopCounter, struct Context* task) { +__code odgCommit(struct LoopCounter* loopCounter, struct Context* task, struct TaskManager* taskManager) { int i = loopCounter->i ; if (task->odg+i < task->maxOdg) { goto meta(task, C_odgCommit1); } loopCounter->i = 0; - goto meta(context, C_taskReceiveWorker); + taskManager->taskManager = (union Data*)task->taskManager; + taskManager->next = C_taskReceiveWorker; + goto meta(context, task->taskManager->decrementTaskCount); } __code odgCommit_stub(struct Context* context) { struct Context* workerContext = context->worker->worker->CPUWorker.context; goto odgCommit(workerContext, Gearef(context, LoopCounter), - context); + context, + Gearef(workerContext, TaskManager)); } __code odgCommit1(struct LoopCounter* loopCounter, struct Queue* queue) {
--- a/src/parallel_execution/TaskManager.cbc Sun Sep 03 00:21:16 2017 +0900 +++ b/src/parallel_execution/TaskManager.cbc Tue Sep 05 16:46:31 2017 +0900 @@ -5,6 +5,8 @@ __code spawn(Impl* taskManager, struct Queue* queue, struct Context* task, __code next(...)); __code spawnTasks(Impl* taskManagerImpl, struct Context** tasks, __code next1(...), struct TaskManager* taskManager); __code shutdown(Impl* taskManagerImpl, __code next(...), struct TaskManager* taskManager, struct Queue* queue); + __code incrementTaskCount(Impl* taskManagerImpl, __code next(...)); + __code decrementTaskCount(Impl* taskManagerImpl, __code next(...)); __code next(...); __code next1(...); int worker;
--- a/src/parallel_execution/TaskManagerImpl.cbc Sun Sep 03 00:21:16 2017 +0900 +++ b/src/parallel_execution/TaskManagerImpl.cbc Tue Sep 05 16:46:31 2017 +0900 @@ -1,6 +1,7 @@ #include "../context.h" #include <stdio.h> +#include <unistd.h> void createWorkers(struct Context* context, TaskManager* taskManeger, TaskManagerImpl* taskManagerImpl); @@ -16,11 +17,15 @@ taskManager->spawnTasks = C_spawnTasksTaskManagerImpl; taskManager->spawn = C_spawnTaskManagerImpl; taskManager->shutdown = C_shutdownTaskManagerImpl; + taskManager->incrementTaskCount = C_incrementTaskCountTaskManagerImpl; + taskManager->decrementTaskCount = C_decrementTaskCountTaskManagerImpl; struct TaskManagerImpl* taskManagerImpl = new TaskManagerImpl(); - taskManagerImpl -> taskQueue = createSingleLinkedQueue(context); - taskManagerImpl -> numWorker = taskManager->maxCPU; - taskManagerImpl -> loopCounter = new LoopCounter(); - taskManagerImpl -> loopCounter -> i = 0; + taskManagerImpl->taskQueue = createSingleLinkedQueue(context); + taskManagerImpl->numWorker = taskManager->maxCPU; + taskManagerImpl->sendWorkerIndex = 0; + taskManagerImpl->taskCount = 0; + taskManagerImpl->loopCounter = new LoopCounter(); + taskManagerImpl->loopCounter -> i = 0; createWorkers(context, taskManager, taskManagerImpl); taskManager->taskManager = (union Data*)taskManagerImpl; return taskManager; @@ -54,7 +59,7 @@ __code spawnTasksTaskManagerImpl(struct TaskManagerImpl* taskManager, struct Queue* tasks, __code next1(...), struct Queue* queue) { queue->queue = (union Data*)tasks; - queue->next = C_spawnTasksTaskManagerImpl1; + queue->next = C_spawnTasksTaskManagerImpl1; queue->whenEmpty = C_spawnTasksTaskManagerImpl3; goto meta(context, tasks->isEmpty); } @@ -69,7 +74,7 @@ __code spawnTasksTaskManagerImpl1(struct TaskManagerImpl* taskManager, struct Queue* tasks, __code next1(...), struct Queue* queue) { queue->queue = (union Data*)tasks; - queue->next = C_spawnTasksTaskManagerImpl2; + queue->next = C_spawnTasksTaskManagerImpl2; goto meta(context, tasks->take); } @@ -85,7 +90,7 @@ task->taskManager = &taskManager->taskManager->TaskManager; taskManager->context = task; taskManager->next = C_spawnTasksTaskManagerImpl; - goto meta(context, C_setWaitTask); + goto meta(context, C_setWaitTaskTaskManagerImpl); } __code spawnTasksTaskManagerImpl2_stub(struct Context* context) { @@ -121,30 +126,40 @@ goto spawnTasksTaskManagerImpl5(context, taskManagerImpl, task, taskManager); } -__code setWaitTask(struct TaskManagerImpl* taskManager, struct Context* task, __code next(...), struct Queue* queue) { +__code setWaitTaskTaskManagerImpl(struct TaskManagerImpl* taskManager, struct Context* task, __code next(...), struct Queue* queue) { int i = taskManager->loopCounter->i; if(task->idg+i < task->maxIdg) { queue->queue = (Data *)GET_WAIT_LIST(task->data[task->idg + i]); queue->data = (Data *)task; - queue->next = C_setWaitTask; + queue->next = C_setWaitTaskTaskManagerImpl; taskManager->loopCounter->i++; goto meta(context, queue->queue->Queue.put); } taskManager->loopCounter->i = 0; queue->queue = (Data *)taskManager->taskQueue; queue->data = (Data *)task; - queue->next = next; + queue->next = C_incrementTaskCountTaskManagerImpl; goto meta(context, taskManager->taskQueue->put); } -__code setWaitTask_stub(struct Context* context) { +__code setWaitTaskTaskManagerImpl_stub(struct Context* context) { TaskManagerImpl* taskManager = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager); struct Context* task = Gearef(context, TaskManager)->context; - goto setWaitTask(context, - taskManager, - task, - Gearef(context, TaskManager)->next, - Gearef(context, Queue)); + goto setWaitTaskTaskManagerImpl(context, + taskManager, + task, + Gearef(context, TaskManager)->next, + Gearef(context, Queue)); +} + +__code incrementTaskCountTaskManagerImpl(struct TaskManagerImpl* taskManager, __code next(...)) { + __sync_fetch_and_add(&taskManager->taskCount, 1); + goto next(...); +} + +__code decrementTaskCountTaskManagerImpl(struct TaskManagerImpl* taskManager, __code next(...)) { + __sync_fetch_and_sub(&taskManager->taskCount, 1); + goto next(...); } __code spawnTaskManagerImpl(struct TaskManagerImpl* taskManager, struct Iterator* iterator, struct Context* task, __code next(...)) { @@ -195,6 +210,10 @@ } __code shutdownTaskManagerImpl(struct TaskManagerImpl* taskManagerImpl, __code next(...), struct TaskManager* taskManager, struct Queue* queue) { + if (taskManagerImpl->taskCount != 0) { + sleep(1); + goto meta(context, taskManager->shutdown); + } int i = taskManagerImpl->loopCounter->i; if (taskManager->cpu <= i && i < taskManager->maxCPU) { struct Queue* tasks = taskManagerImpl->workers[i]->tasks;
--- a/src/parallel_execution/context.h Sun Sep 03 00:21:16 2017 +0900 +++ b/src/parallel_execution/context.h Tue Sep 05 16:46:31 2017 +0900 @@ -137,6 +137,8 @@ enum Code spawn; // start NEW context on the worker enum Code spawnTasks; // start NEW tasks on the worker enum Code shutdown; + enum Code incrementTaskCount; + enum Code decrementTaskCount; enum Code next; enum Code next1; enum Code task; @@ -153,6 +155,7 @@ enum Code next; int numWorker; int sendWorkerIndex; + int taskCount; pthread_mutex_t mutex; struct Queue* activeQueue; struct Queue* taskQueue; @@ -246,7 +249,6 @@ } Element; struct Array { int size; - int index; int prefix; int* array; } Array;
--- a/src/parallel_execution/examples/bitonicSort/bitonicSort.cbc Sun Sep 03 00:21:16 2017 +0900 +++ b/src/parallel_execution/examples/bitonicSort/bitonicSort.cbc Tue Sep 05 16:46:31 2017 +0900 @@ -96,7 +96,6 @@ } __code code2(struct LoopCounter* loopCounter, struct TaskManager* taskManager, struct Time* time) { - sleep(40); taskManager->next = C_exit_code; goto meta(context, taskManager->taskManager->TaskManager.shutdown); }
--- a/src/parallel_execution/examples/twice/twice.cbc Sun Sep 03 00:21:16 2017 +0900 +++ b/src/parallel_execution/examples/twice/twice.cbc Tue Sep 05 16:46:31 2017 +0900 @@ -1,189 +1,26 @@ #include <stdio.h> -#include <string.h> -#include <stdlib.h> -#include <unistd.h> - -#include "../../../context.h" - -int cpu_num = 1; -int length = 102400; -int split = 8; -int* array_ptr; -int gpu_num = 0; -int CPU_ANY = -1; -int CPU_CUDA = -1; - -void print_queue(struct Element* element) { - while (element) { - printf("%p\n", ((struct Task *)(element->data))); - element = element->next; - } -} -void print_tree(struct Node* node) { - if (node != 0) { - printf("%d\n", node->value->Array.index); - print_tree(node->left); - print_tree(node->right); - } -} - -void *start_taskManager(struct Context *context) { - goto initDataGears(context, Gearef(context, LoopCounter), Gearef(context, TaskManager)); - return 0; -} - -#ifdef USE_CUDAWorker -#ifdef USE_CUDA_MAIN_THREAD -extern volatile int cuda_initialized; -#endif -#endif - -__code initDataGears(struct LoopCounter* loopCounter, struct TaskManager* taskManager) { - // loopCounter->tree = createRedBlackTree(context); - loopCounter->i = 0; - taskManager->taskManager = (union Data*)createTaskManagerImpl(context, cpu_num, gpu_num, 0); -#ifdef USE_CUDAWorker -#ifdef USE_CUDA_MAIN_THREAD - while(! cuda_initialized) {}; -#endif -#endif - goto meta(context, C_code1); -} +#include "../context.h" -__code initDataGears_stub(struct Context* context) { - struct TaskManager* taskManager = Gearef(context, TaskManager); - taskManager->taskManager = 0; -#if (! defined(USE_CUDAWorker) || ! defined(USE_CUDA_MAIN_THREAD)) - struct LoopCounter* loopCounter = Gearef(context, LoopCounter); - goto initDataGears(context, loopCounter, taskManager); -#else - cuda_initialized = 0; - pthread_t thread; - pthread_create(&thread, NULL, (void*)&start_taskManager, context); - while (taskManager->taskManager == 0); - TaskManager *t = (TaskManager*)taskManager->taskManager; - TaskManagerImpl *im = (TaskManagerImpl*)t->taskManager; - struct Queue *q = (Queue *)im->workers[0]; - createCUDAWorker(context,0,q, im); - pthread_join(thread,0); - exit(0); -#endif -} +__code twice(struct Array* array, __code next(...), struct LoopCounter loopCounter) { + int i = loopCounter->i; + if (i < prefix) { + array[i+index*prefix] = array[i+index*prefix]*2; + loopCounter->i++; -__code code1(struct Time* time) { - printf("cpus:\t\t%d\n", cpu_num); - printf("gpus:\t\t%d\n", gpu_num); - printf("length:\t\t%d\n", length); - printf("length/task:\t%d\n", length/split); - /* puts("queue"); */ - /* print_queue(context->data[ActiveQueue]->queue.first); */ - /* puts("tree"); */ - /* print_tree(context->data[Tree]->tree.root); */ - /* puts("result"); */ - time->time = (union Data*)createTimeImpl(context); - time->next = C_createTask1; - goto meta(context, time->time->Time.start); -} - -__code code2(struct LoopCounter* loopCounter) { - int i = loopCounter->i; - - if (i < length) { - //printf("%d\n", array_ptr[i]); - if (array_ptr[i] == (i*2)) { - loopCounter->i++; - goto meta(context, C_code2); - } else - puts("wrong result"); - - } - - goto meta(context, C_exit_code); -} - -__code createTask1(struct LoopCounter* loopCounter, struct TaskManager* taskManager, struct Time* time) { - int i = loopCounter->i; - - if ((length/split*i) < length) { - goto meta(context, C_createTask2); + goto meta(context, C_twice); } loopCounter->i = 0; - taskManager->next = time->time->Time.end; - time->next = C_code2; -#if ( defined(USE_CUDAWorker) && defined(USE_CUDA_MAIN_THREAD)) -sleep(5); -#endif - goto meta(context, taskManager->taskManager->TaskManager.shutdown); + goto meta(context, context->next); } -__code createTask2(struct LoopCounter* loopCounter, struct TaskManager* taskManager) { - struct Context** tasks = (struct Context**)ALLOC_ARRAY(context, Context, 1); - - int i = loopCounter->i; - LoopCounter* loopCounter2 = &ALLOCATE_DATA_GEAR(context, LoopCounter)->LoopCounter; - Array* array = &ALLOCATE_DATA_GEAR(context, Array)->Array; - array->index = i; - array->prefix = length/split; - array->array = array_ptr; - array->size = length; - loopCounter2->i = 0; - loopCounter->i++; - - // par goto twice(loopCounter2, array, __exit); - struct Context* task = NEW(struct Context); - initContext(task); - task->idgCount = 0; - if (gpu_num) { -#ifdef USE_CUDAWorker - task->next = C_CUDAtwice; - task->workerId = CPU_CUDA; -#else - task->next = C_twice; -#endif - } else { - task->next = C_twice; - } - task->idg = task->dataNum; - task->data[task->idg] = (union Data*)loopCounter2; - task->data[task->idg+1] = (union Data*)array; - task->maxIdg = task->idg + 2; - task->odg = task->maxIdg; - task->maxOdg = task->odg; - tasks[0] = task; - - // goto crateTask1(); - goto taskManager->spawnTasks(tasks, createTask1); +__code twice_stub(struct Context* context) { + struct LoopCounter* loopCounter = &context->data[context->dataNum]->LoopCounter; + struct Array* array = &context->data[context->dataNum+1]->Array; + goto twice(context, + loopCounter, + array->index, + array->prefix, + array->array); } - -void init(int argc, char** argv) { - for (int i = 1; argv[i]; ++i) { - if (strcmp(argv[i], "-cpu") == 0) - cpu_num = (int)atoi(argv[i+1]); - else if (strcmp(argv[i], "-l") == 0) - length = (int)atoi(argv[i+1]); - else if (strcmp(argv[i], "-s") == 0) - split = (int)atoi(argv[i+1]); - else if (strcmp(argv[i], "-cuda") == 0) { - gpu_num = 1; - CPU_CUDA = 0; - } - } -} - - -int main(int argc, char** argv) { - init(argc, argv); - - array_ptr = NEWN(length, int); - - for(int i=0; i<length; i++) - array_ptr[i]=i; - - struct Context* main_context = NEW(struct Context); - initContext(main_context); - main_context->next = C_initDataGears; - - goto start_code(main_context); -}
--- a/src/parallel_execution/twice.cbc Sun Sep 03 00:21:16 2017 +0900 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,26 +0,0 @@ -#include <stdio.h> - -#include "../context.h" - -__code twice(struct LoopCounter* loopCounter, int index, int prefix, int* array) { - int i = loopCounter->i; - if (i < prefix) { - array[i+index*prefix] = array[i+index*prefix]*2; - loopCounter->i++; - - goto meta(context, C_twice); - } - - loopCounter->i = 0; - goto meta(context, context->next); -} - -__code twice_stub(struct Context* context) { - struct LoopCounter* loopCounter = &context->data[context->dataNum]->LoopCounter; - struct Array* array = &context->data[context->dataNum+1]->Array; - goto twice(context, - loopCounter, - array->index, - array->prefix, - array->array); -}