Mercurial > hg > Gears > GearsAgda
comparison src/parallel_execution/CPUWorker.cbc @ 473:71b634a5ed65
Merge
author | Tatsuki IHA <innparusu@cr.ie.u-ryukyu.ac.jp> |
---|---|
date | Thu, 28 Dec 2017 11:55:59 +0900 |
parents | ac244346c85d |
children | b92898d3a630 |
comparison
equal
deleted
inserted
replaced
472:a4d94c591246 | 473:71b634a5ed65 |
---|---|
1 #include "../context.h" | 1 #include "../context.h" |
2 #interface "TaskManager.h" | 2 #interface "TaskManager.h" |
3 #interface "Worker.h" | |
4 #interface "Iterator.h" | |
5 #interface "Queue.h" | |
6 | |
3 static void startWorker(Worker* worker); | 7 static void startWorker(Worker* worker); |
4 | 8 |
5 Worker* createCPUWorker(struct Context* context, int id, Queue* queue) { | 9 Worker* createCPUWorker(struct Context* context, int id, Queue* queue) { |
6 struct Worker* worker = new Worker(); | 10 struct Worker* worker = new Worker(); |
7 struct CPUWorker* cpuWorker = new CPUWorker(); | 11 struct CPUWorker* cpuWorker = new CPUWorker(); |
8 worker->worker = (union Data*)cpuWorker; | 12 worker->worker = (union Data*)cpuWorker; |
9 worker->tasks = queue; | 13 worker->tasks = queue; |
10 cpuWorker->id = id; | 14 cpuWorker->id = id; |
11 worker->taskReceive = C_taskReceiveWorker; | 15 cpuWorker->loopCounter = 0; |
12 worker->shutdown = C_shutdownWorker; | 16 worker->taskReceive = C_taskReceiveCPUWorker; |
17 worker->shutdown = C_shutdownCPUWorker; | |
13 pthread_create(&worker->thread, NULL, (void*)&startWorker, worker); | 18 pthread_create(&worker->thread, NULL, (void*)&startWorker, worker); |
14 return worker; | 19 return worker; |
15 } | 20 } |
16 | 21 |
17 static void startWorker(Worker* worker) { | 22 static void startWorker(struct Worker* worker) { |
18 CPUWorker* cpuWorker = (CPUWorker*)worker->worker; | 23 CPUWorker* cpuWorker = (CPUWorker*)worker->worker; |
19 cpuWorker->context = NEW(struct Context); | 24 cpuWorker->context = NEW(struct Context); |
20 initContext(cpuWorker->context); | 25 initContext(cpuWorker->context); |
21 Gearef(cpuWorker->context, Worker)->worker = (union Data*)worker; | 26 Gearef(cpuWorker->context, Worker)->worker = (union Data*)worker; |
27 Gearef(cpuWorker->context, Worker)->tasks = worker->tasks; | |
22 goto meta(cpuWorker->context, worker->taskReceive); | 28 goto meta(cpuWorker->context, worker->taskReceive); |
23 } | 29 } |
24 | 30 |
25 __code taskReceiveWorker(struct Worker* worker,struct Queue* queue) { | 31 __code taskReceiveCPUWorker(struct CPUWorker* worker, struct Queue* tasks) { |
26 queue->queue = (union Data*)worker->tasks; | 32 goto tasks->take(getTaskCPUWorker); |
27 queue->next = C_getTask; | |
28 goto meta(context, worker->tasks->take); | |
29 } | 33 } |
30 | 34 |
31 __code taskReceiveWorker_stub(struct Context* context) { | 35 __code getTaskCPUWorker(struct CPUWorker* cpuWorker, struct Context* task, struct Worker* worker) { |
32 goto taskReceiveWorker(context, &Gearef(context, Worker)->worker->Worker, Gearef(context, Queue)); | 36 if (!task) { |
37 goto worker->shutdown(); // end thread | |
38 } | |
39 task->worker = worker; | |
40 enum Code taskCg = task->next; | |
41 task->next = C_odgCommitCPUWorker; // set CG after task exec | |
42 goto meta(task, taskCg); // switch task context | |
33 } | 43 } |
34 | 44 |
35 __code getTask(struct Worker* worker, struct Context* task) { | 45 __code getTaskCPUWorker_stub(struct Context* context) { |
36 if (!task) | 46 CPUWorker* cpuWorker = (CPUWorker*)GearImpl(context, Worker, worker); |
37 goto meta(context, worker->shutdown); // end thread | 47 Worker* worker = &Gearef(context,Worker)->worker->Worker; |
38 task->worker = worker; | 48 struct Context* task = &Gearef(context, Queue)->data->Context; |
39 enum Code taskCg = task->next; | 49 goto getTaskCPUWorker(context, cpuWorker, task, worker); |
40 if (task->iterate) { | |
41 task->next = C_iterateCommit; | |
42 } else { | |
43 task->next = C_odgCommit; // set CG after task exec | |
44 } | |
45 goto meta(task, taskCg); | |
46 } | 50 } |
47 | 51 |
48 __code getTask_stub(struct Context* context) { | 52 __code iterateCommitCPUWorker(struct CPUWorker* worker) { |
49 Worker* worker = &Gearef(context,Worker)->worker->Worker; | 53 struct Iterator* iterator = context->iterator; |
50 struct Context* task = &Gearef(context, Queue)->data->Context; | 54 goto iterator->barrier(context, odgCommitCPUWorker, iterateCommitCPUWorker1); |
51 goto getTask(context, worker, task); | |
52 } | 55 } |
53 | 56 |
54 __code iterateCommit(struct Iterator* iterator) { | 57 __code iterateCommitCPUWorker1(struct CPUWorker* worker, struct Context* task) { |
55 iterator->iterator = (union Data*)context->iterator; | 58 struct Worker* taskWorker = task->worker; |
56 iterator->task = context; | 59 goto taskWorker->taskReceive(taskWorker->tasks); |
57 iterator->next = C_odgCommit; | |
58 iterator->whenWait = C_iterateCommit1; | |
59 goto meta(context, context->iterator->barrier); | |
60 } | 60 } |
61 | 61 |
62 __code iterateCommit1(struct Context* task) { | 62 __code iterateCommitCPUWorker1_stub(struct Context* context) { |
63 goto meta(context, C_taskReceiveWorker); | 63 // switch worker context |
64 struct Context* workerContext = context->worker->worker->CPUWorker.context; | |
65 CPUWorker* cpuWorker = &context->worker->worker->CPUWorker; | |
66 goto iterateCommitCPUWorker1(workerContext, | |
67 cpuWorker, | |
68 context); | |
64 } | 69 } |
65 | 70 |
66 __code iterateCommit1_stub(struct Context* context) { | 71 __code odgCommitCPUWorker(struct CPUWorker* worker, struct Context* task) { |
72 int i = worker->loopCounter; | |
73 if (task->odg+i < task->maxOdg) { | |
74 goto odgCommitCPUWorker1(); | |
75 } | |
76 worker->loopCounter = 0; | |
77 struct TaskManager* taskManager = task->taskManager; | |
78 goto taskManager->decrementTaskCount(taskReceiveCPUWorker); | |
79 } | |
80 | |
81 __code odgCommitCPUWorker_stub(struct Context* context) { | |
67 // switch worker context | 82 // switch worker context |
68 struct Context* workerContext = context->worker->worker->CPUWorker.context; | 83 struct Context* workerContext = context->worker->worker->CPUWorker.context; |
69 goto iterateCommit1(workerContext, context); | 84 CPUWorker* cpuWorker = &context->worker->worker->CPUWorker; |
85 goto odgCommitCPUWorker(workerContext, | |
86 cpuWorker, | |
87 context); | |
70 } | 88 } |
71 | 89 |
72 __code odgCommit(struct LoopCounter* loopCounter, struct Context* task, struct TaskManager* taskManager) { | 90 __code odgCommitCPUWorker1(struct CPUWorker* worker) { |
73 int i = loopCounter->i ; | 91 int i = worker->loopCounter; |
74 if (task->odg+i < task->maxOdg) { | 92 struct Queue* queue = GET_WAIT_LIST(context->data[context->odg+i]); |
75 goto meta(task, C_odgCommit1); | 93 goto queue->isEmpty(odgCommitCPUWorker2, odgCommitCPUWorker4); |
76 } | |
77 loopCounter->i = 0; | |
78 taskManager->taskManager = (union Data*)task->taskManager; | |
79 taskManager->next = C_taskReceiveWorker; | |
80 goto meta(context, task->taskManager->decrementTaskCount); | |
81 } | 94 } |
82 | 95 |
83 __code odgCommit_stub(struct Context* context) { | 96 __code odgCommitCPUWorker2(struct CPUWorker* worker) { |
84 // switch worker context | 97 int i = worker->loopCounter; |
85 struct Context* workerContext = context->worker->worker->CPUWorker.context; | 98 struct Queue* queue = GET_WAIT_LIST(context->data[context->odg+i]); |
86 goto odgCommit(workerContext, | 99 goto queue->take(odgCommitCPUWorker3); |
87 Gearef(context, LoopCounter), | |
88 context, | |
89 Gearef(workerContext, TaskManager)); | |
90 } | 100 } |
91 | 101 |
92 __code odgCommit1(struct LoopCounter* loopCounter, struct Queue* queue) { | 102 __code odgCommitCPUWorker3(struct CPUWorker* worker, struct Context* task) { |
93 int i = loopCounter->i ; | 103 if (__sync_fetch_and_sub(&task->idgCount, 1) == 1) { // atomic decrement idg counter(__sync_fetch_and_sub function return initial value of task->idgCount point) |
94 queue->queue = (union Data*)GET_WAIT_LIST(context->data[context->odg+i]); | 104 struct TaskManager* taskManager = task->taskManager; |
95 queue->whenEmpty = C_odgCommit4; | 105 taskManager->task = task; |
96 queue->next = C_odgCommit2; | 106 goto taskManager->spawn(task, odgCommitCPUWorker1); |
97 goto meta(context, queue->queue->Queue.isEmpty); | 107 } |
108 goto odgCommitCPUWorker1(); | |
98 } | 109 } |
99 | 110 |
100 __code odgCommit1_stub(struct Context* context) { | 111 __code odgCommitCPUWorker3_stub(struct Context* context) { |
101 goto odgCommit1(context, | 112 CPUWorker* cpuWorker = (CPUWorker*)GearImpl(context, Worker, worker); |
102 Gearef(context, LoopCounter), | |
103 Gearef(context, Queue)); | |
104 } | |
105 | |
106 __code odgCommit2(struct Queue* queue) { | |
107 queue->next = C_odgCommit3; | |
108 goto meta(context, queue->queue->Queue.take); | |
109 } | |
110 | |
111 __code odgCommit2_stub(struct Context* context) { | |
112 goto odgCommit2(context, | |
113 Gearef(context, Queue)); | |
114 } | |
115 | |
116 __code odgCommit3(struct TaskManager* taskManager, struct Context* task) { | |
117 if (__sync_fetch_and_sub(&task->idgCount, 1) == 1) { // atomic decrement idg counter(__sync_fetch_and_sub function return initial value of task->idgCount point) | |
118 taskManager->taskManager = (union Data*)task->taskManager; | |
119 taskManager->task = task; | |
120 taskManager->next = C_odgCommit1; | |
121 goto meta(context, task->taskManager->spawn); | |
122 } | |
123 goto meta(context, C_odgCommit1); | |
124 } | |
125 | |
126 __code odgCommit3_stub(struct Context* context) { | |
127 struct Context* task = &Gearef(context, Queue)->data->Context; | 113 struct Context* task = &Gearef(context, Queue)->data->Context; |
128 goto odgCommit3(context, | 114 goto odgCommitCPUWorker3(context, |
129 Gearef(context, TaskManager), | 115 cpuWorker, |
130 task); | 116 task); |
131 } | 117 } |
132 | 118 |
133 __code odgCommit4(struct LoopCounter* loopCounter) { | 119 __code odgCommitCPUWorker4(struct CPUWorker* worker) { |
134 loopCounter->i++; | 120 worker->loopCounter++; |
135 goto meta(context, C_odgCommit); | 121 goto odgCommitCPUWorker(); |
136 } | 122 } |
137 | 123 |
138 __code odgCommit4_stub(struct Context* context) { | 124 __code shutdownCPUWorker(struct CPUWorker* worker) { |
139 goto odgCommit4(context, | 125 goto exit_code(); |
140 Gearef(context, LoopCounter)); | |
141 } | 126 } |
142 | |
143 __code shutdownWorker(struct CPUWorker* worker) { | |
144 goto meta(context, C_exit_code); | |
145 } |