Mercurial > hg > GearsTemplate
changeset 267:d041069bc7fe
add \TaskManager.cbc
author | mir3636 |
---|---|
date | Sun, 29 Jan 2017 21:26:27 +0900 |
parents | ffcd80cc3a83 |
children | 378ce6f74f4b |
files | src/parallel_execution/TaskManager.cbc |
diffstat | 1 files changed, 140 insertions(+), 0 deletions(-) [+] |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/parallel_execution/TaskManager.cbc Sun Jan 29 21:26:27 2017 +0900 @@ -0,0 +1,140 @@ +#include "../context.h" +#include "../stack.h" +#include "../queue.h" +#include "../worker.h" +#include "../origin_cs.h" +#include <stdio.h> + +void createWorkers(struct Context* context, TaskManager* taskManeger, TaskManagerImpl* taskManagerImpl); + +TaskManager** createTaskManager(struct Context* context, int numCPU, int numGPU, int numIO) { + struct TaskManager* taskManager = new TaskManager(); + // 0...numIO-1 IOProcessor + // numIO...numIO+numGPU-1 GPUProcessor + // numIO+numGPU...numIO+numGPU+numCPU-1 CPUProcessor + taskManager->io = 0; + taskManager->gpu = numIO; + taskManager->cpu = numIO+numGPU; + taskManager->maxCPU = numIO+numGPU+numCPU; + taskManager->createTask = C_createTask; + taskManager->spawn = C_spawnTaskManager; + taskManager->shutdown = C_shutdownTaskManager; + struct TaskManagerImpl* taskManagerImpl = new TaskManagerImpl(); + taskManager->taskManager = (union Data*)taskManagerImpl; + taskManagerImpl -> activeQueue = &createSingleLinkedQueue(context)->Queue; + taskManagerImpl -> taskQueue = &createSingleLinkedQueue(context)->Queue; + taskManagerImpl -> numWorker = taskManager->maxCPU; + createWorkers(context, taskManager, taskManagerImpl); + return taskManager; +} + +void createWorkers(struct Context* context, TaskManager* taskManager, TaskManagerImpl* taskManagerImpl) { + int i = 0; + taskManagerImpl->workers = (Worker**)ALLOC_ARRAY(context, Worker, taskManager->maxCPU); + for (;i<taskManager->gpu;i++) { + Queue* queue = &createSynchronizedQueue(context)->Queue; + taskManagerImpl->workers[i] = (Worker*)createCPUWorker(context, i, queue); + } + for (;i<taskManager->cpu;i++) { +#ifdef USE_CUDA +#else + Queue* queue = &createSynchronizedQueue(context)->Queue; + taskManagerImpl->workers[i] = (Worker*)createCPUWorker(context, i, queue); +#endif + } + for (;i<taskManager->maxCPU;i++) { + Queue* queue = &createSynchronizedQueue(context)->Queue; + taskManagerImpl->workers[i] = (Worker*)createCPUWorker(context, i, queue); + } +} + +__code createTask(struct TaskManager* taskManager) { + taskManager->context = NEW(struct Context); + initContext(taskManager->context); + goto C_setWorker(...); +} + +__code setWorker(struct TaskManagerImpl* taskManager, struct Context* task, __code next(...)) { + task->workerId = taskManager->sendWorkerIndex; + if(++taskManager->sendWorkerIndex >= taskManager->numWorker) { + taskManager->sendWorkerIndex = 0; + } + goto next(...); +} + +__code spawnTaskManager(struct TaskManagerImpl* taskManager, struct Queue* queue, struct Context* task, __code next(...)) { + if (task->idgCount == 0) { + // enqueue activeQueue + queue->queue = (union Data*)taskManager->activeQueue; + } else { + // enqueue waitQueue + queue->queue = (union Data*)taskManager->taskQueue; + } + queue->data = (union Data*)task; + queue->next = C_spawnTaskManager1; + goto meta(context, queue->queue->Queue.put); +} + +__code spawnTaskManager_stub(struct Context* context) { + TaskManagerImpl* taskManager = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager); + pthread_mutex_lock(&taskManager->mutex); + goto spawnTaskManager(context, + taskManager, + Gearef(context, Queue), + Gearef(context, TaskManager)->context, + Gearef(context, TaskManager)->next); +} + + +__code spawnTaskManager1(struct TaskManagerImpl* taskManager) { + pthread_mutex_unlock(&taskManager->mutex); + goto C_taskSend(...); +} + +__code spawnTaskManager1_stub(struct Context* context) { + TaskManagerImpl* taskManager = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager); + goto spawnTaskManager1(context, + taskManager); +} + +__code taskSend(struct TaskManagerImpl* taskManager, struct Queue* queue) { + queue->queue = (union Data*)taskManager->activeQueue; + queue->next = C_taskSend1; + goto meta(context, taskManager->activeQueue->take); +} + +__code taskSend1(struct TaskManagerImpl* taskManager, struct Queue* queue, struct Context* task, __code next(...)) { + struct Queue* tasks = taskManager->workers[task->workerId]->tasks; + queue->queue = (union Data*)tasks; + queue->data = (union Data*)task; + queue->next = next; + pthread_cond_signal(&taskManager->workers[task->workerId]->worker->CPUWorker.cond); + goto meta(context, tasks->put); +} + +__code taskSend1_stub(struct Context* context) { + TaskManagerImpl* taskManager = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager); + goto taskSend1(context, taskManager, Gearef(context, Queue), Gearef(context, TaskManager)->context, Gearef(context, TaskManager)->next); +} + +__code shutdownTaskManager(struct LoopCounter* loopCounter, struct TaskManager* taskManager, struct TaskManagerImpl* taskManagerImpl, struct Queue* queue, __code next(...)) { + int i = loopCounter->i; + if (taskManager->cpu <= i && i < taskManager->maxCPU) { + struct Queue* tasks = taskManagerImpl->workers[i]->tasks; + queue->queue = (union Data*)tasks; + queue->data = NULL; + queue->next = next; + goto meta(context, tasks->put); + pthread_join(taskManagerImpl->workers[i]->worker->CPUWorker.thread, NULL); + loopCounter->i++; + goto C_shutdownTaskManager(...); + } + + loopCounter->i = 0; + goto meta(context, taskManager->next); +} + +__code shutdownTaskManager_stub(struct Context* context) { + TaskManagerImpl* taskManagerImpl = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager); + goto shutdownTaskManager(context, Gearef(context, LoopCounter), Gearef(context, TaskManager), taskManagerImpl, Gearef(context, Queue), Gearef(context, TaskManager)->next); +}