Mercurial > hg > Game > Cerium
changeset 2053:030b8efcf357 draft
remove wordCount dependency in FileMapReduce
author | masa |
---|---|
date | Thu, 28 Jan 2016 18:52:38 +0900 |
parents | cc1ea3933551 |
children | 2e7a6f40672f |
files | TaskManager/ManyCore/FileMapReduce.cc TaskManager/ManyCore/FileMapReduce.h example/word_count/main.cc example/word_count/ppe/Print.cc |
diffstat | 4 files changed, 37 insertions(+), 45 deletions(-) [+] |
line wrap: on
line diff
--- a/TaskManager/ManyCore/FileMapReduce.cc Thu Jan 28 17:55:47 2016 +0900 +++ b/TaskManager/ManyCore/FileMapReduce.cc Thu Jan 28 18:52:38 2016 +0900 @@ -197,6 +197,7 @@ Task *t_exec = 0; HTask *h_exec = 0; + int out_size = w->division_out_size / sizeof(*w->o_data); for (int j = 0; j < array_task_num; j++) { int i = w->task_spawned++; if (w->size < size) size = w->size; @@ -208,7 +209,7 @@ t_exec->set_param(2,(long)size); t_exec->set_param(3,(long)w->division_out_size); t_exec->set_inData(0,w->file_mmap + i*w->division_size, size); - t_exec->set_outData(0,w->o_data + i*w->out_size, w->division_out_size); + t_exec->set_outData(0,w->o_data + i*out_size, w->division_out_size); } else if (use_compat) { h_exec = manager->create_task(TASK_EXEC); t_exec->set_param(0,(long)0); @@ -216,12 +217,12 @@ t_exec->set_param(2,(long)size); t_exec->set_param(3,(long)w->division_out_size); h_exec->set_inData(0,w->file_mmap + i*w->division_size, size); - h_exec->set_outData(0,w->o_data + i*w->out_size, w->division_out_size); + h_exec->set_outData(0,w->o_data + i*out_size, w->division_out_size); t_next->wait_for(h_exec); h_exec->set_cpu(spe_cpu); h_exec->spawn(); } else if (use_iterate) { - array_task_num = w->task_num; + array_task_num = w->remain_task; use_iterate = 0; use_compat = 1; w->size -= size*array_task_num; @@ -232,23 +233,23 @@ h_exec = manager->create_task(TASK_EXEC_DATA_PARALLEL); h_exec->flip(); h_exec->set_inData(0,w->file_mmap,w->file_size); - h_exec->set_inData(1,w->o_data,w->out_size_); + h_exec->set_inData(1,w->o_data,w->division_out_size*w->remain_task); h_exec->set_outData(0,w->file_mmap,w->file_size); - h_exec->set_outData(1,w->o_data,w->out_size_); + h_exec->set_outData(1,w->o_data,w->division_out_size*w->remain_task); h_exec->set_param(0,(long)i); h_exec->set_param(1,(long)w->division_size); h_exec->set_param(2,(long)size); - h_exec->set_param(3,(long)w->out_size); + h_exec->set_param(3,(long)out_size); t_next->wait_for(h_exec); h_exec->set_cpu(spe_cpu); h_exec->iterate(array_task_num); - w->task_num -= array_task_num; + w->remain_task -= array_task_num; w->task_spawned += array_task_num-1; break; } else { h_exec = manager->create_task(TASK_EXEC, (memaddr)(w->file_mmap + i*w->division_size), size, - (memaddr)(w->o_data + i*w->out_size), w->division_out_size); + (memaddr)(w->o_data + i*out_size), w->division_out_size); t_exec->set_param(0,(long)0); t_exec->set_param(1,(long)0); t_exec->set_param(2,(long)size); @@ -258,7 +259,7 @@ h_exec->spawn(); } w->size -= size; - w->task_num--; + w->remain_task--; } if (use_task_array) { task_array->spawn_task_array(t_exec->next()); @@ -287,13 +288,13 @@ t_read->set_cpu(w->fmp->read_spe_cpu); t_read->set_param(0,w->fd); - if (w->task_num < w->task_blocks) { + if (w->remain_task < w->task_blocks) { t_read->set_param(1,w->task_spawned*w->division_size); t_read->set_outData(0, w->file_mmap + w->task_spawned * w->division_size, w->task_blocks * w->division_size); // last case while (w->size >= w->division_size) - run_tasks(manager,w,w->task_num,t_read,w->t_print, w->division_size); + run_tasks(manager,w,w->remain_task,t_read,w->t_print, w->division_size); // remaining data while (w->size>0) run_tasks(manager,w,1,t_read,w->t_print, w->division_size); @@ -326,15 +327,15 @@ MapReduce *w = *(MapReduce **)in; if(w->fmp->use_iterate) { - run_tasks(manager, w, w->task_num, 0, w->t_print, w->division_size); - } else if (w->task_num < w->task_blocks) { + run_tasks(manager, w, w->remain_task, 0, w->t_print, w->division_size); + } else if (w->remain_task < w->task_blocks) { // last case while (w->size >= w->division_size) - run_tasks(manager,w,w->task_num,0,w->t_print, w->division_size); + run_tasks(manager,w,w->remain_task,0,w->t_print, w->division_size); // remaining data while (w->size>0) run_tasks(manager,w,1,0, w->t_print, w->size); - // printf("run16 last %d\n",w->task_num); + // printf("run16 last %d\n",w->remain_task); } else { HTaskPtr t_next = manager->create_task(RUN_TASK_BLOCKS, (memaddr)&w->self,sizeof(memaddr),0,0); @@ -343,7 +344,7 @@ run_tasks(manager,w, w->task_blocks,0, t_next, w->division_size); t_next->spawn(); - // printf("run16 next %d\n",w->task_num); + // printf("run16 next %d\n",w->remain_task); } return 0; } @@ -404,30 +405,24 @@ printf("division_size %ld\n",w->division_size); - /* "word num" and "line num" */ - w->status_num = 2; /* taskの数 */ w->task_num = w->size / w->division_size; w->task_num = w->task_num + (w->division_size*w->task_num < w->size); - int out_task_num = w->task_num; + w->remain_task = w->task_num; if(!all) { w->task_blocks = blocks; } else { - w->task_blocks = w->task_num; + w->task_blocks = w->remain_task; } - w->out_task_num = out_task_num; - printf("task_num %ld\n",w->task_num); - printf("out_task_num %ld\n",w->out_task_num); + printf("task_num %ld\n",w->remain_task); /* out用のdivision_size. statusが2つなので、あわせて16byteになるように、long long(4byte)を使用 */ - w->division_out_size = sizeof(unsigned long long)*4; - int out_size = w->division_out_size*out_task_num; + w->division_out_size = division_out_size; + int out_size = w->division_out_size*w->remain_task; w->o_data = (unsigned long long *)manager->allocate(out_size); - w->out_size_ = out_size; - w->out_size = 4; printf("out size %d\n",out_size); /*各SPEの結果を合計して出力するタスク*/
--- a/TaskManager/ManyCore/FileMapReduce.h Thu Jan 28 17:55:47 2016 +0900 +++ b/TaskManager/ManyCore/FileMapReduce.h Thu Jan 28 18:52:38 2016 +0900 @@ -16,14 +16,11 @@ long size; // remaining file size long division_size; // for each word count task long division_out_size; - long out_size; - long out_size_; - long task_num; // remaining task count + long task_num; // task count + long remain_task; // remaining task count long task_blocks; // spawn task one at a time - long status_num; long task_spawned; unsigned long long *o_data; - long out_task_num; long pad; char *file_mmap; long file_size; @@ -77,10 +74,11 @@ int TASK_EXEC; int TASK_EXEC_DATA_PARALLEL; int TASK_PRINT; +public: + CPU_TYPE read_spe_cpu; int blocks; int division; // in Kbyte -public: - CPU_TYPE read_spe_cpu; + int division_out_size; int use_iterate; int task_exec_id; int task_print_id;
--- a/example/word_count/main.cc Thu Jan 28 17:55:47 2016 +0900 +++ b/example/word_count/main.cc Thu Jan 28 18:52:38 2016 +0900 @@ -29,15 +29,13 @@ char *filename = 0; FileMapReduce *fmp = new FileMapReduce(manager,TASK_EXEC,TASK_EXEC_DATA_PARALLEL,TASK_PRINT); filename = fmp->init(argc, argv); - if (filename < 0) { return -1; } - + fmp->division_out_size = sizeof(unsigned long long)*4; task_init(); st_time = getTime(); fmp->run_start(manager, filename); - // fmp->start(); manager->set_TMend(TMend); return 0; }
--- a/example/word_count/ppe/Print.cc Thu Jan 28 17:55:47 2016 +0900 +++ b/example/word_count/ppe/Print.cc Thu Jan 28 18:52:38 2016 +0900 @@ -4,6 +4,7 @@ #include "Func.h" #include "FileMapReduce.h" +#define STATUS_NUM 2 /* これは必須 */ SchedDefineTask1(Print,run_print); @@ -12,9 +13,8 @@ { MapReduce *w = (MapReduce*)s->get_input(0); unsigned long long *idata = w->o_data; - // long task_num = w->task_num; - long status_num = w->status_num; - int out_task_num = w->out_task_num; + long status_num = STATUS_NUM; + int out_task_num = w->task_num; /* * head_flag @@ -22,7 +22,7 @@ * o_data[1] * */ - unsigned long long word_data[2]; + unsigned long long word_data[STATUS_NUM]; int flag_cal_sum = 0; //printf("pad %d\n",pad); @@ -62,16 +62,17 @@ s->printf("start sum\n"); - for (int i = 0; i < status_num; i++) { + for (int i = 0; i < STATUS_NUM; i++) { word_data[i] = 0; } + int out_size = w->division_out_size / sizeof(unsigned long long); for (int i = 0; i < out_task_num ; i++) { - word_data[0] += idata[i*w->out_size+0]; + word_data[0] += idata[i*out_size+0]; // printf("idata[%d]=%lld\n",i*w->out_size+0,idata[i*w->out_size+0]); - word_data[1] += idata[i*w->out_size+1]; + word_data[1] += idata[i*out_size+1]; unsigned long long *head_tail_flag = - &idata[i*w->out_size+2]; + &idata[i*out_size+2]; if((i!=out_task_num-1)&& (head_tail_flag[1] == 1) && (head_tail_flag[4] == 0)) { flag_cal_sum++;