Mercurial > hg > Members > Moririn
changeset 269:5170539348ec
rename TaskManagerImpl.cbc
author | mir3636 |
---|---|
date | Sun, 29 Jan 2017 22:15:32 +0900 |
parents | 378ce6f74f4b |
children | b6ed4b2a5d9d |
files | src/parallel_execution/TaskManager.cbc src/parallel_execution/TaskManagerImpl.cbc src/parallel_execution/generate_stub.pl |
diffstat | 3 files changed, 156 insertions(+), 142 deletions(-) [+] |
line wrap: on
line diff
--- a/src/parallel_execution/TaskManager.cbc Sun Jan 29 21:30:58 2017 +0900 +++ b/src/parallel_execution/TaskManager.cbc Sun Jan 29 22:15:32 2017 +0900 @@ -1,140 +1,14 @@ -#include "../context.h" -#include "../stack.h" -#include "../queue.h" -#include "../worker.h" -#include "../origin_cs.h" -#include <stdio.h> - -void createWorkers(struct Context* context, TaskManager* taskManeger, TaskManagerImpl* taskManagerImpl); - -TaskManager** createTaskManager(struct Context* context, int numCPU, int numGPU, int numIO) { - struct TaskManager* taskManager = new TaskManager(); - // 0...numIO-1 IOProcessor - // numIO...numIO+numGPU-1 GPUProcessor - // numIO+numGPU...numIO+numGPU+numCPU-1 CPUProcessor - taskManager->io = 0; - taskManager->gpu = numIO; - taskManager->cpu = numIO+numGPU; - taskManager->maxCPU = numIO+numGPU+numCPU; - taskManager->createTask = C_createTask; - taskManager->spawn = C_spawnTaskManager; - taskManager->shutdown = C_shutdownTaskManager; - struct TaskManagerImpl* taskManagerImpl = new TaskManagerImpl(); - taskManager->taskManager = (union Data*)taskManagerImpl; - taskManagerImpl -> activeQueue = &createSingleLinkedQueue(context)->Queue; - taskManagerImpl -> taskQueue = &createSingleLinkedQueue(context)->Queue; - taskManagerImpl -> numWorker = taskManager->maxCPU; - createWorkers(context, taskManager, taskManagerImpl); - return taskManager; -} - -void createWorkers(struct Context* context, TaskManager* taskManager, TaskManagerImpl* taskManagerImpl) { - int i = 0; - taskManagerImpl->workers = (Worker**)ALLOC_ARRAY(context, Worker, taskManager->maxCPU); - for (;i<taskManager->gpu;i++) { - Queue* queue = &createSynchronizedQueue(context)->Queue; - taskManagerImpl->workers[i] = (Worker*)createCPUWorker(context, i, queue); - } - for (;i<taskManager->cpu;i++) { -#ifdef USE_CUDA -#else - Queue* queue = &createSynchronizedQueue(context)->Queue; - taskManagerImpl->workers[i] = (Worker*)createCPUWorker(context, i, queue); -#endif - } - for (;i<taskManager->maxCPU;i++) { - Queue* queue = &createSynchronizedQueue(context)->Queue; - taskManagerImpl->workers[i] = (Worker*)createCPUWorker(context, i, queue); - } -} - -__code createTask(struct TaskManager* taskManager) { - taskManager->context = NEW(struct Context); - initContext(taskManager->context); - goto C_setWorker(...); -} - -__code setWorker(struct TaskManagerImpl* taskManager, struct Context* task, __code next(...)) { - task->workerId = taskManager->sendWorkerIndex; - if(++taskManager->sendWorkerIndex >= taskManager->numWorker) { - taskManager->sendWorkerIndex = 0; - } - goto next(...); -} - -__code spawnTaskManager(struct TaskManagerImpl* taskManager, struct Queue* queue, struct Context* task, __code next(...)) { - if (task->idgCount == 0) { - // enqueue activeQueue - queue->queue = (union Data*)taskManager->activeQueue; - } else { - // enqueue waitQueue - queue->queue = (union Data*)taskManager->taskQueue; - } - queue->data = (union Data*)task; - queue->next = C_spawnTaskManager1; - goto meta(context, queue->queue->Queue.put); -} - -__code spawnTaskManager_stub(struct Context* context) { - TaskManagerImpl* taskManager = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager); - pthread_mutex_lock(&taskManager->mutex); - goto spawnTaskManager(context, - taskManager, - Gearef(context, Queue), - Gearef(context, TaskManager)->context, - Gearef(context, TaskManager)->next); -} - - -__code spawnTaskManager1(struct TaskManagerImpl* taskManager) { - pthread_mutex_unlock(&taskManager->mutex); - goto C_taskSend(...); -} - -__code spawnTaskManager1_stub(struct Context* context) { - TaskManagerImpl* taskManager = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager); - goto spawnTaskManager1(context, - taskManager); -} - -__code taskSend(struct TaskManagerImpl* taskManager, struct Queue* queue) { - queue->queue = (union Data*)taskManager->activeQueue; - queue->next = C_taskSend1; - goto meta(context, taskManager->activeQueue->take); -} - -__code taskSend1(struct TaskManagerImpl* taskManager, struct Queue* queue, struct Context* task, __code next(...)) { - struct Queue* tasks = taskManager->workers[task->workerId]->tasks; - queue->queue = (union Data*)tasks; - queue->data = (union Data*)task; - queue->next = next; - pthread_cond_signal(&taskManager->workers[task->workerId]->worker->CPUWorker.cond); - goto meta(context, tasks->put); -} - -__code taskSend1_stub(struct Context* context) { - TaskManagerImpl* taskManager = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager); - goto taskSend1(context, taskManager, Gearef(context, Queue), Gearef(context, TaskManager)->context, Gearef(context, TaskManager)->next); -} - -__code shutdownTaskManager(struct LoopCounter* loopCounter, struct TaskManager* taskManager, struct TaskManagerImpl* taskManagerImpl, struct Queue* queue, __code next(...)) { - int i = loopCounter->i; - if (taskManager->cpu <= i && i < taskManager->maxCPU) { - struct Queue* tasks = taskManagerImpl->workers[i]->tasks; - queue->queue = (union Data*)tasks; - queue->data = NULL; - queue->next = next; - goto meta(context, tasks->put); - pthread_join(taskManagerImpl->workers[i]->worker->CPUWorker.thread, NULL); - loopCounter->i++; - goto C_shutdownTaskManager(...); - } - - loopCounter->i = 0; - goto meta(context, taskManager->next); -} - -__code shutdownTaskManager_stub(struct Context* context) { - TaskManagerImpl* taskManagerImpl = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager); - goto shutdownTaskManager(context, Gearef(context, LoopCounter), Gearef(context, TaskManager), taskManagerImpl, Gearef(context, Queue), Gearef(context, TaskManager)->next); -} +typedef struct TaskMabager<Impl>{ + union Data* taskManager; + __code createTask(struct TaskManager* taskManager); + __code spawn(Impl* taskManager, struct Queue* queue, struct Context* task, __code next(...)); + __code shutdown(struct LoopCounter* loopCounter, struct TaskManager* taskManager, Impl* taskManagerImpl, struct Queue* queue, __code next(...)); + __code next(...); + __code task(...); + struct Context* context; + int worker; + int cpu; + int gpu; + int io; + int maxCPU; +} TaskManager;
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/parallel_execution/TaskManagerImpl.cbc Sun Jan 29 22:15:32 2017 +0900 @@ -0,0 +1,140 @@ +#include "../context.h" +#include "../stack.h" +#include "../queue.h" +#include "../worker.h" +#include "../origin_cs.h" +#include <stdio.h> + +void createWorkers(struct Context* context, TaskManager* taskManeger, TaskManagerImpl* taskManagerImpl); + +TaskManager** createTaskManager(struct Context* context, int numCPU, int numGPU, int numIO) { + struct TaskManager* taskManager = new TaskManager(); + // 0...numIO-1 IOProcessor + // numIO...numIO+numGPU-1 GPUProcessor + // numIO+numGPU...numIO+numGPU+numCPU-1 CPUProcessor + taskManager->io = 0; + taskManager->gpu = numIO; + taskManager->cpu = numIO+numGPU; + taskManager->maxCPU = numIO+numGPU+numCPU; + taskManager->createTask = C_createTask; + taskManager->spawn = C_spawnTaskManager; + taskManager->shutdown = C_shutdownTaskManager; + struct TaskManagerImpl* taskManagerImpl = new TaskManagerImpl(); + taskManager->taskManager = (union Data*)taskManagerImpl; + taskManagerImpl -> activeQueue = &createSingleLinkedQueue(context)->Queue; + taskManagerImpl -> taskQueue = &createSingleLinkedQueue(context)->Queue; + taskManagerImpl -> numWorker = taskManager->maxCPU; + createWorkers(context, taskManager, taskManagerImpl); + return taskManager; +} + +void createWorkers(struct Context* context, TaskManager* taskManager, TaskManagerImpl* taskManagerImpl) { + int i = 0; + taskManagerImpl->workers = (Worker**)ALLOC_ARRAY(context, Worker, taskManager->maxCPU); + for (;i<taskManager->gpu;i++) { + Queue* queue = &createSynchronizedQueue(context)->Queue; + taskManagerImpl->workers[i] = (Worker*)createCPUWorker(context, i, queue); + } + for (;i<taskManager->cpu;i++) { +#ifdef USE_CUDA +#else + Queue* queue = &createSynchronizedQueue(context)->Queue; + taskManagerImpl->workers[i] = (Worker*)createCPUWorker(context, i, queue); +#endif + } + for (;i<taskManager->maxCPU;i++) { + Queue* queue = &createSynchronizedQueue(context)->Queue; + taskManagerImpl->workers[i] = (Worker*)createCPUWorker(context, i, queue); + } +} + +__code createTask(struct TaskManager* taskManager) { + taskManager->context = NEW(struct Context); + initContext(taskManager->context); + goto C_setWorker(...); +} + +__code setWorker(struct TaskManagerImpl* taskManager, struct Context* task, __code next(...)) { + task->workerId = taskManager->sendWorkerIndex; + if(++taskManager->sendWorkerIndex >= taskManager->numWorker) { + taskManager->sendWorkerIndex = 0; + } + goto next(...); +} + +__code spawnTaskManager(struct TaskManagerImpl* taskManager, struct Queue* queue, struct Context* task, __code next(...)) { + if (task->idgCount == 0) { + // enqueue activeQueue + queue->queue = (union Data*)taskManager->activeQueue; + } else { + // enqueue waitQueue + queue->queue = (union Data*)taskManager->taskQueue; + } + queue->data = (union Data*)task; + queue->next = C_spawnTaskManager1; + goto meta(context, queue->queue->Queue.put); +} + +__code spawnTaskManager_stub(struct Context* context) { + TaskManagerImpl* taskManager = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager); + pthread_mutex_lock(&taskManager->mutex); + goto spawnTaskManager(context, + taskManager, + Gearef(context, Queue), + Gearef(context, TaskManager)->context, + Gearef(context, TaskManager)->next); +} + + +__code spawnTaskManager1(struct TaskManagerImpl* taskManager) { + pthread_mutex_unlock(&taskManager->mutex); + goto C_taskSend(...); +} + +__code spawnTaskManager1_stub(struct Context* context) { + TaskManagerImpl* taskManager = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager); + goto spawnTaskManager1(context, + taskManager); +} + +__code taskSend(struct TaskManagerImpl* taskManager, struct Queue* queue) { + queue->queue = (union Data*)taskManager->activeQueue; + queue->next = C_taskSend1; + goto meta(context, taskManager->activeQueue->take); +} + +__code taskSend1(struct TaskManagerImpl* taskManager, struct Queue* queue, struct Context* task, __code next(...)) { + struct Queue* tasks = taskManager->workers[task->workerId]->tasks; + queue->queue = (union Data*)tasks; + queue->data = (union Data*)task; + queue->next = next; + pthread_cond_signal(&taskManager->workers[task->workerId]->worker->CPUWorker.cond); + goto meta(context, tasks->put); +} + +__code taskSend1_stub(struct Context* context) { + TaskManagerImpl* taskManager = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager); + goto taskSend1(context, taskManager, Gearef(context, Queue), Gearef(context, TaskManager)->context, Gearef(context, TaskManager)->next); +} + +__code shutdownTaskManager(struct LoopCounter* loopCounter, struct TaskManager* taskManager, struct TaskManagerImpl* taskManagerImpl, struct Queue* queue, __code next(...)) { + int i = loopCounter->i; + if (taskManager->cpu <= i && i < taskManager->maxCPU) { + struct Queue* tasks = taskManagerImpl->workers[i]->tasks; + queue->queue = (union Data*)tasks; + queue->data = NULL; + queue->next = next; + goto meta(context, tasks->put); + pthread_join(taskManagerImpl->workers[i]->worker->CPUWorker.thread, NULL); + loopCounter->i++; + goto C_shutdownTaskManager(...); + } + + loopCounter->i = 0; + goto meta(context, taskManager->next); +} + +__code shutdownTaskManager_stub(struct Context* context) { + TaskManagerImpl* taskManagerImpl = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager); + goto shutdownTaskManager(context, Gearef(context, LoopCounter), Gearef(context, TaskManager), taskManagerImpl, Gearef(context, Queue), Gearef(context, TaskManager)->next); +}
--- a/src/parallel_execution/generate_stub.pl Sun Jan 29 21:30:58 2017 +0900 +++ b/src/parallel_execution/generate_stub.pl Sun Jan 29 22:15:32 2017 +0900 @@ -53,12 +53,12 @@ $dataGear{$name} = $_; $var{$name} = {}; $code{$name} = {}; - } elsif (/^(\w+)\* create(\w+)\(/) { + } elsif (/^(\w+)(\*)+ create(\w+)\(/) { if (defined $interface) { die "duplicate interface $interface\n"; } $interface = $1; - $implementation = $2; + $implementation = $3; if ( -f "$interface.cbc") { &getDataGear("$interface.cbc"); }