Mercurial > hg > Gears > GearsAgda
changeset 293:198affea1be1
merge
author | Shinji KONO <kono@ie.u-ryukyu.ac.jp> |
---|---|
date | Fri, 10 Feb 2017 08:48:51 +0900 |
parents | 3d70e21a3902 (diff) 2bc63a22dd21 (current diff) |
children | f6770c0a24c2 |
files | |
diffstat | 11 files changed, 130 insertions(+), 72 deletions(-) [+] |
line wrap: on
line diff
--- a/src/parallel_execution/CMakeLists.txt Thu Feb 09 19:51:32 2017 +0900 +++ b/src/parallel_execution/CMakeLists.txt Fri Feb 10 08:48:51 2017 +0900 @@ -40,7 +40,7 @@ TARGET twice SOURCES - main.cbc RedBlackTree.cbc compare.c SingleLinkedStack.cbc CPUWorker.cbc time.cbc twice.cbc TaskManagerImpl.cbc SingleLinkedQueue.cbc SynchronizedQueue.cbc + main.cbc RedBlackTree.cbc compare.c SingleLinkedStack.cbc CPUWorker.cbc time.cbc twice.cbc TaskManagerImpl.cbc SingleLinkedQueue.cbc SynchronizedQueue.cbc SemaphoreImpl.cbc ) GearsCommand(
--- a/src/parallel_execution/CPUWorker.cbc Thu Feb 09 19:51:32 2017 +0900 +++ b/src/parallel_execution/CPUWorker.cbc Fri Feb 10 08:48:51 2017 +0900 @@ -38,8 +38,9 @@ if (!task) return; // end thread task->worker = worker; - context->next = C_taskReceiveWorker; // set CG after task exec - goto meta(task, task->next); + enum Code taskCg = task->next; + task->next = C_odgCommit; // set CG after task exec + goto meta(task, taskCg); } __code getTask_stub(struct Context* context) { @@ -48,6 +49,46 @@ goto getTask(context, worker, task); } +__code odgCommit(struct LoopCounter* loopCounter, struct Queue* queue, struct Context* task) { + int i = loopCounter->i ; + if(task->odg + i < task->maxOdg) { + queue->queue = (union Data*)GET_WAIT_LIST(task->data[task->odg+i]); + queue->next = C_odgCommit1; + goto meta(context, queue->queue->Queue.take); + } + loopCounter->i = 0; + goto meta(context, C_taskReceiveWorker); +} + +__code odgCommit_stub(struct Context* context) { + struct Context* workerContext = context->worker->worker->CPUWorker.context; + goto odgCommit(workerContext, + Gearef(workerContext, LoopCounter), + Gearef(workerContext, Queue), + context); +} + +__code odgCommit1(struct TaskManager* taskManager, struct Context* task) { + if(__sync_fetch_and_sub(&task->idgCount, 1)) { + if(task->idgCount == 0) { + taskManager->taskManager = (union Data*)task->taskManager; + taskManager->context = task; + taskManager->next = C_odgCommit; + goto meta(context, task->taskManager->spawn); + } + } else { + goto meta(context, C_odgCommit1); + } +} + +__code odgCommit1_stub(struct Context* context) { + struct Context* task = &Gearef(context, Queue)->data->Context; + goto odgCommit1(context, + Gearef(context, TaskManager), + task); + +} + #ifdef USE_CUDA __code twiceGpu() { cuMemcpyHtoDAsync(context,context,context,context->stream); @@ -62,4 +103,3 @@ __code shutdownWorker(struct CPUWorker* worker) { } -
--- a/src/parallel_execution/Queue.cbc Thu Feb 09 19:51:32 2017 +0900 +++ b/src/parallel_execution/Queue.cbc Fri Feb 10 08:48:51 2017 +0900 @@ -7,5 +7,5 @@ __code take(Impl* queue, __code next(union Data*, ...)); __code isEmpty(Impl* queue, __code next(...), __code whenEmpty(...)); __code next(...); -} Stack; +} Queue;
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/parallel_execution/Semaphore.cbc Fri Feb 10 08:48:51 2017 +0900 @@ -0,0 +1,7 @@ +typedef struct Semaphore<Impl>{ + union Data* semaphore; + __code p(Impl* semaphore, __code next(...)); + __code v(Impl* semaphore, __code next(...)); + __code next(...); +} Semaphore; +
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/parallel_execution/SemaphoreImpl.cbc Fri Feb 10 08:48:51 2017 +0900 @@ -0,0 +1,37 @@ +#include "../context.h" + +Semaphore* createSemaphoreImpl(struct Context* context, int n) { + struct Semaphore* semaphore = new Semaphore(); + struct SemaphoreImpl* semaphoreImpl = new SemaphoreImpl(); + semaphore->semaphore = (union Data*)semaphoreImpl; + semaphoreImpl->value = n; + pthread_mutex_init(&semaphoreImpl->mutex, NULL); + pthread_cond_init(&semaphoreImpl->cond, NULL); + semaphore->p = C_pOperationSemaphoreImpl; + semaphore->v = C_vOperationSemaphoreImpl; + return semaphore; +} + +__code pOperationSemaphoreImpl(struct SemaphoreImpl* semaphore, __code next(...)) { + pthread_mutex_lock(&semaphore->mutex); + goto meta(context, C_pOperationSemaphoreImpl1); +} + +__code pOperationSemaphoreImpl1(struct SemaphoreImpl* semaphore, __code next(...)) { + if(semaphore->value == 0) { + pthread_cond_wait(&semaphore->cond, &semaphore->mutex); + goto meta(context, C_pOperationSemaphoreImpl1); + } + semaphore->value--; + pthread_mutex_unlock(&semaphore->mutex); + goto next(...); +} + +__code vOperationSemaphoreImpl(struct SemaphoreImpl* semaphore, __code next(...)) { + pthread_mutex_lock(&semaphore->mutex); + semaphore->value++; + pthread_cond_signal(&semaphore->cond); + pthread_mutex_unlock(&semaphore->mutex); + goto next(...); +} +
--- a/src/parallel_execution/SynchronizedQueue.cbc Thu Feb 09 19:51:32 2017 +0900 +++ b/src/parallel_execution/SynchronizedQueue.cbc Fri Feb 10 08:48:51 2017 +0900 @@ -7,6 +7,7 @@ struct SynchronizedQueue* synchronizedQueue = new SynchronizedQueue(); synchronizedQueue->top = NULL; synchronizedQueue->last = NULL; + synchronizedQueue ->queueCount = createSemaphoreImpl(context, 0); queue->queue = (union Data*)synchronizedQueue; queue->take = C_takeSynchronizedQueue; queue->put = C_putSynchronizedQueue; @@ -42,17 +43,25 @@ goto meta(context, C_putSynchronizedQueue); } } - goto next(...); + goto meta(context, C_putSynchronizedQueue1); +} + +__code putSynchronizedQueue1(struct SynchronizedQueue* queue, struct Semaphore* semaphore, __code next(...)) { + semaphore->semaphore = (union Data*)queue->queueCount; + semaphore->next = next; + goto meta(context, queue->queueCount->v); } -__code takeSynchronizedQueue(struct SynchronizedQueue* queue, __code next(union Data* data, ...)) { - if (queue->top) { - struct Element* top = queue->top; - if (__sync_bool_compare_and_swap(&queue->top, top, top->next)) { - data = top->data; - } else { - goto meta(context, C_takeSynchronizedQueue); - } +__code takeSynchronizedQueue(struct SynchronizedQueue* queue, struct Semaphore* semaphore) { + semaphore->semaphore = (union Data*)queue->queueCount; + semaphore->next = C_takeSynchronizedQueue1; + goto meta(context, queue->queueCount->p); +} + +__code takeSynchronizedQueue1(struct SynchronizedQueue* queue, __code next(union Data* data, ...)) { + struct Element* top = queue->top; + if (__sync_bool_compare_and_swap(&queue->top, top, top->next)) { + data = top->data; } else { goto meta(context, C_takeSynchronizedQueue); }
--- a/src/parallel_execution/TaskManagerImpl.cbc Thu Feb 09 19:51:32 2017 +0900 +++ b/src/parallel_execution/TaskManagerImpl.cbc Fri Feb 10 08:48:51 2017 +0900 @@ -48,6 +48,7 @@ __code createTask(struct TaskManager* taskManager) { taskManager->context = NEW(struct Context); initContext(taskManager->context); + taskManager->context->taskManager = taskManager; goto meta(context, C_setWorker); }
--- a/src/parallel_execution/context.h Thu Feb 09 19:51:32 2017 +0900 +++ b/src/parallel_execution/context.h Fri Feb 10 08:48:51 2017 +0900 @@ -45,9 +45,9 @@ meta->size = len; \ data; }) -#define GET_TYPE(dseg) ({ \ - struct Meta* meta = (struct Meta*)(((void*)dseg) - sizeof(struct Meta));\ - meta->type; }) +#define GET_META(dseg) ((struct Meta*)(((void*)dseg) - sizeof(struct Meta))) +#define GET_TYPE(dseg) (GET_META(dseg)->type) +#define GET_WAIT_LIST(dseg) (GET_META(dseg)->wait) #define Gearef(context, t) (&(context)->data[D_##t]->t) @@ -77,6 +77,7 @@ int dataNum; int idgCount; //number of waiting dataGear int odg; + int maxOdg; int workerId; union Data **data; }; @@ -125,6 +126,7 @@ enum Code shutdown; enum Code next; struct Queue* tasks; + struct TaskManager* taskManager; } Worker; struct CPUWorker { pthread_t thread; @@ -179,6 +181,7 @@ struct SynchronizedQueue { struct Element* top; struct Element* last; + struct Semaphore* queueCount; } SynchronizedQueue; // Stack Interface struct Stack { @@ -249,6 +252,17 @@ Black, } color; } Node; + struct Semaphore { + union Data* semaphore; + enum Code p; + enum Code v; + enum Code next; + } Semaphore; + struct SemaphoreImpl { + int value; + pthread_mutex_t mutex; + pthread_cond_t cond; + } SemaphoreImpl; struct Allocate { enum Code next; long size;
--- a/src/parallel_execution/main.cbc Thu Feb 09 19:51:32 2017 +0900 +++ b/src/parallel_execution/main.cbc Fri Feb 10 08:48:51 2017 +0900 @@ -59,7 +59,7 @@ int i = loopCounter->i; if (i < length) { - printf("%d\n", array_ptr[i]); + //printf("%d\n", array_ptr[i]); if (array_ptr[i] == (i*2)) { loopCounter->i++; goto meta(context, C_code2); @@ -71,48 +71,6 @@ goto meta(context, C_exit_code); } -__code createData1(struct Allocate* allocate, struct LoopCounter* loopCounter) { - int i = loopCounter->i; - - if ((length/split*i) < length) { - goto meta(context, C_createData2); - } - - loopCounter->i = 0; - goto meta(context, C_code1); -} - -__code createData1_stub(struct Context* context) { - goto createData1(context, Gearef(context, Allocate), Gearef(context, LoopCounter)); -} - -__code createData2(struct LoopCounter* loopCounter, struct Array* array, struct Node* node, Tree* tree) { - int i = loopCounter->i; - - array->index = i; - array->prefix = length/split; - array->array = array_ptr; - - node->key = i; - node->value = (union Data*)array; - - tree->tree = (union Data*)loopCounter->tree; - - tree->next = C_createTask1; - tree->node = node; - - goto meta(context, loopCounter->tree->put); -} - -__code createData2_stub(struct Context* context) { - Array* array = &ALLOCATE(context, Array)->Array; - goto createData2(context, - Gearef(context, LoopCounter), - array, - Gearef(context, Node), - Gearef(context, Tree)); -} - __code createTask1(struct LoopCounter* loopCounter, struct TaskManager* taskManager) { int i = loopCounter->i; @@ -126,12 +84,6 @@ goto meta(context, taskManager->taskManager->TaskManager.shutdown); } -__code createTask1_stub(struct Context* context) { - goto createTask1(context, - Gearef(context, LoopCounter), - Gearef(context, TaskManager)); -} - __code createTask2(LoopCounter* loopCounter, TaskManager* taskManager,struct Context* task, LoopCounter* loopCounter2, Array* array) { int i = loopCounter->i; array->index = i; @@ -143,6 +95,7 @@ task->data[task->dataNum] = (union Data*)loopCounter2; task->data[task->dataNum+1] = (union Data*)array; task->odg = task->dataNum + 2; + task->maxOdg = task->odg; taskManager->next = C_createTask1; loopCounter->i++; goto meta(context, taskManager->taskManager->TaskManager.spawn); @@ -185,4 +138,3 @@ goto start_code(main_context); } -
--- a/src/parallel_execution/twice.cbc Thu Feb 09 19:51:32 2017 +0900 +++ b/src/parallel_execution/twice.cbc Fri Feb 10 08:48:51 2017 +0900 @@ -2,7 +2,7 @@ #include "../context.h" -__code twice(struct LoopCounter* loopCounter, int index, int prefix, int* array, struct Context* workerContext) { +__code twice(struct LoopCounter* loopCounter, int index, int prefix, int* array) { int i = loopCounter->i; if (i < prefix) { array[i+index*prefix] = array[i+index*prefix]*2; @@ -12,17 +12,15 @@ } loopCounter->i = 0; - goto meta(workerContext, workerContext->next); + goto meta(context, context->next); } __code twice_stub(struct Context* context) { - struct Context* workerContext = context->worker->worker->CPUWorker.context; struct LoopCounter* loopCounter = &context->data[context->dataNum]->LoopCounter; struct Array* array = &context->data[context->dataNum+1]->Array; goto twice(context, loopCounter, array->index, array->prefix, - array->array, - workerContext); + array->array); }