Mercurial > hg > GearsTemplate
annotate src/parallel_execution/TaskManagerImpl.cbc @ 312:7dd5a7d52a67
USE_CUDAWorker flag only for CUDAtwice
author | Shinji KONO <kono@ie.u-ryukyu.ac.jp> |
---|---|
date | Wed, 15 Feb 2017 11:04:30 +0900 |
parents | f1b0cc555b6e |
children | 54d203daf06b |
rev | line source |
---|---|
269 | 1 #include "../context.h" |
278 | 2 |
269 | 3 #include <stdio.h> |
4 | |
5 void createWorkers(struct Context* context, TaskManager* taskManeger, TaskManagerImpl* taskManagerImpl); | |
6 | |
280 | 7 TaskManager* createTaskManagerImpl(struct Context* context, int numCPU, int numGPU, int numIO) { |
269 | 8 struct TaskManager* taskManager = new TaskManager(); |
9 // 0...numIO-1 IOProcessor | |
10 // numIO...numIO+numGPU-1 GPUProcessor | |
11 // numIO+numGPU...numIO+numGPU+numCPU-1 CPUProcessor | |
12 taskManager->io = 0; | |
13 taskManager->gpu = numIO; | |
14 taskManager->cpu = numIO+numGPU; | |
15 taskManager->maxCPU = numIO+numGPU+numCPU; | |
16 taskManager->createTask = C_createTask; | |
17 taskManager->spawn = C_spawnTaskManager; | |
18 taskManager->shutdown = C_shutdownTaskManager; | |
19 struct TaskManagerImpl* taskManagerImpl = new TaskManagerImpl(); | |
20 taskManager->taskManager = (union Data*)taskManagerImpl; | |
280 | 21 taskManagerImpl -> activeQueue = createSingleLinkedQueue(context); |
22 taskManagerImpl -> taskQueue = createSingleLinkedQueue(context); | |
269 | 23 taskManagerImpl -> numWorker = taskManager->maxCPU; |
24 createWorkers(context, taskManager, taskManagerImpl); | |
25 return taskManager; | |
26 } | |
27 | |
28 void createWorkers(struct Context* context, TaskManager* taskManager, TaskManagerImpl* taskManagerImpl) { | |
29 int i = 0; | |
30 taskManagerImpl->workers = (Worker**)ALLOC_ARRAY(context, Worker, taskManager->maxCPU); | |
31 for (;i<taskManager->gpu;i++) { | |
280 | 32 Queue* queue = createSynchronizedQueue(context); |
269 | 33 taskManagerImpl->workers[i] = (Worker*)createCPUWorker(context, i, queue); |
34 } | |
35 for (;i<taskManager->cpu;i++) { | |
312
7dd5a7d52a67
USE_CUDAWorker flag only for CUDAtwice
Shinji KONO <kono@ie.u-ryukyu.ac.jp>
parents:
288
diff
changeset
|
36 #ifdef USE_CUDAWorker |
7dd5a7d52a67
USE_CUDAWorker flag only for CUDAtwice
Shinji KONO <kono@ie.u-ryukyu.ac.jp>
parents:
288
diff
changeset
|
37 Queue* queue = createSynchronizedQueue(context); |
7dd5a7d52a67
USE_CUDAWorker flag only for CUDAtwice
Shinji KONO <kono@ie.u-ryukyu.ac.jp>
parents:
288
diff
changeset
|
38 taskManagerImpl->workers[i] = (Worker*)createCUDAWorker(context, i, queue); |
269 | 39 #else |
280 | 40 Queue* queue = createSynchronizedQueue(context); |
269 | 41 taskManagerImpl->workers[i] = (Worker*)createCPUWorker(context, i, queue); |
42 #endif | |
43 } | |
44 for (;i<taskManager->maxCPU;i++) { | |
280 | 45 Queue* queue = createSynchronizedQueue(context); |
269 | 46 taskManagerImpl->workers[i] = (Worker*)createCPUWorker(context, i, queue); |
47 } | |
48 } | |
49 | |
50 __code createTask(struct TaskManager* taskManager) { | |
51 taskManager->context = NEW(struct Context); | |
52 initContext(taskManager->context); | |
288 | 53 taskManager->context->taskManager = taskManager; |
280 | 54 goto meta(context, C_setWorker); |
269 | 55 } |
56 | |
57 __code setWorker(struct TaskManagerImpl* taskManager, struct Context* task, __code next(...)) { | |
58 task->workerId = taskManager->sendWorkerIndex; | |
59 if(++taskManager->sendWorkerIndex >= taskManager->numWorker) { | |
60 taskManager->sendWorkerIndex = 0; | |
61 } | |
62 goto next(...); | |
63 } | |
64 | |
280 | 65 __code setWorker_stub(struct Context* context) { |
66 TaskManagerImpl* taskManager = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager); | |
67 goto setWorker(context, taskManager, Gearef(context, TaskManager)->context, Gearef(context, TaskManager)->next); | |
68 } | |
69 | |
269 | 70 __code spawnTaskManager(struct TaskManagerImpl* taskManager, struct Queue* queue, struct Context* task, __code next(...)) { |
71 if (task->idgCount == 0) { | |
72 // enqueue activeQueue | |
73 queue->queue = (union Data*)taskManager->activeQueue; | |
74 } else { | |
75 // enqueue waitQueue | |
76 queue->queue = (union Data*)taskManager->taskQueue; | |
77 } | |
78 queue->data = (union Data*)task; | |
79 queue->next = C_spawnTaskManager1; | |
80 goto meta(context, queue->queue->Queue.put); | |
81 } | |
82 | |
83 __code spawnTaskManager_stub(struct Context* context) { | |
84 TaskManagerImpl* taskManager = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager); | |
85 pthread_mutex_lock(&taskManager->mutex); | |
86 goto spawnTaskManager(context, | |
87 taskManager, | |
88 Gearef(context, Queue), | |
89 Gearef(context, TaskManager)->context, | |
90 Gearef(context, TaskManager)->next); | |
91 } | |
92 | |
93 | |
94 __code spawnTaskManager1(struct TaskManagerImpl* taskManager) { | |
95 pthread_mutex_unlock(&taskManager->mutex); | |
280 | 96 goto meta(context, C_taskSend); |
269 | 97 } |
98 | |
99 __code spawnTaskManager1_stub(struct Context* context) { | |
100 TaskManagerImpl* taskManager = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager); | |
101 goto spawnTaskManager1(context, | |
102 taskManager); | |
103 } | |
104 | |
105 __code taskSend(struct TaskManagerImpl* taskManager, struct Queue* queue) { | |
106 queue->queue = (union Data*)taskManager->activeQueue; | |
107 queue->next = C_taskSend1; | |
108 goto meta(context, taskManager->activeQueue->take); | |
109 } | |
110 | |
111 __code taskSend1(struct TaskManagerImpl* taskManager, struct Queue* queue, struct Context* task, __code next(...)) { | |
112 struct Queue* tasks = taskManager->workers[task->workerId]->tasks; | |
113 queue->queue = (union Data*)tasks; | |
114 queue->data = (union Data*)task; | |
115 queue->next = next; | |
116 goto meta(context, tasks->put); | |
117 } | |
118 | |
119 __code taskSend1_stub(struct Context* context) { | |
120 TaskManagerImpl* taskManager = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager); | |
121 goto taskSend1(context, taskManager, Gearef(context, Queue), Gearef(context, TaskManager)->context, Gearef(context, TaskManager)->next); | |
122 } | |
123 | |
124 __code shutdownTaskManager(struct LoopCounter* loopCounter, struct TaskManager* taskManager, struct TaskManagerImpl* taskManagerImpl, struct Queue* queue, __code next(...)) { | |
125 int i = loopCounter->i; | |
126 if (taskManager->cpu <= i && i < taskManager->maxCPU) { | |
127 struct Queue* tasks = taskManagerImpl->workers[i]->tasks; | |
128 queue->queue = (union Data*)tasks; | |
129 queue->data = NULL; | |
282
a3448b0f0a56
Add input data gear
Tatsuki IHA <e125716@ie.u-ryukyu.ac.jp>
parents:
280
diff
changeset
|
130 queue->next = C_shutdownTaskManager1; |
269 | 131 goto meta(context, tasks->put); |
132 } | |
133 | |
134 loopCounter->i = 0; | |
282
a3448b0f0a56
Add input data gear
Tatsuki IHA <e125716@ie.u-ryukyu.ac.jp>
parents:
280
diff
changeset
|
135 goto meta(context, next); |
269 | 136 } |
137 | |
138 __code shutdownTaskManager_stub(struct Context* context) { | |
139 TaskManagerImpl* taskManagerImpl = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager); | |
282
a3448b0f0a56
Add input data gear
Tatsuki IHA <e125716@ie.u-ryukyu.ac.jp>
parents:
280
diff
changeset
|
140 goto shutdownTaskManager(context, Gearef(context, LoopCounter), &Gearef(context, TaskManager)->taskManager->TaskManager, taskManagerImpl, Gearef(context, Queue), Gearef(context, TaskManager)->next); |
269 | 141 } |
282
a3448b0f0a56
Add input data gear
Tatsuki IHA <e125716@ie.u-ryukyu.ac.jp>
parents:
280
diff
changeset
|
142 |
a3448b0f0a56
Add input data gear
Tatsuki IHA <e125716@ie.u-ryukyu.ac.jp>
parents:
280
diff
changeset
|
143 __code shutdownTaskManager1(struct LoopCounter* loopCounter, TaskManagerImpl* taskManagerImpl) { |
a3448b0f0a56
Add input data gear
Tatsuki IHA <e125716@ie.u-ryukyu.ac.jp>
parents:
280
diff
changeset
|
144 int i = loopCounter->i; |
a3448b0f0a56
Add input data gear
Tatsuki IHA <e125716@ie.u-ryukyu.ac.jp>
parents:
280
diff
changeset
|
145 pthread_join(taskManagerImpl->workers[i]->worker->CPUWorker.thread, NULL); |
a3448b0f0a56
Add input data gear
Tatsuki IHA <e125716@ie.u-ryukyu.ac.jp>
parents:
280
diff
changeset
|
146 loopCounter->i++; |
a3448b0f0a56
Add input data gear
Tatsuki IHA <e125716@ie.u-ryukyu.ac.jp>
parents:
280
diff
changeset
|
147 goto meta(context, C_shutdownTaskManager); |
a3448b0f0a56
Add input data gear
Tatsuki IHA <e125716@ie.u-ryukyu.ac.jp>
parents:
280
diff
changeset
|
148 } |
a3448b0f0a56
Add input data gear
Tatsuki IHA <e125716@ie.u-ryukyu.ac.jp>
parents:
280
diff
changeset
|
149 |
a3448b0f0a56
Add input data gear
Tatsuki IHA <e125716@ie.u-ryukyu.ac.jp>
parents:
280
diff
changeset
|
150 __code shutdownTaskManager1_stub(struct Context* context) { |
a3448b0f0a56
Add input data gear
Tatsuki IHA <e125716@ie.u-ryukyu.ac.jp>
parents:
280
diff
changeset
|
151 TaskManagerImpl* taskManagerImpl = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager); |
a3448b0f0a56
Add input data gear
Tatsuki IHA <e125716@ie.u-ryukyu.ac.jp>
parents:
280
diff
changeset
|
152 goto shutdownTaskManager1(context, Gearef(context, LoopCounter), taskManagerImpl); |
a3448b0f0a56
Add input data gear
Tatsuki IHA <e125716@ie.u-ryukyu.ac.jp>
parents:
280
diff
changeset
|
153 } |