changeset 1945:653c1caf1362 draft

fix read spawn
author masa
date Wed, 05 Feb 2014 20:09:44 +0900
parents dcf42acc5006
children 502521591d7c
files example/regex_mas/main.cc
diffstat 1 files changed, 23 insertions(+), 10 deletions(-) [+]
line wrap: on
line diff
--- a/example/regex_mas/main.cc	Wed Feb 05 19:13:30 2014 +0900
+++ b/example/regex_mas/main.cc	Wed Feb 05 20:09:44 2014 +0900
@@ -138,7 +138,7 @@
 }
 
 static void
-run_tasks(SchedTask *manager, WordCount *w, int task_count, HTaskPtr t_next, int size)
+run_tasks(SchedTask *manager, WordCount *w, int task_count,HTaskPtr t_read, HTaskPtr t_next, int size)
 {
 
     if (task_count < array_task_num) {
@@ -151,6 +151,7 @@
             int task_num = (w->size+size-1)/size;
             if (task_num>array_task_num) task_num = array_task_num;
             task_array = manager->create_task_array(TASK_EXEC,task_num,1,3,1);
+            task_array->wait_for(t_read);
             if (!all) {
                 t_next->wait_for(task_array);
             } else {
@@ -161,20 +162,21 @@
         Task *t_exec = 0;
         HTask *h_exec = 0;
         for (int j = 0; j < array_task_num; j++) {
-            long a = w->task_spwaned++;
+            long i = w->task_spwaned++;
             if (w->size < size) size = w->size;
             if (size==0) break;
             if (use_task_array) {
                 t_exec = task_array->next_task_array(TASK_EXEC,t_exec);
-                t_exec->set_inData(0,w->file_mmap + a*w->division_size, size);
+                t_exec->set_inData(0,w->file_mmap + i*w->division_size, size);
                 t_exec->set_inData(1,w->search_word, w->search_word_len);
                 t_exec->set_inData(2,w->BMskip_table, 256);
 
-                t_exec->set_param(0,(long)a);
+                t_exec->set_param(0,(long)i);
 
                 t_exec->set_outData(0,w->o_data + a*w->out_size, w->division_out_size);
             } else if (use_compat) {
                 h_exec = manager->create_task(TASK_EXEC);
+                h_exec->wait_for(t_read);
                 h_exec->set_inData(0,w->file_mmap + i*w->division_size, size);
                 h_exec->set_outData(0,w->o_data + i*w->out_size, w->division_out_size);
 
@@ -186,6 +188,7 @@
                 h_exec = manager->create_task(TASK_EXEC,
                                               (memaddr)(w->file_mmap + i*w->division_size), size,
                                               (memaddr)(w->o_data + i*w->out_size), w->division_out_size);
+                h_exec->wait_for(t_read);
                 t_next->wait_for(h_exec);
                 h_exec->set_cpu(spe_cpu);
                 h_exec->spawn();
@@ -227,9 +230,17 @@
         HTaskPtr t_next = manager->create_task(RUN_TASK_BLOCKS,
                                                (memaddr)&w->self,sizeof(memaddr),0,0);
         w->t_print->wait_for(t_next);
+        long task_spawned = w->task_spawned;
+        HTaskPtr t_read = manager->create_task(READ_TASK,
+                                               (memaddr)&w->self,sizeof(memaddr),0,0);
 
-        run_tasks(manager,w, w->task_blocks, t_next, w->division_size + w->extra_len);
+        t_read->set_cpu(read_cpu);
+        t_read->set_param(0,task_spawned*w->division_size);
 
+        run_tasks(manager,w, w->task_blocks,t_read, t_next, w->division_size + w->extra_len);
+        t_read->set_param(1,task_spawned*w->division_size);
+
+        t_read->spawn();
         t_next->spawn();
         // printf("run16 next %d\n",w->task_num);
     }
@@ -264,10 +275,12 @@
 
     if ((fd=open(filename,O_RDONLY,0666))==0) {
         fprintf(stderr,"can't open %s\n",filename);
+        return ;
     }
 
     if (fstat(fd,sb)) {
         fprintf(stderr,"can't fstat %s\n",filename);
+        return ;
     }
 
     WordCountPtr w = (WordCountPtr)manager->allocate(sizeof(WordCount));
@@ -286,18 +299,18 @@
     printf("one_task_size: %ld\n",w->read_division_size);
     printf("task_num     : %d\n",w->read_task_num);
 
-    HTaskPtr r_run = NULL;
+    HTaskPtr r_read = NULL;
 
     if (divide_read_flag != 0) {
         printf("[divide read mode]\n");
         w->file_mmap = (char*)manager->allocate(w->read_filesize);
-        r_run = manager->create_task(RUN_READ_BLOCKS, (memaddr)&w->self, sizeof(memaddr),0,0);
+        r_read = manager->create_task(RUN_READ_BLOCKS, (memaddr)&w->self, sizeof(memaddr),0,0);
     }else {
         printf("[mmap mode]\n");
-        r_run = manager->create_task(MMAP , (memaddr)&w->self, sizeof(memaddr),0,0);
+        r_read = manager->create_task(MMAP , (memaddr)&w->self, sizeof(memaddr),0,0);
     }
 
-    r_run->spawn();
+    r_read->spawn();
 
     /* prepare BMSearch*/
     int *skip = (int*)manager->allocate(256 * sizeof(int));
@@ -364,7 +377,7 @@
         /* serialize されていると仮定する... */
         t_exec = manager->create_task(RUN_TASK_BLOCKS,
                                                (memaddr)&w->self,sizeof(memaddr),0,0);
-        t_exec->wait_for(r_run);
+        t_exec->wait_for(r_read);
         t_print->wait_for(t_exec);
         //    t_exec->iterate(4);
         t_exec->spawn();