changeset 233:06133afb3b5b

create worker start_code
author mir3636
date Sun, 22 Jan 2017 20:02:21 +0900
parents 123b0d277b84
children 47588c28f189
files src/parallel_execution/context.h src/parallel_execution/taskManager.c src/parallel_execution/worker.c
diffstat 3 files changed, 35 insertions(+), 12 deletions(-) [+]
line wrap: on
line diff
--- a/src/parallel_execution/context.h	Sun Jan 22 19:02:12 2017 +0900
+++ b/src/parallel_execution/context.h	Sun Jan 22 20:02:21 2017 +0900
@@ -31,6 +31,8 @@
     meta->type = D_##t; \
     data; })
 
+#define ALLOC(context, t) (&ALLOCATE(context, t)->t)
+
 #define GET_TYPE(dseg) ({ \
     struct Meta* meta = (struct Meta*)(((void*)dseg) - sizeof(struct Meta));\
     meta->type; })
@@ -60,7 +62,6 @@
     void* heapStart;
     void* heap;
     long heapLimit;
-    pthread_t thread;
     int thread_num;
     int dataNum;
     union Data **data;
@@ -82,17 +83,22 @@
     } LoopCounter;
     struct TaskManager {
         union Data* taskManager;
-        int numWorker;
         enum Code spawn;
         enum Code shutdown;
-        enum Code deadLockDetected;
+
         enum Code next;
-        struct Task* task;
-        struct Worker* workers;
+        enum Code task;
+        struct Context* context;
+        int worker;
+        int cpu;
+        int gpu;
+        int io;
     } TaskManager;
     struct TaskManagerImpl {
+        int numWorker;
         struct Queue* activeQueue;
         struct Queue* taskQueue;
+        struct Worker* workers;
     } TaskManagerImpl;
     struct Worker {
         union Data* worker;
@@ -101,6 +107,7 @@
         enum Code next;
     } Worker;
     struct CPUWorker {
+        pthread_t thread;
         struct Context* context;
         int id;
         struct Queue* tasks;
@@ -109,6 +116,7 @@
     } CPUWorker;
 #ifdef USE_CUDA
     struct CudaWorker {
+        pthread_t thread;
         struct Context* context;
         int id;
         struct Queue* tasks;
--- a/src/parallel_execution/taskManager.c	Sun Jan 22 19:02:12 2017 +0900
+++ b/src/parallel_execution/taskManager.c	Sun Jan 22 20:02:21 2017 +0900
@@ -4,15 +4,20 @@
 #include "origin_cs.h"
 #include <stdio.h>
 
-union Data* createTaskManager(struct Context* context) {
+union Data* createTaskManager(struct Context* context, int numCPU, int numGPU, int numIO) {
     struct TaskManager* taskManager = &ALLOCATE(context, TaskManager)->TaskManager;
     struct TaskManagerImpl* taskManagerImpl = &ALLOCATE(context, TaskManagerImpl)->TaskManagerImpl;
     taskManager->taskManager = (union Data*)taskManagerImpl;
-    taskManagerImpl -> activeQueue = &createSynchronizedQueue(context)->Queue;
-    taskManagerImpl -> taskQueue = &createSynchronizedQueue(context)->Queue;
+    taskManagerImpl -> activeQueue = &createSingleLinkedQueue(context)->Queue;
+    taskManagerImpl -> taskQueue = &createSingleLinkedQueue(context)->Queue;
+    // 0...numIO-1 IOProcessor 
+    // numIO...numIO+numGPU-1 GPUProcessor
+    // numIO+numGPU...numIO+numGPU+numCPU-1 CPUProcessor
+    taskManager->io = 0;
+    taskManager->gpu = numIO;
+    taskManeger->cpu = numIO+numGPU;
     taskManager->spawn = C_spawnTaskManager;
     taskManager->shutdown  = C_shutdownTaskManager;
-    // taskManager->deadLockDetected  = C_deadLockDetected;
     return (union Data*)(taskManager);
 }
 
--- a/src/parallel_execution/worker.c	Sun Jan 22 19:02:12 2017 +0900
+++ b/src/parallel_execution/worker.c	Sun Jan 22 20:02:21 2017 +0900
@@ -3,9 +3,11 @@
 #include "context.h"
 #include "origin_cs.h"
 
+static void start_code(Worker* worker);
+
 union Data* createCPUWorker(struct Context* context, int id, Queue* queue, enum Code next) {
-    struct Worker* worker = &ALLOCATE(context, Worker)->Worker;
-    struct CPUWorker* cpuWorker = &ALLOCATE(context, CPUWorker)->CPUWorker;
+    struct Worker* worker = ALLOC(context, Worker);
+    struct CPUWorker* cpuWorker = ALLOC(context, CPUWorker);
     worker->worker = (union Data*)cpuWorker;
     cpuWorker->tasks = queue;
     cpuWorker->id = id;
@@ -13,12 +15,20 @@
     cpuWorker->next = next;
     worker->taskReceive = C_taskReceiveWorker;
     worker->shutdown = C_shutdownWorker;
+    pthread_create(&worker->thread, NULL, (void*)&start_code, worker);
     return (union Data*)(worker);
 }
 
+static void start_code(Worker* worker) {
+    CPUWorker* worker = (CPUWorker*)worker->worker;
+    worker->context = NEW(struct Context);
+    initContext(worker->context);
+    goto meta(worker->context, C_taskReceiveWorker);
+}
+
 __code taskReceiveWorker(struct Context* context, CPUWorker* worker) {
     if (! worker->runFlag)
-        goto meta(context, worker->next);
+        return; // end thread
     Queue* queue = worker->tasks;
     queue->next = C_getTask1;
     goto meta(context, queue->take);