Mercurial > hg > GearsTemplate
changeset 288:f1b0cc555b6e
Add odgCommit
author | Tatsuki IHA <e125716@ie.u-ryukyu.ac.jp> |
---|---|
date | Mon, 06 Feb 2017 04:04:25 +0900 |
parents | 6b099d73949c |
children | 3d70e21a3902 |
files | doc/dependency.graffle src/parallel_execution/CPUWorker.cbc src/parallel_execution/SynchronizedQueue.cbc src/parallel_execution/TaskManagerImpl.cbc src/parallel_execution/context.h src/parallel_execution/main.cbc src/parallel_execution/twice.cbc |
diffstat | 7 files changed, 52 insertions(+), 11 deletions(-) [+] |
line wrap: on
line diff
--- a/src/parallel_execution/CPUWorker.cbc Mon Feb 06 00:59:39 2017 +0900 +++ b/src/parallel_execution/CPUWorker.cbc Mon Feb 06 04:04:25 2017 +0900 @@ -38,7 +38,7 @@ if (!task) return; // end thread task->worker = worker; - context->next = C_taskReceiveWorker; // set CG after task exec + task->next = C_odgCommit; // set CG after task exec goto meta(task, task->next); } @@ -48,6 +48,46 @@ goto getTask(context, worker, task); } +__code odgCommit(struct LoopCounter* loopCounter, struct Queue* queue, struct Context* task) { + int i = loopCounter->i ; + if(task->odg + i < task->maxOdg) { + queue->queue = (union Data*)GET_WAIT_LIST(task->data[task->odg+i]); + queue->next = C_odgCommit1; + goto meta(context, queue->queue->Queue.take); + } + loopCounter->i = 0; + goto meta(context, C_taskReceiveWorker); +} + +__code odgCommit_stub(struct Context* context) { + struct Context* workerContext = context->worker->worker->CPUWorker.context; + goto odgCommit(workerContext, + Gearef(workerContext, LoopCounter), + Gearef(workerContext, Queue), + context); +} + +__code odgCommit1(struct TaskManager* taskManager, struct Context* task) { + if(__sync_fetch_and_sub(&task->idgCount, 1)) { + if(task->idgCount == 0) { + taskManager->taskManager = (union Data*)task->taskManager; + taskManager->context = task; + taskManager->next = C_odgCommit; + goto meta(context, task->taskManager->spawn); + } + } else { + goto meta(context, C_odgCommit1); + } +} + +__code odgCommit1_stub(struct Context* context) { + struct Context* task = &Gearef(context, Queue)->data->Context; + goto odgCommit1(context, + Gearef(context, TaskManager), + task); + +} + #ifdef USE_CUDA __code twiceGpu() { cuMemcpyHtoDAsync(context,context,context,context->stream); @@ -62,4 +102,3 @@ __code shutdownWorker(struct CPUWorker* worker) { } -
--- a/src/parallel_execution/SynchronizedQueue.cbc Mon Feb 06 00:59:39 2017 +0900 +++ b/src/parallel_execution/SynchronizedQueue.cbc Mon Feb 06 04:04:25 2017 +0900 @@ -46,7 +46,7 @@ goto meta(context, C_putSynchronizedQueue1); } -__code putSynchronizedQueue1(struct SynchronizedQueue* queue, union Data* data, struct Semaphore* semaphore, __code next(...)) { +__code putSynchronizedQueue1(struct SynchronizedQueue* queue, struct Semaphore* semaphore, __code next(...)) { semaphore->semaphore = (union Data*)queue->queueCount; semaphore->next = next; goto meta(context, queue->queueCount->v);
--- a/src/parallel_execution/TaskManagerImpl.cbc Mon Feb 06 00:59:39 2017 +0900 +++ b/src/parallel_execution/TaskManagerImpl.cbc Mon Feb 06 04:04:25 2017 +0900 @@ -48,6 +48,7 @@ __code createTask(struct TaskManager* taskManager) { taskManager->context = NEW(struct Context); initContext(taskManager->context); + taskManager->context->taskManager = taskManager; goto meta(context, C_setWorker); }
--- a/src/parallel_execution/context.h Mon Feb 06 00:59:39 2017 +0900 +++ b/src/parallel_execution/context.h Mon Feb 06 04:04:25 2017 +0900 @@ -45,9 +45,9 @@ meta->size = len; \ data; }) -#define GET_TYPE(dseg) ({ \ - struct Meta* meta = (struct Meta*)(((void*)dseg) - sizeof(struct Meta));\ - meta->type; }) +#define GET_META(dseg) ((struct Meta*)(((void*)dseg) - sizeof(struct Meta))) +#define GET_TYPE(dseg) (GET_META(dseg)->type) +#define GET_WAIT_LIST(dseg) (GET_META(dseg)->wait) #define Gearef(context, t) (&(context)->data[D_##t]->t) @@ -77,6 +77,7 @@ int dataNum; int idgCount; //number of waiting dataGear int odg; + int maxOdg; int workerId; union Data **data; }; @@ -125,6 +126,7 @@ enum Code shutdown; enum Code next; struct Queue* tasks; + struct TaskManager* taskManager; } Worker; struct CPUWorker { pthread_t thread;
--- a/src/parallel_execution/main.cbc Mon Feb 06 00:59:39 2017 +0900 +++ b/src/parallel_execution/main.cbc Mon Feb 06 04:04:25 2017 +0900 @@ -143,6 +143,7 @@ task->data[task->dataNum] = (union Data*)loopCounter2; task->data[task->dataNum+1] = (union Data*)array; task->odg = task->dataNum + 2; + task->maxOdg = task->odg; taskManager->next = C_createTask1; loopCounter->i++; goto meta(context, taskManager->taskManager->TaskManager.spawn);
--- a/src/parallel_execution/twice.cbc Mon Feb 06 00:59:39 2017 +0900 +++ b/src/parallel_execution/twice.cbc Mon Feb 06 04:04:25 2017 +0900 @@ -2,7 +2,7 @@ #include "../context.h" -__code twice(struct LoopCounter* loopCounter, int index, int prefix, int* array, struct Context* workerContext) { +__code twice(struct LoopCounter* loopCounter, int index, int prefix, int* array) { int i = loopCounter->i; if (i < prefix) { array[i+index*prefix] = array[i+index*prefix]*2; @@ -12,17 +12,15 @@ } loopCounter->i = 0; - goto meta(workerContext, workerContext->next); + goto meta(context, context->next); } __code twice_stub(struct Context* context) { - struct Context* workerContext = context->worker->worker->CPUWorker.context; struct LoopCounter* loopCounter = &context->data[context->dataNum]->LoopCounter; struct Array* array = &context->data[context->dataNum+1]->Array; goto twice(context, loopCounter, array->index, array->prefix, - array->array, - workerContext); + array->array); }