MapReduce - Hadoop: reduce happens between "Starting flush of map output" and "Finished spill 0", before the maps are done
I'm new to Hadoop, and I'm trying the WordCount/SecondSort examples in src/examples.
WordCount test environment:
input:
file01.txt
file02.txt
SecondSort test environment:
input:
sample01.txt
sample02.txt
So both tests have 2 input paths to process. I added print statements to the log output to try to understand the map/reduce process.
Look at what happens between **Starting flush of map output** and **Finished spill 0**: the WordCount program shows reduce output twice before the final reduce, while the SecondSort program reduces only once, at the end. Since these programs are so small, I don't think io.sort.mb/io.sort.factor affects this.
Can anyone explain this?
Thanks for your patience with my broken English and the long logs.
Here is the log info (I cut the useless parts to keep it short):
WordCount log:

[hadoop@localhost ~]$ hadoop jar test.jar com.abc.example.test wordcount output
13/08/07 18:14:05 INFO mapred.FileInputFormat: Total input paths to process : 2
13/08/07 18:14:06 INFO mapred.JobClient: Running job: job_local_0001
13/08/07 18:14:06 INFO util.ProcessTree: setsid exited with exit code 0
...
13/08/07 18:14:06 INFO mapred.MapTask: numReduceTasks: 1
13/08/07 18:14:06 INFO mapred.MapTask: io.sort.mb = 100
13/08/07 18:14:06 INFO mapred.MapTask: data buffer = 79691776/99614720
13/08/07 18:14:06 INFO mapred.MapTask: record buffer = 262144/327680
mapper: 0 | hello hadoop goodbye hadoop
13/08/07 18:14:06 INFO mapred.MapTask: **Starting flush of map output**
reduce: goodbye
reduce: goodbye | 1
reduce: hadoop
reduce: hadoop | 1
reduce: hadoop | 1
reduce: hello
reduce: hello | 1
13/08/07 18:14:06 INFO mapred.Task: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
13/08/07 18:14:06 INFO mapred.LocalJobRunner: hdfs://localhost:8020/user/hadoop/wordcount/file02.txt:0+28
13/08/07 18:14:06 INFO mapred.Task: Task 'attempt_local_0001_m_000000_0' done.
13/08/07 18:14:06 INFO mapred.Task: Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@4d16ffed
13/08/07 18:14:06 INFO mapred.MapTask: numReduceTasks: 1
13/08/07 18:14:06 INFO mapred.MapTask: io.sort.mb = 100
13/08/07 18:14:06 INFO mapred.MapTask: data buffer = 79691776/99614720
13/08/07 18:14:06 INFO mapred.MapTask: record buffer = 262144/327680
13/08/07 18:14:06 INFO mapred.MapTask: **Starting flush of map output**
reduce: bye
reduce: bye | 1
reduce: hello
reduce: hello | 1
reduce: world
reduce: world | 1
reduce: world | 1
13/08/07 18:14:06 INFO mapred.MapTask: **Finished spill 0**
13/08/07 18:14:06 INFO mapred.Task: Task:attempt_local_0001_m_000001_0 is done. And is in the process of commiting
13/08/07 18:14:06 INFO mapred.LocalJobRunner: hdfs://localhost:8020/user/hadoop/wordcount/file01.txt:0+22
13/08/07 18:14:06 INFO mapred.Task: Task 'attempt_local_0001_m_000001_0' done.
13/08/07 18:14:06 INFO mapred.Task: Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@1f3c0665
13/08/07 18:14:06 INFO mapred.LocalJobRunner:
13/08/07 18:14:06 INFO mapred.Merger: Merging 2 sorted segments
13/08/07 18:14:06 INFO mapred.Merger: Down to the last merge-pass, with 2 segments left of total size: 77 bytes
13/08/07 18:14:06 INFO mapred.LocalJobRunner:
reduce: bye
reduce: bye | 1
reduce: goodbye
reduce: goodbye | 1
reduce: hadoop
reduce: hadoop | 2
reduce: hello
reduce: hello | 1
reduce: hello | 1
reduce: world
reduce: world | 2
13/08/07 18:14:06 INFO mapred.Task: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
...
13/08/07 18:14:07 INFO mapred.JobClient:     Reduce input groups=5
13/08/07 18:14:07 INFO mapred.JobClient:     Combine output records=6
13/08/07 18:14:07 INFO mapred.JobClient:     Physical memory (bytes) snapshot=0
13/08/07 18:14:07 INFO mapred.JobClient:     Reduce output records=5
13/08/07 18:14:07 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=0
13/08/07 18:14:07 INFO mapred.JobClient:     Map output records=8

SecondSort log info:

[hadoop@localhost ~]$ hadoop jar example.jar com.abc.example.example secondsort output
13/08/07 17:00:11 INFO input.FileInputFormat: Total input paths to process : 2
13/08/07 17:00:11 WARN snappy.LoadSnappy: Snappy native library not loaded
13/08/07 17:00:12 INFO mapred.JobClient: Running job: job_local_0001
13/08/07 17:00:12 INFO util.ProcessTree: setsid exited with exit code 0
13/08/07 17:00:12 INFO mapred.Task: Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@57d94c7b
13/08/07 17:00:12 INFO mapred.MapTask: io.sort.mb = 100
13/08/07 17:00:12 INFO mapred.MapTask: data buffer = 79691776/99614720
13/08/07 17:00:12 INFO mapred.MapTask: record buffer = 262144/327680
map: 0 | 5 49
map: 5 | 9 57
map: 10 | 19 46
map: 16 | 3 21
map: 21 | 9 48
map: 26 | 7 57
...
13/08/07 17:00:12 INFO mapred.MapTask: **Starting flush of map output**
13/08/07 17:00:12 INFO mapred.MapTask: **Finished spill 0**
13/08/07 17:00:12 INFO mapred.Task: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
13/08/07 17:00:12 INFO mapred.LocalJobRunner:
13/08/07 17:00:12 INFO mapred.Task: Task 'attempt_local_0001_m_000000_0' done.
13/08/07 17:00:12 INFO mapred.Task: Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@f3a1ea1
13/08/07 17:00:12 INFO mapred.MapTask: io.sort.mb = 100
13/08/07 17:00:12 INFO mapred.MapTask: data buffer = 79691776/99614720
13/08/07 17:00:12 INFO mapred.MapTask: record buffer = 262144/327680
map: 0 | 20 21
map: 6 | 50 51
map: 12 | 50 52
map: 18 | 50 53
map: 24 | 50 54
...
13/08/07 17:00:12 INFO mapred.MapTask: **Starting flush of map output**
13/08/07 17:00:12 INFO mapred.MapTask: **Finished spill 0**
13/08/07 17:00:12 INFO mapred.Task: Task:attempt_local_0001_m_000001_0 is done. And is in the process of commiting
13/08/07 17:00:12 INFO mapred.LocalJobRunner:
13/08/07 17:00:12 INFO mapred.Task: Task 'attempt_local_0001_m_000001_0' done.
13/08/07 17:00:12 INFO mapred.Task: Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@cee4e92
13/08/07 17:00:12 INFO mapred.LocalJobRunner:
13/08/07 17:00:12 INFO mapred.Merger: Merging 2 sorted segments
13/08/07 17:00:12 INFO mapred.Merger: Down to the last merge-pass, with 2 segments left of total size: 1292 bytes
13/08/07 17:00:12 INFO mapred.LocalJobRunner:
reduce: 0:35 -----------------
reduce: 0:35 | 35
reduce: 0:54 -----------------
...
13/08/07 17:00:12 INFO mapred.Task: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
13/08/07 17:00:12 INFO mapred.LocalJobRunner:
13/08/07 17:00:12 INFO mapred.Task: Task attempt_local_0001_r_000000_0 is allowed to commit now
13/08/07 17:00:12 INFO output.FileOutputCommitter: Saved output of task 'attempt_local_0001_r_000000_0' to output
13/08/07 17:00:12 INFO mapred.LocalJobRunner: reduce > reduce
13/08/07 17:00:12 INFO mapred.Task: Task 'attempt_local_0001_r_000000_0' done.
13/08/07 17:00:13 INFO mapred.JobClient: map 100% reduce 100%
13/08/07 17:00:13 INFO mapred.JobClient: Job complete: job_local_0001
13/08/07 17:00:13 INFO mapred.JobClient: Counters: 22
13/08/07 17:00:13 INFO mapred.JobClient:   File Output Format Counters
13/08/07 17:00:13 INFO mapred.JobClient:     Bytes Written=4787
...
13/08/07 17:00:13 INFO mapred.JobClient:     SPLIT_RAW_BYTES=236
13/08/07 17:00:13 INFO mapred.JobClient:     Reduce input records=92
PS: Here are the main()s, in case others want to check them.
WordCount:
public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(Test.class);
    conf.setJobName("wordcount");
    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(IntWritable.class);
    conf.setMapperClass(Map.class);
    conf.setCombinerClass(Reduce.class);
    conf.setReducerClass(Reduce.class);
    conf.setInputFormat(TextInputFormat.class);
    conf.setOutputFormat(TextOutputFormat.class);
    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));
    JobClient.runJob(conf);
}
SecondSort:
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
    Configuration conf = new Configuration();
    Job job = new Job(conf, "secondarysort");
    job.setJarByClass(Example.class);
    job.setMapperClass(Map.class);
    job.setReducerClass(Reduce.class);
    job.setPartitionerClass(FirstPartitioner.class);
    job.setGroupingComparatorClass(GroupingComparator.class);
    job.setMapOutputKeyClass(IntPair.class);
    job.setMapOutputValueClass(IntWritable.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
Combine output records=6
This line says it all: your reduce function is used as both the combiner and the reducer, and what you are seeing is the combiner's output. The combiner is (sometimes) invoked while map output is being spilled, i.e. between **Starting flush of map output** and **Finished spill 0**. WordCount configures one (conf.setCombinerClass(Reduce.class)); SecondSort does not, which is why only WordCount prints reduce output during the map tasks.
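The counter arithmetic can be checked without Hadoop at all. Below is a minimal plain-Java sketch (no Hadoop dependency) of the map-then-combine-at-spill flow; the two input lines are inferred from the reduce prints in your log, and the counts it produces match the job counters above (Map output records=8, Combine output records=6):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CombineAtSpill {

    // map phase: emit one (word, 1) pair per token, like the WordCount mapper
    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String w : line.split(" ")) {
            out.add(Map.entry(w, 1));
        }
        return out;
    }

    // sum values per key; WordCount uses the same function as combiner
    // and reducer (conf.setCombinerClass(Reduce.class))
    static Map<String, Integer> sum(List<Map.Entry<String, Integer>> pairs) {
        Map<String, Integer> out = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> e : pairs) {
            out.merge(e.getKey(), e.getValue(), Integer::sum);
        }
        return out;
    }

    public static void main(String[] args) {
        // input lines inferred from the log's reduce prints (assumption)
        String[] splits = { "hello world bye world", "hello hadoop goodbye hadoop" };
        int mapOutputRecords = 0, combineOutputRecords = 0;
        for (String split : splits) {                    // one map task per file
            List<Map.Entry<String, Integer>> mapped = map(split);
            mapOutputRecords += mapped.size();           // 4 + 4 = 8
            Map<String, Integer> combined = sum(mapped); // runs at spill time
            combineOutputRecords += combined.size();     // 3 + 3 = 6
        }
        System.out.println("Map output records=" + mapOutputRecords);
        System.out.println("Combine output records=" + combineOutputRecords);
    }
}
```

Each map task's 4 records collapse to 3 at spill time because the combiner pre-aggregates duplicate keys; the final reduce then only merges the already-combined segments.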
I think you should have included your code, at least the part of main() that shows how the job is set up. That would make it easier to answer questions.