summaryrefslogtreecommitdiff
path: root/dtrain/scfg/features/count/featurecount.cc
diff options
context:
space:
mode:
Diffstat (limited to 'dtrain/scfg/features/count/featurecount.cc')
-rw-r--r--dtrain/scfg/features/count/featurecount.cc49
1 files changed, 49 insertions, 0 deletions
diff --git a/dtrain/scfg/features/count/featurecount.cc b/dtrain/scfg/features/count/featurecount.cc
new file mode 100644
index 00000000..db31885c
--- /dev/null
+++ b/dtrain/scfg/features/count/featurecount.cc
@@ -0,0 +1,49 @@
+#include "featurecount.hh"
+
+
+void
+FeatureCountMapper::map( HadoopPipes::MapContext &context )
+{
+ string line = context.getInputValue();
+
+ // get features substr
+ size_t i = 0, c = 0, beg = 0, end = 0;
+ string::iterator it = line.begin();
+ string s;
+ while ( c != 12 ) {
+ s = *it;
+ if ( s == "|" ) c += 1;
+ if ( beg == 0 && c == 9 ) beg = i+2;
+ if ( c == 12 ) end = i-beg-3;
+ it++;
+ i++;
+ }
+ string sub = line.substr( beg, end );
+
+ // emit feature:1
+ vector<string> f_tok;
+ boost::split( f_tok, sub, boost::is_any_of(" ") );
+ vector<string>::iterator f;
+ for ( f = f_tok.begin(); f != f_tok.end(); f++ ) {
+ if ( f->find("=1") != string::npos ) context.emit(*f, "1");
+ }
+}
+
+void
+FeatureCountReducer::reduce( HadoopPipes::ReduceContext &context )
+{
+ size_t sum = 0;
+ while ( context.nextValue() ) sum += HadoopUtils::toInt( context.getInputValue() );
+ context.emit( context.getInputKey(), HadoopUtils::toString(sum) );
+}
+
+
+int
+main( int argc, char * argv[] )
+{
+ HadoopPipes::TemplateFactory2<FeatureCountMapper,
+ FeatureCountReducer> factory;
+
+ return HadoopPipes::runTask(factory);
+}
+