view TaskManager/Cell/CellTaskManagerImpl.cc @ 1872:a1bfda09128a draft

add IO threads in CpuThreads
author masa
date Fri, 27 Dec 2013 20:58:30 +0900
parents ce1a5624395e
children 039e6d5cf5b7
line wrap: on
line source

#define DEBUG
#include "error.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "CellTaskManagerImpl.h"
#include "HTask.h"
#include "QueueInfo.h"
#include "ExportTaskLog.h"
#include "SchedTask.h"
#include "MainScheduler.h"
#include "types.h"
#include "SysFunc.h"
#ifdef __CERIUM_GPU__
#include "GpuThreads.h"
#endif
static void send_alloc_reply(CellTaskManagerImpl *tm, int id,
                             Threads *speThreads);

CellTaskManagerImpl::~CellTaskManagerImpl() {
    delete speThreads;
    delete[] speTaskList;
    delete ppeManager;
}

void CellTaskManagerImpl::init(int spuIdle_,int useRefDma,int export_task_log) {
    spe_running = 0;
    spuIdle = spuIdle_;
    int m = machineNum == 0?1:machineNum; // at least 1 tasklistinfo in -cpu 0

    // 実行される Task 用の パイプライン用のダブルバッファ
    speTaskList = new QueueInfo<TaskList>*[m]; // spe上の走っている Task の配列
    taskListInfo = new QueueInfo<TaskList>*[m]; // 次に走る Task の配列


    for (int i = 0; i < m; i++) {
        taskListInfo[i] = new QueueInfo<TaskList> (taskListPool);
        speTaskList[i] = new QueueInfo<TaskList> (taskListPool);
    }

    // PPE 側の管理をする Manager
    ppeManager = new FifoTaskManagerImpl(machineNum);
    // 大半のTaskQueueInfoは、共有される
    MainScheduler *mscheduler = new MainScheduler;
    set_scheduler(mscheduler);
    ppeManager->init(mscheduler, this, useRefDma); // ここで HTaskInfo が共有される。

    speThreads->init();

    // 実行可能な HTask のリスト。 FifoTaskManager と共有される
    activeTaskQueue = ppeManager->activeTaskQueue;
    // HTask の factory。 HTaskInfo ならなんでもいい。
    htaskImpl = activeTaskQueue; // any HTaskInfo
    
    
    ppeManager->get_scheduler()->set_manager(this);
    
    // Task 内からManager->task_create() とかするときに必要なTaskManager。
    // 現状では ppe 側からしか動かない
    // spe 側から Task create できない
    schedTaskManager = new SchedTask();
    schedTaskManager->init(0, 0, ppeManager->get_scheduler(), 0);
    ppeManager->schedTaskManager = schedTaskManager;

    _export_task_log = export_task_log;
}

void CellTaskManagerImpl::append_activeTask(HTaskPtr task) {
    if (task->cpu_type == CPU_PPE) {
        ppeManager->append_activeTask(task);
    } else {
        activeTaskQueue->addLast(task);
    }
}

// SPE_ANY が指定されていた時に
// これをインクリメントしつつ呼ぶことにする。
unsigned int cur_anySpeid = 0;
unsigned int cur_anyGPUid = 0;
/**
 * ActiveTaskQueue から Task を
 * 各 SPE に渡す (backgound) TaskList に入れる
 *
 * ここの activeTaskQueue は FifoTaskManagerImpl のと意味が違い、
 * spe に渡される Task だけ入っている
 *
 * machineNum = 0(cpu = 0,gpu = 0) のときはこのルーチンには来ない
 */
