comparison example/word_count/main.cc @ 2051:b79a250b4f99 draft

FileMapReduce (no compile error)
author masa
date Thu, 28 Jan 2016 17:23:45 +0900
parents 674ac7887dae
children 030b8efcf357
comparison
equal deleted inserted replaced
2050:26dd777ba95d 2051:b79a250b4f99
1 #include <stdio.h> 1 #include <stdio.h>
2 #include <stdlib.h>
3 #include <string.h>
4 #include <sys/mman.h>
5 #include <sys/types.h>
6 #include <sys/stat.h>
7 #include <fcntl.h>
8 #include <unistd.h>
9 #include <sys/time.h> 2 #include <sys/time.h>
10 #include "TaskManager.h" 3 #include "TaskManager.h"
11 #include "SchedTask.h" 4 #include "SchedTask.h"
12 #include "Func.h" 5 #include "Func.h"
13 #include "WordCount.h" 6 #include "FileMapReduce.h"
14 7
15 /* ;TODO 8 /* ;TODO
16 * PS3でCPU数が2以上の時に、あまりが計算されてない 9 * PS3でCPU数が2以上の時に、あまりが計算されてない
17 */ 10 */
18 11
// Hooks provided elsewhere: task_init() is called from TMmain before any task
// is created; TMend is registered with manager->set_TMend() in TMmain.
19 extern void task_init(); 12 extern void task_init();
20 void TMend(TaskManager *); 13 void TMend(TaskManager *);
// Timestamps in seconds from getTime(). st_time is set in TMmain just before
// run_start(); ed_time is presumably set by TMend, which is not visible here.
21 static double st_time; 14 static double st_time;
22 static double ed_time; 15 static double ed_time;
// Run-time configuration (old revision), set from the command line by init():
//   all              -a: spawn every task in one step (task_blocks = task_num).
//   use_task_array   default spawning mode; cleared by -c, -s, -t and -i.
//   use_task_creater set by -t; not read anywhere in the visible code.
//   use_compat       -c: one create_task() per chunk.
//   use_iterate      -i: one data-parallel task iterated over the chunks.
//   array_task_num   -anum: maximum number of tasks created per batch in run_tasks().
//   spe_num          only referenced by the commented-out -cpu option.
//   read_type        -br selects BLOCKED_READ, -r selects MY_READ; default MY_MMAP.
//   t_exec_num       number of RUN_TASK_BLOCKS tasks spawned by run_start(); -i sets 1.
//   spe_cpu          CPU type for word-count tasks; -g selects GPU_0, -any ANY_ANY.
//   read_spe_cpu     CPU type for READ_TASK in blocked-read mode.
// The new revision (right column) keeps only an empty usr_help_str here.
23 int all = 0; 16 const char* usr_help_str = "";
24 int use_task_array = 1;
25 int use_task_creater = 0;
26 int use_compat = 0;
27 int use_iterate = 0;
28 int array_task_num = 11;
29 int spe_num = 1;
30 int read_type = MY_MMAP;
31 int t_exec_num = 4;
32 CPU_TYPE spe_cpu = SPE_ANY;
33 CPU_TYPE read_spe_cpu = IO_0;
34
// Usage text printed by init() when no -file option is given.
35 const char *usr_help_str = "Usage: ./word_count [-a -c -s] [-cpu spe_num] [-g] [-file filename] [-br]\n";
36 17
// Current wall-clock time in seconds, with microsecond resolution, taken from
// gettimeofday(). TMmain stores it in st_time before starting the tasks.
37 static double 18 static double
38 getTime() { 19 getTime() {
39 struct timeval tv; 20 struct timeval tv;
40 gettimeofday(&tv, NULL); 21 gettimeofday(&tv, NULL);
41 return tv.tv_sec + (double)tv.tv_usec*1e-6; 22 return tv.tv_sec + (double)tv.tv_usec*1e-6;
42 } 23 }
43 24
/*
 * Pair describing a file mapping: the address it was mapped at and the
 * number of bytes mapped.
 */
