changeset 230:a1fb3f2d1a36

fix worker
author ikkun
date Sat, 21 Jan 2017 20:21:00 +0900
parents a10ea0cfc929
children 24da4f217447
files src/parallel_execution/context.h src/parallel_execution/taskManager.c src/parallel_execution/worker.c
diffstat 3 files changed, 46 insertions(+), 42 deletions(-) [+]
line wrap: on
line diff
--- a/src/parallel_execution/context.h	Sat Jan 21 20:04:32 2017 +0900
+++ b/src/parallel_execution/context.h	Sat Jan 21 20:21:00 2017 +0900
@@ -49,6 +49,8 @@
 
 struct Context {
     enum Code next;
+    struct Worker* worker;
+    struct TaskManager* taskManager;
     int codeNum;
     __code (**code) (struct Context*);
     void* heapStart;
@@ -65,6 +67,7 @@
         enum DataType type;
         struct Queue* wait; // tasks waiting this dataGear
     } meta;
+    struct Context context;
     struct Time {
         enum Code next;
         double time;
@@ -89,12 +92,12 @@
     } TaskManagerImpl;
     struct Worker {
         int id;
-        struct Context* contexts;
-        enum Code execute;
-        enum Code taskSend;
+        struct Queue* tasks;
+        int runFlag;
+        struct Context* context;
         enum Code taskReceive;
         enum Code shutdown;
-        struct Queue* tasks;
+        enum Code next;
     } Worker;
 #ifdef USE_CUDA
     struct CudaTask {
--- a/src/parallel_execution/taskManager.c	Sat Jan 21 20:04:32 2017 +0900
+++ b/src/parallel_execution/taskManager.c	Sat Jan 21 20:21:00 2017 +0900
@@ -46,6 +46,24 @@
     }
     goto meta(context, TaskManager->next);
 }
+__code createWorker1(struct Context* context, struct LoopCounter* loopCounter, struct Worker* worker) {
+    int i = loopCounter->i;
+
+    if (i < worker->id) {
+        struct Context* worker_context = &worker->contexts[i];
+        worker_context->next = C_getTask1;
+        worker_context->data[D_Tree] = context->data[D_Tree];
+        // worker_context->data[D_ActiveQueue] = context->data[D_ActiveQueue];                                                            
+        pthread_create(&worker_context->thread, NULL, (void*)&start_code, worker_context);
+        worker_context->thread_num = i;
+        loopCounter->i++;
+
+        goto meta(context, C_createWorker1);
+    }
+
+    loopCounter->i = 0;
+    goto meta(context, C_taskManager);
+}
 
 __code shutdownTaskManager(struct Context* context, struct LoopCounter* loopCounter, struct Worker* worker, struct TaskManager* taskManager) {
     int i = loopCounter->i;
--- a/src/parallel_execution/worker.c	Sat Jan 21 20:04:32 2017 +0900
+++ b/src/parallel_execution/worker.c	Sat Jan 21 20:21:00 2017 +0900
@@ -3,60 +3,38 @@
 #include "context.h"
 #include "origin_cs.h"
 
-union Data* createWorker(struct Context* context) {
+union Data* createWorker(struct Context* context, int id, Queue* queue) {
     struct Worker* worker = &ALLOCATE(context, Worker)->Worker;
-    worker->execute = C_executeWorker;
-    worker->taskSend = C_taskSendWorker;
+    worker->tasks = queue;
+    worker->id = id;
+    worker->runFlag = 1;
     worker->taskReceive = C_taskReceiveWorker;
     worker->shutdown = C_shutdownWorker;
     return (union Data*)(worker);
 }
 
-__code taskSendWorker(struct Context* context) {
-}
-
-__code taskSendWorker_stub(struct Context* context) {
-    goto taskSendWorker(context);
-}
-
-__code executeWorker(struct Context* context, Worker* worker) {
-    worker->next = worker->taskReceive;
-    goto meta(context, task->code);
-}
-
-__code executeWorker_stub(struct Context* context) {
-    Worker* worker = &Gearef(context,Worker);
-    goto extcuteWorker(context,worker);
-}
-
 __code taskReceiveWorker(struct Context* context, Worker* worker) {
-    Queue* queue = &Gearef(context,queue);
-    queue->queue = worker->tasks;
+    if (! worker->runFlag)
+        goto meta(context, worker->next);
+    Queue* queue = worker->tasks;
     queue->next = C_getTask1;
     goto meta(context, queue->take);
 }
 
 __code taskReceiveWorker_stub(struct Context* context) {
-    Worker* worker = Gearef(context,Worker).worker;
-    goto taskReceiveWorker(context,queue);
+    Worker* worker = Gearef(context,Worker);
+    goto taskReceiveWorker(context,worker);
 }
 
-__code getTask1(struct Context* context, struct Queue* queue) {
-    queue->next = C_getTask2;
-    goto meta(context, queue->take);
+__code getTask1(struct Context* context, Worker* worker, struct Context* task) {
+    task->worker = worker;
+    goto meta(task, task->next);
 }
 
 __code getTask1_stub(struct Context* context) {
-    goto getTask1(context,/* &context->data[D_ActiveQueue]->Queue*/ NULL);
-}
-
-__code getTask2(struct Context* context, struct Task* task, struct Node* node) {
-    context->next = C_getTask1;
-    goto meta(context, C_getTask2);
-}
-
-__code getTask2_stub(struct Context* context) {
-    goto getTask2(context, /*&(context->data[D_ActiveQueue]->Queue.data->Task)*/ NULL, &context->data[D_Node]->Node);
+    Worker* worker = Gearef(context,Worker);
+    Context* task = &worker->tasks->data->context; 
+    goto getTask1(context,worker,task);
 }
 
 #ifdef USE_CUDA
@@ -67,6 +45,11 @@
 }
 #endif
 
-__code shutdownWorker(struct Context* context) {
+__code shutdownWorker(struct Context* context, Worker* worker) {
+    worker->runFlag = 0;
+}
 
+__code shutdownWorker_stub(struct Context* context) {
+    Worker* worker = Gearef(context,Worker);
+    goto shutdownWorker(context,worker);
 }