changeset 293:198affea1be1

merge
author Shinji KONO <kono@ie.u-ryukyu.ac.jp>
date Fri, 10 Feb 2017 08:48:51 +0900
parents 3d70e21a3902 (diff) 2bc63a22dd21 (current diff)
children f6770c0a24c2
files
diffstat 11 files changed, 130 insertions(+), 72 deletions(-) [+]
line wrap: on
line diff
Binary file doc/dependency.graffle has changed
--- a/src/parallel_execution/CMakeLists.txt	Thu Feb 09 19:51:32 2017 +0900
+++ b/src/parallel_execution/CMakeLists.txt	Fri Feb 10 08:48:51 2017 +0900
@@ -40,7 +40,7 @@
   TARGET
       twice
   SOURCES 
-    main.cbc RedBlackTree.cbc compare.c SingleLinkedStack.cbc CPUWorker.cbc time.cbc twice.cbc TaskManagerImpl.cbc SingleLinkedQueue.cbc SynchronizedQueue.cbc
+      main.cbc RedBlackTree.cbc compare.c SingleLinkedStack.cbc CPUWorker.cbc time.cbc twice.cbc TaskManagerImpl.cbc SingleLinkedQueue.cbc SynchronizedQueue.cbc SemaphoreImpl.cbc
 )
 
 GearsCommand(
--- a/src/parallel_execution/CPUWorker.cbc	Thu Feb 09 19:51:32 2017 +0900
+++ b/src/parallel_execution/CPUWorker.cbc	Fri Feb 10 08:48:51 2017 +0900
@@ -38,8 +38,9 @@
     if (!task)
         return; // end thread
     task->worker = worker;
-    context->next = C_taskReceiveWorker; // set CG after task exec
-    goto meta(task, task->next);
+    enum Code taskCg = task->next;
+    task->next = C_odgCommit; // set CG after task exec
+    goto meta(task, taskCg);
 }
 
 __code getTask_stub(struct Context* context) {
@@ -48,6 +49,46 @@
     goto getTask(context, worker, task);
 }
 
+__code odgCommit(struct LoopCounter* loopCounter, struct Queue* queue, struct Context* task) {
+    int i = loopCounter->i ;
+    if(task->odg + i < task->maxOdg) {
+        queue->queue = (union Data*)GET_WAIT_LIST(task->data[task->odg+i]);
+        queue->next = C_odgCommit1;
+        goto meta(context, queue->queue->Queue.take);
+    }
+    loopCounter->i = 0;
+    goto meta(context, C_taskReceiveWorker);
+}
+
+__code odgCommit_stub(struct Context* context) {
+    struct Context* workerContext = context->worker->worker->CPUWorker.context;
+    goto odgCommit(workerContext,
+                   Gearef(workerContext, LoopCounter),
+                   Gearef(workerContext, Queue),
+                   context);
+}
+
+__code odgCommit1(struct TaskManager* taskManager, struct Context* task) {
+    if(__sync_fetch_and_sub(&task->idgCount, 1)) {
+        if(task->idgCount == 0) {
+            taskManager->taskManager = (union Data*)task->taskManager;
+            taskManager->context = task;
+            taskManager->next = C_odgCommit;
+            goto meta(context, task->taskManager->spawn);
+        }
+    } else {
+        goto meta(context, C_odgCommit1);
+    }
+}
+
+__code odgCommit1_stub(struct Context* context) {
+    struct Context* task = &Gearef(context, Queue)->data->Context;
+    goto odgCommit1(context,
+                    Gearef(context, TaskManager),
+                    task);
+                 
+}
+
 #ifdef USE_CUDA
 __code twiceGpu() {
     cuMemcpyHtoDAsync(context,context,context,context->stream);
@@ -62,4 +103,3 @@
 
 __code shutdownWorker(struct CPUWorker* worker) {
 }
-
--- a/src/parallel_execution/Queue.cbc	Thu Feb 09 19:51:32 2017 +0900
+++ b/src/parallel_execution/Queue.cbc	Fri Feb 10 08:48:51 2017 +0900
@@ -7,5 +7,5 @@
         __code take(Impl* queue, __code next(union Data*, ...));
         __code isEmpty(Impl* queue, __code next(...), __code whenEmpty(...));
         __code next(...);
-} Stack;
+} Queue;
 
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/parallel_execution/Semaphore.cbc	Fri Feb 10 08:48:51 2017 +0900
@@ -0,0 +1,7 @@
+typedef struct Semaphore<Impl>{
+        union Data* semaphore;
+        __code p(Impl* semaphore, __code next(...)); 
+        __code v(Impl* semaphore, __code next(...)); 
+        __code next(...);
+} Semaphore;
+
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/parallel_execution/SemaphoreImpl.cbc	Fri Feb 10 08:48:51 2017 +0900
@@ -0,0 +1,37 @@
+#include "../context.h"
+
+Semaphore* createSemaphoreImpl(struct Context* context, int n) {
+    struct Semaphore* semaphore = new Semaphore();
+    struct SemaphoreImpl* semaphoreImpl = new SemaphoreImpl();
+    semaphore->semaphore = (union Data*)semaphoreImpl;
+    semaphoreImpl->value =  n;
+    pthread_mutex_init(&semaphoreImpl->mutex, NULL);
+    pthread_cond_init(&semaphoreImpl->cond, NULL);
+    semaphore->p = C_pOperationSemaphoreImpl;
+    semaphore->v = C_vOperationSemaphoreImpl;
+    return semaphore;
+}
+
+__code pOperationSemaphoreImpl(struct SemaphoreImpl* semaphore, __code next(...)) {
+    pthread_mutex_lock(&semaphore->mutex);
+    goto meta(context, C_pOperationSemaphoreImpl1);
+}
+
+__code pOperationSemaphoreImpl1(struct SemaphoreImpl* semaphore, __code next(...)) {
+    if(semaphore->value == 0) {
+        pthread_cond_wait(&semaphore->cond, &semaphore->mutex);
+        goto meta(context, C_pOperationSemaphoreImpl1);
+    }
+    semaphore->value--;
+    pthread_mutex_unlock(&semaphore->mutex);
+    goto next(...);
+}
+
+__code vOperationSemaphoreImpl(struct SemaphoreImpl* semaphore, __code next(...)) {
+    pthread_mutex_lock(&semaphore->mutex);
+    semaphore->value++;
+    pthread_cond_signal(&semaphore->cond);
+    pthread_mutex_unlock(&semaphore->mutex);
+    goto next(...);
+}
+
--- a/src/parallel_execution/SynchronizedQueue.cbc	Thu Feb 09 19:51:32 2017 +0900
+++ b/src/parallel_execution/SynchronizedQueue.cbc	Fri Feb 10 08:48:51 2017 +0900
@@ -7,6 +7,7 @@
     struct SynchronizedQueue* synchronizedQueue = new SynchronizedQueue();
     synchronizedQueue->top  = NULL;
     synchronizedQueue->last = NULL;
