Mercurial > hg > Game > Cerium
comparison example/word_count/main.cc @ 2051:b79a250b4f99 draft
FileMapReduce (no compile error)
author | masa |
---|---|
date | Thu, 28 Jan 2016 17:23:45 +0900 |
parents | 674ac7887dae |
children | 030b8efcf357 |
comparison
legend: equal, deleted, inserted, replaced
2050:26dd777ba95d | 2051:b79a250b4f99 |
---|---|
1 #include <stdio.h> | 1 #include <stdio.h> |
2 #include <stdlib.h> | |
3 #include <string.h> | |
4 #include <sys/mman.h> | |
5 #include <sys/types.h> | |
6 #include <sys/stat.h> | |
7 #include <fcntl.h> | |
8 #include <unistd.h> | |
9 #include <sys/time.h> | 2 #include <sys/time.h> |
10 #include "TaskManager.h" | 3 #include "TaskManager.h" |
11 #include "SchedTask.h" | 4 #include "SchedTask.h" |
12 #include "Func.h" | 5 #include "Func.h" |
13 #include "WordCount.h" | 6 #include "FileMapReduce.h" |
14 | 7 |
15 /* ;TODO | 8 /* ;TODO |
16 * PS3でCPU数が2以上の時に、あまりが計算されてない | 9 * PS3でCPU数が2以上の時に、あまりが計算されてない |
17 */ | 10 */ |
18 | 11 |
19 extern void task_init(); | 12 extern void task_init(); |
20 void TMend(TaskManager *); | 13 void TMend(TaskManager *); |
21 static double st_time; | 14 static double st_time; |
22 static double ed_time; | 15 static double ed_time; |
23 int all = 0; | 16 const char* usr_help_str = ""; |
24 int use_task_array = 1; | |
25 int use_task_creater = 0; | |
26 int use_compat = 0; | |
27 int use_iterate = 0; | |
28 int array_task_num = 11; | |
29 int spe_num = 1; | |
30 int read_type = MY_MMAP; | |
31 int t_exec_num = 4; | |
32 CPU_TYPE spe_cpu = SPE_ANY; | |
33 CPU_TYPE read_spe_cpu = IO_0; | |
34 | |
35 const char *usr_help_str = "Usage: ./word_count [-a -c -s] [-cpu spe_num] [-g] [-file filename] [-br]\n"; | |
36 | 17 |
37 static double | 18 static double |
38 getTime() { | 19 getTime() { |
39 struct timeval tv; | 20 struct timeval tv; |
40 gettimeofday(&tv, NULL); | 21 gettimeofday(&tv, NULL); |
41 return tv.tv_sec + (double)tv.tv_usec*1e-6; | 22 return tv.tv_sec + (double)tv.tv_usec*1e-6; |
42 } | 23 } |
43 | 24 |
44 typedef struct { | |
45 caddr_t file_mmap; | |
46 off_t size; | |
47 } st_mmap_t; | |
48 | |
49 /*与えられたsizeをfix_byte_sizeの倍数にする(丸め込むっていうのかな?)*/ | |
50 static int | |
51 fix_byte(int size,int fix_byte_size) | |
52 { | |
53 size = (size/fix_byte_size)*fix_byte_size + ((size%fix_byte_size)!= 0)*fix_byte_size; | |
54 | |
55 return size; | |
56 } | |
57 | |
58 static void | |
59 my_read(char *filename, WordCount *w, TaskManager *manager) | |
60 { | |
61 long fd = w->fd; | |
62 long r_filesize = w->read_filesize; | |
63 | |
64 if ((fd=open(filename,O_RDONLY,0666))==0) { | |
65 fprintf(stderr,"can't open %s\n",filename); | |
66 } | |
67 | |
68 w->file_mmap = (char*)manager->allocate(w->read_filesize); | |
69 | |
70 long one_read_size = 1024 * 1024 * 1024; // 1GB | |
71 | |
72 for (int i = 0; 0 < r_filesize; i++) { | |
73 if (r_filesize > one_read_size) { | |
74 pread(fd, w->file_mmap + i*one_read_size, one_read_size,i*one_read_size); | |
75 }else if ((r_filesize < one_read_size) && (r_filesize != 0)) { | |
76 pread(fd, w->file_mmap + i*one_read_size, r_filesize,i*one_read_size); | |
77 } | |
78 r_filesize -= one_read_size; | |
79 } | |
80 | |
81 if (w->file_mmap == (caddr_t)-1) { | |
82 fprintf(stderr,"Can't mmap file\n"); | |
83 perror(NULL); | |
84 exit(0); | |
85 } | |
86 | |
87 return ; | |
88 } | |
89 | |
90 static void | |
91 my_mmap(char *filename, WordCount *w) | |
92 { | |
93 /*マッピングだよ!*/ | |
94 int map = MAP_PRIVATE; | |
95 st_mmap_t st_mmap; | |
96 struct stat sb; | |
97 long fd = w->fd; | |
98 | |
99 if ((fd=open(filename,O_RDONLY,0666))==0) { | |
100 fprintf(stderr,"can't open %s\n",filename); | |
101 } | |
102 | |
103 if (fstat(fd,&sb)) { | |
104 fprintf(stderr,"can't fstat %s\n",filename); | |
105 } | |
106 | |
107 st_mmap.size = fix_byte(sb.st_size,4096); | |
108 | |
109 //madvise(w->file_mmap, w->read_filesize, POSIX_MADV_NORMAL); | |
110 w->file_mmap = (char*)mmap(NULL,st_mmap.size,PROT_WRITE|PROT_READ,map,fd,(off_t)0); | |
111 | |
112 if (st_mmap.file_mmap == (caddr_t)-1) { | |
113 fprintf(stderr,"Can't mmap file\n"); | |
114 perror(NULL); | |
115 exit(0); | |
116 } | |
117 | |
118 return ; | |
119 } | |
120 | |
121 static void | |
122 run_tasks(SchedTask *manager, WordCount *w, int task_count, HTaskPtr t_read, HTaskPtr t_next, int size) | |
123 { | |
124 | |
125 if (task_count < array_task_num) { | |
126 array_task_num = task_count; | |
127 if (task_count<=0) return; | |
128 } | |
129 for (int i = 0; i < task_count; i += array_task_num) { | |
130 HTask *task_array; | |
131 if (use_task_array) { | |
132 int task_num = (w->size+size-1)/size; | |
133 if (task_num>array_task_num) task_num = array_task_num; | |
134 task_array = manager->create_task_array(TASK_EXEC,task_num,1,1,1); | |
135 if (t_read != 0) task_array->wait_for(t_read); | |
136 if (!all) { | |
137 t_next->wait_for(task_array); | |
138 } else { | |
139 w->t_print->wait_for(task_array); | |
140 } | |
141 } | |
142 | |
143 Task *t_exec = 0; | |
144 HTask *h_exec = 0; | |
145 for (int j = 0; j < array_task_num; j++) { | |
146 int i = w->task_spawned++; | |
147 if (w->size < size) size = w->size; | |
148 if (size==0) break; | |
149 if (use_task_array) { | |
150 t_exec = task_array->next_task_array(TASK_EXEC,t_exec); | |
151 t_exec->set_param(0,(long)0); | |
152 t_exec->set_param(1,(long)0); | |
153 t_exec->set_param(2,(long)size); | |
154 t_exec->set_param(3,(long)w->division_out_size); | |
155 t_exec->set_inData(0,w->file_mmap + i*w->division_size, size); | |
156 t_exec->set_outData(0,w->o_data + i*w->out_size, w->division_out_size); | |
157 } else if (use_compat) { | |
158 h_exec = manager->create_task(TASK_EXEC); | |
159 t_exec->set_param(0,(long)0); | |
160 t_exec->set_param(1,(long)0); | |
161 t_exec->set_param(2,(long)size); | |
162 t_exec->set_param(3,(long)w->division_out_size); | |
163 h_exec->set_inData(0,w->file_mmap + i*w->division_size, size); | |
164 h_exec->set_outData(0,w->o_data + i*w->out_size, w->division_out_size); | |
165 | |
166 t_next->wait_for(h_exec); | |
167 | |
168 h_exec->set_cpu(spe_cpu); | |
169 h_exec->spawn(); | |
170 } else if (use_iterate) { | |
171 array_task_num = w->task_num; | |
172 use_iterate = 0; | |
173 use_compat = 1; | |
174 | |
175 w->size -= size*array_task_num; | |
176 if(w->size < 0) { | |
177 array_task_num -= 1; | |
178 w->size += size; | |
179 } | |
180 h_exec = manager->create_task(TASK_EXEC_DATA_PARALLEL); | |
181 h_exec->flip(); | |
182 h_exec->set_inData(0,w->file_mmap,w->file_size); | |
183 h_exec->set_inData(1,w->o_data,w->out_size_); | |
184 h_exec->set_outData(0,w->file_mmap,w->file_size); | |
185 h_exec->set_outData(1,w->o_data,w->out_size_); | |
186 h_exec->set_param(0,(long)i); | |
187 h_exec->set_param(1,(long)w->division_size); | |
188 h_exec->set_param(2,(long)size); | |
189 h_exec->set_param(3,(long)w->out_size); | |
190 | |
191 t_next->wait_for(h_exec); | |
192 h_exec->set_cpu(spe_cpu); | |
193 h_exec->iterate(array_task_num); | |
194 | |
195 w->task_num -= array_task_num; | |
196 w->task_spawned += array_task_num-1; | |
197 | |
198 break; | |
199 } else { | |
200 h_exec = manager->create_task(TASK_EXEC, | |
201 (memaddr)(w->file_mmap + i*w->division_size), size, | |
202 (memaddr)(w->o_data + i*w->out_size), w->division_out_size); | |
203 t_exec->set_param(0,(long)0); | |
204 t_exec->set_param(1,(long)0); | |
205 t_exec->set_param(2,(long)size); | |
206 t_exec->set_param(3,(long)w->division_out_size); | |
207 t_next->wait_for(h_exec); | |
208 h_exec->set_cpu(spe_cpu); | |
209 h_exec->spawn(); | |
210 } | |
211 w->size -= size; | |
212 w->task_num--; | |
213 } | |
214 if (use_task_array) { | |
215 task_array->spawn_task_array(t_exec->next()); | |
216 task_array->set_cpu(spe_cpu); | |
217 task_array->spawn(); | |
218 } else { | |
219 //if (!all) t_next->wait_for(h_exec); | |
220 } | |
221 } | |
222 } | |
223 | |
224 /** | |
225 * このTaskは、PPE上で実行されるので、並列に実行されることはない | |
226 * 二つ実行されていて、Task が足りなくなることがないようにしている。 | |
227 */ | |
228 | |
229 SchedDefineTask1(BREAD_RUN_TASK_BLOCKS,bread_run16); | |
230 | |
231 static int | |
232 bread_run16(SchedTask *manager, void *in, void *out) | |
233 { | |
234 WordCount *w = *(WordCount **)in; | |
235 | |
236 HTaskPtr t_read = manager->create_task(READ_TASK); | |
237 w->t_print->wait_for(t_read); | |
238 t_read->set_cpu(read_spe_cpu); | |
239 t_read->set_param(0,w->fd); | |
240 | |
241 if (w->task_num < w->task_blocks) { | |
242 t_read->set_param(1,w->task_spawned*w->division_size); | |
243 t_read->set_outData(0, w->file_mmap + w->task_spawned * w->division_size, w->task_blocks * w->division_size); | |
244 | |
245 // last case | |
246 while (w->size >= w->division_size) | |
247 run_tasks(manager,w,w->task_num,t_read,w->t_print, w->division_size); | |
248 // remaining data | |
249 while (w->size>0) | |
250 run_tasks(manager,w,1,t_read,w->t_print, w->division_size); | |
251 | |
252 t_read->set_param(2,w->task_spawned*w->division_size); | |
253 t_read->spawn(); | |
254 } else { | |
255 HTaskPtr t_next = manager->create_task(BREAD_RUN_TASK_BLOCKS, | |
256 (memaddr)&w->self,sizeof(memaddr),0,0); | |
257 w->t_print->wait_for(t_next); | |
258 | |
259 t_read->set_param(1,w->task_spawned*w->division_size); | |
260 t_read->set_outData(0, w->file_mmap + w->task_spawned * w->division_size, w->task_blocks * w->division_size); | |
261 | |
262 run_tasks(manager,w, w->task_blocks, t_read, t_next, w->division_size); | |
263 | |
264 t_read->set_param(2,w->task_spawned*w->division_size); | |
265 | |
266 t_read->spawn(); | |
267 t_next->spawn(); | |
268 } | |
269 return 0; | |
270 } | |
271 | |
272 SchedDefineTask1(RUN_TASK_BLOCKS,run16); | |
273 | |
274 static int | |
275 run16(SchedTask *manager, void *in, void *out) | |
276 { | |
277 WordCount *w = *(WordCount **)in; | |
278 | |
279 if(use_iterate) { | |
280 run_tasks(manager, w, w->task_num, 0, w->t_print, w->division_size); | |
281 } else if (w->task_num < w->task_blocks) { | |
282 // last case | |
283 while (w->size >= w->division_size) | |
284 run_tasks(manager,w,w->task_num,0,w->t_print, w->division_size); | |
285 // remaining data | |
286 while (w->size>0) | |
287 run_tasks(manager,w,1,0, w->t_print, w->size); | |
288 // printf("run16 last %d\n",w->task_num); | |
289 } else { | |
290 HTaskPtr t_next = manager->create_task(RUN_TASK_BLOCKS, | |
291 (memaddr)&w->self,sizeof(memaddr),0,0); | |
292 w->t_print->wait_for(t_next); | |
293 | |
294 run_tasks(manager,w, w->task_blocks,0, t_next, w->division_size); | |
295 | |
296 t_next->spawn(); | |
297 // printf("run16 next %d\n",w->task_num); | |
298 } | |
299 return 0; | |
300 } | |
301 | |
302 static int blocks = 48; | |
303 //static int blocks = 31 * 6 * 24; | |
304 static int division = 16; // in Kbyte | |
305 | |
306 static void | |
307 run_start(TaskManager *manager, char *filename) | |
308 { | |
309 long fd = (long)manager->allocate(sizeof(long)); | |
310 struct stat *sb = (struct stat*)manager->allocate(sizeof(struct stat)); | |
311 HTaskPtr t_exec; | |
312 | |
313 if ((fd=open(filename,O_RDONLY,0666))==0) { | |
314 fprintf(stderr,"can't open %s\n",filename); | |
315 return ; | |
316 } | |
317 | |
318 if (fstat(fd,sb)) { | |
319 fprintf(stderr,"can't fstat %s\n",filename); | |
320 return ; | |
321 } | |
322 | |
323 WordCountPtr w = (WordCountPtr)manager->allocate(sizeof(WordCount)); | |
324 | |
325 w->self = w; | |
326 w->fd = fd; | |
327 w->read_filesize = sb->st_size; | |
328 | |
329 | |
330 if (read_type == BLOCKED_READ) { | |
331 printf("[blocked read mode]\n"); | |
332 w->file_mmap = (char*)manager->allocate(w->read_filesize); | |
333 }else if (read_type == MY_READ) { | |
334 printf("[single read mode]\n"); | |
335 my_read(filename, w, manager); | |
336 }else if(read_type == MY_MMAP) { | |
337 printf("[mmap mode]\n"); | |
338 my_mmap(filename, w); | |
339 }else if(read_type == BLOCKED_MMAP) { | |
340 printf("[blocked mmap mode]\n"); | |
341 my_mmap(filename, w); | |
342 } | |
343 | |
344 HTaskPtr t_print; | |
345 | |
346 //w->task_blocks = blocks; | |
347 w->self = w; | |
348 w->task_spawned = 0; | |
349 | |
350 w->size = w->file_size = w->read_filesize; | |
351 printf("w %lx\n",(long)w); | |
352 | |
353 /* 1task分のデータサイズ(byte) */ | |
354 if (w->size >= 1024*division) { | |
355 w->division_size = 1024 * division;/*16kbyte*/ | |
356 } else { | |
357 w->division_size = w->size; | |
358 } | |
359 | |
360 printf("division_size %ld\n",w->division_size); | |
361 | |
362 /* "word num" and "line num" */ | |
363 w->status_num = 2; | |
364 /* taskの数 */ | |
365 w->task_num = w->size / w->division_size; | |
366 w->task_num = w->task_num + (w->division_size*w->task_num < w->size); | |
367 int out_task_num = w->task_num; | |
368 | |
369 if(!all) { | |
370 w->task_blocks = blocks; | |
371 } else { | |
372 w->task_blocks = w->task_num; | |
373 } | |
374 | |
375 w->out_task_num = out_task_num; | |
376 printf("task_num %ld\n",w->task_num); | |
377 printf("out_task_num %ld\n",w->out_task_num); | |
378 | |
379 /* out用のdivision_size. statusが2つなので、あわせて16byteになるように、long long(4byte)を使用 */ | |
380 | |
381 w->division_out_size = sizeof(unsigned long long)*4; | |
382 int out_size = w->division_out_size*out_task_num; | |
383 w->o_data = (unsigned long long *)manager->allocate(out_size); | |
384 w->out_size_ = out_size; | |
385 w->out_size = 4; | |
386 printf("out size %d\n",out_size); | |
387 | |
388 /*各SPEの結果を合計して出力するタスク*/ | |
389 | |
390 t_print = manager->create_task(TASK_PRINT, | |
391 (memaddr)&w->self,sizeof(memaddr),0,0); | |
392 w->t_print = t_print; | |
393 for(int i=0;i<t_exec_num;i++) { | |
394 /* Task を task_blocks ずつ起動する Task */ | |
395 /* serialize されていると仮定する... */ | |
396 if (read_type == BLOCKED_READ) { | |
397 t_exec = manager->create_task(BREAD_RUN_TASK_BLOCKS, | |
398 (memaddr)&w->self,sizeof(memaddr),0,0); | |
399 }else { | |
400 t_exec = manager->create_task(RUN_TASK_BLOCKS, | |
401 (memaddr)&w->self,sizeof(memaddr),0,0); | |
402 } | |
403 | |
404 t_print->wait_for(t_exec); | |
405 // t_exec->iterate(4); | |
406 t_exec->spawn(); | |
407 } | |
408 t_print->spawn(); | |
409 } | |
410 | |
411 static char* | |
412 init(int argc, char **argv) | |
413 { | |
414 | |
415 char *filename = 0; | |
416 | |
417 for (int i = 1; argv[i]; ++i) { | |
418 if (strcmp(argv[i], "-file") == 0) { | |
419 filename = argv[i+1]; i++; | |
420 } else if (strcmp(argv[i], "-division") == 0) { | |
421 division = atoi(argv[i+1]); | |
422 i++; | |
423 } else if (strcmp(argv[i], "-block") == 0) { | |
424 blocks = atoi(argv[i+1]); | |
425 i++; | |
426 } else if (strcmp(argv[i], "-a") == 0) { | |
427 // create task all at once | |
428 all = 1; | |
429 } else if (strcmp(argv[i], "-c") == 0) { | |
430 use_task_array = 0; | |
431 use_compat = 1; | |
432 } else if (strcmp(argv[i], "-s") == 0) { | |
433 use_task_array = 0; | |
434 use_compat = 0; | |
435 } else if (strcmp(argv[i], "-t") == 0) { | |
436 use_task_creater = 1; | |
437 use_task_array = 0; | |
438 use_compat = 0; | |
439 } else if (strcmp(argv[i], "-anum") == 0) { | |
440 array_task_num = atoi(argv[i+1]); | |
441 i++; | |
442 } else if (strcmp(argv[i], "-g") == 0) { | |
443 spe_cpu = GPU_0; | |
444 } else if (strcmp(argv[i], "-any") == 0) { | |
445 spe_cpu = ANY_ANY; | |
446 } else if (strcmp(argv[i], "-i") == 0) { | |
447 use_iterate = 1; | |
448 use_task_array = 0; | |
449 t_exec_num = 1; | |
450 } else if (strcmp(argv[i], "-br") == 0) { | |
451 read_type = BLOCKED_READ; | |
452 } else if (strcmp(argv[i], "-r") == 0) { | |
453 read_type = MY_READ; | |
454 } | |
455 /* else if (strcmp(argv[i], "-cpu") == 0) { | |
456 spe_num = atoi(argv[i+1]); | |
457 i++; | |
458 if (spe_num==0) spe_num = 1; | |
459 } else { | |
460 fprintf(stderr,"%s\n",usr_help_str); | |
461 exit (0); | |
462 }*/ | |
463 } | |
464 if (filename==0) { | |
465 puts(usr_help_str); | |
466 exit(1); | |
467 } | |
468 | |
469 return filename; | |
470 } | |
471 | |
472 | |
473 int | 25 int |
474 TMmain(TaskManager *manager, int argc, char *argv[]) | 26 TMmain(TaskManager *manager, int argc, char *argv[]) |
475 { | 27 { |
476 | 28 |
477 char *filename = 0; | 29 char *filename = 0; |
478 filename = init(argc, argv); | 30 FileMapReduce *fmp = new FileMapReduce(manager,TASK_EXEC,TASK_EXEC_DATA_PARALLEL,TASK_PRINT); |
31 filename = fmp->init(argc, argv); | |
479 | 32 |
480 if (filename < 0) { | 33 if (filename < 0) { |
481 return -1; | 34 return -1; |
482 } | 35 } |
483 | 36 |
484 task_init(); | 37 task_init(); |
485 st_time = getTime(); | 38 st_time = getTime(); |
486 run_start(manager, filename); | 39 fmp->run_start(manager, filename); |
487 // FileMapReduce fmp = new FileMapReduce(manager, filename, TASK_EXEC, TASK_PRINT); | |
488 // fmp->start(); | 40 // fmp->start(); |
489 manager->set_TMend(TMend); | 41 manager->set_TMend(TMend); |
490 return 0; | 42 return 0; |
491 } | 43 } |
492 | 44 |