From 55da6f6d4e924769cea9463c1967e4405317a8c5 Mon Sep 17 00:00:00 2001 From: bothameister Date: Tue, 6 Jul 2010 22:38:21 +0000 Subject: Added simple multi-threading during hyperparameter resampling. Added cmdarg for controlling number of threads. Moved Timer to its own header file. Cleaned up Makefile.am git-svn-id: https://ws10smt.googlecode.com/svn/trunk@170 ec762483-ff6d-05da-a07a-a48fb63a330f --- gi/pyp-topics/src/Makefile.am | 4 +- gi/pyp-topics/src/pyp-topics.cc | 99 +++++++++++++++++++++++-------------- gi/pyp-topics/src/pyp-topics.hh | 17 +++++-- gi/pyp-topics/src/timing.h | 31 ++++++++++++ gi/pyp-topics/src/train-contexts.cc | 3 +- gi/pyp-topics/src/workers.hh | 62 +++++++++++++++++++++++ 6 files changed, 172 insertions(+), 44 deletions(-) create mode 100644 gi/pyp-topics/src/timing.h create mode 100644 gi/pyp-topics/src/workers.hh (limited to 'gi/pyp-topics/src') diff --git a/gi/pyp-topics/src/Makefile.am b/gi/pyp-topics/src/Makefile.am index 681a9a0c..abfc95ac 100644 --- a/gi/pyp-topics/src/Makefile.am +++ b/gi/pyp-topics/src/Makefile.am @@ -3,10 +3,10 @@ bin_PROGRAMS = pyp-topics-train pyp-contexts-train contexts_lexer.cc: contexts_lexer.l $(LEX) -s -CF -8 -o$@ $< -pyp_topics_train_SOURCES = corpus.cc gzstream.cc mt19937ar.c pyp-topics.cc train.cc contexts_lexer.cc contexts_corpus.cc +pyp_topics_train_SOURCES = corpus.cc gzstream.cc pyp-topics.cc train.cc contexts_lexer.cc contexts_corpus.cc pyp_topics_train_LDADD = $(top_srcdir)/decoder/libcdec.a -lz -pyp_contexts_train_SOURCES = corpus.cc gzstream.cc mt19937ar.c pyp-topics.cc contexts_lexer.cc contexts_corpus.cc train-contexts.cc +pyp_contexts_train_SOURCES = corpus.cc gzstream.cc pyp-topics.cc contexts_lexer.cc contexts_corpus.cc train-contexts.cc pyp_contexts_train_LDADD = $(top_srcdir)/decoder/libcdec.a -lz AM_CPPFLAGS = -W -Wall -Wno-sign-compare -funroll-loops diff --git a/gi/pyp-topics/src/pyp-topics.cc b/gi/pyp-topics/src/pyp-topics.cc index 2b96816e..48ccf507 100644 --- a/gi/pyp-topics/src/pyp-topics.cc +++ b/gi/pyp-topics/src/pyp-topics.cc @@ -1,33 +1,7 @@ -#ifdef __CYGWIN__ -# ifndef _POSIX_MONOTONIC_CLOCK -# define _POSIX_MONOTONIC_CLOCK -# endif -#endif - #include "pyp-topics.hh" +#include "timing.h" -#include -#include -#include -#include "clock_gettime_stub.c" - -struct Timer { - Timer() { Reset(); } - void Reset() - { - clock_gettime(CLOCK_MONOTONIC, &start_t); - } - double Elapsed() const { - timespec end_t; - clock_gettime(CLOCK_MONOTONIC, &end_t); - const double elapsed = (end_t.tv_sec - start_t.tv_sec) - + (end_t.tv_nsec - start_t.tv_nsec) / 1000000000.0; - return elapsed; - } - private: - timespec start_t; -}; - +//#include void PYPTopics::sample_corpus(const Corpus& corpus, int samples, int freq_cutoff_start, int freq_cutoff_end, int freq_cutoff_interval) { @@ -175,9 +149,10 @@ void PYPTopics::sample_corpus(const Corpus& corpus, int samples, if (curr_sample != 0 && curr_sample % 10 == 0) { std::cerr << " ||| time=" << (timer.Elapsed() / 10.0) << " sec/sample" << std::endl; timer.Reset(); - std::cerr << " ... Resampling hyperparameters "; std::cerr.flush(); + std::cerr << " ... Resampling hyperparameters (" << max_threads << " threads)"; std::cerr.flush(); + // resample the hyperparamters - F log_p=0.0; int resample_counter=0; + F log_p=0.0; for (std::vector::iterator levelIt=m_word_pyps.begin(); levelIt != m_word_pyps.end(); ++levelIt) { for (PYPs::iterator pypIt=levelIt->begin(); @@ -187,15 +162,23 @@ void PYPTopics::sample_corpus(const Corpus& corpus, int samples, } } - resample_counter=0; - for (PYPs::iterator pypIt=m_document_pyps.begin(); - pypIt != m_document_pyps.end(); ++pypIt, ++resample_counter) { - pypIt->resample_prior(); - log_p += pypIt->log_restaurant_prob(); - if (resample_counter++ % 10000 == 0) { - std::cerr << "."; std::cerr.flush(); - } + WorkerPtrVect workers; + for (int i = 0; i < max_threads; ++i) + { + JobReturnsF job = boost::bind(&PYPTopics::hresample_docs, this, max_threads, i); + workers.push_back(new SimpleResampleWorker(job)); } + + WorkerPtrVect::iterator workerIt; + for (workerIt = workers.begin(); workerIt != workers.end(); ++workerIt) + { + //std::cerr << "Retrieving worker result.."; std::cerr.flush(); + F wresult = workerIt->getResult(); //blocks until worker done + log_p += wresult; + //std::cerr << ".. got " << wresult << std::endl; std::cerr.flush(); + + } + if (m_use_topic_pyp) { m_topic_pyp.resample_prior(); log_p += m_topic_pyp.log_restaurant_prob(); @@ -221,6 +204,46 @@ void PYPTopics::sample_corpus(const Corpus& corpus, int samples, delete [] randomDocIndices; } +PYPTopics::F PYPTopics::hresample_docs(int num_threads, int thread_id) +{ + int resample_counter=0; + F log_p = 0.0; + PYPs::iterator pypIt = m_document_pyps.begin(); + PYPs::iterator end = m_document_pyps.end(); + pypIt += thread_id; +// std::cerr << thread_id << " started " << std::endl; std::cerr.flush(); + + while (pypIt < end) + { + pypIt->resample_prior(); + log_p += pypIt->log_restaurant_prob(); + if (resample_counter++ % 5000 == 0) { + std::cerr << "."; std::cerr.flush(); + } + pypIt += num_threads; + } +// std::cerr << thread_id << " did " << resample_counter << " with answer " << log_p << std::endl; std::cerr.flush(); + + return log_p; +} + +//PYPTopics::F PYPTopics::hresample_topics() +//{ +// F log_p = 0.0; +// for (std::vector::iterator levelIt=m_word_pyps.begin(); +// levelIt != m_word_pyps.end(); ++levelIt) { +// for (PYPs::iterator pypIt=levelIt->begin(); +// pypIt != levelIt->end(); ++pypIt) { +// +// pypIt->resample_prior(); +// log_p += pypIt->log_restaurant_prob(); +// } +// } +// //std::cerr << "topicworker has answer " << log_p << std::endl; std::cerr.flush(); +// +// return log_p; +//} + void PYPTopics::decrement(const Term& term, int topic, int level) { //std::cerr << "PYPTopics::decrement(" << term << "," << topic << "," << level << ")" << std::endl; m_word_pyps.at(level).at(topic).decrement(term); diff --git a/gi/pyp-topics/src/pyp-topics.hh b/gi/pyp-topics/src/pyp-topics.hh index 9da49267..8097fe19 100644 --- a/gi/pyp-topics/src/pyp-topics.hh +++ b/gi/pyp-topics/src/pyp-topics.hh @@ -11,7 +11,7 @@ #include "pyp.hh" #include "corpus.hh" - +#include "workers.hh" class PYPTopics { public: @@ -20,12 +20,13 @@ public: typedef double F; public: - PYPTopics(int num_topics, bool use_topic_pyp=false, unsigned long seed = 0) + PYPTopics(int num_topics, bool use_topic_pyp=false, unsigned long seed = 0, + int max_threads = 1) : m_num_topics(num_topics), m_word_pyps(1), m_topic_pyp(0.5,1.0,seed), m_use_topic_pyp(use_topic_pyp), m_seed(seed), uni_dist(0,1), rng(seed == 0 ? (unsigned long)this : seed), - rnd(rng, uni_dist) {} + rnd(rng, uni_dist), max_threads(max_threads) {} void sample_corpus(const Corpus& corpus, int samples, int freq_cutoff_start=0, int freq_cutoff_end=0, @@ -79,6 +80,16 @@ private: gen_type rnd; //instantiate: rnd(rng, uni_dist) //call: rnd() generates uniform on [0,1) + typedef boost::function JobReturnsF; + typedef SimpleWorker SimpleResampleWorker; + typedef boost::ptr_vector WorkerPtrVect; + + F hresample_docs(int num_threads, int thread_id); + +// F hresample_topics(); + + int max_threads; + TermBackoffPtr m_backoff; }; diff --git a/gi/pyp-topics/src/timing.h b/gi/pyp-topics/src/timing.h new file mode 100644 index 00000000..7543295c --- /dev/null +++ b/gi/pyp-topics/src/timing.h @@ -0,0 +1,31 @@ +#ifndef TIMING_H +#define TIMING_H + +#ifdef __CYGWIN__ +# ifndef _POSIX_MONOTONIC_CLOCK +# define _POSIX_MONOTONIC_CLOCK +# endif +#endif + +#include +#include +#include "clock_gettime_stub.c" + +struct Timer { + Timer() { Reset(); } + void Reset() + { + clock_gettime(CLOCK_MONOTONIC, &start_t); + } + double Elapsed() const { + timespec end_t; + clock_gettime(CLOCK_MONOTONIC, &end_t); + const double elapsed = (end_t.tv_sec - start_t.tv_sec) + + (end_t.tv_nsec - start_t.tv_nsec) / 1000000000.0; + return elapsed; + } + private: + timespec start_t; +}; + +#endif diff --git a/gi/pyp-topics/src/train-contexts.cc b/gi/pyp-topics/src/train-contexts.cc index 8a0c8949..455e7b1e 100644 --- a/gi/pyp-topics/src/train-contexts.cc +++ b/gi/pyp-topics/src/train-contexts.cc @@ -53,6 +53,7 @@ int main(int argc, char **argv) ("freq-cutoff-start", value()->default_value(0), "initial frequency cutoff.") ("freq-cutoff-end", value()->default_value(0), "final frequency cutoff.") ("freq-cutoff-interval", value()->default_value(0), "number of iterations between frequency decrement.") + ("max-threads", value()->default_value(1), "maximum number of simultaneous threads allowed") ; cmdline_specific.add(config_options); @@ -79,7 +80,7 @@ int main(int argc, char **argv) // seed the random number generator: 0 = automatic, specify value otherwise unsigned long seed = 0; - PYPTopics model(vm["topics"].as(), vm.count("hierarchical-topics"), seed); + PYPTopics model(vm["topics"].as(), vm.count("hierarchical-topics"), seed, vm["max-threads"].as()); // read the data BackoffGenerator* backoff_gen=0; diff --git a/gi/pyp-topics/src/workers.hh b/gi/pyp-topics/src/workers.hh new file mode 100644 index 00000000..1f496acf --- /dev/null +++ b/gi/pyp-topics/src/workers.hh @@ -0,0 +1,62 @@ +#ifndef WORKERS_HH +#define WORKERS_HH + +#include +#include +#include +#include +#include +#include + +//#include + +#include "timing.h" + +template +class SimpleWorker +{ +typedef boost::packaged_task PackagedTask; +public: + SimpleWorker(J& job) : job(job), tasktime(0.0) + { + PackagedTask task(boost::bind(&SimpleWorker::run, this)); + future = task.get_future(); + boost::thread t(boost::move(task)); + } + + R run() //this is called upon thread creation + { + R wresult = 0; + + assert(job); + timer.Reset(); + wresult = job(); + tasktime = timer.Elapsed(); + return wresult; + } + + R getResult() + { + if (!future.is_ready()) + future.wait(); + assert(future.is_ready()); + return future.get(); + } + + double getTaskTime() + { + return tasktime; + } + +private: + + J job; + + boost::unique_future future; + + Timer timer; + double tasktime; + +}; + +#endif -- cgit v1.2.3