changeset 242:9f3f8ed6ed9f

Add sendTask
author Tatsuki IHA <e125716@ie.u-ryukyu.ac.jp>
date Tue, 24 Jan 2017 18:39:42 +0900
parents 9135e22799dd
children 6a80ab36181c d1567718f12c
files src/parallel_execution/context.h src/parallel_execution/taskManager.c src/parallel_execution/worker.c
diffstat 3 files changed, 52 insertions(+), 21 deletions(-) [+]
line wrap: on
line diff
--- a/src/parallel_execution/context.h	Tue Jan 24 16:44:16 2017 +0900
+++ b/src/parallel_execution/context.h	Tue Jan 24 18:39:42 2017 +0900
@@ -76,6 +76,7 @@
     long heapLimit;
     int dataNum;
     int idgCount; //number of waiting dataGear
+    int workerId;
     union Data **data;
 };
 
@@ -111,6 +112,7 @@
     } TaskManager;
     struct TaskManagerImpl {
         int numWorker;
+        int sendWorkerIndex;
         pthread_mutex_t mutex;
         struct Queue* activeQueue;
         struct Queue* taskQueue;
@@ -124,6 +126,8 @@
     } Worker;
     struct CPUWorker {
         pthread_t thread;
+        pthread_mutex_t mutex;
+        pthread_cond_t cond;
         struct Context* context;
         int id;
         struct Queue* tasks;
--- a/src/parallel_execution/taskManager.c	Tue Jan 24 16:44:16 2017 +0900
+++ b/src/parallel_execution/taskManager.c	Tue Jan 24 18:39:42 2017 +0900
@@ -9,10 +9,6 @@
 
 union Data* createTaskManager(struct Context* context, int numCPU, int numGPU, int numIO) {
     struct TaskManager* taskManager = &ALLOCATE(context, TaskManager)->TaskManager;
-    struct TaskManagerImpl* taskManagerImpl = &ALLOCATE(context, TaskManagerImpl)->TaskManagerImpl;
-    taskManager->taskManager = (union Data*)taskManagerImpl;
-    taskManagerImpl -> activeQueue = &createSingleLinkedQueue(context)->Queue;
-    taskManagerImpl -> taskQueue = &createSingleLinkedQueue(context)->Queue;
     // 0...numIO-1 IOProcessor 
     // numIO...numIO+numGPU-1 GPUProcessor
     // numIO+numGPU...numIO+numGPU+numCPU-1 CPUProcessor
@@ -24,6 +20,11 @@
     taskManager->spawn = C_spawnTaskManager;
     taskManager->shutdown  = C_shutdownTaskManager;
     createWorkers(context, taskManager);
+    struct TaskManagerImpl* taskManagerImpl = &ALLOCATE(context, TaskManagerImpl)->TaskManagerImpl;
+    taskManager->taskManager = (union Data*)taskManagerImpl;
+    taskManagerImpl -> activeQueue = &createSingleLinkedQueue(context)->Queue;
+    taskManagerImpl -> taskQueue = &createSingleLinkedQueue(context)->Queue;
+    taskManagerImpl -> numWorker = taskManager->maxCPU;
     return (union Data*)(taskManager);
 }
 
@@ -33,29 +34,42 @@
     taskManagerImpl->workers = (Worker*)ALLOC_ARRAY(context,Worker,taskManager->maxCPU);
     for (;i<taskManager->gpu;i++) {
         Queue* queue = &createSynchronizedQueue(context)->Queue;
-        taskManagerImpl->workers[i] = (Worker*)createCPUWorker(context,i,queue);
+        taskManagerImpl->workers[i] = (Worker*)createCPUWorker(context, i, queue);
     }
     for (;i<taskManager->cpu;i++) {
 #ifdef USE_CUDA
 #else
         Queue* queue = &createSynchronizedQueue(context)->Queue;
-        taskManagerImpl->workers[i] = (Worker*)createCPUWorker(context,i,queue);
+        taskManagerImpl->workers[i] = (Worker*)createCPUWorker(context, i, queue);
 #endif        
     }
     for (;i<taskManager->maxCPU;i++) {
         Queue* queue = &createSynchronizedQueue(context)->Queue;
-        taskManagerImpl->workers[i] = (Worker*)createCPUWorker(context,i,queue);
+        taskManagerImpl->workers[i] = (Worker*)createCPUWorker(context, i, queue);
     }
 }
 
