Mercurial > hg > Game > Cerium
changeset 1979:889472b0e6d5 draft
implement blocked read (not running)
author | Masataka Kohagura <e085726@ie.u-ryukyu.ac.jp> |
---|---|
date | Thu, 13 Mar 2014 02:43:35 +0900 |
parents | 8fbe022126e1 |
children | aa5fabf2d4b2 |
files | example/word_count/WordCount.h example/word_count/main.cc |
diffstat | 2 files changed, 104 insertions(+), 37 deletions(-) [+] |
line wrap: on
line diff
--- a/example/word_count/WordCount.h Thu Mar 13 02:15:06 2014 +0900 +++ b/example/word_count/WordCount.h Thu Mar 13 02:43:35 2014 +0900 @@ -1,32 +1,25 @@ typedef struct wordCount { struct wordCount *self; - int fd; - int read_division_size; - int read_task_number; - int read_task_num; - int read_left_task_num; - int read_filesize; - int read_left_size; - int read_task_blocks; - char *read_text; + long fd; + long read_filesize; CPU_TYPE read_cpu; - int size; // remaining file size - int division_size; // for each word count task - int division_out_size; - int out_size; - int out_size_; - int task_num; // remaining task count - int task_blocks; // spawn task one at a time - int status_num; - int task_spwaned; + long size; // remaining file size + long division_size; // for each word count task + long division_out_size; + long out_size; + long out_size_; + long task_num; // remaining task count + long task_blocks; // spawn task one at a time + long status_num; + long task_spawned; unsigned long long *o_data; unsigned long long *head_tail_flag; - int out_task_num; - int pad; + long out_task_num; + long pad; char * file_mmap; - int file_size; + long file_size; HTaskPtr t_print; HTaskPtr t_exec; } WordCount, *WordCountPtr;
--- a/example/word_count/main.cc Thu Mar 13 02:15:06 2014 +0900 +++ b/example/word_count/main.cc Thu Mar 13 02:43:35 2014 +0900 @@ -29,6 +29,7 @@ int array_task_num = 11; int spe_num = 1; CPU_TYPE spe_cpu = SPE_ANY; +CPU_TYPE read_spe_cpu = SPE_ANY; const char *usr_help_str = "Usage: ./word_count [-a -c -s] [-cpu spe_num] [-g] [-file filename]\n"; static double @@ -114,7 +115,7 @@ Task *t_exec = 0; HTask *h_exec = 0; for (int j = 0; j < array_task_num; j++) { - int i = w->task_spwaned++; + int i = w->task_spawned++; if (w->size < size) size = w->size; if (size==0) break; if (use_task_array) { @@ -152,7 +153,7 @@ h_exec->iterate(array_task_num); w->task_num -= array_task_num; - w->task_spwaned += array_task_num-1; + w->task_spawned += array_task_num-1; if(w->size < 0) { h_exec = manager->create_task(TASK_EXEC_DATA_PARALLEL); @@ -161,7 +162,7 @@ h_exec->set_inData(1,w->o_data,w->out_size_); h_exec->set_outData(0,w->file_mmap,w->file_size); h_exec->set_outData(1,w->o_data,w->out_size_); - h_exec->set_param(0,(long)w->task_spwaned); + h_exec->set_param(0,(long)w->task_spawned); h_exec->set_param(1,(long)w->division_size); h_exec->set_param(2,(long)(size+w->size)); h_exec->set_param(3,(long)w->out_size); @@ -171,7 +172,7 @@ h_exec->iterate(1); w->task_num -= 1; - w->task_spwaned += 1; + w->task_spawned += 1; array_task_num += 1; } break; @@ -201,6 +202,49 @@ * 二つ実行されていて、Task が足りなくなることがないようにしている。 */ +SchedDefineTask1(BREAD_RUN_TASK_BLOCKS,bread_run16); + +static int +bread_run16(SchedTask *manager, void *in, void *out) +{ + WordCount *w = *(WordCount **)in; + + HTaskPtr t_read = manager->create_task(READ_TASK); + w->t_print->wait_for(t_read); + t_read->set_cpu(read_spe_cpu); + t_read->set_param(0,w->fd); + + if (w->task_num < w->task_blocks) { + t_read->set_param(1,w->task_spawned*w->division_size); + t_read->set_outData(0, w->file_mmap + w->task_spawned * w->division_size, w->task_blocks * w->division_size); + + // last case + while (w->size >= w->division_size) + run_tasks(manager,w,w->task_num,t_read,w->t_print, w->division_size); + // remaining data + while (w->size>0) + run_tasks(manager,w,1,t_read,w->t_print, w->division_size); + + t_read->set_param(2,w->task_spawned*w->division_size); + t_read->spawn(); + } else { + HTaskPtr t_next = manager->create_task(BREAD_RUN_TASK_BLOCKS, + (memaddr)&w->self,sizeof(memaddr),0,0); + w->t_print->wait_for(t_next); + + t_read->set_param(1,w->task_spawned*w->division_size); + t_read->set_outData(0, w->file_mmap + w->task_spawned * w->division_size, w->task_blocks * w->division_size); + + run_tasks(manager,w, w->task_blocks, t_read, t_next, w->division_size + w->extra_len); + + t_read->set_param(2,w->task_spawned*w->division_size + w->extra_len); + + t_read->spawn(); + t_next->spawn(); + } + return 0; +} + SchedDefineTask1(RUN_TASK_BLOCKS,run16); static int @@ -238,22 +282,46 @@ static void run_start(TaskManager *manager, char *filename) { - HTaskPtr t_print; + long fd = (long)manager->allocate(sizeof(long)); + struct stat *sb = (struct stat*)manager->allocate(sizeof(struct stat)); + HTaskPtr t_exec; + + if ((fd=open(filename,O_RDONLY,0666))==0) { + fprintf(stderr,"can't open %s\n",filename); + return ; + } + + if (fstat(fd,sb)) { + fprintf(stderr,"can't fstat %s\n",filename); + return ; + } - st_mmap_t st_mmap; - st_mmap = my_mmap(filename); - WordCount *w = (WordCount*)manager->allocate(sizeof(WordCount)); - // bzero(w,sizeof(WordCount)); + WordCountPtr w = (WordCountPtr)manager->allocate(sizeof(WordCount)); + + w->self = w; + w->fd = fd; + w->read_filesize = sb->st_size; + + + if (block_read_flag == 1) { + printf("[block read mode]\n"); + w->file_mmap = (char*)manager->allocate(w->read_filesize); + }else { + printf("[mmap mode]\n"); + my_mmap(filename, w); + } + + HTaskPtr t_print; //w->task_blocks = blocks; w->self = w; - w->task_spwaned = 0; + w->task_spawned = 0; /*sizeはdivision_sizeの倍数にしている。*/ w->size = w->file_size = st_mmap.size; w->file_mmap = st_mmap.file_mmap; printf("w %lx\n",(long)w); - + /* 1task分のデータサイズ(byte) */ if (w->size >= 1024*division) { w->division_size = 1024 * division;/*16kbyte*/ @@ -261,7 +329,7 @@ w->division_size = w->size; } - printf("dvision_size %d\n",w->division_size); + printf("dvision_size %ld\n",w->division_size); /* "word num" and "line num" */ w->status_num = 2; @@ -277,8 +345,8 @@ } w->out_task_num = out_task_num; - printf("task_num %d\n",w->task_num); - printf("out_task_num %d\n",w->out_task_num); + printf("task_num %ld\n",w->task_num); + printf("out_task_num %ld\n",w->out_task_num); /* out用のdivision_size. statusが2つなので、あわせて16byteになるように、long long(4byte)を使用 */ @@ -297,8 +365,14 @@ for(int i=0;i<4;i++) { /* Task を task_blocks ずつ起動する Task */ /* serialize されていると仮定する... */ - HTaskPtr t_exec = manager->create_task(RUN_TASK_BLOCKS, - (memaddr)&w->self,sizeof(memaddr),0,0); + if (block_read_flag == 1) { + t_exec = manager->create_task(BREAD_RUN_TASK_BLOCKS, + (memaddr)&w->self,sizeof(memaddr),0,0); + }else { + t_exec = manager->create_task(RUN_TASK_BLOCKS, + (memaddr)&w->self,sizeof(memaddr),0,0); + } + t_print->wait_for(t_exec); // t_exec->iterate(4); t_exec->spawn();