changeset 236:865179a0a56d

fix taskManager
author ikkun
date Mon, 23 Jan 2017 20:00:46 +0900
parents 05e61405cc88
children 6f6cc49213c5
files src/parallel_execution/context.c src/parallel_execution/context.h src/parallel_execution/main.c src/parallel_execution/origin_cs.h src/parallel_execution/taskManager.c src/parallel_execution/worker.c
diffstat 6 files changed, 62 insertions(+), 17 deletions(-) [+]
line wrap: on
line diff
--- a/src/parallel_execution/context.c	Mon Jan 23 17:49:36 2017 +0900
+++ b/src/parallel_execution/context.c	Mon Jan 23 20:00:46 2017 +0900
@@ -14,7 +14,7 @@
     context->heap = context->heapStart;
 
     // context->codeNum = Exit;
-
+    context->idgCount = 0;
 #include "c/codeGearInit.c"
 
 #include "c/dataGearInit.c"
--- a/src/parallel_execution/context.h	Mon Jan 23 17:49:36 2017 +0900
+++ b/src/parallel_execution/context.h	Mon Jan 23 20:00:46 2017 +0900
@@ -14,13 +14,15 @@
 #define ALLOC_DATA(context, dseg) ({\
     struct Meta* meta = (struct Meta*)context->heap;\
     meta->type = D_##dseg;\
+    meta->size = 1;\
     context->heap += sizeof(struct Meta);\
     context->data[D_##dseg] = context->heap; context->heap += sizeof(struct dseg); (struct dseg *)context->data[D_##dseg]; })
 
 #define ALLOC_DATA_TYPE(context, dseg, t) ({\
     struct Meta* meta = (struct Meta*)context->heap;\
     meta->type = D_##t;\
-    context->heap += sizeof(struct Meta);\
+    meta->size = 1;\
+    context->heap += sizeof(struct Meta);                               \
     context->data[D_##dseg] = context->heap; context->heap += sizeof(struct t); (struct t *)context->data[D_##dseg]; })
 
 #define ALLOCATE(context, t) ({ \
@@ -29,10 +31,20 @@
     union Data* data = context->heap; \
     context->heap += sizeof(struct t); \
     meta->type = D_##t; \
+    meta->size = 1;     \
     data; })
 
 #define ALLOC(context, t) (&ALLOCATE(context, t)->t)
 
+#define ALLOC_ARRY(context, dseg, size) ({\
+    struct Meta* meta = (struct Meta*)context->heap;\
+    context->heap += sizeof(struct Meta);\
+    union Data* data = context->heap; \
+    context->heap += sizeof(struct t)*size; \
+    meta->type = D_##dseg; \
+    meta->size = size; \
+    data; })
+
 #define GET_TYPE(dseg) ({ \
     struct Meta* meta = (struct Meta*)(((void*)dseg) - sizeof(struct Meta));\
     meta->type; })
@@ -62,14 +74,15 @@
     void* heapStart;
     void* heap;
     long heapLimit;
-    int thread_num;
     int dataNum;
+    int idgCount; //number of waiting dataGear
     union Data **data;
 };
 
 union Data {
     struct Meta {
         enum DataType type;
+        long size;
         struct Queue* wait; // tasks waiting this dataGear
     } meta;
     struct Context context;
@@ -112,8 +125,6 @@
         struct Context* context;
         int id;
         struct Queue* tasks;
-        int runFlag;
-        enum Code next;
     } CPUWorker;
 #ifdef USE_CUDA
     struct CudaWorker {
--- a/src/parallel_execution/main.c	Mon Jan 23 17:49:36 2017 +0900
+++ b/src/parallel_execution/main.c	Mon Jan 23 20:00:46 2017 +0900
@@ -125,7 +125,7 @@
     task->code = C_twice;
     task->idsCount = 0;
 
-    taskManager->task = task;
+//    taskManager->task = task;
     taskManager->next = C_createData1;
     loopCounter->i++;
 
--- a/src/parallel_execution/origin_cs.h	Mon Jan 23 17:49:36 2017 +0900
+++ b/src/parallel_execution/origin_cs.h	Mon Jan 23 20:00:46 2017 +0900
@@ -1,3 +1,4 @@
 extern __code start_code(struct Context* context);
 extern __code exit_code(struct Context* context);
 extern __code meta(struct Context* context, enum Code next);
+extern __code initContext(struct Context* context);
--- a/src/parallel_execution/taskManager.c	Mon Jan 23 17:49:36 2017 +0900
+++ b/src/parallel_execution/taskManager.c	Mon Jan 23 20:00:46 2017 +0900
@@ -16,14 +16,47 @@
     taskManager->io = 0;
     taskManager->gpu = numIO;
     taskManager->cpu = numIO+numGPU;
+    taskManager->maxCPU = numIO+numGPU+numCPU;
     taskManager->createTask = C_createTask;
     taskManager->spawn = C_spawnTaskManager;
     taskManager->shutdown  = C_shutdownTaskManager;
+    createWorkers(taskManager);
     return (union Data*)(taskManager);
 }
 
