Mercurial > hg > Members > Moririn
annotate src/parallel_execution/TaskManagerImpl.cbc @ 283:2b41bd298fe8
add openCL test files
author | mir3636 |
---|---|
date | Sun, 05 Feb 2017 18:30:30 +0900 |
parents | a3448b0f0a56 |
children | f1b0cc555b6e |
rev | line source |
---|---|
269 | 1 #include "../context.h" |
278 | 2 |
269 | 3 #include <stdio.h> |
4 | |
// Forward declaration: createWorkers() is defined further down in this file but is
// called from createTaskManagerImpl(). Parameter names match the definition.
void createWorkers(struct Context* context, TaskManager* taskManager, TaskManagerImpl* taskManagerImpl);
6 | |
// Build a TaskManager interface gear backed by a TaskManagerImpl and create its workers.
//
// Worker slots are laid out contiguously by processor kind:
//   0 ... numIO-1                          IOProcessor
//   numIO ... numIO+numGPU-1               GPUProcessor
//   numIO+numGPU ... numIO+numGPU+numCPU-1 CPUProcessor
//
// context : handed to the queue and worker constructors
// numCPU, numGPU, numIO : number of worker slots of each kind
// Returns the new TaskManager; its taskManager field points at the implementation.
TaskManager* createTaskManagerImpl(struct Context* context, int numCPU, int numGPU, int numIO) {
    struct TaskManager* taskManager = new TaskManager();
    // first slot index of each processor kind, plus the total slot count
    taskManager->io = 0;
    taskManager->gpu = numIO;
    taskManager->cpu = numIO+numGPU;
    taskManager->maxCPU = numIO+numGPU+numCPU;
    // interface entry points implemented by the code gears in this file
    taskManager->createTask = C_createTask;
    taskManager->spawn = C_spawnTaskManager;
    taskManager->shutdown = C_shutdownTaskManager;
    struct TaskManagerImpl* taskManagerImpl = new TaskManagerImpl();
    taskManager->taskManager = (union Data*)taskManagerImpl;
    taskManagerImpl -> activeQueue = createSingleLinkedQueue(context);
    taskManagerImpl -> taskQueue = createSingleLinkedQueue(context);
    taskManagerImpl -> numWorker = taskManager->maxCPU;
    // setWorker uses this as the next round-robin slot; start from slot 0 explicitly
    // instead of relying on how the allocation was filled
    taskManagerImpl -> sendWorkerIndex = 0;
    // the mutex is locked in spawnTaskManager_stub; POSIX requires it to be
    // initialised before first use
    pthread_mutex_init(&taskManagerImpl->mutex, NULL);
    createWorkers(context, taskManager, taskManagerImpl);
    return taskManager;
}
27 | |
// Allocate the worker table and fill it slot by slot.
//
// The table has taskManager->maxCPU entries. Slots are filled in three consecutive
// phases bounded by taskManager->gpu, taskManager->cpu and taskManager->maxCPU; every
// slot that is filled gets its own synchronized queue and a CPU worker.
//
// NOTE(review): when USE_CUDA is defined the middle phase assigns nothing, so the
// slots in [gpu, cpu) keep whatever the allocation contained, while numWorker still
// counts them. Confirm a GPU worker is installed elsewhere before relying on them.
void createWorkers(struct Context* context, TaskManager* taskManager, TaskManagerImpl* taskManagerImpl) {
    int slot = 0;
    taskManagerImpl->workers = (Worker**)ALLOC_ARRAY(context, Worker, taskManager->maxCPU);

    // phase 1: slots below taskManager->gpu
    while (slot < taskManager->gpu) {
        Queue* inbox = createSynchronizedQueue(context);
        taskManagerImpl->workers[slot] = (Worker*)createCPUWorker(context, slot, inbox);
        slot++;
    }

    // phase 2: slots below taskManager->cpu (only filled when USE_CUDA is not defined)
    while (slot < taskManager->cpu) {
#ifdef USE_CUDA
#else
        Queue* inbox = createSynchronizedQueue(context);
        taskManagerImpl->workers[slot] = (Worker*)createCPUWorker(context, slot, inbox);
#endif
        slot++;
    }

    // phase 3: remaining slots up to taskManager->maxCPU
    while (slot < taskManager->maxCPU) {
        Queue* inbox = createSynchronizedQueue(context);
        taskManagerImpl->workers[slot] = (Worker*)createCPUWorker(context, slot, inbox);
        slot++;
    }
}
47 | |
// Code gear: create a fresh Context for a new task, store it in taskManager->context,
// initialise it, then continue to setWorker.
__code createTask(struct TaskManager* taskManager) {
    struct Context* task = NEW(struct Context);
    taskManager->context = task;
    initContext(task);
    goto meta(context, C_setWorker);
}
53 | |
// Code gear: assign the task to a worker slot in round-robin order.
//
// The task receives the current sendWorkerIndex as its workerId; the index is then
// advanced and wraps back to 0 once it reaches numWorker.
__code setWorker(struct TaskManagerImpl* taskManager, struct Context* task, __code next(...)) {
    task->workerId = taskManager->sendWorkerIndex;
    taskManager->sendWorkerIndex++;
    if (taskManager->sendWorkerIndex >= taskManager->numWorker) {
        // ran past the last slot: start over
        taskManager->sendWorkerIndex = 0;
    }
    goto next(...);
}
61 | |
// Stub for setWorker: pulls the implementation, the task context and the continuation
// out of the TaskManager argument area of the context and jumps to setWorker.
__code setWorker_stub(struct Context* context) {
    struct TaskManager* args = Gearef(context, TaskManager);
    TaskManagerImpl* impl = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager);
    goto setWorker(context, impl, args->context, args->next);
}
66 | |
// Code gear: hand a task to one of the manager's two queues.
//
// A task whose idgCount is 0 goes to activeQueue; any other task goes to taskQueue.
// (idgCount is presumably the number of input data gears the task still waits for --
// confirm against the Context definition.)
// The Queue argument area is filled with the chosen queue, the task as payload and
// spawnTaskManager1 as continuation, then control passes to that queue's put.
__code spawnTaskManager(struct TaskManagerImpl* taskManager, struct Queue* queue, struct Context* task, __code next(...)) {
    union Data* destination;
    if (task->idgCount != 0) {
        // not runnable yet: park it on taskQueue
        destination = (union Data*)taskManager->taskQueue;
    } else {
        // runnable now: goes on activeQueue
        destination = (union Data*)taskManager->activeQueue;
    }
    queue->queue = destination;
    queue->data = (union Data*)task;
    queue->next = C_spawnTaskManager1;
    goto meta(context, destination->Queue.put);
}
79 | |
// Stub for spawnTaskManager.
//
// Takes the manager's mutex before jumping; the matching unlock is performed in
// spawnTaskManager1, so the lock is held across the intervening continuation.
__code spawnTaskManager_stub(struct Context* context) {
    TaskManagerImpl* impl = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager);
    pthread_mutex_lock(&impl->mutex);
    struct TaskManager* args = Gearef(context, TaskManager);
    goto spawnTaskManager(context, impl, Gearef(context, Queue), args->context, args->next);
}
89 | |
90 | |
// Code gear: second half of spawn. Releases the mutex that spawnTaskManager_stub
// acquired, then continues to taskSend.
__code spawnTaskManager1(struct TaskManagerImpl* taskManager) {
    pthread_mutex_t* lock = &taskManager->mutex;
    pthread_mutex_unlock(lock);
    goto meta(context, C_taskSend);
}
95 | |
// Stub for spawnTaskManager1: resolves the TaskManagerImpl from the context and jumps.
__code spawnTaskManager1_stub(struct Context* context) {
    TaskManagerImpl* impl = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager);
    goto spawnTaskManager1(context, impl);
}
101 | |
// Code gear: start dispatching from activeQueue.
//
// Points the Queue argument area at activeQueue, sets taskSend1 as the continuation
// and passes control to activeQueue's take.
__code taskSend(struct TaskManagerImpl* taskManager, struct Queue* queue) {
    struct Queue* active = taskManager->activeQueue;
    queue->queue = (union Data*)active;
    queue->next = C_taskSend1;
    goto meta(context, active->take);
}
107 | |
// Code gear: forward a task to the queue of the worker it was assigned to.
//
// The worker is looked up by task->workerId; the Queue argument area is filled with
// that worker's tasks queue, the task as payload and the caller's continuation, then
// control passes to the worker queue's put.
__code taskSend1(struct TaskManagerImpl* taskManager, struct Queue* queue, struct Context* task, __code next(...)) {
    struct Queue* inbox = taskManager->workers[task->workerId]->tasks;
    queue->next = next;
    queue->data = (union Data*)task;
    queue->queue = (union Data*)inbox;
    goto meta(context, inbox->put);
}
115 | |
// Stub for taskSend1: gathers the implementation, the Queue argument area, and the
// task context and continuation stored in the TaskManager argument area.
//
// NOTE(review): the task forwarded here is read from the TaskManager argument area,
// not from the Queue argument area that the preceding take presumably fills -- confirm
// this is the intended task.
__code taskSend1_stub(struct Context* context) {
    struct TaskManager* args = Gearef(context, TaskManager);
    TaskManagerImpl* impl = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager);
    goto taskSend1(context, impl, Gearef(context, Queue), args->context, args->next);
}
120 | |
// Code gear: one step of the worker shutdown loop.
//
// loopCounter->i is the slot being shut down. For each slot in [cpu, maxCPU) a NULL
// entry is put on that worker's tasks queue (presumably the worker's signal to stop --
// confirm against the worker implementation) and shutdownTaskManager1 then joins the
// thread and re-enters this gear with the next slot. When the range is exhausted the
// counter is reset to 0 and control continues to next.
//
// The counter is expected to be 0 on entry (it is reset to 0 on exit), so it is first
// advanced to the start of the CPU range. Without that step the range test fails at
// once whenever cpu > 0 and no worker is ever signalled or joined.
//
// NOTE(review): slots below taskManager->cpu are not shut down here.
__code shutdownTaskManager(struct LoopCounter* loopCounter, struct TaskManager* taskManager, struct TaskManagerImpl* taskManagerImpl, struct Queue* queue, __code next(...)) {
    int i = loopCounter->i;
    if (i < taskManager->cpu) {
        // skip ahead to the first CPU slot; shutdownTaskManager1 reads the same counter
        i = taskManager->cpu;
        loopCounter->i = i;
    }
    if (i < taskManager->maxCPU) {
        struct Queue* tasks = taskManagerImpl->workers[i]->tasks;
        queue->queue = (union Data*)tasks;
        queue->data = NULL;
        queue->next = C_shutdownTaskManager1;
        goto meta(context, tasks->put);
    }

    // every CPU slot handled: leave the shared counter clean for the next user
    loopCounter->i = 0;
    goto meta(context, next);
}
134 | |
// Stub for shutdownTaskManager: collects the loop counter, the TaskManager gear, its
// implementation, the Queue argument area and the continuation from the context.
__code shutdownTaskManager_stub(struct Context* context) {
    struct TaskManager* args = Gearef(context, TaskManager);
    TaskManagerImpl* taskManagerImpl = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager);
    // the argument area's taskManager field refers to the TaskManager gear itself
    struct TaskManager* manager = &args->taskManager->TaskManager;
    goto shutdownTaskManager(context, Gearef(context, LoopCounter), manager, taskManagerImpl, Gearef(context, Queue), args->next);
}
282
a3448b0f0a56
Add input data gear
Tatsuki IHA <e125716@ie.u-ryukyu.ac.jp>
parents:
280
diff
changeset
|
139 |
a3448b0f0a56
Add input data gear
Tatsuki IHA <e125716@ie.u-ryukyu.ac.jp>
parents:
280
diff
changeset
|
// Code gear: wait for the worker in slot loopCounter->i to finish, advance the counter
// and go back to shutdownTaskManager for the next slot.
//
// The thread handle is read through the CPUWorker member, so the slot is expected to
// hold a CPU worker.
__code shutdownTaskManager1(struct LoopCounter* loopCounter, TaskManagerImpl* taskManagerImpl) {
    int slot = loopCounter->i;
    Worker* stopping = taskManagerImpl->workers[slot];
    pthread_join(stopping->worker->CPUWorker.thread, NULL);
    loopCounter->i += 1;
    goto meta(context, C_shutdownTaskManager);
}
a3448b0f0a56
Add input data gear
Tatsuki IHA <e125716@ie.u-ryukyu.ac.jp>
parents:
280
diff
changeset
|
146 |
a3448b0f0a56
Add input data gear
Tatsuki IHA <e125716@ie.u-ryukyu.ac.jp>
parents:
280
diff
changeset
|
// Stub for shutdownTaskManager1: resolves the implementation and the loop counter
// from the context and jumps.
__code shutdownTaskManager1_stub(struct Context* context) {
    struct LoopCounter* counter = Gearef(context, LoopCounter);
    TaskManagerImpl* taskManagerImpl = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager);
    goto shutdownTaskManager1(context, counter, taskManagerImpl);
}