From 73dbb0343a895345a80d49da9d48edac8858e87a Mon Sep 17 00:00:00 2001 From: philblunsom Date: Mon, 19 Jul 2010 18:33:29 +0000 Subject: Vaguely working distributed implementation. Hierarchical topics doesn't yet work correctly. git-svn-id: https://ws10smt.googlecode.com/svn/trunk@317 ec762483-ff6d-05da-a07a-a48fb63a330f --- gi/pyp-topics/src/mpi-train-contexts.cc | 113 ++++++++++++++++++-------------- 1 file changed, 64 insertions(+), 49 deletions(-) (limited to 'gi/pyp-topics/src/mpi-train-contexts.cc') diff --git a/gi/pyp-topics/src/mpi-train-contexts.cc b/gi/pyp-topics/src/mpi-train-contexts.cc index 0651ecac..6e1e78a5 100644 --- a/gi/pyp-topics/src/mpi-train-contexts.cc +++ b/gi/pyp-topics/src/mpi-train-contexts.cc @@ -10,6 +10,7 @@ #include #include #include +#include // Local #include "mpi-pyp-topics.hh" @@ -28,9 +29,10 @@ int main(int argc, char **argv) { mpi::environment env(argc, argv); mpi::communicator world; - bool am_root = (world.rank() == 0); - if (am_root) std::cout << "I am process " << world.rank() << " of " << world.size() << "." << std::endl; + int rank = world.rank(); + bool am_root = rank; if (am_root) cout << "Pitman Yor topic models: Copyright 2010 Phil Blunsom\n"; + if (am_root) std::cout << "I am process " << world.rank() << " of " << world.size() << "." << std::endl; if (am_root) cout << REVISION << '\n' <(), vm["max-contexts-per-document"].as()); - if (world.rank() == 0) { - if (vm.count("document-topics-out")) { - ogzstream documents_out(vm["document-topics-out"].as().c_str()); - - int document_id=0; - map all_terms; - for (Corpus::const_iterator corpusIt=contexts_corpus.begin(); - corpusIt != contexts_corpus.end(); ++corpusIt, ++document_id) { - vector unique_terms; - for (Document::const_iterator docIt=corpusIt->begin(); - docIt != corpusIt->end(); ++docIt) { - if (unique_terms.empty() || *docIt != unique_terms.back()) - unique_terms.push_back(*docIt); - // increment this terms frequency - pair::iterator,bool> insert_result = all_terms.insert(make_pair(*docIt,1)); - if (!insert_result.second) - all_terms[*docIt] = all_terms[*docIt] + 1; - //insert_result.first++; - } - documents_out << contexts_corpus.key(document_id) << '\t'; - documents_out << model.max(document_id) << " " << corpusIt->size() << " ||| "; - for (std::vector::const_iterator termIt=unique_terms.begin(); - termIt != unique_terms.end(); ++termIt) { - if (termIt != unique_terms.begin()) - documents_out << " ||| "; - vector strings = contexts_corpus.context2string(*termIt); - copy(strings.begin(), strings.end(),ostream_iterator(documents_out, " ")); - documents_out << "||| C=" << model.max(document_id, *termIt); - - } - documents_out <() + ".pyp-process-" + boost::lexical_cast(rank)).c_str()); + int documents = contexts_corpus.num_documents(); + int mpi_start = 0, mpi_end = documents; + if (world.size() != 1) { + mpi_start = (documents / world.size()) * rank; + if (rank == world.size()-1) mpi_end = documents; + else mpi_end = (documents / world.size())*(rank+1); + } + + map all_terms; + for (int document_id=mpi_start; document_id unique_terms; + for (Document::const_iterator docIt=doc.begin(); docIt != doc.end(); ++docIt) { + if (unique_terms.empty() || *docIt != unique_terms.back()) + unique_terms.push_back(*docIt); + // increment this terms frequency + pair::iterator,bool> insert_result = all_terms.insert(make_pair(*docIt,1)); + if (!insert_result.second) + all_terms[*docIt] = all_terms[*docIt] + 1; } - documents_out.close(); - - if (vm.count("default-topics-out")) { - ofstream default_topics(vm["default-topics-out"].as().c_str()); - default_topics << model.max_topic() <::const_iterator termIt=all_terms.begin(); termIt != all_terms.end(); ++termIt) { - vector strings = contexts_corpus.context2string(termIt->first); - default_topics << model.max(-1, termIt->first) << " ||| " << termIt->second << " ||| "; - copy(strings.begin(), strings.end(),ostream_iterator(default_topics, " ")); - default_topics <::const_iterator termIt=unique_terms.begin(); termIt != unique_terms.end(); ++termIt) { + if (termIt != unique_terms.begin()) + documents_out << " ||| "; + vector strings = contexts_corpus.context2string(*termIt); + copy(strings.begin(), strings.end(),ostream_iterator(documents_out, " ")); + documents_out << "||| C=" << model.max(document_id, *termIt); } + documents_out <().c_str()); + for (int p=0; p < world.size(); ++p) { + std::string rank_p_prefix((vm["document-topics-out"].as() + ".pyp-process-" + boost::lexical_cast(p)).c_str()); + std::ifstream rank_p_trees_istream(rank_p_prefix.c_str(), std::ios_base::binary); + root_documents_out << rank_p_trees_istream.rdbuf(); + rank_p_trees_istream.close(); + remove((rank_p_prefix).c_str()); + } + root_documents_out.close(); } - if (vm.count("topic-words-out")) { - ogzstream topics_out(vm["topic-words-out"].as().c_str()); - model.print_topic_terms(topics_out); - topics_out.close(); + if (am_root && vm.count("default-topics-out")) { + ofstream default_topics(vm["default-topics-out"].as().c_str()); + default_topics << model.max_topic() <::const_iterator termIt=all_terms.begin(); termIt != all_terms.end(); ++termIt) { + vector strings = contexts_corpus.context2string(termIt->first); + default_topics << model.max(-1, termIt->first) << " ||| " << termIt->second << " ||| "; + copy(strings.begin(), strings.end(),ostream_iterator(default_topics, " ")); + default_topics <().c_str()); + model.print_topic_terms(topics_out); + topics_out.close(); } + cout <