changeset 2053:030b8efcf357 draft

remove wordCount dependency in FileMapReduce
author masa
date Thu, 28 Jan 2016 18:52:38 +0900
parents cc1ea3933551
children 2e7a6f40672f
files TaskManager/ManyCore/FileMapReduce.cc TaskManager/ManyCore/FileMapReduce.h example/word_count/main.cc example/word_count/ppe/Print.cc
diffstat 4 files changed, 37 insertions(+), 45 deletions(-) [+]
line wrap: on
line diff
--- a/TaskManager/ManyCore/FileMapReduce.cc	Thu Jan 28 17:55:47 2016 +0900
+++ b/TaskManager/ManyCore/FileMapReduce.cc	Thu Jan 28 18:52:38 2016 +0900
@@ -197,6 +197,7 @@
 
         Task *t_exec = 0;
         HTask *h_exec = 0;
+        int out_size = w->division_out_size / sizeof(*w->o_data);
         for (int j = 0; j < array_task_num; j++) {
             int i = w->task_spawned++;
             if (w->size < size) size = w->size;
@@ -208,7 +209,7 @@
                 t_exec->set_param(2,(long)size);
                 t_exec->set_param(3,(long)w->division_out_size);
                 t_exec->set_inData(0,w->file_mmap + i*w->division_size, size);
-                t_exec->set_outData(0,w->o_data + i*w->out_size, w->division_out_size);
+                t_exec->set_outData(0,w->o_data + i*out_size, w->division_out_size);
             } else if (use_compat) {
                 h_exec = manager->create_task(TASK_EXEC);
                 t_exec->set_param(0,(long)0);
@@ -216,12 +217,12 @@
                 t_exec->set_param(2,(long)size);
                 t_exec->set_param(3,(long)w->division_out_size);
                 h_exec->set_inData(0,w->file_mmap + i*w->division_size, size);
-                h_exec->set_outData(0,w->o_data + i*w->out_size, w->division_out_size);
+                h_exec->set_outData(0,w->o_data + i*out_size, w->division_out_size);
                 t_next->wait_for(h_exec);
                 h_exec->set_cpu(spe_cpu);
                 h_exec->spawn();
             } else if (use_iterate) {
-                array_task_num = w->task_num;
+                array_task_num = w->remain_task;
                 use_iterate = 0;
                 use_compat = 1;
                 w->size -= size*array_task_num;
@@ -232,23 +233,23 @@
                 h_exec = manager->create_task(TASK_EXEC_DATA_PARALLEL);
                 h_exec->flip();
                 h_exec->set_inData(0,w->file_mmap,w->file_size);
-                h_exec->set_inData(1,w->o_data,w->out_size_);
+                h_exec->set_inData(1,w->o_data,w->division_out_size*w->remain_task);
                 h_exec->set_outData(0,w->file_mmap,w->file_size);
-                h_exec->set_outData(1,w->o_data,w->out_size_);
+                h_exec->set_outData(1,w->o_data,w->division_out_size*w->remain_task);
                 h_exec->set_param(0,(long)i);
                 h_exec->set_param(1,(long)w->division_size);
                 h_exec->set_param(2,(long)size);
-                h_exec->set_param(3,(long)w->out_size);
+                h_exec->set_param(3,(long)out_size);
                 t_next->wait_for(h_exec);
                 h_exec->set_cpu(spe_cpu);
                 h_exec->iterate(array_task_num);
-                w->task_num -= array_task_num;
+                w->remain_task -= array_task_num;
                 w->task_spawned += array_task_num-1;
                 break;
             } else {
                 h_exec = manager->create_task(TASK_EXEC,
                                               (memaddr)(w->file_mmap + i*w->division_size), size,
-                                              (memaddr)(w->o_data + i*w->out_size), w->division_out_size);
+                                              (memaddr)(w->o_data + i*out_size), w->division_out_size);
                 t_exec->set_param(0,(long)0);
                 t_exec->set_param(1,(long)0);
                 t_exec->set_param(2,(long)size);
