view example/regex_mas/main.cc @ 1600:e63ce6aee28e draft

eraser word_count's tarces in regex_mas
author Masa <e085726@ie.u-ryukyu.ac.jp>
date Thu, 04 Apr 2013 15:48:33 +0900
parents 0f94d9d580f9
children da6835e6d306
line wrap: on
line source

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/time.h>
#include "TaskManager.h"
#include "SchedTask.h"
#include "Func.h"
#include "WordCount.h"

/* ;TODO
 * PS3でCPU数が2以上の時に、あまりが計算されてない
 */

extern void task_init();
void TMend(TaskManager *);
//static double st_time;
//static double ed_time;
int all = 0;
int use_task_array = 1;
int use_task_creater = 0;
int use_compat = 0;
int array_task_num = 8;
int spe_num = 1;
CPU_TYPE spe_cpu = SPE_ANY;
const char *usr_help_str = "Usage: ./word_count [-a -c -s] [-cpu spe_num] [-file filename]\n";

//static double
//getTime() {
//    struct timeval tv;
//    gettimeofday(&tv, NULL);
//    return tv.tv_sec + (double)tv.tv_usec*1e-6;
//}

typedef struct {
    caddr_t file_mmap;
    off_t size;
} st_mmap_t;

static void simple_task_creater(int in_total_size, int out_total_size,
                                int command, int in_data_size, int out_data_size,
                                void *in_data, void *out_data, SchedTask *manager,
                                HTask *wait_i, HTask *wait_me) {
    int in_task_size = 0;
    int out_task_size = 0;

    if (in_total_size != 0) {
        in_task_size = in_total_size / in_data_size;
        if (in_total_size != in_task_size * in_data_size) {
            printf("mismatch of in_total_size and in_data_size\n");
        }
    }

    if (out_total_size != 0) {
        out_task_size = out_total_size / out_data_size;
        if (out_total_size != out_task_size * out_data_size) {
            printf("mismatch of out_total_size and out_data_size\n");
        }
    }

    /*in, out の大きい方に合わせるのがいいかな? Taskの数は1Task分に使うデータの大きいほうを取るような仕様がいいかな*/
    int task_num = (in_task_size > out_task_size) ? in_task_size : out_task_size;

    if (task_num == 0) task_num = 1;

    /*spe分あればいいのかな?*/

    int array_num = spe_num;
    if (task_num < array_num) {
        array_num = task_num;
    }


    HTaskPtr *task_array = (HTask**)manager->allocate(sizeof(HTask*)*array_num);
    TaskPtr *t_exec = (Task**)manager->allocate(sizeof(Task*)*array_num);

    int array_length = task_num / array_num;
    int rest = task_num % array_num;

    int index = 0;

    for (int k = 0; k < array_num; k++) {

        task_array[k] = manager->create_task_array(command,array_length,0,1,1);
        t_exec[k] = 0;

        if (wait_me != 0) {
            wait_me->wait_for(task_array[k]);
        }
        if (wait_i != 0) {
            task_array[k]->wait_for(wait_i);
        }

    }

    int length = in_data_size/sizeof(char);
    for (int j = 0; j < array_length; j++) {
        for (int k = 0; k < array_num; k++) {

            t_exec[k] = task_array[k]->next_task_array(command,t_exec[k]);
            t_exec[k]->set_param(0,(memaddr)length);
            t_exec[k]->set_inData(0,(char*)in_data + index*in_data_size, in_data_size);
            t_exec[k]->set_outData(0,(char*)out_data + index*out_data_size, out_data_size);
            index++;

        }
    }

    for (int k = 0; k < array_num; k++) {
        task_array[k]->spawn_task_array(t_exec[k]->next());
        task_array[k]->set_cpu(spe_cpu);
        task_array[k]->spawn();
    }

    for (int k = 0; k < rest; k++) {
        HTaskPtr t_exec = manager->create_task(command);
        t_exec->set_param(0,(memaddr)length);
        t_exec->set_inData(0,(char*)in_data + index*in_data_size, in_data_size);
        t_exec->set_outData(0,(char*)out_data + index*out_data_size, out_data_size);

        index++;

        if (wait_me != 0) {
            wait_me->wait_for(t_exec);
        }
        if (wait_i != 0) {
            t_exec->wait_for(wait_i);
        }

        t_exec->set_cpu(spe_cpu);
        t_exec->spawn();

    }


}


/*与えられたsizeをfix_byte_sizeの倍数にする(丸め込むっていうのかな?)*/
static int
fix_byte(int size,int fix_byte_size)
{
    size = (size/fix_byte_size)*fix_byte_size  + ((size%fix_byte_size)!= 0)*fix_byte_size;

    return size;
}


