MapReduce - Hadoop: reduce happens between "Starting flush of map output" and "Finished spill 0", before the maps are done


I'm new to Hadoop, and I'm trying the WordCount/SecondSort examples in src/examples.

WordCount test environment:

input:

file01.txt

file02.txt

SecondSort test environment:

input:

sample01.txt

sample02.txt

This means both tests have two input paths to process. I printed log info while trying to understand the map/reduce process.

Look at what happens between "Starting flush of map output" and "Finished spill 0": the WordCount program runs reduce twice before the final reduce, while the SecondSort program reduces once and is done. Since these programs are so small, I don't think io.sort.mb/io.sort.factor affects this.

Can anyone explain this?

Thanks for your patience with my broken English and the long log.

Here is the log info (I cut some useless info to keep it short):

WordCount log:

[hadoop@localhost ~]$ hadoop jar test.jar com.abc.example.test wordcount output
13/08/07 18:14:05 INFO mapred.FileInputFormat: Total input paths to process : 2
13/08/07 18:14:06 INFO mapred.JobClient: Running job: job_local_0001
13/08/07 18:14:06 INFO util.ProcessTree: setsid exited with exit code 0
...
13/08/07 18:14:06 INFO mapred.MapTask: numReduceTasks: 1
13/08/07 18:14:06 INFO mapred.MapTask: io.sort.mb = 100
13/08/07 18:14:06 INFO mapred.MapTask: data buffer = 79691776/99614720
13/08/07 18:14:06 INFO mapred.MapTask: record buffer = 262144/327680
mapper: 0 | hello hadoop goodbye hadoop
13/08/07 18:14:06 INFO mapred.MapTask: **Starting flush of map output**
reduce: goodbye
reduce: goodbye | 1
reduce: hadoop
reduce: hadoop | 1
reduce: hadoop | 1
reduce: hello
reduce: hello | 1
13/08/07 18:14:06 INFO mapred.Task: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
13/08/07 18:14:06 INFO mapred.LocalJobRunner: hdfs://localhost:8020/user/hadoop/wordcount/file02.txt:0+28
13/08/07 18:14:06 INFO mapred.Task: Task 'attempt_local_0001_m_000000_0' done.
13/08/07 18:14:06 INFO mapred.Task: Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@4d16ffed
13/08/07 18:14:06 INFO mapred.MapTask: numReduceTasks: 1
13/08/07 18:14:06 INFO mapred.MapTask: io.sort.mb = 100
13/08/07 18:14:06 INFO mapred.MapTask: data buffer = 79691776/99614720
13/08/07 18:14:06 INFO mapred.MapTask: record buffer = 262144/327680
13/08/07 18:14:06 INFO mapred.MapTask: **Starting flush of map output**
reduce: bye
reduce: bye | 1
reduce: hello
reduce: hello | 1
reduce: world
reduce: world | 1
reduce: world | 1
13/08/07 18:14:06 INFO mapred.MapTask: **Finished spill 0**
13/08/07 18:14:06 INFO mapred.Task: Task:attempt_local_0001_m_000001_0 is done. And is in the process of commiting
13/08/07 18:14:06 INFO mapred.LocalJobRunner: hdfs://localhost:8020/user/hadoop/wordcount/file01.txt:0+22
13/08/07 18:14:06 INFO mapred.Task: Task 'attempt_local_0001_m_000001_0' done.
13/08/07 18:14:06 INFO mapred.Task: Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@1f3c0665
13/08/07 18:14:06 INFO mapred.LocalJobRunner:
13/08/07 18:14:06 INFO mapred.Merger: Merging 2 sorted segments
13/08/07 18:14:06 INFO mapred.Merger: Down to the last merge-pass, with 2 segments left of total size: 77 bytes
13/08/07 18:14:06 INFO mapred.LocalJobRunner:
reduce: bye
reduce: bye | 1
reduce: goodbye
reduce: goodbye | 1
reduce: hadoop
reduce: hadoop | 2
reduce: hello
reduce: hello | 1
reduce: hello | 1
reduce: world
reduce: world | 2
13/08/07 18:14:06 INFO mapred.Task: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
...
13/08/07 18:14:07 INFO mapred.JobClient:     Reduce input groups=5
13/08/07 18:14:07 INFO mapred.JobClient:     Combine output records=6
13/08/07 18:14:07 INFO mapred.JobClient:     Physical memory (bytes) snapshot=0
13/08/07 18:14:07 INFO mapred.JobClient:     Reduce output records=5
13/08/07 18:14:07 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=0
13/08/07 18:14:07 INFO mapred.JobClient:     Map output records=8

SecondSort log info:

[hadoop@localhost ~]$ hadoop jar example.jar com.abc.example.example secondsort output
13/08/07 17:00:11 INFO input.FileInputFormat: Total input paths to process : 2
13/08/07 17:00:11 WARN snappy.LoadSnappy: Snappy native library not loaded
13/08/07 17:00:12 INFO mapred.JobClient: Running job: job_local_0001
13/08/07 17:00:12 INFO util.ProcessTree: setsid exited with exit code 0
13/08/07 17:00:12 INFO mapred.Task: Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@57d94c7b
13/08/07 17:00:12 INFO mapred.MapTask: io.sort.mb = 100
13/08/07 17:00:12 INFO mapred.MapTask: data buffer = 79691776/99614720
13/08/07 17:00:12 INFO mapred.MapTask: record buffer = 262144/327680
map: 0 | 5 49
map: 5 | 9 57
map: 10 | 19 46
map: 16 | 3 21
map: 21 | 9 48
map: 26 | 7 57
...
13/08/07 17:00:12 INFO mapred.MapTask: **Starting flush of map output**
13/08/07 17:00:12 INFO mapred.MapTask: **Finished spill 0**
13/08/07 17:00:12 INFO mapred.Task: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
13/08/07 17:00:12 INFO mapred.LocalJobRunner:
13/08/07 17:00:12 INFO mapred.Task: Task 'attempt_local_0001_m_000000_0' done.
13/08/07 17:00:12 INFO mapred.Task: Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@f3a1ea1
13/08/07 17:00:12 INFO mapred.MapTask: io.sort.mb = 100
13/08/07 17:00:12 INFO mapred.MapTask: data buffer = 79691776/99614720
13/08/07 17:00:12 INFO mapred.MapTask: record buffer = 262144/327680
map: 0 | 20 21
map: 6 | 50 51
map: 12 | 50 52
map: 18 | 50 53
map: 24 | 50 54
...
13/08/07 17:00:12 INFO mapred.MapTask: **Starting flush of map output**
13/08/07 17:00:12 INFO mapred.MapTask: **Finished spill 0**
13/08/07 17:00:12 INFO mapred.Task: Task:attempt_local_0001_m_000001_0 is done. And is in the process of commiting
13/08/07 17:00:12 INFO mapred.LocalJobRunner:
13/08/07 17:00:12 INFO mapred.Task: Task 'attempt_local_0001_m_000001_0' done.
13/08/07 17:00:12 INFO mapred.Task: Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@cee4e92
13/08/07 17:00:12 INFO mapred.LocalJobRunner:
13/08/07 17:00:12 INFO mapred.Merger: Merging 2 sorted segments
13/08/07 17:00:12 INFO mapred.Merger: Down to the last merge-pass, with 2 segments left of total size: 1292 bytes
13/08/07 17:00:12 INFO mapred.LocalJobRunner:
reduce: 0:35 -----------------
reduce: 0:35 | 35
reduce: 0:54 -----------------
...
13/08/07 17:00:12 INFO mapred.Task: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
13/08/07 17:00:12 INFO mapred.LocalJobRunner:
13/08/07 17:00:12 INFO mapred.Task: Task attempt_local_0001_r_000000_0 is allowed to commit now
13/08/07 17:00:12 INFO output.FileOutputCommitter: Saved output of task 'attempt_local_0001_r_000000_0' to output
13/08/07 17:00:12 INFO mapred.LocalJobRunner: reduce > reduce
13/08/07 17:00:12 INFO mapred.Task: Task 'attempt_local_0001_r_000000_0' done.
13/08/07 17:00:13 INFO mapred.JobClient:  map 100% reduce 100%
13/08/07 17:00:13 INFO mapred.JobClient: Job complete: job_local_0001
13/08/07 17:00:13 INFO mapred.JobClient: Counters: 22
13/08/07 17:00:13 INFO mapred.JobClient:   File Output Format Counters
13/08/07 17:00:13 INFO mapred.JobClient:     Bytes Written=4787
...
13/08/07 17:00:13 INFO mapred.JobClient:     SPLIT_RAW_BYTES=236
13/08/07 17:00:13 INFO mapred.JobClient:     Reduce input records=92

PS: the main()s are below for others to check out.

WordCount:

public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(Test.class);
    conf.setJobName("wordcount");

    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(IntWritable.class);

    conf.setMapperClass(Map.class);
    conf.setCombinerClass(Reduce.class);
    conf.setReducerClass(Reduce.class);

    conf.setInputFormat(TextInputFormat.class);
    conf.setOutputFormat(TextOutputFormat.class);

    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));

    JobClient.runJob(conf);
}

SecondSort:

public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
    Configuration conf = new Configuration();
    Job job = new Job(conf, "secondarysort");
    job.setJarByClass(Example.class);
    job.setMapperClass(Map.class);
    job.setReducerClass(Reduce.class);
    job.setPartitionerClass(FirstPartitioner.class);
    job.setGroupingComparatorClass(GroupingComparator.class);

    job.setMapOutputKeyClass(IntPair.class);
    job.setMapOutputValueClass(IntWritable.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);

    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}

Combine output records=6

This says it all: your Reduce function is used as both the combiner and the reducer. What you are seeing is the output of the combiner. The combiner is (sometimes) invoked when the map output is spilled.
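To see the mechanics without Hadoop, here is a minimal plain-Java sketch (the class and method names are mine, and the two input lines are inferred from the logs above): each map task's output is run through the same reduce logic once at spill time (the "combiner" pass), and the real reducer then runs over the merged, combined segments.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch (no Hadoop dependency) of why "reduce:" lines appear
// during the map phase when Reduce is registered as the combiner.
public class CombinerDemo {

    // Word-count "reduce" logic: sum the values for each key.
    // Used both as the combiner (once per spill) and as the final reducer.
    static Map<String, Integer> reduce(List<String[]> keyValuePairs) {
        Map<String, Integer> out = new TreeMap<>();
        for (String[] kv : keyValuePairs) {
            out.merge(kv[0], Integer.parseInt(kv[1]), Integer::sum);
        }
        return out;
    }

    // The "map" step: emit (word, 1) for every word in a line.
    static List<String[]> tokenize(String line) {
        List<String[]> pairs = new ArrayList<>();
        for (String word : line.split("\\s+")) {
            pairs.add(new String[] { word, "1" });
        }
        return pairs;
    }

    public static void main(String[] args) {
        String file01 = "hello world bye world";          // inferred from the log
        String file02 = "hello hadoop goodbye hadoop";    // printed by the mapper

        List<String[]> merged = new ArrayList<>();
        for (String file : new String[] { file01, file02 }) {
            // Combiner pass: happens inside each map task between
            // "Starting flush of map output" and "Finished spill 0".
            Map<String, Integer> combined = reduce(tokenize(file));
            combined.forEach((k, v) -> merged.add(new String[] { k, String.valueOf(v) }));
        }

        // Real reduce pass, over the merged output of both map tasks.
        Map<String, Integer> result = reduce(merged);
        System.out.println(result); // {bye=1, goodbye=1, hadoop=2, hello=2, world=2}
    }
}
```

Note how the counters match the WordCount log: 8 map output records (4 words per file), 6 combine output records (3 distinct words per map task), and 5 reduce output records. SecondSort never registers a combiner, so nothing runs between flush and spill there.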

I think you should have added the code, at least the part in main() that shows how the job is set up. That would make it easier to answer questions.