-__code spawnTaskManager(struct Context* context, struct TaskManagerImpl* taskManager, struct Queue* queue, struct Task* task, enum Code next) {
-    if (task->idsCount == 0) {
+void createWorker(Context* context, TaskManeger * taskManeger) {
+    int i = 0;
+    TaskManagerImpl *taskManagerImpl = GearImpl(context,TaskManagerImpl,taskManager);
+    taskManagerImpl->workers = ALLOC_ARRAY(context,Worker,taskManager->maxCPU);
+    for (;i>taskManeger->gpu;i++) {
+        Queue* queue = &createSynchronizedQueue(context)->Queue;
+        taskManagerImpl->workers[i] = (Worker*)createCPUWorker (context,i,queue);
+    }
+    for (;i>taskManeger->cpu;i++) {
+#ifdef USE_CUDA
+#else
+        Queue* queue = &createSynchronizedQueue(context)->Queue;
+        taskManagerImpl->workers[i] = (Worker*)createCPUWorker (context,i,queue);
+#endif        
+    }
+    for (;i>taskManeger->maxCPU;i++) {
+        Queue* queue = &createSynchronizedQueue(context)->Queue;
+        taskManagerImpl->workers[i] = (Worker*)createCPUWorker (context,i,queue);
+    }
+}
+
+__code createTask(struct Context* context, TaskManeger* taskManager, enum Code next) {
+    taskManager->context = NEW(struct Context);
+    initContext(taskManager->context);
+    goto meta(context, next);
+}
+
+__code createTask_stub(struct Context* context) {
+    goto createTask(context,Gearef(context,TaskManager),Gearef(context,TaskManager)->next);
+}
+
+__code spawnTaskManager(struct Context* context, struct TaskManagerImpl* taskManager, struct Queue* queue, struct Context* task, enum Code next) {
+    if (task->idgCount == 0) {
         // enqueue activeQueue
         queue->queue = (union Data*)taskManager->activeQueue;
     } else {
@@ -32,15 +65,17 @@
     }
     queue->data = (union Data*)task;
     queue->next = next;
+    pthread_mutex_unlock(taskManagerImpl->mutex);
     goto meta(context, queue->queue->Queue.put);
 }
 
 __code spawnTaskManager_stub(struct Context* context) {
+    pthread_mutex_lock(taskManager->mutex);
     goto spawnTaskManager(context,
-            &context->data[D_TaskManager]->TaskManager.taskManager->TaskManager.taskManager->TaskManagerImpl,
-            &context->data[D_Queue]->Queue,
-            context->data[D_TaskManager]->TaskManager.task,
-            context->data[D_TaskManager]->TaskManager.next
+                          GearImpl(context, TaskManagerImpl, taskManager),
+                          Gearef(context, Queue),
+                          Gearef(context, TaskManager)->context,
+                          Gearef(context, TaskManager)->next
             );
 }
 
--- a/src/parallel_execution/worker.c	Mon Jan 23 17:49:36 2017 +0900
+++ b/src/parallel_execution/worker.c	Mon Jan 23 20:00:46 2017 +0900
@@ -5,14 +5,12 @@
 
 static void start_worker(Worker* worker);
 
-union Data* createCPUWorker(struct Context* context, int id, Queue* queue, enum Code next) {
+union Data* createCPUWorker(struct Context* context, int id, Queue* queue) {
     struct Worker* worker = ALLOC(context, Worker);
     struct CPUWorker* cpuWorker = ALLOC(context, CPUWorker);
     worker->worker = (union Data*)cpuWorker;
     cpuWorker->tasks = queue;
     cpuWorker->id = id;
-    cpuWorker->runFlag = 1;
-    cpuWorker->next = next;
     worker->taskReceive = C_taskReceiveWorker;
     worker->shutdown = C_shutdownWorker;
     pthread_create(&worker->worker->CPUWorker.thread, NULL, (void*)&start_worker, worker);
@@ -27,8 +25,6 @@
 }
 
 __code taskReceiveWorker(struct Context* context, CPUWorker* worker) {
-    if (! worker->runFlag)
-        return; // end thread
     Queue* queue = worker->tasks;
     queue->next = C_getTask1;
     goto meta(context, queue->take);
@@ -40,6 +36,8 @@
 }
 
 __code getTask1(struct Context* context, Worker* worker, struct Context* task) {
+    if (! task)
+        return; // end thread
     task->worker = worker;
     goto meta(task, task->next);
 }