view example/word_count_test/main.cc @ 674:bde5f13adf10

fix many task example (sort). Dummy task is now system supported.
author Shinji KONO <kono@ie.u-ryukyu.ac.jp>
date Sun, 06 Dec 2009 00:54:10 +0900
parents d9111086b2c4
children f725c6455d19
line wrap: on
line source

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include "TaskManager.h"
#include "SchedTask.h"
#include "Func.h"
#include "WordCount.h"

/* Defined outside this file; TMmain() calls it once before run_start(). */
extern void task_init();

/*
 * Usage text with external linkage; it is not referenced inside this file
 * (presumably printed by the TaskManager framework -- confirm).
 * NOTE(review): init() in this file also accepts -division, -block and -p,
 * none of which are mentioned here.
 */
const char *usr_help_str = "Usage: ./word_count [-cpu spe_num] [-file filename]\n";

/* Result of my_mmap(): where the file was mapped and how long the mapping is. */
typedef struct {
    caddr_t file_mmap; /* start address returned by mmap() */
    off_t size;        /* mapping length in bytes: the file size rounded up to a multiple of 4096 */
} st_mmap_t;



/*
 * Round size up to the nearest multiple of fix_byte_size.
 * A size that is already an exact multiple is returned unchanged.
 * fix_byte_size must not be zero.
 */
static int
fix_byte(int size,int fix_byte_size)
{
    int remainder = size % fix_byte_size;

    if (remainder == 0) {
	return size;
    }

    /* drop the partial chunk, then account for it with one whole chunk */
    return (size - remainder) + fix_byte_size;
}


/*
 * Map filename read-only (private mapping) and return the mapping.
 *
 * The mapping length is the file size rounded up to a multiple of 4096
 * bytes; that rounded length is what is stored in the returned size field.
 * The raw and the rounded sizes are printed to stdout.
 *
 * Any failure (open, fstat, mmap) is reported on stderr and terminates the
 * process with exit status 1. The file descriptor is closed before
 * returning; the mapping itself stays valid.
 */
static st_mmap_t
my_mmap(char *filename)
{
    st_mmap_t st_mmap;
    struct stat sb;
    void *addr;
    int fd;

    /* open() reports failure with -1; 0 is a valid descriptor */
    fd = open(filename,O_RDONLY);
    if (fd < 0) {
	fprintf(stderr,"can't open %s\n",filename);
	exit(1);
    }

    if (fstat(fd,&sb) != 0) {
	fprintf(stderr,"can't fstat %s\n",filename);
	close(fd);
	exit(1);
    }

    printf("file size %d\n",(int)sb.st_size);

    /*
     * Round the length up to a multiple of 4096 bytes.
     * NOTE: fix_byte() works on int, so sizes that do not fit in an int
     * are not handled correctly here.
     */
    st_mmap.size = fix_byte(sb.st_size,4096);

    printf("fix 4096byte file size %d\n",(int)st_mmap.size);

    addr = mmap(NULL,st_mmap.size,PROT_READ,MAP_PRIVATE,fd,(off_t)0);
    if (addr == MAP_FAILED) {
	fprintf(stderr,"Can't mmap file\n");
	perror(NULL);
	close(fd);
	exit(1);
    }

    /* the descriptor is no longer needed once the mapping exists */
    close(fd);

    st_mmap.file_mmap = (caddr_t)addr;
    return st_mmap;
}

/*
 * Create and spawn up to task_count TASK_EXEC tasks, one per input chunk.
 * The loop also stops as soon as w->size (bytes still to be handed out)
 * is no longer positive.
 *
 * manager    : used to create the tasks
 * w          : shared word-count state; task_spwaned, size and task_num
 *              are updated here
 * task_count : maximum number of tasks to create in this call
 * t_next     : task that is made to wait for every task created here
 * size       : number of input bytes given to each task
 */
static void
run_tasks(SchedTask *manager, WordCount *w, int task_count, HTaskPtr t_next, int size) 
{
    for (int j = 0; j < task_count && w->size>0; j++) {
	/* i is the running index of the chunk across all calls */
	int i = w->task_spwaned++;
#ifdef SIMPLE_TASK
	//    printf("div %0x\n", (w->file_mmap + i*w->division_size));
	/* input: chunk i of the mapped file; output: slot i of o_data */
	/* NOTE(review): size is not clamped to w->size in this branch, unlike the #else branch -- confirm this is intended */
	HTaskPtr t_exec = manager->create_task(TASK_EXEC,
	    (memaddr)(w->file_mmap + i*w->division_size), size,
	    (memaddr)(w->o_data + i*w->out_size), w->division_out_size);
#else
	HTaskPtr t_exec = manager->create_task(TASK_EXEC);
	/* never hand out more bytes than remain */
	if (size>w->size) size = w->size;
	t_exec->add_inData(w->file_mmap + i*w->division_size, size);
	/* two outputs per task: its slot in o_data and its slot in head_tail_flag */
	t_exec->add_outData(w->o_data + i*w->status_num, w->division_out_size);
	t_exec->add_outData(w->head_tail_flag + i*w->pad, w->division_out_size);
	t_exec->add_param(size);
#endif
	t_exec->set_cpu(SPE_ANY);
	/* register the dependency before the task is spawned */
	t_next->wait_for(t_exec);
	t_exec->spawn();
	/* book-keeping: bytes left to hand out and tasks left to create */
	w->size -= size;
	w->task_num--;
    }
}

