java - MultipleOutputs in Hadoop
I am using MultipleOutputs in the reducer of my job. The data set I am working on is around 270 MB, and I am running on a pseudo-distributed single node. I have used a custom Writable for the map output values. The keys are the countries present in the dataset.
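The custom Writable itself is not shown in the question; a minimal sketch of what it presumably looks like, inferred only from the fields used in the reducer below (patent_no as a LongWritable and getName() returning a Text), would be something like:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

// Hypothetical reconstruction of the custom map output value used below.
public class Name implements Writable {
    public LongWritable patent_no = new LongWritable();   // patent number of the record
    private Text name = new Text();                       // full name of the inventor

    public Text getName() { return name; }

    public void set(long patent, String fullName) {
        patent_no.set(patent);
        name.set(fullName);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        patent_no.write(out);
        name.write(out);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        patent_no.readFields(in);
        name.readFields(in);
    }
}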
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class Reduce_class extends Reducer<Text, Name, NullWritable, Text> {

    public void reduce(Text key, Iterable<Name> values, Context context)
            throws IOException, InterruptedException {
        MultipleOutputs<NullWritable, Text> m = new MultipleOutputs<NullWritable, Text>(context);
        long pat;
        String n;
        NullWritable out = NullWritable.get();
        // Collect all names per patent number for this country in memory.
        TreeMap<Long, ArrayList<String>> map = new TreeMap<Long, ArrayList<String>>();
        for (Name nn : values) {
            pat = nn.patent_no.get();
            if (map.containsKey(pat)) {
                map.get(pat).add(nn.getName().toString());
            } else {
                map.put(pat, new ArrayList<String>());
                map.get(pat).add(nn.getName().toString());
            }
        }
        // Write each patent number and its names to a per-country output file.
        for (Map.Entry<Long, ArrayList<String>> entry : map.entrySet()) {
            n = entry.getKey().toString();
            m.write(out, new Text("--------------------------"), key.toString());
            m.write(out, new Text(n), key.toString());
            ArrayList<String> names = entry.getValue();
            Iterator<String> i = names.iterator();
            while (i.hasNext()) {
                n = i.next();
                m.write(out, new Text(n), key.toString());
            }
            m.write(out, new Text("--------------------------"), key.toString());
        }
        m.close();
    }
}
The above is my reduce logic.

Problems:

1) The above code works fine on a small data set but fails with a heap space error on the 270 MB data set.

2) Using the country as the key passes a pretty large number of values into a single Iterable collection. I tried to solve this with MultipleOutputs, which creates unique files for a given set of keys. The catch is that I am unable to append to an existing file created by a previous reduce call; it throws an error, so for each particular key I have to create new files. Is there a way to work around this? Working around that error is what caused me to use the country names (my final sorted data) as keys, which in turn throws the Java heap error.
Sample input:

3858241,"durand","philip","e.","","","hudson","ma","us","",1
3858241,"norris","lonnie","h.","","","milford","ma","us","",2
3858242,"gooding","elwyn","r.","","120 darwin rd.","pinckney","mi","us","48169",1
3858243,"pierron","claude","raymond","","","epinal","","fr","",1
3858243,"jenny","jean","paul","","","decines","","fr","",2
3858243,"zuccaro","robert","","","","epinal","","fr","",3
3858244,"mann","richard","l.","","p.o. box 69","woodstock","ct","us","06281",1
Sample output for small datasets.

Sample directory structure:
ca-r-00000
fr-r-00000
quebec-r-00000
tx-r-00000
us-r-00000
*individual contents*
3858241 philip e. durand
lonnie h. norris
3858242
elwyn r. gooding
3858244
richard l. mann
I know I am answering an old question here, but anyway let me throw some ideas out. It seems you are creating a TreeMap in your reducer with all the records of one reduce call. In MapReduce you cannot afford to hold all the records in memory, because that will never scale. You are making a map of patent_no to the names associated with that patent_no. You want to separate out the records based on patent_no, so why not leverage the sorting of the MapReduce framework?
You should include patent_no and name along with country in the Writable key itself. Then:

- Write a Partitioner that partitions on the basis of country.
- The sort order should be country, patent_no, name.
- Write a grouping comparator that groups on country and patent_no.
As a result, all records with the same country go to the same reducer, sorted by patent_no and name, and within that reducer different patent_no values go to different reduce calls. Then all you need is a simple write to MultipleOutputs, and you get rid of the in-memory TreeMap.
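A rough sketch of those three pieces, with class and field names that are my own assumptions rather than anything from the question (shown together here for brevity; normally each class goes in its own file):

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Partitioner;

// Composite key carrying country, patent_no and name, so the framework does the sorting.
public class CountryPatentKey implements WritableComparable<CountryPatentKey> {
    private Text country = new Text();
    private LongWritable patentNo = new LongWritable();
    private Text name = new Text();

    public Text getCountry() { return country; }
    public LongWritable getPatentNo() { return patentNo; }
    public Text getName() { return name; }

    public void set(String c, long p, String n) {
        country.set(c);
        patentNo.set(p);
        name.set(n);
    }

    public void write(DataOutput out) throws IOException {
        country.write(out);
        patentNo.write(out);
        name.write(out);
    }

    public void readFields(DataInput in) throws IOException {
        country.readFields(in);
        patentNo.readFields(in);
        name.readFields(in);
    }

    // Full sort order: country, then patent_no, then name.
    public int compareTo(CountryPatentKey o) {
        int cmp = country.compareTo(o.country);
        if (cmp != 0) return cmp;
        cmp = patentNo.compareTo(o.patentNo);
        if (cmp != 0) return cmp;
        return name.compareTo(o.name);
    }
}

// Partitioner: every record of a given country lands on the same reducer.
class CountryPartitioner extends Partitioner<CountryPatentKey, Text> {
    public int getPartition(CountryPatentKey key, Text value, int numPartitions) {
        return (key.getCountry().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}

// Grouping comparator: one reduce() call per (country, patent_no) pair.
class CountryPatentGroupingComparator extends WritableComparator {
    protected CountryPatentGroupingComparator() {
        super(CountryPatentKey.class, true);
    }

    public int compare(WritableComparable a, WritableComparable b) {
        CountryPatentKey k1 = (CountryPatentKey) a;
        CountryPatentKey k2 = (CountryPatentKey) b;
        int cmp = k1.getCountry().compareTo(k2.getCountry());
        if (cmp != 0) return cmp;
        return k1.getPatentNo().compareTo(k2.getPatentNo());
    }
}

You would register these in the driver with job.setPartitionerClass(CountryPartitioner.class) and job.setGroupingComparatorClass(CountryPatentGroupingComparator.class); since compareTo() already gives the desired order, no separate sort comparator is needed, and the map output value can then be just the name.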
And a couple of points I suggest you take care of:

- Do not create a new MultipleOutputs in the reduce method every time; instead, override the setup() method and create it once there.
- Do not create a new Text() every time; instead, create one in the setup method and reuse the same instance via the set("string") method of Text. You can argue what the point is, since Java's GC will garbage-collect those objects anyway, but you should try to use as little memory as possible so that Java's garbage collection is triggered less frequently.
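Putting both points together, a minimal reducer skeleton could look like the sketch below. It keeps the question's Reducer<Text, Name, NullWritable, Text> signature purely to illustrate where setup(), cleanup() and the reused Text go; the class name is mine.

import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class PatentReducer extends Reducer<Text, Name, NullWritable, Text> {
    private MultipleOutputs<NullWritable, Text> mos;
    private final Text outValue = new Text();               // reused for every record
    private final NullWritable outKey = NullWritable.get();

    @Override
    protected void setup(Context context) {
        mos = new MultipleOutputs<NullWritable, Text>(context);   // created once per task
    }

    @Override
    public void reduce(Text key, Iterable<Name> values, Context context)
            throws IOException, InterruptedException {
        for (Name nn : values) {
            outValue.set(nn.getName().toString());           // reuse the same Text instance
            mos.write(outKey, outValue, key.toString());      // one file per country key
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        mos.close();                                          // close once, when the task finishes
    }
}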