Mercurial > hg > GearsTemplate
view src/parallel_execution/TaskManagerImpl.cbc @ 479:b8b412a7670a
Fix segmentation fault if multithread
author | Tatsuki IHA <innparusu@cr.ie.u-ryukyu.ac.jp> |
---|---|
date | Thu, 28 Dec 2017 19:51:06 +0900 |
parents | a4d94c591246 |
children | 39b5df2d1c93 |
line wrap: on
line source
#include "../context.h"
#interface "TaskManager.h"
#interface "Iterator.h"
#interface "Queue.h"
#interface "Worker.h"

#include <pthread.h>
#include <stdio.h>
#include <unistd.h>

void createWorkers(struct Context* context, TaskManagerImpl* taskManager);

// Builds a TaskManager interface object backed by a fresh TaskManagerImpl.
//
// context : context used for every allocation made here
// numCPU  : number of CPU worker slots
// numGPU  : number of GPU worker slots
// numIO   : number of IO worker slots
//
// Returns the TaskManager whose method slots refer to the code gears of
// this file and whose taskManager field refers to the implementation.
TaskManager* createTaskManagerImpl(struct Context* context, int numCPU, int numGPU, int numIO) {
    struct TaskManager* taskManager = new TaskManager();
    taskManager->spawnTasks = C_spawnTasksTaskManagerImpl;
    taskManager->spawn = C_spawnTaskManagerImpl;
    taskManager->shutdown = C_shutdownTaskManagerImpl;
    taskManager->incrementTaskCount = C_incrementTaskCountTaskManagerImpl;
    taskManager->decrementTaskCount = C_decrementTaskCountTaskManagerImpl;
    taskManager->setWaitTask = C_setWaitTaskTaskManagerImpl;
    struct TaskManagerImpl* taskManagerImpl = new TaskManagerImpl();
    // 0...numIO-1 IOProcessor
    // numIO...numIO+numGPU-1 GPUProcessor
    // numIO+numGPU...numIO+numGPU+numCPU-1 CPUProcessor
    taskManagerImpl->io = 0;
    taskManagerImpl->gpu = numIO;
    taskManagerImpl->cpu = numIO+numGPU;
    taskManagerImpl->maxCPU = numIO+numGPU+numCPU;
    taskManagerImpl->taskQueue = createSingleLinkedQueue(context);
    taskManagerImpl->numWorker = taskManagerImpl->maxCPU;
    taskManagerImpl->sendGPUWorkerIndex = taskManagerImpl->gpu;
    taskManagerImpl->sendCPUWorkerIndex = taskManagerImpl->cpu;
    taskManagerImpl->taskCount = 0;
    taskManagerImpl->loopCounter = new LoopCounter();
    taskManagerImpl->loopCounter -> i = 0;
    // The spawn and taskSend gears unlock this mutex, so it has to be a
    // properly initialised mutex before any worker exists. Zero-filled
    // storage is not a portable substitute for pthread_mutex_init.
    int mutexStatus = pthread_mutex_init(&taskManagerImpl->mutex, NULL);
    if (mutexStatus != 0) {
        fprintf(stderr, "createTaskManagerImpl: pthread_mutex_init failed (%d)\n", mutexStatus);
    }
    createWorkers(context, taskManagerImpl);
    taskManager->taskManager = (union Data*)taskManagerImpl;
    return taskManager;
}

// Allocates the workers array (maxCPU pointers) and fills every slot with a
// worker that gets its own synchronized queue.
//   [0, gpu)      : CPU workers
//   [gpu, cpu)    : CUDA workers when USE_CUDAWorker is defined, CPU workers otherwise
//   [cpu, maxCPU) : CPU workers
// The single index i is shared by the three loops so the ranges are contiguous.
void createWorkers(struct Context* context, TaskManagerImpl* taskManager) {
    int i = 0;
    taskManager->workers = (Worker**)ALLOCATE_PTR_ARRAY(context, Worker, taskManager->maxCPU);
    for (;i<taskManager->gpu;i++) {
        Queue* queue = createSynchronizedQueue(context);
        taskManager->workers[i] = (Worker*)createCPUWorker(context, i, queue);
    }
    for (;i<taskManager->cpu;i++) {
        Queue* queue = createSynchronizedQueue(context);
#ifdef USE_CUDAWorker
        taskManager->workers[i] = (Worker*)createCUDAWorker(context, i, queue,0);
#else
        taskManager->workers[i] = (Worker*)createCPUWorker(context, i, queue);
#endif
    }
    for (;i<taskManager->maxCPU;i++) {
        Queue* queue = createSynchronizedQueue(context);
        taskManager->workers[i] = (Worker*)createCPUWorker(context, i, queue);
    }
}

