comparison src/parallel_execution/TaskManagerImpl.cbc @ 461:6b71cf5b1c22

Change Interface files from cbc to header
author Tatsuki IHA <innparusu@cr.ie.u-ryukyu.ac.jp>
date Wed, 20 Dec 2017 17:54:15 +0900
parents 57c715bd6283
children 8d7e5d48cad3
comparison
equal deleted inserted replaced
459:57c715bd6283 461:6b71cf5b1c22
1 #include "../context.h" 1 #include "../context.h"
2 #include "./TaskManager.h"
3 #include "./Iterator.h"
4 #include "./Queue.h"
5 #include "./Worker.h"
2 6
3 #include <stdio.h> 7 #include <stdio.h>
4 #include <unistd.h> 8 #include <unistd.h>
5 9
6 void createWorkers(struct Context* context, TaskManager* taskManeger, TaskManagerImpl* taskManagerImpl); 10 void createWorkers(struct Context* context, TaskManagerImpl* taskManager);
7 11
8 TaskManager* createTaskManagerImpl(struct Context* context, int numCPU, int numGPU, int numIO) { 12 TaskManager* createTaskManagerImpl(struct Context* context, int numCPU, int numGPU, int numIO) {
9 struct TaskManager* taskManager = new TaskManager(); 13 struct TaskManager* taskManager = new TaskManager();
10 // 0...numIO-1 IOProcessor
11 // numIO...numIO+numGPU-1 GPUProcessor
12 // numIO+numGPU...numIO+numGPU+numCPU-1 CPUProcessor
13 taskManager->io = 0;
14 taskManager->gpu = numIO;
15 taskManager->cpu = numIO+numGPU;
16 taskManager->maxCPU = numIO+numGPU+numCPU;
17 taskManager->spawnTasks = C_spawnTasksTaskManagerImpl; 14 taskManager->spawnTasks = C_spawnTasksTaskManagerImpl;
18 taskManager->spawn = C_spawnTaskManagerImpl; 15 taskManager->spawn = C_spawnTaskManagerImpl;
19 taskManager->shutdown = C_shutdownTaskManagerImpl; 16 taskManager->shutdown = C_shutdownTaskManagerImpl;
20 taskManager->incrementTaskCount = C_incrementTaskCountTaskManagerImpl; 17 taskManager->incrementTaskCount = C_incrementTaskCountTaskManagerImpl;
21 taskManager->decrementTaskCount = C_decrementTaskCountTaskManagerImpl; 18 taskManager->decrementTaskCount = C_decrementTaskCountTaskManagerImpl;
22 taskManager->setWaitTask = C_setWaitTaskTaskManagerImpl; 19 taskManager->setWaitTask = C_setWaitTaskTaskManagerImpl;
23 struct TaskManagerImpl* taskManagerImpl = new TaskManagerImpl(); 20 struct TaskManagerImpl* taskManagerImpl = new TaskManagerImpl();
21 // 0...numIO-1 IOProcessor
22 // numIO...numIO+numGPU-1 GPUProcessor
23 // numIO+numGPU...numIO+numGPU+numCPU-1 CPUProcessor
24 taskManagerImpl->io = 0;
25 taskManagerImpl->gpu = numIO;
26 taskManagerImpl->cpu = numIO+numGPU;
27 taskManagerImpl->maxCPU = numIO+numGPU+numCPU;
24 taskManagerImpl->taskQueue = createSingleLinkedQueue(context); 28 taskManagerImpl->taskQueue = createSingleLinkedQueue(context);
25 taskManagerImpl->numWorker = taskManager->maxCPU; 29 taskManagerImpl->numWorker = taskManagerImpl->maxCPU;
26 taskManagerImpl->sendGPUWorkerIndex = taskManager->gpu; 30 taskManagerImpl->sendGPUWorkerIndex = taskManagerImpl->gpu;
27 taskManagerImpl->sendCPUWorkerIndex = taskManager->cpu; 31 taskManagerImpl->sendCPUWorkerIndex = taskManagerImpl->cpu;
28 taskManagerImpl->taskCount = 0; 32 taskManagerImpl->taskCount = 0;
29 taskManagerImpl->loopCounter = new LoopCounter(); 33 taskManagerImpl->loopCounter = new LoopCounter();
30 taskManagerImpl->loopCounter -> i = 0; 34 taskManagerImpl->loopCounter -> i = 0;
31 createWorkers(context, taskManager, taskManagerImpl); 35 createWorkers(context, taskManagerImpl);
32 taskManager->taskManager = (union Data*)taskManagerImpl; 36 taskManager->taskManager = (union Data*)taskManagerImpl;
33 return taskManager; 37 return taskManager;
34 } 38 }
35 39
36 void createWorkers(struct Context* context, TaskManager* taskManager, TaskManagerImpl* taskManagerImpl) { 40 void createWorkers(struct Context* context, TaskManagerImpl* taskManager) {
37 int i = 0; 41 int i = 0;
38 taskManagerImpl->workers = (Worker**)ALLOCATE_PTR_ARRAY(context, Worker, taskManager->maxCPU); 42 taskManager->workers = (Worker**)ALLOCATE_PTR_ARRAY(context, Worker, taskManager->maxCPU);
39 for (;i<taskManager->gpu;i++) { 43 for (;i<taskManager->gpu;i++) {
40 Queue* queue = createSynchronizedQueue(context); 44 Queue* queue = createSynchronizedQueue(context);
41 taskManagerImpl->workers[i] = (Worker*)createCPUWorker(context, i, queue); 45 taskManager->workers[i] = (Worker*)createCPUWorker(context, i, queue);
42 } 46 }
43 for (;i<taskManager->cpu;i++) { 47 for (;i<taskManager->cpu;i++) {
44 Queue* queue = createSynchronizedQueue(context); 48 Queue* queue = createSynchronizedQueue(context);
45 #ifdef USE_CUDAWorker 49 #ifdef USE_CUDAWorker
46 taskManagerImpl->workers[i] = (Worker*)createCUDAWorker(context, i, queue,0); 50 taskManager->workers[i] = (Worker*)createCUDAWorker(context, i, queue,0);
47 #else 51 #else
48 taskManagerImpl->workers[i] = (Worker*)createCPUWorker(context, i, queue); 52 taskManager->workers[i] = (Worker*)createCPUWorker(context, i, queue);
49 #endif 53 #endif
50 } 54 }
51 for (;i<taskManager->maxCPU;i++) { 55 for (;i<taskManager->maxCPU;i++) {
52 Queue* queue = createSynchronizedQueue(context); 56 Queue* queue = createSynchronizedQueue(context);
53 taskManagerImpl->workers[i] = (Worker*)createCPUWorker(context, i, queue); 57 taskManager->workers[i] = (Worker*)createCPUWorker(context, i, queue);
54 } 58 }
55 } 59 }
56 60
57 __code spawnTasksTaskManagerImpl(struct TaskManagerImpl* taskManager, struct Queue* tasks, __code next1(...)) { 61 __code spawnTasksTaskManagerImpl(struct TaskManagerImpl* taskManager, struct Queue* tasks, __code next1(...)) {
58 goto tasks->isEmpty(spawnTasksTaskManagerImpl1, spawnTasksTaskManagerImpl3); 62 goto tasks->isEmpty(spawnTasksTaskManagerImpl1, spawnTasksTaskManagerImpl3);
72 __code spawnTasksTaskManagerImpl1_stub(struct Context* context) { 76 __code spawnTasksTaskManagerImpl1_stub(struct Context* context) {
73 TaskManagerImpl* taskManager = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager); 77 TaskManagerImpl* taskManager = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager);
74 Queue* tasks = Gearef(context, TaskManager)->tasks; 78 Queue* tasks = Gearef(context, TaskManager)->tasks;
75 enum Code next1 = Gearef(context, TaskManager)->next1; 79 enum Code next1 = Gearef(context, TaskManager)->next1;
76 goto spawnTasksTaskManagerImpl1(context, taskManager, tasks, next1); 80 goto spawnTasksTaskManagerImpl1(context, taskManager, tasks, next1);
77 } 81 }
78 82
79 __code spawnTasksTaskManagerImpl2(struct TaskManagerImpl* taskManagerImpl, struct Context* task, struct TaskManager* taskManager) { 83 __code spawnTasksTaskManagerImpl2(struct TaskManagerImpl* taskManagerImpl, struct Context* task, struct TaskManager* taskManager) {
80 task->taskManager = &taskManager->taskManager->TaskManager; 84 task->taskManager = taskManager;
81 taskManager->task = task; 85 goto taskManager->setWaitTask(task, spawnTasksTaskManagerImpl);
82 taskManager->next = C_spawnTasksTaskManagerImpl;
83 goto meta(context, C_setWaitTaskTaskManagerImpl);
84 } 86 }
85 87
86 __code spawnTasksTaskManagerImpl2_stub(struct Context* context) { 88 __code spawnTasksTaskManagerImpl2_stub(struct Context* context) {
87 TaskManagerImpl* taskManagerImpl = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager); 89 TaskManagerImpl* taskManagerImpl = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager);
88 Context* task = (struct Context*)Gearef(context, Queue)->data; 90 Context* task = (struct Context*)Gearef(context, Queue)->data;
89 TaskManager* taskManager = Gearef(context, TaskManager); 91 TaskManager* taskManager = &Gearef(context, TaskManager)->taskManager->TaskManager;
90 goto spawnTasksTaskManagerImpl2(context, taskManagerImpl, task, taskManager); 92 goto spawnTasksTaskManagerImpl2(context, taskManagerImpl, task, taskManager);
91 } 93 }
92 94
93 __code spawnTasksTaskManagerImpl3(struct TaskManagerImpl* taskManager, __code next1(...)) { 95 __code spawnTasksTaskManagerImpl3(struct TaskManagerImpl* taskManager, __code next1(...)) {
94 struct Queue* queue = taskManager->taskQueue; 96 struct Queue* queue = taskManager->taskQueue;
109 __code spawnTasksTaskManagerImpl5_stub(struct Context* context) { 111 __code spawnTasksTaskManagerImpl5_stub(struct Context* context) {
110 TaskManagerImpl* taskManagerImpl = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager); 112 TaskManagerImpl* taskManagerImpl = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager);
111 Context* task = (struct Context*)Gearef(context, Queue)->data; 113 Context* task = (struct Context*)Gearef(context, Queue)->data;
112 TaskManager* taskManager = Gearef(context, TaskManager); 114 TaskManager* taskManager = Gearef(context, TaskManager);
113 goto spawnTasksTaskManagerImpl5(context, taskManagerImpl, task, taskManager); 115 goto spawnTasksTaskManagerImpl5(context, taskManagerImpl, task, taskManager);
114 } 116 }
115 117
116 __code setWaitTaskTaskManagerImpl(struct TaskManagerImpl* taskManager, struct Context* task, __code next(...)) { 118 __code setWaitTaskTaskManagerImpl(struct TaskManagerImpl* taskManager, struct Context* task, __code next(...)) {
117 int i = taskManager->loopCounter->i; 119 int i = taskManager->loopCounter->i;
118 if(task->idg+i < task->maxIdg) { 120 if(task->idg+i < task->maxIdg) {
119 struct Queue* queue = GET_WAIT_LIST(task->data[task->idg + i]); 121 struct Queue* queue = GET_WAIT_LIST(task->data[task->idg + i]);
127 129
128 __code setWaitTaskTaskManagerImpl_stub(struct Context* context) { 130 __code setWaitTaskTaskManagerImpl_stub(struct Context* context) {
129 TaskManagerImpl* taskManager = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager); 131 TaskManagerImpl* taskManager = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager);
130 struct Context* task = Gearef(context, TaskManager)->task; 132 struct Context* task = Gearef(context, TaskManager)->task;
131 goto setWaitTaskTaskManagerImpl(context, 133 goto setWaitTaskTaskManagerImpl(context,
132 taskManager, 134 taskManager,
133 task, 135 task,
134 Gearef(context, TaskManager)->next); 136 Gearef(context, TaskManager)->next);
135 } 137 }
136 138
137 __code incrementTaskCountTaskManagerImpl(struct TaskManagerImpl* taskManager, __code next(...)) { 139 __code incrementTaskCountTaskManagerImpl(struct TaskManagerImpl* taskManager, __code next(...)) {
138 __sync_fetch_and_add(&taskManager->taskCount, 1); 140 __sync_fetch_and_add(&taskManager->taskCount, 1);
139 goto next(...); 141 goto next(...);
142 __code decrementTaskCountTaskManagerImpl(struct TaskManagerImpl* taskManager, __code next(...)) { 144 __code decrementTaskCountTaskManagerImpl(struct TaskManagerImpl* taskManager, __code next(...)) {
143 __sync_fetch_and_sub(&taskManager->taskCount, 1); 145 __sync_fetch_and_sub(&taskManager->taskCount, 1);
144 goto next(...); 146 goto next(...);
145 } 147 }
146 148
147 __code spawnTaskManagerImpl(struct TaskManagerImpl* taskManagerImpl, struct Iterator* iterator, struct TaskManager* taskManager, struct Context* task, __code next(...)) { 149 __code spawnTaskManagerImpl(struct TaskManagerImpl* taskManager, struct Context* task, __code next(...)) {
148 if (task->idgCount == 0) { 150 if (task->idgCount == 0) {
151 // iterator task is normal task until spawned
149 if(task->iterator != NULL && task->iterate == 0) { 152 if(task->iterator != NULL && task->iterate == 0) {
150 iterator->iterator = (union Data*)task->iterator; 153 pthread_mutex_unlock(&taskManager->mutex);
151 iterator->task = task; 154 struct Iterator* iterator = task->iterator;
152 iterator->next = next; 155 goto iterator->exec(task, taskManager->cpu - taskManager->gpu, next(...));
153 iterator->numGPU = taskManager->cpu - taskManager->gpu;
154 pthread_mutex_unlock(&taskManagerImpl->mutex);
155 goto meta(context, task->iterator->exec);
156 } 156 }
157 goto meta(context, C_taskSend); 157 goto meta(context, C_taskSend);
158 } 158 }
159 pthread_mutex_unlock(&taskManagerImpl->mutex); 159 pthread_mutex_unlock(&taskManager->mutex);
160 goto next(...); 160 goto next(...);
161 } 161 }
162 162
163 __code spawnTaskManagerImpl_stub(struct Context* context) { 163 __code taskSend(struct TaskManagerImpl* taskManager, struct Context* task, __code next(...)) {
164 TaskManagerImpl* taskManager = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager);
165 pthread_mutex_lock(&taskManager->mutex);
166 goto spawnTaskManagerImpl(context,
167 taskManager,
168 Gearef(context, Iterator),
169 &Gearef(context, TaskManager)->taskManager->TaskManager,
170 Gearef(context, TaskManager)->task,
171 Gearef(context, TaskManager)->next);
172 }
173
174 __code taskSend(struct TaskManagerImpl* taskManagerImpl, struct Queue* queue, struct TaskManager* taskManager, struct Context* task, __code next(...)) {
175 // set workerId 164 // set workerId
176 if (task->gpu) { 165 if (task->gpu) {
177 task->workerId = taskManagerImpl->sendGPUWorkerIndex; 166 task->workerId = taskManager->sendGPUWorkerIndex;
178 if(++taskManagerImpl->sendGPUWorkerIndex >= taskManager->cpu) { 167 if(++taskManager->sendGPUWorkerIndex >= taskManager->cpu) {
179 taskManagerImpl->sendGPUWorkerIndex = taskManager->gpu; 168 taskManager->sendGPUWorkerIndex = taskManager->gpu;
180 } 169 }
181 } else { 170 } else {
182 task->workerId = taskManagerImpl->sendCPUWorkerIndex; 171 task->workerId = taskManager->sendCPUWorkerIndex;
183 if(++taskManagerImpl->sendCPUWorkerIndex >= taskManager->maxCPU) { 172 if(++taskManager->sendCPUWorkerIndex >= taskManager->maxCPU) {
184 taskManagerImpl->sendCPUWorkerIndex = taskManager->cpu; 173 taskManager->sendCPUWorkerIndex = taskManager->cpu;
185 } 174 }
186 } 175 }
187 struct Queue* tasks = taskManagerImpl->workers[task->workerId]->tasks; 176 pthread_mutex_unlock(&taskManager->mutex);
188 queue->queue = (union Data*)tasks; 177 struct Queue* queue = taskManager->workers[task->workerId]->tasks;
189 queue->data = (union Data*)task; 178 goto queue->put(task, next(...));
190 queue->next = next;
191 pthread_mutex_unlock(&taskManagerImpl->mutex);
192 goto meta(context, tasks->put);
193 } 179 }
194 180
195 __code taskSend_stub(struct Context* context) { 181 __code taskSend_stub(struct Context* context) {
196 TaskManagerImpl* taskManager = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager); 182 TaskManagerImpl* taskManager = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager);
197 goto taskSend(context, 183 goto taskSend(context,
198 taskManager, 184 taskManager,
199 Gearef(context, Queue), 185 Gearef(context, TaskManager)->task,
200 &Gearef(context, TaskManager)->taskManager->TaskManager, 186 Gearef(context, TaskManager)->next);
201 Gearef(context, TaskManager)->task, 187 }
202 Gearef(context, TaskManager)->next); 188
203 } 189 __code shutdownTaskManagerImpl(struct TaskManagerImpl* taskManager, __code next(...)) {
204
205 __code shutdownTaskManagerImpl(struct TaskManagerImpl* taskManager, __code next(...), struct Queue* queue) {
206 if (taskManager->taskCount != 0) { 190 if (taskManager->taskCount != 0) {
207 usleep(1000); 191 usleep(1000);
208 goto meta(context, C_shutdownTaskManagerImpl); 192 goto meta(context, C_shutdownTaskManagerImpl);
209 } 193 }
210 int i = taskManager->loopCounter->i; 194 int i = taskManager->loopCounter->i;
211 if (i < taskManager->numWorker) { 195 if (i < taskManager->numWorker) {
212 struct Queue* tasks = taskManager->workers[i]->tasks; 196 struct Queue* tasks = taskManager->workers[i]->tasks;
213 queue->queue = (union Data*)tasks; 197 goto tasks->put(NULL, shutdownTaskManagerImpl1);
214 queue->data = NULL;
215 queue->next = C_shutdownTaskManagerImpl1;
216 goto meta(context, tasks->put);
217 } 198 }
218 199
219 taskManager->loopCounter->i = 0; 200 taskManager->loopCounter->i = 0;
220 goto meta(context, next); 201 goto meta(context, next);
221 } 202 }
222 203
223 __code shutdownTaskManagerImpl_stub(struct Context* context) { 204 __code shutdownTaskManagerImpl_stub(struct Context* context) {
224 TaskManagerImpl* taskManagerImpl = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager); 205 TaskManagerImpl* taskManagerImpl = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager);
225 goto shutdownTaskManagerImpl(context, 206 goto shutdownTaskManagerImpl(context,
226 taskManagerImpl, 207 taskManagerImpl,
227 Gearef(context, TaskManager)->next, 208 Gearef(context, TaskManager)->next);
228 Gearef(context, Queue));
229 } 209 }
230 210
231 __code shutdownTaskManagerImpl1(TaskManagerImpl* taskManager) { 211 __code shutdownTaskManagerImpl1(TaskManagerImpl* taskManager) {
232 int i = taskManager->loopCounter->i; 212 int i = taskManager->loopCounter->i;
233 pthread_join(taskManager->workers[i]->thread, NULL); 213 pthread_join(taskManager->workers[i]->thread, NULL);