diff src/parallel_execution/TaskManagerImpl.cbc @ 327:534601ed8c50 examples_directory

Running dependency example for single thread and single task
author Tatsuki IHA <e125716@ie.u-ryukyu.ac.jp>
date Tue, 18 Apr 2017 05:53:37 +0900
parents f23f6d0aa4e9
children c03159481cb6
line wrap: on
line diff
--- a/src/parallel_execution/TaskManagerImpl.cbc	Tue Apr 18 01:47:42 2017 +0900
+++ b/src/parallel_execution/TaskManagerImpl.cbc	Tue Apr 18 05:53:37 2017 +0900
@@ -55,26 +55,11 @@
 __code createTask(struct TaskManager* taskManager) {
     taskManager->context = NEW(struct Context);
     initContext(taskManager->context);
-    taskManager->context->taskManager = taskManager;
+    taskManager->context->taskManager = (struct TaskManager*)taskManager->taskManager;
     taskManager->context->idg = taskManager->context->dataNum;
     goto meta(context, C_setWorker);
 }
 
-__code setWaitTask(struct Queue* queue, struct Context* task, Data* data, __code next(...)) {
-    struct Meta *metaData = GET_META(data);
-    if (!metaData->wait) {
-        metaData->wait = createSynchronizedQueue(context);
-    }
-    queue->queue = (union Data*)metaData->wait;
-    queue->next = next;
-    queue->data = (Data *)task;
-    goto meta(context, queue->queue->Queue.put);
-}
-
-__code setWaitTask_stub(struct Context* context) {
-    goto setWaitTask(context, Gearef(context, Queue), Gearef(context, TaskManager)->context, Gearef(context, TaskManager)->data, Gearef(context, TaskManager)->next);
-}
-
 __code setWorker(struct TaskManagerImpl* taskManager, struct Context* task, __code next(...)) {
     task->workerId = taskManager->sendWorkerIndex;
     if(++taskManager->sendWorkerIndex >= taskManager->numWorker) {
@@ -88,17 +73,24 @@
     goto setWorker(context, taskManager, Gearef(context, TaskManager)->context, Gearef(context, TaskManager)->next);
 }
 
+__code setWaitTask(struct Queue* queue, struct Context* task, Data* data, __code next(...)) {
+    queue->queue = (Data *)GET_WAIT_LIST(data);
+    queue->next = next;
+    queue->data = (Data *)task;
+    goto meta(context, queue->queue->Queue.put);
+}
+
+__code setWaitTask_stub(struct Context* context) {
+    goto setWaitTask(context, Gearef(context, Queue), Gearef(context, TaskManager)->context, Gearef(context, TaskManager)->data, Gearef(context, TaskManager)->next);
+}
+
 __code spawnTaskManager(struct TaskManagerImpl* taskManager, struct Queue* queue, struct Context* task, __code next(...)) {
     if (task->idgCount == 0) {
-        // enqueue activeQueue
-        queue->queue = (union Data*)taskManager->activeQueue;
+        goto meta(context, C_taskSend);
     } else {
-        // enqueue waitQueue
-        queue->queue = (union Data*)taskManager->taskQueue;
+        pthread_mutex_unlock(&taskManager->mutex);
+        goto next(...);
     }
-    queue->data = (union Data*)task;
-    queue->next = C_spawnTaskManager1;
-    goto meta(context, queue->queue->Queue.put);
 }
 
 __code spawnTaskManager_stub(struct Context* context) {
@@ -111,35 +103,18 @@
                           Gearef(context, TaskManager)->next);
 }
 
-
-__code spawnTaskManager1(struct TaskManagerImpl* taskManager) {
-    pthread_mutex_unlock(&taskManager->mutex);
-    goto meta(context, C_taskSend);
-}
-
-__code spawnTaskManager1_stub(struct Context* context) {
-    TaskManagerImpl* taskManager = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager);
-    goto spawnTaskManager1(context,
-                           taskManager);
-}
-
-__code taskSend(struct TaskManagerImpl* taskManager, struct Queue* queue) {
-    queue->queue = (union Data*)taskManager->activeQueue;
-    queue->next = C_taskSend1;
-    goto meta(context, taskManager->activeQueue->take);
-}
-
-__code taskSend1(struct TaskManagerImpl* taskManager, struct Queue* queue, struct Context* task, __code next(...)) {
+__code taskSend(struct TaskManagerImpl* taskManager, struct Queue* queue, struct Context* task, __code next(...)) {
     struct Queue* tasks = taskManager->workers[task->workerId]->tasks;
     queue->queue = (union Data*)tasks;
     queue->data = (union Data*)task;
     queue->next = next;
+    pthread_mutex_unlock(&taskManager->mutex);
     goto meta(context, tasks->put);
 }
 
-__code taskSend1_stub(struct Context* context) {
+__code taskSend_stub(struct Context* context) {
     TaskManagerImpl* taskManager = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager);
-    goto taskSend1(context, taskManager, Gearef(context, Queue), Gearef(context, TaskManager)->context, Gearef(context, TaskManager)->next);
+    goto taskSend(context, taskManager, Gearef(context, Queue), Gearef(context, TaskManager)->context, Gearef(context, TaskManager)->next);
 }
 
 __code shutdownTaskManager(struct LoopCounter* loopCounter, struct TaskManager* taskManager, struct TaskManagerImpl* taskManagerImpl, struct Queue* queue, __code next(...)) {