/* Associates run16 with the RUN_TASK_BLOCKS task id (macro comes from the project headers; presumably it also declares run16 -- confirm). */
SchedDefineTask1(RUN_TASK_BLOCKS,run16);

/*
 * Body of a RUN_TASK_BLOCKS task: spawns one batch of TASK_EXEC tasks via
 * run_tasks() and, while enough work remains, chains another
 * RUN_TASK_BLOCKS task to spawn the following batch.
 *
 * The WordCount state is read through `in` (SIMPLE_TASK build, where `in`
 * points at a WordCount pointer) or from task parameter 0 (otherwise).
 * `out` is not used. Always returns 0.
 */
static int
run16(SchedTask *manager, void *in, void *out)
{
#ifdef SIMPLE_TASK
    WordCount *w = *(WordCount **)in;
#else
    WordCount *w = (WordCount *)manager->get_param(0);
#endif
   
    if (w->task_num < w->task_blocks) {
	/* last round: fewer than task_blocks tasks remain, so t_print waits on them directly */
	if (w->size >= w->division_size) 
	    run_tasks(manager,w,w->task_blocks, w->t_print, w->division_size);
	/* whatever is still left is handed out one task at a time, each taking all remaining bytes */
	while (w->size>0) 
	    run_tasks(manager,w,1, w->t_print, w->size);
	// printf("run16 last %d\n",w->task_num);
    } else {
	/* more rounds to go: create the next RUN_TASK_BLOCKS task with the same state */
#ifdef SIMPLE_TASK
	HTaskPtr t_next = manager->create_task(RUN_TASK_BLOCKS,
	    (memaddr)&w->self,sizeof(memaddr),0,0);
#else
	HTaskPtr t_next = manager->create_task(RUN_TASK_BLOCKS);
	t_next->set_param(0,(void*)w);
#endif
	/* t_print must not run before the next round has run */
	w->t_print->wait_for(t_next);

	/* this round's tasks; t_next waits for each of them */
	run_tasks(manager,w, w->task_blocks, t_next, w->division_size);

	t_next->spawn();
	// printf("run16 next %d\n",w->task_num);
    }
    return 0;
}


/* Settings with their defaults; init() overwrites them from the command line. */
/* -block: copied into WordCount::task_blocks by run_start() */
static int blocks = 48;
/* -division: input size of one task; run_start() multiplies it by 1024 */
static int division = 16; // in Kbyte
/* set to 1 by -p; not read anywhere else in the visible part of this file */
static int profile = 0;

/*
 * Map the input file, fill in the shared WordCount state, and create and
 * spawn the initial tasks: one RUN_TASK_BLOCKS task, plus the TASK_PRINT
 * task that waits for it.
 *
 * manager  : task manager used for allocation and task creation
 * filename : path of the file to map (my_mmap() exits the process on failure)
 */