typedef struct {
    caddr_t file_mmap;  /* base address of the mapping */
    off_t   size;       /* length of the mapping in bytes */
} st_mmap_t;
48
/*
 * Round `size` up to the next multiple of `fix_byte_size` (e.g. a page size).
 * A value that is already a multiple is returned unchanged, so 0 stays 0.
 *
 * Uses `long long` rather than `int` because the caller passes a file size
 * (off_t st_size). With `int`, files of 2 GiB or more were silently truncated.
 * fix_byte_size must be greater than 0.
 */
static long long
fix_byte(long long size, long long fix_byte_size)
{
    long long rounded = (size / fix_byte_size) * fix_byte_size;

    /* Any remainder bumps the result to the next whole multiple. */
    if (size % fix_byte_size != 0)
        rounded += fix_byte_size;

    return rounded;
}
57
58 static void
59 my_read(char *filename, WordCount *w, TaskManager *manager)
60 {
61 long fd = w->fd;
62 long r_filesize = w->read_filesize;
63
64 if ((fd=open(filename,O_RDONLY,0666))==0) {
65 fprintf(stderr,"can't open %s\n",filename);
66 }
67
68 w->file_mmap = (char*)manager->allocate(w->read_filesize);
69
70 long one_read_size = 1024 * 1024 * 1024; // 1GB
71
72 for (int i = 0; 0 < r_filesize; i++) {
73 if (r_filesize > one_read_size) {
74 pread(fd, w->file_mmap + i*one_read_size, one_read_size,i*one_read_size);
75 }else if ((r_filesize < one_read_size) && (r_filesize != 0)) {
76 pread(fd, w->file_mmap + i*one_read_size, r_filesize,i*one_read_size);
77 }
78 r_filesize -= one_read_size;
79 }
80
81 if (w->file_mmap == (caddr_t)-1) {
82 fprintf(stderr,"Can't mmap file\n");
83 perror(NULL);
84 exit(0);
85 }
86
87 return ;
88 }
89
90 static void
91 my_mmap(char *filename, WordCount *w)
92 {
93 /*マッピングだよ!*/
94 int map = MAP_PRIVATE;
95 st_mmap_t st_mmap;
96 struct stat sb;
97 long fd = w->fd;
98
99 if ((fd=open(filename,O_RDONLY,0666))==0) {
100 fprintf(stderr,"can't open %s\n",filename);
101 }
102
103 if (fstat(fd,&sb)) {
104 fprintf(stderr,"can't fstat %s\n",filename);
105 }
106
107 st_mmap.size = fix_byte(sb.st_size,4096);
108
109 //madvise(w->file_mmap, w->read_filesize, POSIX_MADV_NORMAL);
110 w->file_mmap = (char*)mmap(NULL,st_mmap.size,PROT_WRITE|PROT_READ,map,fd,(off_t)0);
111
112 if (st_mmap.file_mmap == (caddr_t)-1) {
113 fprintf(stderr,"Can't mmap file\n");
114 perror(NULL);
115 exit(0);
116 }
117
118 return ;
119 }
120
121 static void
122 run_tasks(SchedTask *manager, WordCount *w, int task_count, HTaskPtr t_read, HTaskPtr t_next, int size)
123 {
124
125 if (task_count < array_task_num) {
126 array_task_num = task_count;
127 if (task_count<=0) return;
128 }
129 for (int i = 0; i < task_count; i += array_task_num) {
130 HTask *task_array;
131 if (use_task_array) {
132 int task_num = (w->size+size-1)/size;
133 if (task_num>array_task_num) task_num = array_task_num;
134 task_array = manager->create_task_array(TASK_EXEC,task_num,1,1,1);
135 if (t_read != 0) task_array->wait_for(t_read);
136 if (!all) {
137 t_next->wait_for(task_array);
138 } else {
139 w->t_print->wait_for(task_array);
140 }
141 }
142
143 Task *t_exec = 0;
144 HTask *h_exec = 0;
145 for (int j = 0; j < array_task_num; j++) {
146 int i = w->task_spawned++;
147 if (w->size < size) size = w->size;
148 if (size==0) break;
149 if (use_task_array) {
150 t_exec = task_array->next_task_array(TASK_EXEC,t_exec);
151 t_exec->set_param(0,(long)0);
152 t_exec->set_param(1,(long)0);
153 t_exec->set_param(2,(long)size);
154 t_exec->set_param(3,(long)w->division_out_size);
155 t_exec->set_inData(0,w->file_mmap + i*w->division_size, size);
156 t_exec->set_outData(0,w->o_data + i*w->out_size, w->division_out_size);
157 } else if (use_compat) {
158 h_exec = manager->create_task(TASK_EXEC);
159 t_exec->set_param(0,(long)0);
160 t_exec->set_param(1,(long)0);
161 t_exec->set_param(2,(long)size);
162 t_exec->set_param(3,(long)w->division_out_size);
163 h_exec->set_inData(0,w->file_mmap + i*w->division_size, size);
164 h_exec->set_outData(0,w->o_data + i*w->out_size, w->division_out_size);
165
166 t_next->wait_for(h_exec);
167
168 h_exec->set_cpu(spe_cpu);
169 h_exec->spawn();
170 } else if (use_iterate) {
171 array_task_num = w->task_num;
172 use_iterate = 0;
173 use_compat = 1;
174
175 w->size -= size*array_task_num;
176 if(w->size < 0) {
177 array_task_num -= 1;
178 w->size += size;
179 }
180 h_exec = manager->create_task(TASK_EXEC_DATA_PARALLEL);
181 h_exec->flip();
182 h_exec->set_inData(0,w->file_mmap,w->file_size);
183 h_exec->set_inData(1,w->o_data,w->out_size_);
184 h_exec->set_outData(0,w->file_mmap,w->file_size);
185 h_exec->set_outData(1,w->o_data,w->out_size_);
186 h_exec->set_param(0,(long)i);
187 h_exec->set_param(1,(long)w->division_size);
188 h_exec->set_param(2,(long)size);
189 h_exec->set_param(3,(long)w->out_size);
190
191 t_next->wait_for(h_exec);
192 h_exec->set_cpu(spe_cpu);
193 h_exec->iterate(array_task_num);
194
195 w->task_num -= array_task_num;
196 w->task_spawned += array_task_num-1;
197
198 break;
199 } else {
200 h_exec = manager->create_task(TASK_EXEC,
201 (memaddr)(w->file_mmap + i*w->division_size), size,
202 (memaddr)(w->o_data + i*w->out_size), w->division_out_size);
203 t_exec->set_param(0,(long)0);
204 t_exec->set_param(1,(long)0);
205 t_exec->set_param(2,(long)size);
206 t_exec->set_param(3,(long)w->division_out_size);
207 t_next->wait_for(h_exec);
208 h_exec->set_cpu(spe_cpu);
209 h_exec->spawn();
210 }
211 w->size -= size;
212 w->task_num--;
213 }
214 if (use_task_array) {
215 task_array->spawn_task_array(t_exec->next());
216 task_array->set_cpu(spe_cpu);
217 task_array->spawn();
218 } else {
219 //if (!all) t_next->wait_for(h_exec);
220 }
221 }
222 }
223
224 /**
225 * This Task runs on the PPE, so it is never executed in parallel (per the original author).
226 * Two of them are kept running so that the scheduler never runs out of Tasks.
 * NOTE(review): run_start() actually spawns t_exec_num of these (default 4).
 *
 * bread_run16 is the blocked-read (-br) step. It creates a READ_TASK on
 * read_spe_cpu that is given w->fd and an output window into w->file_mmap
 * starting at chunk w->task_spawned, and w->t_print waits for that READ_TASK.
 *   - If fewer than task_blocks tasks remain, every remaining chunk is spawned
 *     (waiting on the READ_TASK) and no further step is created.
 *   - Otherwise a follow-up BREAD_RUN_TASK_BLOCKS task (t_next) is created,
 *     task_blocks chunks are spawned with t_next waiting on them, and then the
 *     READ_TASK and t_next are spawned.
 * READ_TASK params 1 and 2 are set from task_spawned before and after
 * run_tasks() advances it; presumably the start and end byte offsets to read.
 * TODO confirm against the READ_TASK implementation.
227 */
228
229 SchedDefineTask1(BREAD_RUN_TASK_BLOCKS,bread_run16);
230
231 static int
232 bread_run16(SchedTask *manager, void *in, void *out)
233 {
// The task's input is a pointer to the shared WordCount.
234 WordCount *w = *(WordCount **)in;
235
236 HTaskPtr t_read = manager->create_task(READ_TASK);
237 w->t_print->wait_for(t_read);
238 t_read->set_cpu(read_spe_cpu);
239 t_read->set_param(0,w->fd);
240
241 if (w->task_num < w->task_blocks) {
242 t_read->set_param(1,w->task_spawned*w->division_size);
// NOTE(review): the window is always task_blocks*division_size bytes, even in
// this last step; it may extend past the w->read_filesize bytes allocated for
// file_mmap in run_start() — confirm that READ_TASK stops at param 2.
243 t_read->set_outData(0, w->file_mmap + w->task_spawned * w->division_size, w->task_blocks * w->division_size);
244
245 // last case
246 while (w->size >= w->division_size)
247 run_tasks(manager,w,w->task_num,t_read,w->t_print, w->division_size);
248 // remaining data
// (run_tasks clamps the per-task size to w->size, so passing division_size is fine)
249 while (w->size>0)
250 run_tasks(manager,w,1,t_read,w->t_print, w->division_size);
251
// Must stay after run_tasks(): it reads the advanced task_spawned.
252 t_read->set_param(2,w->task_spawned*w->division_size);
253 t_read->spawn();
254 } else {
255 HTaskPtr t_next = manager->create_task(BREAD_RUN_TASK_BLOCKS,
256 (memaddr)&w->self,sizeof(memaddr),0,0);
257 w->t_print->wait_for(t_next);
258
259 t_read->set_param(1,w->task_spawned*w->division_size);
260 t_read->set_outData(0, w->file_mmap + w->task_spawned * w->division_size, w->task_blocks * w->division_size);
261
262 run_tasks(manager,w, w->task_blocks, t_read, t_next, w->division_size);
263
// Must stay after run_tasks(): it reads the advanced task_spawned.
264 t_read->set_param(2,w->task_spawned*w->division_size);
265
266 t_read->spawn();
267 t_next->spawn();
268 }
269 return 0;
270 }
271
272 SchedDefineTask1(RUN_TASK_BLOCKS,run16);
273
274 static int
275 run16(SchedTask *manager, void *in, void *out)
276 {
277 WordCount *w = *(WordCount **)in;
278
279 if(use_iterate) {
280 run_tasks(manager, w, w->task_num, 0, w->t_print, w->division_size);
281 } else if (w->task_num < w->task_blocks) {
282 // last case
283 while (w->size >= w->division_size)
284 run_tasks(manager,w,w->task_num,0,w->t_print, w->division_size);
285 // remaining data
286 while (w->size>0)
287 run_tasks(manager,w,1,0, w->t_print, w->size);
288 // printf("run16 last %d\n",w->task_num);
289 } else {
290 HTaskPtr t_next = manager->create_task(RUN_TASK_BLOCKS,
291 (memaddr)&w->self,sizeof(memaddr),0,0);
292 w->t_print->wait_for(t_next);
293
294 run_tasks(manager,w, w->task_blocks,0, t_next, w->division_size);
295
296 t_next->spawn();
297 // printf("run16 next %d\n",w->task_num);
298 }
299 return 0;
300 }
301
/* Tasks spawned per RUN_TASK_BLOCKS / BREAD_RUN_TASK_BLOCKS step unless -a is
   given; overridden with -block. */