// spawnTasks entry gear: asks the incoming tasks queue whether it is empty.
// The two continuations handed to isEmpty are step 1 (which takes an element)
// and step 3 (which moves on to the internal taskQueue).
// NOTE(review): presumably the first continuation is the non-empty branch and
// the second the empty branch - confirm against the Queue interface.
__code spawnTasksTaskManagerImpl(struct TaskManagerImpl* taskManager, struct Queue* tasks, __code next1(...)) {
    goto tasks->isEmpty(spawnTasksTaskManagerImpl1, spawnTasksTaskManagerImpl3);
}

// Stub: reads the implementation, the tasks queue and next1 from the context.
__code spawnTasksTaskManagerImpl_stub(struct Context* context) {
    TaskManagerImpl* taskManager = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager);
    Queue* tasks = Gearef(context, TaskManager)->tasks;
    enum Code next1 = Gearef(context, TaskManager)->next1;
    goto spawnTasksTaskManagerImpl(context, taskManager, tasks, next1);
}

// Step 1: takes one element from the tasks queue and continues with step 2.
__code spawnTasksTaskManagerImpl1(struct TaskManagerImpl* taskManager, struct Queue* tasks, __code next1(...)) {
    goto tasks->take(spawnTasksTaskManagerImpl2);
}

// Stub: same argument unpacking as the entry gear.
__code spawnTasksTaskManagerImpl1_stub(struct Context* context) {
    TaskManagerImpl* taskManager = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager);
    Queue* tasks = Gearef(context, TaskManager)->tasks;
    enum Code next1 = Gearef(context, TaskManager)->next1;
    goto spawnTasksTaskManagerImpl1(context, taskManager, tasks, next1);
}

// Step 2: records the owning TaskManager in the task, then hands the task to
// setWaitTask with the entry gear as continuation, which closes the loop over
// the tasks queue.
__code spawnTasksTaskManagerImpl2(struct TaskManagerImpl* taskManagerImpl, struct Context* task, struct TaskManager* taskManager) {
    task->taskManager = taskManager;
    goto taskManager->setWaitTask(task, spawnTasksTaskManagerImpl);
}

// Stub: the task is the data slot of the Queue gear (the element just taken);
// the TaskManager interface is read back from the context.
__code spawnTasksTaskManagerImpl2_stub(struct Context* context) {
    TaskManagerImpl* taskManagerImpl = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager);
    Context* task = (struct Context*)Gearef(context, Queue)->data;
    TaskManager* taskManager = &Gearef(context, TaskManager)->taskManager->TaskManager;
    goto spawnTasksTaskManagerImpl2(context, taskManagerImpl, task, taskManager);
}

// Step 3: asks the internal taskQueue whether it is empty; the continuations
// are step 4 (which takes an element) and the caller's next1.
__code spawnTasksTaskManagerImpl3(struct TaskManagerImpl* taskManager, __code next1(...)) {
    struct Queue* queue = taskManager->taskQueue;
    goto queue->isEmpty(spawnTasksTaskManagerImpl4, next1(...));
}

// Step 4: takes one element from the internal taskQueue, continuing with step 5.
__code spawnTasksTaskManagerImpl4(struct TaskManagerImpl* taskManager, __code next1(...)) {
    struct Queue* queue = taskManager->taskQueue;
    goto queue->take(spawnTasksTaskManagerImpl5);
}

// Step 5: spawns the taken task and returns to step 3 for the next element.
__code spawnTasksTaskManagerImpl5(struct TaskManagerImpl* taskManagerImpl, struct Context* task, struct TaskManager* taskManager) {
    goto taskManager->spawn(task, spawnTasksTaskManagerImpl3);
}

// Stub: same unpacking as the step 2 stub.
__code spawnTasksTaskManagerImpl5_stub(struct Context* context) {
    TaskManagerImpl* taskManagerImpl = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager);
    Context* task = (struct Context*)Gearef(context, Queue)->data;
    TaskManager* taskManager = &Gearef(context, TaskManager)->taskManager->TaskManager;
    goto spawnTasksTaskManagerImpl5(context, taskManagerImpl, task, taskManager);
}

// Registers the task on the wait list of each data gear in the index range
// [task->idg, task->maxIdg). loopCounter->i is the cursor: every pass puts the
// task on one wait list, advances the cursor and re-enters this gear. When the
// range is exhausted the cursor is reset, the task is put on the internal
// taskQueue and control continues with incrementTaskCount.
// NOTE(review): the cursor lives in the shared TaskManagerImpl, so this looks
// like it expects a single caller at a time - confirm.
__code setWaitTaskTaskManagerImpl(struct TaskManagerImpl* taskManager, struct Context* task, __code next(...)) {
    int i = taskManager->loopCounter->i;
    if(task->idg+i < task->maxIdg) {
        struct Queue* queue = GET_WAIT_LIST(task->data[task->idg + i]);
        taskManager->loopCounter->i++;
        goto queue->put(task, setWaitTaskTaskManagerImpl);
    }
    taskManager->loopCounter->i = 0;
    struct Queue* queue = taskManager->taskQueue;
    goto queue->put(task, incrementTaskCountTaskManagerImpl);
}

