view TaskManager/Cell/CellTaskManagerImpl.cc @ 1477:5ca4e9469c65 draft

remove GpuTaskManagerImpl
author Yuhi TOMARI <yuhi@cr.ie.u-ryukyu.ac.jp>
date Thu, 19 Jul 2012 14:03:49 +0900
parents 840dee241530
children 163220e54cc0
line wrap: on
line source

#define DEBUG
#include "error.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "CellTaskManagerImpl.h"
#include "HTask.h"
#include "QueueInfo.h"
#include "ExportTaskLog.h"
#include "SchedTask.h"
#include "MainScheduler.h"
#include "types.h"
#include "SysFunc.h"

static void send_alloc_reply(CellTaskManagerImpl *tm, int id,
		Threads *speThreads);

CellTaskManagerImpl::~CellTaskManagerImpl() {

	delete speThreads;
	delete[] speTaskList;

	delete ppeManager;
}

void CellTaskManagerImpl::init(int spuIdle_,int useRefDma) {
	spe_running = 0;
	spuIdle = spuIdle_;

	// 実行される Task 用の パイプライン用のダブルバッファ
	speTaskList = new QueueInfo<TaskList>*[machineNum]; // spe上の走っている Task の配列
	taskListInfo = new QueueInfo<TaskList>*[machineNum]; // 次に走る Task の配列


	for (int i = 0; i < machineNum; i++) {
		taskListInfo[i] = new QueueInfo<TaskList> (taskListPool);
		speTaskList[i] = new QueueInfo<TaskList> (taskListPool);
	}

	// PPE 側の管理をする Manager
	ppeManager = new FifoTaskManagerImpl(machineNum);
	// 大半のTaskQueueInfoは、共有される
	MainScheduler *mscheduler = new MainScheduler;
	set_scheduler(mscheduler);
	ppeManager->init(mscheduler, this, useRefDma); // ここで HTaskInfo が共有される。

	speThreads->init();

	// 実行可能な HTask のリスト。 FifoTaskManager と共有される
	activeTaskQueue = ppeManager->activeTaskQueue;
	// HTask の factory。 HTaskInfo ならなんでもいい。
	htaskImpl = activeTaskQueue; // any HTaskInfo


	ppeManager->get_scheduler()->set_manager(this);

	// Task 内からManager->task_create() とかするときに必要なTaskManager。
	// 現状では ppe 側からしか動かない
	// spe 側から Task create できない
	schedTaskManager = new SchedTask();
	schedTaskManager->init(0, 0, ppeManager->get_scheduler(), 0);
	ppeManager->schedTaskManager = schedTaskManager;
}

void CellTaskManagerImpl::append_activeTask(HTaskPtr task) {
	if (task->cpu_type == CPU_PPE) {
		ppeManager->append_activeTask(task);
	} else {
		activeTaskQueue->addLast(task);
	}
}

// SPE_ANY が指定されていた時に
// これをインクリメントしつつ呼ぶことにする。
unsigned int cur_anySpeid = 0;

/**
 * ActiveTaskQueue から Task を
 * 各 SPE に渡す (backgound) TaskList に入れる
 *
 * ここの activeTaskQueue は FifoTaskManagerImpl のと意味が違い、
 * spe に渡される Task だけ入っている
 */
void CellTaskManagerImpl::set_runTaskList(QueueInfo<HTask> *activeTaskQueue) {
	int speid;
	HTaskPtr htask = activeTaskQueue->getFirst();
	while (htask != NULL) {

		if (htask->cpu_type == CPU_PPE) {

			htask = activeTaskQueue->getNext(htask);

		} else {
			if (htask->cpu_type == SPE_ANY) {
				speid = cur_anySpeid++;
			} else {
				// -1 してるのは
				// htask->cpu_type - CPU_SPE で
				// SPE0 = 1, SPE1 = 2, ... SPE5 = 6 ってなってるので
				// 配列的 (SPE0 = arr[0], SPE1 = arr[1]) にするため
				speid = htask->cpu_type - CPU_SPE - 1;
			}

			speid %= machineNum;
			set_taskList(htask, taskListInfo[speid]);

			HTaskPtr next = activeTaskQueue->getNext(htask);
			activeTaskQueue->remove(htask);
			htask = next;

		}
	}
}

void CellTaskManagerImpl::sendTaskList() {
	for (int id = 0; id < machineNum; id++) {
		mail_check(id);
		if (!speTaskList[id]->empty()) {
			continue; // まだ、走ってる
		}
		if (!taskListInfo[id]->empty()) {
			// SPE に送る TaskList の準備
			send_taskList(id);
			spe_running++;
		}
	}
}

