Mercurial > hg > Game > Cerium
view example/word_count/main.cc @ 1911:f842ea419307 draft
Runs in mmap mode (divided read does not work yet)
author | Masataka Kohagura <e085726@ie.u-ryukyu.ac.jp> |
---|---|
date | Sun, 19 Jan 2014 23:36:02 +0900 |
parents | b7b528e9ec5e |
children | 4a716f35980a |
line wrap: on
line source
#include <stdio.h> #include <stdlib.h> #include <string.h> #include <sys/mman.h> #include <sys/types.h> #include <sys/stat.h> #include <fcntl.h> #include <unistd.h> #include <sys/time.h> #include "TaskManager.h" #include "SchedTask.h" #include "Func.h" #include "WordCount.h" /* ;TODO * PS3でCPU数が2以上の時に、あまりが計算されてない */ extern void task_init(); void TMend(TaskManager *); static double st_time; static double ed_time; int all = 0; int use_task_array = 1; int use_task_creater = 0; int use_compat = 0; int use_iterate = 0; int use_iterate_all = 0; int array_task_num = 8; int spe_num = 1; int divide_read_flag = 0; int READ_DIVISION_SIZE = 16 * 1024; CPU_TYPE spe_cpu = SPE_ANY; CPU_TYPE read_spe_cpu = IO_0; const char *usr_help_str = "Usage: ./word_count [-a -c -s] [-cpu spe_num] [-g] [-file filename]\n"; static double getTime() { struct timeval tv; gettimeofday(&tv, NULL); return tv.tv_sec + (double)tv.tv_usec*1e-6; } typedef struct { caddr_t file_mmap; off_t size; } st_mmap_t; /*与えられたsizeをfix_byte_sizeの倍数にする(丸め込むっていうのかな?)*/ static int fix_byte(int size,int fix_byte_size) { size = (size/fix_byte_size)*fix_byte_size + ((size%fix_byte_size)!= 0)*fix_byte_size; return size; } SchedDefineTask1(RUN_READ_BLOCKS,read_run); static int read_run(SchedTask *manager, void *in, void *out) { WordCountPtr w = (WordCountPtr)in; char* read_text = (char*)out; HTaskPtr wait; for (int i = 0; (w->read_left_size > 0) && (i < w->read_task_blocks); i++) { HTaskPtr read = manager->create_task(Read_task); read->set_cpu(w->read_cpu); if (i == w->read_task_blocks / 2) wait = read; read->set_param(0,(long)w->read_task_number); //生成するTaskが何番目か read->set_param(1,(long)w->read_division_size); //1つのタスクが読み込む量 if(w->read_left_size <= w->read_division_size){ read->set_param(2,(long)w->read_left_size); }else{ read->set_param(2,(long)w->read_division_size); } read->set_param(3,(long)w->fd); //fdの番号の受け渡し read->set_outData(0,read_text + w->read_task_number*w->read_division_size, w->read_division_size); 
w->t_print->wait_for(read); read->spawn(); w->read_left_size -= w->read_division_size; w->read_task_number++; } if (w->read_left_size > 0) { HTaskPtr next = manager->create_task(RUN_READ_BLOCKS, (memaddr)&w->self, sizeof(memaddr),read_text,w->read_filesize); w->t_print->wait_for(next); next->wait_for(wait); next->spawn(); } return 0; } SchedDefineTask1(MMAP,my_mmap); static int my_mmap(SchedTask *s, void *in, void *out) { WordCountPtr w = (WordCountPtr)in; /*マッピングだよ!*/ int map = MAP_PRIVATE; st_mmap_t st_mmap; int fd = w->fd; st_mmap.size = fix_byte(w->read_filesize,4096); w->file_mmap = (char*)mmap(NULL,st_mmap.size,PROT_READ,map,fd,(off_t)0); if (st_mmap.file_mmap == (caddr_t)-1) { fprintf(stderr,"Can't mmap file\n"); perror(NULL); exit(0); } printf("in mmap mode\n"); return 0; } //static st_mmap_t //my_mmap(char *filename) //{ // // /*マッピングだよ!*/ // int fd = -1; // int map = MAP_PRIVATE; // st_mmap_t st_mmap; // struct stat sb; // // if ((fd=open(filename,O_RDONLY,0666))==0) { // fprintf(stderr,"can't open %s\n",filename); // } // // if (fstat(fd,&sb)) { // fprintf(stderr,"can't fstat %s\n",filename); // } // // printf("file size %d\n",(int)sb.st_size); // // /*sizeをページングサイズの倍数にあわせる*/ // st_mmap.size = fix_byte(sb.st_size,4096); // // printf("fix 4096byte file size %d\n",(int)st_mmap.size); // // st_mmap.file_mmap = (char*)mmap(NULL,st_mmap.size,PROT_READ,map,fd,(off_t)0); // if (st_mmap.file_mmap == (caddr_t)-1) { // fprintf(stderr,"Can't mmap file\n"); // perror(NULL); // exit(0); // } // // return st_mmap; // //} static void run_tasks(SchedTask *manager, WordCount *w, int task_count, HTaskPtr t_next, int size) { if (task_count < array_task_num) { array_task_num = task_count; if (task_count<=0) return; } for (int i = 0; i < task_count; i += array_task_num) { HTask *task_array; if (use_task_array) { int task_num = (w->size+size-1)/size; if (task_num>array_task_num) task_num = array_task_num; task_array = manager->create_task_array(TASK_EXEC,task_num,1,1,1); if 
(!all) { t_next->wait_for(task_array); } else { w->t_print->wait_for(task_array); } } Task *t_exec = 0; HTask *h_exec = 0; for (int j = 0; j < array_task_num; j++) { int i = w->task_spwaned++; if (w->size < size) size = w->size; if (size==0) break; if (use_task_array) { t_exec = task_array->next_task_array(TASK_EXEC,t_exec); t_exec->set_inData(0,w->file_mmap + i*w->division_size, size); t_exec->set_outData(0,w->o_data + i*w->out_size, w->division_out_size); t_exec->set_param(0,(long)size); } else if (use_compat) { h_exec = manager->create_task(TASK_EXEC); h_exec->set_inData(0,w->file_mmap + i*w->division_size, size); h_exec->set_outData(0,w->o_data + i*w->out_size, w->division_out_size); t_next->wait_for(h_exec); h_exec->set_cpu(spe_cpu); h_exec->spawn(); } else if (use_iterate) { if(use_iterate_all) array_task_num = w->task_num; w->size -= size*array_task_num; if(w->size < 0) array_task_num -= 1; h_exec = manager->create_task(TASK_EXEC_DATA_PARALLEL); h_exec->flip(); h_exec->set_inData(0,w->file_mmap,w->file_size); h_exec->set_inData(1,w->o_data,w->out_size_); h_exec->set_outData(0,w->file_mmap,w->file_size); h_exec->set_outData(1,w->o_data,w->out_size_); h_exec->set_param(0,(long)i); h_exec->set_param(1,(long)w->division_size); h_exec->set_param(2,(long)size); h_exec->set_param(3,(long)w->out_size); t_next->wait_for(h_exec); h_exec->set_cpu(spe_cpu); h_exec->iterate(array_task_num); w->task_num -= array_task_num; w->task_spwaned += array_task_num-1; if(w->size < 0) { h_exec = manager->create_task(TASK_EXEC_DATA_PARALLEL); h_exec->flip(); h_exec->set_inData(0,w->file_mmap,w->file_size); h_exec->set_inData(1,w->o_data,w->out_size_); h_exec->set_outData(0,w->file_mmap,w->file_size); h_exec->set_outData(1,w->o_data,w->out_size_); h_exec->set_param(0,(long)w->task_spwaned); h_exec->set_param(1,(long)w->division_size); h_exec->set_param(2,(long)(size+w->size)); h_exec->set_param(3,(long)w->out_size); t_next->wait_for(h_exec); h_exec->set_cpu(spe_cpu); 
h_exec->iterate(1); w->task_num -= 1; w->task_spwaned += 1; array_task_num += 1; } break; } else { h_exec = manager->create_task(TASK_EXEC, (memaddr)(w->file_mmap + i*w->division_size), size, (memaddr)(w->o_data + i*w->out_size), w->division_out_size); t_next->wait_for(h_exec); h_exec->set_cpu(spe_cpu); h_exec->spawn(); } w->size -= size; w->task_num--; } if (use_task_array) { task_array->spawn_task_array(t_exec->next()); task_array->set_cpu(spe_cpu); task_array->spawn(); } else { //if (!all) t_next->wait_for(h_exec); } } } /** * このTaskは、PPE上で実行されるので、並列に実行されることはない * 二つ実行されていて、Task が足りなくなることがないようにしている。 */ SchedDefineTask1(RUN_TASK_BLOCKS,run16); static int run16(SchedTask *manager, void *in, void *out) { WordCount *w = *(WordCount **)in; if(use_iterate_all) { run_tasks(manager, w, w->task_num, w->t_print, w->division_size); } else if (w->task_num < w->task_blocks) { // last case while (w->size >= w->division_size) run_tasks(manager,w,w->task_num, w->t_print, w->division_size); // remaining data while (w->size>0) run_tasks(manager,w,1, w->t_print, w->size); // printf("run16 last %d\n",w->task_num); } else { HTaskPtr t_next = manager->create_task(RUN_TASK_BLOCKS, (memaddr)&w->self,sizeof(memaddr),0,0); w->t_print->wait_for(t_next); run_tasks(manager,w, w->task_blocks, t_next, w->division_size); t_next->spawn(); // printf("run16 next %d\n",w->task_num); } return 0; } static int blocks = 48; //static int blocks = 31 * 6 * 24; static int division = 16; // in Kbyte static void run_start(TaskManager *manager, char *filename) { long fd = (long)manager->allocate(sizeof(long)); struct stat *sb = (struct stat*)manager->allocate(sizeof(struct stat)); if ((fd=open(filename,O_RDONLY,0666))==0) { fprintf(stderr,"can't open %s\n",filename); } if (fstat(fd,sb)) { fprintf(stderr,"can't fstat %s\n",filename); } //WordCount *w = (WordCount*)manager->allocate(sizeof(WordCount)); WordCountPtr w = (WordCountPtr)manager->allocate(sizeof(WordCount)); w->self = w; w->fd = fd; w->read_cpu = 
read_spe_cpu; w->read_task_blocks = 16; w->read_filesize = sb->st_size; w->read_left_size = w->read_filesize; w->read_division_size = READ_DIVISION_SIZE; w->read_task_num = w->read_filesize / READ_DIVISION_SIZE; w->read_task_num += ((w->read_filesize % READ_DIVISION_SIZE) != 0); printf("filesize : %d\n",w->read_filesize); printf("one_task_size: %d\n",w->read_division_size); printf("task_num : %d\n",w->read_task_num); HTaskPtr r_run = NULL; if (divide_read_flag != 0) { char *read_text = (char*)manager->allocate(w->read_filesize); r_run = manager->create_task(RUN_READ_BLOCKS, (memaddr)&w->self, sizeof(memaddr),read_text,w->read_filesize); w->read_text = read_text; }else { //my_mmap(filename, fd, fr); r_run = manager->create_task(MMAP , (memaddr)&w->self, sizeof(memaddr),0,0); } /* original */ HTaskPtr t_print; st_mmap_t st_mmap; //st_mmap = my_mmap(filename); //w->task_blocks = blocks; w->self = w; w->task_spwaned = 0; /*sizeはdivision_sizeの倍数にしている。*/ w->size = w->file_size = w->read_filesize; //w->file_mmap = st_mmap.file_mmap; printf("w %lx\n",(long)w); /* 1task分のデータサイズ(byte) */ if (w->size >= 1024*division) { w->division_size = 1024 * division;/*16kbyte*/ } else { w->division_size = w->size; } printf("dvision_size %d\n",w->division_size); /* "word num" and "line num" */ w->status_num = 2; /* taskの数 */ w->task_num = w->size / w->division_size; w->task_num = w->task_num + (w->division_size*w->task_num < w->size); int out_task_num = w->task_num; if(!all) { w->task_blocks = blocks; } else { w->task_blocks = w->task_num; } w->out_task_num = out_task_num; printf("task_num %d\n",w->task_num); printf("out_task_num %d\n",w->out_task_num); /* out用のdivision_size. 
statusが2つなので、あわせて16byteになるように、long long(4byte)を使用 */ w->division_out_size = sizeof(unsigned long long)*4; int out_size = w->division_out_size*out_task_num; w->o_data = (unsigned long long *)manager->allocate(out_size); w->out_size_ = out_size; w->out_size = 4; printf("out size %d\n",out_size); /*各SPEの結果を合計して出力するタスク*/ t_print = manager->create_task(TASK_PRINT, (memaddr)&w->self,sizeof(memaddr),0,0); w->t_print = t_print; for(int i=0;i<1;i++) { /* Task を task_blocks ずつ起動する Task */ /* serialize されていると仮定する... */ HTaskPtr t_exec = manager->create_task(RUN_TASK_BLOCKS, (memaddr)&w->self,sizeof(memaddr),0,0); t_exec->wait_for(r_run); t_print->wait_for(t_exec); // t_exec->iterate(4); t_exec->spawn(); } t_print->wait_for(r_run); r_run->spawn(); t_print->spawn(); } static char* init(int argc, char **argv) { char *filename = 0; for (int i = 1; argv[i]; ++i) { if (strcmp(argv[i], "-file") == 0) { filename = argv[i+1]; i++; } else if (strcmp(argv[i], "-division") == 0) { division = atoi(argv[i+1]); i++; } else if (strcmp(argv[i], "-block") == 0) { blocks = atoi(argv[i+1]); i++; } else if (strcmp(argv[i], "-a") == 0) { // create task all at once all = 1; } else if (strcmp(argv[i], "-c") == 0) { use_task_array = 0; use_compat = 1; } else if (strcmp(argv[i], "-s") == 0) { use_task_array = 0; use_compat = 0; } else if (strcmp(argv[i], "-t") == 0) { use_task_creater = 1; use_task_array = 0; use_compat = 0; } else if (strcmp(argv[i], "-anum") == 0) { array_task_num = atoi(argv[i+1]); i++; } else if (strcmp(argv[i], "-g") == 0) { spe_cpu = GPU_ANY; } else if (strcmp(argv[i], "-any") == 0) { spe_cpu = ANY_ANY; } else if (strcmp(argv[i], "-i") == 0) { use_iterate = 1; use_task_array = 0; } else if (strcmp(argv[i], "-ia") == 0) { use_iterate_all = 1; use_iterate = 1; use_task_array = 0; } else if (strcmp(argv[i], "-dr") == 0) { divide_read_flag = 1; } /* else if (strcmp(argv[i], "-cpu") == 0) { spe_num = atoi(argv[i+1]); i++; if (spe_num==0) spe_num = 1; } else { 
fprintf(stderr,"%s\n",usr_help_str); exit (0); }*/ } if (filename==0) { puts(usr_help_str); exit(1); } return filename; } int TMmain(TaskManager *manager, int argc, char *argv[]) { char *filename = 0; filename = init(argc, argv); if (filename < 0) { return -1; } task_init(); run_start(manager, filename); st_time = getTime(); manager->set_TMend(TMend); return 0; } void TMend(TaskManager *manager) { ed_time = getTime(); printf("Time: %0.6f\n",ed_time-st_time); } /* end */