Mercurial > hg > GearsTemplate
annotate src/parallel_execution/taskManager.c @ 269:5170539348ec
rename TaskManagerImpl.cbc
author | mir3636 |
---|---|
date | Sun, 29 Jan 2017 22:15:32 +0900 |
parents | cd3486e4ba70 |
children |
rev | line source |
---|---|
#include "context.h"
#include "stack.h"
#include "queue.h"
#include "worker.h"
#include "origin_cs.h"
#include <pthread.h>
#include <stdio.h>
7 | |
257 | 8 void createWorkers(struct Context* context, TaskManager* taskManeger, TaskManagerImpl* taskManagerImpl); |
237 | 9 |
233 | 10 union Data* createTaskManager(struct Context* context, int numCPU, int numGPU, int numIO) { |
209 | 11 struct TaskManager* taskManager = &ALLOCATE(context, TaskManager)->TaskManager; |
233 | 12 // 0...numIO-1 IOProcessor |
13 // numIO...numIO+numGPU-1 GPUProcessor | |
14 // numIO+numGPU...numIO+numGPU+numCPU-1 CPUProcessor | |
15 taskManager->io = 0; | |
16 taskManager->gpu = numIO; | |
235 | 17 taskManager->cpu = numIO+numGPU; |
236 | 18 taskManager->maxCPU = numIO+numGPU+numCPU; |
234 | 19 taskManager->createTask = C_createTask; |
178 | 20 taskManager->spawn = C_spawnTaskManager; |
21 taskManager->shutdown = C_shutdownTaskManager; | |
242 | 22 struct TaskManagerImpl* taskManagerImpl = &ALLOCATE(context, TaskManagerImpl)->TaskManagerImpl; |
23 taskManager->taskManager = (union Data*)taskManagerImpl; | |
24 taskManagerImpl -> activeQueue = &createSingleLinkedQueue(context)->Queue; | |
25 taskManagerImpl -> taskQueue = &createSingleLinkedQueue(context)->Queue; | |
26 taskManagerImpl -> numWorker = taskManager->maxCPU; | |
247
ce262b2c1daf
Fix createTask for main
Tatsuki IHA <e125716@ie.u-ryukyu.ac.jp>
parents:
244
diff
changeset
|
27 createWorkers(context, taskManager, taskManagerImpl); |
178 | 28 return (union Data*)(taskManager); |
29 } | |
30 | |
257 | 31 void createWorkers(struct Context* context, TaskManager* taskManager, TaskManagerImpl* taskManagerImpl) { |
236 | 32 int i = 0; |
244 | 33 taskManagerImpl->workers = (Worker**)ALLOC_ARRAY(context, Worker, taskManager->maxCPU); |
240 | 34 for (;i<taskManager->gpu;i++) { |
236 | 35 Queue* queue = &createSynchronizedQueue(context)->Queue; |
242 | 36 taskManagerImpl->workers[i] = (Worker*)createCPUWorker(context, i, queue); |
236 | 37 } |
240 | 38 for (;i<taskManager->cpu;i++) { |
236 | 39 #ifdef USE_CUDA |
40 #else | |
41 Queue* queue = &createSynchronizedQueue(context)->Queue; | |
242 | 42 taskManagerImpl->workers[i] = (Worker*)createCPUWorker(context, i, queue); |
236 | 43 #endif |
44 } | |
240 | 45 for (;i<taskManager->maxCPU;i++) { |
236 | 46 Queue* queue = &createSynchronizedQueue(context)->Queue; |
242 | 47 taskManagerImpl->workers[i] = (Worker*)createCPUWorker(context, i, queue); |
236 | 48 } |
49 } | |
50 | |
242 | 51 __code createTask(struct Context* context, TaskManager* taskManager) { |
236 | 52 taskManager->context = NEW(struct Context); |
53 initContext(taskManager->context); | |
247
ce262b2c1daf
Fix createTask for main
Tatsuki IHA <e125716@ie.u-ryukyu.ac.jp>
parents:
244
diff
changeset
|
54 goto meta(context, C_setWorker); |
242 | 55 } |
56 | |
57 __code createTask_stub(struct Context* context) { | |
247
ce262b2c1daf
Fix createTask for main
Tatsuki IHA <e125716@ie.u-ryukyu.ac.jp>
parents:
244
diff
changeset
|
58 goto createTask(context, Gearef(context,TaskManager)); |
242 | 59 } |
60 | |
257 | 61 __code setWorker(struct Context* context, TaskManagerImpl* taskManager, struct Context* task, enum Code next) { |
244 | 62 task->workerId = taskManager->sendWorkerIndex; |
63 if(++taskManager->sendWorkerIndex >= taskManager->numWorker) { | |
64 taskManager->sendWorkerIndex = 0; | |
242 | 65 } |
236 | 66 goto meta(context, next); |
67 } | |
68 | |
247
ce262b2c1daf
Fix createTask for main
Tatsuki IHA <e125716@ie.u-ryukyu.ac.jp>
parents:
244
diff
changeset
|
69 __code setWorker_stub(struct Context* context) { |
242 | 70 TaskManagerImpl* taskManager = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager); |
247
ce262b2c1daf
Fix createTask for main
Tatsuki IHA <e125716@ie.u-ryukyu.ac.jp>
parents:
244
diff
changeset
|
71 goto setWorker(context, taskManager, Gearef(context, TaskManager)->context, Gearef(context, TaskManager)->next); |
236 | 72 } |
73 | |
74 __code spawnTaskManager(struct Context* context, struct TaskManagerImpl* taskManager, struct Queue* queue, struct Context* task, enum Code next) { | |
75 if (task->idgCount == 0) { | |
178 | 76 // enqueue activeQueue |
218
d8a59b727f65
Fix spawnTaskManager
Tatsuki IHA <e125716@ie.u-ryukyu.ac.jp>
parents:
209
diff
changeset
|
77 queue->queue = (union Data*)taskManager->activeQueue; |
178 | 78 } else { |
79 // enqueue waitQueue | |
218
d8a59b727f65
Fix spawnTaskManager
Tatsuki IHA <e125716@ie.u-ryukyu.ac.jp>
parents:
209
diff
changeset
|
80 queue->queue = (union Data*)taskManager->taskQueue; |
178 | 81 } |
218
d8a59b727f65
Fix spawnTaskManager
Tatsuki IHA <e125716@ie.u-ryukyu.ac.jp>
parents:
209
diff
changeset
|
82 queue->data = (union Data*)task; |
240 | 83 queue->next = C_spawnTaskManager1; |
209 | 84 goto meta(context, queue->queue->Queue.put); |
178 | 85 } |
86 | |
87 __code spawnTaskManager_stub(struct Context* context) { | |
239 | 88 TaskManagerImpl* taskManager = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager); |
89 pthread_mutex_lock(&taskManager->mutex); | |
209 | 90 goto spawnTaskManager(context, |
239 | 91 taskManager, |
236 | 92 Gearef(context, Queue), |
93 Gearef(context, TaskManager)->context, | |
240 | 94 Gearef(context, TaskManager)->next); |
95 } | |
96 | |
97 | |
247
ce262b2c1daf
Fix createTask for main
Tatsuki IHA <e125716@ie.u-ryukyu.ac.jp>
parents:
244
diff
changeset
|
98 __code spawnTaskManager1(struct Context* context, struct TaskManagerImpl* taskManager) { |
242 | 99 pthread_mutex_unlock(&taskManager->mutex); |
247
ce262b2c1daf
Fix createTask for main
Tatsuki IHA <e125716@ie.u-ryukyu.ac.jp>
parents:
244
diff
changeset
|
100 goto meta(context, C_taskSend); |
178 | 101 } |
102 | |
240 | 103 __code spawnTaskManager1_stub(struct Context* context) { |
242 | 104 TaskManagerImpl* taskManager = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager); |
244 | 105 goto spawnTaskManager1(context, |
247
ce262b2c1daf
Fix createTask for main
Tatsuki IHA <e125716@ie.u-ryukyu.ac.jp>
parents:
244
diff
changeset
|
106 taskManager); |
178 | 107 } |
108 | |
244 | 109 __code taskSend(struct Context* context, TaskManagerImpl* taskManager, Queue* queue) { |
242 | 110 queue->queue = (union Data*)taskManager->activeQueue; |
111 queue->next = C_taskSend1; | |
248
1ede5390cda2
Fix segmentation fault but not multi thread running
Tatsuki IHA <e125716@ie.u-ryukyu.ac.jp>
parents:
247
diff
changeset
|
112 goto meta(context, taskManager->activeQueue->take); |
239 | 113 } |
114 | |
115 __code taskSend_stub(struct Context* context) { | |
242 | 116 TaskManagerImpl* taskManager = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager); |
244 | 117 goto taskSend(context, taskManager, Gearef(context, Queue)); |
242 | 118 } |
119 | |
244 | 120 __code taskSend1(struct Context* context, TaskManagerImpl* taskManager, Queue* queue, struct Context* task, enum Code next) { |
242 | 121 struct Queue* tasks = taskManager->workers[task->workerId]->tasks; |
244 | 122 queue->queue = (union Data*)tasks; |
123 queue->data = (union Data*)task; | |
242 | 124 queue->next = next; |
244 | 125 pthread_cond_signal(&taskManager->workers[task->workerId]->worker->CPUWorker.cond); |
242 | 126 goto meta(context, tasks->put); |
127 } | |
128 | |
129 __code taskSend1_stub(struct Context* context) { | |
130 TaskManagerImpl* taskManager = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager); | |
244 | 131 goto taskSend1(context, taskManager, Gearef(context, Queue), Gearef(context, TaskManager)->context, Gearef(context, TaskManager)->next); |
222
77faa28128b4
Add taskSend for TaskManager
Tatsuki IHA <e125716@ie.u-ryukyu.ac.jp>
parents:
221
diff
changeset
|
132 } |
231 | 133 |
248
1ede5390cda2
Fix segmentation fault but not multi thread running
Tatsuki IHA <e125716@ie.u-ryukyu.ac.jp>
parents:
247
diff
changeset
|
134 __code shutdownTaskManager(struct Context* context, struct LoopCounter* loopCounter, TaskManager* taskManager, TaskManagerImpl* taskManagerImpl, Queue* queue, enum Code next) { |
230 | 135 int i = loopCounter->i; |
244 | 136 if (taskManager->cpu <= i && i < taskManager->maxCPU) { |
248
1ede5390cda2
Fix segmentation fault but not multi thread running
Tatsuki IHA <e125716@ie.u-ryukyu.ac.jp>
parents:
247
diff
changeset
|
137 struct Queue* tasks = taskManagerImpl->workers[i]->tasks; |
1ede5390cda2
Fix segmentation fault but not multi thread running
Tatsuki IHA <e125716@ie.u-ryukyu.ac.jp>
parents:
247
diff
changeset
|
138 queue->queue = (union Data*)tasks; |
1ede5390cda2
Fix segmentation fault but not multi thread running
Tatsuki IHA <e125716@ie.u-ryukyu.ac.jp>
parents:
247
diff
changeset
|
139 queue->data = NULL; |
1ede5390cda2
Fix segmentation fault but not multi thread running
Tatsuki IHA <e125716@ie.u-ryukyu.ac.jp>
parents:
247
diff
changeset
|
140 queue->next = next; |
1ede5390cda2
Fix segmentation fault but not multi thread running
Tatsuki IHA <e125716@ie.u-ryukyu.ac.jp>
parents:
247
diff
changeset
|
141 goto meta(context, tasks->put); |
244 | 142 pthread_join(taskManagerImpl->workers[i]->worker->CPUWorker.thread, NULL); |
178 | 143 loopCounter->i++; |
231 | 144 goto meta(context, C_shutdownTaskManager); |
178 | 145 } |
146 | |
147 loopCounter->i = 0; | |
223 | 148 goto meta(context, taskManager->next); |
178 | 149 } |
150 | |
182 | 151 __code shutdownTaskManager_stub(struct Context* context) { |
244 | 152 TaskManagerImpl* taskManagerImpl = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager); |
248
1ede5390cda2
Fix segmentation fault but not multi thread running
Tatsuki IHA <e125716@ie.u-ryukyu.ac.jp>
parents:
247
diff
changeset
|
153 goto shutdownTaskManager(context, Gearef(context, LoopCounter), Gearef(context, TaskManager), taskManagerImpl, Gearef(context, Queue), Gearef(context, TaskManager)->next); |
178 | 154 } |