void CellTaskManagerImpl::poll() {
	set_runTaskList(activeTaskQueue);
	// TaskList 待ちの SPE に TaskList を送る
	sendTaskList();
}

void CellTaskManagerImpl::debug_check_spe_idle(
		QueueInfo<HTask> * activeTaskQueue, int spe_running_) {
	printf("spu_idle! spe_running = %d : activeTaskQueue->length = %d \n",
			spe_running_, activeTaskQueue->length());
	HTaskPtr task = activeTaskQueue->getFirst();
	int tmp_i = 0;
	do {
		printf("task_name = %s ,", ppeManager->get_task_name(task));
		printf("cpu = [%d], count = %d", task->cpu_type, tmp_i);
		tmp_i++;
	} while ((task = activeTaskQueue->getNext(task)) != 0);
	printf("\n");
}

void CellTaskManagerImpl::run() {
    int spu_limit = spuIdle;
    if (machineNum == 0) {
        ppeManager->run();
        return;
    }

    do {
        // PPE side
        ppeManager->poll();
        // SPE side
        do {
            poll();
        } while (ppeManager->activeTaskQueue->empty() && spe_running > 0);

        if (spe_running < spu_limit) {
            debug_check_spe_idle(ppeManager->activeTaskQueue, spe_running);
        }

    } while (!ppeManager->activeTaskQueue->empty() || !activeTaskQueue->empty() || spe_running > 0);
    if (!waitTaskQueue->empty()) {
        show_dead_lock_info();
    }

}

static void loop_check(HTask *p, HTask *me, int depth) {
	if (p == me)
		printf("*%lx ", (long) p); // loop
	if (depth == 0)
		return;
	QueueInfo<TaskQueue> *w = p->wait_i;
	if (w) {
		for (TaskQueue *q = w->getFirst(); q; q = w->getNext(q)) {
			loop_check(q->task, me, depth - 1);
		}
	}
}

void CellTaskManagerImpl::show_dead_lock_info() {
	get_scheduler()-> printf("Dead lock detected\n   ppe queue %d\n",
			ppeManager->activeTaskQueue->length());
	// 確か waitQueue は共通...
	// get_scheduler()-> printf("   wait queue %d\n",ppeManager->waitTaskQueue->length());
	get_scheduler()-> printf("   wait queue %d\n", waitTaskQueue->length());
	for (HTask *p = waitTaskQueue->getFirst(); p; p = waitTaskQueue->getNext(p)) {
		printf("  Waiting task%d %lx", p->command, (long) p);
		QueueInfo<TaskQueue> *w = p->wait_i;
		if (w) {
			for (TaskQueue *q = w->getFirst(); q; q = w->getNext(q)) {
				printf("    waiting task%d %lx", q->task->command,
						(long) q->task);
				if (!waitTaskQueue->find(q->task)) {
					printf("!"); // stray task
				}
				loop_check(q->task, p, 10);
			}
		}
		printf("\n");
	}
	get_scheduler()-> printf("   spe queue %d\n", activeTaskQueue->length());
	for (int i = 0; i < machineNum; i++) {
		get_scheduler()-> printf("   spe %d send %d wait %d\n", i,
				speTaskList[i]->length(), taskListInfo[i]->length());
	}
}

/**
 * SPE からのメールをチェックする
 */

void CellTaskManagerImpl::mail_check(int id) {
	memaddr data;

	// SPE Scheduler からの mail check
	while (speThreads->has_mail(id, 1, &data)) {
		if (data == (memaddr) MY_SPE_STATUS_READY) {
			//  MY_SPE_STATUS_READY: SPE が持ってた Task 全て終了
			// freeAll する前に循環リストに戻す
			speTaskList[id]->getLast()->next = speTaskList[id];
			speTaskList[id]->freeAll();
			spe_running--;
			// printf("SPE %d status ready, %d running\n",id, spe_running);
		} else if (data == (memaddr) MY_SPE_COMMAND_MALLOC) {
			// MY_SPE_COMMAND_MALLOC   SPE からのmain memory request
			send_alloc_reply(this, id, speThreads);
		} else if (data > (memaddr) MY_SPE_NOP) {
#ifdef TASK_LIST_MAIL
			TaskListPtr list = (TaskListPtr)data;
			check_task_list_finish(schedTaskManager, list, waitTaskQueue);
#else
			// 終了したタスク(PPEにあるのでアドレス)
			HTaskPtr task = (HTaskPtr) data;
#if 0
			if (task->cpu_type != CPU_SPE) {
				const char *name = get_task_name(task);
				if (name != NULL) {
					printf("[SPE] ");
					printf("Task id : %d, ", task->command);
					printf("Task name : %s\n", name);
				}
			}
#endif
#ifndef NOT_CHECK

			if (task != NULL) {
				//SPE で処理された Task が返ってくるはず。それがもし、type PPE なら・・・
				if (task->cpu_type == CPU_PPE) {
					printf("attention : PPE task run on SPE\n");
					printf("Task id : %d\n", task->command);
					const char *name = get_task_name(task);
					if (name != NULL) {
						printf("Task name : %s\n", name);
					}
				}
			}

#endif

			task->post_func(schedTaskManager, task->post_arg1, task->post_arg2);
			check_task_finish(task, waitTaskQueue);
#endif
		}
		// MY_SPE_NOP: 特に意味のないコマンド
	}
}

