annotate src/parallel_execution/taskManager.c @ 236:865179a0a56d

fix taskManager
author ikkun
date Mon, 23 Jan 2017 20:00:46 +0900
parents 05e61405cc88
children 6f6cc49213c5
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
rev   line source
178
5077cf9bf54e add TaskManager.c
mir3636
parents:
diff changeset
1 #include "context.h"
5077cf9bf54e add TaskManager.c
mir3636
parents:
diff changeset
2 #include "stack.h"
182
57a11c15ff4c Add queue_test
Tatsuki IHA <e125716@ie.u-ryukyu.ac.jp>
parents: 178
diff changeset
3 #include "queue.h"
178
5077cf9bf54e add TaskManager.c
mir3636
parents:
diff changeset
4 #include "origin_cs.h"
5077cf9bf54e add TaskManager.c
mir3636
parents:
diff changeset
5 #include <stdio.h>
5077cf9bf54e add TaskManager.c
mir3636
parents:
diff changeset
6
233
06133afb3b5b create worker start_code
mir3636
parents: 231
diff changeset
7 union Data* createTaskManager(struct Context* context, int numCPU, int numGPU, int numIO) {
209
5708390a9d88 fix compile errors
mir3636
parents: 184
diff changeset
8 struct TaskManager* taskManager = &ALLOCATE(context, TaskManager)->TaskManager;
5708390a9d88 fix compile errors
mir3636
parents: 184
diff changeset
9 struct TaskManagerImpl* taskManagerImpl = &ALLOCATE(context, TaskManagerImpl)->TaskManagerImpl;
221
2454f4392316 Success create Task and inqueue Task
Tatsuki IHA <e125716@ie.u-ryukyu.ac.jp>
parents: 220
diff changeset
10 taskManager->taskManager = (union Data*)taskManagerImpl;
233
06133afb3b5b create worker start_code
mir3636
parents: 231
diff changeset
11 taskManagerImpl -> activeQueue = &createSingleLinkedQueue(context)->Queue;
06133afb3b5b create worker start_code
mir3636
parents: 231
diff changeset
12 taskManagerImpl -> taskQueue = &createSingleLinkedQueue(context)->Queue;
06133afb3b5b create worker start_code
mir3636
parents: 231
diff changeset
13 // 0...numIO-1 IOProcessor
06133afb3b5b create worker start_code
mir3636
parents: 231
diff changeset
14 // numIO...numIO+numGPU-1 GPUProcessor
06133afb3b5b create worker start_code
mir3636
parents: 231
diff changeset
15 // numIO+numGPU...numIO+numGPU+numCPU-1 CPUProcessor
06133afb3b5b create worker start_code
mir3636
parents: 231
diff changeset
16 taskManager->io = 0;
06133afb3b5b create worker start_code
mir3636
parents: 231
diff changeset
17 taskManager->gpu = numIO;
235
05e61405cc88 fix worker compile error
mir3636
parents: 234
diff changeset
18 taskManager->cpu = numIO+numGPU;
236
865179a0a56d fix taskManager
ikkun
parents: 235
diff changeset
19 taskManager->maxCPU = numIO+numGPU+numCPU;
234
47588c28f189 TaskManager
mir3636
parents: 233
diff changeset
20 taskManager->createTask = C_createTask;
178
5077cf9bf54e add TaskManager.c
mir3636
parents:
diff changeset
21 taskManager->spawn = C_spawnTaskManager;
5077cf9bf54e add TaskManager.c
mir3636
parents:
diff changeset
22 taskManager->shutdown = C_shutdownTaskManager;
236
865179a0a56d fix taskManager
ikkun
parents: 235
diff changeset
23 createWorkers(taskManager);
178
5077cf9bf54e add TaskManager.c
mir3636
parents:
diff changeset
24 return (union Data*)(taskManager);
5077cf9bf54e add TaskManager.c
mir3636
parents:
diff changeset
25 }
5077cf9bf54e add TaskManager.c
mir3636
parents:
diff changeset
26
236
865179a0a56d fix taskManager
ikkun
parents: 235
diff changeset
27 void createWorker(Context* context, TaskManeger * taskManeger) {
865179a0a56d fix taskManager
ikkun
parents: 235
diff changeset
28 int i = 0;
865179a0a56d fix taskManager
ikkun
parents: 235
diff changeset
29 TaskManagerImpl *taskManagerImpl = GearImpl(context,TaskManagerImpl,taskManager);
865179a0a56d fix taskManager
ikkun
parents: 235
diff changeset
30 taskManagerImpl->workers = ALLOC_ARRAY(context,Worker,taskManager->maxCPU);
865179a0a56d fix taskManager
ikkun
parents: 235
diff changeset
31 for (;i>taskManeger->gpu;i++) {
865179a0a56d fix taskManager
ikkun
parents: 235
diff changeset
32 Queue* queue = &createSynchronizedQueue(context)->Queue;
865179a0a56d fix taskManager
ikkun
parents: 235
diff changeset
33 taskManagerImpl->workers[i] = (Worker*)createCPUWorker (context,i,queue);
865179a0a56d fix taskManager
ikkun
parents: 235
diff changeset
34 }
865179a0a56d fix taskManager
ikkun
parents: 235
diff changeset
35 for (;i>taskManeger->cpu;i++) {
865179a0a56d fix taskManager
ikkun
parents: 235
diff changeset
36 #ifdef USE_CUDA
865179a0a56d fix taskManager
ikkun
parents: 235
diff changeset
37 #else
865179a0a56d fix taskManager
ikkun
parents: 235
diff changeset
38 Queue* queue = &createSynchronizedQueue(context)->Queue;
865179a0a56d fix taskManager
ikkun
parents: 235
diff changeset
39 taskManagerImpl->workers[i] = (Worker*)createCPUWorker (context,i,queue);
865179a0a56d fix taskManager
ikkun
parents: 235
diff changeset
40 #endif
865179a0a56d fix taskManager
ikkun
parents: 235
diff changeset
41 }
865179a0a56d fix taskManager
ikkun
parents: 235
diff changeset
42 for (;i>taskManeger->maxCPU;i++) {
865179a0a56d fix taskManager
ikkun
parents: 235
diff changeset
43 Queue* queue = &createSynchronizedQueue(context)->Queue;
865179a0a56d fix taskManager
ikkun
parents: 235
diff changeset
44 taskManagerImpl->workers[i] = (Worker*)createCPUWorker (context,i,queue);
865179a0a56d fix taskManager
ikkun
parents: 235
diff changeset
45 }
865179a0a56d fix taskManager
ikkun
parents: 235
diff changeset
46 }
865179a0a56d fix taskManager
ikkun
parents: 235
diff changeset
47
865179a0a56d fix taskManager
ikkun
parents: 235
diff changeset
48 __code createTask(struct Context* context, TaskManeger* taskManager, enum Code next) {
865179a0a56d fix taskManager
ikkun
parents: 235
diff changeset
49 taskManager->context = NEW(struct Context);
865179a0a56d fix taskManager
ikkun
parents: 235
diff changeset
50 initContext(taskManager->context);
865179a0a56d fix taskManager
ikkun
parents: 235
diff changeset
51 goto meta(context, next);
865179a0a56d fix taskManager
ikkun
parents: 235
diff changeset
52 }
865179a0a56d fix taskManager
ikkun
parents: 235
diff changeset
53
865179a0a56d fix taskManager
ikkun
parents: 235
diff changeset
54 __code createTask_stub(struct Context* context) {
865179a0a56d fix taskManager
ikkun
parents: 235
diff changeset
55 goto createTask(context,Gearef(context,TaskManager),Gearef(context,TaskManager)->next);
865179a0a56d fix taskManager
ikkun
parents: 235
diff changeset
56 }
865179a0a56d fix taskManager
ikkun
parents: 235
diff changeset
57
865179a0a56d fix taskManager
ikkun
parents: 235
diff changeset
58 __code spawnTaskManager(struct Context* context, struct TaskManagerImpl* taskManager, struct Queue* queue, struct Context* task, enum Code next) {
865179a0a56d fix taskManager
ikkun
parents: 235
diff changeset
59 if (task->idgCount == 0) {
178
5077cf9bf54e add TaskManager.c
mir3636
parents:
diff changeset
60 // enqueue activeQueue
218
d8a59b727f65 Fix spawnTaskManager
Tatsuki IHA <e125716@ie.u-ryukyu.ac.jp>
parents: 209
diff changeset
61 queue->queue = (union Data*)taskManager->activeQueue;
178
5077cf9bf54e add TaskManager.c
mir3636
parents:
diff changeset
62 } else {
5077cf9bf54e add TaskManager.c
mir3636
parents:
diff changeset
63 // enqueue waitQueue
218
d8a59b727f65 Fix spawnTaskManager
Tatsuki IHA <e125716@ie.u-ryukyu.ac.jp>
parents: 209
diff changeset
64 queue->queue = (union Data*)taskManager->taskQueue;
178
5077cf9bf54e add TaskManager.c
mir3636
parents:
diff changeset
65 }
218
d8a59b727f65 Fix spawnTaskManager
Tatsuki IHA <e125716@ie.u-ryukyu.ac.jp>
parents: 209
diff changeset
66 queue->data = (union Data*)task;
d8a59b727f65 Fix spawnTaskManager
Tatsuki IHA <e125716@ie.u-ryukyu.ac.jp>
parents: 209
diff changeset
67 queue->next = next;
236
865179a0a56d fix taskManager
ikkun
parents: 235
diff changeset
68 pthread_mutex_unlock(taskManagerImpl->mutex);
209
5708390a9d88 fix compile errors
mir3636
parents: 184
diff changeset
69 goto meta(context, queue->queue->Queue.put);
178
5077cf9bf54e add TaskManager.c
mir3636
parents:
diff changeset
70 }
5077cf9bf54e add TaskManager.c
mir3636
parents:
diff changeset
71
5077cf9bf54e add TaskManager.c
mir3636
parents:
diff changeset
72 __code spawnTaskManager_stub(struct Context* context) {
236
865179a0a56d fix taskManager
ikkun
parents: 235
diff changeset
73 pthread_mutex_lock(taskManager->mutex);
209
5708390a9d88 fix compile errors
mir3636
parents: 184
diff changeset
74 goto spawnTaskManager(context,
236
865179a0a56d fix taskManager
ikkun
parents: 235
diff changeset
75 GearImpl(context, TaskManagerImpl, taskManager),
865179a0a56d fix taskManager
ikkun
parents: 235
diff changeset
76 Gearef(context, Queue),
865179a0a56d fix taskManager
ikkun
parents: 235
diff changeset
77 Gearef(context, TaskManager)->context,
865179a0a56d fix taskManager
ikkun
parents: 235
diff changeset
78 Gearef(context, TaskManager)->next
218
d8a59b727f65 Fix spawnTaskManager
Tatsuki IHA <e125716@ie.u-ryukyu.ac.jp>
parents: 209
diff changeset
79 );
178
5077cf9bf54e add TaskManager.c
mir3636
parents:
diff changeset
80 }
5077cf9bf54e add TaskManager.c
mir3636
parents:
diff changeset
81
222
77faa28128b4 Add taskSend for TaskManager
Tatsuki IHA <e125716@ie.u-ryukyu.ac.jp>
parents: 221
diff changeset
82 __code taskSend(struct Context* context) {
77faa28128b4 Add taskSend for TaskManager
Tatsuki IHA <e125716@ie.u-ryukyu.ac.jp>
parents: 221
diff changeset
83 if(loopCounter->i < taskManager->numWorker) {
77faa28128b4 Add taskSend for TaskManager
Tatsuki IHA <e125716@ie.u-ryukyu.ac.jp>
parents: 221
diff changeset
84 taskManager->workers[i]->taskSend;
77faa28128b4 Add taskSend for TaskManager
Tatsuki IHA <e125716@ie.u-ryukyu.ac.jp>
parents: 221
diff changeset
85 loopCounter->i++;
77faa28128b4 Add taskSend for TaskManager
Tatsuki IHA <e125716@ie.u-ryukyu.ac.jp>
parents: 221
diff changeset
86 goto meta(context, C_taskSend);
77faa28128b4 Add taskSend for TaskManager
Tatsuki IHA <e125716@ie.u-ryukyu.ac.jp>
parents: 221
diff changeset
87 }
77faa28128b4 Add taskSend for TaskManager
Tatsuki IHA <e125716@ie.u-ryukyu.ac.jp>
parents: 221
diff changeset
88 goto meta(context, TaskManager->next);
77faa28128b4 Add taskSend for TaskManager
Tatsuki IHA <e125716@ie.u-ryukyu.ac.jp>
parents: 221
diff changeset
89 }
231
mir3636
parents: 230
diff changeset
90
230
a1fb3f2d1a36 fix worker
ikkun
parents: 223
diff changeset
91 __code createWorker1(struct Context* context, struct LoopCounter* loopCounter, struct Worker* worker) {
a1fb3f2d1a36 fix worker
ikkun
parents: 223
diff changeset
92 int i = loopCounter->i;
a1fb3f2d1a36 fix worker
ikkun
parents: 223
diff changeset
93
a1fb3f2d1a36 fix worker
ikkun
parents: 223
diff changeset
94 if (i < worker->id) {
a1fb3f2d1a36 fix worker
ikkun
parents: 223
diff changeset
95 struct Context* worker_context = &worker->contexts[i];
a1fb3f2d1a36 fix worker
ikkun
parents: 223
diff changeset
96 worker_context->next = C_getTask1;
a1fb3f2d1a36 fix worker
ikkun
parents: 223
diff changeset
97 worker_context->data[D_Tree] = context->data[D_Tree];
a1fb3f2d1a36 fix worker
ikkun
parents: 223
diff changeset
98 // worker_context->data[D_ActiveQueue] = context->data[D_ActiveQueue];
a1fb3f2d1a36 fix worker
ikkun
parents: 223
diff changeset
99 pthread_create(&worker_context->thread, NULL, (void*)&start_code, worker_context);
a1fb3f2d1a36 fix worker
ikkun
parents: 223
diff changeset
100 worker_context->thread_num = i;
a1fb3f2d1a36 fix worker
ikkun
parents: 223
diff changeset
101 loopCounter->i++;
a1fb3f2d1a36 fix worker
ikkun
parents: 223
diff changeset
102
a1fb3f2d1a36 fix worker
ikkun
parents: 223
diff changeset
103 goto meta(context, C_createWorker1);
a1fb3f2d1a36 fix worker
ikkun
parents: 223
diff changeset
104 }
a1fb3f2d1a36 fix worker
ikkun
parents: 223
diff changeset
105
a1fb3f2d1a36 fix worker
ikkun
parents: 223
diff changeset
106 loopCounter->i = 0;
a1fb3f2d1a36 fix worker
ikkun
parents: 223
diff changeset
107 goto meta(context, C_taskManager);
a1fb3f2d1a36 fix worker
ikkun
parents: 223
diff changeset
108 }
222
77faa28128b4 Add taskSend for TaskManager
Tatsuki IHA <e125716@ie.u-ryukyu.ac.jp>
parents: 221
diff changeset
109
231
mir3636
parents: 230
diff changeset
110 __code createWorker1_stub(struct Context* context) {
mir3636
parents: 230
diff changeset
111 goto createWorker1(context, &context->data[D_LoopCounter]->LoopCounter, &context->data[D_Worker]->Worker);
mir3636
parents: 230
diff changeset
112 }
mir3636
parents: 230
diff changeset
113
223
8d2519c3a36e Add input data gear example
one
parents: 222
diff changeset
114 __code shutdownTaskManager(struct Context* context, struct LoopCounter* loopCounter, struct Worker* worker, struct TaskManager* taskManager) {
178
5077cf9bf54e add TaskManager.c
mir3636
parents:
diff changeset
115 int i = loopCounter->i;
5077cf9bf54e add TaskManager.c
mir3636
parents:
diff changeset
116
209
5708390a9d88 fix compile errors
mir3636
parents: 184
diff changeset
117 if (i < worker->id) {
178
5077cf9bf54e add TaskManager.c
mir3636
parents:
diff changeset
118 pthread_join(worker->contexts[i].thread, NULL);
5077cf9bf54e add TaskManager.c
mir3636
parents:
diff changeset
119 loopCounter->i++;
5077cf9bf54e add TaskManager.c
mir3636
parents:
diff changeset
120
231
mir3636
parents: 230
diff changeset
121 goto meta(context, C_shutdownTaskManager);
178
5077cf9bf54e add TaskManager.c
mir3636
parents:
diff changeset
122 }
5077cf9bf54e add TaskManager.c
mir3636
parents:
diff changeset
123
5077cf9bf54e add TaskManager.c
mir3636
parents:
diff changeset
124 loopCounter->i = 0;
223
8d2519c3a36e Add input data gear example
one
parents: 222
diff changeset
125 goto meta(context, taskManager->next);
178
5077cf9bf54e add TaskManager.c
mir3636
parents:
diff changeset
126 }
5077cf9bf54e add TaskManager.c
mir3636
parents:
diff changeset
127
182
57a11c15ff4c Add queue_test
Tatsuki IHA <e125716@ie.u-ryukyu.ac.jp>
parents: 178
diff changeset
128 __code shutdownTaskManager_stub(struct Context* context) {
223
8d2519c3a36e Add input data gear example
one
parents: 222
diff changeset
129 goto shutdownTaskManager(context, &context->data[D_LoopCounter]->LoopCounter, &context->data[D_Worker]->Worker, Gearef(context, TaskManager));
178
5077cf9bf54e add TaskManager.c
mir3636
parents:
diff changeset
130 }