+    synchronizedQueue ->queueCount = createSemaphoreImpl(context, 0);
     queue->queue = (union Data*)synchronizedQueue;
     queue->take  = C_takeSynchronizedQueue;
     queue->put  = C_putSynchronizedQueue;
@@ -42,17 +43,25 @@
             goto meta(context, C_putSynchronizedQueue);
         }
     }
-    goto next(...);
+    goto meta(context, C_putSynchronizedQueue1);
+}
+
+__code putSynchronizedQueue1(struct SynchronizedQueue* queue, struct Semaphore* semaphore, __code next(...)) {
+    semaphore->semaphore = (union Data*)queue->queueCount;
+    semaphore->next = next;
+    goto meta(context, queue->queueCount->v);
 }
 
-__code takeSynchronizedQueue(struct SynchronizedQueue* queue, __code next(union Data* data, ...)) {
-    if (queue->top) {
-        struct Element* top = queue->top;
-        if (__sync_bool_compare_and_swap(&queue->top, top, top->next)) {
-            data = top->data;
-        } else {
-            goto meta(context, C_takeSynchronizedQueue);
-        }
+__code takeSynchronizedQueue(struct SynchronizedQueue* queue, struct Semaphore* semaphore) {
+    semaphore->semaphore = (union Data*)queue->queueCount;
+    semaphore->next = C_takeSynchronizedQueue1;
+    goto meta(context, queue->queueCount->p);
+}
+
+__code takeSynchronizedQueue1(struct SynchronizedQueue* queue, __code next(union Data* data, ...)) {
+    struct Element* top = queue->top;
+    if (__sync_bool_compare_and_swap(&queue->top, top, top->next)) {
+        data = top->data;
     } else {
         goto meta(context, C_takeSynchronizedQueue);
     }
--- a/src/parallel_execution/TaskManagerImpl.cbc	Thu Feb 09 19:51:32 2017 +0900
+++ b/src/parallel_execution/TaskManagerImpl.cbc	Fri Feb 10 08:48:51 2017 +0900
@@ -48,6 +48,7 @@
 __code createTask(struct TaskManager* taskManager) {
     taskManager->context = NEW(struct Context);
     initContext(taskManager->context);
+    taskManager->context->taskManager = taskManager;
     goto meta(context, C_setWorker);
 }
 
--- a/src/parallel_execution/context.h	Thu Feb 09 19:51:32 2017 +0900
+++ b/src/parallel_execution/context.h	Fri Feb 10 08:48:51 2017 +0900
@@ -45,9 +45,9 @@
     meta->size = len; \
     data; })
 
-#define GET_TYPE(dseg) ({ \
-    struct Meta* meta = (struct Meta*)(((void*)dseg) - sizeof(struct Meta));\
-    meta->type; })
+#define GET_META(dseg) ((struct Meta*)(((void*)dseg) - sizeof(struct Meta)))
+#define GET_TYPE(dseg) (GET_META(dseg)->type)
+#define GET_WAIT_LIST(dseg) (GET_META(dseg)->wait)
 
 #define Gearef(context, t) (&(context)->data[D_##t]->t)
 
@@ -77,6 +77,7 @@
     int dataNum;
     int idgCount; //number of waiting dataGear
     int odg;
+    int maxOdg;
     int workerId;
     union Data **data;
 };
