changeset 282:a3448b0f0a56

Add input data gear
author Tatsuki IHA <e125716@ie.u-ryukyu.ac.jp>
date Sun, 05 Feb 2017 04:08:30 +0900
parents ceb8735aefb0
children 2b41bd298fe8
files src/parallel_execution/CPUWorker.cbc src/parallel_execution/SynchronizedQueue.cbc src/parallel_execution/TaskManagerImpl.cbc src/parallel_execution/context.h src/parallel_execution/main.cbc src/parallel_execution/twice.cbc
diffstat 6 files changed, 64 insertions(+), 49 deletions(-) [+]
line wrap: on
line diff
--- a/src/parallel_execution/CPUWorker.cbc	Fri Feb 03 18:19:24 2017 +0900
+++ b/src/parallel_execution/CPUWorker.cbc	Sun Feb 05 04:08:30 2017 +0900
@@ -21,7 +21,7 @@
     cpuWorker->context = NEW(struct Context);
     initContext(cpuWorker->context);
     Gearef(cpuWorker->context, Worker)->worker = (union Data*)worker;
-    goto meta(cpuWorker->context, C_taskReceiveWorker);
+    goto meta(cpuWorker->context, worker->taskReceive);
 }
 
 __code taskReceiveWorker(struct Worker* worker,struct Queue* queue) {
@@ -31,8 +31,6 @@
 }
 
 __code taskReceiveWorker_stub(struct Context* context) {
-    CPUWorker* cpuWorker = (CPUWorker *)GearImpl(context, Worker, worker);
-    pthread_cond_wait(&cpuWorker->cond, &cpuWorker->mutex);
     goto taskReceiveWorker(context, &Gearef(context, Worker)->worker->Worker, Gearef(context, Queue));
 }
 
--- a/src/parallel_execution/SynchronizedQueue.cbc	Fri Feb 03 18:19:24 2017 +0900
+++ b/src/parallel_execution/SynchronizedQueue.cbc	Sun Feb 05 04:08:30 2017 +0900
@@ -4,10 +4,10 @@
 
 Queue* createSynchronizedQueue(struct Context* context) {
     struct Queue* queue = new Queue();
-    struct SingleLinkedQueue* singleLinkedQueue = new SingleLinkedQueue();
-    queue->queue = (union Data*)singleLinkedQueue;
-    singleLinkedQueue->top  = NULL;
-    singleLinkedQueue->last = NULL;
+    struct SynchronizedQueue* synchronizedQueue = new SynchronizedQueue();
+    synchronizedQueue->top  = NULL;
+    synchronizedQueue->last = NULL;
+    queue->queue = (union Data*)synchronizedQueue;
     queue->take  = C_takeSynchronizedQueue;
     queue->put  = C_putSynchronizedQueue;
     queue->isEmpty = C_isEmptySynchronizedQueue;
@@ -15,7 +15,7 @@
     return queue;
 }
 
