Hadoop: Using a Custom Object in a Mapper's Output
I'm new to Hadoop and stumped on something:

What I'm trying to do is take in a list of text entries in files, have an initial mapper do some crunching on them, and output a custom object to be aggregated by the reducer.

I had the framework working fine using Text values, but when I try to change it to use our own objects, I get an NPE (shown below).

Here is the driver's run():
    JobConf conf = new JobConf( getConf(), VectorConPreprocessor.class );
    conf.setJobName( JOB_NAME + " - " + JOB_ISODATE );
    m_log.info( "Job name: " + conf.getJobName() );

    // Need to change to a chain-mapper later on . . . .
    conf.setInputFormat( TextInputFormat.class );             // reading text files
    conf.setMapperClass( MapMVandSamples.class );
    conf.setMapOutputValueClass( SparsenessFilter.class );
    //conf.setCombinerClass( CombineSparsenessTrackers.class ); // not using a combiner, because all nodes must be gathered before reduction
    conf.setReducerClass( ReduceSparsenessTrackers.class );   // not sure reducing is required here . . . .
    conf.setOutputKeyClass( Text.class );                     // output key is the SHA2
    conf.setOutputValueClass( Text.class );                   // output value is the FeatureVectorMap
    conf.setOutputFormat( SequenceFileOutputFormat.class );   // binary object writer
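(As I understand it, setMapOutputValueClass is what tells Hadoop that the mapper emits SparsenessFilter values rather than the final Text output type. If the map output key type ever differed from the final key type as well, I'd also need the matching call, since it otherwise defaults to whatever setOutputKeyClass sets:

    conf.setMapOutputKeyClass( Text.class );  // only needed if it differs from setOutputKeyClass

)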
And here is the mapper:
    public class MapMVandSamples extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, SparsenessFilter> {

        public static final String DELIM = ":";
        protected static Logger m_log = Logger.getLogger( MapMVandSamples.class );

        // In this case we're reading a line of text at a time from the file.
        // We don't care about the SHA256 for now; just create a SparsenessFilter
        // for each entry. The reducer will aggregate them later.
        @Override
        public void map( LongWritable bytePosition, Text lineOfText,
                OutputCollector<Text, SparsenessFilter> outputCollector, Reporter reporter )
                throws IOException {
            String[] data = lineOfText.toString().split( DELIM, 2 );
            String sha256 = data[0];
            String json = data[1];

            // Create a SparsenessFilter record
            SparsenessFilter sf = new SparsenessFilter();
            // crunching goes here
            outputCollector.collect( new Text( "AllOneForNow" ), sf );
        }
    }
And, finally, the error:
    14/03/05 21:56:56 INFO mapreduce.Job: Task Id : attempt_1394084907462_0002_m_000000_1, Status : FAILED
    Error: java.lang.NullPointerException
        at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.init(MapTask.java:989)
        at org.apache.hadoop.mapred.MapTask.createSortingCollector(MapTask.java:390)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:418)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)
Any ideas? Do I need to implement an interface on our SparsenessFilter so the mapper's OutputCollector can handle it?

Thanks!
All custom keys and values should implement the WritableComparable interface; strictly speaking, only keys have to be comparable, so a value-only class like SparsenessFilter can get away with plain Writable. That NPE is coming from the map-side sort buffer, which most likely fails during setup because it can't find a serializer for your map output value class.

You need to implement readFields(DataInput in) and write(DataOutput out), plus compareTo() if you go with WritableComparable.
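A minimal sketch of what that could look like; the count field here is hypothetical, standing in for whatever state SparsenessFilter actually tracks:

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import org.apache.hadoop.io.WritableComparable;

    public class SparsenessFilter implements WritableComparable<SparsenessFilter> {

        // Hypothetical state; replace with whatever SparsenessFilter really tracks.
        private long count;

        // Hadoop instantiates this reflectively, so a no-arg constructor is required.
        public SparsenessFilter() { }

        @Override
        public void write( DataOutput out ) throws IOException {
            out.writeLong( count );    // serialize every field, in a fixed order
        }

        @Override
        public void readFields( DataInput in ) throws IOException {
            count = in.readLong();     // deserialize in exactly the same order
        }

        @Override
        public int compareTo( SparsenessFilter other ) {
            return Long.compare( count, other.count );
        }
    }

Since SparsenessFilter is only ever used as a map output value here, implementing just Writable (write/readFields, no compareTo) would be enough; WritableComparable only becomes necessary if the class is ever used as a key.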