// Stub: reads the implementation, the task and next from the context.
__code setWaitTaskTaskManagerImpl_stub(struct Context* context) {
    TaskManagerImpl* taskManager = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager);
    struct Context* task = Gearef(context, TaskManager)->task;
    goto setWaitTaskTaskManagerImpl(context, taskManager, task, Gearef(context, TaskManager)->next);
}

// Atomically adds one to taskCount, then continues with next.
__code incrementTaskCountTaskManagerImpl(struct TaskManagerImpl* taskManager, __code next(...)) {
    __sync_fetch_and_add(&taskManager->taskCount, 1);
    goto next(...);
}

// Atomically subtracts one from taskCount, then continues with next.
__code decrementTaskCountTaskManagerImpl(struct TaskManagerImpl* taskManager, __code next(...)) {
    __sync_fetch_and_sub(&taskManager->taskCount, 1);
    goto next(...);
}

// Spawns one task.
//   idgCount != 0                          : unlock the mutex and continue with next
//   idgCount == 0, unstarted iterator task : unlock the mutex and hand the task to
//                                            its iterator, passing cpu - gpu
//   idgCount == 0 otherwise                : continue with taskSend
// NOTE(review): no lock call on the mutex is visible in this file; presumably
// the caller holds it on entry - confirm.
__code spawnTaskManagerImpl(struct TaskManagerImpl* taskManager, struct Context* task, __code next(...)) {
    if (task->idgCount == 0) {
        // iterator task is normal task until spawned
        if(task->iterator != NULL && task->iterate == 0) {
            pthread_mutex_unlock(&taskManager->mutex);
            struct Iterator* iterator = task->iterator;
            goto iterator->exec(task, taskManager->cpu - taskManager->gpu, next(...));
        }
        goto taskSend();
    }
    pthread_mutex_unlock(&taskManager->mutex);
    goto next(...);
}

// Chooses a worker for the task in round-robin order and puts the task on that
// worker's queue. GPU tasks cycle over indices [gpu, cpu), all other tasks over
// [cpu, maxCPU); each send index wraps back to the start of its range.
// The mutex is unlocked once the index has been chosen.
__code taskSend(struct TaskManagerImpl* taskManager, struct Context* task, __code next(...)) {
    // set workerId
    if (task->gpu) {
        task->workerId = taskManager->sendGPUWorkerIndex;
        if(++taskManager->sendGPUWorkerIndex >= taskManager->cpu) {
            taskManager->sendGPUWorkerIndex = taskManager->gpu;
        }
    } else {
        task->workerId = taskManager->sendCPUWorkerIndex;
        if(++taskManager->sendCPUWorkerIndex >= taskManager->maxCPU) {
            taskManager->sendCPUWorkerIndex = taskManager->cpu;
        }
    }
    pthread_mutex_unlock(&taskManager->mutex);
    struct Queue* queue = taskManager->workers[task->workerId]->tasks;
    goto queue->put(task, next(...));
}

// Shutdown, phase 1. While taskCount is non-zero, sleeps 1000 microseconds and
// re-enters itself. Afterwards puts NULL on the tasks queue of worker
// loopCounter->i, with step 1 as continuation. Once every worker has received
// NULL the cursor is reset and control moves to phase 2.
// NOTE(review): NULL presumably tells the worker to stop - confirm in the
// worker implementation.
__code shutdownTaskManagerImpl(struct TaskManagerImpl* taskManager, __code next(...)) {
    if (taskManager->taskCount != 0) {
        usleep(1000);
        goto shutdownTaskManagerImpl();
    }
    int i = taskManager->loopCounter->i;
    if (i < taskManager->numWorker) {
        struct Queue* tasks = taskManager->workers[i]->tasks;
        goto tasks->put(NULL, shutdownTaskManagerImpl1);
    }
    taskManager->loopCounter->i = 0;
    goto shutdownTaskManagerImpl2();
}

// Shutdown, step 1: advances the worker cursor and returns to phase 1.
__code shutdownTaskManagerImpl1(struct TaskManagerImpl* taskManager, __code next(...)) {
    taskManager->loopCounter->i++;
    goto shutdownTaskManagerImpl();
}

// Shutdown, phase 2: joins the thread of one worker per pass; after the last
// worker the cursor is reset and control continues with next.
__code shutdownTaskManagerImpl2(struct TaskManagerImpl* taskManager, __code next(...)) {
    int i = taskManager->loopCounter->i;
    if (i < taskManager->numWorker) {
        pthread_join(taskManager->workers[i]->thread, NULL);
        taskManager->loopCounter->i++;
        goto shutdownTaskManagerImpl2();
    }
    taskManager->loopCounter->i = 0;
    goto next(...);
}