void CellTaskManagerImpl::polling() {
	// may  call recursively check_task_list_finish()
	// we need fifo here
	for (int i = 0; i < machineNum; i++) {
		mail_check(i);
	}
}

static void send_alloc_reply(CellTaskManagerImpl *tm, int id,
		Threads *speThreads) {

	/**
	 * info[0] = alloc_id; (CellScheduler::mainMem_alloc 参照)
	 * info[1] = alloc_addr;
	 */
	memaddr alloc_info[2];
	long alloc_size;
	long command;

	speThreads->get_mail(id, 2, alloc_info);
	command = (long) alloc_info[0];
	alloc_size = (long) alloc_info[1];

	alloc_info[1] = (memaddr) tm->allocate(alloc_size);
	//__debug_ppe("[PPE] MALLOCED 0x%lx from [SPE %d]\n", alloc_info[1],id);
	// 今のところ何もしてない。どうも、この allocate を free
	// するのは、SPE task が返した値を見て行うらしい。それは、
	// 忘れやすいのではないか?
	speThreads->add_output_tasklist(command, alloc_info[1], alloc_size);

	speThreads->send_mail(id, 2, alloc_info);
}

/**
 * 条件を満たしたら SPE に TaskList を送信する
 * 条件1. SPE が持ってた TaskList を終了して、次の TaskList を待ってる
 * 条件2. SPE に送る TaskList に Task がある
 *
 * SPE で実行終了した speTaskList  と
 * これから実行する taskListInfo  のバッファを入れ替える
 */
void CellTaskManagerImpl::send_taskList(int id) {
	// speTaskList は走り終わった ppe の Task の List.
	// taskListInfo はこれから走る Task の List.
	// 交換して実行する
	QueueInfo<TaskList> *tmp = taskListInfo[id];
	taskListInfo[id] = speTaskList[id];
	speTaskList[id] = tmp;

	// speTaskList は本来は循環リストなのだけど、実行中は線形リストである。
	// spe の Task が終了した時点でなおす。
	tmp->getLast()->next = 0;
	TaskListPtr p = tmp->getFirst();
	// printf("SPE %d task list sending\n",id);
	speThreads->send_mail(id, 1, (memaddr *) &p);
	// printf("SPE %d task list sent\n",id);
}

void CellTaskManagerImpl::show_profile() {
	for (int id = 0; id < machineNum; id++) {
		HTaskPtr t = schedTaskManager->create_task(ShowTime, 0, 0, 0, 0);
		t->set_cpu((CPU_TYPE) (id + SPE_0));
		t->spawn();
	}
}

void CellTaskManagerImpl::start_profile() {
	for (int id = 0; id < machineNum; id++) {
		HTaskPtr t = schedTaskManager->create_task(StartProfile, 0, 0, 0, 0);
		t->set_cpu((CPU_TYPE) (id + SPE_0));
		t->spawn();
	}
}

void CellTaskManagerImpl::export_task_log() {
    ExportTaskLog _export(taskLogQueue);
    _export.printOut();
}

void CellTaskManagerImpl::print_arch() {
	printf("CellTaskManager\n");
}

TaskListPtr CellTaskManagerImpl::createTaskList()
{
        TaskListPtr tl = taskListInfo[0]->create();
        bzero(tl->tasks,sizeof(Task)*TASK_MAX_SIZE);
	return tl;
}


#ifdef __CERIUM_CELL__
TaskManagerImpl*
create_impl(int num, int useRefDma)
{
	Threads *cpus = new SpeThreads(num);
	return new CellTaskManagerImpl(num,cpus);
}
#endif // __CERIUM_CELL