view TaskManager/Cell/CellTaskManagerImpl.cc @ 956:15026ebf7a17

unified queue worked on Mac OS X
author Shinji KONO <kono@ie.u-ryukyu.ac.jp>
date Wed, 04 Aug 2010 17:02:26 +0900
parents 0d404f6c36a8
children 559d041313dc
line wrap: on
line source

#define DEBUG
#include "error.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "CellTaskManagerImpl.h"
#include "HTask.h"
#include "QueueInfo.h"
#include "SchedTask.h"
#include "MainScheduler.h"
#include "types.h"
#include "SysFunc.h"

static void send_alloc_reply(CellTaskManagerImpl *tm, int id,  SpeThreads *speThreads);

CellTaskManagerImpl::~CellTaskManagerImpl()
{
    delete speThreads;
    delete [] speTaskList;

    delete ppeManager;
}

void
CellTaskManagerImpl::init()
{
    spe_running = 0;

    // 実行可能な HTask のリスト。 FifoTaskManager と共有される
    activeTaskQueue = new QueueInfo<HTask>(htaskPool);
    // HTask の factory。 HTaskInfo ならなんでもいい。
    htaskImpl = activeTaskQueue ; // any HTaskInfo
    
    speThreads = new SpeThreads(machineNum);
    speThreads->init();

    // 実行される Task 用の パイプライン用のダブルバッファ
    speTaskList   = new *QueueInfo<TaskList>[machineNum]; // spe上の走っている Task の配列
    taskListInfo  = new *QueueInfo<TaskList>[machineNum]; // 次に走る Task の配列

    for (int i = 0; i < machineNum; i++) {
	taskListInfo[i] = new QueueInfo<TaskList>(taskListPool);
	speTaskList[i] = new QueueInfo<TaskList>(taskListPool);
    }

    // PPE 側の管理をする Manager
    ppeManager = new FifoTaskManagerImpl(machineNum);
    // 大半のTaskQueueInfoは、共有される
    MainScheduler *mscheduler = new MainScheduler;
    set_scheduler(mscheduler);
    ppeManager->init(mscheduler, this); // ここで HTaskInfo が共有される。
    
    ppeManager->get_scheduler()->set_manager(this);

    // Task 内からManager->task_create() とかするときに必要なTaskManager。
    // 現状では ppe 側からしか動かない
    // spe 側から Task create できない
    schedTaskManager = new SchedTask();
    schedTaskManager->init(0,0,0,ppeManager->get_scheduler());
    ppeManager->schedTaskManager = schedTaskManager;
}

void
CellTaskManagerImpl::append_activeTask(HTaskPtr task)
{
    if (task->cpu_type == CPU_PPE) {
        ppeManager->append_activeTask(task);
    } else {
        activeTaskQueue->addLast(task);
    }
}

// SPE_ANY が指定されていた時に
// これをインクリメントしつつ呼ぶことにする。
unsigned int cur_anySpeid = 0;

/**
 * ActiveTaskQueue から Task を
 * 各 SPE に渡す (backgound) TaskList に入れる
 *
 * ここの activeTaskQueue は FifoTaskManagerImpl のと意味が違い、
 * spe に渡される Task だけ入っている
 */
void
CellTaskManagerImpl::set_runTaskList()
{
    int speid;

    while (HTaskPtr htask = activeTaskQueue->poll()) {

	if (htask->cpu_type == SPE_ANY) {
	    speid = cur_anySpeid++;
	} else {
	    // -1 してるのは
	    // htask->cpu_type - CPU_SPE で
	    // SPE0 = 1, SPE1 = 2, ... SPE5 = 6 ってなってるので
	    // 配列的 (SPE0 = arr[0], SPE1 = arr[1]) にするため
	    speid = htask->cpu_type - CPU_SPE - 1;
	}
	speid %= machineNum;
	set_taskList(htask, taskListInfo[speid]);
    }
}

void
CellTaskManagerImpl::sendTaskList()
{
    for (int id = 0; id < machineNum; id++)  {
	mail_check(id);
	if (!speTaskList[id]->empty()) 
	    continue; // まだ、走ってる
	if (! taskListInfo[id]->empty() ) {
	    // SPE に送る TaskList の準備
	    send_taskList(id);
	    spe_running++;
	}
    }
}

void
CellTaskManagerImpl::poll()
{
    set_runTaskList();
    // TaskList 待ちの SPE に TaskList を送る
    sendTaskList();
}

void
CellTaskManagerImpl::run()
{
    do {
        // PPE side
	ppeManager->poll();
        // SPE side
	do {
	    poll();
	} while (ppeManager->activeTaskQueue->empty() && spe_running >0 );
    } while (!ppeManager->activeTaskQueue->empty() || 
	!activeTaskQueue->empty() ||
	spe_running >0); 
    if (!waitTaskQueue->empty()) {
	show_dead_lock_info();
    }
}

static void
loop_check(HTask *p,HTask *me, int depth)
{
    if (p==me) printf("*%lx ",(long)p); // loop
    if (depth==0) return;
    TaskQueueInfo *w = p->wait_i;
    if (w) {
	for( TaskQueue *q = w->getFirst(); q; q = w->getNext(q)) {
	    loop_check(q->task,me, depth-1);
	}
    }
}

