view src/parallel_execution/CPUWorker.cbc @ 473:71b634a5ed65

Merge
author Tatsuki IHA <innparusu@cr.ie.u-ryukyu.ac.jp>
date Thu, 28 Dec 2017 11:55:59 +0900
parents ac244346c85d
children b92898d3a630
line wrap: on
line source

#include "../context.h"
#interface "TaskManager.h"
#interface "Worker.h"
#interface "Iterator.h"
#interface "Queue.h"

// Thread entry routine; declared here because createCPUWorker passes it to
// pthread_create() before its definition appears.
static void startWorker(Worker* worker);

// Build a Worker whose implementation is a CPUWorker and start its thread.
//
// context: not referenced explicitly in this body.
//          NOTE(review): `new T()` is not standard C; presumably the Gears OS
//          translator expands it into an allocation that uses `context` —
//          confirm against the generator.
// id:      stored in cpuWorker->id.
// queue:   stored in worker->tasks (the queue taskReceiveCPUWorker takes from).
// Returns the new Worker, after its thread has been requested with
// pthread_create().
//
// NOTE(review): the result of pthread_create() is ignored, so a failed thread
// creation is not reported to the caller.
// NOTE(review): startWorker is declared as void(Worker*), not the
// void*(*)(void*) start routine pthread_create() expects; the (void*) cast
// hides that mismatch.
Worker* createCPUWorker(struct Context* context, int id, Queue* queue) {
    struct Worker* worker = new Worker();
    struct CPUWorker* cpuWorker = new CPUWorker();
    // The generic Worker points at its CPU-specific implementation.
    worker->worker = (union Data*)cpuWorker;
    worker->tasks = queue;
    cpuWorker->id = id;
    // Loop index used by the odgCommitCPUWorker* code gears.
    cpuWorker->loopCounter = 0;
    // Code gears are referred to by their C_* identifiers; startWorker jumps
    // to worker->taskReceive through meta().
    worker->taskReceive = C_taskReceiveCPUWorker;
    worker->shutdown = C_shutdownCPUWorker;
    pthread_create(&worker->thread, NULL, (void*)&startWorker, worker);
    return worker;
}

static void startWorker(struct Worker* worker) {
    CPUWorker* cpuWorker = (CPUWorker*)worker->worker;
    cpuWorker->context = NEW(struct Context);
    initContext(cpuWorker->context);
    Gearef(cpuWorker->context, Worker)->worker = (union Data*)worker;
    Gearef(cpuWorker->context, Worker)->tasks = worker->tasks;
    goto meta(cpuWorker->context, worker->taskReceive);
}

// Code gear installed as worker->taskReceive by createCPUWorker: take the next
// entry from the task queue and continue at getTaskCPUWorker.
//
// worker: CPUWorker implementation (not used in this body).
// tasks:  queue to take from.
// NOTE(review): whether take blocks on an empty queue depends on the Queue
// implementation, which is not visible in this file.
__code taskReceiveCPUWorker(struct CPUWorker* worker, struct Queue* tasks) {
    goto tasks->take(getTaskCPUWorker);
}

// Continuation of the queue take started in taskReceiveCPUWorker: run the
// dequeued task, or shut the worker down when there is none.
//
// cpuWorker: CPUWorker implementation (not used in this body).
// task:      dequeued task context; NULL selects the shutdown path.
// worker:    Worker recorded in task->worker before the task runs.
__code getTaskCPUWorker(struct CPUWorker* cpuWorker, struct Context* task, struct Worker* worker) {
    if (!task) {
        goto worker->shutdown(); // no task was delivered: end this worker thread
    }
    // Remember which worker runs the task; the hand-written *_stub gears in
    // this file read context->worker to get back to the worker's own context.
    task->worker = worker;
    // Save the task's own entry code gear before task->next is overwritten.
    enum Code taskCg = task->next;
    task->next = C_odgCommitCPUWorker; // code gear to run after the task has executed
    goto meta(task, taskCg); // continue at the saved code gear, with the task as the context
}

__code getTaskCPUWorker_stub(struct Context* context) {
    CPUWorker* cpuWorker = (CPUWorker*)GearImpl(context, Worker, worker);
    Worker* worker = &Gearef(context,Worker)->worker->Worker;
    struct Context* task = &Gearef(context, Queue)->data->Context;
    goto getTaskCPUWorker(context, cpuWorker, task, worker);
}

// Hand the current context to its iterator's barrier, naming
// odgCommitCPUWorker and iterateCommitCPUWorker1 as the two continuations.
//
// worker: CPUWorker implementation (not used in this body).
// NOTE(review): `context` is not a declared parameter; presumably it is the
// implicit context supplied by the Gears OS translator — confirm.
// NOTE(review): which continuation the barrier selects in which case is
// defined by the Iterator implementation, not visible here.
__code iterateCommitCPUWorker(struct CPUWorker* worker) {
    struct Iterator* iterator = context->iterator;
    goto iterator->barrier(context, odgCommitCPUWorker, iterateCommitCPUWorker1);
}

// Go back to receiving tasks: continue at the taskReceive code gear of the
// worker recorded in task->worker, passing that worker's task queue.
//
// worker: CPUWorker implementation (not used in this body).
// task:   task context whose worker field identifies the Worker to resume.
__code iterateCommitCPUWorker1(struct CPUWorker* worker, struct Context* task) {
    struct Worker* taskWorker = task->worker;
    goto taskWorker->taskReceive(taskWorker->tasks);
}