static st_mmap_t
my_mmap(char *filename)
{

    /*マッピングだよ!*/
    int fd = -1;
    int map = MAP_PRIVATE;
    st_mmap_t st_mmap;
    struct stat sb;

    if ((fd=open(filename,O_RDONLY,0666))==0) {
        fprintf(stderr,"can't open %s\n",filename);
    }

    if (fstat(fd,&sb)) {
        fprintf(stderr,"can't fstat %s\n",filename);
    }

    /*sizeをページングサイズの倍数にあわせる*/
    st_mmap.size = fix_byte(sb.st_size,4096);

    st_mmap.file_mmap = (char*)mmap(NULL,st_mmap.size,PROT_READ,map,fd,(off_t)0);
    if (st_mmap.file_mmap == (caddr_t)-1) {
        fprintf(stderr,"Can't mmap file\n");
        perror(NULL);
        exit(0);
    }

    return st_mmap;

}

static void
run_tasks(SchedTask *manager, WordCount *w, int task_count, HTaskPtr t_next, int size)
{

    if (task_count < array_task_num) {
        array_task_num = task_count;
        if (task_count<=0) return;
    }

    if (use_task_creater) {
        simple_task_creater(w->file_size, w->division_out_size * w->task_num, TASK_EXEC, w->division_size, w->division_out_size,
                            w->file_mmap, w->o_data, manager, w->t_print, 0);
    }

    if (use_task_array) {

        int spl = spe_num * array_task_num;
        int loop = (task_count + spl - 1) / spl;

        for (int i = 0; i < loop; i += 1) {

            if (spl > w->task_num) {
                if (w->task_num >= spe_num) {
                    array_task_num = w->task_num / spe_num;
                } else {

                    int task_num = w->task_num;

                    for (int j = 0; j < task_num; j++) {
                        HTask *h_exec = 0;
                        int i = w->task_spwaned++;

                        if (w->size < size) size = w->size;

                        h_exec = manager->create_task(TASK_EXEC,
                                                      (memaddr)(w->file_mmap + i*w->division_size), size,
                                                      (memaddr)(w->o_data + i*w->out_size), w->division_out_size);

                        if (all) {
                            w->t_print->wait_for(h_exec);
                        } else {
                            t_next->wait_for(h_exec);
                        }

                        h_exec->set_cpu(spe_cpu);
                        h_exec->spawn();

                        w->size -= size;
                        if (w->size == 0) break;
                        w->task_num--;

                    }

                    return;
                }
            }

            // ここから
            HTask **task_array = (HTask**)manager->allocate(sizeof(HTask*)*spe_num);
            Task **t_exec = (Task**)manager->allocate(sizeof(Task*)*spe_num);

            for (int k = 0; k < spe_num; k++) {
                task_array[k] = manager->create_task_array(TASK_EXEC,array_task_num,1,1,1);
                t_exec[k] = 0;
                if (all) {
                    w->t_print->wait_for(task_array[k]);
                } else {
                    t_next->wait_for(task_array[k]);
                }
            }

            for (int j = 0; j < array_task_num; j++) {
                for (int k = 0; k < spe_num; k++) {

                    int a = w->task_spwaned++;
                    
                    if (w->size < size) size = w->size;
                    int length = size/sizeof(char);

                    t_exec[k] = task_array[k]->next_task_array(TASK_EXEC,t_exec[k]);
                    t_exec[k]->set_param(0,(memaddr)length);
                    t_exec[k]->set_inData(0,w->file_mmap + a*w->division_size, size);
                    t_exec[k]->set_outData(0,w->o_data + a*w->out_size, w->division_out_size);

                    w->size -= size;
                    w->task_num--;
                }
            }

            for (int k = 0; k < spe_num; k++) {
                task_array[k]->spawn_task_array(t_exec[k]->next());
                task_array[k]->set_cpu(spe_cpu);
                task_array[k]->spawn();
            }

        }

        return;

    }


    for (int i = 0; i < task_count; i += array_task_num) {

        HTask *h_exec = 0;
        for (int j = 0; j < array_task_num; j++) {
            int i = w->task_spwaned++;
            if (w->size < size) size = w->size;
            int length = size/sizeof(char);
            if (size==0) break;

            if (use_compat) {
                h_exec = manager->create_task(TASK_EXEC);
                h_exec->set_param(0,(memaddr)length);
                h_exec->set_inData(0,w->file_mmap + i*w->division_size, size);
                h_exec->set_outData(0,w->o_data + i*w->out_size, w->division_out_size);


                if (all) {
                    w->t_print->wait_for(h_exec);
                } else {
                    t_next->wait_for(h_exec);
                }

                h_exec->set_cpu(spe_cpu);
                h_exec->spawn();

            } else {
                h_exec = manager->create_task(TASK_EXEC,
                                              (memaddr)(w->file_mmap + i*w->division_size), size,
                                              (memaddr)(w->o_data + i*w->out_size), w->division_out_size);

                if (all) {
                    w->t_print->wait_for(h_exec);
                } else {
                    t_next->wait_for(h_exec);
                }

                h_exec->set_cpu(spe_cpu);
                h_exec->spawn();
            }
            w->size -= size;
            w->task_num--;
        }

    }

}

