Mercurial > hg > Gears > GearsAgda
changeset 233:06133afb3b5b
create worker start_code
author | mir3636 |
---|---|
date | Sun, 22 Jan 2017 20:02:21 +0900 |
parents | 123b0d277b84 |
children | 47588c28f189 |
files | src/parallel_execution/context.h src/parallel_execution/taskManager.c src/parallel_execution/worker.c |
diffstat | 3 files changed, 35 insertions(+), 12 deletions(-) [+] |
line wrap: on
line diff
--- a/src/parallel_execution/context.h Sun Jan 22 19:02:12 2017 +0900 +++ b/src/parallel_execution/context.h Sun Jan 22 20:02:21 2017 +0900 @@ -31,6 +31,8 @@ meta->type = D_##t; \ data; }) +#define ALLOC(context, t) (&ALLOCATE(context, t)->t) + #define GET_TYPE(dseg) ({ \ struct Meta* meta = (struct Meta*)(((void*)dseg) - sizeof(struct Meta));\ meta->type; }) @@ -60,7 +62,6 @@ void* heapStart; void* heap; long heapLimit; - pthread_t thread; int thread_num; int dataNum; union Data **data; @@ -82,17 +83,22 @@ } LoopCounter; struct TaskManager { union Data* taskManager; - int numWorker; enum Code spawn; enum Code shutdown; - enum Code deadLockDetected; + enum Code next; - struct Task* task; - struct Worker* workers; + enum Code task; + struct Context* context; + int worker; + int cpu; + int gpu; + int io; } TaskManager; struct TaskManagerImpl { + int numWorker; struct Queue* activeQueue; struct Queue* taskQueue; + struct Worker* workers; } TaskManagerImpl; struct Worker { union Data* worker; @@ -101,6 +107,7 @@ enum Code next; } Worker; struct CPUWorker { + pthread_t thread; struct Context* context; int id; struct Queue* tasks; @@ -109,6 +116,7 @@ } CPUWorker; #ifdef USE_CUDA struct CudaWorker { + pthread_t thread; struct Context* context; int id; struct Queue* tasks;
--- a/src/parallel_execution/taskManager.c Sun Jan 22 19:02:12 2017 +0900 +++ b/src/parallel_execution/taskManager.c Sun Jan 22 20:02:21 2017 +0900 @@ -4,15 +4,20 @@ #include "origin_cs.h" #include <stdio.h> -union Data* createTaskManager(struct Context* context) { +union Data* createTaskManager(struct Context* context, int numCPU, int numGPU, int numIO) { struct TaskManager* taskManager = &ALLOCATE(context, TaskManager)->TaskManager; struct TaskManagerImpl* taskManagerImpl = &ALLOCATE(context, TaskManagerImpl)->TaskManagerImpl; taskManager->taskManager = (union Data*)taskManagerImpl; - taskManagerImpl -> activeQueue = &createSynchronizedQueue(context)->Queue; - taskManagerImpl -> taskQueue = &createSynchronizedQueue(context)->Queue; + taskManagerImpl -> activeQueue = &createSingleLinkedQueue(context)->Queue; + taskManagerImpl -> taskQueue = &createSingleLinkedQueue(context)->Queue; + // 0...numIO-1 IOProcessor + // numIO...numIO+numGPU-1 GPUProcessor + // numIO+numGPU...numIO+numGPU+numCPU-1 CPUProcessor + taskManager->io = 0; + taskManager->gpu = numIO; + taskManeger->cpu = numIO+numGPU; taskManager->spawn = C_spawnTaskManager; taskManager->shutdown = C_shutdownTaskManager; - // taskManager->deadLockDetected = C_deadLockDetected; return (union Data*)(taskManager); }
--- a/src/parallel_execution/worker.c Sun Jan 22 19:02:12 2017 +0900 +++ b/src/parallel_execution/worker.c Sun Jan 22 20:02:21 2017 +0900 @@ -3,9 +3,11 @@ #include "context.h" #include "origin_cs.h" +static void start_code(Worker* worker); + union Data* createCPUWorker(struct Context* context, int id, Queue* queue, enum Code next) { - struct Worker* worker = &ALLOCATE(context, Worker)->Worker; - struct CPUWorker* cpuWorker = &ALLOCATE(context, CPUWorker)->CPUWorker; + struct Worker* worker = ALLOC(context, Worker); + struct CPUWorker* cpuWorker = ALLOC(context, CPUWorker); worker->worker = (union Data*)cpuWorker; cpuWorker->tasks = queue; cpuWorker->id = id; @@ -13,12 +15,20 @@ cpuWorker->next = next; worker->taskReceive = C_taskReceiveWorker; worker->shutdown = C_shutdownWorker; + pthread_create(&worker->thread, NULL, (void*)&start_code, worker); return (union Data*)(worker); } +static void start_code(Worker* worker) { + CPUWorker* worker = (CPUWorker*)worker->worker; + worker->context = NEW(struct Context); + initContext(worker->context); + goto meta(worker->context, C_taskReceiveWorker); +} + __code taskReceiveWorker(struct Context* context, CPUWorker* worker) { if (! worker->runFlag) - goto meta(context, worker->next); + return; // end thread Queue* queue = worker->tasks; queue->next = C_getTask1; goto meta(context, queue->take);