From 2dc76ceae3dfbe333b6b404e5b1298be99b211c9 Mon Sep 17 00:00:00 2001
From: bothameister
Date: Tue, 13 Jul 2010 03:33:36 +0000
Subject: added queue mechanism to parallelization of hyperparam resampling;
 new program argument 'num_jobs' to control granularity.

git-svn-id: https://ws10smt.googlecode.com/svn/trunk@232 ec762483-ff6d-05da-a07a-a48fb63a330f
---
 gi/pyp-topics/src/pyp-topics.cc     | 105 ++++++++---------
 gi/pyp-topics/src/pyp-topics.hh     |  12 +-
 gi/pyp-topics/src/train-contexts.cc |   6 +-
 gi/pyp-topics/src/workers.hh        | 220 +++++++++++++++++++++++++++++++++++-
 4 files changed, 279 insertions(+), 64 deletions(-)

diff --git a/gi/pyp-topics/src/pyp-topics.cc b/gi/pyp-topics/src/pyp-topics.cc
index 76f95b2a..e528a923 100644
--- a/gi/pyp-topics/src/pyp-topics.cc
+++ b/gi/pyp-topics/src/pyp-topics.cc
@@ -15,6 +15,7 @@ void PYPTopics::sample_corpus(const Corpus& corpus, int samples,
   std::cerr << "\n Training with " << m_word_pyps.size()-1 << " backoff level"
     << (m_word_pyps.size()==2 ? ":" : "s:") << std::endl;
 
+
   for (int i=0; i<(int)m_word_pyps.size(); ++i)
   {
     m_word_pyps.at(i).reserve(m_num_topics);
@@ -76,6 +77,10 @@ void PYPTopics::sample_corpus(const Corpus& corpus, int samples,
   for (int i = 0; i < corpus.num_documents(); ++i)
     randomDocIndices[i] = i;
 
+  if (num_jobs < max_threads)
+    num_jobs = max_threads;
+  int job_incr = (int) ( (float)m_document_pyps.size() / float(num_jobs) );
+
   // Sampling phase
   for (int curr_sample=0; curr_sample < samples; ++curr_sample) {
     if (freq_cutoff_interval > 0 && curr_sample != 1
@@ -149,33 +154,38 @@ void PYPTopics::sample_corpus(const Corpus& corpus, int samples,
     if (curr_sample != 0 && curr_sample % 10 == 0) {
       std::cerr << " ||| time=" << (timer.Elapsed() / 10.0) << " sec/sample" << std::endl;
       timer.Reset();
-      std::cerr << " ... Resampling hyperparameters (" << max_threads << " threads)"; std::cerr.flush();
-
+      std::cerr << " ... Resampling hyperparameters (";
+
       // resample the hyperparamters
       F log_p=0.0;
-      for (std::vector<PYPs>::iterator levelIt=m_word_pyps.begin();
-           levelIt != m_word_pyps.end(); ++levelIt) {
-        for (PYPs::iterator pypIt=levelIt->begin();
-             pypIt != levelIt->end(); ++pypIt) {
-          pypIt->resample_prior();
-          log_p += pypIt->log_restaurant_prob();
-        }
-      }
-
-      WorkerPtrVect workers;
-      for (int i = 0; i < max_threads; ++i)
-      {
-        JobReturnsF job = boost::bind(&PYPTopics::hresample_docs, this, max_threads, i);
-        workers.push_back(new SimpleResampleWorker(job));
+      if (max_threads == 1)
+      {
+        std::cerr << "1 thread)" << std::endl; std::cerr.flush();
+        log_p += hresample_topics();
+        log_p += hresample_docs(0, m_document_pyps.size());
       }
+      else
+      { //parallelize
+        std::cerr << max_threads << " threads, " << num_jobs << " jobs)" << std::endl; std::cerr.flush();
+
+        WorkerPool<JobReturnsF, F> pool(max_threads);
+        int i=0, sz = m_document_pyps.size();
+        //documents...
+        while (i <= sz - 2*job_incr)
+        {
+          JobReturnsF job = boost::bind(&PYPTopics::hresample_docs, this, i, i+job_incr);
+          pool.addJob(job);
+          i += job_incr;
+        }
+        // do all remaining documents
+        JobReturnsF job = boost::bind(&PYPTopics::hresample_docs, this, i, sz);
+        pool.addJob(job);
+
+        //topics...
+        JobReturnsF topics_job = boost::bind(&PYPTopics::hresample_topics, this);
+        pool.addJob(topics_job);
 
-      WorkerPtrVect::iterator workerIt;
-      for (workerIt = workers.begin(); workerIt != workers.end(); ++workerIt)
-      {
-        //std::cerr << "Retrieving worker result.."; std::cerr.flush();
-        F wresult = workerIt->getResult(); //blocks until worker done
-        log_p += wresult;
-        //std::cerr << ".. got " << wresult << std::endl; std::cerr.flush();
got " << wresult << std::endl; std::cerr.flush(); + log_p += pool.get_result(); //blocks } @@ -204,45 +214,38 @@ void PYPTopics::sample_corpus(const Corpus& corpus, int samples, delete [] randomDocIndices; } -PYPTopics::F PYPTopics::hresample_docs(int num_threads, int thread_id) +PYPTopics::F PYPTopics::hresample_docs(int start, int end) { int resample_counter=0; F log_p = 0.0; - PYPs::iterator pypIt = m_document_pyps.begin(); - PYPs::iterator end = m_document_pyps.end(); - pypIt += thread_id; -// std::cerr << thread_id << " started " << std::endl; std::cerr.flush(); - - while (pypIt < end) + assert(start >= 0); + assert(end >= 0); + assert(start <= end); + for (int i=start; i < end; ++i) { - pypIt->resample_prior(); - log_p += pypIt->log_restaurant_prob(); + m_document_pyps[i].resample_prior(); + log_p += m_document_pyps[i].log_restaurant_prob(); if (resample_counter++ % 5000 == 0) { std::cerr << "."; std::cerr.flush(); } - pypIt += num_threads; } -// std::cerr << thread_id << " did " << resample_counter << " with answer " << log_p << std::endl; std::cerr.flush(); - return log_p; } -//PYPTopics::F PYPTopics::hresample_topics() -//{ -// F log_p = 0.0; -// for (std::vector::iterator levelIt=m_word_pyps.begin(); -// levelIt != m_word_pyps.end(); ++levelIt) { -// for (PYPs::iterator pypIt=levelIt->begin(); -// pypIt != levelIt->end(); ++pypIt) { -// -// pypIt->resample_prior(); -// log_p += pypIt->log_restaurant_prob(); -// } -// } -// //std::cerr << "topicworker has answer " << log_p << std::endl; std::cerr.flush(); -// -// return log_p; -//} +PYPTopics::F PYPTopics::hresample_topics() +{ + F log_p = 0.0; + for (std::vector::iterator levelIt=m_word_pyps.begin(); + levelIt != m_word_pyps.end(); ++levelIt) { + for (PYPs::iterator pypIt=levelIt->begin(); + pypIt != levelIt->end(); ++pypIt) { + + pypIt->resample_prior(); + log_p += pypIt->log_restaurant_prob(); + } + } + return log_p; +} void PYPTopics::decrement(const Term& term, int topic, int level) { //std::cerr << "PYPTopics::decrement(" << term << "," << topic << "," << level << ")" << std::endl; diff --git a/gi/pyp-topics/src/pyp-topics.hh b/gi/pyp-topics/src/pyp-topics.hh index 8097fe19..5e1fc6d6 100644 --- a/gi/pyp-topics/src/pyp-topics.hh +++ b/gi/pyp-topics/src/pyp-topics.hh @@ -21,12 +21,12 @@ public: public: PYPTopics(int num_topics, bool use_topic_pyp=false, unsigned long seed = 0, - int max_threads = 1) + int max_threads = 1, int num_jobs = 1) : m_num_topics(num_topics), m_word_pyps(1), m_topic_pyp(0.5,1.0,seed), m_use_topic_pyp(use_topic_pyp), m_seed(seed), uni_dist(0,1), rng(seed == 0 ? 
-    rnd(rng, uni_dist), max_threads(max_threads) {}
+    rnd(rng, uni_dist), max_threads(max_threads), num_jobs(num_jobs) {}
 
   void sample_corpus(const Corpus& corpus, int samples,
                      int freq_cutoff_start=0, int freq_cutoff_end=0,
@@ -81,15 +81,13 @@ private:
   //call: rnd() generates uniform on [0,1)
 
   typedef boost::function<F()> JobReturnsF;
-  typedef SimpleWorker<JobReturnsF, F> SimpleResampleWorker;
-  typedef boost::ptr_vector<SimpleResampleWorker> WorkerPtrVect;
 
-  F hresample_docs(int num_threads, int thread_id);
+  F hresample_docs(int start, int end); //does i in [start, end)
 
-//  F hresample_topics();
+  F hresample_topics();
 
   int max_threads;
-
+  int num_jobs;
   TermBackoffPtr m_backoff;
 };
 
diff --git a/gi/pyp-topics/src/train-contexts.cc b/gi/pyp-topics/src/train-contexts.cc
index 455e7b1e..a673bf4e 100644
--- a/gi/pyp-topics/src/train-contexts.cc
+++ b/gi/pyp-topics/src/train-contexts.cc
@@ -54,6 +54,7 @@ int main(int argc, char **argv)
         ("freq-cutoff-end", value<int>()->default_value(0), "final frequency cutoff.")
         ("freq-cutoff-interval", value<int>()->default_value(0), "number of iterations between frequency decrement.")
         ("max-threads", value<int>()->default_value(1), "maximum number of simultaneous threads allowed")
+        ("num-jobs", value<int>()->default_value(1), "allows finer control over parallelization")
         ;
 
     cmdline_specific.add(config_options);
@@ -77,10 +78,11 @@ int main(int argc, char **argv)
     cerr << "Please specify a file containing the data." << endl;
     return 1;
   }
-
+  assert(vm["max-threads"].as<int>() > 0);
+  assert(vm["num-jobs"].as<int>() > -1);
   // seed the random number generator: 0 = automatic, specify value otherwise
   unsigned long seed = 0;
-  PYPTopics model(vm["topics"].as<int>(), vm.count("hierarchical-topics"), seed, vm["max-threads"].as<int>());
+  PYPTopics model(vm["topics"].as<int>(), vm.count("hierarchical-topics"), seed, vm["max-threads"].as<int>(), vm["num-jobs"].as<int>());
 
   // read the data
   BackoffGenerator* backoff_gen=0;
diff --git a/gi/pyp-topics/src/workers.hh b/gi/pyp-topics/src/workers.hh
index 55424c8d..95b18947 100644
--- a/gi/pyp-topics/src/workers.hh
+++ b/gi/pyp-topics/src/workers.hh
@@ -1,18 +1,227 @@
+/**
+  Basic thread-pool tools using Boost.Thread.
+  (Jan Botha, 7/2010)
+
+  --Simple usage--
+  Use SimpleWorker.
+  Example, call a function that returns an int in a new thread:
+    typedef boost::function<int()> JobType;
+    JobType job = boost::bind(funcname);
+    //or boost::bind(&class::funcname, this) for a member function
+    SimpleWorker<JobType, int> worker(job);
+    int result = worker.getResult(); //blocks until result is ready
+
+  --Extended usage--
+  Use WorkerPool, which uses Queuemt (a synchronized queue) and Worker.
+  Example (same context and typedef as above):
+    WorkerPool<JobType, int> pool(num_threads);
+    JobType job = ...
+    pool.addJob(job);
+    ...
+    pool.get_result(); //blocks until all workers are done, returns the sum of their results
+
+  Jobs added to a WorkerPool need to be of the same type. A WorkerPool instance should not be reused (e.g. by adding jobs) after calling get_result().
+*/
+
 #ifndef WORKERS_HH
 #define WORKERS_HH
 
-#include "timing.h"
-
 #include
 #include
 #include
+#include
+#include
 #include
 #include
+#include
 #include
+#include
+
+#include
+#include "timing.h"
+
+/** Implements a synchronized queue*/
+template <typename J>
+class Queuemt
+{
+
+public:
+  boost::condition_variable_any cond;
+  const bool& running;
+
+  Queuemt() { }
+  Queuemt(const bool& running) : running(running), maxsize(0), qsize(0)
+  {
+  }
+
+  ~Queuemt() {
+  }
+
+  J pop()
+  {
+    J job;
+    {
+      boost::unique_lock<boost::shared_mutex> qlock(q_mutex);
+      while (running && qsize == 0)
+        cond.wait(qlock);
+
+      if (qsize > 0)
+      {
+        job = q.front();
+        q.pop();
+        --qsize;
+      }
+    }
+    if (job)
+      cond.notify_one();
+    return job;
+  }
+
+  void push(J job)
+  {
+    {
+      boost::unique_lock<boost::shared_mutex> lock(q_mutex);
+      q.push(job);
+      ++qsize;
+    }
+    if (qsize > maxsize)
+      maxsize = qsize;
+
+    cond.notify_one();
+  }
+
+  int getMaxsize()
+  {
+    return maxsize;
+  }
+
+  int size()
+  {
+    return qsize;
+  }
+
+private:
+  boost::shared_mutex q_mutex;
+  std::queue<J> q;
+  int maxsize;
+  volatile int qsize;
+};
+
+
+template <typename J, typename R>
+class Worker
+{
+typedef boost::packaged_task<R> PackagedTask;
+public:
+  Worker(Queuemt<J>& queue, int id, int num_workers) :
+    q(queue), tasktime(0.0), id(id), num_workers(num_workers)
+  {
+    PackagedTask task(boost::bind(&Worker::run, this));
+    future = task.get_future();
+    boost::thread t(boost::move(task));
+  }
+
+  R run() //this is called upon thread creation
+  {
+    R wresult = 0;
+    while (isRunning())
+    {
+      J job = q.pop();
+
+      if (job)
+      {
+        timer.Reset();
+        wresult += job();
+        tasktime += timer.Elapsed();
+      }
+    }
+    return wresult;
+  }
+
+  R getResult()
+  {
+    if (!future.is_ready())
+      future.wait();
+    assert(future.is_ready());
+    return future.get();
+  }
+
+  double getTaskTime()
+  {
+    return tasktime;
+  }
+
+private:
+
+  Queuemt<J>& q;
+
+  boost::unique_future<R> future;
+
+  bool isRunning()
+  {
+    return q.running || q.size() > 0;
+  }
+
+  Timer timer;
+  double tasktime;
+  int id;
+  int num_workers;
+};
+
+template <typename J, typename R>
+class WorkerPool
+{
+typedef boost::packaged_task<R> PackagedTask;
+typedef Worker<J, R> WJR;
+typedef boost::ptr_vector<WJR> WorkerVector;
+public:
+
+  WorkerPool(int num_workers)
+  {
+    q.reset(new Queuemt<J>(running));
+    running = true;
+    for (int i = 0; i < num_workers; ++i)
+      workers.push_back( new Worker<J, R>(*q, i, num_workers) );
+  }
+
+  ~WorkerPool()
+  {
+  }
+
+  R get_result()
+  {
+    running = false;
+    q->cond.notify_all();
+    R tmp = 0;
+    double tasktime = 0.0;
+    for (typename WorkerVector::iterator it = workers.begin(); it != workers.end(); it++)
+    {
+      R res = it->getResult();
+      tmp += res;
+      //std::cerr << "tasktime: " << it->getTaskTime() << std::endl;
+      tasktime += it->getTaskTime();
+    }
+//    std::cerr << " maxQ = " << q->getMaxsize() << std::endl;
+    return tmp;
+  }
+
+  void addJob(J job)
+  {
+    q->push(job);
+  }
+
+private:
+
+  WorkerVector workers;
+
+  boost::shared_ptr<Queuemt<J> > q;
+
+  bool running;
+};
+
+///////////////////
 template <typename J, typename R>
 class SimpleWorker
 {
@@ -33,6 +242,7 @@ public:
     timer.Reset();
     wresult = job();
     tasktime = timer.Elapsed();
+    std::cerr << tasktime << " s" << std::endl;
    return wresult;
   }
 
@@ -60,4 +270,6 @@ private:
 
 };
 
-#endif
+
+
+#endif
-- 
cgit v1.2.3
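Usage sketch (not part of the commit): the snippet below shows how the WorkerPool introduced in workers.hh is intended to be driven, in the spirit of the header comment and of the hresample_docs scheduling above — queue more jobs than threads and let get_result() sum the per-worker results. The driver itself (sum_range, the job ranges, the thread and job counts) is hypothetical, and the WorkerPool<Job, Result> template-argument order is reconstructed from the surrounding code rather than quoted verbatim.

```cpp
// Hypothetical driver, not part of the commit: splits the work 1..40 into
// 10 jobs executed by 4 pooled threads and sums the partial results.
#include <iostream>
#include <boost/bind.hpp>
#include <boost/function.hpp>
#include "workers.hh"

typedef boost::function<int()> JobType;  // a job takes no arguments and returns int

// Sums the integers in [start, end), mirroring the [start, end) convention of hresample_docs.
int sum_range(int start, int end)
{
  int total = 0;
  for (int i = start; i < end; ++i)
    total += i;
  return total;
}

int main()
{
  WorkerPool<JobType, int> pool(4);   // 4 worker threads
  for (int j = 0; j < 10; ++j)        // 10 jobs: finer granularity than the thread count
    pool.addJob(boost::bind(sum_range, 4*j + 1, 4*(j + 1) + 1));
  // get_result() blocks until the queue is drained and every worker has finished,
  // then returns the sum of all job results (here 1+2+...+40 = 820).
  std::cout << "total = " << pool.get_result() << std::endl;
  return 0;
}
```

This mirrors why the patch lets num-jobs exceed max-threads: with more, smaller jobs in the queue, one slow block of documents does not leave the remaining threads idle.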