Mercurial > hg > GearsTemplate
view src/parallel_execution/CPUWorker.cbc @ 538:c0b6ce2ed820
Add comment
author | Tatsuki IHA <innparusu@cr.ie.u-ryukyu.ac.jp> |
---|---|
date | Tue, 13 Feb 2018 04:35:17 +0900 |
parents | a517b11c37f7 |
children |
line wrap: on
line source
#include "../context.h"
#interface "TaskManager.h"
#interface "Worker.h"
#interface "Iterator.h"
#interface "Queue.h"

static void startWorker(Worker* worker);

/*
 * Build a Worker whose implementation is a CPUWorker and start its thread.
 *
 * context : not referenced directly in this body.
 *           NOTE(review): presumably consumed by the `new` allocation
 *           syntax after preprocessing -- confirm against the generator.
 * id      : stored into cpuWorker->id.
 * queue   : stored into worker->tasks; the worker later takes tasks from it.
 *
 * Returns the new Worker. The result of pthread_create is not checked.
 */
Worker* createCPUWorker(struct Context* context, int id, Queue* queue) {
    struct Worker* worker = new Worker();
    struct CPUWorker* cpuWorker = new CPUWorker();

    // implementation-side state
    cpuWorker->id = id;
    cpuWorker->loopCounter = 0;

    // interface-side state and entry points
    worker->worker = (union Data*)cpuWorker;
    worker->tasks = queue;
    worker->taskReceive = C_taskReceiveCPUWorker;
    worker->shutdown = C_shutdownCPUWorker;

    // every field above is filled in before the thread starts running
    pthread_create(&worker->thread, NULL, (void*)&startWorker, worker);
    return worker;
}

/*
 * Thread entry handed to pthread_create by createCPUWorker.
 *
 * Allocates and initialises a Context for this worker, publishes the worker
 * and its task queue through Gearef(..., Worker), then transfers control to
 * the worker's taskReceive code through meta.
 */
static void startWorker(struct Worker* worker) {
    struct CPUWorker* cpuWorker = &worker->worker->CPUWorker;
    cpuWorker->context = NEW(struct Context);
    initContext(cpuWorker->context);
    Gearef(cpuWorker->context, Worker)->tasks = worker->tasks;
    Gearef(cpuWorker->context, Worker)->worker = (union Data*)worker;
    goto meta(cpuWorker->context, worker->taskReceive);
}

/*
 * Ask the task queue for its next element; the continuation is
 * getTaskCPUWorker.
 */
__code taskReceiveCPUWorker(struct CPUWorker* worker, struct Queue* tasks) {
    goto tasks->take(getTaskCPUWorker);
}

/*
 * Dispatch a task obtained from the queue.
 *
 * A NULL task continues to the worker's shutdown code. Otherwise the task
 * remembers its worker, its original next code is saved, and its next code is
 * replaced by C_odgCommitCPUWorker so that the output commit runs after the
 * task has executed.
 */
__code getTaskCPUWorker(struct CPUWorker* cpuWorker, struct Context* task, struct Worker* worker) {
    if (task == NULL) {
        goto worker->shutdown(); // end thread
    }
    enum Code entryCode = task->next;
    task->next = C_odgCommitCPUWorker; // commit outputDG after task exec
    task->worker = worker;
    goto meta(task, entryCode); // switch task context
}

/*
 * Hand-written stub for getTaskCPUWorker: gathers the CPUWorker, the Worker
 * and the element found in the Queue data of the context, then continues to
 * getTaskCPUWorker.
 */
__code getTaskCPUWorker_stub(struct Context* context) {
    struct Context* task = &Gearef(context, Queue)->data->Context;
    Worker* worker = &Gearef(context,Worker)->worker->Worker;
    CPUWorker* cpuWorker = (CPUWorker*)GearImpl(context, Worker, worker);
    goto getTaskCPUWorker(context, cpuWorker, task, worker);
}

/*
 * First step of the output commit that follows task execution.
 *
 * A task with `iterate` set goes through its iterator's barrier, which is
 * given odgCommitCPUWorker1 and odgCommitCPUWorker6 as continuations. Any
 * other task continues straight to odgCommitCPUWorker1.
 */
__code odgCommitCPUWorker(struct CPUWorker* worker, struct Context* task) {
    if (!task->iterate) {
        goto odgCommitCPUWorker1();
    }
    struct Iterator* iterator = task->iterator;
    goto iterator->barrier(task, odgCommitCPUWorker1, odgCommitCPUWorker6);
}

