annotate src/parallel_execution/taskManager.c @ 239:cc13a1608364

fix
author mir3636
date Tue, 24 Jan 2017 15:10:23 +0900
parents 6f6cc49213c5
children 9135e22799dd
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
rev   line source
178
5077cf9bf54e add TaskManager.c
mir3636
parents:
diff changeset
1 #include "context.h"
5077cf9bf54e add TaskManager.c
mir3636
parents:
diff changeset
2 #include "stack.h"
182
57a11c15ff4c Add queue_test
Tatsuki IHA <e125716@ie.u-ryukyu.ac.jp>
parents: 178
diff changeset
3 #include "queue.h"
237
mir3636
parents: 236
diff changeset
4 #include "worker.h"
178
5077cf9bf54e add TaskManager.c
mir3636
parents:
diff changeset
5 #include "origin_cs.h"
5077cf9bf54e add TaskManager.c
mir3636
parents:
diff changeset
6 #include <stdio.h>
5077cf9bf54e add TaskManager.c
mir3636
parents:
diff changeset
7
237
mir3636
parents: 236
diff changeset
8 void createWorkers(Context* context, TaskManager * taskManeger);
mir3636
parents: 236
diff changeset
9
233
06133afb3b5b create worker start_code
mir3636
parents: 231
diff changeset
10 union Data* createTaskManager(struct Context* context, int numCPU, int numGPU, int numIO) {
209
5708390a9d88 fix compile errors
mir3636
parents: 184
diff changeset
11 struct TaskManager* taskManager = &ALLOCATE(context, TaskManager)->TaskManager;
5708390a9d88 fix compile errors
mir3636
parents: 184
diff changeset
12 struct TaskManagerImpl* taskManagerImpl = &ALLOCATE(context, TaskManagerImpl)->TaskManagerImpl;
221
2454f4392316 Success create Task and inqueue Task
Tatsuki IHA <e125716@ie.u-ryukyu.ac.jp>
parents: 220
diff changeset
13 taskManager->taskManager = (union Data*)taskManagerImpl;
233
06133afb3b5b create worker start_code
mir3636
parents: 231
diff changeset
14 taskManagerImpl -> activeQueue = &createSingleLinkedQueue(context)->Queue;
06133afb3b5b create worker start_code
mir3636
parents: 231
diff changeset
15 taskManagerImpl -> taskQueue = &createSingleLinkedQueue(context)->Queue;
06133afb3b5b create worker start_code
mir3636
parents: 231
diff changeset
16 // 0...numIO-1 IOProcessor
06133afb3b5b create worker start_code
mir3636
parents: 231
diff changeset
17 // numIO...numIO+numGPU-1 GPUProcessor
06133afb3b5b create worker start_code
mir3636
parents: 231
diff changeset
18 // numIO+numGPU...numIO+numGPU+numCPU-1 CPUProcessor
06133afb3b5b create worker start_code
mir3636
parents: 231
diff changeset
19 taskManager->io = 0;
06133afb3b5b create worker start_code
mir3636
parents: 231
diff changeset
20 taskManager->gpu = numIO;
235
05e61405cc88 fix worker compile error
mir3636
parents: 234
diff changeset
21 taskManager->cpu = numIO+numGPU;
236
865179a0a56d fix taskManager
ikkun
parents: 235
diff changeset
22 taskManager->maxCPU = numIO+numGPU+numCPU;
234
47588c28f189 TaskManager
mir3636
parents: 233
diff changeset
23 taskManager->createTask = C_createTask;
178
5077cf9bf54e add TaskManager.c
mir3636
parents:
diff changeset
24 taskManager->spawn = C_spawnTaskManager;
5077cf9bf54e add TaskManager.c
mir3636
parents:
diff changeset
25 taskManager->shutdown = C_shutdownTaskManager;
237
mir3636
parents: 236
diff changeset
26 createWorkers(context, taskManager);
178
5077cf9bf54e add TaskManager.c
mir3636
parents:
diff changeset
27 return (union Data*)(taskManager);
5077cf9bf54e add TaskManager.c
mir3636
parents:
diff changeset
28 }
5077cf9bf54e add TaskManager.c
mir3636
parents:
diff changeset
29
237
mir3636
parents: 236
diff changeset
30 void createWorkers(Context* context, TaskManager* taskManager) {
236
865179a0a56d fix taskManager
ikkun
parents: 235
diff changeset
31 int i = 0;
237
mir3636
parents: 236
diff changeset
32 TaskManagerImpl* taskManagerImpl = (TaskManagerImpl*)GearImpl(context,TaskManager,taskManager);
mir3636
parents: 236
diff changeset
33 taskManagerImpl->workers = (Worker*)ALLOC_ARRAY(context,Worker,taskManager->maxCPU);
mir3636
parents: 236
diff changeset
34 for (;i>taskManager->gpu;i++) {
236
865179a0a56d fix taskManager
ikkun
parents: 235
diff changeset
35 Queue* queue = &createSynchronizedQueue(context)->Queue;
239
mir3636
parents: 237
diff changeset
36 taskManagerImpl->workers[i] = *(Worker*)createCPUWorker(context,i,queue);
236
865179a0a56d fix taskManager
ikkun
parents: 235
diff changeset
37 }
237
mir3636
parents: 236
diff changeset
38 for (;i>taskManager->cpu;i++) {
236
865179a0a56d fix taskManager
ikkun
parents: 235
diff changeset
39 #ifdef USE_CUDA
865179a0a56d fix taskManager
ikkun
parents: 235
diff changeset
40 #else
865179a0a56d fix taskManager
ikkun
parents: 235
diff changeset
41 Queue* queue = &createSynchronizedQueue(context)->Queue;
239
mir3636
parents: 237
diff changeset
42 taskManagerImpl->workers[i] = *(Worker*)createCPUWorker(context,i,queue);
236
865179a0a56d fix taskManager
ikkun
parents: 235
diff changeset
43 #endif
865179a0a56d fix taskManager
ikkun
parents: 235
diff changeset
44 }
237
mir3636
parents: 236
diff changeset
45 for (;i>taskManager->maxCPU;i++) {
236
865179a0a56d fix taskManager
ikkun
parents: 235
diff changeset
46 Queue* queue = &createSynchronizedQueue(context)->Queue;
239
mir3636
parents: 237
diff changeset
47 taskManagerImpl->workers[i] = *(Worker*)createCPUWorker(context,i,queue);
236
865179a0a56d fix taskManager
ikkun
parents: 235
diff changeset
48 }
865179a0a56d fix taskManager
ikkun
parents: 235
diff changeset
49 }
865179a0a56d fix taskManager
ikkun
parents: 235
diff changeset
50
237
mir3636
parents: 236
diff changeset
51 __code createTask(struct Context* context, TaskManager* taskManager, enum Code next) {
236
865179a0a56d fix taskManager
ikkun
parents: 235
diff changeset
52 taskManager->context = NEW(struct Context);
865179a0a56d fix taskManager
ikkun
parents: 235
diff changeset
53 initContext(taskManager->context);
865179a0a56d fix taskManager
ikkun
parents: 235
diff changeset
54 goto meta(context, next);
865179a0a56d fix taskManager
ikkun
parents: 235
diff changeset
55 }
865179a0a56d fix taskManager
ikkun
parents: 235
diff changeset
56
865179a0a56d fix taskManager
ikkun
parents: 235
diff changeset
57 __code createTask_stub(struct Context* context) {
865179a0a56d fix taskManager
ikkun
parents: 235
diff changeset
58 goto createTask(context,Gearef(context,TaskManager),Gearef(context,TaskManager)->next);
865179a0a56d fix taskManager
ikkun
parents: 235
diff changeset
59 }
865179a0a56d fix taskManager
ikkun
parents: 235
diff changeset
60
865179a0a56d fix taskManager
ikkun
parents: 235
diff changeset
61 __code spawnTaskManager(struct Context* context, struct TaskManagerImpl* taskManager, struct Queue* queue, struct Context* task, enum Code next) {
865179a0a56d fix taskManager
ikkun
parents: 235
diff changeset
62 if (task->idgCount == 0) {
178
5077cf9bf54e add TaskManager.c
mir3636
parents:
diff changeset
63 // enqueue activeQueue
218
d8a59b727f65 Fix spawnTaskManager
Tatsuki IHA <e125716@ie.u-ryukyu.ac.jp>
parents: 209
diff changeset
64 queue->queue = (union Data*)taskManager->activeQueue;
178
5077cf9bf54e add TaskManager.c
mir3636
parents:
diff changeset
65 } else {
5077cf9bf54e add TaskManager.c
mir3636
parents:
diff changeset
66 // enqueue waitQueue
218
d8a59b727f65 Fix spawnTaskManager
Tatsuki IHA <e125716@ie.u-ryukyu.ac.jp>
parents: 209
diff changeset
67 queue->queue = (union Data*)taskManager->taskQueue;
178
5077cf9bf54e add TaskManager.c
mir3636
parents:
diff changeset
68 }
218
d8a59b727f65 Fix spawnTaskManager
Tatsuki IHA <e125716@ie.u-ryukyu.ac.jp>
parents: 209
diff changeset
69 queue->data = (union Data*)task;
d8a59b727f65 Fix spawnTaskManager
Tatsuki IHA <e125716@ie.u-ryukyu.ac.jp>
parents: 209
diff changeset
70 queue->next = next;
239
mir3636
parents: 237
diff changeset
71 pthread_mutex_unlock(&taskManager->mutex);
209
5708390a9d88 fix compile errors
mir3636
parents: 184
diff changeset
72 goto meta(context, queue->queue->Queue.put);
178
5077cf9bf54e add TaskManager.c
mir3636
parents:
diff changeset
73 }
5077cf9bf54e add TaskManager.c
mir3636
parents:
diff changeset
74
5077cf9bf54e add TaskManager.c
mir3636
parents:
diff changeset
75 __code spawnTaskManager_stub(struct Context* context) {
239
mir3636
parents: 237
diff changeset
76 TaskManagerImpl* taskManager = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager);
mir3636
parents: 237
diff changeset
77 pthread_mutex_lock(&taskManager->mutex);
209
5708390a9d88 fix compile errors
mir3636
parents: 184
diff changeset
78 goto spawnTaskManager(context,
239
mir3636
parents: 237
diff changeset
79 taskManager,
236
865179a0a56d fix taskManager
ikkun
parents: 235
diff changeset
80 Gearef(context, Queue),
865179a0a56d fix taskManager
ikkun
parents: 235
diff changeset
81 Gearef(context, TaskManager)->context,
865179a0a56d fix taskManager
ikkun
parents: 235
diff changeset
82 Gearef(context, TaskManager)->next
218
d8a59b727f65 Fix spawnTaskManager
Tatsuki IHA <e125716@ie.u-ryukyu.ac.jp>
parents: 209
diff changeset
83 );
178
5077cf9bf54e add TaskManager.c
mir3636
parents:
diff changeset
84 }
5077cf9bf54e add TaskManager.c
mir3636
parents:
diff changeset
85
239
mir3636
parents: 237
diff changeset
// Code gear taskSend: while loopCounter->i is below taskManager->numWorker it
// increments the counter and re-enters C_taskSend; once the bound is reached
// it continues to taskManager->next.
// NOTE(review): `loopCounter` and `i` are not declared in this gear (sibling
// gears such as createWorker1 receive a struct LoopCounter* parameter), and
// the expression statement `taskManager->workers[i]->taskSend;` has no
// effect. This looks unfinished; confirm the intended hand-off to the
// worker before relying on it.
86 __code taskSend(struct Context* context, TaskManagerImpl* taskManager) {
222
77faa28128b4 Add taskSend for TaskManager
Tatsuki IHA <e125716@ie.u-ryukyu.ac.jp>
parents: 221
diff changeset
87 if(loopCounter->i < taskManager->numWorker) {
77faa28128b4 Add taskSend for TaskManager
Tatsuki IHA <e125716@ie.u-ryukyu.ac.jp>
parents: 221
diff changeset
88 taskManager->workers[i]->taskSend;
77faa28128b4 Add taskSend for TaskManager
Tatsuki IHA <e125716@ie.u-ryukyu.ac.jp>
parents: 221
diff changeset
89 loopCounter->i++;
77faa28128b4 Add taskSend for TaskManager
Tatsuki IHA <e125716@ie.u-ryukyu.ac.jp>
parents: 221
diff changeset
90 goto meta(context, C_taskSend);
77faa28128b4 Add taskSend for TaskManager
Tatsuki IHA <e125716@ie.u-ryukyu.ac.jp>
parents: 221
diff changeset
91 }
239
mir3636
parents: 237
diff changeset
92 goto meta(context, taskManager->next);
mir3636
parents: 237
diff changeset
93 }
mir3636
parents: 237
diff changeset
94
mir3636
parents: 237
diff changeset
95 __code taskSend_stub(struct Context* context) {
mir3636
parents: 237
diff changeset
96 goto taskSend(context, (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager));
222
77faa28128b4 Add taskSend for TaskManager
Tatsuki IHA <e125716@ie.u-ryukyu.ac.jp>
parents: 221
diff changeset
97 }
231
mir3636
parents: 230
diff changeset
98
230
a1fb3f2d1a36 fix worker
ikkun
parents: 223
diff changeset
99 __code createWorker1(struct Context* context, struct LoopCounter* loopCounter, struct Worker* worker) {
a1fb3f2d1a36 fix worker
ikkun
parents: 223
diff changeset
100 int i = loopCounter->i;
a1fb3f2d1a36 fix worker
ikkun
parents: 223
diff changeset
101
a1fb3f2d1a36 fix worker
ikkun
parents: 223
diff changeset
102 if (i < worker->id) {
a1fb3f2d1a36 fix worker
ikkun
parents: 223
diff changeset
103 struct Context* worker_context = &worker->contexts[i];
a1fb3f2d1a36 fix worker
ikkun
parents: 223
diff changeset
104 worker_context->next = C_getTask1;
a1fb3f2d1a36 fix worker
ikkun
parents: 223
diff changeset
105 worker_context->data[D_Tree] = context->data[D_Tree];
a1fb3f2d1a36 fix worker
ikkun
parents: 223
diff changeset
106 // worker_context->data[D_ActiveQueue] = context->data[D_ActiveQueue];
a1fb3f2d1a36 fix worker
ikkun
parents: 223
diff changeset
107 pthread_create(&worker_context->thread, NULL, (void*)&start_code, worker_context);
a1fb3f2d1a36 fix worker
ikkun
parents: 223
diff changeset
108 worker_context->thread_num = i;
a1fb3f2d1a36 fix worker
ikkun
parents: 223
diff changeset
109 loopCounter->i++;
a1fb3f2d1a36 fix worker
ikkun
parents: 223
diff changeset
110
a1fb3f2d1a36 fix worker
ikkun
parents: 223
diff changeset
111 goto meta(context, C_createWorker1);
a1fb3f2d1a36 fix worker
ikkun
parents: 223
diff changeset
112 }
a1fb3f2d1a36 fix worker
ikkun
parents: 223
diff changeset
113
a1fb3f2d1a36 fix worker
ikkun
parents: 223
diff changeset
114 loopCounter->i = 0;
a1fb3f2d1a36 fix worker
ikkun
parents: 223
diff changeset
115 goto meta(context, C_taskManager);
a1fb3f2d1a36 fix worker
ikkun
parents: 223
diff changeset
116 }
222
77faa28128b4 Add taskSend for TaskManager
Tatsuki IHA <e125716@ie.u-ryukyu.ac.jp>
parents: 221
diff changeset
117
231
mir3636
parents: 230
diff changeset
118 __code createWorker1_stub(struct Context* context) {
mir3636
parents: 230
diff changeset
119 goto createWorker1(context, &context->data[D_LoopCounter]->LoopCounter, &context->data[D_Worker]->Worker);
mir3636
parents: 230
diff changeset
120 }
mir3636
parents: 230
diff changeset
121
223
8d2519c3a36e Add input data gear example
one
parents: 222
diff changeset
122 __code shutdownTaskManager(struct Context* context, struct LoopCounter* loopCounter, struct Worker* worker, struct TaskManager* taskManager) {
178
5077cf9bf54e add TaskManager.c
mir3636
parents:
diff changeset
123 int i = loopCounter->i;
5077cf9bf54e add TaskManager.c
mir3636
parents:
diff changeset
124
209
5708390a9d88 fix compile errors
mir3636
parents: 184
diff changeset
125 if (i < worker->id) {
178
5077cf9bf54e add TaskManager.c
mir3636
parents:
diff changeset
126 pthread_join(worker->contexts[i].thread, NULL);
5077cf9bf54e add TaskManager.c
mir3636
parents:
diff changeset
127 loopCounter->i++;
5077cf9bf54e add TaskManager.c
mir3636
parents:
diff changeset
128
231
mir3636
parents: 230
diff changeset
129 goto meta(context, C_shutdownTaskManager);
178
5077cf9bf54e add TaskManager.c
mir3636
parents:
diff changeset
130 }
5077cf9bf54e add TaskManager.c
mir3636
parents:
diff changeset
131
5077cf9bf54e add TaskManager.c
mir3636
parents:
diff changeset
132 loopCounter->i = 0;
223
8d2519c3a36e Add input data gear example
one
parents: 222
diff changeset
133 goto meta(context, taskManager->next);
178
5077cf9bf54e add TaskManager.c
mir3636
parents:
diff changeset
134 }
5077cf9bf54e add TaskManager.c
mir3636
parents:
diff changeset
135
182
57a11c15ff4c Add queue_test
Tatsuki IHA <e125716@ie.u-ryukyu.ac.jp>
parents: 178
diff changeset
136 __code shutdownTaskManager_stub(struct Context* context) {
223
8d2519c3a36e Add input data gear example
one
parents: 222
diff changeset
137 goto shutdownTaskManager(context, &context->data[D_LoopCounter]->LoopCounter, &context->data[D_Worker]->Worker, Gearef(context, TaskManager));
178
5077cf9bf54e add TaskManager.c
mir3636
parents:
diff changeset
138 }