// Hand-written stub for iterateCommitCPUWorker1. Looks up the CPUWorker that
// the incoming context's worker points at, then continues in that
// CPUWorker's own context; the incoming context is passed on as `task`.
__code iterateCommitCPUWorker1_stub(struct Context* context) {
    CPUWorker* impl = &context->worker->worker->CPUWorker;
    // switch to the worker context
    goto iterateCommitCPUWorker1(impl->context, impl, context);
}

// Entry of the commit loop that runs after a task (getTaskCPUWorker installs
// it as task->next). worker->loopCounter is the loop index: while
// task->odg + loopCounter is below task->maxOdg the loop body
// (odgCommitCPUWorker1) runs; otherwise the counter is reset and the task
// manager's decrementTaskCount is invoked with taskReceiveCPUWorker as its
// continuation.
//
// worker: CPUWorker implementation holding loopCounter.
// task:   context of the task being committed.
// NOTE(review): odg/maxOdg presumably delimit the task's output data gears
// inside its data array — confirm against the Context definition.
__code odgCommitCPUWorker(struct CPUWorker* worker, struct Context* task) {
    int i = worker->loopCounter;
    if (task->odg+i < task->maxOdg) {
        goto odgCommitCPUWorker1();
    }
    // All indices handled: reset the index for the next task.
    worker->loopCounter = 0;
    struct TaskManager* taskManager = task->taskManager;
    goto taskManager->decrementTaskCount(taskReceiveCPUWorker);
}

__code odgCommitCPUWorker_stub(struct Context* context) {
    // switch worker context
    struct Context* workerContext = context->worker->worker->CPUWorker.context;
    CPUWorker* cpuWorker = &context->worker->worker->CPUWorker;
    goto odgCommitCPUWorker(workerContext,
            cpuWorker,
            context);
}

// Loop body, step 1: fetch the wait list of the data gear at index
// odg + loopCounter and branch on whether that queue is empty.
// Judging from what the two continuations do, odgCommitCPUWorker2 (takes an
// entry) is the non-empty branch and odgCommitCPUWorker4 (advances the index)
// is the empty branch — confirm against the Queue isEmpty interface.
//
// NOTE(review): this gear indexes `context->data` / `context->odg`, whereas
// odgCommitCPUWorker bounds the loop with `task->odg` / `task->maxOdg`, and
// odgCommitCPUWorker_stub enters the loop with the worker's context as
// `context` and the task as a separate argument. Confirm that `context` here
// really is the context whose data gears are meant.
__code odgCommitCPUWorker1(struct CPUWorker* worker) {
    int i = worker->loopCounter;
    struct Queue* queue = GET_WAIT_LIST(context->data[context->odg+i]);
    goto queue->isEmpty(odgCommitCPUWorker2, odgCommitCPUWorker4);
}

// Loop body, step 2: take one entry from the wait list of the data gear at
// index odg + loopCounter and continue at odgCommitCPUWorker3 with it.
// NOTE(review): uses the implicit `context` for the data array, exactly like
// odgCommitCPUWorker1 — the same question about which context this is applies.
__code odgCommitCPUWorker2(struct CPUWorker* worker) {
    int i = worker->loopCounter;
    struct Queue* queue = GET_WAIT_LIST(context->data[context->odg+i]);
    goto queue->take(odgCommitCPUWorker3);
}

// Loop body, step 3: `task` is the entry taken from the wait list. Its
// idgCount is decremented atomically; when this decrement brings the count to
// zero the task is handed to the task manager's spawn, and in either case the
// loop continues at odgCommitCPUWorker1.
//
// worker: CPUWorker implementation (not used in this body).
// task:   waiting task context taken from the wait list.
// NOTE(review): idgCount presumably counts the input data gears the task is
// still waiting for — confirm against the Context definition.
__code odgCommitCPUWorker3(struct CPUWorker* worker, struct Context* task) {
    if (__sync_fetch_and_sub(&task->idgCount, 1) == 1) { // atomic decrement; __sync_fetch_and_sub returns the value held before the subtraction, so 1 means the counter is now 0
        struct TaskManager* taskManager = task->taskManager;
        taskManager->task = task;
        goto taskManager->spawn(task, odgCommitCPUWorker1);
    }
    goto odgCommitCPUWorker1();
}

// Hand-written stub for odgCommitCPUWorker3: the `task` argument is the
// entry left in the Queue data gear, and the CPUWorker implementation is read
// from the Worker data gear of the same context.
__code odgCommitCPUWorker3_stub(struct Context* context) {
    struct Context* waiting = &Gearef(context, Queue)->data->Context;
    goto odgCommitCPUWorker3(context, (CPUWorker*)GearImpl(context, Worker, worker), waiting);
}

// Loop step: advance worker->loopCounter to the next index and re-enter the
// loop at odgCommitCPUWorker.
// NOTE(review): the jump passes no explicit arguments, so re-entry presumably
// goes through odgCommitCPUWorker_stub, which derives the worker context from
// context->worker. Confirm that field is valid for whichever context is
// current at this point.
__code odgCommitCPUWorker4(struct CPUWorker* worker) {
    worker->loopCounter++;
    goto odgCommitCPUWorker();
}

// Code gear installed as worker->shutdown by createCPUWorker; reached from
// getTaskCPUWorker when no task was delivered. It only continues to exit_code.
//
// worker: CPUWorker implementation (not used in this body).
// NOTE(review): exit_code is defined elsewhere; presumably it ends the worker
// thread — confirm.
__code shutdownCPUWorker(struct CPUWorker* worker) {
    goto exit_code();
}