comparison src/parallel_execution/TaskManagerImpl.cbc @ 327:534601ed8c50 examples_directory

Running dependency example for single thread and single task
author Tatsuki IHA <e125716@ie.u-ryukyu.ac.jp>
date Tue, 18 Apr 2017 05:53:37 +0900
parents f23f6d0aa4e9
children c03159481cb6
comparison
equal deleted inserted replaced
326:f23f6d0aa4e9 327:534601ed8c50
53 } 53 }
54 54
55 __code createTask(struct TaskManager* taskManager) { 55 __code createTask(struct TaskManager* taskManager) {
56 taskManager->context = NEW(struct Context); 56 taskManager->context = NEW(struct Context);
57 initContext(taskManager->context); 57 initContext(taskManager->context);
58 taskManager->context->taskManager = taskManager; 58 taskManager->context->taskManager = (struct TaskManager*)taskManager->taskManager;
59 taskManager->context->idg = taskManager->context->dataNum; 59 taskManager->context->idg = taskManager->context->dataNum;
60 goto meta(context, C_setWorker); 60 goto meta(context, C_setWorker);
61 }
62
63 __code setWaitTask(struct Queue* queue, struct Context* task, Data* data, __code next(...)) {
64 struct Meta *metaData = GET_META(data);
65 if (!metaData->wait) {
66 metaData->wait = createSynchronizedQueue(context);
67 }
68 queue->queue = (union Data*)metaData->wait;
69 queue->next = next;
70 queue->data = (Data *)task;
71 goto meta(context, queue->queue->Queue.put);
72 }
73
74 __code setWaitTask_stub(struct Context* context) {
75 goto setWaitTask(context, Gearef(context, Queue), Gearef(context, TaskManager)->context, Gearef(context, TaskManager)->data, Gearef(context, TaskManager)->next);
76 } 61 }
77 62
78 __code setWorker(struct TaskManagerImpl* taskManager, struct Context* task, __code next(...)) { 63 __code setWorker(struct TaskManagerImpl* taskManager, struct Context* task, __code next(...)) {
79 task->workerId = taskManager->sendWorkerIndex; 64 task->workerId = taskManager->sendWorkerIndex;
80 if(++taskManager->sendWorkerIndex >= taskManager->numWorker) { 65 if(++taskManager->sendWorkerIndex >= taskManager->numWorker) {
86 __code setWorker_stub(struct Context* context) { 71 __code setWorker_stub(struct Context* context) {
87 TaskManagerImpl* taskManager = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager); 72 TaskManagerImpl* taskManager = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager);
88 goto setWorker(context, taskManager, Gearef(context, TaskManager)->context, Gearef(context, TaskManager)->next); 73 goto setWorker(context, taskManager, Gearef(context, TaskManager)->context, Gearef(context, TaskManager)->next);
89 } 74 }
90 75
76 __code setWaitTask(struct Queue* queue, struct Context* task, Data* data, __code next(...)) {
77 queue->queue = (Data *)GET_WAIT_LIST(data);
78 queue->next = next;
79 queue->data = (Data *)task;
80 goto meta(context, queue->queue->Queue.put);
81 }
82
83 __code setWaitTask_stub(struct Context* context) {
84 goto setWaitTask(context, Gearef(context, Queue), Gearef(context, TaskManager)->context, Gearef(context, TaskManager)->data, Gearef(context, TaskManager)->next);
85 }
86
91 __code spawnTaskManager(struct TaskManagerImpl* taskManager, struct Queue* queue, struct Context* task, __code next(...)) { 87 __code spawnTaskManager(struct TaskManagerImpl* taskManager, struct Queue* queue, struct Context* task, __code next(...)) {
92 if (task->idgCount == 0) { 88 if (task->idgCount == 0) {
93 // enqueue activeQueue 89 goto meta(context, C_taskSend);
94 queue->queue = (union Data*)taskManager->activeQueue;
95 } else { 90 } else {
96 // enqueue waitQueue 91 pthread_mutex_unlock(&taskManager->mutex);
97 queue->queue = (union Data*)taskManager->taskQueue; 92 goto next(...);
98 } 93 }
99 queue->data = (union Data*)task;
100 queue->next = C_spawnTaskManager1;
101 goto meta(context, queue->queue->Queue.put);
102 } 94 }
103 95
104 __code spawnTaskManager_stub(struct Context* context) { 96 __code spawnTaskManager_stub(struct Context* context) {
105 TaskManagerImpl* taskManager = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager); 97 TaskManagerImpl* taskManager = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager);
106 pthread_mutex_lock(&taskManager->mutex); 98 pthread_mutex_lock(&taskManager->mutex);
109 Gearef(context, Queue), 101 Gearef(context, Queue),
110 Gearef(context, TaskManager)->context, 102 Gearef(context, TaskManager)->context,
111 Gearef(context, TaskManager)->next); 103 Gearef(context, TaskManager)->next);
112 } 104 }
113 105
114 106 __code taskSend(struct TaskManagerImpl* taskManager, struct Queue* queue, struct Context* task, __code next(...)) {
115 __code spawnTaskManager1(struct TaskManagerImpl* taskManager) {
116 pthread_mutex_unlock(&taskManager->mutex);
117 goto meta(context, C_taskSend);
118 }
119
120 __code spawnTaskManager1_stub(struct Context* context) {
121 TaskManagerImpl* taskManager = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager);
122 goto spawnTaskManager1(context,
123 taskManager);
124 }
125
126 __code taskSend(struct TaskManagerImpl* taskManager, struct Queue* queue) {
127 queue->queue = (union Data*)taskManager->activeQueue;
128 queue->next = C_taskSend1;
129 goto meta(context, taskManager->activeQueue->take);
130 }
131
132 __code taskSend1(struct TaskManagerImpl* taskManager, struct Queue* queue, struct Context* task, __code next(...)) {
133 struct Queue* tasks = taskManager->workers[task->workerId]->tasks; 107 struct Queue* tasks = taskManager->workers[task->workerId]->tasks;
134 queue->queue = (union Data*)tasks; 108 queue->queue = (union Data*)tasks;
135 queue->data = (union Data*)task; 109 queue->data = (union Data*)task;
136 queue->next = next; 110 queue->next = next;
111 pthread_mutex_unlock(&taskManager->mutex);
137 goto meta(context, tasks->put); 112 goto meta(context, tasks->put);
138 } 113 }
139 114
140 __code taskSend1_stub(struct Context* context) { 115 __code taskSend_stub(struct Context* context) {
141 TaskManagerImpl* taskManager = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager); 116 TaskManagerImpl* taskManager = (TaskManagerImpl*)GearImpl(context, TaskManager, taskManager);
142 goto taskSend1(context, taskManager, Gearef(context, Queue), Gearef(context, TaskManager)->context, Gearef(context, TaskManager)->next); 117 goto taskSend(context, taskManager, Gearef(context, Queue), Gearef(context, TaskManager)->context, Gearef(context, TaskManager)->next);
143 } 118 }
144 119
145 __code shutdownTaskManager(struct LoopCounter* loopCounter, struct TaskManager* taskManager, struct TaskManagerImpl* taskManagerImpl, struct Queue* queue, __code next(...)) { 120 __code shutdownTaskManager(struct LoopCounter* loopCounter, struct TaskManager* taskManager, struct TaskManagerImpl* taskManagerImpl, struct Queue* queue, __code next(...)) {
146 int i = loopCounter->i; 121 int i = loopCounter->i;
147 if (taskManager->cpu <= i && i < taskManager->maxCPU) { 122 if (taskManager->cpu <= i && i < taskManager->maxCPU) {