@@ -258,7 +259,7 @@
                 h_exec->spawn();
             }
             w->size -= size;
-            w->task_num--;
+            w->remain_task--;
         }
         if (use_task_array) {
             task_array->spawn_task_array(t_exec->next());
@@ -287,13 +288,13 @@
     t_read->set_cpu(w->fmp->read_spe_cpu);
     t_read->set_param(0,w->fd);
 
-    if (w->task_num < w->task_blocks) {
+    if (w->remain_task < w->task_blocks) {
         t_read->set_param(1,w->task_spawned*w->division_size);
         t_read->set_outData(0, w->file_mmap + w->task_spawned * w->division_size, w->task_blocks * w->division_size);
 
         // last case
         while (w->size >= w->division_size)
-            run_tasks(manager,w,w->task_num,t_read,w->t_print, w->division_size);
+            run_tasks(manager,w,w->remain_task,t_read,w->t_print, w->division_size);
         // remaining data
         while (w->size>0)
             run_tasks(manager,w,1,t_read,w->t_print, w->division_size);
@@ -326,15 +327,15 @@
     MapReduce *w = *(MapReduce **)in;
 
     if(w->fmp->use_iterate) {
-        run_tasks(manager, w, w->task_num, 0, w->t_print, w->division_size);
-    } else if (w->task_num < w->task_blocks) {
+        run_tasks(manager, w, w->remain_task, 0, w->t_print, w->division_size);
+    } else if (w->remain_task < w->task_blocks) {
         // last case
         while (w->size >= w->division_size)
-            run_tasks(manager,w,w->task_num,0,w->t_print, w->division_size);
+            run_tasks(manager,w,w->remain_task,0,w->t_print, w->division_size);
         // remaining data
         while (w->size>0)
             run_tasks(manager,w,1,0, w->t_print, w->size);
-        // printf("run16 last %d\n",w->task_num);
+        // printf("run16 last %d\n",w->remain_task);
     } else {
         HTaskPtr t_next = manager->create_task(RUN_TASK_BLOCKS,
                                                (memaddr)&w->self,sizeof(memaddr),0,0);
@@ -343,7 +344,7 @@
         run_tasks(manager,w, w->task_blocks,0, t_next, w->division_size);
 
         t_next->spawn();
-        // printf("run16 next %d\n",w->task_num);
+        // printf("run16 next %d\n",w->remain_task);
     }
     return 0;
 }
@@ -404,30 +405,24 @@
 
     printf("division_size %ld\n",w->division_size);
 
-    /* "word num" and "line num" */
-    w->status_num = 2;
     /* taskの数 */
     w->task_num = w->size / w->division_size;
     w->task_num = w->task_num + (w->division_size*w->task_num < w->size);
-    int out_task_num = w->task_num;
+    w->remain_task = w->task_num;
 
     if(!all) {
         w->task_blocks = blocks;
     } else {
-        w->task_blocks = w->task_num;
+        w->task_blocks = w->remain_task;
     }
 
-    w->out_task_num = out_task_num;
-    printf("task_num %ld\n",w->task_num);
-    printf("out_task_num %ld\n",w->out_task_num);
+    printf("task_num %ld\n",w->remain_task);
 
     /* out用のdivision_size. statusが2つなので、あわせて16byteになるように、long long(4byte)を使用 */
 
-    w->division_out_size = sizeof(unsigned long long)*4;
-    int out_size = w->division_out_size*out_task_num;
+    w->division_out_size = division_out_size;
+    int out_size = w->division_out_size*w->remain_task;
     w->o_data = (unsigned long long *)manager->allocate(out_size);
-    w->out_size_ = out_size;
-    w->out_size = 4;
     printf("out size %d\n",out_size);
 
     /*各SPEの結果を合計して出力するタスク*/
--- a/TaskManager/ManyCore/FileMapReduce.h	Thu Jan 28 17:55:47 2016 +0900
+++ b/TaskManager/ManyCore/FileMapReduce.h	Thu Jan 28 18:52:38 2016 +0900
@@ -16,14 +16,11 @@
     long size;             // remaining file size
     long division_size;    // for each word count task
     long division_out_size;
-    long out_size;
-    long out_size_;
-    long task_num;         // remaining task count
+    long task_num;         // task count
+    long remain_task;      // remaining task count
     long task_blocks;      // spawn task one at a time
-    long status_num;
     long task_spawned;
     unsigned long long *o_data;
-    long out_task_num;
     long pad;
     char *file_mmap;
     long file_size;
@@ -77,10 +74,11 @@
     int TASK_EXEC;
     int TASK_EXEC_DATA_PARALLEL;
     int TASK_PRINT;
+public:
+    CPU_TYPE read_spe_cpu;
     int blocks;
     int division; // in Kbyte
-public:
-    CPU_TYPE read_spe_cpu;
+    int division_out_size;
     int use_iterate;
     int task_exec_id;
     int task_print_id;
--- a/example/word_count/main.cc	Thu Jan 28 17:55:47 2016 +0900
+++ b/example/word_count/main.cc	Thu Jan 28 18:52:38 2016 +0900
@@ -29,15 +29,13 @@
     char *filename = 0;
     FileMapReduce *fmp = new FileMapReduce(manager,TASK_EXEC,TASK_EXEC_DATA_PARALLEL,TASK_PRINT);
     filename = fmp->init(argc, argv);
-
     if (filename < 0) {
         return -1;
     }
-
+    fmp->division_out_size = sizeof(unsigned long long)*4;
     task_init();
     st_time = getTime();
     fmp->run_start(manager, filename);
-    // fmp->start();
     manager->set_TMend(TMend);
     return 0;
 }
--- a/example/word_count/ppe/Print.cc	Thu Jan 28 17:55:47 2016 +0900
+++ b/example/word_count/ppe/Print.cc	Thu Jan 28 18:52:38 2016 +0900
@@ -4,6 +4,7 @@
 #include "Func.h"
 #include "FileMapReduce.h"
 
+#define STATUS_NUM 2
 /* これは必須 */
 SchedDefineTask1(Print,run_print);
 
@@ -12,9 +13,8 @@
 {
     MapReduce *w = (MapReduce*)s->get_input(0);
     unsigned long long *idata = w->o_data;
-    // long task_num = w->task_num;
-    long status_num = w->status_num;
-    int out_task_num = w->out_task_num;
+    long status_num = STATUS_NUM;
+    int out_task_num = w->task_num;
 
     /*
      *  head_flag
@@ -22,7 +22,7 @@
      *  o_data[1]
      *
      */
-    unsigned long long word_data[2];
+    unsigned long long word_data[STATUS_NUM];
 
     int flag_cal_sum = 0;
     //printf("pad %d\n",pad);
@@ -62,16 +62,17 @@
 
     s->printf("start sum\n");
 
-    for (int i = 0; i < status_num; i++) {
+    for (int i = 0; i < STATUS_NUM; i++) {
         word_data[i] = 0;
     }
 
+    int out_size = w->division_out_size / sizeof(unsigned long long);
     for (int i = 0; i < out_task_num ; i++) {
-        word_data[0] += idata[i*w->out_size+0];
+        word_data[0] += idata[i*out_size+0];
         //        printf("idata[%d]=%lld\n",i*w->out_size+0,idata[i*w->out_size+0]);
-        word_data[1] += idata[i*w->out_size+1];
+        word_data[1] += idata[i*out_size+1];
         unsigned long long *head_tail_flag = 
-            &idata[i*w->out_size+2];
+            &idata[i*out_size+2];
         if((i!=out_task_num-1)&&
            (head_tail_flag[1] == 1) && (head_tail_flag[4] == 0)) {
             flag_cal_sum++;