# HG changeset patch # User Tatsuki IHA # Date 1486235310 -32400 # Node ID a3448b0f0a56f471d1676777e66ceb5742007589 # Parent ceb8735aefb0bd41f7ef1de48a1bbdc6374385f5 Add input data gear diff -r ceb8735aefb0 -r a3448b0f0a56 src/parallel_execution/CPUWorker.cbc --- a/src/parallel_execution/CPUWorker.cbc Fri Feb 03 18:19:24 2017 +0900 +++ b/src/parallel_execution/CPUWorker.cbc Sun Feb 05 04:08:30 2017 +0900 @@ -21,7 +21,7 @@ cpuWorker->context = NEW(struct Context); initContext(cpuWorker->context); Gearef(cpuWorker->context, Worker)->worker = (union Data*)worker; - goto meta(cpuWorker->context, C_taskReceiveWorker); + goto meta(cpuWorker->context, worker->taskReceive); } __code taskReceiveWorker(struct Worker* worker,struct Queue* queue) { @@ -31,8 +31,6 @@ } __code taskReceiveWorker_stub(struct Context* context) { - CPUWorker* cpuWorker = (CPUWorker *)GearImpl(context, Worker, worker); - pthread_cond_wait(&cpuWorker->cond, &cpuWorker->mutex); goto taskReceiveWorker(context, &Gearef(context, Worker)->worker->Worker, Gearef(context, Queue)); } diff -r ceb8735aefb0 -r a3448b0f0a56 src/parallel_execution/SynchronizedQueue.cbc --- a/src/parallel_execution/SynchronizedQueue.cbc Fri Feb 03 18:19:24 2017 +0900 +++ b/src/parallel_execution/SynchronizedQueue.cbc Sun Feb 05 04:08:30 2017 +0900 @@ -4,10 +4,10 @@ Queue* createSynchronizedQueue(struct Context* context) { struct Queue* queue = new Queue(); - struct SingleLinkedQueue* singleLinkedQueue = new SingleLinkedQueue(); - queue->queue = (union Data*)singleLinkedQueue; - singleLinkedQueue->top = NULL; - singleLinkedQueue->last = NULL; + struct SynchronizedQueue* synchronizedQueue = new SynchronizedQueue(); + synchronizedQueue->top = NULL; + synchronizedQueue->last = NULL; + queue->queue = (union Data*)synchronizedQueue; queue->take = C_takeSynchronizedQueue; queue->put = C_putSynchronizedQueue; queue->isEmpty = C_isEmptySynchronizedQueue; @@ -15,7 +15,7 @@ return queue; } -__code clearSynchronizedQueue(struct SingleLinkedQueue* queue, __code next(...)) { +__code clearSynchronizedQueue(struct SynchronizedQueue* queue, __code next(...)) { struct Element* top = queue->top; if (__sync_bool_compare_and_swap(&queue->top, top, NULL)) { goto next(...); @@ -24,10 +24,11 @@ } } -__code putSynchronizedQueue(struct SingleLinkedQueue* queue, struct Element* element, union Data* data, __code next(...)) { +__code putSynchronizedQueue(struct SynchronizedQueue* queue, union Data* data, __code next(...)) { + Element* element = new Element(); element->next = NULL; element->data = data; - if (queue->last) { + if (queue->top) { Element* last = queue->last; if (__sync_bool_compare_and_swap(&queue->last, last, element)) { last->next = element; @@ -44,7 +45,7 @@ goto next(...); } -__code takeSynchronizedQueue(struct SingleLinkedQueue* queue, __code next(union Data* data, ...)) { +__code takeSynchronizedQueue(struct SynchronizedQueue* queue, __code next(union Data* data, ...)) { if (queue->top) { struct Element* top = queue->top; if (__sync_bool_compare_and_swap(&queue->top, top, top->next)) { @@ -53,12 +54,12 @@ goto meta(context, C_takeSynchronizedQueue); } } else { - data = NULL; + goto meta(context, C_takeSynchronizedQueue); } goto next(data, ...); } -__code isEmptySynchronizedQueue(struct SingleLinkedQueue* queue, __code next(...), __code whenEmpty(...)) { +__code isEmptySynchronizedQueue(struct SynchronizedQueue* queue, __code next(...), __code whenEmpty(...)) { if (queue->top) goto next(...); else diff -r ceb8735aefb0 -r a3448b0f0a56 src/parallel_execution/TaskManagerImpl.cbc --- a/src/parallel_execution/TaskManagerImpl.cbc Fri Feb 03 18:19:24 2017 +0900 +++ b/src/parallel_execution/TaskManagerImpl.cbc Sun Feb 05 04:08:30 2017 +0900 @@ -110,7 +110,6 @@ queue->queue = (union Data*)tasks; queue->data = (union Data*)task; queue->next = next; - pthread_cond_signal(&taskManager->workers[task->workerId]->worker->CPUWorker.cond); goto meta(context, tasks->put); } @@ -125,18 +124,27 @@ struct Queue* tasks = taskManagerImpl->workers[i]->tasks; queue->queue = (union Data*)tasks; queue->data = NULL; - queue->next = next; + queue->next = C_shutdownTaskManager1; goto meta(context, tasks->put); - pthread_join(taskManagerImpl->workers[i]->worker->CPUWorker.thread, NULL); - loopCounter->i++; - goto meta(context, C_shutdownTaskManager); } loopCounter->i = 0; - goto meta(context, taskManager->next); + goto meta(context, next); } __code shutdownTaskManager_stub(struct Context* context) { TaskManagerImpl* taskManagerImpl = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager); - goto shutdownTaskManager(context, Gearef(context, LoopCounter), Gearef(context, TaskManager), taskManagerImpl, Gearef(context, Queue), Gearef(context, TaskManager)->next); + goto shutdownTaskManager(context, Gearef(context, LoopCounter), &Gearef(context, TaskManager)->taskManager->TaskManager, taskManagerImpl, Gearef(context, Queue), Gearef(context, TaskManager)->next); } + +__code shutdownTaskManager1(struct LoopCounter* loopCounter, TaskManagerImpl* taskManagerImpl) { + int i = loopCounter->i; + pthread_join(taskManagerImpl->workers[i]->worker->CPUWorker.thread, NULL); + loopCounter->i++; + goto meta(context, C_shutdownTaskManager); +} + +__code shutdownTaskManager1_stub(struct Context* context) { + TaskManagerImpl* taskManagerImpl = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager); + goto shutdownTaskManager1(context, Gearef(context, LoopCounter), taskManagerImpl); +} diff -r ceb8735aefb0 -r a3448b0f0a56 src/parallel_execution/context.h --- a/src/parallel_execution/context.h Fri Feb 03 18:19:24 2017 +0900 +++ b/src/parallel_execution/context.h Sun Feb 05 04:08:30 2017 +0900 @@ -76,6 +76,7 @@ long heapLimit; int dataNum; int idgCount; //number of waiting dataGear + int odg; int workerId; union Data **data; }; @@ -175,6 +176,10 @@ struct Element* top; struct Element* last; } SingleLinkedQueue; + struct SynchronizedQueue { + struct Element* top; + struct Element* last; + } SynchronizedQueue; // Stack Interface struct Stack { union Data* stack; diff -r ceb8735aefb0 -r a3448b0f0a56 src/parallel_execution/main.cbc --- a/src/parallel_execution/main.cbc Fri Feb 03 18:19:24 2017 +0900 +++ b/src/parallel_execution/main.cbc Sun Feb 05 04:08:30 2017 +0900 @@ -46,7 +46,8 @@ /* puts("result"); */ time->next = C_code2; - goto meta(context, C_exit_code); + goto meta(context, C_code2); + //goto meta(context, C_exit_code); //goto meta(context, C_start_time); } @@ -54,12 +55,12 @@ goto code1(context, Gearef(context, Time)); } -__code code2(struct Array* array, struct LoopCounter* loopCounter) { +__code code2(struct LoopCounter* loopCounter) { int i = loopCounter->i; if (i < length) { - //printf("%d\n", array->array[i]); - if (array->array[i] == (i*2)) { + printf("%d\n", array_ptr[i]); + if (array_ptr[i] == (i*2)) { loopCounter->i++; goto meta(context, C_code2); } else @@ -67,11 +68,7 @@ } - goto meta(context, C_code2); -} - -__code code2_stub(struct Context* context) { - goto code2(context, &context->data[D_Node]->Node.value->Array, &context->data[D_LoopCounter]->LoopCounter); + goto meta(context, C_exit_code); } __code createData1(struct Allocate* allocate, struct LoopCounter* loopCounter) { @@ -98,7 +95,7 @@ node->key = i; node->value = (union Data*)array; - + tree->tree = (union Data*)loopCounter->tree; tree->next = C_createTask1; @@ -125,7 +122,8 @@ } loopCounter->i = 0; - goto meta(context, C_code1); + taskManager->next = C_code1; + goto meta(context, taskManager->taskManager->TaskManager.shutdown); } __code createTask1_stub(struct Context* context) { @@ -134,32 +132,30 @@ Gearef(context, TaskManager)); } -__code createTask2(LoopCounter* loopCounter, TaskManager* taskManager,struct Context* task, Array* array) { +__code createTask2(LoopCounter* loopCounter, TaskManager* taskManager,struct Context* task, LoopCounter* loopCounter2, Array* array) { int i = loopCounter->i; - - if ((length/split*i) < length) { - array->index = i; - array->prefix = length/split; - array->array = array_ptr; - task->idgCount = 0; - task->next = C_twice; - // task->data[task->idg] = (union Data*)array; - taskManager->next = C_createTask1; - loopCounter->i++; - - goto meta(context, taskManager->taskManager->TaskManager.spawn); - } - loopCounter->i = 0; - taskManager->next = C_code1; - goto meta(context, taskManager->taskManager->TaskManager.shutdown); + array->index = i; + array->prefix = length/split; + array->array = array_ptr; + loopCounter2->i = 0; + task->idgCount = 0; + task->next = C_twice; + task->data[task->dataNum] = (union Data*)loopCounter2; + task->data[task->dataNum+1] = (union Data*)array; + task->odg = task->dataNum + 2; + taskManager->next = C_createTask1; + loopCounter->i++; + goto meta(context, taskManager->taskManager->TaskManager.spawn); } __code createTask2_stub(struct Context* context) { + LoopCounter* loopCounter = &ALLOCATE(context, LoopCounter)->LoopCounter; Array* array = &ALLOCATE(context, Array)->Array; goto createTask2(context, Gearef(context, LoopCounter), Gearef(context, TaskManager), Gearef(context, TaskManager)->context, + loopCounter, array); } diff -r ceb8735aefb0 -r a3448b0f0a56 src/parallel_execution/twice.cbc --- a/src/parallel_execution/twice.cbc Fri Feb 03 18:19:24 2017 +0900 +++ b/src/parallel_execution/twice.cbc Sun Feb 05 04:08:30 2017 +0900 @@ -17,5 +17,12 @@ __code twice_stub(struct Context* context) { struct Context* workerContext = context->worker->worker->CPUWorker.context; - goto twice(context, Gearef(context, LoopCounter), 0, 0, NULL, workerContext); + struct LoopCounter* loopCounter = &context->data[context->dataNum]->LoopCounter; + struct Array* array = &context->data[context->dataNum+1]->Array; + goto twice(context, + loopCounter, + array->index, + array->prefix, + array->array, + workerContext); }