java - When writing to context in the Reducer.reduce method, why is the toString method invoked and not the write method? -
I'm writing a map-reduce batch job, which consists of 3-4 chained jobs. In the second job I'm using a custom class as the output value class when writing to the context via context.write()
. When studying the behavior of the code, I noticed that the toString
method of this custom class is invoked, rather than the write
method. Why does this happen, if the class implements the Writable
interface, and I implemented the write
method?
The custom class's code:
import org.apache.hadoop.io.writable; import java.io.datainput; import java.io.dataoutput; import java.io.ioexception; public class writablelongpair implements writable { private long l1; private long l2; public writablelongpair() { l1 = 0; l2 = 0; } public writablelongpair(long l1, long l2) { this.l1 = l1; this.l2 = l2; } @override public void write(dataoutput dataoutput) throws ioexception { dataoutput.writelong(l1); dataoutput.writelong(l2); } @override public void readfields(datainput datainput) throws ioexception { l1 = datainput.readlong(); l2 = datainput.readlong(); } @override public string tostring() { return l1 + " " + l2; } }
The second job's code:
import org.apache.hadoop.io.*; import org.apache.hadoop.mapreduce.mapper; import org.apache.hadoop.mapreduce.partitioner; import org.apache.hadoop.mapreduce.reducer; import java.io.ioexception; public class phase2 { private static final int ascii_offset = 97; public static class mapper2 extends mapper<object, text, text, longwritable>{ @override public void map(object key, text value, context context ) throws ioexception, interruptedexception { string[] valueasstrings = value.tostring().split("\t"); string actualkey = valueasstrings[0]; longwritable actualvalue = new longwritable(long.parselong(valueasstrings[1])); string[] components = actualkey.tostring().split("[$]"); if (!components[1].equals("*")) { context.write(new text(components[1] + "$" + components[0]), actualvalue); context.write(new text(components[1] + "$*"), actualvalue); } context.write(new text(actualkey), actualvalue); } } public static class partitioner2 extends partitioner<text, longwritable> { @override public int getpartition(text text, longwritable longwritable, int i) { return (int)(text.tostring().charat(0)) - ascii_offset; } } public static class reducer2 extends reducer<text, longwritable, text, writablelongpair> { private text currentkey; private long sum; @override public void setup(context context) { currentkey = new text(); currentkey.set(""); sum = 0l; } private string textcontent(string w1, string w2) { if (w2.equals("*")) return w1 + "$*"; if (w1.compareto(w2) < 0) return w1 + "$" + w2; else return w2 + "$" + w1; } public void reduce(text key, iterable<longwritable> counts, context context ) throws ioexception, interruptedexception { long sumpair = 0l; string[] components = key.tostring().split("[$]"); (longwritable count : counts) { if (currentkey.equals(components[0])) { if (components[1].equals("*")) sum += count.get(); else sumpair += count.get(); } else { sum = count.get(); currentkey.set(components[0]); } } if (!components[1].equals("*")) context.write(new 
text(textcontent(components[0], components[1])), new writablelongpair(sumpair, sum)); } } public static class comparator2 extends writablecomparator { @override public int compare(writablecomparable o1, writablecomparable o2) { string[] components1 = o1.tostring().split("[$]"); string[] components2 = o2.tostring().split("[$]"); if (components1[1].equals("*") && components2[1].equals("*")) return components1[0].compareto(components2[0]); if (components1[1].equals("*")) { if (components1[0].equals(components2[0])) return -1; else return components1[0].compareto(components2[0]); } if (components2[1].equals("*")) { if (components1[0].equals(components2[0])) return 1; else return components1[0].compareto(components2[0]); } return components1[0].compareto(components2[0]); } } }
...and how I define the jobs:
import org.apache.hadoop.conf.configuration; import org.apache.hadoop.fs.path; import org.apache.hadoop.io.longwritable; import org.apache.hadoop.io.text; import org.apache.hadoop.mapreduce.counter; import org.apache.hadoop.mapreduce.job; import org.apache.hadoop.mapreduce.lib.input.fileinputformat; import org.apache.hadoop.mapreduce.lib.input.sequencefileinputformat; import org.apache.hadoop.mapreduce.lib.output.fileoutputformat; public class manager { public static void main(string[] args) throws exception { configuration conf1 = new configuration(); if (args.length != 2) { system.err.println("usage: manager <in> <out>"); system.exit(1); } job job1 = job.getinstance(conf1, "phase 1"); job1.setjarbyclass(phase1.class); job1.setmapperclass(phase1.mapper1.class); job1.setpartitionerclass(phase1.partitioner1.class); // job1.setcombinerclass(phase1.combiner1.class); job1.setreducerclass(phase1.reducer1.class); job1.setinputformatclass(sequencefileinputformat.class); // job1.setoutputformatclass(fileoutputformat.class); job1.setoutputkeyclass(text.class); job1.setoutputvalueclass(longwritable.class); job1.setnumreducetasks(12); fileinputformat.addinputpath(job1, new path(args[0])); path output1 = new path(args[1]); fileoutputformat.setoutputpath(job1, output1); boolean result = job1.waitforcompletion(true); counter counter = job1.getcounters().findcounter("org.apache.hadoop.mapreduce.taskcounter", "reduce_input_records"); system.out.println("num of pairs sent reducers in phase 1: " + counter.getvalue()); configuration conf2 = new configuration(); job job2 = job.getinstance(conf2, "phase 2"); job2.setjarbyclass(phase2.class); job2.setmapperclass(phase2.mapper2.class); job2.setpartitionerclass(phase2.partitioner2.class); // job2.setcombinerclass(phase2.combiner2.class); job2.setreducerclass(phase2.reducer2.class); job2.setmapoutputkeyclass(text.class); job2.setmapoutputvalueclass(longwritable.class); job2.setoutputkeyclass(text.class); 
job2.setoutputvalueclass(writablelongpair.class); job2.setnumreducetasks(26); // job2.setgroupingcomparatorclass(phase2.comparator2.class); fileinputformat.addinputpath(job2, output1); path output2 = new path(args[1] + "2"); fileoutputformat.setoutputpath(job2, output2); result = job2.waitforcompletion(true); counter = job2.getcounters().findcounter("org.apache.hadoop.mapreduce.taskcounter", "reduce_input_records"); system.out.println("num of pairs sent reducers in phase 2: " + counter.getvalue()); // system.exit(job1.waitforcompletion(true) ? 0 : 1); } }
If you use the default output format (TextOutputFormat), Hadoop calls the toString()
method on the object when it writes it to disk. This is expected behavior. context.write()
is being called, but it is the output format that's controlling how the data appears on disk.
If you're chaining jobs together, you would typically use SequenceFileInputFormat
and SequenceFileOutputFormat
for all of the jobs, since that makes reading the output of one job into a subsequent job easy.
Comments
Post a Comment