void CellTaskManagerImpl::set_runTaskList(QueueInfo<HTask> *activeTaskQueue) {
    int speid;
    HTaskPtr htask = activeTaskQueue->getFirst();
    while (htask != NULL) {
        if (htask->cpu_type == CPU_PPE) {
            htask = activeTaskQueue->getNext(htask);
        }  else {
            if (htask->cpu_type == SPE_ANY) {
                if (cpu_num!=0) 
                    speid = (cur_anySpeid++ % cpu_num) + id_offset;
                else
                    speid = (cur_anySpeid++ % gpuNum) ; // gpu があれば gpu に割り振る
#ifdef __CERIUM_GPU__
            } else if (gpuNum == 0 && htask->cpu_type < (int)SPE_0) {
                // gpu = 0 で gpu を指定されたときには cpu で実行する
                speid = cur_anySpeid++ % machineNum + id_offset ;
            } else if (htask->cpu_type == GPU_ANY) {
                if(gpuNum == 0) speid = cur_anySpeid++ % machineNum;
                else speid = cur_anyGPUid++ % gpuNum;
            } else if (htask->cpu_type < GPU_0+gpuNum) {
                speid = htask->cpu_type - (int)(GPU_0);
#endif
            } else if (htask->cpu_type == ANY_ANY) { 
                speid = cur_anySpeid++ % machineNum;
            } else if (htask->cpu_type == IO_0) { 
                speid = cpu_num;
            } else if (htask->cpu_type == IO_1) { 
                speid = cpu_num+1;
            } else {
                // -1 してるのは
                // htask->cpu_type - CPU_SPE で
                // SPE0 = 1, SPE1 = 2, ... SPE5 = 6 ってなってるので
                // 配列的 (SPE0 = arr[0], SPE1 = arr[1]) にするため
                speid = htask->cpu_type - CPU_SPE - 1 + gpuNum;
                if (speid >= gpuNum && machineNum == gpuNum) {
                    // SPE specified but no CPU
                    speid = (cur_anySpeid++ % gpuNum) ; 
                } else if (speid < gpuNum && gpuNum == 0) {
                    // GPU specified but no GPU
                    speid = cur_anySpeid++ % machineNum + id_offset ;
                }
            }
            int dim ; 
            if ( (dim = ((TaskList*)(htask->rbuf))->ismultidim(cpu_num) ) && speid >= id_offset )  {
                // multi dimenstion task on CPU will be copied to all CPU
                for(int i=1; i < dim && i < cpu_num ; i++ ) {
                    TaskList *tl =  (TaskList*)(htask->rbuf);
                    while(tl) {
                        TaskListPtr dm = createTaskList();
                        memcpy(dm,tl, ((memaddr)tl->last()) - (memaddr(tl)));
                        taskListInfo[i+id_offset]->addLast(dm);
                        tl = tl->next;
                    }
                }
                set_taskList(htask, taskListInfo[0+id_offset]);
            } else
                set_taskList(htask, taskListInfo[speid]);
            
            HTaskPtr next = activeTaskQueue->getNext(htask);
            activeTaskQueue->remove(htask);
            htask = next;
        }
    }
}

void CellTaskManagerImpl::sendTaskList() {
    for (int id = 0; id < machineNum; id++) {
        mail_check(id);
        if (!speTaskList[id]->empty()) {
            continue; // まだ、走ってる
        }
        if (!taskListInfo[id]->empty()) {
            // SPE に送る TaskList の準備
            send_taskList(id);
        }
    }
}

void CellTaskManagerImpl::poll() {
    set_runTaskList(activeTaskQueue);
    // TaskList 待ちの SPE に TaskList を送る
    sendTaskList();
}

void CellTaskManagerImpl::debug_check_spe_idle(
                                               QueueInfo<HTask> * activeTaskQueue, int spe_running_) {
    printf("spu_idle! spe_running = %d : activeTaskQueue->length = %d \n",
           spe_running_, activeTaskQueue->length());
    HTaskPtr task = activeTaskQueue->getFirst();
    int tmp_i = 0;
    do {
        printf("task_name = %s ,", ppeManager->get_task_name(task));
        printf("cpu = [%d], count = %d", task->cpu_type, tmp_i);
        tmp_i++;
    } while ((task = activeTaskQueue->getNext(task)) != 0);
    printf("\n");
}

void CellTaskManagerImpl::run() {
    int spu_limit = spuIdle;
    if (machineNum == 0) {
        ppeManager->run();
        return;
    }

    do {
        // PPE side
        ppeManager->poll();
        // SPE side
        do {
            poll();
        } while (ppeManager->activeTaskQueue->empty() && spe_running > 0);

        if (spe_running < spu_limit) {
            debug_check_spe_idle(ppeManager->activeTaskQueue, spe_running);
        }

    } while (!ppeManager->activeTaskQueue->empty() || !activeTaskQueue->empty() || spe_running > 0);
    if (!waitTaskQueue->empty()) {
        show_dead_lock_info();
    }

}

