comparison src/CUDAWorker.cbc @ 590:9146d6017f18 default tip

hg mv parallel_execution/* ..
author anatofuz <anatofuz@cr.ie.u-ryukyu.ac.jp>
date Thu, 16 Jan 2020 15:12:06 +0900
parents src/parallel_execution/CUDAWorker.cbc@d6983ce1015d
children
comparison
equal deleted inserted replaced
589:a4cab67624f7 590:9146d6017f18
#include "../context.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#interface "TaskManager.h"
#interface "Worker.h"
#interface "Iterator.h"
#interface "Queue.h"
6
// CUDA-side lifecycle hooks; their definitions live outside this file.
// cudaInit is called on the worker thread (startCUDAWorker) before the
// CUDA executor is created; cudaShutdown is called by shutdownCUDAWorker.
extern void cudaInit(struct CUDAWorker* cudaWorker, int phase, int deviceNum);
extern void cudaShutdown(struct CUDAWorker* cudaWorker);

// Body of a CUDA worker thread; defined below createCUDAWorker.
static void startCUDAWorker(struct Worker* worker);
11
12 Worker* createCUDAWorker(struct Context* context, int id, Queue* queue, int deviceNum) {
13 struct Worker* worker = new Worker();
14 struct CUDAWorker* cudaWorker = new CUDAWorker();
15 worker->worker = (union Data*)cudaWorker;
16 worker->tasks = queue;
17 cudaWorker->id = id;
18 cudaWorker->loopCounter = 0;
19 cudaWorker->deviceNum = deviceNum;
20 worker->taskReceive = C_taskReceiveCUDAWorker;
21 worker->shutdown = C_shutdownCUDAWorker;
22 pthread_create(&worker->thread, NULL, (void*)&startCUDAWorker, worker);
23 return worker;
24 }
25
26 static void startCUDAWorker(Worker* worker) {
27 struct CUDAWorker* cudaWorker = &worker->worker->CUDAWorker;
28 cudaInit(cudaWorker, 0, cudaWorker->deviceNum);
29 cudaWorker->context = NEW(struct Context);
30 initContext(cudaWorker->context);
31 cudaWorker->executor = createCUDAExecutor(cudaWorker->context, cudaWorker->device);
32 Gearef(cudaWorker->context, Worker)->worker = (union Data*)worker;
33 Gearef(cudaWorker->context, Worker)->tasks = worker->tasks;
34 goto meta(cudaWorker->context, worker->taskReceive);
35 }
36
// Head of the worker loop: ask the worker's task queue for the next task.
// Queue->take continues at getTaskCUDAWorker, whose stub reads the dequeued
// element back out of the Queue data gear.
__code taskReceiveCUDAWorker(struct Worker* worker, struct Queue* tasks) {
    goto tasks->take(getTaskCUDAWorker);
}
40
// Dispatch one task taken from the queue.
// A NULL task is the stop signal: the worker's shutdown continuation runs.
// Otherwise the task is bound to this worker and its pending continuation
// (task->next) is saved. task->next is then redirected to odgCommitCUDAWorker,
// so the task's output data gears are committed after it executes. Finally
// control switches into the task's own context at the saved continuation.
__code getTaskCUDAWorker(struct CUDAWorker* cudaWorker, struct Context* task, struct Worker* worker) {
    if (!task) {
        goto worker->shutdown(); // end thread
    }
    // odgCommitCUDAWorker_stub / odgCommitCUDAWorker6 find the worker through this.
    task->worker = worker;
    enum Code taskCg = task->next;
    task->next = C_odgCommitCUDAWorker; // commit outputDG after task exec
    goto meta(task, taskCg); // switch task context
}
50
// Hand-written stub for getTaskCUDAWorker: unpacks its arguments from the
// worker's context.
//   cudaWorker - the CUDAWorker implementation behind the Worker data gear
//   worker     - the generic Worker stored in the Worker data gear
//   task       - the element in the Queue data gear's `data` field, presumably
//                left there by Queue->take (TODO confirm against the Queue impl)
__code getTaskCUDAWorker_stub(struct Context* context) {
    CUDAWorker* cudaWorker = (CUDAWorker*)GearImpl(context, Worker, worker);
    Worker* worker = &Gearef(context,Worker)->worker->Worker;
    struct Context* task = &Gearef(context, Queue)->data->Context;
    goto getTaskCUDAWorker(context, cudaWorker, task, worker);
}
57
// Post-execution hook for a task; runs in the worker's context
// (see odgCommitCUDAWorker_stub).
// Iterated tasks first go through the iterator barrier, which continues either
// at the commit loop (odgCommitCUDAWorker1) or straight to odgCommitCUDAWorker6.
// Presumably only the last iteration reaching the barrier commits; confirm
// against the Iterator implementation.
// Non-iterated tasks enter the commit loop directly.
__code odgCommitCUDAWorker(struct CUDAWorker* worker, struct Context* task) {
    if (task->iterate) {
        struct Iterator* iterator = task->iterator;
        goto iterator->barrier(task, odgCommitCUDAWorker1, odgCommitCUDAWorker6);
    } else {
        goto odgCommitCUDAWorker1();
    }
}
66
// Stub entered with the *task's* context: getTaskCUDAWorker set task->next to
// C_odgCommitCUDAWorker before switching into the task.
// It switches back to the owning worker's context, and records the task's
// Worker and the finished task in that context's Worker data gear. It then
// continues odgCommitCUDAWorker there.
__code odgCommitCUDAWorker_stub(struct Context* context) {
    // switch worker context
    struct Context* workerContext = context->worker->worker->CUDAWorker.context;
    Gearef(workerContext, Worker)->worker = (union Data*)context->worker;
    // Read back later by odgCommitCUDAWorker4_stub.
    Gearef(workerContext, Worker)->task = context;
    CUDAWorker* cudaWorker = (CUDAWorker*)GearImpl(workerContext, Worker, worker);
    goto odgCommitCUDAWorker(workerContext,
                             cudaWorker,
                             context);
}
77
// Loop head of the output-data-gear commit loop.
// Code segments keep no locals across a goto, so the loop index lives in
// worker->loopCounter. It walks task->data[task->odg] .. task->data[task->maxOdg - 1].
// While entries remain, process the current one (odgCommitCUDAWorker2).
// When done, reset the counter for the next task and decrement the
// TaskManager's task count, then continue at odgCommitCUDAWorker6.
__code odgCommitCUDAWorker1(struct CUDAWorker* worker, struct Context* task) {
    int i = worker->loopCounter;
    if (task->odg+i < task->maxOdg) {
        goto odgCommitCUDAWorker2();
    }
    worker->loopCounter = 0;
    struct TaskManager* taskManager = task->taskManager;
    goto taskManager->decrementTaskCount(odgCommitCUDAWorker6);
}
87
// Inspect the wait list (GET_WAIT_LIST) of the current output data gear.
// isEmpty continues at odgCommitCUDAWorker3 when tasks are still waiting, which
// takes one of them. It continues at odgCommitCUDAWorker5 when the list is
// empty, which advances to the next output data gear.
__code odgCommitCUDAWorker2(struct CUDAWorker* worker, struct Context* task) {
    int i = worker->loopCounter;
    struct Queue* queue = GET_WAIT_LIST(task->data[task->odg+i]);
    goto queue->isEmpty(odgCommitCUDAWorker3, odgCommitCUDAWorker5);
}
93
// Take one waiting task from the current output data gear's wait list and
// continue at odgCommitCUDAWorker4 with it.
__code odgCommitCUDAWorker3(struct CUDAWorker* worker, struct Context* task) {
    int i = worker->loopCounter;
    struct Queue* queue = GET_WAIT_LIST(task->data[task->odg+i]);
    goto queue->take(odgCommitCUDAWorker4);
}
99
// Notify a waiting task that one of its input data gears is now available.
// If this commit satisfied its last outstanding input, spawn it through its
// TaskManager. Either way, go back to odgCommitCUDAWorker2 to drain the rest
// of the same wait list.
__code odgCommitCUDAWorker4(struct CUDAWorker* worker, struct Context* task, struct Context* waitTask) {
    // Atomically decrement idgCount. __sync_fetch_and_sub returns the value
    // *before* the subtraction, so == 1 means the count has just reached 0.
    if (__sync_fetch_and_sub(&waitTask->idgCount, 1) == 1) {
        struct TaskManager* taskManager = waitTask->taskManager;
        goto taskManager->spawn(waitTask, odgCommitCUDAWorker2);
    }
    goto odgCommitCUDAWorker2();
}
107
// Hand-written stub for odgCommitCUDAWorker4.
//   cudaWorker - the CUDAWorker behind the Worker data gear
//   task       - the finished task, stored in the Worker data gear by
//                odgCommitCUDAWorker_stub
//   waitTask   - the element in the Queue data gear's `data` field, presumably
//                left there by Queue->take in odgCommitCUDAWorker3 (TODO confirm)
__code odgCommitCUDAWorker4_stub(struct Context* context) {
    CUDAWorker* cudaWorker = (CUDAWorker*)GearImpl(context, Worker, worker);
    struct Context* task = Gearef(context, Worker)->task;
    struct Context* waitTask = &Gearef(context, Queue)->data->Context;
    goto odgCommitCUDAWorker4(context,
                              cudaWorker,
                              task,
                              waitTask);
}
117
// The current output data gear's wait list is empty: advance the loop index
// and return to the loop head.
__code odgCommitCUDAWorker5(struct CUDAWorker* worker, struct Context* task) {
    worker->loopCounter++;
    goto odgCommitCUDAWorker1();
}
122
// Task fully handled: loop back to receive the next task from the task queue
// of the Worker the task was bound to (set in getTaskCUDAWorker).
__code odgCommitCUDAWorker6(struct CUDAWorker* worker, struct Context* task) {
    struct Worker* taskWorker = task->worker;
    goto taskWorker->taskReceive(taskWorker->tasks);
}
127
// Worker shutdown, reached from getTaskCUDAWorker when the queue yields NULL.
// It releases the CUDA side through cudaShutdown and then jumps to exit_code.
// NOTE(review): `context` is not a parameter of this code segment; it is
// presumably the implicit context supplied by the Gears stub generator.
__code shutdownCUDAWorker(struct CUDAWorker* worker) {
    cudaShutdown(worker);
    goto meta(context, C_exit_code);
}