-__code clearSynchronizedQueue(struct SingleLinkedQueue* queue, __code next(...)) {
+__code clearSynchronizedQueue(struct SynchronizedQueue* queue, __code next(...)) {
     struct Element* top = queue->top;
     if (__sync_bool_compare_and_swap(&queue->top, top, NULL)) {
         goto next(...);
@@ -24,10 +24,11 @@
     }
 }
 
-__code putSynchronizedQueue(struct SingleLinkedQueue* queue, struct Element* element, union Data* data, __code next(...)) {
+__code putSynchronizedQueue(struct SynchronizedQueue* queue, union Data* data, __code next(...)) {
+    Element* element = new Element();
     element->next = NULL;
     element->data = data;
-    if (queue->last) {
+    if (queue->top) {
         Element* last = queue->last;
         if (__sync_bool_compare_and_swap(&queue->last, last, element)) {
             last->next = element;
@@ -44,7 +45,7 @@
     goto next(...);
 }
 
-__code takeSynchronizedQueue(struct SingleLinkedQueue* queue, __code next(union Data* data, ...)) {
+__code takeSynchronizedQueue(struct SynchronizedQueue* queue, __code next(union Data* data, ...)) {
     if (queue->top) {
         struct Element* top = queue->top;
         if (__sync_bool_compare_and_swap(&queue->top, top, top->next)) {
@@ -53,12 +54,12 @@
             goto meta(context, C_takeSynchronizedQueue);
         }
     } else {
-        data = NULL;
+        goto meta(context, C_takeSynchronizedQueue);
     }
     goto next(data, ...);
 }
 
-__code isEmptySynchronizedQueue(struct SingleLinkedQueue* queue, __code next(...), __code whenEmpty(...)) {
+__code isEmptySynchronizedQueue(struct SynchronizedQueue* queue, __code next(...), __code whenEmpty(...)) {
     if (queue->top)
         goto next(...);
     else
--- a/src/parallel_execution/TaskManagerImpl.cbc	Fri Feb 03 18:19:24 2017 +0900
+++ b/src/parallel_execution/TaskManagerImpl.cbc	Sun Feb 05 04:08:30 2017 +0900
@@ -110,7 +110,6 @@
     queue->queue = (union Data*)tasks;
     queue->data = (union Data*)task;
     queue->next = next;
-    pthread_cond_signal(&taskManager->workers[task->workerId]->worker->CPUWorker.cond);
     goto meta(context, tasks->put);
 }
 
@@ -125,18 +124,27 @@
         struct Queue* tasks = taskManagerImpl->workers[i]->tasks;
         queue->queue = (union Data*)tasks;
         queue->data = NULL;
-        queue->next = next;
+        queue->next = C_shutdownTaskManager1;
         goto meta(context, tasks->put);
-        pthread_join(taskManagerImpl->workers[i]->worker->CPUWorker.thread, NULL);
-        loopCounter->i++;
-        goto meta(context, C_shutdownTaskManager);
     }
 
     loopCounter->i = 0;
-    goto meta(context, taskManager->next);
+    goto meta(context, next);
 }
 
 __code shutdownTaskManager_stub(struct Context* context) {
     TaskManagerImpl* taskManagerImpl = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager);
-    goto shutdownTaskManager(context, Gearef(context, LoopCounter), Gearef(context, TaskManager), taskManagerImpl, Gearef(context, Queue), Gearef(context, TaskManager)->next);
+    goto shutdownTaskManager(context, Gearef(context, LoopCounter), &Gearef(context, TaskManager)->taskManager->TaskManager, taskManagerImpl, Gearef(context, Queue), Gearef(context, TaskManager)->next);
 }
+
+__code shutdownTaskManager1(struct LoopCounter* loopCounter, TaskManagerImpl* taskManagerImpl) {
+    int i = loopCounter->i;
+    pthread_join(taskManagerImpl->workers[i]->worker->CPUWorker.thread, NULL);
+    loopCounter->i++;
+    goto meta(context, C_shutdownTaskManager);
+}
+
+__code shutdownTaskManager1_stub(struct Context* context) {
+    TaskManagerImpl* taskManagerImpl = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager);
+    goto shutdownTaskManager1(context, Gearef(context, LoopCounter), taskManagerImpl);
+}
--- a/src/parallel_execution/context.h	Fri Feb 03 18:19:24 2017 +0900
+++ b/src/parallel_execution/context.h	Sun Feb 05 04:08:30 2017 +0900
@@ -76,6 +76,7 @@
     long heapLimit;
     int dataNum;
     int idgCount; //number of waiting dataGear
+    int odg;
     int workerId;
     union Data **data;
 };
@@ -175,6 +176,10 @@
         struct Element* top;
         struct Element* last;
     } SingleLinkedQueue;
+    struct SynchronizedQueue {
+        struct Element* top;
+        struct Element* last;
+    } SynchronizedQueue;
     // Stack Interface
     struct Stack {
         union Data* stack;
--- a/src/parallel_execution/main.cbc	Fri Feb 03 18:19:24 2017 +0900
+++ b/src/parallel_execution/main.cbc	Sun Feb 05 04:08:30 2017 +0900
@@ -46,7 +46,8 @@
     /* puts("result"); */
 
     time->next = C_code2;
-    goto meta(context, C_exit_code);
+    goto meta(context, C_code2);
+    //goto meta(context, C_exit_code);
     //goto meta(context, C_start_time);
 }
 
@@ -54,12 +55,12 @@
     goto code1(context, Gearef(context, Time));
 }
 