static void loop_check(HTask *p, HTask *me, int depth) {
    if (p == me)
        printf("*%lx ", (long) p); // loop
    if (depth == 0)
        return;
    QueueInfo<TaskQueue> *w = p->wait_i;
    if (w) {
        for (TaskQueue *q = w->getFirst(); q; q = w->getNext(q)) {
            loop_check(q->task, me, depth - 1);
        }
    }
}

void CellTaskManagerImpl::show_dead_lock_info() {
    get_scheduler()-> printf("Dead lock detected\n   ppe queue %d\n",
                             ppeManager->activeTaskQueue->length());
    // 確か waitQueue は共通...
    // get_scheduler()-> printf("   wait queue %d\n",ppeManager->waitTaskQueue->length());
    get_scheduler()-> printf("   wait queue %d\n", waitTaskQueue->length());
    for (HTask *p = waitTaskQueue->getFirst(); p; p = waitTaskQueue->getNext(p)) {
        printf("  Waiting task%d %lx", p->command, (long) p);
        QueueInfo<TaskQueue> *w = p->wait_i;
        if (w) {
            for (TaskQueue *q = w->getFirst(); q; q = w->getNext(q)) {
                printf("    waiting task%d %lx", q->task->command,
                       (long) q->task);
                if (!waitTaskQueue->find(q->task)) {
                    printf("!"); // stray task
                }
                loop_check(q->task, p, 10);
            }
        }
        printf("\n");
    }
    get_scheduler()-> printf("   spe queue %d\n", activeTaskQueue->length());
    for (int i = 0; i < machineNum; i++) {
        get_scheduler()-> printf("   spe %d send %d wait %d\n", i,
                                 speTaskList[i]->length(), taskListInfo[i]->length());
    }
}

/**
 * SPE からのメールをチェックする
 */

void CellTaskManagerImpl::mail_check(int id) {
    memaddr data;

    // SPE Scheduler からの mail check
     while (speThreads->has_mail(id,1,&data)) {
        if (data == (memaddr) MY_SPE_STATUS_READY) {
            //  MY_SPE_STATUS_READY: SPE が持ってた Task 全て終了
            // freeAll する前に循環リストに戻す
            spe_running--;
            if (!speTaskList[id]->empty()) {
                speTaskList[id]->getLast()->next = speTaskList[id];
                speTaskList[id]->freeAll();
            }
            // printf("SPE %d status ready, %d running\n",id, spe_running);
        } else if (data == (memaddr) MY_SPE_COMMAND_MALLOC) {
            // MY_SPE_COMMAND_MALLOC   SPE からのmain memory request
            send_alloc_reply(this, id, speThreads);
        } else if (data == (memaddr) MY_SPE_NOP) {
            continue;
        } else {
#ifdef TASK_LIST_MAIL
            // multi dimensionだったらcount downする 
            TaskListPtr list = (TaskListPtr)data;
            if (--list->self->flag.dim_count == 0)
                check_task_list_finish(schedTaskManager, list, waitTaskQueue);
#else
            // 終了したタスク(PPEにあるのでアドレス)
            HTaskPtr task = (HTaskPtr) data;
#if 0
            if (task->cpu_type != CPU_SPE) {
                const char *name = get_task_name(task);
                if (name != NULL) {
                    printf("[SPE] ");
                    printf("Task id : %d, ", task->command);
                    printf("Task name : %s\n", name);
                }
            }
#endif
#ifndef NOT_CHECK

            if (task != NULL) {
                //SPE で処理された Task が返ってくるはず。それがもし、type PPE なら・・・
                if (task->cpu_type == CPU_PPE) {
                    printf("attention : PPE task run on SPE\n");
                    printf("Task id : %d\n", task->command);
                    const char *name = get_task_name(task);
                    if (name != NULL) {
                        printf("Task name : %s\n", name);
                    }
                }
            }

#endif

            task->post_func(schedTaskManager, task->post_arg1, task->post_arg2);
            check_task_finish(task, waitTaskQueue);
#endif
        }
        // MY_SPE_NOP: 特に意味のないコマンド
    }
}

void CellTaskManagerImpl::polling() {
    // may  call recursively check_task_list_finish()
    // we need fifo here
    for (int i = 0; i < machineNum; i++) {
        mail_check(i);
    }
}

