Mercurial > hg > GearsTemplate
changeset 327:534601ed8c50 examples_directory
Running dependency example for single thread and single task
author | Tatsuki IHA <e125716@ie.u-ryukyu.ac.jp> |
---|---|
date | Tue, 18 Apr 2017 05:53:37 +0900 |
parents | f23f6d0aa4e9 |
children | 48c2b5bcab79 |
files | src/parallel_execution/CPUWorker.cbc src/parallel_execution/TaskManagerImpl.cbc src/parallel_execution/context.h src/parallel_execution/examples/calc.cbc |
diffstat | 4 files changed, 82 insertions(+), 74 deletions(-) [+] |
line wrap: on
line diff
--- a/src/parallel_execution/CPUWorker.cbc Tue Apr 18 01:47:42 2017 +0900 +++ b/src/parallel_execution/CPUWorker.cbc Tue Apr 18 05:53:37 2017 +0900 @@ -47,12 +47,10 @@ goto getTask(context, worker, task); } -__code odgCommit(struct LoopCounter* loopCounter, struct Queue* queue, struct Context* task) { +__code odgCommit(struct LoopCounter* loopCounter, struct Context* task) { int i = loopCounter->i ; if(task->odg + i < task->maxOdg) { - queue->queue = (union Data*)GET_WAIT_LIST(task->data[task->odg+i]); - queue->next = C_odgCommit1; - goto meta(context, queue->queue->Queue.take); + goto meta(task, C_odgCommit1); } loopCounter->i = 0; goto meta(context, C_taskReceiveWorker); @@ -61,30 +59,64 @@ __code odgCommit_stub(struct Context* context) { struct Context* workerContext = context->worker->worker->CPUWorker.context; goto odgCommit(workerContext, - Gearef(workerContext, LoopCounter), - Gearef(workerContext, Queue), + Gearef(context, LoopCounter), context); } -__code odgCommit1(struct TaskManager* taskManager, struct Context* task) { +__code odgCommit1(struct LoopCounter* loopCounter, struct Queue* queue) { + int i = loopCounter->i ; + queue->queue = (union Data*)GET_WAIT_LIST(context->data[context->odg+i]); + queue->whenEmpty = C_odgCommit4; + queue->next = C_odgCommit2; + goto meta(context, queue->queue->Queue.isEmpty); +} + +__code odgCommit1_stub(struct Context* context) { + goto odgCommit1(context, + Gearef(context, LoopCounter), + Gearef(context, Queue)); +} + +__code odgCommit2(struct Queue* queue) { + queue->next = C_odgCommit3; + goto meta(context, queue->queue->Queue.take); +} + +__code odgCommit2_stub(struct Context* context) { + goto odgCommit2(context, + Gearef(context, Queue)); +} + +__code odgCommit3(struct TaskManager* taskManager, struct Context* task) { if(__sync_fetch_and_sub(&task->idgCount, 1)) { if(task->idgCount == 0) { taskManager->taskManager = (union Data*)task->taskManager; taskManager->context = task; - taskManager->next = C_odgCommit; + taskManager->next = C_odgCommit1; goto meta(context, task->taskManager->spawn); + } else { + goto meta(context, C_odgCommit1); } } else { - goto meta(context, C_odgCommit1); + goto meta(context, C_odgCommit3); } } -__code odgCommit1_stub(struct Context* context) { +__code odgCommit3_stub(struct Context* context) { struct Context* task = &Gearef(context, Queue)->data->Context; - goto odgCommit1(context, + goto odgCommit3(context, Gearef(context, TaskManager), task); - +} + +__code odgCommit4(struct LoopCounter* loopCounter) { + loopCounter->i++; + goto meta(context, C_odgCommit); +} + +__code odgCommit4_stub(struct Context* context) { + goto odgCommit4(context, + Gearef(context, LoopCounter)); } __code shutdownWorker(struct CPUWorker* worker) {
--- a/src/parallel_execution/TaskManagerImpl.cbc Tue Apr 18 01:47:42 2017 +0900 +++ b/src/parallel_execution/TaskManagerImpl.cbc Tue Apr 18 05:53:37 2017 +0900 @@ -55,26 +55,11 @@ __code createTask(struct TaskManager* taskManager) { taskManager->context = NEW(struct Context); initContext(taskManager->context); - taskManager->context->taskManager = taskManager; + taskManager->context->taskManager = (struct TaskManager*)taskManager->taskManager; taskManager->context->idg = taskManager->context->dataNum; goto meta(context, C_setWorker); } -__code setWaitTask(struct Queue* queue, struct Context* task, Data* data, __code next(...)) { - struct Meta *metaData = GET_META(data); - if (!metaData->wait) { - metaData->wait = createSynchronizedQueue(context); - } - queue->queue = (union Data*)metaData->wait; - queue->next = next; - queue->data = (Data *)task; - goto meta(context, queue->queue->Queue.put); -} - -__code setWaitTask_stub(struct Context* context) { - goto setWaitTask(context, Gearef(context, Queue), Gearef(context, TaskManager)->context, Gearef(context, TaskManager)->data, Gearef(context, TaskManager)->next); -} - __code setWorker(struct TaskManagerImpl* taskManager, struct Context* task, __code next(...)) { task->workerId = taskManager->sendWorkerIndex; if(++taskManager->sendWorkerIndex >= taskManager->numWorker) { @@ -88,17 +73,24 @@ goto setWorker(context, taskManager, Gearef(context, TaskManager)->context, Gearef(context, TaskManager)->next); } +__code setWaitTask(struct Queue* queue, struct Context* task, Data* data, __code next(...)) { + queue->queue = (Data *)GET_WAIT_LIST(data); + queue->next = next; + queue->data = (Data *)task; + goto meta(context, queue->queue->Queue.put); +} + +__code setWaitTask_stub(struct Context* context) { + goto setWaitTask(context, Gearef(context, Queue), Gearef(context, TaskManager)->context, Gearef(context, TaskManager)->data, Gearef(context, TaskManager)->next); +} + __code spawnTaskManager(struct TaskManagerImpl* taskManager, struct Queue* queue, struct Context* task, __code next(...)) { if (task->idgCount == 0) { - // enqueue activeQueue - queue->queue = (union Data*)taskManager->activeQueue; + goto meta(context, C_taskSend); } else { - // enqueue waitQueue - queue->queue = (union Data*)taskManager->taskQueue; + pthread_mutex_unlock(&taskManager->mutex); + goto next(...); } - queue->data = (union Data*)task; - queue->next = C_spawnTaskManager1; - goto meta(context, queue->queue->Queue.put); } __code spawnTaskManager_stub(struct Context* context) { @@ -111,35 +103,18 @@ Gearef(context, TaskManager)->next); } - -__code spawnTaskManager1(struct TaskManagerImpl* taskManager) { - pthread_mutex_unlock(&taskManager->mutex); - goto meta(context, C_taskSend); -} - -__code spawnTaskManager1_stub(struct Context* context) { - TaskManagerImpl* taskManager = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager); - goto spawnTaskManager1(context, - taskManager); -} - -__code taskSend(struct TaskManagerImpl* taskManager, struct Queue* queue) { - queue->queue = (union Data*)taskManager->activeQueue; - queue->next = C_taskSend1; - goto meta(context, taskManager->activeQueue->take); -} - -__code taskSend1(struct TaskManagerImpl* taskManager, struct Queue* queue, struct Context* task, __code next(...)) { +__code taskSend(struct TaskManagerImpl* taskManager, struct Queue* queue, struct Context* task, __code next(...)) { struct Queue* tasks = taskManager->workers[task->workerId]->tasks; queue->queue = (union Data*)tasks; queue->data = (union Data*)task; queue->next = next; + pthread_mutex_unlock(&taskManager->mutex); goto meta(context, tasks->put); } -__code taskSend1_stub(struct Context* context) { +__code taskSend_stub(struct Context* context) { TaskManagerImpl* taskManager = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager); - goto taskSend1(context, taskManager, Gearef(context, Queue), Gearef(context, TaskManager)->context, Gearef(context, TaskManager)->next); + goto taskSend(context, taskManager, Gearef(context, Queue), Gearef(context, TaskManager)->context, Gearef(context, TaskManager)->next); } __code shutdownTaskManager(struct LoopCounter* loopCounter, struct TaskManager* taskManager, struct TaskManagerImpl* taskManagerImpl, struct Queue* queue, __code next(...)) {
--- a/src/parallel_execution/context.h Tue Apr 18 01:47:42 2017 +0900 +++ b/src/parallel_execution/context.h Tue Apr 18 05:53:37 2017 +0900 @@ -45,6 +45,12 @@ meta->size = len; \ data; }) +#define ALLOCATE_DATA_GEAR(context, t) ({ \ + union Data* data = ALLOCATE(context, t); \ + struct Meta* meta = GET_META(data); \ + meta->wait = createSingleLinkedQueue(context); \ + data; }) + #define GET_META(dseg) ((struct Meta*)(((void*)dseg) - sizeof(struct Meta))) #define GET_TYPE(dseg) (GET_META(dseg)->type) #define GET_WAIT_LIST(dseg) (GET_META(dseg)->wait)
--- a/src/parallel_execution/examples/calc.cbc Tue Apr 18 01:47:42 2017 +0900 +++ b/src/parallel_execution/examples/calc.cbc Tue Apr 18 05:53:37 2017 +0900 @@ -111,20 +111,21 @@ __code createTask1(struct LoopCounter* loopCounter, struct TaskManager* taskManager) { int i = loopCounter->i; - if ((length/split*i) < length) { + if (i < 1) { + loopCounter->i++; taskManager->next = C_createTask2; goto meta(context, taskManager->taskManager->TaskManager.createTask); } loopCounter->i = 0; taskManager->next = C_code1; + sleep(3); goto meta(context, taskManager->taskManager->TaskManager.shutdown); } __code createTask2(LoopCounter* loopCounter, TaskManager* taskManager, struct Context *task, Integer *integer1, Integer *integer2, Integer *integer3) { int i = loopCounter->i; - loopCounter->i++; - task->idgCount = 0; + task->idgCount = 1; task->next = C_mult; integer2->value = i+1; task->data[task->idg] = (union Data*)integer1; @@ -139,9 +140,9 @@ } __code createTask2_stub(struct Context* context) { - Integer* integer1 = &ALLOCATE(context, Integer)->Integer; - Integer* integer2 = &ALLOCATE(context, Integer)->Integer; - Integer* integer3 = &ALLOCATE(context, Integer)->Integer; + Integer* integer1 = &ALLOCATE_DATA_GEAR(context, Integer)->Integer; + Integer* integer2 = &ALLOCATE_DATA_GEAR(context, Integer)->Integer; + Integer* integer3 = &ALLOCATE_DATA_GEAR(context, Integer)->Integer; goto createTask2(context, Gearef(context, LoopCounter), Gearef(context, TaskManager), @@ -151,16 +152,11 @@ integer3); } -__code createTask3(TaskManager* taskManager) { +__code createTask3(struct TaskManager* taskManager) { taskManager->next = C_createTask4; goto meta(context, taskManager->taskManager->TaskManager.spawn); } -__code createTask3_stub(struct Context* context) { - goto createTask3(context, - Gearef(context, TaskManager)); -} - __code createTask4(struct TaskManager* taskManager) { taskManager->next = C_createTask5; goto meta(context, taskManager->taskManager->TaskManager.createTask); @@ -171,21 +167,20 @@ task->next = C_add; task->idgCount = 0; integer1->value = i; - integer1->value = i+1; + integer2->value = i+1; task->data[task->idg] = (union Data*)integer1; task->data[task->idg+1] = (union Data*)integer2; task->maxIdg = task->idg + 2; task->odg = task->maxIdg; task->data[task->odg] = (union Data*)integer3; task->maxOdg = task->odg + 1; - taskManager->next = C_createTask3; - taskManager->data = (union Data*)integer1; + taskManager->next = C_createTask1; goto meta(context, taskManager->taskManager->TaskManager.spawn); } __code createTask5_stub(struct Context* context) { - Integer* integer1 = &ALLOCATE(context, Integer)->Integer; - Integer* integer2 = &ALLOCATE(context, Integer)->Integer; + Integer* integer1 = &ALLOCATE_DATA_GEAR(context, Integer)->Integer; + Integer* integer2 = &ALLOCATE_DATA_GEAR(context, Integer)->Integer; goto createTask5(context, Gearef(context, LoopCounter), Gearef(context, TaskManager), @@ -248,7 +243,7 @@ } __code mult_stub(struct Context* context) { - goto add(context, + goto mult(context, &context->data[context->idg]->Integer, &context->data[context->idg + 1]->Integer, &context->data[context->odg]->Integer);