summaryrefslogtreecommitdiff
path: root/dtrain/test/wc_pipes/wordcount.cc
blob: 39560a3158c4d34556c01fb669f552d74e92149d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
#include "wordcount.hh"


/**
 * Splits the input value of the current record into tokens (using the
 * default boost::tokenizer<> settings) and emits one pair per token:
 * the lower-cased token as key and the string "1" as value.
 *
 * @param context  map-side task context; the input value is read from it
 *                 and every (token, "1") pair is emitted through it.
 */
void
WordcountMapper::map(HadoopPipes::MapContext & context)
{
  typedef boost::tokenizer<> word_splitter_t;

  // Keep a named reference to the input so the tokenizer iterates over
  // storage that stays alive for the whole loop.
  const std::string & line = context.getInputValue();
  word_splitter_t words(line);

  word_splitter_t::const_iterator word = words.begin();
  const word_splitter_t::const_iterator last = words.end();
  while( word != last ) {
    const std::string lowered = boost::to_lower_copy(*word);
    context.emit(lowered, "1");
    ++word;
  }
}

/**
 * Counts the values grouped under the current key and emits the pair
 * (key, count), with the count rendered as a decimal string.
 *
 * The key is additionally written to standard output as a trace line.
 *
 * @param context  reduce-side task context; supplies the current key,
 *                 iterates over its values and receives the output pair.
 */
void
WordcountReducer::reduce(HadoopPipes::ReduceContext & context)
{
  uint32_t count( 0 );

  // Every value, including the first one, is loaded by a call to
  // nextValue(); counting before the first call (as a do/while would)
  // reports one occurrence too many for each key.
  while( context.nextValue() ) {
    ++count;
  }

  std::cout << context.getInputKey() << std::endl;
  context.emit( context.getInputKey(),
                boost::lexical_cast<std::string>(count) );
}


int
main( int argc, char * argv[] )
{
  HadoopPipes::TemplateFactory2<WordcountMapper,
                                WordcountReducer> factory;
  return HadoopPipes::runTask( factory );
}