Mercurial > hg > GearsTemplate
comparison src/parallel_execution/TaskManagerImpl.cbc @ 461:6b71cf5b1c22
Change Interface files from cbc to header
author | Tatsuki IHA <innparusu@cr.ie.u-ryukyu.ac.jp> |
---|---|
date | Wed, 20 Dec 2017 17:54:15 +0900 |
parents | 57c715bd6283 |
children | 8d7e5d48cad3 |
comparison
equal
deleted
inserted
replaced
459:57c715bd6283 | 461:6b71cf5b1c22 |
---|---|
1 #include "../context.h" | 1 #include "../context.h" |
2 #include "./TaskManager.h" | |
3 #include "./Iterator.h" | |
4 #include "./Queue.h" | |
5 #include "./Worker.h" | |
2 | 6 |
3 #include <stdio.h> | 7 #include <stdio.h> |
4 #include <unistd.h> | 8 #include <unistd.h> |
5 | 9 |
6 void createWorkers(struct Context* context, TaskManager* taskManeger, TaskManagerImpl* taskManagerImpl); | 10 void createWorkers(struct Context* context, TaskManagerImpl* taskManager); |
7 | 11 |
8 TaskManager* createTaskManagerImpl(struct Context* context, int numCPU, int numGPU, int numIO) { | 12 TaskManager* createTaskManagerImpl(struct Context* context, int numCPU, int numGPU, int numIO) { |
9 struct TaskManager* taskManager = new TaskManager(); | 13 struct TaskManager* taskManager = new TaskManager(); |
10 // 0...numIO-1 IOProcessor | |
11 // numIO...numIO+numGPU-1 GPUProcessor | |
12 // numIO+numGPU...numIO+numGPU+numCPU-1 CPUProcessor | |
13 taskManager->io = 0; | |
14 taskManager->gpu = numIO; | |
15 taskManager->cpu = numIO+numGPU; | |
16 taskManager->maxCPU = numIO+numGPU+numCPU; | |
17 taskManager->spawnTasks = C_spawnTasksTaskManagerImpl; | 14 taskManager->spawnTasks = C_spawnTasksTaskManagerImpl; |
18 taskManager->spawn = C_spawnTaskManagerImpl; | 15 taskManager->spawn = C_spawnTaskManagerImpl; |
19 taskManager->shutdown = C_shutdownTaskManagerImpl; | 16 taskManager->shutdown = C_shutdownTaskManagerImpl; |
20 taskManager->incrementTaskCount = C_incrementTaskCountTaskManagerImpl; | 17 taskManager->incrementTaskCount = C_incrementTaskCountTaskManagerImpl; |
21 taskManager->decrementTaskCount = C_decrementTaskCountTaskManagerImpl; | 18 taskManager->decrementTaskCount = C_decrementTaskCountTaskManagerImpl; |
22 taskManager->setWaitTask = C_setWaitTaskTaskManagerImpl; | 19 taskManager->setWaitTask = C_setWaitTaskTaskManagerImpl; |
23 struct TaskManagerImpl* taskManagerImpl = new TaskManagerImpl(); | 20 struct TaskManagerImpl* taskManagerImpl = new TaskManagerImpl(); |
21 // 0...numIO-1 IOProcessor | |
22 // numIO...numIO+numGPU-1 GPUProcessor | |
23 // numIO+numGPU...numIO+numGPU+numCPU-1 CPUProcessor | |
24 taskManagerImpl->io = 0; | |
25 taskManagerImpl->gpu = numIO; | |
26 taskManagerImpl->cpu = numIO+numGPU; | |
27 taskManagerImpl->maxCPU = numIO+numGPU+numCPU; | |
24 taskManagerImpl->taskQueue = createSingleLinkedQueue(context); | 28 taskManagerImpl->taskQueue = createSingleLinkedQueue(context); |
25 taskManagerImpl->numWorker = taskManager->maxCPU; | 29 taskManagerImpl->numWorker = taskManagerImpl->maxCPU; |
26 taskManagerImpl->sendGPUWorkerIndex = taskManager->gpu; | 30 taskManagerImpl->sendGPUWorkerIndex = taskManagerImpl->gpu; |
27 taskManagerImpl->sendCPUWorkerIndex = taskManager->cpu; | 31 taskManagerImpl->sendCPUWorkerIndex = taskManagerImpl->cpu; |
28 taskManagerImpl->taskCount = 0; | 32 taskManagerImpl->taskCount = 0; |
29 taskManagerImpl->loopCounter = new LoopCounter(); | 33 taskManagerImpl->loopCounter = new LoopCounter(); |
30 taskManagerImpl->loopCounter -> i = 0; | 34 taskManagerImpl->loopCounter -> i = 0; |
31 createWorkers(context, taskManager, taskManagerImpl); | 35 createWorkers(context, taskManagerImpl); |
32 taskManager->taskManager = (union Data*)taskManagerImpl; | 36 taskManager->taskManager = (union Data*)taskManagerImpl; |
33 return taskManager; | 37 return taskManager; |
34 } | 38 } |
35 | 39 |
36 void createWorkers(struct Context* context, TaskManager* taskManager, TaskManagerImpl* taskManagerImpl) { | 40 void createWorkers(struct Context* context, TaskManagerImpl* taskManager) { |
37 int i = 0; | 41 int i = 0; |
38 taskManagerImpl->workers = (Worker**)ALLOCATE_PTR_ARRAY(context, Worker, taskManager->maxCPU); | 42 taskManager->workers = (Worker**)ALLOCATE_PTR_ARRAY(context, Worker, taskManager->maxCPU); |
39 for (;i<taskManager->gpu;i++) { | 43 for (;i<taskManager->gpu;i++) { |
40 Queue* queue = createSynchronizedQueue(context); | 44 Queue* queue = createSynchronizedQueue(context); |
41 taskManagerImpl->workers[i] = (Worker*)createCPUWorker(context, i, queue); | 45 taskManager->workers[i] = (Worker*)createCPUWorker(context, i, queue); |
42 } | 46 } |
43 for (;i<taskManager->cpu;i++) { | 47 for (;i<taskManager->cpu;i++) { |
44 Queue* queue = createSynchronizedQueue(context); | 48 Queue* queue = createSynchronizedQueue(context); |
45 #ifdef USE_CUDAWorker | 49 #ifdef USE_CUDAWorker |
46 taskManagerImpl->workers[i] = (Worker*)createCUDAWorker(context, i, queue,0); | 50 taskManager->workers[i] = (Worker*)createCUDAWorker(context, i, queue,0); |
47 #else | 51 #else |
48 taskManagerImpl->workers[i] = (Worker*)createCPUWorker(context, i, queue); | 52 taskManager->workers[i] = (Worker*)createCPUWorker(context, i, queue); |
49 #endif | 53 #endif |
50 } | 54 } |
51 for (;i<taskManager->maxCPU;i++) { | 55 for (;i<taskManager->maxCPU;i++) { |
52 Queue* queue = createSynchronizedQueue(context); | 56 Queue* queue = createSynchronizedQueue(context); |
53 taskManagerImpl->workers[i] = (Worker*)createCPUWorker(context, i, queue); | 57 taskManager->workers[i] = (Worker*)createCPUWorker(context, i, queue); |
54 } | 58 } |
55 } | 59 } |
56 | 60 |
57 __code spawnTasksTaskManagerImpl(struct TaskManagerImpl* taskManager, struct Queue* tasks, __code next1(...)) { | 61 __code spawnTasksTaskManagerImpl(struct TaskManagerImpl* taskManager, struct Queue* tasks, __code next1(...)) { |
58 goto tasks->isEmpty(spawnTasksTaskManagerImpl1, spawnTasksTaskManagerImpl3); | 62 goto tasks->isEmpty(spawnTasksTaskManagerImpl1, spawnTasksTaskManagerImpl3); |
72 __code spawnTasksTaskManagerImpl1_stub(struct Context* context) { | 76 __code spawnTasksTaskManagerImpl1_stub(struct Context* context) { |
73 TaskManagerImpl* taskManager = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager); | 77 TaskManagerImpl* taskManager = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager); |
74 Queue* tasks = Gearef(context, TaskManager)->tasks; | 78 Queue* tasks = Gearef(context, TaskManager)->tasks; |
75 enum Code next1 = Gearef(context, TaskManager)->next1; | 79 enum Code next1 = Gearef(context, TaskManager)->next1; |
76 goto spawnTasksTaskManagerImpl1(context, taskManager, tasks, next1); | 80 goto spawnTasksTaskManagerImpl1(context, taskManager, tasks, next1); |
77 } | 81 } |
78 | 82 |
79 __code spawnTasksTaskManagerImpl2(struct TaskManagerImpl* taskManagerImpl, struct Context* task, struct TaskManager* taskManager) { | 83 __code spawnTasksTaskManagerImpl2(struct TaskManagerImpl* taskManagerImpl, struct Context* task, struct TaskManager* taskManager) { |
80 task->taskManager = &taskManager->taskManager->TaskManager; | 84 task->taskManager = taskManager; |
81 taskManager->task = task; | 85 goto taskManager->setWaitTask(task, spawnTasksTaskManagerImpl); |
82 taskManager->next = C_spawnTasksTaskManagerImpl; | |
83 goto meta(context, C_setWaitTaskTaskManagerImpl); | |
84 } | 86 } |
85 | 87 |
86 __code spawnTasksTaskManagerImpl2_stub(struct Context* context) { | 88 __code spawnTasksTaskManagerImpl2_stub(struct Context* context) { |
87 TaskManagerImpl* taskManagerImpl = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager); | 89 TaskManagerImpl* taskManagerImpl = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager); |
88 Context* task = (struct Context*)Gearef(context, Queue)->data; | 90 Context* task = (struct Context*)Gearef(context, Queue)->data; |
89 TaskManager* taskManager = Gearef(context, TaskManager); | 91 TaskManager* taskManager = &Gearef(context, TaskManager)->taskManager->TaskManager; |
90 goto spawnTasksTaskManagerImpl2(context, taskManagerImpl, task, taskManager); | 92 goto spawnTasksTaskManagerImpl2(context, taskManagerImpl, task, taskManager); |
91 } | 93 } |
92 | 94 |
93 __code spawnTasksTaskManagerImpl3(struct TaskManagerImpl* taskManager, __code next1(...)) { | 95 __code spawnTasksTaskManagerImpl3(struct TaskManagerImpl* taskManager, __code next1(...)) { |
94 struct Queue* queue = taskManager->taskQueue; | 96 struct Queue* queue = taskManager->taskQueue; |
109 __code spawnTasksTaskManagerImpl5_stub(struct Context* context) { | 111 __code spawnTasksTaskManagerImpl5_stub(struct Context* context) { |
110 TaskManagerImpl* taskManagerImpl = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager); | 112 TaskManagerImpl* taskManagerImpl = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager); |
111 Context* task = (struct Context*)Gearef(context, Queue)->data; | 113 Context* task = (struct Context*)Gearef(context, Queue)->data; |
112 TaskManager* taskManager = Gearef(context, TaskManager); | 114 TaskManager* taskManager = Gearef(context, TaskManager); |
113 goto spawnTasksTaskManagerImpl5(context, taskManagerImpl, task, taskManager); | 115 goto spawnTasksTaskManagerImpl5(context, taskManagerImpl, task, taskManager); |
114 } | 116 } |
115 | 117 |
116 __code setWaitTaskTaskManagerImpl(struct TaskManagerImpl* taskManager, struct Context* task, __code next(...)) { | 118 __code setWaitTaskTaskManagerImpl(struct TaskManagerImpl* taskManager, struct Context* task, __code next(...)) { |
117 int i = taskManager->loopCounter->i; | 119 int i = taskManager->loopCounter->i; |
118 if(task->idg+i < task->maxIdg) { | 120 if(task->idg+i < task->maxIdg) { |
119 struct Queue* queue = GET_WAIT_LIST(task->data[task->idg + i]); | 121 struct Queue* queue = GET_WAIT_LIST(task->data[task->idg + i]); |
127 | 129 |
128 __code setWaitTaskTaskManagerImpl_stub(struct Context* context) { | 130 __code setWaitTaskTaskManagerImpl_stub(struct Context* context) { |
129 TaskManagerImpl* taskManager = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager); | 131 TaskManagerImpl* taskManager = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager); |
130 struct Context* task = Gearef(context, TaskManager)->task; | 132 struct Context* task = Gearef(context, TaskManager)->task; |
131 goto setWaitTaskTaskManagerImpl(context, | 133 goto setWaitTaskTaskManagerImpl(context, |
132 taskManager, | 134 taskManager, |
133 task, | 135 task, |
134 Gearef(context, TaskManager)->next); | 136 Gearef(context, TaskManager)->next); |
135 } | 137 } |
136 | 138 |
137 __code incrementTaskCountTaskManagerImpl(struct TaskManagerImpl* taskManager, __code next(...)) { | 139 __code incrementTaskCountTaskManagerImpl(struct TaskManagerImpl* taskManager, __code next(...)) { |
138 __sync_fetch_and_add(&taskManager->taskCount, 1); | 140 __sync_fetch_and_add(&taskManager->taskCount, 1); |
139 goto next(...); | 141 goto next(...); |
142 __code decrementTaskCountTaskManagerImpl(struct TaskManagerImpl* taskManager, __code next(...)) { | 144 __code decrementTaskCountTaskManagerImpl(struct TaskManagerImpl* taskManager, __code next(...)) { |
143 __sync_fetch_and_sub(&taskManager->taskCount, 1); | 145 __sync_fetch_and_sub(&taskManager->taskCount, 1); |
144 goto next(...); | 146 goto next(...); |
145 } | 147 } |
146 | 148 |
147 __code spawnTaskManagerImpl(struct TaskManagerImpl* taskManagerImpl, struct Iterator* iterator, struct TaskManager* taskManager, struct Context* task, __code next(...)) { | 149 __code spawnTaskManagerImpl(struct TaskManagerImpl* taskManager, struct Context* task, __code next(...)) { |
148 if (task->idgCount == 0) { | 150 if (task->idgCount == 0) { |
151 // iterator task is normal task until spawned | |
149 if(task->iterator != NULL && task->iterate == 0) { | 152 if(task->iterator != NULL && task->iterate == 0) { |
150 iterator->iterator = (union Data*)task->iterator; | 153 pthread_mutex_unlock(&taskManager->mutex); |
151 iterator->task = task; | 154 struct Iterator* iterator = task->iterator; |
152 iterator->next = next; | 155 goto iterator->exec(task, taskManager->cpu - taskManager->gpu, next(...)); |
153 iterator->numGPU = taskManager->cpu - taskManager->gpu; | |
154 pthread_mutex_unlock(&taskManagerImpl->mutex); | |
155 goto meta(context, task->iterator->exec); | |
156 } | 156 } |
157 goto meta(context, C_taskSend); | 157 goto meta(context, C_taskSend); |
158 } | 158 } |
159 pthread_mutex_unlock(&taskManagerImpl->mutex); | 159 pthread_mutex_unlock(&taskManager->mutex); |
160 goto next(...); | 160 goto next(...); |
161 } | 161 } |
162 | 162 |
163 __code spawnTaskManagerImpl_stub(struct Context* context) { | 163 __code taskSend(struct TaskManagerImpl* taskManager, struct Context* task, __code next(...)) { |
164 TaskManagerImpl* taskManager = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager); | |
165 pthread_mutex_lock(&taskManager->mutex); | |
166 goto spawnTaskManagerImpl(context, | |
167 taskManager, | |
168 Gearef(context, Iterator), | |
169 &Gearef(context, TaskManager)->taskManager->TaskManager, | |
170 Gearef(context, TaskManager)->task, | |
171 Gearef(context, TaskManager)->next); | |
172 } | |
173 | |
174 __code taskSend(struct TaskManagerImpl* taskManagerImpl, struct Queue* queue, struct TaskManager* taskManager, struct Context* task, __code next(...)) { | |
175 // set workerId | 164 // set workerId |
176 if (task->gpu) { | 165 if (task->gpu) { |
177 task->workerId = taskManagerImpl->sendGPUWorkerIndex; | 166 task->workerId = taskManager->sendGPUWorkerIndex; |
178 if(++taskManagerImpl->sendGPUWorkerIndex >= taskManager->cpu) { | 167 if(++taskManager->sendGPUWorkerIndex >= taskManager->cpu) { |
179 taskManagerImpl->sendGPUWorkerIndex = taskManager->gpu; | 168 taskManager->sendGPUWorkerIndex = taskManager->gpu; |
180 } | 169 } |
181 } else { | 170 } else { |
182 task->workerId = taskManagerImpl->sendCPUWorkerIndex; | 171 task->workerId = taskManager->sendCPUWorkerIndex; |
183 if(++taskManagerImpl->sendCPUWorkerIndex >= taskManager->maxCPU) { | 172 if(++taskManager->sendCPUWorkerIndex >= taskManager->maxCPU) { |
184 taskManagerImpl->sendCPUWorkerIndex = taskManager->cpu; | 173 taskManager->sendCPUWorkerIndex = taskManager->cpu; |
185 } | 174 } |
186 } | 175 } |
187 struct Queue* tasks = taskManagerImpl->workers[task->workerId]->tasks; | 176 pthread_mutex_unlock(&taskManager->mutex); |
188 queue->queue = (union Data*)tasks; | 177 struct Queue* queue = taskManager->workers[task->workerId]->tasks; |
189 queue->data = (union Data*)task; | 178 goto queue->put(task, next(...)); |
190 queue->next = next; | |
191 pthread_mutex_unlock(&taskManagerImpl->mutex); | |
192 goto meta(context, tasks->put); | |
193 } | 179 } |
194 | 180 |
195 __code taskSend_stub(struct Context* context) { | 181 __code taskSend_stub(struct Context* context) { |
196 TaskManagerImpl* taskManager = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager); | 182 TaskManagerImpl* taskManager = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager); |
197 goto taskSend(context, | 183 goto taskSend(context, |
198 taskManager, | 184 taskManager, |
199 Gearef(context, Queue), | 185 Gearef(context, TaskManager)->task, |
200 &Gearef(context, TaskManager)->taskManager->TaskManager, | 186 Gearef(context, TaskManager)->next); |
201 Gearef(context, TaskManager)->task, | 187 } |
202 Gearef(context, TaskManager)->next); | 188 |
203 } | 189 __code shutdownTaskManagerImpl(struct TaskManagerImpl* taskManager, __code next(...)) { |
204 | |
205 __code shutdownTaskManagerImpl(struct TaskManagerImpl* taskManager, __code next(...), struct Queue* queue) { | |
206 if (taskManager->taskCount != 0) { | 190 if (taskManager->taskCount != 0) { |
207 usleep(1000); | 191 usleep(1000); |
208 goto meta(context, C_shutdownTaskManagerImpl); | 192 goto meta(context, C_shutdownTaskManagerImpl); |
209 } | 193 } |
210 int i = taskManager->loopCounter->i; | 194 int i = taskManager->loopCounter->i; |
211 if (i < taskManager->numWorker) { | 195 if (i < taskManager->numWorker) { |
212 struct Queue* tasks = taskManager->workers[i]->tasks; | 196 struct Queue* tasks = taskManager->workers[i]->tasks; |
213 queue->queue = (union Data*)tasks; | 197 goto tasks->put(NULL, shutdownTaskManagerImpl1); |
214 queue->data = NULL; | |
215 queue->next = C_shutdownTaskManagerImpl1; | |
216 goto meta(context, tasks->put); | |
217 } | 198 } |
218 | 199 |
219 taskManager->loopCounter->i = 0; | 200 taskManager->loopCounter->i = 0; |
220 goto meta(context, next); | 201 goto meta(context, next); |
221 } | 202 } |
222 | 203 |
223 __code shutdownTaskManagerImpl_stub(struct Context* context) { | 204 __code shutdownTaskManagerImpl_stub(struct Context* context) { |
224 TaskManagerImpl* taskManagerImpl = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager); | 205 TaskManagerImpl* taskManagerImpl = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager); |
225 goto shutdownTaskManagerImpl(context, | 206 goto shutdownTaskManagerImpl(context, |
226 taskManagerImpl, | 207 taskManagerImpl, |
227 Gearef(context, TaskManager)->next, | 208 Gearef(context, TaskManager)->next); |
228 Gearef(context, Queue)); | |
229 } | 209 } |
230 | 210 |
231 __code shutdownTaskManagerImpl1(TaskManagerImpl* taskManager) { | 211 __code shutdownTaskManagerImpl1(TaskManagerImpl* taskManager) { |
232 int i = taskManager->loopCounter->i; | 212 int i = taskManager->loopCounter->i; |
233 pthread_join(taskManager->workers[i]->thread, NULL); | 213 pthread_join(taskManager->workers[i]->thread, NULL); |