changeset 316:54d203daf06b

CUDAtwice.cbc is called.
author Shinji KONO <kono@ie.u-ryukyu.ac.jp>
date Wed, 15 Feb 2017 16:25:23 +0900
parents 1839586f5b41
children 51aa65676e37
files src/parallel_execution/CUDAWorker.cbc src/parallel_execution/CUDAtwice.cbc src/parallel_execution/TaskManagerImpl.cbc src/parallel_execution/context.h src/parallel_execution/main.cbc
diffstat 5 files changed, 89 insertions(+), 17 deletions(-) [+]
line wrap: on
line diff
--- a/src/parallel_execution/CUDAWorker.cbc	Wed Feb 15 12:34:19 2017 +0900
+++ b/src/parallel_execution/CUDAWorker.cbc	Wed Feb 15 16:25:23 2017 +0900
@@ -13,36 +13,51 @@
 #include "../context.h"
 
 static void start_CUDAworker(Worker* worker);
-static void cudaInit(struct CUDAWorker *cudaWorker) ;
+static void cudaInit(struct CUDAWorker *cudaWorker,int phase) ;
 
-static int cuda_initialized = 0;
+volatile int cuda_initialized = 0;
 
-Worker* createCUDAWorker(struct Context* context, int id, Queue* queue) {
+Worker* createCUDAWorker(struct Context* context, int id, Queue* queue, TaskManagerImpl *im) {
     struct Worker* worker = ALLOC(context, Worker);
     struct CUDAWorker* cudaWorker = new CUDAWorker();
+
+    cudaInit(cudaWorker,0);
+
     worker->worker = (union Data*)cudaWorker;
     worker->tasks = queue;
+printf("createCUDAWorker %p\n",queue);
     cudaWorker->id = id;
     worker->shutdown = C_shutdownCUDAWorker;
-    pthread_create(&worker->worker->CUDAWorker.thread, NULL, (void*)&start_CUDAworker, worker);
+    // pthread_create(&worker->worker->CUDAWorker.thread, NULL, (void*)&start_CUDAworker, worker);
+    if (im) {
+printf("im->worker %p\n",im->workers);
+        im->workers[0] = worker;
+    }
+    cuda_initialized = 1;
+    start_CUDAworker(worker);
     return worker;
 }
 
-static void cudaInit(struct CUDAWorker *cudaWorker) {
+static void cudaInit(struct CUDAWorker *cudaWorker,int phase) {
     // initialize and load kernel
     cudaWorker->num_stream = 1; // number of stream
 //    cudaWorker->stream = NEWN(cudaWorker->num_stream, CUstream );
 printf("cudaInit 1\n");
+   if (phase==0)
     checkCudaErrors(cuInit(0));
+   if (phase==0)
     checkCudaErrors(cuDeviceGet(&cudaWorker->device, 0));
 printf("cudaInit 2\n");
+   if (phase==0)
     checkCudaErrors(cuCtxCreate(&cudaWorker->cuCtx, CU_CTX_SCHED_SPIN, cudaWorker->device));
 printf("cudaInit 3\n");
 //    if (cudaWorker->num_stream) {
 //        for (int i=0;i<cudaWorker->num_stream;i++)
 //            checkCudaErrors(cuStreamCreate(&cudaWorker->stream[i],0));
 //    }
-    cuda_initialized = 1;
+    CUdeviceptr devA;
+    checkCudaErrors(cuMemAlloc(&devA, 16));
+
 printf("cudaInit done\n");
 }
 
@@ -68,10 +83,10 @@
 __code getTaskCUDA(struct Worker* worker, struct Context* task) {
     if (!task)
         return; // end thread
-    if (cuda_initialized==0) {
-        CUDAWorker* cudaWorker = (CUDAWorker*)worker->worker;
-        cudaInit(cudaWorker);
-    }
+//    if (cuda_initialized==0 || 1) {
+//        CUDAWorker* cudaWorker = (CUDAWorker*)worker->worker;
+//        cudaInit(cudaWorker,1);
+//    }
     worker->taskReceive = C_taskReceiveCUDAWorker;
     task->worker = worker;
     enum Code taskCg = task->next;
--- a/src/parallel_execution/CUDAtwice.cbc	Wed Feb 15 12:34:19 2017 +0900
+++ b/src/parallel_execution/CUDAtwice.cbc	Wed Feb 15 16:25:23 2017 +0900
@@ -18,7 +18,7 @@
     checkCudaErrors(cuMemAlloc(&devLoopCounter, sizeof(LoopCounter)));
 
     //twiceカーネルが定義されてなければそれをロードする
-    checkCudaErrors(cuModuleLoad(&context->module, "CUDAtwice.ptx"));
+    checkCudaErrors(cuModuleLoad(&context->module, "c/CUDAtwice.ptx"));
     checkCudaErrors(cuModuleGetFunction(&context->function, context->module, "twice"));
 printf("CUdA Exe 2\n");
 
@@ -38,7 +38,9 @@
     //結果を取ってくるコマンドを入力する
     //コマンドの終了待ちを行う   
     checkCudaErrors(cuMemcpyDtoH(array->array, devA, array->size));
+
     // wait for stream
+    checkCudaErrors(cuCtxSynchronize());
 }
 
 __code CUDAtwice(struct Context* context, struct LoopCounter* loopCounter, int index, int prefix, int* array, struct Context* workerContext) {
--- a/src/parallel_execution/TaskManagerImpl.cbc	Wed Feb 15 12:34:19 2017 +0900
+++ b/src/parallel_execution/TaskManagerImpl.cbc	Wed Feb 15 16:25:23 2017 +0900
@@ -17,11 +17,13 @@
     taskManager->spawn = C_spawnTaskManager;
     taskManager->shutdown  = C_shutdownTaskManager;
     struct TaskManagerImpl* taskManagerImpl = new TaskManagerImpl();
-    taskManager->taskManager = (union Data*)taskManagerImpl;
     taskManagerImpl -> activeQueue = createSingleLinkedQueue(context);
     taskManagerImpl -> taskQueue = createSingleLinkedQueue(context);
     taskManagerImpl -> numWorker = taskManager->maxCPU;
     createWorkers(context, taskManager, taskManagerImpl);
+    taskManager->taskManager = (union Data*)taskManagerImpl;
+printf ("create taskManagerImpl %p workers %p\n",taskManagerImpl, taskManagerImpl->workers);
+printf ("create taskManager %p\n",taskManager);
     return taskManager;
 }
 
@@ -35,7 +37,11 @@
     for (;i<taskManager->cpu;i++) {
 #ifdef USE_CUDAWorker
         Queue* queue = createSynchronizedQueue(context);
-        taskManagerImpl->workers[i] = (Worker*)createCUDAWorker(context, i, queue);
+        // taskManagerImpl->workers[i] = (Worker*)createCUDAWorker(context, i, queue,0);
+printf("createWorkers q %p\n", queue );
+        taskManagerImpl->workers[i] = (Worker*)queue;
+printf("createWorkers im %p\n", taskManagerImpl );
+printf ("createWorkers workers %p\n",taskManagerImpl->workers);
 #else
         Queue* queue = createSynchronizedQueue(context);
         taskManagerImpl->workers[i] = (Worker*)createCPUWorker(context, i, queue);
@@ -48,9 +54,16 @@
 }
 
 __code createTask(struct TaskManager* taskManager) {
+    TaskManager *t = (TaskManager *)taskManager->taskManager;
+    TaskManagerImpl *im = (TaskManagerImpl *)t->taskManager;
+
+printf ("createTask im %p 1 worker %p q %p \n",im, im->workers, im->workers[0]->tasks);
+
     taskManager->context = NEW(struct Context);
     initContext(taskManager->context);
     taskManager->context->taskManager = taskManager;
+    struct Queue* tasks = im->workers[0]->tasks;
+printf ("createTask  q %p\n",tasks);
     goto meta(context, C_setWorker);
 }
 
@@ -68,6 +81,7 @@
 }
 
 __code spawnTaskManager(struct TaskManagerImpl* taskManager, struct Queue* queue, struct Context* task, __code next(...)) {
+printf ("2 %p\n",taskManager->workers[0]->tasks);
     if (task->idgCount == 0) {
         // enqueue activeQueue
         queue->queue = (union Data*)taskManager->activeQueue;
@@ -82,6 +96,7 @@
 
 __code spawnTaskManager_stub(struct Context* context) {
     TaskManagerImpl* taskManager = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager);
+printf ("3 %p\n",taskManager->workers[0]->tasks);
     pthread_mutex_lock(&taskManager->mutex);
     goto spawnTaskManager(context,
                           taskManager,
@@ -111,6 +126,7 @@
 __code taskSend1(struct TaskManagerImpl* taskManager, struct Queue* queue, struct Context* task, __code next(...)) {
     struct Queue* tasks = taskManager->workers[task->workerId]->tasks;
     queue->queue = (union Data*)tasks;
+printf("taskSend1 workerid %d %p workers %p q %p \n" , task->workerId, taskManager,  taskManager->workers, queue->queue);
     queue->data = (union Data*)task;
     queue->next = next;
     goto meta(context, tasks->put);
--- a/src/parallel_execution/context.h	Wed Feb 15 12:34:19 2017 +0900
+++ b/src/parallel_execution/context.h	Wed Feb 15 16:25:23 2017 +0900
@@ -102,7 +102,7 @@
         int i;
     } LoopCounter;
     struct TaskManager {
-        union Data* taskManager;
+        volatile union Data* taskManager;
         enum Code createTask; // create NEW  contexts for execution & argument
         enum Code spawn;      // start NEW context on the worker
         enum Code shutdown;
@@ -141,14 +141,14 @@
     } CPUWorker;
 #ifdef USE_CUDAWorker
     struct CUDAWorker {
+        CUdevice device;
+        CUcontext cuCtx;
         pthread_t thread;
         struct Context* context;
         int id;
         struct Queue* tasks;
         int runFlag;
         enum Code next;
-        CUdevice device;
-        CUcontext cuCtx;
         int num_stream;
         CUstream *stream;
     } CUDAWorker;
--- a/src/parallel_execution/main.cbc	Wed Feb 15 12:34:19 2017 +0900
+++ b/src/parallel_execution/main.cbc	Wed Feb 15 16:25:23 2017 +0900
@@ -27,15 +27,47 @@
     }
 }
 
+void *start_taskManager(struct Context *context) {
+    goto initDataGears(context, Gearef(context, LoopCounter), Gearef(context, TaskManager));
+    return 0;
+}
+
+#ifdef USE_CUDAWorker
+extern volatile int cuda_initialized;
+#endif
+
 __code initDataGears(struct LoopCounter* loopCounter, struct TaskManager* taskManager) {
     // loopCounter->tree = createRedBlackTree(context);
     loopCounter->i = 0;
     taskManager->taskManager = (union Data*)createTaskManagerImpl(context, cpu_num, gpu_num, 0);
+#ifdef USE_CUDAWorker
+    while(! cuda_initialized) {};
+#endif
+TaskManager *t = taskManager->taskManager;
+TaskManagerImpl *im = (TaskManagerImpl*)t->taskManager;
+printf("cuda initialized. TaskMangerImpl %p workers %p q %p\n", im, im->workers, im->workers[0]->tasks);
     goto meta(context, C_createTask1);
 }
 
 __code initDataGears_stub(struct Context* context) {
-    goto initDataGears(context, Gearef(context, LoopCounter), Gearef(context, TaskManager));
+    struct TaskManager* taskManager =  Gearef(context, TaskManager);
+    taskManager->taskManager = 0;
+#ifndef USE_CUDAWorker
+    struct LoopCounter* loopCounter = Gearef(context, LoopCounter);
+    goto initDataGears(context, loopCounter, taskManager);
+#else
+    cuda_initialized = 0;
+    pthread_t thread;
+    pthread_create(&thread, NULL, (void*)&start_taskManager, context);
+    while (taskManager->taskManager == 0);
+    TaskManager *t = (TaskManager*)taskManager->taskManager;
+    TaskManagerImpl *im = (TaskManagerImpl*)t->taskManager;
+    struct Queue *q = (Queue *)im->workers[0];
+printf("init TaskManagerImpl %p workers %p q %p \n",im, im->workers, q);
+    createCUDAWorker(context,0,q, im);
+    pthread_join(thread,0);
+    exit(0);
+#endif
 }
 
 __code code1(struct Time* time) {
@@ -78,6 +110,10 @@
 __code createTask1(struct LoopCounter* loopCounter, struct TaskManager* taskManager) {
     int i = loopCounter->i;
 
+TaskManager* t = (TaskManager*)taskManager->taskManager;
+TaskManagerImpl* im = (TaskManagerImpl*)t->taskManager;
+printf("createTask1 %p->workers %p\n", im, im->workers);
+
     if ((length/split*i) < length) {
         taskManager->next = C_createTask2;
         goto meta(context, taskManager->taskManager->TaskManager.createTask);
@@ -112,6 +148,9 @@
     task->maxOdg = task->odg;
     taskManager->next = C_createTask1;
     loopCounter->i++;
+TaskManager* t = (TaskManager*)taskManager->taskManager;
+TaskManagerImpl* im = (TaskManagerImpl*)t->taskManager;
+printf("im %p->workers %p\n", im, im->workers);
     goto meta(context, taskManager->taskManager->TaskManager.spawn);
 }