static void
run_start(TaskManager *manager, char *filename)
{
    HTaskPtr t_print;

    st_mmap_t st_mmap;
    st_mmap = my_mmap(filename);
    WordCount *w = (WordCount*)manager->allocate(sizeof(WordCount));
    // bzero(w,sizeof(WordCount));

    /* self lets the SIMPLE_TASK build pass &w->self as task input */
    w->self = w;
    w->task_blocks = blocks;
    w->task_spwaned = 0;

    /* size is meant to be a multiple of division_size (my_mmap() actually rounds it up to a multiple of 4096) */
    w->size = st_mmap.size;
    w->file_mmap = st_mmap.file_mmap;

    /* input data size for one task (bytes) */
    if (w->size >= 1024*division) {
	w->division_size = 1024 * division;/* 16 Kbyte with the default division */
    } else {
	/* the file is smaller than one division: a single task takes all of it */
	w->division_size = w->size;
    }

    printf("dvision_size %d\n",w->division_size);

    /* "word num" and "line num" */
    w-> status_num = 2;
    /* number of tasks (full divisions only) */
    w-> task_num = w->size / w->division_size;
    /* one more output slot when a partial division remains */
    int out_task_num = w->task_num + (w->division_size*w->task_num < w->size);

    w->out_task_num = out_task_num;
    printf("task_num %d\n",w->task_num);

    /* division_size for the output. There are two status values, so long long (8 bytes) is used to make 16 bytes in total */

#ifdef SIMPLE_TASK
    w-> division_out_size = sizeof(unsigned long long)*4;
    int out_size = w->division_out_size*out_task_num;
    w->o_data = (unsigned long long *)manager->allocate(out_size);
    w-> out_size = 4;
#else
    w-> division_out_size = 16;
    int out_size = w->division_out_size*out_task_num;
    /* size of the output data; one allocation holds o_data followed by head_tail_flag */
    caddr_t p = (caddr_t) manager->allocate(out_size*2);
    w->o_data = (unsigned long long*)p;
    //bzero(w->o_data,out_size);

    w-> pad = 2;
    w->head_tail_flag = (unsigned long long*)(p+out_size);
    // bzero(w->head_tail_flag,out_size);
#endif
    printf("out size %d\n",out_size);

    /* task that sums up the results of each SPE and prints them */

#ifdef SIMPLE_TASK
    t_print = manager->create_task(TASK_PRINT,
	(memaddr)&w->self,sizeof(memaddr),0,0);
#else
    t_print = manager->create_task(TASK_PRINT);
    t_print->add_inData(w->o_data, out_size);
    t_print->add_inData(w->head_tail_flag, out_size);
    t_print->add_param(out_task_num);
    t_print->add_param(w->status_num);
    t_print->add_param(out_task_num);
    t_print->add_param(w->pad);
#endif

    /* run16() reaches the print task through w->t_print */
    w->t_print = t_print;

    /* the loop body runs exactly once */
    for(int i = 0;i<1;i++) {
	/* a task that launches tasks task_blocks at a time */
        /* assumes they are serialized... */
    #ifdef SIMPLE_TASK
	HTaskPtr t_exec = manager->create_task(RUN_TASK_BLOCKS,
	    (memaddr)&w->self,sizeof(memaddr),0,0);
    #else
	HTaskPtr t_exec = manager->create_task(RUN_TASK_BLOCKS);
	t_exec->set_param(0,(void*)w);
    #endif
	/* NOTE(review): t_exec is spawned before t_print->wait_for(t_exec) is registered, whereas run_tasks() registers the wait first -- confirm the order does not matter here */
	t_exec->spawn();
	t_print->wait_for(t_exec);
    }

    t_print->spawn();
}

/*
 * Convert the value of option opt to a positive int.
 * Exits with status 1 when the value is missing, is not a plain decimal
 * number, is not positive, or does not fit in an int.
 */
static int
parse_positive_option(const char *opt, const char *arg)
{
    char *end = NULL;
    long value;

    if (arg == NULL) {
	fprintf(stderr,"%s requires a value\n",opt);
	exit(1);
    }

    value = strtol(arg,&end,10);
    if (end == arg || *end != '\0' || value <= 0 || value != (long)(int)value) {
	fprintf(stderr,"%s: invalid value '%s' (positive integer expected)\n",opt,arg);
	exit(1);
    }

    return (int)value;
}

/*
 * Parse the command line options handled by this example:
 *
 *   -file name    input file (required)
 *   -division n   input size of one task in Kbyte (stored in division)
 *   -block n      number of tasks spawned per round (stored in blocks)
 *   -p            sets profile to 1
 *
 * Arguments that are not recognised are ignored. The value that follows
 * -file, -division or -block is consumed together with its option.
 *
 * Returns the file name. Prints a usage line and exits with status 1 when
 * no file name was given; exits with status 1 on a missing or invalid
 * numeric value.
 */
static char*
init(int argc, char **argv)
{
    char *filename = NULL;

    /* the NULL terminator of argv is used instead of argc */
    (void)argc;

    for (int i = 1; argv[i]; ++i) {
	if (strcmp(argv[i], "-file") == 0) {
	    if (argv[i+1] == NULL) {
		/* "-file" without a name: treated as no file name given */
		filename = NULL;
		break;
	    }
	    filename = argv[++i];
	} else if (strcmp(argv[i], "-division") == 0) {
	    /* a zero or negative division would break the size arithmetic in run_start() */
	    division = parse_positive_option(argv[i], argv[i+1]);
	    ++i;
	} else if (strcmp(argv[i], "-block") == 0) {
	    blocks = parse_positive_option(argv[i], argv[i+1]);
	    ++i;
	} else if (strcmp(argv[i], "-p") == 0) {
	    profile = 1;
	}
    }

    if (filename == NULL) {
        printf("usage: %s [-block 10] -file filename\n",argv[0]);
	exit(1);
    }

    return filename;
}


/*
 * Entry point of the example: parses the command line, calls task_init(),
 * and sets up the word-count tasks with run_start().
 *
 * Returns 0 after the tasks have been set up, or -1 when no file name is
 * available.
 */
int
TMmain(TaskManager *manager, int argc, char *argv[])
{
    char *filename = init(argc, argv);

    /* init() exits when no file name is given; this check is purely defensive */
    if (filename == NULL) {
	return -1;
    }

    task_init();
    run_start(manager, filename);

    return 0;
}

/* end */