#include "../context.h" #interface "TaskManager.h" #interface "Worker.h" #interface "Iterator.h" #interface "Queue.h" static void startWorker(Worker* worker); Worker* createCPUWorker(struct Context* context, int id, Queue* queue) { struct Worker* worker = new Worker(); struct CPUWorker* cpuWorker = new CPUWorker(); worker->worker = (union Data*)cpuWorker; worker->tasks = queue; cpuWorker->id = id; cpuWorker->loopCounter = 0; worker->taskReceive = C_taskReceiveCPUWorker; worker->shutdown = C_shutdownCPUWorker; pthread_create(&worker->thread, NULL, (void*)&startWorker, worker); return worker; } static void startWorker(struct Worker* worker) { CPUWorker* cpuWorker = (CPUWorker*)worker->worker; cpuWorker->context = NEW(struct Context); initContext(cpuWorker->context); Gearef(cpuWorker->context, Worker)->worker = (union Data*)worker; Gearef(cpuWorker->context, Worker)->tasks = worker->tasks; goto meta(cpuWorker->context, worker->taskReceive); } __code taskReceiveCPUWorker(struct CPUWorker* worker, struct Queue* tasks) { goto tasks->take(getTaskCPUWorker); } __code getTaskCPUWorker(struct CPUWorker* cpuWorker, struct Context* task, struct Worker* worker) { if (!task) { goto worker->shutdown(); // end thread } task->worker = worker; enum Code taskCg = task->next; task->next = C_odgCommitCPUWorker; // set CG after task exec goto meta(task, taskCg); // switch task context } __code getTaskCPUWorker_stub(struct Context* context) { CPUWorker* cpuWorker = (CPUWorker*)GearImpl(context, Worker, worker); Worker* worker = &Gearef(context,Worker)->worker->Worker; struct Context* task = &Gearef(context, Queue)->data->Context; goto getTaskCPUWorker(context, cpuWorker, task, worker); } __code iterateCommitCPUWorker(struct CPUWorker* worker) { struct Iterator* iterator = context->iterator; goto iterator->barrier(context, odgCommitCPUWorker, iterateCommitCPUWorker1); } __code iterateCommitCPUWorker1(struct CPUWorker* worker, struct Context* task) { struct Worker* taskWorker = task->worker; goto taskWorker->taskReceive(taskWorker->tasks); } __code iterateCommitCPUWorker1_stub(struct Context* context) { // switch worker context struct Context* workerContext = context->worker->worker->CPUWorker.context; CPUWorker* cpuWorker = &context->worker->worker->CPUWorker; goto iterateCommitCPUWorker1(workerContext, cpuWorker, context); } __code odgCommitCPUWorker(struct CPUWorker* worker, struct Context* task) { int i = worker->loopCounter; if (task->odg+i < task->maxOdg) { goto odgCommitCPUWorker1(); } worker->loopCounter = 0; struct TaskManager* taskManager = task->taskManager; goto taskManager->decrementTaskCount(taskReceiveCPUWorker); } __code odgCommitCPUWorker_stub(struct Context* context) { // switch worker context struct Context* workerContext = context->worker->worker->CPUWorker.context; CPUWorker* cpuWorker = &context->worker->worker->CPUWorker; goto odgCommitCPUWorker(workerContext, cpuWorker, context); } __code odgCommitCPUWorker1(struct CPUWorker* worker) { int i = worker->loopCounter; struct Queue* queue = GET_WAIT_LIST(context->data[context->odg+i]); goto queue->isEmpty(odgCommitCPUWorker2, odgCommitCPUWorker4); } __code odgCommitCPUWorker2(struct CPUWorker* worker) { int i = worker->loopCounter; struct Queue* queue = GET_WAIT_LIST(context->data[context->odg+i]); goto queue->take(odgCommitCPUWorker3); } __code odgCommitCPUWorker3(struct CPUWorker* worker, struct Context* task) { if (__sync_fetch_and_sub(&task->idgCount, 1) == 1) { // atomic decrement idg counter(__sync_fetch_and_sub function return initial value of task->idgCount point) struct TaskManager* taskManager = task->taskManager; taskManager->task = task; goto taskManager->spawn(task, odgCommitCPUWorker1); } goto odgCommitCPUWorker1(); } __code odgCommitCPUWorker3_stub(struct Context* context) { CPUWorker* cpuWorker = (CPUWorker*)GearImpl(context, Worker, worker); struct Context* task = &Gearef(context, Queue)->data->Context; goto odgCommitCPUWorker3(context, cpuWorker, task); } __code odgCommitCPUWorker4(struct CPUWorker* worker) { worker->loopCounter++; goto odgCommitCPUWorker(); } __code shutdownCPUWorker(struct CPUWorker* worker) { goto exit_code(); }