void
CellTaskManagerImpl::show_dead_lock_info()
{
    get_scheduler()-> printf("Dead lock detected\n   ppe queue %d\n",
	ppeManager->activeTaskQueue->length());
    // 確か waitQueue は共通...
    // get_scheduler()-> printf("   wait queue %d\n",ppeManager->waitTaskQueue->length()); 
    get_scheduler()-> printf("   wait queue %d\n",waitTaskQueue->length()); 
    for( HTask *p = waitTaskQueue->getFirst(); p; p = waitTaskQueue->getNext(p)) {
	printf("  Waiting task%d %lx",p->command, (long)p);
	TaskQueueInfo *w = p->wait_i;
	if (w) {
	    for( TaskQueue *q = w->getFirst(); q; q = w->getNext(q)) {
		printf("    waiting task%d %lx",q->task->command, (long)q->task);
		if (!waitTaskQueue->find(q->task)) {
		    printf("!"); // stray task
		}
		loop_check(q->task,p, 10);
	    }
	}
        printf("\n");
    }
    get_scheduler()-> printf("   spe queue %d\n",activeTaskQueue->length()); 
    for (int i = 0; i < machineNum; i++)  {
	get_scheduler()-> printf("   spe %d send %d wait %d\n",i, 
		  speTaskList[i]->length(), taskListInfo[i]->length());
    }
}

/**
 * SPE からのメールをチェックする
 */

void
CellTaskManagerImpl::mail_check(int id)
{
    memaddr data;

    // SPE Scheduler からの mail check
    while (speThreads->has_mail(id, 1, &data)) {				
	if (data == (memaddr)MY_SPE_STATUS_READY) {
	    //  MY_SPE_STATUS_READY: SPE が持ってた Task 全て終了
	    // freeAll する前に循環リストに戻す
	    speTaskList[id]->getLast()->next = speTaskList[id];
	    speTaskList[id]->freeAll();
	    spe_running--;
// printf("SPE %d status ready, %d running\n",id, spe_running);
	} else if (data == (memaddr)MY_SPE_COMMAND_MALLOC) {
	    // MY_SPE_COMMAND_MALLOC   SPE からのmain memory request
	    send_alloc_reply(this, id, speThreads);
	} else if (data > (memaddr)MY_SPE_NOP) {
#ifdef TASK_LIST_MAIL
	    TaskListPtr list = (TaskListPtr)data;
	    check_task_list_finish(schedTaskManager, list, waitTaskQueue);
#else
	    // 終了したタスク(PPEにあるのでアドレス)
	    HTaskPtr task = (HTaskPtr)data;
	    task->post_func(schedTaskManager, task->post_arg1, task->post_arg2);
	    check_task_finish(task, waitTaskQueue);
#endif
	}
	// MY_SPE_NOP: 特に意味のないコマンド
    }
}

void
CellTaskManagerImpl::polling()
{
    // may  call recursively check_task_list_finish() 
    // we need fifo here
    for (int i = 0; i < machineNum; i++)  {
	mail_check(i);
    }
}

static void
send_alloc_reply(CellTaskManagerImpl *tm, int id, SpeThreads *speThreads)
{

    /**
     * info[0] = alloc_id; (CellScheduler::mainMem_alloc 参照)
     * info[1] = alloc_addr;
     */
    memaddr alloc_info[2];
    long alloc_size;
    long command;
    
    speThreads->get_mail(id, 2, alloc_info);
    command = (long)alloc_info[0];
    alloc_size = (long)alloc_info[1];

    
    alloc_info[1] = (memaddr)tm->allocate(alloc_size);
    //__debug_ppe("[PPE] MALLOCED 0x%lx from [SPE %d]\n", alloc_info[1],id);
    // 今のところ何もしてない。どうも、この allocate を free 
    // するのは、SPE task が返した値を見て行うらしい。それは、
    // 忘れやすいのではないか?
    speThreads->add_output_tasklist(command, alloc_info[1], alloc_size);

    speThreads->send_mail(id, 2, alloc_info);
}

/**
 * 条件を満たしたら SPE に TaskList を送信する
 * 条件1. SPE が持ってた TaskList を終了して、次の TaskList を待ってる
 * 条件2. SPE に送る TaskList に Task がある
 *
 * SPE で実行終了した speTaskList  と
 * これから実行する taskListInfo  のバッファを入れ替える
 */
void
CellTaskManagerImpl::send_taskList(int id)
{
    // speTaskList は走り終わった ppe の Task の List. 
    // taskListInfo はこれから走る Task の List. 
    // 交換して実行する
    TaskListInfoPtr tmp = taskListInfo[id];
    taskListInfo[id] = speTaskList[id];
    speTaskList[id] = tmp;

    // speTaskList は本来は循環リストなのだけど、実行中は線形リストである。
    // spe の Task が終了した時点でなおす。
    tmp->getLast()->next = 0;
    TaskListPtr p = tmp->getFirst();
// printf("SPE %d task list sending\n",id);
    speThreads->send_mail(id, 1, (memaddr *)&p);
// printf("SPE %d task list sent\n",id);
}

void CellTaskManagerImpl::show_profile() {
    for (int id = 0; id < machineNum; id++) {	    
	HTaskPtr t = schedTaskManager->create_task(ShowTime,0,0,0,0);
	t->set_cpu((CPU_TYPE)(id+2));
	t->spawn();
    }
}

void CellTaskManagerImpl::start_profile() {
    for (int id = 0; id < machineNum; id++) {	    
	HTaskPtr t = schedTaskManager->create_task(StartProfile,0,0,0,0);
	t->set_cpu((CPU_TYPE)(id+2));
	t->spawn();
    }
}


#ifdef __CERIUM_CELL__
TaskManagerImpl*
create_impl(int num)
{
    return new CellTaskManagerImpl(num);
}
#endif // __CERIUM_CELL