static int blocks = 48;

/* Input bytes per word-count task, in KiB; overridden with -division. */
static int division = 16;
305
306 static void
307 run_start(TaskManager *manager, char *filename)
308 {
309 long fd = (long)manager->allocate(sizeof(long));
310 struct stat *sb = (struct stat*)manager->allocate(sizeof(struct stat));
311 HTaskPtr t_exec;
312
313 if ((fd=open(filename,O_RDONLY,0666))==0) {
314 fprintf(stderr,"can't open %s\n",filename);
315 return ;
316 }
317
318 if (fstat(fd,sb)) {
319 fprintf(stderr,"can't fstat %s\n",filename);
320 return ;
321 }
322
323 WordCountPtr w = (WordCountPtr)manager->allocate(sizeof(WordCount));
324
325 w->self = w;
326 w->fd = fd;
327 w->read_filesize = sb->st_size;
328
329
330 if (read_type == BLOCKED_READ) {
331 printf("[blocked read mode]\n");
332 w->file_mmap = (char*)manager->allocate(w->read_filesize);
333 }else if (read_type == MY_READ) {
334 printf("[single read mode]\n");
335 my_read(filename, w, manager);
336 }else if(read_type == MY_MMAP) {
337 printf("[mmap mode]\n");
338 my_mmap(filename, w);
339 }else if(read_type == BLOCKED_MMAP) {
340 printf("[blocked mmap mode]\n");
341 my_mmap(filename, w);
342 }
343
344 HTaskPtr t_print;
345
346 //w->task_blocks = blocks;
347 w->self = w;
348 w->task_spawned = 0;
349
350 w->size = w->file_size = w->read_filesize;
351 printf("w %lx\n",(long)w);
352
353 /* 1task分のデータサイズ(byte) */
354 if (w->size >= 1024*division) {
355 w->division_size = 1024 * division;/*16kbyte*/
356 } else {
357 w->division_size = w->size;
358 }
359
360 printf("division_size %ld\n",w->division_size);
361
362 /* "word num" and "line num" */
363 w->status_num = 2;
364 /* taskの数 */
365 w->task_num = w->size / w->division_size;
366 w->task_num = w->task_num + (w->division_size*w->task_num < w->size);
367 int out_task_num = w->task_num;
368
369 if(!all) {
370 w->task_blocks = blocks;
371 } else {
372 w->task_blocks = w->task_num;
373 }
374
375 w->out_task_num = out_task_num;
376 printf("task_num %ld\n",w->task_num);
377 printf("out_task_num %ld\n",w->out_task_num);
378
379 /* out用のdivision_size. statusが2つなので、あわせて16byteになるように、long long(4byte)を使用 */
380
381 w->division_out_size = sizeof(unsigned long long)*4;
382 int out_size = w->division_out_size*out_task_num;
383 w->o_data = (unsigned long long *)manager->allocate(out_size);
384 w->out_size_ = out_size;
385 w->out_size = 4;
386 printf("out size %d\n",out_size);
387
388 /*各SPEの結果を合計して出力するタスク*/
389
390 t_print = manager->create_task(TASK_PRINT,
391 (memaddr)&w->self,sizeof(memaddr),0,0);
392 w->t_print = t_print;
393 for(int i=0;i<t_exec_num;i++) {
394 /* Task を task_blocks ずつ起動する Task */
395 /* serialize されていると仮定する... */
396 if (read_type == BLOCKED_READ) {
397 t_exec = manager->create_task(BREAD_RUN_TASK_BLOCKS,
398 (memaddr)&w->self,sizeof(memaddr),0,0);
399 }else {
400 t_exec = manager->create_task(RUN_TASK_BLOCKS,
401 (memaddr)&w->self,sizeof(memaddr),0,0);
402 }
403
404 t_print->wait_for(t_exec);
405 // t_exec->iterate(4);
406 t_exec->spawn();
407 }
408 t_print->spawn();
409 }
410
411 static char*
412 init(int argc, char **argv)
413 {
414
415 char *filename = 0;
416
417 for (int i = 1; argv[i]; ++i) {
418 if (strcmp(argv[i], "-file") == 0) {
419 filename = argv[i+1]; i++;
420 } else if (strcmp(argv[i], "-division") == 0) {
421 division = atoi(argv[i+1]);
422 i++;
423 } else if (strcmp(argv[i], "-block") == 0) {
424 blocks = atoi(argv[i+1]);
425 i++;
426 } else if (strcmp(argv[i], "-a") == 0) {
427 // create task all at once
428 all = 1;
429 } else if (strcmp(argv[i], "-c") == 0) {
430 use_task_array = 0;
431 use_compat = 1;
432 } else if (strcmp(argv[i], "-s") == 0) {
433 use_task_array = 0;
434 use_compat = 0;
435 } else if (strcmp(argv[i], "-t") == 0) {
436 use_task_creater = 1;
437 use_task_array = 0;
438 use_compat = 0;
439 } else if (strcmp(argv[i], "-anum") == 0) {
440 array_task_num = atoi(argv[i+1]);
441 i++;
442 } else if (strcmp(argv[i], "-g") == 0) {
443 spe_cpu = GPU_0;
444 } else if (strcmp(argv[i], "-any") == 0) {
445 spe_cpu = ANY_ANY;
446 } else if (strcmp(argv[i], "-i") == 0) {
447 use_iterate = 1;
448 use_task_array = 0;
449 t_exec_num = 1;
450 } else if (strcmp(argv[i], "-br") == 0) {
451 read_type = BLOCKED_READ;
452 } else if (strcmp(argv[i], "-r") == 0) {
453 read_type = MY_READ;
454 }
455 /* else if (strcmp(argv[i], "-cpu") == 0) {
456 spe_num = atoi(argv[i+1]);
457 i++;
458 if (spe_num==0) spe_num = 1;
459 } else {
460 fprintf(stderr,"%s\n",usr_help_str);
461 exit (0);
462 }*/
463 }
464 if (filename==0) {
465 puts(usr_help_str);
466 exit(1);
467 }
468
469 return filename;
470 }
471
472
// TMmain: program entry point invoked by the task manager.
// Old revision (left column): parse options with init(), then run_start().
// New revision (right column): the same steps go through a FileMapReduce
// object constructed with TASK_EXEC, TASK_EXEC_DATA_PARALLEL and TASK_PRINT.
// In both, task_init() runs first, st_time is recorded, and TMend is then
// registered via manager->set_TMend().
473 int 25 int
474 TMmain(TaskManager *manager, int argc, char *argv[]) 26 TMmain(TaskManager *manager, int argc, char *argv[])
475 { 27 {
476 28
477 char *filename = 0; 29 char *filename = 0;
478 filename = init(argc, argv); 30 FileMapReduce *fmp = new FileMapReduce(manager,TASK_EXEC,TASK_EXEC_DATA_PARALLEL,TASK_PRINT);
31 filename = fmp->init(argc, argv);
479 32
// NOTE(review): `filename < 0` is an ordered comparison of a pointer with 0 and
// is not a meaningful null check; `filename == NULL` appears to be intended.
// The old init() exits when -file is missing, so this branch is unreachable there.
480 if (filename < 0) { 33 if (filename < 0) {
481 return -1; 34 return -1;
482 } 35 }
483 36
484 task_init(); 37 task_init();
485 st_time = getTime(); 38 st_time = getTime();
// NOTE(review): fmp is never deleted in the visible code; presumably it must
// outlive TMmain because the spawned tasks run later — confirm ownership.
486 run_start(manager, filename); 39 fmp->run_start(manager, filename);
487 // FileMapReduce fmp = new FileMapReduce(manager, filename, TASK_EXEC, TASK_PRINT);
488 // fmp->start(); 40 // fmp->start();
489 manager->set_TMend(TMend); 41 manager->set_TMend(TMend);
490 return 0; 42 return 0;
491 } 43 }
492 44