changeset 405:8915fce522b3

Fix shutdown TaskManager
author Tatsuki IHA <innparusu@cr.ie.u-ryukyu.ac.jp>
date Tue, 05 Sep 2017 16:46:31 +0900
parents c5cd9888bf2a
children 9b35e6581b5c
files src/parallel_execution/CPUWorker.cbc src/parallel_execution/TaskManager.cbc src/parallel_execution/TaskManagerImpl.cbc src/parallel_execution/context.h src/parallel_execution/examples/bitonicSort/bitonicSort.cbc src/parallel_execution/examples/twice/twice.cbc src/parallel_execution/twice.cbc
diffstat 7 files changed, 62 insertions(+), 226 deletions(-) [+]
line wrap: on
line diff
--- a/src/parallel_execution/CPUWorker.cbc	Sun Sep 03 00:21:16 2017 +0900
+++ b/src/parallel_execution/CPUWorker.cbc	Tue Sep 05 16:46:31 2017 +0900
@@ -68,20 +68,23 @@
     goto iterateCommit1(workerContext, context);
 }
 
-__code odgCommit(struct LoopCounter* loopCounter, struct Context* task) {
+__code odgCommit(struct LoopCounter* loopCounter, struct Context* task, struct TaskManager* taskManager) {
     int i = loopCounter->i ;
     if (task->odg+i < task->maxOdg) {
         goto meta(task, C_odgCommit1);
     }
     loopCounter->i = 0;
-    goto meta(context, C_taskReceiveWorker);
+    taskManager->taskManager = (union Data*)task->taskManager;
+    taskManager->next = C_taskReceiveWorker;
+    goto meta(context, task->taskManager->decrementTaskCount);
 }
 
 __code odgCommit_stub(struct Context* context) {
     struct Context* workerContext = context->worker->worker->CPUWorker.context;
     goto odgCommit(workerContext,
                    Gearef(context, LoopCounter),
-                   context);
+                   context,
+                   Gearef(workerContext, TaskManager));
 }
 
 __code odgCommit1(struct LoopCounter* loopCounter, struct Queue* queue) {
--- a/src/parallel_execution/TaskManager.cbc	Sun Sep 03 00:21:16 2017 +0900
+++ b/src/parallel_execution/TaskManager.cbc	Tue Sep 05 16:46:31 2017 +0900
@@ -5,6 +5,8 @@
     __code spawn(Impl* taskManager, struct Queue* queue, struct Context* task, __code next(...));
     __code spawnTasks(Impl* taskManagerImpl, struct Context** tasks, __code next1(...), struct TaskManager* taskManager);
     __code shutdown(Impl* taskManagerImpl, __code next(...), struct TaskManager* taskManager, struct Queue* queue);
+    __code incrementTaskCount(Impl* taskManagerImpl, __code next(...));
+    __code decrementTaskCount(Impl* taskManagerImpl, __code next(...));
     __code next(...);
     __code next1(...);
     int worker;
--- a/src/parallel_execution/TaskManagerImpl.cbc	Sun Sep 03 00:21:16 2017 +0900
+++ b/src/parallel_execution/TaskManagerImpl.cbc	Tue Sep 05 16:46:31 2017 +0900
@@ -1,6 +1,7 @@
 #include "../context.h"
 
 #include <stdio.h>
+#include <unistd.h>
 
 void createWorkers(struct Context* context, TaskManager* taskManeger, TaskManagerImpl* taskManagerImpl);
 
@@ -16,11 +17,15 @@
     taskManager->spawnTasks = C_spawnTasksTaskManagerImpl;
     taskManager->spawn = C_spawnTaskManagerImpl;
     taskManager->shutdown  = C_shutdownTaskManagerImpl;
+    taskManager->incrementTaskCount = C_incrementTaskCountTaskManagerImpl;
+    taskManager->decrementTaskCount = C_decrementTaskCountTaskManagerImpl;
     struct TaskManagerImpl* taskManagerImpl = new TaskManagerImpl();
-    taskManagerImpl -> taskQueue = createSingleLinkedQueue(context);
-    taskManagerImpl -> numWorker = taskManager->maxCPU;
-    taskManagerImpl -> loopCounter = new LoopCounter();
-    taskManagerImpl -> loopCounter -> i = 0;
+    taskManagerImpl->taskQueue = createSingleLinkedQueue(context);
+    taskManagerImpl->numWorker = taskManager->maxCPU;
+    taskManagerImpl->sendWorkerIndex = 0;
+    taskManagerImpl->taskCount = 0;
+    taskManagerImpl->loopCounter = new LoopCounter();
+    taskManagerImpl->loopCounter -> i = 0;
     createWorkers(context, taskManager, taskManagerImpl);
     taskManager->taskManager = (union Data*)taskManagerImpl;
     return taskManager;
@@ -54,7 +59,7 @@
 
 __code spawnTasksTaskManagerImpl(struct TaskManagerImpl* taskManager, struct Queue* tasks, __code next1(...), struct Queue* queue) {
     queue->queue = (union Data*)tasks;
-    queue->next      = C_spawnTasksTaskManagerImpl1;
+    queue->next = C_spawnTasksTaskManagerImpl1;
     queue->whenEmpty = C_spawnTasksTaskManagerImpl3;
     goto meta(context, tasks->isEmpty);
 }
@@ -69,7 +74,7 @@
 
 __code spawnTasksTaskManagerImpl1(struct TaskManagerImpl* taskManager, struct Queue* tasks, __code next1(...), struct Queue* queue) {
     queue->queue = (union Data*)tasks;
-    queue->next      = C_spawnTasksTaskManagerImpl2;
+    queue->next = C_spawnTasksTaskManagerImpl2;
     goto meta(context, tasks->take);
 }
 
@@ -85,7 +90,7 @@
     task->taskManager = &taskManager->taskManager->TaskManager;
     taskManager->context = task;
     taskManager->next = C_spawnTasksTaskManagerImpl;
-    goto meta(context, C_setWaitTask);
+    goto meta(context, C_setWaitTaskTaskManagerImpl);
 }
 
 __code spawnTasksTaskManagerImpl2_stub(struct Context* context) {
@@ -121,30 +126,40 @@
 	goto spawnTasksTaskManagerImpl5(context, taskManagerImpl, task, taskManager);
 } 
 
