Mercurial > hg > GearsTemplate
view src/parallel_execution/CPUWorker.cbc @ 372:d6ce4273e7d1
Add dimension task spawn
author | Tatsuki IHA <innparusu@cr.ie.u-ryukyu.ac.jp> |
---|---|
date | Tue, 11 Jul 2017 17:47:11 +0900 |
parents | 534601ed8c50 |
children | fb50cf8aa615 |
line wrap: on
line source
#include "../context.h"

/*
 * CPU worker: a thread that repeatedly takes a task context from a queue,
 * runs it, then walks the task's output data gears and wakes up the tasks
 * waiting on them.
 *
 * Every code segment below receives its context as an implicit first
 * argument; the *_stub segments show it being passed explicitly.
 */

static void start_worker(Worker* worker);

/*
 * Create a CPU worker and start its thread.
 *
 *   context : not referenced directly in this body; presumably consumed by
 *             the allocation shorthand used below -- TODO confirm.
 *   id      : stored in the CPUWorker.
 *   queue   : stored as the worker's task queue.
 *
 * Returns the newly created Worker. The result of pthread_create is not
 * checked here.
 */
Worker* createCPUWorker(struct Context* context, int id, Queue* queue) {
    struct Worker* worker = new Worker();
    struct CPUWorker* cpuWorker = new CPUWorker();
    worker->worker = (union Data*)cpuWorker;
    worker->tasks = queue;
    cpuWorker->id = id;
    worker->taskReceive = C_taskReceiveWorker;
    worker->shutdown = C_shutdownWorker;
    // NOTE(review): start_worker is void(Worker*) while pthread_create expects
    // void *(*)(void *); the cast hides the mismatch -- confirm this is intended.
    pthread_create(&worker->worker->CPUWorker.thread, NULL, (void*)&start_worker, worker);
    return worker;
}

/*
 * Thread entry point: builds a fresh Context for this worker, records the
 * worker in that context's Worker data gear, and continues to the worker's
 * taskReceive code segment.
 */
static void start_worker(Worker* worker) {
    CPUWorker* cpuWorker = (CPUWorker*)worker->worker;
    cpuWorker->context = NEW(struct Context);
    initContext(cpuWorker->context);
    Gearef(cpuWorker->context, Worker)->worker = (union Data*)worker;
    goto meta(cpuWorker->context, worker->taskReceive);
}

/*
 * Ask the worker's task queue for the next task; the queue continues to
 * getTask once its take operation has run.
 */
__code taskReceiveWorker(struct Worker* worker,struct Queue* queue) {
    queue->queue = (union Data*)worker->tasks;
    queue->next = C_getTask;
    goto meta(context, worker->tasks->take);
}

__code taskReceiveWorker_stub(struct Context* context) {
    goto taskReceiveWorker(context, &Gearef(context, Worker)->worker->Worker, Gearef(context, Queue));
}

/*
 * Run the task that was taken from the queue. A NULL task ends the thread.
 * The task's own next code segment is saved and replaced by odgCommit, so
 * the commit phase runs after the task has executed.
 */
__code getTask(struct Worker* worker, struct Context* task) {
    if (!task)
        return; // end thread
    task->worker = worker;
    enum Code taskCg = task->next;
    task->next = C_odgCommit; // set CG after task exec
    goto meta(task, taskCg);
}

__code getTask_stub(struct Context* context) {
    Worker* worker = &Gearef(context,Worker)->worker->Worker;
    struct Context* task = &Gearef(context, Queue)->data->Context;
    goto getTask(context, worker, task);
}

/*
 * Loop head of the commit phase. While odg + i is below maxOdg, continue in
 * the task's context with odgCommit1; otherwise reset the counter and go
 * back to receiving tasks in the worker's context.
 */
__code odgCommit(struct LoopCounter* loopCounter, struct Context* task) {
    int i = loopCounter->i ;
    if(task->odg + i < task->maxOdg) {
        goto meta(task, C_odgCommit1);
    }
    loopCounter->i = 0;
    goto meta(context, C_taskReceiveWorker);
}

/*
 * Invoked in the task's context: the worker's context is looked up through
 * the task, while the loop counter comes from the task's context.
 */
__code odgCommit_stub(struct Context* context) {
    struct Context* workerContext = context->worker->worker->CPUWorker.context;
    goto odgCommit(workerContext, Gearef(context, LoopCounter), context);
}

/*
 * Select the wait list of the data gear at index odg + i and test whether
 * it is empty: empty continues to odgCommit4, otherwise to odgCommit2.
 */
__code odgCommit1(struct LoopCounter* loopCounter, struct Queue* queue) {
    int i = loopCounter->i ;
    queue->queue = (union Data*)GET_WAIT_LIST(context->data[context->odg+i]);
    queue->whenEmpty = C_odgCommit4;
    queue->next = C_odgCommit2;
    goto meta(context, queue->queue->Queue.isEmpty);
}

__code odgCommit1_stub(struct Context* context) {
    goto odgCommit1(context, Gearef(context, LoopCounter), Gearef(context, Queue));
}

/*
 * Take one waiting task from the current wait list, then continue to
 * odgCommit3.
 */
__code odgCommit2(struct Queue* queue) {
    queue->next = C_odgCommit3;
    goto meta(context, queue->queue->Queue.take);
}

__code odgCommit2_stub(struct Context* context) {
    goto odgCommit2(context, Gearef(context, Queue));
}

/*
 * Decrement the waiting task's idgCount with a compare-and-swap. When the
 * count reaches zero the task is handed to its task manager's spawn code
 * segment; in both cases the wait list is examined again via odgCommit1.
 * A failed compare-and-swap re-enters this code segment to retry.
 */
__code odgCommit3(struct TaskManager* taskManager, struct Context* task) {
    int idgCount = task->idgCount;
    if(__sync_bool_compare_and_swap(&task->idgCount, idgCount, idgCount-1)) { // atomic decrement idg counter
        if(idgCount-1 == 0) {
            taskManager->taskManager = (union Data*)task->taskManager;
            taskManager->context = task;
            taskManager->next = C_odgCommit1;
            goto meta(context, task->taskManager->spawn);
        } else {
            goto meta(context, C_odgCommit1);
        }
    } else {
        goto meta(context, C_odgCommit3);
    }
}

__code odgCommit3_stub(struct Context* context) {
    struct Context* task = &Gearef(context, Queue)->data->Context;
    goto odgCommit3(context, Gearef(context, TaskManager), task);
}

/*
 * The current wait list is empty: advance the loop counter and return to
 * the loop head.
 */
__code odgCommit4(struct LoopCounter* loopCounter) {
    loopCounter->i++;
    goto meta(context, C_odgCommit);
}

__code odgCommit4_stub(struct Context* context) {
    goto odgCommit4(context, Gearef(context, LoopCounter));
}

/*
 * Shutdown code segment registered in createCPUWorker; currently empty.
 */
__code shutdownWorker(struct CPUWorker* worker) {
}