# HG changeset patch # User ikkun # Date 1485169246 -32400 # Node ID 865179a0a56d0447a723dc335daf3b62de08770c # Parent 05e61405cc88fd8bb410cda6a77c7edaf6b06f48 fix taskManager diff -r 05e61405cc88 -r 865179a0a56d src/parallel_execution/context.c --- a/src/parallel_execution/context.c Mon Jan 23 17:49:36 2017 +0900 +++ b/src/parallel_execution/context.c Mon Jan 23 20:00:46 2017 +0900 @@ -14,7 +14,7 @@ context->heap = context->heapStart; // context->codeNum = Exit; - + context->idgCount = 0; #include "c/codeGearInit.c" #include "c/dataGearInit.c" diff -r 05e61405cc88 -r 865179a0a56d src/parallel_execution/context.h --- a/src/parallel_execution/context.h Mon Jan 23 17:49:36 2017 +0900 +++ b/src/parallel_execution/context.h Mon Jan 23 20:00:46 2017 +0900 @@ -14,13 +14,15 @@ #define ALLOC_DATA(context, dseg) ({\ struct Meta* meta = (struct Meta*)context->heap;\ meta->type = D_##dseg;\ + meta->size = 1;\ context->heap += sizeof(struct Meta);\ context->data[D_##dseg] = context->heap; context->heap += sizeof(struct dseg); (struct dseg *)context->data[D_##dseg]; }) #define ALLOC_DATA_TYPE(context, dseg, t) ({\ struct Meta* meta = (struct Meta*)context->heap;\ meta->type = D_##t;\ - context->heap += sizeof(struct Meta);\ + meta->size = 1;\ + context->heap += sizeof(struct Meta); \ context->data[D_##dseg] = context->heap; context->heap += sizeof(struct t); (struct t *)context->data[D_##dseg]; }) #define ALLOCATE(context, t) ({ \ @@ -29,10 +31,20 @@ union Data* data = context->heap; \ context->heap += sizeof(struct t); \ meta->type = D_##t; \ + meta->size = 1; \ data; }) #define ALLOC(context, t) (&ALLOCATE(context, t)->t) +#define ALLOC_ARRY(context, dseg, size) ({\ + struct Meta* meta = (struct Meta*)context->heap;\ + context->heap += sizeof(struct Meta);\ + union Data* data = context->heap; \ + context->heap += sizeof(struct t)*size; \ + meta->type = D_##dseg; \ + meta->size = size; \ + data; }) + #define GET_TYPE(dseg) ({ \ struct Meta* meta = (struct Meta*)(((void*)dseg) - sizeof(struct Meta));\ meta->type; }) @@ -62,14 +74,15 @@ void* heapStart; void* heap; long heapLimit; - int thread_num; int dataNum; + int idgCount; //number of waiting dataGear union Data **data; }; union Data { struct Meta { enum DataType type; + long size; struct Queue* wait; // tasks waiting this dataGear } meta; struct Context context; @@ -112,8 +125,6 @@ struct Context* context; int id; struct Queue* tasks; - int runFlag; - enum Code next; } CPUWorker; #ifdef USE_CUDA struct CudaWorker { diff -r 05e61405cc88 -r 865179a0a56d src/parallel_execution/main.c --- a/src/parallel_execution/main.c Mon Jan 23 17:49:36 2017 +0900 +++ b/src/parallel_execution/main.c Mon Jan 23 20:00:46 2017 +0900 @@ -125,7 +125,7 @@ task->code = C_twice; task->idsCount = 0; - taskManager->task = task; +// taskManager->task = task; taskManager->next = C_createData1; loopCounter->i++; diff -r 05e61405cc88 -r 865179a0a56d src/parallel_execution/origin_cs.h --- a/src/parallel_execution/origin_cs.h Mon Jan 23 17:49:36 2017 +0900 +++ b/src/parallel_execution/origin_cs.h Mon Jan 23 20:00:46 2017 +0900 @@ -1,3 +1,4 @@ extern __code start_code(struct Context* context); extern __code exit_code(struct Context* context); extern __code meta(struct Context* context, enum Code next); +extern __code initContext(struct Context* context); diff -r 05e61405cc88 -r 865179a0a56d src/parallel_execution/taskManager.c --- a/src/parallel_execution/taskManager.c Mon Jan 23 17:49:36 2017 +0900 +++ b/src/parallel_execution/taskManager.c Mon Jan 23 20:00:46 2017 +0900 @@ -16,14 +16,47 @@ taskManager->io = 0; taskManager->gpu = numIO; taskManager->cpu = numIO+numGPU; + taskManager->maxCPU = numIO+numGPU+numCPU; taskManager->createTask = C_createTask; taskManager->spawn = C_spawnTaskManager; taskManager->shutdown = C_shutdownTaskManager; + createWorkers(taskManager); return (union Data*)(taskManager); } -__code spawnTaskManager(struct Context* context, struct TaskManagerImpl* taskManager, struct Queue* queue, struct Task* task, enum Code next) { - if (task->idsCount == 0) { +void createWorker(Context* context, TaskManeger * taskManeger) { + int i = 0; + TaskManagerImpl *taskManagerImpl = GearImpl(context,TaskManagerImpl,taskManager); + taskManagerImpl->workers = ALLOC_ARRAY(context,Worker,taskManager->maxCPU); + for (;i>taskManeger->gpu;i++) { + Queue* queue = &createSynchronizedQueue(context)->Queue; + taskManagerImpl->workers[i] = (Worker*)createCPUWorker (context,i,queue); + } + for (;i>taskManeger->cpu;i++) { +#ifdef USE_CUDA +#else + Queue* queue = &createSynchronizedQueue(context)->Queue; + taskManagerImpl->workers[i] = (Worker*)createCPUWorker (context,i,queue); +#endif + } + for (;i>taskManeger->maxCPU;i++) { + Queue* queue = &createSynchronizedQueue(context)->Queue; + taskManagerImpl->workers[i] = (Worker*)createCPUWorker (context,i,queue); + } +} + +__code createTask(struct Context* context, TaskManeger* taskManager, enum Code next) { + taskManager->context = NEW(struct Context); + initContext(taskManager->context); + goto meta(context, next); +} + +__code createTask_stub(struct Context* context) { + goto createTask(context,Gearef(context,TaskManager),Gearef(context,TaskManager)->next); +} + +__code spawnTaskManager(struct Context* context, struct TaskManagerImpl* taskManager, struct Queue* queue, struct Context* task, enum Code next) { + if (task->idgCount == 0) { // enqueue activeQueue queue->queue = (union Data*)taskManager->activeQueue; } else { @@ -32,15 +65,17 @@ } queue->data = (union Data*)task; queue->next = next; + pthread_mutex_unlock(taskManagerImpl->mutex); goto meta(context, queue->queue->Queue.put); } __code spawnTaskManager_stub(struct Context* context) { + pthread_mutex_lock(taskManager->mutex); goto spawnTaskManager(context, - &context->data[D_TaskManager]->TaskManager.taskManager->TaskManager.taskManager->TaskManagerImpl, - &context->data[D_Queue]->Queue, - context->data[D_TaskManager]->TaskManager.task, - context->data[D_TaskManager]->TaskManager.next + GearImpl(context, TaskManagerImpl, taskManager), + Gearef(context, Queue), + Gearef(context, TaskManager)->context, + Gearef(context, TaskManager)->next ); } diff -r 05e61405cc88 -r 865179a0a56d src/parallel_execution/worker.c --- a/src/parallel_execution/worker.c Mon Jan 23 17:49:36 2017 +0900 +++ b/src/parallel_execution/worker.c Mon Jan 23 20:00:46 2017 +0900 @@ -5,14 +5,12 @@ static void start_worker(Worker* worker); -union Data* createCPUWorker(struct Context* context, int id, Queue* queue, enum Code next) { +union Data* createCPUWorker(struct Context* context, int id, Queue* queue) { struct Worker* worker = ALLOC(context, Worker); struct CPUWorker* cpuWorker = ALLOC(context, CPUWorker); worker->worker = (union Data*)cpuWorker; cpuWorker->tasks = queue; cpuWorker->id = id; - cpuWorker->runFlag = 1; - cpuWorker->next = next; worker->taskReceive = C_taskReceiveWorker; worker->shutdown = C_shutdownWorker; pthread_create(&worker->worker->CPUWorker.thread, NULL, (void*)&start_worker, worker); @@ -27,8 +25,6 @@ } __code taskReceiveWorker(struct Context* context, CPUWorker* worker) { - if (! worker->runFlag) - return; // end thread Queue* queue = worker->tasks; queue->next = C_getTask1; goto meta(context, queue->take); @@ -40,6 +36,8 @@ } __code getTask1(struct Context* context, Worker* worker, struct Context* task) { + if (! task) + return; // end thread task->worker = worker; goto meta(task, task->next); }