-__code setWaitTask(struct TaskManagerImpl* taskManager, struct Context* task, __code next(...), struct Queue* queue) {
+__code setWaitTaskTaskManagerImpl(struct TaskManagerImpl* taskManager, struct Context* task, __code next(...), struct Queue* queue) {
     int i = taskManager->loopCounter->i;
     if(task->idg+i < task->maxIdg) {
         queue->queue = (Data *)GET_WAIT_LIST(task->data[task->idg + i]);
         queue->data  = (Data *)task;
-        queue->next  = C_setWaitTask;
+        queue->next  = C_setWaitTaskTaskManagerImpl;
         taskManager->loopCounter->i++;
         goto meta(context, queue->queue->Queue.put);
     }
     taskManager->loopCounter->i = 0;
     queue->queue = (Data *)taskManager->taskQueue;
     queue->data  = (Data *)task;
-    queue->next  = next;
+    queue->next  = C_incrementTaskCountTaskManagerImpl;
     goto meta(context, taskManager->taskQueue->put);
 }
 
-__code setWaitTask_stub(struct Context* context) {
+__code setWaitTaskTaskManagerImpl_stub(struct Context* context) {
     TaskManagerImpl* taskManager = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager);
     struct Context* task = Gearef(context, TaskManager)->context;
-    goto setWaitTask(context,
-                     taskManager,
-                     task,
-                     Gearef(context, TaskManager)->next,
-                     Gearef(context, Queue));
+    goto setWaitTaskTaskManagerImpl(context,
+                                    taskManager,
+                                    task,
+                                    Gearef(context, TaskManager)->next,
+                                    Gearef(context, Queue));
+}
+
+__code incrementTaskCountTaskManagerImpl(struct TaskManagerImpl* taskManager, __code next(...)) {
+    __sync_fetch_and_add(&taskManager->taskCount, 1);
+    goto next(...);
+}
+
+__code decrementTaskCountTaskManagerImpl(struct TaskManagerImpl* taskManager, __code next(...)) {
+    __sync_fetch_and_sub(&taskManager->taskCount, 1);
+    goto next(...);
 }
 
 __code spawnTaskManagerImpl(struct TaskManagerImpl* taskManager, struct Iterator* iterator, struct Context* task, __code next(...)) {
@@ -195,6 +210,10 @@
 }
 
 __code shutdownTaskManagerImpl(struct TaskManagerImpl* taskManagerImpl, __code next(...), struct TaskManager* taskManager, struct Queue* queue) {
+    if (taskManagerImpl->taskCount != 0) {
+        sleep(1);
+        goto meta(context, taskManager->shutdown);
+    }
     int i = taskManagerImpl->loopCounter->i;
     if (taskManager->cpu <= i && i < taskManager->maxCPU) {
         struct Queue* tasks = taskManagerImpl->workers[i]->tasks;
--- a/src/parallel_execution/context.h	Sun Sep 03 00:21:16 2017 +0900
+++ b/src/parallel_execution/context.h	Tue Sep 05 16:46:31 2017 +0900
@@ -137,6 +137,8 @@
         enum Code spawn;      // start NEW context on the worker
         enum Code spawnTasks; // start NEW tasks on the worker
         enum Code shutdown;
+        enum Code incrementTaskCount;
+        enum Code decrementTaskCount;
         enum Code next;
         enum Code next1;
         enum Code task;
@@ -153,6 +155,7 @@
         enum Code next;
         int numWorker;
         int sendWorkerIndex;
+        int taskCount;
         pthread_mutex_t mutex;
         struct Queue* activeQueue;
         struct Queue* taskQueue;
@@ -246,7 +249,6 @@
     } Element;
     struct Array {
         int size; 
-        int index; 
         int prefix; 
         int* array;
     } Array;
--- a/src/parallel_execution/examples/bitonicSort/bitonicSort.cbc	Sun Sep 03 00:21:16 2017 +0900
+++ b/src/parallel_execution/examples/bitonicSort/bitonicSort.cbc	Tue Sep 05 16:46:31 2017 +0900
@@ -96,7 +96,6 @@
 }
 
 __code code2(struct LoopCounter* loopCounter, struct TaskManager* taskManager, struct Time* time) {
-    sleep(40);
     taskManager->next = C_exit_code;
     goto meta(context, taskManager->taskManager->TaskManager.shutdown);
 }
--- a/src/parallel_execution/examples/twice/twice.cbc	Sun Sep 03 00:21:16 2017 +0900
+++ b/src/parallel_execution/examples/twice/twice.cbc	Tue Sep 05 16:46:31 2017 +0900
@@ -1,189 +1,26 @@
 #include <stdio.h>
-#include <string.h>
-#include <stdlib.h>
-#include <unistd.h>
-
-#include "../../../context.h"
-
-int cpu_num = 1;
-int length = 102400;
-int split = 8;
-int* array_ptr;
-int gpu_num = 0;
-int CPU_ANY = -1;
-int CPU_CUDA = -1;
-
-void print_queue(struct Element* element) {
-    while (element) {
-        printf("%p\n", ((struct Task *)(element->data)));
-        element = element->next;
-    }
-}
 
-void print_tree(struct Node* node) {
-    if (node != 0) {
-        printf("%d\n", node->value->Array.index);
-        print_tree(node->left);
-        print_tree(node->right);
-    }
-}
-
-void *start_taskManager(struct Context *context) {
-    goto initDataGears(context, Gearef(context, LoopCounter), Gearef(context, TaskManager));
-    return 0;
-}
-
-#ifdef USE_CUDAWorker
-#ifdef USE_CUDA_MAIN_THREAD
-extern volatile int cuda_initialized;
-#endif
-#endif
-
-__code initDataGears(struct LoopCounter* loopCounter, struct TaskManager* taskManager) {
-    // loopCounter->tree = createRedBlackTree(context);
-    loopCounter->i = 0;
-    taskManager->taskManager = (union Data*)createTaskManagerImpl(context, cpu_num, gpu_num, 0);
-#ifdef USE_CUDAWorker
-#ifdef USE_CUDA_MAIN_THREAD
-    while(! cuda_initialized) {};
-#endif
-#endif
-    goto meta(context, C_code1);
-}
+#include "../context.h"
 
-__code initDataGears_stub(struct Context* context) {
-    struct TaskManager* taskManager =  Gearef(context, TaskManager);
-    taskManager->taskManager = 0;
-#if (! defined(USE_CUDAWorker) || ! defined(USE_CUDA_MAIN_THREAD))
-    struct LoopCounter* loopCounter = Gearef(context, LoopCounter);
-    goto initDataGears(context, loopCounter, taskManager);
-#else
-    cuda_initialized = 0;
-    pthread_t thread;
-    pthread_create(&thread, NULL, (void*)&start_taskManager, context);
-    while (taskManager->taskManager == 0);
-    TaskManager *t = (TaskManager*)taskManager->taskManager;
-    TaskManagerImpl *im = (TaskManagerImpl*)t->taskManager;
-    struct Queue *q = (Queue *)im->workers[0];
-    createCUDAWorker(context,0,q, im);
-    pthread_join(thread,0);
-    exit(0);
-#endif
-}
+__code twice(struct Array* array, __code next(...), struct LoopCounter loopCounter) {
+    int i = loopCounter->i;
+    if (i < prefix) {
+        array[i+index*prefix] = array[i+index*prefix]*2;
+        loopCounter->i++;
 
-__code code1(struct Time* time) {
-    printf("cpus:\t\t%d\n", cpu_num);
-    printf("gpus:\t\t%d\n", gpu_num);
-    printf("length:\t\t%d\n", length);
-    printf("length/task:\t%d\n", length/split);
-    /* puts("queue"); */
-    /* print_queue(context->data[ActiveQueue]->queue.first); */
-    /* puts("tree"); */
-    /* print_tree(context->data[Tree]->tree.root); */
-    /* puts("result"); */
-    time->time = (union Data*)createTimeImpl(context);
-    time->next = C_createTask1;
-    goto meta(context, time->time->Time.start);
-}
-
-__code code2(struct LoopCounter* loopCounter) {
-    int i = loopCounter->i;
-
-    if (i < length) {
-        //printf("%d\n", array_ptr[i]);
-        if (array_ptr[i] == (i*2)) {
-            loopCounter->i++;
-            goto meta(context, C_code2);
-        } else
-            puts("wrong result");
-
-    }
-
-    goto meta(context, C_exit_code);
-}
-
-__code createTask1(struct LoopCounter* loopCounter, struct TaskManager* taskManager, struct Time* time) {
-    int i = loopCounter->i;
-
-    if ((length/split*i) < length) {
-        goto meta(context, C_createTask2);
+        goto meta(context, C_twice);
     }
 
     loopCounter->i = 0;
-    taskManager->next = time->time->Time.end;
-    time->next = C_code2;
-#if ( defined(USE_CUDAWorker) && defined(USE_CUDA_MAIN_THREAD))
-sleep(5);
-#endif
-    goto meta(context, taskManager->taskManager->TaskManager.shutdown);
+    goto meta(context, context->next);
 }
 
-__code createTask2(struct LoopCounter* loopCounter, struct TaskManager* taskManager) {
-    struct Context** tasks = (struct Context**)ALLOC_ARRAY(context, Context, 1);
-
-    int i = loopCounter->i;
-    LoopCounter* loopCounter2 = &ALLOCATE_DATA_GEAR(context, LoopCounter)->LoopCounter;
-    Array* array = &ALLOCATE_DATA_GEAR(context, Array)->Array;
-    array->index = i;
-    array->prefix = length/split;
-    array->array = array_ptr;
-    array->size = length;
-    loopCounter2->i = 0;
-    loopCounter->i++;
-
-    // par goto twice(loopCounter2, array, __exit);
-    struct Context* task = NEW(struct Context);
-    initContext(task);
-    task->idgCount = 0;
-    if (gpu_num) {
-#ifdef USE_CUDAWorker
-        task->next = C_CUDAtwice;
-        task->workerId = CPU_CUDA;
-#else
-        task->next = C_twice;
-#endif
-    } else {
-        task->next = C_twice;
-    }
-    task->idg = task->dataNum;
-    task->data[task->idg] = (union Data*)loopCounter2;
-    task->data[task->idg+1] = (union Data*)array;
-    task->maxIdg = task->idg + 2;
-    task->odg = task->maxIdg;
-    task->maxOdg = task->odg;
-    tasks[0] = task;
-
-    // goto crateTask1();
-    goto taskManager->spawnTasks(tasks, createTask1);
+__code twice_stub(struct Context* context) {
+    struct LoopCounter* loopCounter = &context->data[context->dataNum]->LoopCounter;
+    struct Array* array = &context->data[context->dataNum+1]->Array;
+    goto twice(context,
+               loopCounter,
+               array->index,
+               array->prefix,
+               array->array);
 }
-
-void init(int argc, char** argv) {
-    for (int i = 1; argv[i]; ++i) {
-        if (strcmp(argv[i], "-cpu") == 0)
-            cpu_num = (int)atoi(argv[i+1]);
-        else if (strcmp(argv[i], "-l") == 0)
-            length = (int)atoi(argv[i+1]);
-        else if (strcmp(argv[i], "-s") == 0)
-            split = (int)atoi(argv[i+1]);
-        else if (strcmp(argv[i], "-cuda") == 0) {
-            gpu_num = 1;
-            CPU_CUDA = 0;
-        }
-    }
-}
-
-
-int main(int argc, char** argv) {
-    init(argc, argv);
-
-    array_ptr = NEWN(length, int);
-
-    for(int i=0; i<length; i++)
-        array_ptr[i]=i;
-
-    struct Context* main_context = NEW(struct Context);
-    initContext(main_context);
-    main_context->next = C_initDataGears;
-
-    goto start_code(main_context);
-}
--- a/src/parallel_execution/twice.cbc	Sun Sep 03 00:21:16 2017 +0900
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,26 +0,0 @@
-#include <stdio.h>
-
-#include "../context.h"
-
-__code twice(struct LoopCounter* loopCounter, int index, int prefix, int* array) {
-    int i = loopCounter->i;
-    if (i < prefix) {
-        array[i+index*prefix] = array[i+index*prefix]*2;
-        loopCounter->i++;
-
-        goto meta(context, C_twice);
-    }
-
-    loopCounter->i = 0;
-    goto meta(context, context->next);
-}
-
-__code twice_stub(struct Context* context) {
-    struct LoopCounter* loopCounter = &context->data[context->dataNum]->LoopCounter;
-    struct Array* array = &context->data[context->dataNum+1]->Array;
-    goto twice(context,
-               loopCounter,
-               array->index,
-               array->prefix,
-               array->array);
-}