Mercurial > hg > Game > Cerium
changeset 2047:de89da997e07 draft
add FileMapReduce
author | Nozomi |
---|---|
date | Wed, 27 Jan 2016 19:09:33 +0900 |
parents | 476fc75a5e17 |
children | 6796d85f3d6b |
files | TaskManager/ManyCore/FileMapReduce.cc TaskManager/ManyCore/FileMapReduce.h example/word_count/main.cc |
diffstat | 3 files changed, 418 insertions(+), 0 deletions(-) [+] |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/TaskManager/ManyCore/FileMapReduce.cc Wed Jan 27 19:09:33 2016 +0900 @@ -0,0 +1,412 @@ +#include "FileMapReduce.h" + +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <sys/mman.h> +#include <sys/types.h> +#include <sys/stat.h> +#include <fcntl.h> +#include <unistd.h> +#include <sys/time.h> +#include "TaskManager.h" +#include "SchedTask.h" +#include "Func.h" +#include "WordCount.h" + +/* ;TODO + * PS3でCPU数が2以上の時に、あまりが計算されてない + */ + +extern void task_init(); +void TMend(TaskManager *); +static double st_time; +static double ed_time; +int all = 0; +int use_task_array = 1; +int use_task_creater = 0; +int use_compat = 0; +int use_iterate = 0; +int array_task_num = 11; +int spe_num = 1; +int read_type = MY_MMAP; +int t_exec_num = 4; +CPU_TYPE spe_cpu = SPE_ANY; +CPU_TYPE read_spe_cpu = IO_0; + +const char *usr_help_str = "Usage: ./word_count [-a -c -s] [-cpu spe_num] [-g] [-file filename] [-br]\n"; + +static double +getTime() { + struct timeval tv; + gettimeofday(&tv, NULL); + return tv.tv_sec + (double)tv.tv_usec*1e-6; +} + +typedef struct { + caddr_t file_mmap; + off_t size; +} st_mmap_t; + +/*与えられたsizeをfix_byte_sizeの倍数にする(丸め込むっていうのかな?)*/ +static int +fix_byte(int size,int fix_byte_size) +{ + size = (size/fix_byte_size)*fix_byte_size + ((size%fix_byte_size)!= 0)*fix_byte_size; + + return size; +} + +static void +my_read(char *filename, WordCount *w, TaskManager *manager) +{ + long fd = w->fd; + long r_filesize = w->read_filesize; + + if ((fd=open(filename,O_RDONLY,0666))==0) { + fprintf(stderr,"can't open %s\n",filename); + } + + w->file_mmap = (char*)manager->allocate(w->read_filesize); + + long one_read_size = 1024 * 1024 * 1024; // 1GB + + for (int i = 0; 0 < r_filesize; i++) { + if (r_filesize > one_read_size) { + pread(fd, w->file_mmap + i*one_read_size, one_read_size,i*one_read_size); + }else if ((r_filesize < one_read_size) && (r_filesize != 0)) { + pread(fd, w->file_mmap + i*one_read_size, 
r_filesize,i*one_read_size); + } + r_filesize -= one_read_size; + } + + if (w->file_mmap == (caddr_t)-1) { + fprintf(stderr,"Can't mmap file\n"); + perror(NULL); + exit(0); + } + + return ; +} + +static void +my_mmap(char *filename, WordCount *w) +{ + /*マッピングだよ!*/ + int map = MAP_PRIVATE; + st_mmap_t st_mmap; + struct stat sb; + long fd = w->fd; + + if ((fd=open(filename,O_RDONLY,0666))==0) { + fprintf(stderr,"can't open %s\n",filename); + } + + if (fstat(fd,&sb)) { + fprintf(stderr,"can't fstat %s\n",filename); + } + + st_mmap.size = fix_byte(sb.st_size,4096); + + //madvise(w->file_mmap, w->read_filesize, POSIX_MADV_NORMAL); + w->file_mmap = (char*)mmap(NULL,st_mmap.size,PROT_WRITE|PROT_READ,map,fd,(off_t)0); + + if (st_mmap.file_mmap == (caddr_t)-1) { + fprintf(stderr,"Can't mmap file\n"); + perror(NULL); + exit(0); + } + + return ; +} + +static void +run_tasks(SchedTask *manager, WordCount *w, int task_count, HTaskPtr t_read, HTaskPtr t_next, int size) +{ + + if (task_count < array_task_num) { + array_task_num = task_count; + if (task_count<=0) return; + } + for (int i = 0; i < task_count; i += array_task_num) { + HTask *task_array; + if (use_task_array) { + int task_num = (w->size+size-1)/size; + if (task_num>array_task_num) task_num = array_task_num; + task_array = manager->create_task_array(TASK_EXEC,task_num,1,1,1); + if (t_read != 0) task_array->wait_for(t_read); + if (!all) { + t_next->wait_for(task_array); + } else { + w->t_print->wait_for(task_array); + } + } + + Task *t_exec = 0; + HTask *h_exec = 0; + for (int j = 0; j < array_task_num; j++) { + int i = w->task_spawned++; + if (w->size < size) size = w->size; + if (size==0) break; + if (use_task_array) { + t_exec = task_array->next_task_array(TASK_EXEC,t_exec); + t_exec->set_param(0,(long)0); + t_exec->set_param(1,(long)0); + t_exec->set_param(2,(long)size); + t_exec->set_param(3,(long)w->division_out_size); + t_exec->set_inData(0,w->file_mmap + i*w->division_size, size); + 
t_exec->set_outData(0,w->o_data + i*w->out_size, w->division_out_size); + } else if (use_compat) { + h_exec = manager->create_task(TASK_EXEC); + t_exec->set_param(0,(long)0); + t_exec->set_param(1,(long)0); + t_exec->set_param(2,(long)size); + t_exec->set_param(3,(long)w->division_out_size); + h_exec->set_inData(0,w->file_mmap + i*w->division_size, size); + h_exec->set_outData(0,w->o_data + i*w->out_size, w->division_out_size); + + t_next->wait_for(h_exec); + + h_exec->set_cpu(spe_cpu); + h_exec->spawn(); + } else if (use_iterate) { + array_task_num = w->task_num; + use_iterate = 0; + use_compat = 1; + + w->size -= size*array_task_num; + if(w->size < 0) { + array_task_num -= 1; + w->size += size; + } + h_exec = manager->create_task(TASK_EXEC); + h_exec->flip(); + h_exec->set_inData(0,w->file_mmap,w->file_size); + h_exec->set_inData(1,w->o_data,w->out_size_); + h_exec->set_outData(0,w->file_mmap,w->file_size); + h_exec->set_outData(1,w->o_data,w->out_size_); + h_exec->set_param(0,(long)i); + h_exec->set_param(1,(long)w->division_size); + h_exec->set_param(2,(long)size); + h_exec->set_param(3,(long)w->out_size); + + t_next->wait_for(h_exec); + h_exec->set_cpu(spe_cpu); + h_exec->iterate(array_task_num); + + w->task_num -= array_task_num; + w->task_spawned += array_task_num-1; + + break; + } else { + h_exec = manager->create_task(TASK_EXEC, + (memaddr)(w->file_mmap + i*w->division_size), size, + (memaddr)(w->o_data + i*w->out_size), w->division_out_size); + t_exec->set_param(0,(long)0); + t_exec->set_param(1,(long)0); + t_exec->set_param(2,(long)size); + t_exec->set_param(3,(long)w->division_out_size); + t_next->wait_for(h_exec); + h_exec->set_cpu(spe_cpu); + h_exec->spawn(); + } + w->size -= size; + w->task_num--; + } + if (use_task_array) { + task_array->spawn_task_array(t_exec->next()); + task_array->set_cpu(spe_cpu); + task_array->spawn(); + } else { + //if (!all) t_next->wait_for(h_exec); + } + } +} + +/** + * このTaskは、PPE上で実行されるので、並列に実行されることはない + * 二つ実行されていて、Task 
が足りなくなることがないようにしている。 + */ + +SchedDefineTask1(BREAD_RUN_TASK_BLOCKS,bread_run16); + +static int +bread_run16(SchedTask *manager, void *in, void *out) +{ + WordCount *w = *(WordCount **)in; + + HTaskPtr t_read = manager->create_task(READ_TASK); + w->t_print->wait_for(t_read); + t_read->set_cpu(read_spe_cpu); + t_read->set_param(0,w->fd); + + if (w->task_num < w->task_blocks) { + t_read->set_param(1,w->task_spawned*w->division_size); + t_read->set_outData(0, w->file_mmap + w->task_spawned * w->division_size, w->task_blocks * w->division_size); + + // last case + while (w->size >= w->division_size) + run_tasks(manager,w,w->task_num,t_read,w->t_print, w->division_size); + // remaining data + while (w->size>0) + run_tasks(manager,w,1,t_read,w->t_print, w->division_size); + + t_read->set_param(2,w->task_spawned*w->division_size); + t_read->spawn(); + } else { + HTaskPtr t_next = manager->create_task(BREAD_RUN_TASK_BLOCKS, + (memaddr)&w->self,sizeof(memaddr),0,0); + w->t_print->wait_for(t_next); + + t_read->set_param(1,w->task_spawned*w->division_size); + t_read->set_outData(0, w->file_mmap + w->task_spawned * w->division_size, w->task_blocks * w->division_size); + + run_tasks(manager,w, w->task_blocks, t_read, t_next, w->division_size); + + t_read->set_param(2,w->task_spawned*w->division_size); + + t_read->spawn(); + t_next->spawn(); + } + return 0; +} + +SchedDefineTask1(RUN_TASK_BLOCKS,run16); + +static int +run16(SchedTask *manager, void *in, void *out) +{ + WordCount *w = *(WordCount **)in; + + if(use_iterate) { + run_tasks(manager, w, w->task_num, 0, w->t_print, w->division_size); + } else if (w->task_num < w->task_blocks) { + // last case + while (w->size >= w->division_size) + run_tasks(manager,w,w->task_num,0,w->t_print, w->division_size); + // remaining data + while (w->size>0) + run_tasks(manager,w,1,0, w->t_print, w->size); + // printf("run16 last %d\n",w->task_num); + } else { + HTaskPtr t_next = manager->create_task(RUN_TASK_BLOCKS, + 
(memaddr)&w->self,sizeof(memaddr),0,0); + w->t_print->wait_for(t_next); + + run_tasks(manager,w, w->task_blocks,0, t_next, w->division_size); + + t_next->spawn(); + // printf("run16 next %d\n",w->task_num); + } + return 0; +} + +static int blocks = 48; +//static int blocks = 31 * 6 * 24; +static int division = 16; // in Kbyte + +static void +run_start(TaskManager *manager, char *filename) +{ + long fd = (long)manager->allocate(sizeof(long)); + struct stat *sb = (struct stat*)manager->allocate(sizeof(struct stat)); + HTaskPtr t_exec; + + if ((fd=open(filename,O_RDONLY,0666))==0) { + fprintf(stderr,"can't open %s\n",filename); + return ; + } + + if (fstat(fd,sb)) { + fprintf(stderr,"can't fstat %s\n",filename); + return ; + } + + WordCountPtr w = (WordCountPtr)manager->allocate(sizeof(WordCount)); + + w->self = w; + w->fd = fd; + w->read_filesize = sb->st_size; + + + if (read_type == BLOCKED_READ) { + printf("[blocked read mode]\n"); + w->file_mmap = (char*)manager->allocate(w->read_filesize); + }else if (read_type == MY_READ) { + printf("[single read mode]\n"); + my_read(filename, w, manager); + }else if(read_type == MY_MMAP) { + printf("[mmap mode]\n"); + my_mmap(filename, w); + }else if(read_type == BLOCKED_MMAP) { + printf("[blocked mmap mode]\n"); + my_mmap(filename, w); + } + + HTaskPtr t_print; + + //w->task_blocks = blocks; + w->self = w; + w->task_spawned = 0; + + w->size = w->file_size = w->read_filesize; + printf("w %lx\n",(long)w); + + /* 1task分のデータサイズ(byte) */ + if (w->size >= 1024*division) { + w->division_size = 1024 * division;/*16kbyte*/ + } else { + w->division_size = w->size; + } + + printf("division_size %ld\n",w->division_size); + + /* "word num" and "line num" */ + w->status_num = 2; + /* taskの数 */ + w->task_num = w->size / w->division_size; + w->task_num = w->task_num + (w->division_size*w->task_num < w->size); + int out_task_num = w->task_num; + + if(!all) { + w->task_blocks = blocks; + } else { + w->task_blocks = w->task_num; + } + + 
w->out_task_num = out_task_num; + printf("task_num %ld\n",w->task_num); + printf("out_task_num %ld\n",w->out_task_num); + + /* out用のdivision_size. statusが2つなので、あわせて16byteになるように、long long(4byte)を使用 */ + + w->division_out_size = sizeof(unsigned long long)*4; + int out_size = w->division_out_size*out_task_num; + w->o_data = (unsigned long long *)manager->allocate(out_size); + w->out_size_ = out_size; + w->out_size = 4; + printf("out size %d\n",out_size); + + /*各SPEの結果を合計して出力するタスク*/ + + t_print = manager->create_task(TASK_PRINT, + (memaddr)&w->self,sizeof(memaddr),0,0); + w->t_print = t_print; + for(int i=0;i<t_exec_num;i++) { + /* Task を task_blocks ずつ起動する Task */ + /* serialize されていると仮定する... */ + if (read_type == BLOCKED_READ) { + t_exec = manager->create_task(BREAD_RUN_TASK_BLOCKS, + (memaddr)&w->self,sizeof(memaddr),0,0); + }else { + t_exec = manager->create_task(RUN_TASK_BLOCKS, + (memaddr)&w->self,sizeof(memaddr),0,0); + } + + t_print->wait_for(t_exec); + // t_exec->iterate(4); + t_exec->spawn(); + } + t_print->spawn(); +} +/* end */
// TaskManager/ManyCore/FileMapReduce.h
#ifndef INCLUDED_FILE_MAP_REDUCE
#define INCLUDED_FILE_MAP_REDUCE

// Forward declaration: only a pointer to TaskManager is used here, so this
// header does not depend on the include order of TaskManager.h.
class TaskManager;

/**
 * Map/reduce driver over a single input file.
 *
 * Only the declarations are given here; no implementation is visible in this
 * header. NOTE(review): presumably TASK_EXEC / TASK_PRINT are the task ids of
 * the per-division task and of the final reporting task - confirm against the
 * implementation.
 */
class FileMapReduce {
public:
    FileMapReduce(TaskManager *manager,const char* filename,int TASK_EXEC,int TASK_PRINT);
    void start();
};

#endif /* INCLUDED_FILE_MAP_REDUCE */
--- a/example/word_count/main.cc Wed Jan 27 18:55:25 2016 +0900 +++ b/example/word_count/main.cc Wed Jan 27 19:09:33 2016 +0900 @@ -484,6 +484,8 @@ task_init(); st_time = getTime(); run_start(manager, filename); + // FileMapReduce fmp = new FileMapReduce(manager, filename, TASK_EXEC, TASK_PRINT); + // fmp->start(); manager->set_TMend(TMend); return 0; }