Mercurial > hg > GearsTemplate
changeset 245:308368406fe7
Merge
author | Tatsuki IHA <e125716@ie.u-ryukyu.ac.jp> |
---|---|
date | Wed, 25 Jan 2017 03:04:39 +0900 |
parents | d1567718f12c (diff) 6a80ab36181c (current diff) |
children | 421ea91dd76c |
files | |
diffstat | 3 files changed, 26 insertions(+), 51 deletions(-) [+] |
line wrap: on
line diff
--- a/src/parallel_execution/context.h Tue Jan 24 19:52:20 2017 +0900 +++ b/src/parallel_execution/context.h Wed Jan 25 03:04:39 2017 +0900 @@ -40,7 +40,7 @@ struct Meta* meta = (struct Meta*)context->heap;\ context->heap += sizeof(struct Meta);\ union Data* data = context->heap; \ - context->heap += sizeof(struct dseg)*len; \ + context->heap += sizeof(struct dseg *)*len; \ meta->type = D_##dseg; \ meta->size = len; \ data; }) @@ -116,13 +116,14 @@ pthread_mutex_t mutex; struct Queue* activeQueue; struct Queue* taskQueue; - struct Worker* workers; + struct Worker** workers; } TaskManagerImpl; struct Worker { union Data* worker; enum Code taskReceive; enum Code shutdown; enum Code next; + struct Queue* tasks; } Worker; struct CPUWorker { pthread_t thread; @@ -130,7 +131,6 @@ pthread_cond_t cond; struct Context* context; int id; - struct Queue* tasks; } CPUWorker; #ifdef USE_CUDA struct CudaWorker {
--- a/src/parallel_execution/taskManager.c Tue Jan 24 19:52:20 2017 +0900 +++ b/src/parallel_execution/taskManager.c Wed Jan 25 03:04:39 2017 +0900 @@ -31,7 +31,7 @@ void createWorkers(Context* context, TaskManager* taskManager) { int i = 0; TaskManagerImpl* taskManagerImpl = (TaskManagerImpl*)GearImpl(context,TaskManager,taskManager); - taskManagerImpl->workers = (Worker*)ALLOC_ARRAY(context,Worker,taskManager->maxCPU); + taskManagerImpl->workers = (Worker**)ALLOC_ARRAY(context, Worker, taskManager->maxCPU); for (;i<taskManager->gpu;i++) { Queue* queue = &createSynchronizedQueue(context)->Queue; taskManagerImpl->workers[i] = (Worker*)createCPUWorker(context, i, queue); @@ -59,17 +59,17 @@ goto createTask(context,Gearef(context,TaskManager)); } -__code setRunWorker(struct Context* context, TaskManagerImpl* taskManager, enum Code next) { - task->workerId = taskManagerImpl->sendWorkerIndex; - if(++taskManagerImpl->sendWorkerIndex >= taskManagerImpl->numWorker) { - taskManagerImpl->sendWorkerIndex = 0; +__code setRunWorker(struct Context* context, TaskManagerImpl* taskManager, Context* task, enum Code next) { + task->workerId = taskManager->sendWorkerIndex; + if(++taskManager->sendWorkerIndex >= taskManager->numWorker) { + taskManager->sendWorkerIndex = 0; } goto meta(context, next); } __code setRunWorker_stub(struct Context* context) { TaskManagerImpl* taskManager = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager); - goto createTask(context, taskManager, Gearef(context, TaskManager)->next); + goto setRunWorker(context, taskManager, Gearef(context, TaskManager)->context, Gearef(context, TaskManager)->next); } __code spawnTaskManager(struct Context* context, struct TaskManagerImpl* taskManager, struct Queue* queue, struct Context* task, enum Code next) { @@ -103,12 +103,12 @@ __code spawnTaskManager1_stub(struct Context* context) { TaskManagerImpl* taskManager = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager); - goto spawnTaskManager(context, + goto spawnTaskManager1(context, taskManager, Gearef(context, TaskManager)->next); } -__code taskSend(struct Context* context, TaskManagerImpl* taskManager) { +__code taskSend(struct Context* context, TaskManagerImpl* taskManager, Queue* queue) { queue->queue = (union Data*)taskManager->activeQueue; queue->next = C_taskSend1; goto meta(context, taskManager->activeQueue->put); @@ -116,51 +116,27 @@ __code taskSend_stub(struct Context* context) { TaskManagerImpl* taskManager = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager); - goto taskSend(context, taskManager); + goto taskSend(context, taskManager, Gearef(context, Queue)); } -__code taskSend1(struct Context* context, TaskManagerImpl* taskManager, struct Context* task, enum Code next) { +__code taskSend1(struct Context* context, TaskManagerImpl* taskManager, Queue* queue, struct Context* task, enum Code next) { struct Queue* tasks = taskManager->workers[task->workerId]->tasks; - queue->queue = tasks; - queue->data = (union Data*)taskManager->context; + queue->queue = (union Data*)tasks; + queue->data = (union Data*)task; queue->next = next; - pthread_cond_signal(&taskManager->workers[task->workerId]->cond); + pthread_cond_signal(&taskManager->workers[task->workerId]->worker->CPUWorker.cond); goto meta(context, tasks->put); } __code taskSend1_stub(struct Context* context) { TaskManagerImpl* taskManager = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager); - goto taskSend(context, taskManager, Gearef(context, TaskManager)->context, Gearef(context, TaskManager)->next); + goto taskSend1(context, taskManager, Gearef(context, Queue), Gearef(context, TaskManager)->context, Gearef(context, TaskManager)->next); } -__code createWorker1(struct Context* context, struct LoopCounter* loopCounter, struct Worker* worker) { +__code shutdownTaskManager(struct Context* context, struct LoopCounter* loopCounter, TaskManager* taskManager, TaskManagerImpl* taskManagerImpl) { int i = loopCounter->i; - - if (i < worker->id) { - struct Context* worker_context = &worker->contexts[i]; - worker_context->next = C_getTask1; - worker_context->data[D_Tree] = context->data[D_Tree]; - // worker_context->data[D_ActiveQueue] = context->data[D_ActiveQueue]; - pthread_create(&worker_context->thread, NULL, (void*)&start_code, worker_context); - worker_context->thread_num = i; - loopCounter->i++; - - goto meta(context, C_createWorker1); - } - - loopCounter->i = 0; - goto meta(context, C_taskManager); -} - -__code createWorker1_stub(struct Context* context) { - goto createWorker1(context, &context->data[D_LoopCounter]->LoopCounter, &context->data[D_Worker]->Worker); -} - -__code shutdownTaskManager(struct Context* context, struct LoopCounter* loopCounter, struct Worker* worker, struct TaskManager* taskManager) { - int i = loopCounter->i; - - if (i < worker->id) { - pthread_join(worker->contexts[i].thread, NULL); + if (taskManager->cpu <= i && i < taskManager->maxCPU) { + pthread_join(taskManagerImpl->workers[i]->worker->CPUWorker.thread, NULL); loopCounter->i++; goto meta(context, C_shutdownTaskManager); @@ -171,5 +147,6 @@ } __code shutdownTaskManager_stub(struct Context* context) { - goto shutdownTaskManager(context, &context->data[D_LoopCounter]->LoopCounter, &context->data[D_Worker]->Worker, Gearef(context, TaskManager)); + TaskManagerImpl* taskManagerImpl = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager); + goto shutdownTaskManager(context, Gearef(context, LoopCounter), Gearef(context, TaskManager), taskManagerImpl); }
--- a/src/parallel_execution/worker.c Tue Jan 24 19:52:20 2017 +0900 +++ b/src/parallel_execution/worker.c Wed Jan 25 03:04:39 2017 +0900 @@ -9,7 +9,7 @@ struct Worker* worker = ALLOC(context, Worker); struct CPUWorker* cpuWorker = ALLOC(context, CPUWorker); worker->worker = (union Data*)cpuWorker; - cpuWorker->tasks = queue; + worker->tasks = queue; cpuWorker->id = id; worker->taskReceive = C_taskReceiveWorker; worker->shutdown = C_shutdownWorker; @@ -25,15 +25,14 @@ goto meta(cpuWorker->context, C_taskReceiveWorker); } -__code taskReceiveWorker(struct Context* context, CPUWorker* worker) { +__code taskReceiveWorker(struct Context* context, Worker* worker) { Queue* queue = worker->tasks; queue->next = C_getTask1; goto meta(context, queue->take); } __code taskReceiveWorker_stub(struct Context* context) { - CPUWorker* worker = (CPUWorker *)GearImpl(context, Worker, worker); - goto taskReceiveWorker(context,worker); + goto taskReceiveWorker(context, Gearef(context, Worker)); } __code getTask1(struct Context* context, Worker* worker, struct Context* task) { @@ -45,8 +44,7 @@ __code getTask1_stub(struct Context* context) { Worker* worker = Gearef(context,Worker); - CPUWorker* cpuWorker = (CPUWorker *)GearImpl(context, Worker, worker); - Context* task = &cpuWorker->tasks->data->context; + Context* task = &worker->tasks->data->context; goto getTask1(context,worker,task); }