-__code code2(struct Array* array, struct LoopCounter* loopCounter) {
+__code code2(struct LoopCounter* loopCounter) {
     int i = loopCounter->i;
 
     if (i < length) {
-           //printf("%d\n", array->array[i]);
-        if (array->array[i] == (i*2)) {
+        printf("%d\n", array_ptr[i]);
+        if (array_ptr[i] == (i*2)) {
             loopCounter->i++;
             goto meta(context, C_code2);
         } else
@@ -67,11 +68,7 @@
 
     }
 
-    goto meta(context, C_code2);
-}
-
-__code code2_stub(struct Context* context) {
-    goto code2(context, &context->data[D_Node]->Node.value->Array, &context->data[D_LoopCounter]->LoopCounter);
+    goto meta(context, C_exit_code);
 }
 
 __code createData1(struct Allocate* allocate, struct LoopCounter* loopCounter) {
@@ -98,7 +95,7 @@
 
     node->key = i;
     node->value = (union Data*)array;
-    
+
     tree->tree = (union Data*)loopCounter->tree;
 
     tree->next = C_createTask1;
@@ -125,7 +122,8 @@
     }
 
     loopCounter->i = 0;
-    goto meta(context, C_code1);
+    taskManager->next = C_code1;
+    goto meta(context, taskManager->taskManager->TaskManager.shutdown);
 }
 
 __code createTask1_stub(struct Context* context) {
@@ -134,32 +132,30 @@
             Gearef(context, TaskManager));
 }
 
-__code createTask2(LoopCounter* loopCounter, TaskManager* taskManager,struct Context* task, Array* array) {
+__code createTask2(LoopCounter* loopCounter, TaskManager* taskManager,struct Context* task, LoopCounter* loopCounter2, Array* array) {
     int i = loopCounter->i;
-
-    if ((length/split*i) < length) {
-        array->index = i;
-        array->prefix = length/split;
-        array->array = array_ptr;
-        task->idgCount = 0;
-        task->next = C_twice;
-        // task->data[task->idg] = (union Data*)array;
-        taskManager->next = C_createTask1;
-        loopCounter->i++;
-
-        goto meta(context, taskManager->taskManager->TaskManager.spawn);
-    }
-    loopCounter->i = 0;
-    taskManager->next = C_code1;
-    goto meta(context, taskManager->taskManager->TaskManager.shutdown);
+    array->index = i;
+    array->prefix = length/split;
+    array->array = array_ptr;
+    loopCounter2->i = 0;
+    task->idgCount = 0;
+    task->next = C_twice;
+    task->data[task->dataNum] = (union Data*)loopCounter2;
+    task->data[task->dataNum+1] = (union Data*)array;
+    task->odg = task->dataNum + 2;
+    taskManager->next = C_createTask1;
+    loopCounter->i++;
+    goto meta(context, taskManager->taskManager->TaskManager.spawn);
 }
 
 __code createTask2_stub(struct Context* context) {
+    LoopCounter* loopCounter = &ALLOCATE(context, LoopCounter)->LoopCounter;
     Array* array = &ALLOCATE(context, Array)->Array;
     goto createTask2(context,
             Gearef(context, LoopCounter),
             Gearef(context, TaskManager),
             Gearef(context, TaskManager)->context,
+            loopCounter,
             array);
 }
 
--- a/src/parallel_execution/twice.cbc	Fri Feb 03 18:19:24 2017 +0900
+++ b/src/parallel_execution/twice.cbc	Sun Feb 05 04:08:30 2017 +0900
@@ -17,5 +17,12 @@
 
 __code twice_stub(struct Context* context) {
     struct Context* workerContext = context->worker->worker->CPUWorker.context;
-    goto twice(context, Gearef(context, LoopCounter), 0, 0, NULL, workerContext);
+    struct LoopCounter* loopCounter = &context->data[context->dataNum]->LoopCounter;
+    struct Array* array = &context->data[context->dataNum+1]->Array;
+    goto twice(context,
+               loopCounter,
+               array->index,
+               array->prefix,
+               array->array,
+               workerContext);
 }