Mercurial > hg > Members > Moririn
changeset 230:a1fb3f2d1a36
fix worker
author | ikkun |
---|---|
date | Sat, 21 Jan 2017 20:21:00 +0900 |
parents | a10ea0cfc929 |
children | 24da4f217447 |
files | src/parallel_execution/context.h src/parallel_execution/taskManager.c src/parallel_execution/worker.c |
diffstat | 3 files changed, 46 insertions(+), 42 deletions(-) [+] |
line wrap: on
line diff
--- a/src/parallel_execution/context.h Sat Jan 21 20:04:32 2017 +0900 +++ b/src/parallel_execution/context.h Sat Jan 21 20:21:00 2017 +0900 @@ -49,6 +49,8 @@ struct Context { enum Code next; + struct Worker* worker; + struct TaskManager* taskManager; int codeNum; __code (**code) (struct Context*); void* heapStart; @@ -65,6 +67,7 @@ enum DataType type; struct Queue* wait; // tasks waiting this dataGear } meta; + struct Context context; struct Time { enum Code next; double time; @@ -89,12 +92,12 @@ } TaskManagerImpl; struct Worker { int id; - struct Context* contexts; - enum Code execute; - enum Code taskSend; + struct Queue* tasks; + int runFlag; + struct Context* context; enum Code taskReceive; enum Code shutdown; - struct Queue* tasks; + enum Code next; } Worker; #ifdef USE_CUDA struct CudaTask {
--- a/src/parallel_execution/taskManager.c Sat Jan 21 20:04:32 2017 +0900 +++ b/src/parallel_execution/taskManager.c Sat Jan 21 20:21:00 2017 +0900 @@ -46,6 +46,24 @@ } goto meta(context, TaskManager->next); } +__code createWorker1(struct Context* context, struct LoopCounter* loopCounter, struct Worker* worker) { + int i = loopCounter->i; + + if (i < worker->id) { + struct Context* worker_context = &worker->contexts[i]; + worker_context->next = C_getTask1; + worker_context->data[D_Tree] = context->data[D_Tree]; + // worker_context->data[D_ActiveQueue] = context->data[D_ActiveQueue]; + pthread_create(&worker_context->thread, NULL, (void*)&start_code, worker_context); + worker_context->thread_num = i; + loopCounter->i++; + + goto meta(context, C_createWorker1); + } + + loopCounter->i = 0; + goto meta(context, C_taskManager); +} __code shutdownTaskManager(struct Context* context, struct LoopCounter* loopCounter, struct Worker* worker, struct TaskManager* taskManager) { int i = loopCounter->i;
--- a/src/parallel_execution/worker.c Sat Jan 21 20:04:32 2017 +0900 +++ b/src/parallel_execution/worker.c Sat Jan 21 20:21:00 2017 +0900 @@ -3,60 +3,38 @@ #include "context.h" #include "origin_cs.h" -union Data* createWorker(struct Context* context) { +union Data* createWorker(struct Context* context, int id, Queue* queue) { struct Worker* worker = &ALLOCATE(context, Worker)->Worker; - worker->execute = C_executeWorker; - worker->taskSend = C_taskSendWorker; + worker->tasks = queue; + worker->id = id; + worker->runFlag = 1; worker->taskReceive = C_taskReceiveWorker; worker->shutdown = C_shutdownWorker; return (union Data*)(worker); } -__code taskSendWorker(struct Context* context) { -} - -__code taskSendWorker_stub(struct Context* context) { - goto taskSendWorker(context); -} - -__code executeWorker(struct Context* context, Worker* worker) { - worker->next = worker->taskReceive; - goto meta(context, task->code); -} - -__code executeWorker_stub(struct Context* context) { - Worker* worker = &Gearef(context,Worker); - goto extcuteWorker(context,worker); -} - __code taskReceiveWorker(struct Context* context, Worker* worker) { - Queue* queue = &Gearef(context,queue); - queue->queue = worker->tasks; + if (! worker->runFlag) + goto meta(context, worker->next); + Queue* queue = worker->tasks; queue->next = C_getTask1; goto meta(context, queue->take); } __code taskReceiveWorker_stub(struct Context* context) { - Worker* worker = Gearef(context,Worker).worker; - goto taskReceiveWorker(context,queue); + Worker* worker = Gearef(context,Worker); + goto taskReceiveWorker(context,worker); } -__code getTask1(struct Context* context, struct Queue* queue) { - queue->next = C_getTask2; - goto meta(context, queue->take); +__code getTask1(struct Context* context, Worker* worker, struct Context* task) { + task->worker = worker; + goto meta(task, task->next); } __code getTask1_stub(struct Context* context) { - goto getTask1(context,/* &context->data[D_ActiveQueue]->Queue*/ NULL); -} - -__code getTask2(struct Context* context, struct Task* task, struct Node* node) { - context->next = C_getTask1; - goto meta(context, C_getTask2); -} - -__code getTask2_stub(struct Context* context) { - goto getTask2(context, /*&(context->data[D_ActiveQueue]->Queue.data->Task)*/ NULL, &context->data[D_Node]->Node); + Worker* worker = Gearef(context,Worker); + Context* task = &worker->tasks->data->context; + goto getTask1(context,worker,task); } #ifdef USE_CUDA @@ -67,6 +45,11 @@ } #endif -__code shutdownWorker(struct Context* context) { +__code shutdownWorker(struct Context* context, Worker* worker) { + worker->runFlag = 0; +} +__code shutdownWorker_stub(struct Context* context) { + Worker* worker = Gearef(context,Worker); + goto shutdownWorker(context,worker); }