Mercurial > hg > Members > Moririn
changeset 242:9f3f8ed6ed9f
Add sendTask
author | Tatsuki IHA <e125716@ie.u-ryukyu.ac.jp> |
---|---|
date | Tue, 24 Jan 2017 18:39:42 +0900 |
parents | 9135e22799dd |
children | 6a80ab36181c d1567718f12c |
files | src/parallel_execution/context.h src/parallel_execution/taskManager.c src/parallel_execution/worker.c |
diffstat | 3 files changed, 52 insertions(+), 21 deletions(-) [+] |
line wrap: on
line diff
--- a/src/parallel_execution/context.h Tue Jan 24 16:44:16 2017 +0900 +++ b/src/parallel_execution/context.h Tue Jan 24 18:39:42 2017 +0900 @@ -76,6 +76,7 @@ long heapLimit; int dataNum; int idgCount; //number of waiting dataGear + int workerId; union Data **data; }; @@ -111,6 +112,7 @@ } TaskManager; struct TaskManagerImpl { int numWorker; + int sendWorkerIndex; pthread_mutex_t mutex; struct Queue* activeQueue; struct Queue* taskQueue; @@ -124,6 +126,8 @@ } Worker; struct CPUWorker { pthread_t thread; + pthread_mutex_t mutex; + pthread_cond_t cond; struct Context* context; int id; struct Queue* tasks;
--- a/src/parallel_execution/taskManager.c Tue Jan 24 16:44:16 2017 +0900 +++ b/src/parallel_execution/taskManager.c Tue Jan 24 18:39:42 2017 +0900 @@ -9,10 +9,6 @@ union Data* createTaskManager(struct Context* context, int numCPU, int numGPU, int numIO) { struct TaskManager* taskManager = &ALLOCATE(context, TaskManager)->TaskManager; - struct TaskManagerImpl* taskManagerImpl = &ALLOCATE(context, TaskManagerImpl)->TaskManagerImpl; - taskManager->taskManager = (union Data*)taskManagerImpl; - taskManagerImpl -> activeQueue = &createSingleLinkedQueue(context)->Queue; - taskManagerImpl -> taskQueue = &createSingleLinkedQueue(context)->Queue; // 0...numIO-1 IOProcessor // numIO...numIO+numGPU-1 GPUProcessor // numIO+numGPU...numIO+numGPU+numCPU-1 CPUProcessor @@ -24,6 +20,11 @@ taskManager->spawn = C_spawnTaskManager; taskManager->shutdown = C_shutdownTaskManager; createWorkers(context, taskManager); + struct TaskManagerImpl* taskManagerImpl = &ALLOCATE(context, TaskManagerImpl)->TaskManagerImpl; + taskManager->taskManager = (union Data*)taskManagerImpl; + taskManagerImpl -> activeQueue = &createSingleLinkedQueue(context)->Queue; + taskManagerImpl -> taskQueue = &createSingleLinkedQueue(context)->Queue; + taskManagerImpl -> numWorker = taskManager->maxCPU; return (union Data*)(taskManager); } @@ -33,29 +34,42 @@ taskManagerImpl->workers = (Worker*)ALLOC_ARRAY(context,Worker,taskManager->maxCPU); for (;i<taskManager->gpu;i++) { Queue* queue = &createSynchronizedQueue(context)->Queue; - taskManagerImpl->workers[i] = (Worker*)createCPUWorker(context,i,queue); + taskManagerImpl->workers[i] = (Worker*)createCPUWorker(context, i, queue); } for (;i<taskManager->cpu;i++) { #ifdef USE_CUDA #else Queue* queue = &createSynchronizedQueue(context)->Queue; - taskManagerImpl->workers[i] = (Worker*)createCPUWorker(context,i,queue); + taskManagerImpl->workers[i] = (Worker*)createCPUWorker(context, i, queue); #endif } for (;i<taskManager->maxCPU;i++) { Queue* queue = &createSynchronizedQueue(context)->Queue; - taskManagerImpl->workers[i] = (Worker*)createCPUWorker(context,i,queue); + taskManagerImpl->workers[i] = (Worker*)createCPUWorker(context, i, queue); } } -__code createTask(struct Context* context, TaskManager* taskManager, enum Code next) { +__code createTask(struct Context* context, TaskManager* taskManager) { taskManager->context = NEW(struct Context); initContext(taskManager->context); + goto meta(context, C_setRunWorker); +} + +__code createTask_stub(struct Context* context) { + goto createTask(context,Gearef(context,TaskManager)); +} + +__code setRunWorker(struct Context* context, TaskManagerImpl* taskManager, enum Code next) { + task->workerId = taskManagerImpl->sendWorkerIndex; + if(++taskManagerImpl->sendWorkerIndex >= taskManagerImpl->numWorker) { + taskManagerImpl->sendWorkerIndex = 0; + } goto meta(context, next); } -__code createTask_stub(struct Context* context) { - goto createTask(context,Gearef(context,TaskManager),Gearef(context,TaskManager)->next); +__code setRunWorker_stub(struct Context* context) { + TaskManagerImpl* taskManager = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager); + goto createTask(context, taskManager, Gearef(context, TaskManager)->next); } __code spawnTaskManager(struct Context* context, struct TaskManagerImpl* taskManager, struct Queue* queue, struct Context* task, enum Code next) { @@ -83,28 +97,40 @@ __code spawnTaskManager1(struct Context* context, struct TaskManagerImpl* taskManager, enum Code next) { - pthread_mutex_unlock(taskManagerImpl->mutex); + pthread_mutex_unlock(&taskManager->mutex); goto meta(context, next); } __code spawnTaskManager1_stub(struct Context* context) { + TaskManagerImpl* taskManager = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager); goto spawnTaskManager(context, - (struct TaskManager*)GearImpl(context, TaskManager, taskManager), + taskManager, Gearef(context, TaskManager)->next); } -__code taskSend(struct Context* context) { - if(loopCounter->i < taskManager->numWorker) { - queue->queue = taskManager->workers[i]->tasks; - queue->next = C_taskSend; - loopCounter->i++; - goto meta(context, queue->queue->Queue.put); - } - goto meta(context, TaskManager->next); +__code taskSend(struct Context* context, TaskManagerImpl* taskManager) { + queue->queue = (union Data*)taskManager->activeQueue; + queue->next = C_taskSend1; + goto meta(context, taskManager->activeQueue->put); } __code taskSend_stub(struct Context* context) { - goto taskSend(context, (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager)); + TaskManagerImpl* taskManager = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager); + goto taskSend(context, taskManager); +} + +__code taskSend1(struct Context* context, TaskManagerImpl* taskManager, struct Context* task, enum Code next) { + struct Queue* tasks = taskManager->workers[task->workerId]->tasks; + queue->queue = tasks; + queue->data = (union Data*)taskManager->context; + queue->next = next; + pthread_cond_signal(&taskManager->workers[task->workerId]->cond); + goto meta(context, tasks->put); +} + +__code taskSend1_stub(struct Context* context) { + TaskManagerImpl* taskManager = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager); + goto taskSend(context, taskManager, Gearef(context, TaskManager)->context, Gearef(context, TaskManager)->next); } __code createWorker1(struct Context* context, struct LoopCounter* loopCounter, struct Worker* worker) {
--- a/src/parallel_execution/worker.c Tue Jan 24 16:44:16 2017 +0900 +++ b/src/parallel_execution/worker.c Tue Jan 24 18:39:42 2017 +0900 @@ -21,6 +21,7 @@ CPUWorker* cpuWorker = (CPUWorker*)worker->worker; cpuWorker->context = NEW(struct Context); initContext(cpuWorker->context); + pthread_cond_wait(&cpuWorker->cond, &cpuWorker->mutex); goto meta(cpuWorker->context, C_taskReceiveWorker); }