#include "../context.h"
#include <stdio.h>
#include <stdlib.h>
#interface "TaskManager.h"
#interface "Worker.h"
#interface "Iterator.h"
#interface "Queue.h"

/* CUDA setup/teardown helpers; defined outside this file. */
extern void cudaInit(struct CUDAWorker *cudaWorker,int phase) ;
extern void cudaShutdown(CUDAWorker *cudaWorker);

static void startCUDAWorker(Worker* worker);

/*
 * Create a Worker backed by a CUDAWorker and start its thread.
 *
 * context: not used by this function.
 * id:      stored in cudaWorker->id.
 * queue:   task queue this worker takes from (stored in worker->tasks).
 * im:      not used by this function.
 *
 * Returns the new Worker; its thread runs startCUDAWorker(worker).
 * If the thread cannot be created, an error is printed to stderr and the
 * process exits. Returning would hand back a Worker whose thread handle
 * was never initialised and whose queue would never be served.
 */
Worker* createCUDAWorker(struct Context* context, int id, Queue* queue, TaskManagerImpl *im) {
    struct Worker* worker = new Worker();
    struct CUDAWorker* cudaWorker = new CUDAWorker();
    worker->worker = (union Data*)cudaWorker;
    worker->tasks = queue;
    cudaWorker->id = id;
    // Loop index over the task's output data gears, used by odgCommitCUDAWorker1..5.
    cudaWorker->loopCounter = 0;
    worker->taskReceive = C_taskReceiveCUDAWorker;
    worker->shutdown = C_shutdownCUDAWorker;
    // NOTE(review): startCUDAWorker has type void(Worker*), not void*(*)(void*)
    // as pthread_create expects; the cast relies on the platform ABI tolerating it.
    int err = pthread_create(&worker->thread, NULL, (void*)&startCUDAWorker, worker);
    if (err != 0) {
        fprintf(stderr, "createCUDAWorker: pthread_create failed for worker %d (error %d)\n", id, err);
        exit(EXIT_FAILURE);
    }
    return worker;
}

/*
 * Thread entry point of a CUDA worker.
 * 1. Calls cudaInit on the worker.
 * 2. Allocates and initialises a private Context for this worker.
 * 3. Creates the CUDA executor on cudaWorker->device.
 * 4. Stores the Worker and its task queue in that context's Worker slot.
 * 5. Jumps (goto) to worker->taskReceive in the new context.
 */
static void startCUDAWorker(Worker* worker) {
    struct CUDAWorker* cudaWorker = &worker->worker->CUDAWorker;
    // cudaInit's second parameter is declared as `phase`; 0 is passed here.
    // NOTE(review): confirm whether a device index or an init phase is intended.
    int deviceNum = 0;
    cudaInit(cudaWorker, deviceNum);
    cudaWorker->context = NEW(struct Context);
    initContext(cudaWorker->context);
    cudaWorker->executor = createCUDAExecutor(cudaWorker->context, cudaWorker->device);
    Gearef(cudaWorker->context, Worker)->worker = (union Data*)worker;
    Gearef(cudaWorker->context, Worker)->tasks = worker->tasks;
    goto meta(cudaWorker->context, worker->taskReceive);
}

/* Take the next task from the worker's queue; continues at getTaskCUDAWorker. */
__code taskReceiveCUDAWorker(struct Worker* worker, struct Queue* tasks) {
    goto tasks->take(getTaskCUDAWorker);
}

/*
 * Dispatch one task taken from the queue.
 * - A NULL task ends this worker through worker->shutdown.
 * - Otherwise the task records its worker, and its original next code is
 *   saved. task->next is redirected to odgCommitCUDAWorker so the output
 *   data gears are committed after the task runs. Control then switches
 *   to the task's own context.
 */
__code getTaskCUDAWorker(struct CUDAWorker* cudaWorker, struct Context* task, struct Worker* worker) {
    if (!task) {
        goto worker->shutdown(); // end thread
    }
    task->worker = worker;
    enum Code taskCg = task->next;
    task->next = C_odgCommitCUDAWorker; // commit outputDG after task exec
    goto meta(task, taskCg); // switch task context
}

/* Stub: unpacks the CUDAWorker, the Worker and the dequeued task (Queue data) from the context. */
__code getTaskCUDAWorker_stub(struct Context* context) {
    CUDAWorker* cudaWorker = (CUDAWorker*)GearImpl(context, Worker, worker);
    Worker* worker = &Gearef(context,Worker)->worker->Worker;
    struct Context* task = &Gearef(context, Queue)->data->Context;
    goto getTaskCUDAWorker(context, cudaWorker, task, worker);
}

