summaryrefslogtreecommitdiff
path: root/hadoop/wordcount/pipes/wordcount.cc
blob: c9394d540f0e4a531e204ee497c60062479b47c1 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
#include "wordcount.hh"


/**
 * Map step of the word count job.
 *
 * Splits the current input value into tokens with a default-configured
 * boost::tokenizer<> and emits one (lower-cased token, "1") pair per
 * token.
 *
 * @param context map context supplying the input value and receiving
 *                the emitted key/value pairs.
 */
void
WordcountMapper::map(HadoopPipes::MapContext &context)
{
  typedef boost::tokenizer<> word_splitter;

  word_splitter words(context.getInputValue());

  word_splitter::const_iterator word = words.begin();
  const word_splitter::const_iterator last = words.end();

  while(word != last) {
    // Keys are normalized to lower case so that differently-cased
    // spellings of a token share one key.
    context.emit(boost::to_lower_copy(*word), "1");
    ++word;
  }
}

/**
 * Reduce step of the word count job.
 *
 * Emits (input key, number of values received for that key), with the
 * count rendered as a decimal string.
 *
 * The values themselves are not inspected, only counted. The result is
 * therefore a word count only while every incoming value stands for a
 * single occurrence, as with the "1" emitted by WordcountMapper::map.
 *
 * @param context reduce context supplying the key and its values and
 *                receiving the emitted result.
 */
void
WordcountReducer::reduce(HadoopPipes::ReduceContext &context)
{
  uint32_t count(0);

  // nextValue() has to be called to advance to every value, including
  // the first one. Testing it before incrementing counts exactly the
  // values delivered; a do/while would report one too many.
  while(context.nextValue()) {
    ++count;
  }

  context.emit(context.getInputKey(),
               boost::lexical_cast<std::string>(count));
}


int
main(int argc, char *argv[])
{
  HadoopPipes::TemplateFactory2<WordcountMapper,
                                WordcountReducer> factory;
  return HadoopPipes::runTask(factory);
}