@@ -125,6 +126,7 @@
         enum Code shutdown;
         enum Code next;
         struct Queue* tasks;
+        struct TaskManager* taskManager;
     } Worker;
     struct CPUWorker {
         pthread_t thread;
@@ -179,6 +181,7 @@
     struct SynchronizedQueue {
         struct Element* top;
         struct Element* last;
+        struct Semaphore* queueCount;
     } SynchronizedQueue;
     // Stack Interface
     struct Stack {
@@ -249,6 +252,17 @@
             Black,
         } color;
     } Node;
+    struct Semaphore {
+        union Data* semaphore;
+        enum Code p;
+        enum Code v;
+        enum Code next;
+    } Semaphore;
+    struct SemaphoreImpl {
+        int value;
+        pthread_mutex_t mutex;
+        pthread_cond_t cond;
+    } SemaphoreImpl;
     struct Allocate {
         enum Code next;
         long size;
--- a/src/parallel_execution/main.cbc	Thu Feb 09 19:51:32 2017 +0900
+++ b/src/parallel_execution/main.cbc	Fri Feb 10 08:48:51 2017 +0900
@@ -59,7 +59,7 @@
     int i = loopCounter->i;
 
     if (i < length) {
-        printf("%d\n", array_ptr[i]);
+        //printf("%d\n", array_ptr[i]);
         if (array_ptr[i] == (i*2)) {
             loopCounter->i++;
             goto meta(context, C_code2);
@@ -71,48 +71,6 @@
     goto meta(context, C_exit_code);
 }
 
-__code createData1(struct Allocate* allocate, struct LoopCounter* loopCounter) {
-    int i = loopCounter->i;
-
-    if ((length/split*i) < length) {
-        goto meta(context, C_createData2);
-    }
-
-    loopCounter->i = 0;
-    goto meta(context, C_code1);
-}
-
-__code createData1_stub(struct Context* context) {
-    goto createData1(context, Gearef(context, Allocate), Gearef(context, LoopCounter));
-}
-
-__code createData2(struct LoopCounter* loopCounter, struct Array* array, struct Node* node, Tree* tree) {
-    int i = loopCounter->i;
-
-    array->index = i;
-    array->prefix = length/split;
-    array->array = array_ptr;
-
-    node->key = i;
-    node->value = (union Data*)array;
-
-    tree->tree = (union Data*)loopCounter->tree;
-
-    tree->next = C_createTask1;
-    tree->node = node;
-
-    goto meta(context, loopCounter->tree->put);
-}
-
-__code createData2_stub(struct Context* context) {
-    Array* array = &ALLOCATE(context, Array)->Array;
-    goto createData2(context,
-            Gearef(context, LoopCounter),
-            array,
-            Gearef(context, Node),
-            Gearef(context, Tree));
-}
-
 __code createTask1(struct LoopCounter* loopCounter, struct TaskManager* taskManager) {
     int i = loopCounter->i;
 
@@ -126,12 +84,6 @@
     goto meta(context, taskManager->taskManager->TaskManager.shutdown);
 }
 
-__code createTask1_stub(struct Context* context) {
-    goto createTask1(context,
-            Gearef(context, LoopCounter),
-            Gearef(context, TaskManager));
-}
-
 __code createTask2(LoopCounter* loopCounter, TaskManager* taskManager,struct Context* task, LoopCounter* loopCounter2, Array* array) {
     int i = loopCounter->i;
     array->index = i;
@@ -143,6 +95,7 @@
     task->data[task->dataNum] = (union Data*)loopCounter2;
     task->data[task->dataNum+1] = (union Data*)array;
     task->odg = task->dataNum + 2;
+    task->maxOdg = task->odg;
     taskManager->next = C_createTask1;
     loopCounter->i++;
     goto meta(context, taskManager->taskManager->TaskManager.spawn);
@@ -185,4 +138,3 @@
 
     goto start_code(main_context);
 }
-
--- a/src/parallel_execution/twice.cbc	Thu Feb 09 19:51:32 2017 +0900
+++ b/src/parallel_execution/twice.cbc	Fri Feb 10 08:48:51 2017 +0900
@@ -2,7 +2,7 @@
 
 #include "../context.h"
 
-__code twice(struct LoopCounter* loopCounter, int index, int prefix, int* array, struct Context* workerContext) {
+__code twice(struct LoopCounter* loopCounter, int index, int prefix, int* array) {
     int i = loopCounter->i;
     if (i < prefix) {
         array[i+index*prefix] = array[i+index*prefix]*2;
@@ -12,17 +12,15 @@
     }
 
     loopCounter->i = 0;
-    goto meta(workerContext, workerContext->next);
+    goto meta(context, context->next);
 }
 
 __code twice_stub(struct Context* context) {
-    struct Context* workerContext = context->worker->worker->CPUWorker.context;
     struct LoopCounter* loopCounter = &context->data[context->dataNum]->LoopCounter;
     struct Array* array = &context->data[context->dataNum+1]->Array;
     goto twice(context,
                loopCounter,
                array->index,
                array->prefix,
-               array->array,
-               workerContext);
+               array->array);
 }