/*
 * Entry of the output-data-gear commit, run after a task finishes.
 * - Iterated tasks go through iterator->barrier first, continuing at
 *   odgCommitCUDAWorker1 or odgCommitCUDAWorker6.
 * - Other tasks go straight to odgCommitCUDAWorker1.
 */
__code odgCommitCUDAWorker(struct CUDAWorker* worker, struct Context* task) {
    if (task->iterate) {
        struct Iterator* iterator = task->iterator;
        goto iterator->barrier(task, odgCommitCUDAWorker1, odgCommitCUDAWorker6);
    } else {
        goto odgCommitCUDAWorker1();
    }
}

/*
 * Stub: invoked in the finished task's context.
 * Switches execution to the worker's own context, recording the worker and
 * the finished task in that context's Worker slot.
 */
__code odgCommitCUDAWorker_stub(struct Context* context) {
    // switch worker context
    struct Context* workerContext = context->worker->worker->CUDAWorker.context;
    Gearef(workerContext, Worker)->worker = (union Data*)context->worker;
    Gearef(workerContext, Worker)->task = context;
    CUDAWorker* cudaWorker = (CUDAWorker*)GearImpl(workerContext, Worker, worker);
    goto odgCommitCUDAWorker(workerContext, cudaWorker, context);
}

/*
 * Loop head over the task's output data gears (indices odg .. maxOdg-1),
 * indexed by worker->loopCounter.
 * When all have been processed, the counter is reset and the task manager's
 * task count is decremented, continuing at odgCommitCUDAWorker6.
 */
__code odgCommitCUDAWorker1(struct CUDAWorker* worker, struct Context* task) {
    int i = worker->loopCounter;
    if (task->odg+i < task->maxOdg) {
        goto odgCommitCUDAWorker2();
    }
    worker->loopCounter = 0;
    struct TaskManager* taskManager = task->taskManager;
    goto taskManager->decrementTaskCount(odgCommitCUDAWorker6);
}

/* Check whether the wait list of the current output data gear is empty (-> 5) or not (-> 3). */
__code odgCommitCUDAWorker2(struct CUDAWorker* worker, struct Context* task) {
    int i = worker->loopCounter;
    struct Queue* queue = GET_WAIT_LIST(task->data[task->odg+i]);
    goto queue->isEmpty(odgCommitCUDAWorker3, odgCommitCUDAWorker5);
}

/* Take one waiting task from the current output data gear's wait list (-> 4). */
__code odgCommitCUDAWorker3(struct CUDAWorker* worker, struct Context* task) {
    int i = worker->loopCounter;
    struct Queue* queue = GET_WAIT_LIST(task->data[task->odg+i]);
    goto queue->take(odgCommitCUDAWorker4);
}

/*
 * Atomically decrement the waiting task's idgCount.
 * If this was the last outstanding input, spawn the waiting task.
 * Either way, return to odgCommitCUDAWorker2 to drain the same wait list.
 */
__code odgCommitCUDAWorker4(struct CUDAWorker* worker, struct Context* task, struct Context* waitTask) {
    // __sync_fetch_and_sub returns the value *before* the decrement,
    // so == 1 means the counter has just reached 0.
    if (__sync_fetch_and_sub(&waitTask->idgCount, 1) == 1) {
        struct TaskManager* taskManager = waitTask->taskManager;
        goto taskManager->spawn(waitTask, odgCommitCUDAWorker2);
    }
    goto odgCommitCUDAWorker2();
}

/* Stub: unpacks the CUDAWorker, the finished task and the dequeued waiting task from the context. */
__code odgCommitCUDAWorker4_stub(struct Context* context) {
    CUDAWorker* cudaWorker = (CUDAWorker*)GearImpl(context, Worker, worker);
    struct Context* task = Gearef(context, Worker)->task;
    struct Context* waitTask = &Gearef(context, Queue)->data->Context;
    goto odgCommitCUDAWorker4(context, cudaWorker, task, waitTask);
}

/* Advance to the next output data gear and re-enter the loop head. */
__code odgCommitCUDAWorker5(struct CUDAWorker* worker, struct Context* task) {
    worker->loopCounter++;
    goto odgCommitCUDAWorker1();
}

/* Commit finished: go back to receiving tasks on the task's worker. */
__code odgCommitCUDAWorker6(struct CUDAWorker* worker, struct Context* task) {
    struct Worker* taskWorker = task->worker;
    goto taskWorker->taskReceive(taskWorker->tasks);
}

/*
 * Shut the worker down.
 * Calls cudaShutdown, then jumps to C_exit_code.
 * NOTE(review): `context` is not in the parameter list; presumably it is
 * supplied by the generated stub — confirm.
 */
__code shutdownCUDAWorker(struct CUDAWorker* worker) {
    cudaShutdown(worker);
    goto meta(context, C_exit_code);
}