Mercurial > hg > Game > Cerium
changeset 2051:b79a250b4f99 draft
FileMapReduce (no compile error)
author | masa |
---|---|
date | Thu, 28 Jan 2016 17:23:45 +0900 |
parents | 26dd777ba95d |
children | cc1ea3933551 |
files | TaskManager/ManyCore/FileMapReduce.cc TaskManager/ManyCore/FileMapReduce.h TaskManager/kernel/sys_task/Read.cc TaskManager/kernel/sys_task/Read.h TaskManager/kernel/sys_task/SysTasks.h TaskManager/kernel/sys_task/systask_register.cc example/word_count/Func.h example/word_count/main.cc example/word_count/ppe/Read.cc example/word_count/ppe/Read.h example/word_count/task_init.cc |
diffstat | 11 files changed, 182 insertions(+), 608 deletions(-) [+] |
line wrap: on
line diff
--- a/TaskManager/ManyCore/FileMapReduce.cc Thu Jan 28 15:43:36 2016 +0900 +++ b/TaskManager/ManyCore/FileMapReduce.cc Thu Jan 28 17:23:45 2016 +0900 @@ -2,6 +2,7 @@ #include <stdio.h> #include <stdlib.h> +#include <string.h> #include <sys/stat.h> #include <fcntl.h> #include <unistd.h> @@ -12,62 +13,92 @@ * PS3でCPU数が2以上の時に、あまりが計算されてない */ -extern void task_init(); -void TMend(TaskManager *); -int all = 0; -int use_task_array = 1; -int use_task_creater = 0; -int use_compat = 0; -int use_iterate = 0; -int array_task_num = 11; -int spe_num = 1; -int read_type = MY_MMAP; -int t_exec_num = 4; -CPU_TYPE spe_cpu = SPE_ANY; -CPU_TYPE read_spe_cpu = IO_0; +enum { +#include "SysTasks.h" +}; -const char *usr_help_str = "Usage: ./word_count [-a -c -s] [-cpu spe_num] [-g] [-file filename] [-br]\n"; - -FileMapReduce::FileMapReduce(TaskManager *manager,const char* filename,int TASK_EXEC,int TASK_PRINT) { - task_init(); - run_start(manager,filename); +FileMapReduce::FileMapReduce(TaskManager *manager,int TASK_EXEC,int TASK_EXEC_DATA_PARALLEL,int TASK_PRINT) { + all = 0; + use_task_array = 1; + use_task_creater = 0; + use_compat = 0; + use_iterate = 0; + array_task_num = 11; + spe_num = 1; + read_type = MY_MMAP; + t_exec_num = 4; + spe_cpu = SPE_ANY; + read_spe_cpu = IO_0; + blocks = 48; + division = 16; // in KByte + this->TASK_EXEC = TASK_EXEC; + this->TASK_EXEC_DATA_PARALLEL = TASK_EXEC_DATA_PARALLEL; + this->TASK_PRINT = TASK_PRINT; + fmp_help_str = "[-a -c -s] [-cpu spe_num] [-g] [-file filename] [-br]\n"; } FileMapReduce::~FileMapReduce() { } -#ifdef __CERIUM_GPU__ -#include "GpuScheduler.h" -#endif -#ifdef __CERIUM_CUDA__ -#include "CudaScheduler.h" -#endif - -/* 必ずこの位置に書いて */ -SchedExternTask(READ_TASK); -SchedExternTask(BREAD_RUN_TASK_BLOCKS); -SchedExternTask(Exec); -SchedExternTask(Print); -SchedExternTask(RUN_TASK_BLOCKS); +char* +FileMapReduce::init(int argc, char **argv) +{ + char *filename = 0; + for (int i = 1; argv[i]; ++i) { + if (strcmp(argv[i], "-file") == 0) { + filename = argv[i+1]; i++; + } else if (strcmp(argv[i], "-division") == 0) { + division = atoi(argv[i+1]); + i++; + } else if (strcmp(argv[i], "-block") == 0) { + blocks = atoi(argv[i+1]); + i++; + } else if (strcmp(argv[i], "-a") == 0) { + // create task all at once + all = 1; + } else if (strcmp(argv[i], "-c") == 0) { + use_task_array = 0; + use_compat = 1; + } else if (strcmp(argv[i], "-s") == 0) { + use_task_array = 0; + use_compat = 0; + } else if (strcmp(argv[i], "-t") == 0) { + use_task_creater = 1; + use_task_array = 0; + use_compat = 0; + } else if (strcmp(argv[i], "-anum") == 0) { + array_task_num = atoi(argv[i+1]); + i++; + } else if (strcmp(argv[i], "-g") == 0) { + spe_cpu = GPU_0; + } else if (strcmp(argv[i], "-any") == 0) { + spe_cpu = ANY_ANY; + } else if (strcmp(argv[i], "-i") == 0) { + use_iterate = 1; + use_task_array = 0; + t_exec_num = 1; + } else if (strcmp(argv[i], "-br") == 0) { + read_type = BLOCKED_READ; + } else if (strcmp(argv[i], "-r") == 0) { + read_type = MY_READ; + } + /* else if (strcmp(argv[i], "-cpu") == 0) { + spe_num = atoi(argv[i+1]); + i++; + if (spe_num==0) spe_num = 1; + } else { + fprintf(stderr,"%s\n",fmp_help_str); + exit (0); + }*/ + } + if (filename==0) { + printf("Usage: %s ",argv[0]); + puts(fmp_help_str); + exit(1); + } -void -task_init(void) -{ -#ifdef __CERIUM_GPU__ - GpuSchedRegister(TASK_EXEC, "gpu/Exec.cl", "wordcount"); -#endif -#ifdef __CERIUM_CUDA__ - CudaSchedRegister(TASK_EXEC, "cuda/Exec.ptx", "wordcount"); -#endif - - SchedRegisterTask(TASK_EXEC, Exec); - - SchedRegister(READ_TASK); - SchedRegister(BREAD_RUN_TASK_BLOCKS); - - SchedRegisterTask(TASK_PRINT, Print); - SchedRegister(RUN_TASK_BLOCKS); + return filename; } /*与えられたsizeをfix_byte_sizeの倍数にする(丸め込むっていうのかな?)*/ @@ -79,8 +110,8 @@ return size; } -static void -my_read(const char *filename, WordCount *w, TaskManager *manager) +void +FileMapReduce::my_read(const char *filename, MapReduce *w, TaskManager *manager) { long fd = w->fd; long r_filesize = w->read_filesize; @@ -109,8 +140,8 @@ } } -static void -my_mmap(const char *filename, WordCount *w) +void +FileMapReduce::my_mmap(const char *filename, MapReduce *w) { /*マッピングだよ!*/ int map = MAP_PRIVATE; @@ -137,8 +168,14 @@ } } -static void -run_tasks(SchedTask *manager, WordCount *w, int task_count, HTaskPtr t_read, HTaskPtr t_next, int size) +void +run_tasks(SchedTask *manager, MapReduce *w, int task_count, HTaskPtr t_read, HTaskPtr t_next, int size) +{ + w->fmp->run_tasks(manager,w,task_count,t_read,t_next,size); +} + +void +FileMapReduce::run_tasks(SchedTask *manager, MapReduce *w, int task_count, HTaskPtr t_read, HTaskPtr t_next, int size) { if (task_count < array_task_num) { array_task_num = task_count; @@ -192,7 +229,7 @@ array_task_num -= 1; w->size += size; } - h_exec = manager->create_task(TASK_EXEC); + h_exec = manager->create_task(TASK_EXEC_DATA_PARALLEL); h_exec->flip(); h_exec->set_inData(0,w->file_mmap,w->file_size); h_exec->set_inData(1,w->o_data,w->out_size_); @@ -243,11 +280,11 @@ static int bread_run16(SchedTask *manager, void *in, void *out) { - WordCount *w = *(WordCount **)in; + MapReduce *w = *(MapReduce **)in; HTaskPtr t_read = manager->create_task(READ_TASK); w->t_print->wait_for(t_read); - t_read->set_cpu(read_spe_cpu); + t_read->set_cpu(w->fmp->read_spe_cpu); t_read->set_param(0,w->fd); if (w->task_num < w->task_blocks) { @@ -286,9 +323,9 @@ static int run16(SchedTask *manager, void *in, void *out) { - WordCount *w = *(WordCount **)in; + MapReduce *w = *(MapReduce **)in; - if(use_iterate) { + if(w->fmp->use_iterate) { run_tasks(manager, w, w->task_num, 0, w->t_print, w->division_size); } else if (w->task_num < w->task_blocks) { // last case @@ -311,11 +348,8 @@ return 0; } -static int blocks = 48; -static int division = 16; // in Kbyte - -static void -run_start(TaskManager *manager, const char *filename) +void +FileMapReduce::run_start(TaskManager *manager, const char *filename) { long fd = (long)manager->allocate(sizeof(long)); struct stat *sb = (struct stat*)manager->allocate(sizeof(struct stat)); @@ -331,13 +365,13 @@ return ; } - WordCountPtr w = (WordCountPtr)manager->allocate(sizeof(WordCount)); + MapReducePtr w = (MapReducePtr)manager->allocate(sizeof(MapReduce)); w->self = w; + w->fmp = this; w->fd = fd; w->read_filesize = sb->st_size; - if (read_type == BLOCKED_READ) { printf("[blocked read mode]\n"); w->file_mmap = (char*)manager->allocate(w->read_filesize); @@ -417,10 +451,3 @@ } t_print->spawn(); } - -void -start() -{ - t_exec->spawn(); - t_print->spawn(); -}
--- a/TaskManager/ManyCore/FileMapReduce.h Thu Jan 28 15:43:36 2016 +0900 +++ b/TaskManager/ManyCore/FileMapReduce.h Thu Jan 28 17:23:45 2016 +0900 @@ -4,8 +4,11 @@ #include "SchedTask.h" #include "HTask.h" -typedef struct wordCount { - struct wordCount *self; +class FileMapReduce; + +typedef struct mapReduce { + struct mapReduce *self; + FileMapReduce *fmp; long fd; long read_filesize; CPU_TYPE read_cpu; @@ -27,16 +30,7 @@ long file_size; HTaskPtr t_print; HTaskPtr t_exec; -} WordCount, *WordCountPtr; - -enum { -#include "SysTasks.h" - BREAD_RUN_TASK_BLOCKS, - READ_TASK, - TASK_EXEC, - RUN_TASK_BLOCKS, - TASK_PRINT, -}; +} MapReduce, *MapReducePtr; // Read Type enum { @@ -59,18 +53,38 @@ class FileMapReduce { public: /* constructor */ - FileMapReduce(TaskManager *manager,const char* filename,int TASK_EXEC,int TASK_PRINT); + FileMapReduce(TaskManager *manager,int TASK_EXEC,int TASK_EXEC_DATA_PARALLEL,int TASK_PRINT); ~FileMapReduce(); /* User API */ - static void run_start(TaskManager *manager, const char *filename); - void start(); + void run_start(TaskManager *manager, const char *filename); + void run_tasks(SchedTask *manager, MapReduce *w, int task_count, HTaskPtr t_read, HTaskPtr t_next, int size); + char* init(int argc, char **argv); private: - static void my_read(const char *filename, WordCount *w, TaskManager *manager); - static void my_mmap(const char *filename, WordCount *w); + void my_read(const char *filename, MapReduce *w, TaskManager *manager); + void my_mmap(const char *filename, MapReduce *w); + void task_init(void); /* variable */ - HTaskPtr t_exec; - HTaskPtr t_print; + int all; + int use_task_array; + int use_task_creater ; + int use_compat ; + int array_task_num ; + int spe_num ; + int read_type ; + int t_exec_num ; + CPU_TYPE spe_cpu ; + int TASK_EXEC; + int TASK_EXEC_DATA_PARALLEL; + int TASK_PRINT; + int blocks; + int division; // in Kbyte +public: + CPU_TYPE read_spe_cpu; + int use_iterate; + int task_exec_id; + int task_print_id; + const char *fmp_help_str; };
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/TaskManager/kernel/sys_task/Read.cc Thu Jan 28 17:23:45 2016 +0900 @@ -0,0 +1,28 @@ +#include <stdio.h> +#include <string.h> +#include <unistd.h> +#include <sys/stat.h> +#include <iostream> +#include <vector> +#include <cstdlib> + +#include "SchedTask.h" +#include "SysFunc.h" + +/* これは必須 */ +SchedDefineTask1(READ_TASK,read_task); + +static int +read_task(SchedTask *s, void *rbuf, void *wbuf) +{ + long fd = (long)s->get_param(0); + long start_read_position = (long)s->get_param(1); + long end_read_position = (long)s->get_param(2); + char *read_text = (char*)s->get_output(wbuf,0); + + long read_size = end_read_position - start_read_position; + + pread(fd, read_text, read_size , start_read_position); + + return 0; +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/TaskManager/kernel/sys_task/Read.h Thu Jan 28 17:23:45 2016 +0900 @@ -0,0 +1,7 @@ +#ifndef INCLUDED_TASK_HELLO +#define INCLUDED_TASK_HELLO + +#include "SchedTask.h" + + +#endif
--- a/TaskManager/kernel/sys_task/SysTasks.h Thu Jan 28 15:43:36 2016 +0900 +++ b/TaskManager/kernel/sys_task/SysTasks.h Thu Jan 28 17:23:45 2016 +0900 @@ -2,4 +2,7 @@ FinishTask, ShowTime, StartProfile, +READ_TASK, +BREAD_RUN_TASK_BLOCKS, +RUN_TASK_BLOCKS, #define Dummy StartTask
--- a/TaskManager/kernel/sys_task/systask_register.cc Thu Jan 28 15:43:36 2016 +0900 +++ b/TaskManager/kernel/sys_task/systask_register.cc Thu Jan 28 17:23:45 2016 +0900 @@ -3,12 +3,16 @@ SchedExternTask(StartTask); SchedExternTask(FinishTask); -// SchedExternTask(TaskArray); +SchedExternTask(READ_TASK); +SchedExternTask(BREAD_RUN_TASK_BLOCKS); +SchedExternTask(RUN_TASK_BLOCKS); void systask_register() { SchedRegister(StartTask); SchedRegister(FinishTask); -// SchedRegister(TaskArray); + SchedRegister(READ_TASK); + SchedRegister(BREAD_RUN_TASK_BLOCKS); + SchedRegister(RUN_TASK_BLOCKS); }
--- a/example/word_count/Func.h Thu Jan 28 15:43:36 2016 +0900 +++ b/example/word_count/Func.h Thu Jan 28 17:23:45 2016 +0900 @@ -1,22 +1,6 @@ enum { #include "SysTasks.h" - BREAD_RUN_TASK_BLOCKS, - READ_TASK, TASK_EXEC, TASK_EXEC_DATA_PARALLEL, - RUN_TASK_BLOCKS, TASK_PRINT, }; - -// Read Type -enum { - MY_MMAP, - MY_READ, - BLOCKED_READ, - BLOCKED_MMAP, -}; - -#define DATA_NUM 16 -#define ADD_NUM 26 - -#define DATA_ID 0
--- a/example/word_count/main.cc Thu Jan 28 15:43:36 2016 +0900 +++ b/example/word_count/main.cc Thu Jan 28 17:23:45 2016 +0900 @@ -1,16 +1,9 @@ #include <stdio.h> -#include <stdlib.h> -#include <string.h> -#include <sys/mman.h> -#include <sys/types.h> -#include <sys/stat.h> -#include <fcntl.h> -#include <unistd.h> #include <sys/time.h> #include "TaskManager.h" #include "SchedTask.h" #include "Func.h" -#include "WordCount.h" +#include "FileMapReduce.h" /* ;TODO * PS3でCPU数が2以上の時に、あまりが計算されてない @@ -20,19 +13,7 @@ void TMend(TaskManager *); static double st_time; static double ed_time; -int all = 0; -int use_task_array = 1; -int use_task_creater = 0; -int use_compat = 0; -int use_iterate = 0; -int array_task_num = 11; -int spe_num = 1; -int read_type = MY_MMAP; -int t_exec_num = 4; -CPU_TYPE spe_cpu = SPE_ANY; -CPU_TYPE read_spe_cpu = IO_0; - -const char *usr_help_str = "Usage: ./word_count [-a -c -s] [-cpu spe_num] [-g] [-file filename] [-br]\n"; +const char* usr_help_str = ""; static double getTime() { @@ -41,441 +22,13 @@ return tv.tv_sec + (double)tv.tv_usec*1e-6; } -typedef struct { - caddr_t file_mmap; - off_t size; -} st_mmap_t; - -/*与えられたsizeをfix_byte_sizeの倍数にする(丸め込むっていうのかな?)*/ -static int -fix_byte(int size,int fix_byte_size) -{ - size = (size/fix_byte_size)*fix_byte_size + ((size%fix_byte_size)!= 0)*fix_byte_size; - - return size; -} - -static void -my_read(char *filename, WordCount *w, TaskManager *manager) -{ - long fd = w->fd; - long r_filesize = w->read_filesize; - - if ((fd=open(filename,O_RDONLY,0666))==0) { - fprintf(stderr,"can't open %s\n",filename); - } - - w->file_mmap = (char*)manager->allocate(w->read_filesize); - - long one_read_size = 1024 * 1024 * 1024; // 1GB - - for (int i = 0; 0 < r_filesize; i++) { - if (r_filesize > one_read_size) { - pread(fd, w->file_mmap + i*one_read_size, one_read_size,i*one_read_size); - }else if ((r_filesize < one_read_size) && (r_filesize != 0)) { - pread(fd, w->file_mmap + i*one_read_size, r_filesize,i*one_read_size); - } - r_filesize -= one_read_size; - } - - if (w->file_mmap == (caddr_t)-1) { - fprintf(stderr,"Can't mmap file\n"); - perror(NULL); - exit(0); - } - - return ; -} - -static void -my_mmap(char *filename, WordCount *w) -{ - /*マッピングだよ!*/ - int map = MAP_PRIVATE; - st_mmap_t st_mmap; - struct stat sb; - long fd = w->fd; - - if ((fd=open(filename,O_RDONLY,0666))==0) { - fprintf(stderr,"can't open %s\n",filename); - } - - if (fstat(fd,&sb)) { - fprintf(stderr,"can't fstat %s\n",filename); - } - - st_mmap.size = fix_byte(sb.st_size,4096); - - //madvise(w->file_mmap, w->read_filesize, POSIX_MADV_NORMAL); - w->file_mmap = (char*)mmap(NULL,st_mmap.size,PROT_WRITE|PROT_READ,map,fd,(off_t)0); - - if (st_mmap.file_mmap == (caddr_t)-1) { - fprintf(stderr,"Can't mmap file\n"); - perror(NULL); - exit(0); - } - - return ; -} - -static void -run_tasks(SchedTask *manager, WordCount *w, int task_count, HTaskPtr t_read, HTaskPtr t_next, int size) -{ - - if (task_count < array_task_num) { - array_task_num = task_count; - if (task_count<=0) return; - } - for (int i = 0; i < task_count; i += array_task_num) { - HTask *task_array; - if (use_task_array) { - int task_num = (w->size+size-1)/size; - if (task_num>array_task_num) task_num = array_task_num; - task_array = manager->create_task_array(TASK_EXEC,task_num,1,1,1); - if (t_read != 0) task_array->wait_for(t_read); - if (!all) { - t_next->wait_for(task_array); - } else { - w->t_print->wait_for(task_array); - } - } - - Task *t_exec = 0; - HTask *h_exec = 0; - for (int j = 0; j < array_task_num; j++) { - int i = w->task_spawned++; - if (w->size < size) size = w->size; - if (size==0) break; - if (use_task_array) { - t_exec = task_array->next_task_array(TASK_EXEC,t_exec); - t_exec->set_param(0,(long)0); - t_exec->set_param(1,(long)0); - t_exec->set_param(2,(long)size); - t_exec->set_param(3,(long)w->division_out_size); - t_exec->set_inData(0,w->file_mmap + i*w->division_size, size); - t_exec->set_outData(0,w->o_data + i*w->out_size, w->division_out_size); - } else if (use_compat) { - h_exec = manager->create_task(TASK_EXEC); - t_exec->set_param(0,(long)0); - t_exec->set_param(1,(long)0); - t_exec->set_param(2,(long)size); - t_exec->set_param(3,(long)w->division_out_size); - h_exec->set_inData(0,w->file_mmap + i*w->division_size, size); - h_exec->set_outData(0,w->o_data + i*w->out_size, w->division_out_size); - - t_next->wait_for(h_exec); - - h_exec->set_cpu(spe_cpu); - h_exec->spawn(); - } else if (use_iterate) { - array_task_num = w->task_num; - use_iterate = 0; - use_compat = 1; - - w->size -= size*array_task_num; - if(w->size < 0) { - array_task_num -= 1; - w->size += size; - } - h_exec = manager->create_task(TASK_EXEC_DATA_PARALLEL); - h_exec->flip(); - h_exec->set_inData(0,w->file_mmap,w->file_size); - h_exec->set_inData(1,w->o_data,w->out_size_); - h_exec->set_outData(0,w->file_mmap,w->file_size); - h_exec->set_outData(1,w->o_data,w->out_size_); - h_exec->set_param(0,(long)i); - h_exec->set_param(1,(long)w->division_size); - h_exec->set_param(2,(long)size); - h_exec->set_param(3,(long)w->out_size); - - t_next->wait_for(h_exec); - h_exec->set_cpu(spe_cpu); - h_exec->iterate(array_task_num); - - w->task_num -= array_task_num; - w->task_spawned += array_task_num-1; - - break; - } else { - h_exec = manager->create_task(TASK_EXEC, - (memaddr)(w->file_mmap + i*w->division_size), size, - (memaddr)(w->o_data + i*w->out_size), w->division_out_size); - t_exec->set_param(0,(long)0); - t_exec->set_param(1,(long)0); - t_exec->set_param(2,(long)size); - t_exec->set_param(3,(long)w->division_out_size); - t_next->wait_for(h_exec); - h_exec->set_cpu(spe_cpu); - h_exec->spawn(); - } - w->size -= size; - w->task_num--; - } - if (use_task_array) { - task_array->spawn_task_array(t_exec->next()); - task_array->set_cpu(spe_cpu); - task_array->spawn(); - } else { - //if (!all) t_next->wait_for(h_exec); - } - } -} - -/** - * このTaskは、PPE上で実行されるので、並列に実行されることはない - * 二つ実行されていて、Task が足りなくなることがないようにしている。 - */ - -SchedDefineTask1(BREAD_RUN_TASK_BLOCKS,bread_run16); - -static int -bread_run16(SchedTask *manager, void *in, void *out) -{ - WordCount *w = *(WordCount **)in; - - HTaskPtr t_read = manager->create_task(READ_TASK); - w->t_print->wait_for(t_read); - t_read->set_cpu(read_spe_cpu); - t_read->set_param(0,w->fd); - - if (w->task_num < w->task_blocks) { - t_read->set_param(1,w->task_spawned*w->division_size); - t_read->set_outData(0, w->file_mmap + w->task_spawned * w->division_size, w->task_blocks * w->division_size); - - // last case - while (w->size >= w->division_size) - run_tasks(manager,w,w->task_num,t_read,w->t_print, w->division_size); - // remaining data - while (w->size>0) - run_tasks(manager,w,1,t_read,w->t_print, w->division_size); - - t_read->set_param(2,w->task_spawned*w->division_size); - t_read->spawn(); - } else { - HTaskPtr t_next = manager->create_task(BREAD_RUN_TASK_BLOCKS, - (memaddr)&w->self,sizeof(memaddr),0,0); - w->t_print->wait_for(t_next); - - t_read->set_param(1,w->task_spawned*w->division_size); - t_read->set_outData(0, w->file_mmap + w->task_spawned * w->division_size, w->task_blocks * w->division_size); - - run_tasks(manager,w, w->task_blocks, t_read, t_next, w->division_size); - - t_read->set_param(2,w->task_spawned*w->division_size); - - t_read->spawn(); - t_next->spawn(); - } - return 0; -} - -SchedDefineTask1(RUN_TASK_BLOCKS,run16); - -static int -run16(SchedTask *manager, void *in, void *out) -{ - WordCount *w = *(WordCount **)in; - - if(use_iterate) { - run_tasks(manager, w, w->task_num, 0, w->t_print, w->division_size); - } else if (w->task_num < w->task_blocks) { - // last case - while (w->size >= w->division_size) - run_tasks(manager,w,w->task_num,0,w->t_print, w->division_size); - // remaining data - while (w->size>0) - run_tasks(manager,w,1,0, w->t_print, w->size); - // printf("run16 last %d\n",w->task_num); - } else { - HTaskPtr t_next = manager->create_task(RUN_TASK_BLOCKS, - (memaddr)&w->self,sizeof(memaddr),0,0); - w->t_print->wait_for(t_next); - - run_tasks(manager,w, w->task_blocks,0, t_next, w->division_size); - - t_next->spawn(); - // printf("run16 next %d\n",w->task_num); - } - return 0; -} - -static int blocks = 48; -//static int blocks = 31 * 6 * 24; -static int division = 16; // in Kbyte - -static void -run_start(TaskManager *manager, char *filename) -{ - long fd = (long)manager->allocate(sizeof(long)); - struct stat *sb = (struct stat*)manager->allocate(sizeof(struct stat)); - HTaskPtr t_exec; - - if ((fd=open(filename,O_RDONLY,0666))==0) { - fprintf(stderr,"can't open %s\n",filename); - return ; - } - - if (fstat(fd,sb)) { - fprintf(stderr,"can't fstat %s\n",filename); - return ; - } - - WordCountPtr w = (WordCountPtr)manager->allocate(sizeof(WordCount)); - - w->self = w; - w->fd = fd; - w->read_filesize = sb->st_size; - - - if (read_type == BLOCKED_READ) { - printf("[blocked read mode]\n"); - w->file_mmap = (char*)manager->allocate(w->read_filesize); - }else if (read_type == MY_READ) { - printf("[single read mode]\n"); - my_read(filename, w, manager); - }else if(read_type == MY_MMAP) { - printf("[mmap mode]\n"); - my_mmap(filename, w); - }else if(read_type == BLOCKED_MMAP) { - printf("[blocked mmap mode]\n"); - my_mmap(filename, w); - } - - HTaskPtr t_print; - - //w->task_blocks = blocks; - w->self = w; - w->task_spawned = 0; - - w->size = w->file_size = w->read_filesize; - printf("w %lx\n",(long)w); - - /* 1task分のデータサイズ(byte) */ - if (w->size >= 1024*division) { - w->division_size = 1024 * division;/*16kbyte*/ - } else { - w->division_size = w->size; - } - - printf("division_size %ld\n",w->division_size); - - /* "word num" and "line num" */ - w->status_num = 2; - /* taskの数 */ - w->task_num = w->size / w->division_size; - w->task_num = w->task_num + (w->division_size*w->task_num < w->size); - int out_task_num = w->task_num; - - if(!all) { - w->task_blocks = blocks; - } else { - w->task_blocks = w->task_num; - } - - w->out_task_num = out_task_num; - printf("task_num %ld\n",w->task_num); - printf("out_task_num %ld\n",w->out_task_num); - - /* out用のdivision_size. statusが2つなので、あわせて16byteになるように、long long(4byte)を使用 */ - - w->division_out_size = sizeof(unsigned long long)*4; - int out_size = w->division_out_size*out_task_num; - w->o_data = (unsigned long long *)manager->allocate(out_size); - w->out_size_ = out_size; - w->out_size = 4; - printf("out size %d\n",out_size); - - /*各SPEの結果を合計して出力するタスク*/ - - t_print = manager->create_task(TASK_PRINT, - (memaddr)&w->self,sizeof(memaddr),0,0); - w->t_print = t_print; - for(int i=0;i<t_exec_num;i++) { - /* Task を task_blocks ずつ起動する Task */ - /* serialize されていると仮定する... */ - if (read_type == BLOCKED_READ) { - t_exec = manager->create_task(BREAD_RUN_TASK_BLOCKS, - (memaddr)&w->self,sizeof(memaddr),0,0); - }else { - t_exec = manager->create_task(RUN_TASK_BLOCKS, - (memaddr)&w->self,sizeof(memaddr),0,0); - } - - t_print->wait_for(t_exec); - // t_exec->iterate(4); - t_exec->spawn(); - } - t_print->spawn(); -} - -static char* -init(int argc, char **argv) -{ - - char *filename = 0; - - for (int i = 1; argv[i]; ++i) { - if (strcmp(argv[i], "-file") == 0) { - filename = argv[i+1]; i++; - } else if (strcmp(argv[i], "-division") == 0) { - division = atoi(argv[i+1]); - i++; - } else if (strcmp(argv[i], "-block") == 0) { - blocks = atoi(argv[i+1]); - i++; - } else if (strcmp(argv[i], "-a") == 0) { - // create task all at once - all = 1; - } else if (strcmp(argv[i], "-c") == 0) { - use_task_array = 0; - use_compat = 1; - } else if (strcmp(argv[i], "-s") == 0) { - use_task_array = 0; - use_compat = 0; - } else if (strcmp(argv[i], "-t") == 0) { - use_task_creater = 1; - use_task_array = 0; - use_compat = 0; - } else if (strcmp(argv[i], "-anum") == 0) { - array_task_num = atoi(argv[i+1]); - i++; - } else if (strcmp(argv[i], "-g") == 0) { - spe_cpu = GPU_0; - } else if (strcmp(argv[i], "-any") == 0) { - spe_cpu = ANY_ANY; - } else if (strcmp(argv[i], "-i") == 0) { - use_iterate = 1; - use_task_array = 0; - t_exec_num = 1; - } else if (strcmp(argv[i], "-br") == 0) { - read_type = BLOCKED_READ; - } else if (strcmp(argv[i], "-r") == 0) { - read_type = MY_READ; - } - /* else if (strcmp(argv[i], "-cpu") == 0) { - spe_num = atoi(argv[i+1]); - i++; - if (spe_num==0) spe_num = 1; - } else { - fprintf(stderr,"%s\n",usr_help_str); - exit (0); - }*/ - } - if (filename==0) { - puts(usr_help_str); - exit(1); - } - - return filename; -} - - int TMmain(TaskManager *manager, int argc, char *argv[]) { char *filename = 0; - filename = init(argc, argv); + FileMapReduce *fmp = new FileMapReduce(manager,TASK_EXEC,TASK_EXEC_DATA_PARALLEL,TASK_PRINT); + filename = fmp->init(argc, argv); if (filename < 0) { return -1; @@ -483,8 +36,7 @@ task_init(); st_time = getTime(); - run_start(manager, filename); - // FileMapReduce fmp = new FileMapReduce(manager, filename, TASK_EXEC, TASK_PRINT); + fmp->run_start(manager, filename); // fmp->start(); manager->set_TMend(TMend); return 0;
--- a/example/word_count/ppe/Read.cc Thu Jan 28 15:43:36 2016 +0900 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,30 +0,0 @@ -#include <stdio.h> -#include <string.h> -#include <unistd.h> -#include <sys/stat.h> -#include <iostream> -#include <vector> -#include <cstdlib> - -#include "SchedTask.h" -#include "Print.h" -#include "Func.h" -#include "WordCount.h" - -/* これは必須 */ -SchedDefineTask1(READ_TASK,read_task); - -static int -read_task(SchedTask *s, void *rbuf, void *wbuf) -{ - long fd = (long)s->get_param(0); - long start_read_position = (long)s->get_param(1); - long end_read_position = (long)s->get_param(2); - char *read_text = (char*)s->get_output(wbuf,0); - - long read_size = end_read_position - start_read_position; - - pread(fd, read_text, read_size , start_read_position); - - return 0; -}
--- a/example/word_count/ppe/Read.h Thu Jan 28 15:43:36 2016 +0900 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,7 +0,0 @@ -#ifndef INCLUDED_TASK_HELLO -#define INCLUDED_TASK_HELLO - -#include "SchedTask.h" - - -#endif
--- a/example/word_count/task_init.cc Thu Jan 28 15:43:36 2016 +0900 +++ b/example/word_count/task_init.cc Thu Jan 28 17:23:45 2016 +0900 @@ -8,12 +8,9 @@ #endif /* 必ずこの位置に書いて */ -SchedExternTask(READ_TASK); -SchedExternTask(BREAD_RUN_TASK_BLOCKS); SchedExternTask(Exec); SchedExternTask(Exec_Data_Parallel); SchedExternTask(Print); -SchedExternTask(RUN_TASK_BLOCKS); /** * この関数は ../spe/spe-main と違って @@ -34,10 +31,5 @@ SchedRegisterTask(TASK_EXEC, Exec); SchedRegisterTask(TASK_EXEC_DATA_PARALLEL, Exec); - - SchedRegister(READ_TASK); - SchedRegister(BREAD_RUN_TASK_BLOCKS); - SchedRegisterTask(TASK_PRINT, Print); - SchedRegister(RUN_TASK_BLOCKS); }