/*
 * Hand-written stub for odgCommitCPUWorker.
 *
 * It is entered with the task's context, so it looks up the worker's own
 * context, records the worker and the finished task there, and continues in
 * that worker context.
 */
__code odgCommitCPUWorker_stub(struct Context* context) {
    // switch worker context
    struct Context* workerContext = context->worker->worker->CPUWorker.context;
    Gearef(workerContext, Worker)->task = context;
    Gearef(workerContext, Worker)->worker = (union Data*)context->worker;
    // must follow the ->worker store above: GearImpl is given that same field
    CPUWorker* cpuWorker = (CPUWorker*)GearImpl(workerContext, Worker, worker);
    goto odgCommitCPUWorker(workerContext, cpuWorker, context);
}

/*
 * Loop head over the task's output slots, indexed by worker->loopCounter.
 *
 * When task->odg + loopCounter has reached task->maxOdg the counter is reset
 * and the task manager's decrementTaskCount is invoked with
 * odgCommitCPUWorker6 as continuation. Otherwise the current slot is handled
 * by odgCommitCPUWorker2.
 */
__code odgCommitCPUWorker1(struct CPUWorker* worker, struct Context* task) {
    int slot = worker->loopCounter;
    if (task->odg+slot >= task->maxOdg) {
        worker->loopCounter = 0;
        struct TaskManager* taskManager = task->taskManager;
        goto taskManager->decrementTaskCount(odgCommitCPUWorker6);
    }
    goto odgCommitCPUWorker2();
}

/*
 * Check the wait list of the current output slot: isEmpty is given
 * odgCommitCPUWorker3 and odgCommitCPUWorker5 as its two continuations.
 */
__code odgCommitCPUWorker2(struct CPUWorker* worker, struct Context* task) {
    int slot = worker->loopCounter;
    struct Queue* queue = GET_WAIT_LIST(task->data[task->odg+slot]);
    goto queue->isEmpty(odgCommitCPUWorker3, odgCommitCPUWorker5);
}

/*
 * Take one element from the wait list of the current output slot and
 * continue to odgCommitCPUWorker4.
 */
__code odgCommitCPUWorker3(struct CPUWorker* worker, struct Context* task) {
    int slot = worker->loopCounter;
    struct Queue* queue = GET_WAIT_LIST(task->data[task->odg+slot]);
    goto queue->take(odgCommitCPUWorker4);
}

/*
 * Atomically decrement the waiting task's idgCount.
 *
 * __sync_fetch_and_sub returns the value held before the subtraction, so a
 * result of 1 means this decrement took the counter to zero; in that case the
 * waiting task is spawned through its task manager. Either way the
 * continuation is odgCommitCPUWorker2.
 */
__code odgCommitCPUWorker4(struct CPUWorker* worker, struct Context* task, struct Context* waitTask) {
    if (__sync_fetch_and_sub(&waitTask->idgCount, 1) != 1) {
        goto odgCommitCPUWorker2();
    }
    struct TaskManager* taskManager = waitTask->taskManager;
    goto taskManager->spawn(waitTask, odgCommitCPUWorker2);
}

/*
 * Hand-written stub for odgCommitCPUWorker4: the committing task comes from
 * the Worker data of the context and the waiting task from its Queue data.
 */
__code odgCommitCPUWorker4_stub(struct Context* context) {
    struct Context* waitTask = &Gearef(context, Queue)->data->Context;
    struct Context* task = Gearef(context, Worker)->task;
    CPUWorker* cpuWorker = (CPUWorker*)GearImpl(context, Worker, worker);
    goto odgCommitCPUWorker4(context, cpuWorker, task, waitTask);
}

/*
 * Advance to the next output slot and return to the loop head.
 */
__code odgCommitCPUWorker5(struct CPUWorker* worker, struct Context* task) {
    worker->loopCounter += 1;
    goto odgCommitCPUWorker1();
}

/*
 * Commit finished: go back to receiving tasks on the worker recorded in the
 * task.
 */
__code odgCommitCPUWorker6(struct CPUWorker* worker, struct Context* task) {
    struct Worker* taskWorker = task->worker;
    goto taskWorker->taskReceive(taskWorker->tasks);
}

/*
 * Shutdown entry installed by createCPUWorker; continues to exit_code.
 */
__code shutdownCPUWorker(struct CPUWorker* worker) {
    goto exit_code();
}