view src/parallel_execution/CPUWorker.cbc @ 327:534601ed8c50 examples_directory

Running dependency example for single thread and single task
author Tatsuki IHA <e125716@ie.u-ryukyu.ac.jp>
date Tue, 18 Apr 2017 05:53:37 +0900
parents 7d664be4efa5
children d6ce4273e7d1
line wrap: on
line source

#include "../context.h"

static void start_worker(Worker* worker);

Worker* createCPUWorker(struct Context* context, int id, Queue* queue) {
    struct Worker* worker = new Worker();
    struct CPUWorker* cpuWorker = new CPUWorker();
    worker->worker = (union Data*)cpuWorker;
    worker->tasks = queue;
    cpuWorker->id = id;
    worker->taskReceive = C_taskReceiveWorker;
    worker->shutdown = C_shutdownWorker;
    pthread_create(&worker->worker->CPUWorker.thread, NULL, (void*)&start_worker, worker);
    return worker;
}

static void start_worker(Worker* worker) {
    CPUWorker* cpuWorker = (CPUWorker*)worker->worker;
    cpuWorker->context = NEW(struct Context);
    initContext(cpuWorker->context);
    Gearef(cpuWorker->context, Worker)->worker = (union Data*)worker;
    goto meta(cpuWorker->context, worker->taskReceive);
}

__code taskReceiveWorker(struct Worker* worker,struct Queue* queue) {
    queue->queue = (union Data*)worker->tasks;
    queue->next = C_getTask;
    goto meta(context, worker->tasks->take);
}

__code taskReceiveWorker_stub(struct Context* context) {
    goto taskReceiveWorker(context, &Gearef(context, Worker)->worker->Worker, Gearef(context, Queue));
}

__code getTask(struct Worker* worker, struct Context* task) {
    if (!task)
        return; // end thread
    task->worker = worker;
    enum Code taskCg = task->next;
    task->next = C_odgCommit; // set CG after task exec
    goto meta(task, taskCg);
}

__code getTask_stub(struct Context* context) {
    Worker* worker = &Gearef(context,Worker)->worker->Worker;
    struct Context* task = &Gearef(context, Queue)->data->Context;
    goto getTask(context, worker, task);
}

__code odgCommit(struct LoopCounter* loopCounter, struct Context* task) {
    int i = loopCounter->i ;
    if(task->odg + i < task->maxOdg) {
        goto meta(task, C_odgCommit1);
    }
    loopCounter->i = 0;
    goto meta(context, C_taskReceiveWorker);
}

__code odgCommit_stub(struct Context* context) {
    struct Context* workerContext = context->worker->worker->CPUWorker.context;
    goto odgCommit(workerContext,
                   Gearef(context, LoopCounter),
                   context);
}

__code odgCommit1(struct LoopCounter* loopCounter, struct Queue* queue) {
    int i = loopCounter->i ;
    queue->queue = (union Data*)GET_WAIT_LIST(context->data[context->odg+i]);
    queue->whenEmpty = C_odgCommit4;
    queue->next = C_odgCommit2;
    goto meta(context, queue->queue->Queue.isEmpty);
}

__code odgCommit1_stub(struct Context* context) {
    goto odgCommit1(context,
                   Gearef(context, LoopCounter),
                   Gearef(context, Queue));
}

__code odgCommit2(struct Queue* queue) {
    queue->next = C_odgCommit3;
    goto meta(context, queue->queue->Queue.take);
}

__code odgCommit2_stub(struct Context* context) {
    goto odgCommit2(context,
                   Gearef(context, Queue));
}

__code odgCommit3(struct TaskManager* taskManager, struct Context* task) {
    if(__sync_fetch_and_sub(&task->idgCount, 1)) {
        if(task->idgCount == 0) {
            taskManager->taskManager = (union Data*)task->taskManager;
            taskManager->context = task;
            taskManager->next = C_odgCommit1;
            goto meta(context, task->taskManager->spawn);
        } else {
            goto meta(context, C_odgCommit1);
        }
    } else {
        goto meta(context, C_odgCommit3);
    }
}

__code odgCommit3_stub(struct Context* context) {
    struct Context* task = &Gearef(context, Queue)->data->Context;
    goto odgCommit3(context,
                    Gearef(context, TaskManager),
                    task);
}

__code odgCommit4(struct LoopCounter* loopCounter) {
    loopCounter->i++;
    goto meta(context, C_odgCommit);
}

__code odgCommit4_stub(struct Context* context) {
    goto odgCommit4(context,
                    Gearef(context, LoopCounter));
}

__code shutdownWorker(struct CPUWorker* worker) {
}