-__code createTask(struct Context* context, TaskManager* taskManager, enum Code next) {
+__code createTask(struct Context* context, TaskManager* taskManager) {
     taskManager->context = NEW(struct Context);
     initContext(taskManager->context);
+    goto meta(context, C_setRunWorker);
+}
+
+__code createTask_stub(struct Context* context) {
+    goto createTask(context,Gearef(context,TaskManager));
+}
+
+__code setRunWorker(struct Context* context, TaskManagerImpl* taskManager, enum Code next) {
+    task->workerId = taskManagerImpl->sendWorkerIndex;
+    if(++taskManagerImpl->sendWorkerIndex >= taskManagerImpl->numWorker) {
+        taskManagerImpl->sendWorkerIndex = 0;
+    }
     goto meta(context, next);
 }
 
-__code createTask_stub(struct Context* context) {
-    goto createTask(context,Gearef(context,TaskManager),Gearef(context,TaskManager)->next);
+__code setRunWorker_stub(struct Context* context) {
+    TaskManagerImpl* taskManager = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager);
+    goto createTask(context, taskManager, Gearef(context, TaskManager)->next);
 }
 
 __code spawnTaskManager(struct Context* context, struct TaskManagerImpl* taskManager, struct Queue* queue, struct Context* task, enum Code next) {
@@ -83,28 +97,40 @@
 
 
 __code spawnTaskManager1(struct Context* context, struct TaskManagerImpl* taskManager, enum Code next) {
-    pthread_mutex_unlock(taskManagerImpl->mutex);
+    pthread_mutex_unlock(&taskManager->mutex);
     goto meta(context, next);
 }
 
 __code spawnTaskManager1_stub(struct Context* context) {
+    TaskManagerImpl* taskManager = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager);
     goto spawnTaskManager(context,
-                          (struct TaskManager*)GearImpl(context, TaskManager, taskManager),
+                          taskManager,
                           Gearef(context, TaskManager)->next);
 }
 
-__code taskSend(struct Context* context) {
-    if(loopCounter->i < taskManager->numWorker) {
-        queue->queue = taskManager->workers[i]->tasks;
-        queue->next  =  C_taskSend;
-        loopCounter->i++;
-        goto meta(context, queue->queue->Queue.put);
-    }
-    goto meta(context, TaskManager->next);
+__code taskSend(struct Context* context, TaskManagerImpl* taskManager) {
+    queue->queue = (union Data*)taskManager->activeQueue;
+    queue->next = C_taskSend1;
+    goto meta(context, taskManager->activeQueue->put);
 }
 
 __code taskSend_stub(struct Context* context) {
-    goto taskSend(context, (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager));
+    TaskManagerImpl* taskManager = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager);
+    goto taskSend(context, taskManager);
+}
+
+__code taskSend1(struct Context* context, TaskManagerImpl* taskManager, struct Context* task, enum Code next) {
+    struct Queue* tasks = taskManager->workers[task->workerId]->tasks;
+    queue->queue = tasks;
+    queue->data = (union Data*)taskManager->context;
+    queue->next = next;
+    pthread_cond_signal(&taskManager->workers[task->workerId]->cond);
+    goto meta(context, tasks->put);
+}
+
+__code taskSend1_stub(struct Context* context) {
+    TaskManagerImpl* taskManager = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager);
+    goto taskSend(context, taskManager, Gearef(context, TaskManager)->context, Gearef(context, TaskManager)->next);
 }
 
 __code createWorker1(struct Context* context, struct LoopCounter* loopCounter, struct Worker* worker) {
--- a/src/parallel_execution/worker.c	Tue Jan 24 16:44:16 2017 +0900
+++ b/src/parallel_execution/worker.c	Tue Jan 24 18:39:42 2017 +0900
@@ -21,6 +21,7 @@
     CPUWorker* cpuWorker = (CPUWorker*)worker->worker;
     cpuWorker->context = NEW(struct Context);
     initContext(cpuWorker->context);
+    pthread_cond_wait(&cpuWorker->cond, &cpuWorker->mutex);
     goto meta(cpuWorker->context, C_taskReceiveWorker);
 }