#include "featurecount.hh"


void
FeatureCountMapper::map( HadoopPipes::MapContext &context )
{
  string line = context.getInputValue();

  // get features substr
  size_t i = 0, c = 0, beg = 0, end = 0;
  string::iterator it = line.begin();
  string s;
  while ( c != 12 ) {
    s = *it;
    if ( s == "|" ) c += 1;
    if ( beg == 0 && c == 9 ) beg = i+2;
    if ( c == 12 ) end = i-beg-3;
    it++;
    i++;
  }
  string sub = line.substr( beg, end );

  // emit feature:1
  vector<string> f_tok;
  boost::split( f_tok, sub, boost::is_any_of(" ") );
  vector<string>::iterator f;
  for ( f = f_tok.begin(); f != f_tok.end(); f++ ) {
    if ( f->find("=1") != string::npos ) context.emit(*f, "1");
  }
}

void
FeatureCountReducer::reduce( HadoopPipes::ReduceContext &context )
{
  size_t sum = 0;
  while ( context.nextValue() ) sum += HadoopUtils::toInt( context.getInputValue() );
  context.emit( context.getInputKey(), HadoopUtils::toString(sum) );
}


int
main( int argc, char * argv[] )
{
  HadoopPipes::TemplateFactory2<FeatureCountMapper,
                                FeatureCountReducer> factory;

  return HadoopPipes::runTask(factory);
}