static void send_alloc_reply(CellTaskManagerImpl *tm, int id,
                             Threads *speThreads) {

    /**
     * info[0] = alloc_id; (CellScheduler::mainMem_alloc 参照)
     * info[1] = alloc_addr;
     */
    memaddr alloc_info[2];
    long alloc_size;
    long command;

    speThreads->get_mail(id, 2, alloc_info);
    command = (long) alloc_info[0];
    alloc_size = (long) alloc_info[1];

    alloc_info[1] = (memaddr) tm->allocate(alloc_size);
    //__debug_ppe("[PPE] MALLOCED 0x%lx from [SPE %d]\n", alloc_info[1],id);
    // 今のところ何もしてない。どうも、この allocate を free
    // するのは、SPE task が返した値を見て行うらしい。それは、
    // 忘れやすいのではないか?
    speThreads->add_output_tasklist(command, alloc_info[1], alloc_size);

    speThreads->send_mail(id, 2, alloc_info);
}

/**
 * 条件を満たしたら SPE に TaskList を送信する
 * 条件1. SPE が持ってた TaskList を終了して、次の TaskList を待ってる
 * 条件2. SPE に送る TaskList に Task がある
 *
 * SPE で実行終了した speTaskList  と
 * これから実行する taskListInfo  のバッファを入れ替える
 */
void CellTaskManagerImpl::send_taskList(int id) {
    // speTaskList は走り終わった ppe の Task の List.
    // taskListInfo はこれから走る Task の List.
    // 交換して実行する
    QueueInfo<TaskList> *tmp = taskListInfo[id];
    taskListInfo[id] = speTaskList[id];
    speTaskList[id] = tmp;

    // speTaskList は本来は循環リストなのだけど、実行中は線形リストである。
    // spe の Task が終了した時点でなおす。
    tmp->getLast()->next = 0;
    TaskListPtr p = tmp->getFirst();
    // printf("SPE %d task list sending\n",id);

    // speThreads->send_mail(id, 1, p);
    int run = speThreads->spawn_task(id, p);
    spe_running += run;
    // printf("SPE %d task list sent\n",id);
}

void CellTaskManagerImpl::show_profile() {
#ifdef __CERIUM_GPU__
    for (int id = 0; id < gpuNum; id++) {
        HTaskPtr t = schedTaskManager->create_task(ShowTime, 0, 0, 0, 0);
        t->set_cpu((CPU_TYPE) (id + GPU_0));
        t->spawn();
    }
#endif
    HTaskPtr t = schedTaskManager->create_task(ShowTime, 0, 0, 0, 0);
    t->set_cpu(SPE_ANY);
    t->iterate(machineNum);
}

void CellTaskManagerImpl::start_profile() {
#ifdef __CERIUM_GPU__
    for (int id = 0; id < gpuNum; id++) {
        HTaskPtr t = schedTaskManager->create_task(StartProfile, 0, 0, 0, 0);
        t->set_cpu((CPU_TYPE) (id + GPU_0));
        t->spawn();
    }
#endif
    HTaskPtr t = schedTaskManager->create_task(StartProfile, 0, 0, 0, 0);
    t->set_cpu(SPE_ANY);
    t->iterate(machineNum);
}

void CellTaskManagerImpl::export_task_log() {
    ExportTaskLog _export(taskLogQueue);
    _export.printOut();
}

void CellTaskManagerImpl::print_arch() {
    printf("CellTaskManager\n");
}

TaskListPtr CellTaskManagerImpl::createTaskList()
{
    TaskListPtr tl = taskListInfo[0]->create();
    bzero(tl->tasks,sizeof(Task)*TASK_MAX_SIZE);
    return tl;
}

#if defined (__CERIUM_CELL__)||defined (__CERIUM_GPU__)
TaskManagerImpl *create_impl(int num, int num_gpu, int useRefDma)
{
#ifdef __CERIUM_CELL__
    Threads *cpus = new SpeThreads(num);

#elif __CERIUM_GPU__    
    init_task_list(gpu_task_list);
    Threads *cpus = new CpuThreads(num, useRefDma,num_gpu);
    num += num_gpu; // for GPU
#else    
    Threads *cpus = new CpuThreads(num, useRefDma);
#endif
    return new CellTaskManagerImpl(num, num_gpu, cpus);
}
#endif // __CERIUM_CELL