changeset 327:534601ed8c50 examples_directory

Running dependency example for single thread and single task
author Tatsuki IHA <e125716@ie.u-ryukyu.ac.jp>
date Tue, 18 Apr 2017 05:53:37 +0900
parents f23f6d0aa4e9
children 48c2b5bcab79
files src/parallel_execution/CPUWorker.cbc src/parallel_execution/TaskManagerImpl.cbc src/parallel_execution/context.h src/parallel_execution/examples/calc.cbc
diffstat 4 files changed, 82 insertions(+), 74 deletions(-) [+]
line wrap: on
line diff
--- a/src/parallel_execution/CPUWorker.cbc	Tue Apr 18 01:47:42 2017 +0900
+++ b/src/parallel_execution/CPUWorker.cbc	Tue Apr 18 05:53:37 2017 +0900
@@ -47,12 +47,10 @@
     goto getTask(context, worker, task);
 }
 
-__code odgCommit(struct LoopCounter* loopCounter, struct Queue* queue, struct Context* task) {
+__code odgCommit(struct LoopCounter* loopCounter, struct Context* task) {
     int i = loopCounter->i ;
     if(task->odg + i < task->maxOdg) {
-        queue->queue = (union Data*)GET_WAIT_LIST(task->data[task->odg+i]);
-        queue->next = C_odgCommit1;
-        goto meta(context, queue->queue->Queue.take);
+        goto meta(task, C_odgCommit1);
     }
     loopCounter->i = 0;
     goto meta(context, C_taskReceiveWorker);
@@ -61,30 +59,64 @@
 __code odgCommit_stub(struct Context* context) {
     struct Context* workerContext = context->worker->worker->CPUWorker.context;
     goto odgCommit(workerContext,
-                   Gearef(workerContext, LoopCounter),
-                   Gearef(workerContext, Queue),
+                   Gearef(context, LoopCounter),
                    context);
 }
 
-__code odgCommit1(struct TaskManager* taskManager, struct Context* task) {
+__code odgCommit1(struct LoopCounter* loopCounter, struct Queue* queue) {
+    int i = loopCounter->i ;
+    queue->queue = (union Data*)GET_WAIT_LIST(context->data[context->odg+i]);
+    queue->whenEmpty = C_odgCommit4;
+    queue->next = C_odgCommit2;
+    goto meta(context, queue->queue->Queue.isEmpty);
+}
+
+__code odgCommit1_stub(struct Context* context) {
+    goto odgCommit1(context,
+                   Gearef(context, LoopCounter),
+                   Gearef(context, Queue));
+}
+
+__code odgCommit2(struct Queue* queue) {
+    queue->next = C_odgCommit3;
+    goto meta(context, queue->queue->Queue.take);
+}
+
+__code odgCommit2_stub(struct Context* context) {
+    goto odgCommit2(context,
+                   Gearef(context, Queue));
+}
+
+__code odgCommit3(struct TaskManager* taskManager, struct Context* task) {
     if(__sync_fetch_and_sub(&task->idgCount, 1)) {
         if(task->idgCount == 0) {
             taskManager->taskManager = (union Data*)task->taskManager;
             taskManager->context = task;
-            taskManager->next = C_odgCommit;
+            taskManager->next = C_odgCommit1;
             goto meta(context, task->taskManager->spawn);
+        } else {
+            goto meta(context, C_odgCommit1);
         }
     } else {
-        goto meta(context, C_odgCommit1);
+        goto meta(context, C_odgCommit3);
     }
 }
 
-__code odgCommit1_stub(struct Context* context) {
+__code odgCommit3_stub(struct Context* context) {
     struct Context* task = &Gearef(context, Queue)->data->Context;
-    goto odgCommit1(context,
+    goto odgCommit3(context,
                     Gearef(context, TaskManager),
                     task);
-                 
+}
+
+__code odgCommit4(struct LoopCounter* loopCounter) {
+    loopCounter->i++;
+    goto meta(context, C_odgCommit);
+}
+
+__code odgCommit4_stub(struct Context* context) {
+    goto odgCommit4(context,
+                    Gearef(context, LoopCounter));
 }
 
 __code shutdownWorker(struct CPUWorker* worker) {
--- a/src/parallel_execution/TaskManagerImpl.cbc	Tue Apr 18 01:47:42 2017 +0900
+++ b/src/parallel_execution/TaskManagerImpl.cbc	Tue Apr 18 05:53:37 2017 +0900
@@ -55,26 +55,11 @@
 __code createTask(struct TaskManager* taskManager) {
     taskManager->context = NEW(struct Context);
     initContext(taskManager->context);
-    taskManager->context->taskManager = taskManager;
+    taskManager->context->taskManager = (struct TaskManager*)taskManager->taskManager;
     taskManager->context->idg = taskManager->context->dataNum;
     goto meta(context, C_setWorker);
 }
 
-__code setWaitTask(struct Queue* queue, struct Context* task, Data* data, __code next(...)) {
-    struct Meta *metaData = GET_META(data);
-    if (!metaData->wait) {
-        metaData->wait = createSynchronizedQueue(context);
-    }
-    queue->queue = (union Data*)metaData->wait;
-    queue->next = next;
-    queue->data = (Data *)task;
-    goto meta(context, queue->queue->Queue.put);
-}
-
-__code setWaitTask_stub(struct Context* context) {
-    goto setWaitTask(context, Gearef(context, Queue), Gearef(context, TaskManager)->context, Gearef(context, TaskManager)->data, Gearef(context, TaskManager)->next);
-}
-
 __code setWorker(struct TaskManagerImpl* taskManager, struct Context* task, __code next(...)) {
     task->workerId = taskManager->sendWorkerIndex;
     if(++taskManager->sendWorkerIndex >= taskManager->numWorker) {
@@ -88,17 +73,24 @@
     goto setWorker(context, taskManager, Gearef(context, TaskManager)->context, Gearef(context, TaskManager)->next);
 }
 
+__code setWaitTask(struct Queue* queue, struct Context* task, Data* data, __code next(...)) {
+    queue->queue = (Data *)GET_WAIT_LIST(data);
+    queue->next = next;
+    queue->data = (Data *)task;
+    goto meta(context, queue->queue->Queue.put);
+}
+
+__code setWaitTask_stub(struct Context* context) {
+    goto setWaitTask(context, Gearef(context, Queue), Gearef(context, TaskManager)->context, Gearef(context, TaskManager)->data, Gearef(context, TaskManager)->next);
+}
+
 __code spawnTaskManager(struct TaskManagerImpl* taskManager, struct Queue* queue, struct Context* task, __code next(...)) {
     if (task->idgCount == 0) {
-        // enqueue activeQueue
-        queue->queue = (union Data*)taskManager->activeQueue;
+        goto meta(context, C_taskSend);
     } else {
-        // enqueue waitQueue
-        queue->queue = (union Data*)taskManager->taskQueue;
+        pthread_mutex_unlock(&taskManager->mutex);
+        goto next(...);
     }
-    queue->data = (union Data*)task;
-    queue->next = C_spawnTaskManager1;
-    goto meta(context, queue->queue->Queue.put);
 }
 
 __code spawnTaskManager_stub(struct Context* context) {
@@ -111,35 +103,18 @@
                           Gearef(context, TaskManager)->next);
 }
 
-
-__code spawnTaskManager1(struct TaskManagerImpl* taskManager) {
-    pthread_mutex_unlock(&taskManager->mutex);
-    goto meta(context, C_taskSend);
-}
-
-__code spawnTaskManager1_stub(struct Context* context) {
-    TaskManagerImpl* taskManager = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager);
-    goto spawnTaskManager1(context,
-                           taskManager);
-}
-
-__code taskSend(struct TaskManagerImpl* taskManager, struct Queue* queue) {
-    queue->queue = (union Data*)taskManager->activeQueue;
-    queue->next = C_taskSend1;
-    goto meta(context, taskManager->activeQueue->take);
-}
-
-__code taskSend1(struct TaskManagerImpl* taskManager, struct Queue* queue, struct Context* task, __code next(...)) {
+__code taskSend(struct TaskManagerImpl* taskManager, struct Queue* queue, struct Context* task, __code next(...)) {
     struct Queue* tasks = taskManager->workers[task->workerId]->tasks;
     queue->queue = (union Data*)tasks;
     queue->data = (union Data*)task;
     queue->next = next;
+    pthread_mutex_unlock(&taskManager->mutex);
     goto meta(context, tasks->put);
 }
 
-__code taskSend1_stub(struct Context* context) {
+__code taskSend_stub(struct Context* context) {
     TaskManagerImpl* taskManager = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager);
-    goto taskSend1(context, taskManager, Gearef(context, Queue), Gearef(context, TaskManager)->context, Gearef(context, TaskManager)->next);
+    goto taskSend(context, taskManager, Gearef(context, Queue), Gearef(context, TaskManager)->context, Gearef(context, TaskManager)->next);
 }
 
 __code shutdownTaskManager(struct LoopCounter* loopCounter, struct TaskManager* taskManager, struct TaskManagerImpl* taskManagerImpl, struct Queue* queue, __code next(...)) {
--- a/src/parallel_execution/context.h	Tue Apr 18 01:47:42 2017 +0900
+++ b/src/parallel_execution/context.h	Tue Apr 18 05:53:37 2017 +0900
@@ -45,6 +45,12 @@
     meta->size = len; \
     data; })
 
+#define ALLOCATE_DATA_GEAR(context, t) ({ \
+        union Data* data = ALLOCATE(context, t); \
+        struct Meta* meta = GET_META(data); \
+        meta->wait = createSingleLinkedQueue(context); \
+        data; })
+
 #define GET_META(dseg) ((struct Meta*)(((void*)dseg) - sizeof(struct Meta)))
 #define GET_TYPE(dseg) (GET_META(dseg)->type)
 #define GET_WAIT_LIST(dseg) (GET_META(dseg)->wait)
--- a/src/parallel_execution/examples/calc.cbc	Tue Apr 18 01:47:42 2017 +0900
+++ b/src/parallel_execution/examples/calc.cbc	Tue Apr 18 05:53:37 2017 +0900
@@ -111,20 +111,21 @@
 __code createTask1(struct LoopCounter* loopCounter, struct TaskManager* taskManager) {
     int i = loopCounter->i;
 
-    if ((length/split*i) < length) {
+    if (i < 1) {
+        loopCounter->i++;
         taskManager->next = C_createTask2;
         goto meta(context, taskManager->taskManager->TaskManager.createTask);
     }
 
     loopCounter->i = 0;
     taskManager->next = C_code1;
+    sleep(3);
     goto meta(context, taskManager->taskManager->TaskManager.shutdown);
 }
 
 __code createTask2(LoopCounter* loopCounter, TaskManager* taskManager, struct Context *task, Integer *integer1, Integer *integer2, Integer *integer3) {
     int i = loopCounter->i;
-    loopCounter->i++;
-    task->idgCount = 0;
+    task->idgCount = 1;
     task->next = C_mult;
     integer2->value = i+1;
     task->data[task->idg] = (union Data*)integer1;
@@ -139,9 +140,9 @@
 }
 
 __code createTask2_stub(struct Context* context) {
-    Integer* integer1 = &ALLOCATE(context, Integer)->Integer;
-    Integer* integer2 = &ALLOCATE(context, Integer)->Integer;
-    Integer* integer3 = &ALLOCATE(context, Integer)->Integer;
+    Integer* integer1 = &ALLOCATE_DATA_GEAR(context, Integer)->Integer;
+    Integer* integer2 = &ALLOCATE_DATA_GEAR(context, Integer)->Integer;
+    Integer* integer3 = &ALLOCATE_DATA_GEAR(context, Integer)->Integer;
     goto createTask2(context,
             Gearef(context, LoopCounter),
             Gearef(context, TaskManager),
@@ -151,16 +152,11 @@
             integer3);
 }
 
-__code createTask3(TaskManager* taskManager) {
+__code createTask3(struct TaskManager* taskManager) {
     taskManager->next = C_createTask4;
     goto meta(context, taskManager->taskManager->TaskManager.spawn);
 }
 
-__code createTask3_stub(struct Context* context) {
-    goto createTask3(context,
-            Gearef(context, TaskManager));
-}
-
 __code createTask4(struct TaskManager* taskManager) {
     taskManager->next = C_createTask5;
     goto meta(context, taskManager->taskManager->TaskManager.createTask);
@@ -171,21 +167,20 @@
     task->next = C_add;
     task->idgCount = 0;
     integer1->value = i;
-    integer1->value = i+1;
+    integer2->value = i+1;
     task->data[task->idg] = (union Data*)integer1;
     task->data[task->idg+1] = (union Data*)integer2;
     task->maxIdg = task->idg + 2;
     task->odg = task->maxIdg;
     task->data[task->odg] = (union Data*)integer3;
     task->maxOdg = task->odg + 1;
-    taskManager->next = C_createTask3;
-    taskManager->data = (union Data*)integer1;
+    taskManager->next = C_createTask1;
     goto meta(context, taskManager->taskManager->TaskManager.spawn);
 }
 
 __code createTask5_stub(struct Context* context) {
-    Integer* integer1 = &ALLOCATE(context, Integer)->Integer;
-    Integer* integer2 = &ALLOCATE(context, Integer)->Integer;
+    Integer* integer1 = &ALLOCATE_DATA_GEAR(context, Integer)->Integer;
+    Integer* integer2 = &ALLOCATE_DATA_GEAR(context, Integer)->Integer;
     goto createTask5(context,
             Gearef(context, LoopCounter),
             Gearef(context, TaskManager),
@@ -248,7 +243,7 @@
 }
 
 __code mult_stub(struct Context* context) { 
-    goto add(context, 
+    goto mult(context, 
             &context->data[context->idg]->Integer,
             &context->data[context->idg + 1]->Integer,
             &context->data[context->odg]->Integer);