/**
 *   このTaskは、PPE上で実行されるので、並列に実行されることはない
 *   二つ実行されていて、Task が足りなくなることがないようにしている。
 */

SchedDefineTask1(RUN_TASK_BLOCKS,run16);

static int
run16(SchedTask *manager, void *in, void *out)
{
    WordCount *w = *(WordCount **)in;

    if (w->task_num < w->task_blocks) {
        // last case
        while (w->size >= w->division_size)
            run_tasks(manager,w,w->task_num, w->t_print, w->division_size);
        // remaining data
        while (w->size>0)
            run_tasks(manager,w,1, w->t_print, w->size);
        // printf("run16 last %d\n",w->task_num);
    } else {
        HTaskPtr t_next = manager->create_task(RUN_TASK_BLOCKS,
                                               (memaddr)&w->self,sizeof(memaddr),0,0);
        w->t_print->wait_for(t_next);

        run_tasks(manager,w, w->task_blocks, t_next, w->division_size);

        t_next->spawn();
        // printf("run16 next %d\n",w->task_num);
    }
    return 0;
}


static int blocks = 48;
//static int blocks = 31 * 6 * 24;
static int division = 16; // in Kbyte

static void
run_start(TaskManager *manager, char *filename)
{
    HTaskPtr t_print;

    st_mmap_t st_mmap;
    st_mmap = my_mmap(filename);
    WordCount *w = (WordCount*)manager->allocate(sizeof(WordCount));
    // bzero(w,sizeof(WordCount));

    //w->task_blocks = blocks;
    w->self = w;
    w->task_spwaned = 0;

    /*sizeはdivision_sizeの倍数にしている。*/
    w->size = w->file_size = st_mmap.size;
    w->file_mmap = st_mmap.file_mmap;
    /* 1task分のデータサイズ(byte) */
    if (w->size >= 1024*division) {
        w->division_size = 1024 * division;/*16kbyte*/
    } else {
        w->division_size = w->size;
    }

    /* "word num" and "line num" */
    w->status_num = 2;
    /* taskの数 */
    w->task_num = w->size / w->division_size;
    w->task_num = w->task_num + (w->division_size*w->task_num < w->size);
    int out_task_num = w->task_num;

    if(!all) {
        w->task_blocks = blocks;
    } else {
        w->task_blocks = w->task_num;
    }

    w->out_task_num = out_task_num;

    /* out用のdivision_size. statusが2つなので、あわせて16byteになるように、long long(4byte)を使用 */

    w->division_out_size = sizeof(unsigned long long)*4;
    int out_size = w->division_out_size*out_task_num;
    w->o_data = (unsigned long long *)manager->allocate(out_size);
    w->out_size = 4;

    /*各SPEの結果を合計して出力するタスク*/

    t_print = manager->create_task(TASK_PRINT,
                                   (memaddr)&w->self,sizeof(memaddr),0,0);
    w->t_print = t_print;

    for(int i = 0;i<20;i++) {
        /* Task を task_blocks ずつ起動する Task */
        /* serialize されていると仮定する... */
        HTaskPtr t_exec = manager->create_task(RUN_TASK_BLOCKS,
                                               (memaddr)&w->self,sizeof(memaddr),0,0);
        t_print->wait_for(t_exec);
        t_exec->spawn();
    }

    t_print->spawn();
}

static char*
init(int argc, char **argv)
{

    char *filename = 0;

    for (int i = 1; argv[i]; ++i) {
        if (strcmp(argv[i], "-file") == 0) {
            filename = argv[i+1];
        } else if (strcmp(argv[i], "-division") == 0) {
            division = atoi(argv[i+1]);
        } else if (strcmp(argv[i], "-block") == 0) {
            blocks = atoi(argv[i+1]);
        } else if (strcmp(argv[i], "-a") == 0) {
            // create task all at once
            all = 1;
        } else if (strcmp(argv[i], "-c") == 0) {
            use_task_array = 0;
            use_compat = 1;
        } else if (strcmp(argv[i], "-s") == 0) {
            use_task_array = 0;
            use_compat = 0;
        } else if (strcmp(argv[i], "-t") == 0) {
            use_task_creater = 1;
            use_task_array = 0;
            use_compat = 0;
        } else if (strcmp(argv[i], "-anum") == 0) {
            array_task_num = atoi(argv[i+1]);
        } else if (strcmp(argv[i], "-g") == 0 ) {
            spe_cpu = GPU_0;
        } else if (strcmp(argv[i], "-cpu") == 0) {
            spe_num = atoi(argv[i+1]);
            if (spe_num==0) spe_num = 1;
        }
    }
    if (filename==0) {
        puts(usr_help_str);
        exit(1);
    }

    return filename;
}


int
TMmain(TaskManager *manager, int argc, char *argv[])
{

    char *filename = 0;
    filename = init(argc, argv);

    if (filename < 0) {
        return -1;
    }

    task_init();
    run_start(manager, filename);
    return 0;
}

/* end */