comparison src/parallel_execution/CPUWorker.cbc @ 473:71b634a5ed65

Merge
author Tatsuki IHA <innparusu@cr.ie.u-ryukyu.ac.jp>
date Thu, 28 Dec 2017 11:55:59 +0900
parents ac244346c85d
children b92898d3a630
comparison
equal deleted inserted replaced
472:a4d94c591246 473:71b634a5ed65
1 #include "../context.h" 1 #include "../context.h"
2 #interface "TaskManager.h" 2 #interface "TaskManager.h"
3 #interface "Worker.h"
4 #interface "Iterator.h"
5 #interface "Queue.h"
6
3 static void startWorker(Worker* worker); 7 static void startWorker(Worker* worker);
4 8
5 Worker* createCPUWorker(struct Context* context, int id, Queue* queue) { 9 Worker* createCPUWorker(struct Context* context, int id, Queue* queue) {
6 struct Worker* worker = new Worker(); 10 struct Worker* worker = new Worker();
7 struct CPUWorker* cpuWorker = new CPUWorker(); 11 struct CPUWorker* cpuWorker = new CPUWorker();
8 worker->worker = (union Data*)cpuWorker; 12 worker->worker = (union Data*)cpuWorker;
9 worker->tasks = queue; 13 worker->tasks = queue;
10 cpuWorker->id = id; 14 cpuWorker->id = id;
11 worker->taskReceive = C_taskReceiveWorker; 15 cpuWorker->loopCounter = 0;
12 worker->shutdown = C_shutdownWorker; 16 worker->taskReceive = C_taskReceiveCPUWorker;
17 worker->shutdown = C_shutdownCPUWorker;
13 pthread_create(&worker->thread, NULL, (void*)&startWorker, worker); 18 pthread_create(&worker->thread, NULL, (void*)&startWorker, worker);
14 return worker; 19 return worker;
15 } 20 }
16 21
17 static void startWorker(Worker* worker) { 22 static void startWorker(struct Worker* worker) {
18 CPUWorker* cpuWorker = (CPUWorker*)worker->worker; 23 CPUWorker* cpuWorker = (CPUWorker*)worker->worker;
19 cpuWorker->context = NEW(struct Context); 24 cpuWorker->context = NEW(struct Context);
20 initContext(cpuWorker->context); 25 initContext(cpuWorker->context);
21 Gearef(cpuWorker->context, Worker)->worker = (union Data*)worker; 26 Gearef(cpuWorker->context, Worker)->worker = (union Data*)worker;
27 Gearef(cpuWorker->context, Worker)->tasks = worker->tasks;
22 goto meta(cpuWorker->context, worker->taskReceive); 28 goto meta(cpuWorker->context, worker->taskReceive);
23 } 29 }
24 30
25 __code taskReceiveWorker(struct Worker* worker,struct Queue* queue) { 31 __code taskReceiveCPUWorker(struct CPUWorker* worker, struct Queue* tasks) {
26 queue->queue = (union Data*)worker->tasks; 32 goto tasks->take(getTaskCPUWorker);
27 queue->next = C_getTask;
28 goto meta(context, worker->tasks->take);
29 } 33 }
30 34
31 __code taskReceiveWorker_stub(struct Context* context) { 35 __code getTaskCPUWorker(struct CPUWorker* cpuWorker, struct Context* task, struct Worker* worker) {
32 goto taskReceiveWorker(context, &Gearef(context, Worker)->worker->Worker, Gearef(context, Queue)); 36 if (!task) {
37 goto worker->shutdown(); // end thread
38 }
39 task->worker = worker;
40 enum Code taskCg = task->next;
41 task->next = C_odgCommitCPUWorker; // set CG after task exec
42 goto meta(task, taskCg); // switch task context
33 } 43 }
34 44
35 __code getTask(struct Worker* worker, struct Context* task) { 45 __code getTaskCPUWorker_stub(struct Context* context) {
36 if (!task) 46 CPUWorker* cpuWorker = (CPUWorker*)GearImpl(context, Worker, worker);
37 goto meta(context, worker->shutdown); // end thread 47 Worker* worker = &Gearef(context,Worker)->worker->Worker;
38 task->worker = worker; 48 struct Context* task = &Gearef(context, Queue)->data->Context;
39 enum Code taskCg = task->next; 49 goto getTaskCPUWorker(context, cpuWorker, task, worker);
40 if (task->iterate) {
41 task->next = C_iterateCommit;
42 } else {
43 task->next = C_odgCommit; // set CG after task exec
44 }
45 goto meta(task, taskCg);
46 } 50 }
47 51
48 __code getTask_stub(struct Context* context) { 52 __code iterateCommitCPUWorker(struct CPUWorker* worker) {
49 Worker* worker = &Gearef(context,Worker)->worker->Worker; 53 struct Iterator* iterator = context->iterator;
50 struct Context* task = &Gearef(context, Queue)->data->Context; 54 goto iterator->barrier(context, odgCommitCPUWorker, iterateCommitCPUWorker1);
51 goto getTask(context, worker, task);
52 } 55 }
53 56
54 __code iterateCommit(struct Iterator* iterator) { 57 __code iterateCommitCPUWorker1(struct CPUWorker* worker, struct Context* task) {
55 iterator->iterator = (union Data*)context->iterator; 58 struct Worker* taskWorker = task->worker;
56 iterator->task = context; 59 goto taskWorker->taskReceive(taskWorker->tasks);
57 iterator->next = C_odgCommit;
58 iterator->whenWait = C_iterateCommit1;
59 goto meta(context, context->iterator->barrier);
60 } 60 }
61 61
62 __code iterateCommit1(struct Context* task) { 62 __code iterateCommitCPUWorker1_stub(struct Context* context) {
63 goto meta(context, C_taskReceiveWorker); 63 // switch worker context
64 struct Context* workerContext = context->worker->worker->CPUWorker.context;
65 CPUWorker* cpuWorker = &context->worker->worker->CPUWorker;
66 goto iterateCommitCPUWorker1(workerContext,
67 cpuWorker,
68 context);
64 } 69 }
65 70
66 __code iterateCommit1_stub(struct Context* context) { 71 __code odgCommitCPUWorker(struct CPUWorker* worker, struct Context* task) {
72 int i = worker->loopCounter;
73 if (task->odg+i < task->maxOdg) {
74 goto odgCommitCPUWorker1();
75 }
76 worker->loopCounter = 0;
77 struct TaskManager* taskManager = task->taskManager;
78 goto taskManager->decrementTaskCount(taskReceiveCPUWorker);
79 }
80
81 __code odgCommitCPUWorker_stub(struct Context* context) {
67 // switch worker context 82 // switch worker context
68 struct Context* workerContext = context->worker->worker->CPUWorker.context; 83 struct Context* workerContext = context->worker->worker->CPUWorker.context;
69 goto iterateCommit1(workerContext, context); 84 CPUWorker* cpuWorker = &context->worker->worker->CPUWorker;
85 goto odgCommitCPUWorker(workerContext,
86 cpuWorker,
87 context);
70 } 88 }
71 89
72 __code odgCommit(struct LoopCounter* loopCounter, struct Context* task, struct TaskManager* taskManager) { 90 __code odgCommitCPUWorker1(struct CPUWorker* worker) {
73 int i = loopCounter->i ; 91 int i = worker->loopCounter;
74 if (task->odg+i < task->maxOdg) { 92 struct Queue* queue = GET_WAIT_LIST(context->data[context->odg+i]);
75 goto meta(task, C_odgCommit1); 93 goto queue->isEmpty(odgCommitCPUWorker2, odgCommitCPUWorker4);
76 }
77 loopCounter->i = 0;
78 taskManager->taskManager = (union Data*)task->taskManager;
79 taskManager->next = C_taskReceiveWorker;
80 goto meta(context, task->taskManager->decrementTaskCount);
81 } 94 }
82 95
83 __code odgCommit_stub(struct Context* context) { 96 __code odgCommitCPUWorker2(struct CPUWorker* worker) {
84 // switch worker context 97 int i = worker->loopCounter;
85 struct Context* workerContext = context->worker->worker->CPUWorker.context; 98 struct Queue* queue = GET_WAIT_LIST(context->data[context->odg+i]);
86 goto odgCommit(workerContext, 99 goto queue->take(odgCommitCPUWorker3);
87 Gearef(context, LoopCounter),
88 context,
89 Gearef(workerContext, TaskManager));
90 } 100 }
91 101
92 __code odgCommit1(struct LoopCounter* loopCounter, struct Queue* queue) { 102 __code odgCommitCPUWorker3(struct CPUWorker* worker, struct Context* task) {
93 int i = loopCounter->i ; 103 if (__sync_fetch_and_sub(&task->idgCount, 1) == 1) { // atomic decrement idg counter(__sync_fetch_and_sub function return initial value of task->idgCount point)
94 queue->queue = (union Data*)GET_WAIT_LIST(context->data[context->odg+i]); 104 struct TaskManager* taskManager = task->taskManager;
95 queue->whenEmpty = C_odgCommit4; 105 taskManager->task = task;
96 queue->next = C_odgCommit2; 106 goto taskManager->spawn(task, odgCommitCPUWorker1);
97 goto meta(context, queue->queue->Queue.isEmpty); 107 }
108 goto odgCommitCPUWorker1();
98 } 109 }
99 110
100 __code odgCommit1_stub(struct Context* context) { 111 __code odgCommitCPUWorker3_stub(struct Context* context) {
101 goto odgCommit1(context, 112 CPUWorker* cpuWorker = (CPUWorker*)GearImpl(context, Worker, worker);
102 Gearef(context, LoopCounter),
103 Gearef(context, Queue));
104 }
105
106 __code odgCommit2(struct Queue* queue) {
107 queue->next = C_odgCommit3;
108 goto meta(context, queue->queue->Queue.take);
109 }
110
111 __code odgCommit2_stub(struct Context* context) {
112 goto odgCommit2(context,
113 Gearef(context, Queue));
114 }
115
116 __code odgCommit3(struct TaskManager* taskManager, struct Context* task) {
117 if (__sync_fetch_and_sub(&task->idgCount, 1) == 1) { // atomic decrement idg counter(__sync_fetch_and_sub function return initial value of task->idgCount point)
118 taskManager->taskManager = (union Data*)task->taskManager;
119 taskManager->task = task;
120 taskManager->next = C_odgCommit1;
121 goto meta(context, task->taskManager->spawn);
122 }
123 goto meta(context, C_odgCommit1);
124 }
125
126 __code odgCommit3_stub(struct Context* context) {
127 struct Context* task = &Gearef(context, Queue)->data->Context; 113 struct Context* task = &Gearef(context, Queue)->data->Context;
128 goto odgCommit3(context, 114 goto odgCommitCPUWorker3(context,
129 Gearef(context, TaskManager), 115 cpuWorker,
130 task); 116 task);
131 } 117 }
132 118
133 __code odgCommit4(struct LoopCounter* loopCounter) { 119 __code odgCommitCPUWorker4(struct CPUWorker* worker) {
134 loopCounter->i++; 120 worker->loopCounter++;
135 goto meta(context, C_odgCommit); 121 goto odgCommitCPUWorker();
136 } 122 }
137 123
138 __code odgCommit4_stub(struct Context* context) { 124 __code shutdownCPUWorker(struct CPUWorker* worker) {
139 goto odgCommit4(context, 125 goto exit_code();
140 Gearef(context, LoopCounter));
141 } 126 }
142
143 __code shutdownWorker(struct CPUWorker* worker) {
144 goto meta(context, C_exit_code);
145 }