java - When writing to context in the Reducer.reduce method, why is the toString method invoked and not the write method? -


i'm writing map-reduce batch job, consists of 3-4 chained jobs. in second job i'm using custom class output value class when writing context via context.write(). when studying behavior of code, noticed tostring method of custom class invoked, rather write method. why happen, if class implements writable interface, , implemented write method?

the custom class's code:

import org.apache.hadoop.io.writable; import java.io.datainput; import java.io.dataoutput; import java.io.ioexception;   public class writablelongpair implements writable {  private long l1; private long l2;  public writablelongpair() {     l1 = 0;     l2 = 0; }  public writablelongpair(long l1, long l2) {     this.l1 = l1;     this.l2 = l2; }  @override public void write(dataoutput dataoutput) throws ioexception {     dataoutput.writelong(l1);     dataoutput.writelong(l2); }  @override public void readfields(datainput datainput) throws ioexception {     l1 = datainput.readlong();     l2 = datainput.readlong(); }  @override public string tostring() {     return l1 + " " + l2; } } 

the second job's code:

import org.apache.hadoop.io.*; import org.apache.hadoop.mapreduce.mapper; import org.apache.hadoop.mapreduce.partitioner; import org.apache.hadoop.mapreduce.reducer;  import java.io.ioexception;  public class phase2 {  private static final int ascii_offset = 97;  public static class mapper2         extends mapper<object, text, text, longwritable>{      @override     public void map(object key, text value, context context     ) throws ioexception, interruptedexception {         string[] valueasstrings = value.tostring().split("\t");         string actualkey = valueasstrings[0];         longwritable actualvalue = new longwritable(long.parselong(valueasstrings[1]));         string[] components = actualkey.tostring().split("[$]");         if (!components[1].equals("*")) {             context.write(new text(components[1] + "$" + components[0]), actualvalue);             context.write(new text(components[1] + "$*"), actualvalue);         }         context.write(new text(actualkey), actualvalue);     } }  public static class partitioner2 extends partitioner<text, longwritable> {      @override     public int getpartition(text text, longwritable longwritable, int i) {         return (int)(text.tostring().charat(0)) - ascii_offset;     } }  public static class reducer2         extends reducer<text, longwritable, text, writablelongpair> {          private text currentkey;         private long sum;      @override     public void setup(context context) {         currentkey = new text();         currentkey.set("");         sum = 0l;     }      private string textcontent(string w1, string w2) {         if (w2.equals("*"))             return w1 + "$*";         if (w1.compareto(w2) < 0)             return w1 + "$" + w2;         else             return w2 + "$" + w1;     }      public void reduce(text key, iterable<longwritable> counts,                        context context     ) throws ioexception, interruptedexception {         long sumpair = 0l;         string[] components = key.tostring().split("[$]");         (longwritable count : counts) {             if (currentkey.equals(components[0])) {                 if (components[1].equals("*"))                     sum += count.get();                 else                     sumpair += count.get();             }             else {                 sum = count.get();                 currentkey.set(components[0]);             }         }         if (!components[1].equals("*"))             context.write(new text(textcontent(components[0], components[1])), new writablelongpair(sumpair, sum));     } }  public static class comparator2 extends writablecomparator {      @override     public int compare(writablecomparable o1, writablecomparable o2) {         string[] components1 = o1.tostring().split("[$]");         string[] components2 = o2.tostring().split("[$]");         if (components1[1].equals("*") && components2[1].equals("*"))             return components1[0].compareto(components2[0]);         if (components1[1].equals("*")) {             if (components1[0].equals(components2[0]))                 return -1;             else                 return components1[0].compareto(components2[0]);         }         if (components2[1].equals("*")) {             if (components1[0].equals(components2[0]))                 return 1;             else                 return components1[0].compareto(components2[0]);         }         return components1[0].compareto(components2[0]);     }  }  } 

...and how define jobs:

import org.apache.hadoop.conf.configuration; import org.apache.hadoop.fs.path; import org.apache.hadoop.io.longwritable; import org.apache.hadoop.io.text; import org.apache.hadoop.mapreduce.counter; import org.apache.hadoop.mapreduce.job; import org.apache.hadoop.mapreduce.lib.input.fileinputformat; import org.apache.hadoop.mapreduce.lib.input.sequencefileinputformat; import org.apache.hadoop.mapreduce.lib.output.fileoutputformat;  public class manager {  public static void main(string[] args) throws exception {     configuration conf1 = new configuration();     if (args.length != 2) {         system.err.println("usage: manager <in> <out>");         system.exit(1);     }     job job1 = job.getinstance(conf1, "phase 1");     job1.setjarbyclass(phase1.class);     job1.setmapperclass(phase1.mapper1.class);     job1.setpartitionerclass(phase1.partitioner1.class); //        job1.setcombinerclass(phase1.combiner1.class);     job1.setreducerclass(phase1.reducer1.class);     job1.setinputformatclass(sequencefileinputformat.class); //        job1.setoutputformatclass(fileoutputformat.class);     job1.setoutputkeyclass(text.class);     job1.setoutputvalueclass(longwritable.class);     job1.setnumreducetasks(12);     fileinputformat.addinputpath(job1, new path(args[0]));     path output1 = new path(args[1]);     fileoutputformat.setoutputpath(job1, output1);     boolean result = job1.waitforcompletion(true);     counter counter = job1.getcounters().findcounter("org.apache.hadoop.mapreduce.taskcounter", "reduce_input_records");     system.out.println("num of pairs sent reducers in phase 1: " + counter.getvalue());      configuration conf2 = new configuration();     job job2 = job.getinstance(conf2, "phase 2");     job2.setjarbyclass(phase2.class);     job2.setmapperclass(phase2.mapper2.class);     job2.setpartitionerclass(phase2.partitioner2.class); //        job2.setcombinerclass(phase2.combiner2.class);     job2.setreducerclass(phase2.reducer2.class);     job2.setmapoutputkeyclass(text.class);     job2.setmapoutputvalueclass(longwritable.class);     job2.setoutputkeyclass(text.class);     job2.setoutputvalueclass(writablelongpair.class);     job2.setnumreducetasks(26); //        job2.setgroupingcomparatorclass(phase2.comparator2.class);     fileinputformat.addinputpath(job2, output1);     path output2 = new path(args[1] + "2");     fileoutputformat.setoutputpath(job2, output2);     result = job2.waitforcompletion(true);     counter = job2.getcounters().findcounter("org.apache.hadoop.mapreduce.taskcounter", "reduce_input_records");     system.out.println("num of pairs sent reducers in phase 2: " + counter.getvalue());   //        system.exit(job1.waitforcompletion(true) ? 0 : 1);  } } 

if use default output formatter (textoutputformat) hadoop call tostring() method on object when writes disk. expected behavior. context.write() being called, output format that's controlling how data appears on disk.

if you're chaining jobs typically use sequencefileinputformat , sequencefileoutputformat of jobs, since makes reading output 1 job subsequent job easy.


Comments

Popular posts from this blog

sequelize.js - Sequelize group by with association includes id -

android - Robolectric "INTERNET permission is required" -

java - Android raising EPERM (Operation not permitted) when attempting to send UDP packet after network connection -