summaryrefslogtreecommitdiff
path: root/gi/pyp-topics/src
diff options
context:
space:
mode:
authorbothameister <bothameister@ec762483-ff6d-05da-a07a-a48fb63a330f>2010-07-13 03:33:36 +0000
committerbothameister <bothameister@ec762483-ff6d-05da-a07a-a48fb63a330f>2010-07-13 03:33:36 +0000
commit2dc76ceae3dfbe333b6b404e5b1298be99b211c9 (patch)
tree6133d68a2222f8aeb1b598ea6156e190b43321d4 /gi/pyp-topics/src
parent47c75319638866609f669346e15663c5ba43af7f (diff)
added queue mechanism to parallelization of hyperparam resampling; new program argument 'num_jobs' to control granularity.
git-svn-id: https://ws10smt.googlecode.com/svn/trunk@232 ec762483-ff6d-05da-a07a-a48fb63a330f
Diffstat (limited to 'gi/pyp-topics/src')
-rw-r--r--gi/pyp-topics/src/pyp-topics.cc105
-rw-r--r--gi/pyp-topics/src/pyp-topics.hh12
-rw-r--r--gi/pyp-topics/src/train-contexts.cc6
-rw-r--r--gi/pyp-topics/src/workers.hh220
4 files changed, 279 insertions, 64 deletions
diff --git a/gi/pyp-topics/src/pyp-topics.cc b/gi/pyp-topics/src/pyp-topics.cc
index 76f95b2a..e528a923 100644
--- a/gi/pyp-topics/src/pyp-topics.cc
+++ b/gi/pyp-topics/src/pyp-topics.cc
@@ -15,6 +15,7 @@ void PYPTopics::sample_corpus(const Corpus& corpus, int samples,
std::cerr << "\n Training with " << m_word_pyps.size()-1 << " backoff level"
<< (m_word_pyps.size()==2 ? ":" : "s:") << std::endl;
+
for (int i=0; i<(int)m_word_pyps.size(); ++i)
{
m_word_pyps.at(i).reserve(m_num_topics);
@@ -76,6 +77,10 @@ void PYPTopics::sample_corpus(const Corpus& corpus, int samples,
for (int i = 0; i < corpus.num_documents(); ++i)
randomDocIndices[i] = i;
+ if (num_jobs < max_threads)
+ num_jobs = max_threads;
+ int job_incr = (int) ( (float)m_document_pyps.size() / float(num_jobs) );
+
// Sampling phase
for (int curr_sample=0; curr_sample < samples; ++curr_sample) {
if (freq_cutoff_interval > 0 && curr_sample != 1
@@ -149,33 +154,38 @@ void PYPTopics::sample_corpus(const Corpus& corpus, int samples,
if (curr_sample != 0 && curr_sample % 10 == 0) {
std::cerr << " ||| time=" << (timer.Elapsed() / 10.0) << " sec/sample" << std::endl;
timer.Reset();
- std::cerr << " ... Resampling hyperparameters (" << max_threads << " threads)"; std::cerr.flush();
-
+ std::cerr << " ... Resampling hyperparameters (";
+
// resample the hyperparamters
F log_p=0.0;
- for (std::vector<PYPs>::iterator levelIt=m_word_pyps.begin();
- levelIt != m_word_pyps.end(); ++levelIt) {
- for (PYPs::iterator pypIt=levelIt->begin();
- pypIt != levelIt->end(); ++pypIt) {
- pypIt->resample_prior();
- log_p += pypIt->log_restaurant_prob();
- }
- }
-
- WorkerPtrVect workers;
- for (int i = 0; i < max_threads; ++i)
- {
- JobReturnsF job = boost::bind(&PYPTopics::hresample_docs, this, max_threads, i);
- workers.push_back(new SimpleResampleWorker(job));
+ if (max_threads == 1)
+ {
+ std::cerr << "1 thread)" << std::endl; std::cerr.flush();
+ log_p += hresample_topics();
+ log_p += hresample_docs(0, m_document_pyps.size());
}
+ else
+ { //parallelize
+ std::cerr << max_threads << " threads, " << num_jobs << " jobs)" << std::endl; std::cerr.flush();
+
+ WorkerPool<JobReturnsF, F> pool(max_threads);
+ int i=0, sz = m_document_pyps.size();
+ //documents...
+ while (i <= sz - 2*job_incr)
+ {
+ JobReturnsF job = boost::bind(&PYPTopics::hresample_docs, this, i, i+job_incr);
+ pool.addJob(job);
+ i += job_incr;
+ }
+ // do all remaining documents
+ JobReturnsF job = boost::bind(&PYPTopics::hresample_docs, this, i,sz);
+ pool.addJob(job);
+
+ //topics...
+ JobReturnsF topics_job = boost::bind(&PYPTopics::hresample_topics, this);
+ pool.addJob(topics_job);
- WorkerPtrVect::iterator workerIt;
- for (workerIt = workers.begin(); workerIt != workers.end(); ++workerIt)
- {
- //std::cerr << "Retrieving worker result.."; std::cerr.flush();
- F wresult = workerIt->getResult(); //blocks until worker done
- log_p += wresult;
- //std::cerr << ".. got " << wresult << std::endl; std::cerr.flush();
+ log_p += pool.get_result(); //blocks
}
@@ -204,45 +214,38 @@ void PYPTopics::sample_corpus(const Corpus& corpus, int samples,
delete [] randomDocIndices;
}
-PYPTopics::F PYPTopics::hresample_docs(int num_threads, int thread_id)
+PYPTopics::F PYPTopics::hresample_docs(int start, int end)
{
int resample_counter=0;
F log_p = 0.0;
- PYPs::iterator pypIt = m_document_pyps.begin();
- PYPs::iterator end = m_document_pyps.end();
- pypIt += thread_id;
-// std::cerr << thread_id << " started " << std::endl; std::cerr.flush();
-
- while (pypIt < end)
+ assert(start >= 0);
+ assert(end >= 0);
+ assert(start <= end);
+ for (int i=start; i < end; ++i)
{
- pypIt->resample_prior();
- log_p += pypIt->log_restaurant_prob();
+ m_document_pyps[i].resample_prior();
+ log_p += m_document_pyps[i].log_restaurant_prob();
if (resample_counter++ % 5000 == 0) {
std::cerr << "."; std::cerr.flush();
}
- pypIt += num_threads;
}
-// std::cerr << thread_id << " did " << resample_counter << " with answer " << log_p << std::endl; std::cerr.flush();
-
return log_p;
}
-//PYPTopics::F PYPTopics::hresample_topics()
-//{
-// F log_p = 0.0;
-// for (std::vector<PYPs>::iterator levelIt=m_word_pyps.begin();
-// levelIt != m_word_pyps.end(); ++levelIt) {
-// for (PYPs::iterator pypIt=levelIt->begin();
-// pypIt != levelIt->end(); ++pypIt) {
-//
-// pypIt->resample_prior();
-// log_p += pypIt->log_restaurant_prob();
-// }
-// }
-// //std::cerr << "topicworker has answer " << log_p << std::endl; std::cerr.flush();
-//
-// return log_p;
-//}
+PYPTopics::F PYPTopics::hresample_topics()
+{
+ F log_p = 0.0;
+ for (std::vector<PYPs>::iterator levelIt=m_word_pyps.begin();
+ levelIt != m_word_pyps.end(); ++levelIt) {
+ for (PYPs::iterator pypIt=levelIt->begin();
+ pypIt != levelIt->end(); ++pypIt) {
+
+ pypIt->resample_prior();
+ log_p += pypIt->log_restaurant_prob();
+ }
+ }
+ return log_p;
+}
void PYPTopics::decrement(const Term& term, int topic, int level) {
//std::cerr << "PYPTopics::decrement(" << term << "," << topic << "," << level << ")" << std::endl;
diff --git a/gi/pyp-topics/src/pyp-topics.hh b/gi/pyp-topics/src/pyp-topics.hh
index 8097fe19..5e1fc6d6 100644
--- a/gi/pyp-topics/src/pyp-topics.hh
+++ b/gi/pyp-topics/src/pyp-topics.hh
@@ -21,12 +21,12 @@ public:
public:
PYPTopics(int num_topics, bool use_topic_pyp=false, unsigned long seed = 0,
- int max_threads = 1)
+ int max_threads = 1, int num_jobs = 1)
: m_num_topics(num_topics), m_word_pyps(1),
m_topic_pyp(0.5,1.0,seed), m_use_topic_pyp(use_topic_pyp),
m_seed(seed),
uni_dist(0,1), rng(seed == 0 ? (unsigned long)this : seed),
- rnd(rng, uni_dist), max_threads(max_threads) {}
+ rnd(rng, uni_dist), max_threads(max_threads), num_jobs(num_jobs) {}
void sample_corpus(const Corpus& corpus, int samples,
int freq_cutoff_start=0, int freq_cutoff_end=0,
@@ -81,15 +81,13 @@ private:
//call: rnd() generates uniform on [0,1)
typedef boost::function<F()> JobReturnsF;
- typedef SimpleWorker<JobReturnsF, F> SimpleResampleWorker;
- typedef boost::ptr_vector<SimpleResampleWorker> WorkerPtrVect;
- F hresample_docs(int num_threads, int thread_id);
+ F hresample_docs(int start, int end); //does i in [start, end)
-// F hresample_topics();
+ F hresample_topics();
int max_threads;
-
+ int num_jobs;
TermBackoffPtr m_backoff;
};
diff --git a/gi/pyp-topics/src/train-contexts.cc b/gi/pyp-topics/src/train-contexts.cc
index 455e7b1e..a673bf4e 100644
--- a/gi/pyp-topics/src/train-contexts.cc
+++ b/gi/pyp-topics/src/train-contexts.cc
@@ -54,6 +54,7 @@ int main(int argc, char **argv)
("freq-cutoff-end", value<int>()->default_value(0), "final frequency cutoff.")
("freq-cutoff-interval", value<int>()->default_value(0), "number of iterations between frequency decrement.")
("max-threads", value<int>()->default_value(1), "maximum number of simultaneous threads allowed")
+ ("num-jobs", value<int>()->default_value(1), "allows finer control over parallelization")
;
cmdline_specific.add(config_options);
@@ -77,10 +78,11 @@ int main(int argc, char **argv)
cerr << "Please specify a file containing the data." << endl;
return 1;
}
-
+ assert(vm["max-threads"].as<int>() > 0);
+ assert(vm["num-jobs"].as<int>() > -1);
// seed the random number generator: 0 = automatic, specify value otherwise
unsigned long seed = 0;
- PYPTopics model(vm["topics"].as<int>(), vm.count("hierarchical-topics"), seed, vm["max-threads"].as<int>());
+ PYPTopics model(vm["topics"].as<int>(), vm.count("hierarchical-topics"), seed, vm["max-threads"].as<int>(), vm["num-jobs"].as<int>());
// read the data
BackoffGenerator* backoff_gen=0;
diff --git a/gi/pyp-topics/src/workers.hh b/gi/pyp-topics/src/workers.hh
index 55424c8d..95b18947 100644
--- a/gi/pyp-topics/src/workers.hh
+++ b/gi/pyp-topics/src/workers.hh
@@ -1,18 +1,227 @@
+/**
+ Basic thread-pool tools using Boost.Thread.
+ (Jan Botha, 7/2010)
+
+ --Simple usage--
+ Use SimpleWorker.
+ Example, call a function that returns an int in a new thread:
+ typedef boost::function<int()> JobType;
+ JobType job = boost::bind(funcname);
+ //or boost::bind(&class::funcname, this) for a member function
+ SimpleWorker<JobType, int> worker(job);
+ int result = worker.getResult(); //blocks until result is ready
+
+ --Extended usage--
+ Use WorkerPool, which uses Queuemt (a synchronized queue) and Worker.
+ Example:
+ (same context and typedef
+ WorkerPool<JobType, int> pool(num_threads);
+ JobType job = ...
+ pool.addJob(job);
+ ...
+ pool.get_result(); //blocks until all workers are done, returns the some of their results.
+
+ Jobs added to a WorkerPool need to be the same type. A WorkerPool instance should not be reused (e.g. adding jobs) after calling get_result().
+*/
+
#ifndef WORKERS_HH
#define WORKERS_HH
-#include "timing.h"
-
#include <iostream>
#include <boost/bind.hpp>
#include <boost/function.hpp>
+#include <queue>
+#include <boost/ptr_container/ptr_vector.hpp>
#include <boost/thread/thread.hpp>
#include <boost/thread/mutex.hpp>
+#include <boost/thread/shared_mutex.hpp>
#include <boost/thread/future.hpp>
+#include <boost/thread/condition.hpp>
+
+#include <boost/date_time/posix_time/posix_time_types.hpp>
+#include "timing.h"
+
+/** Implements a synchronized queue*/
+template<typename J>
+class Queuemt
+{
+
+public:
+ boost::condition_variable_any cond;
+ const bool& running;
+
+ Queuemt() { }
+ Queuemt(const bool& running) : running(running), maxsize(0), qsize(0)
+ {
+ }
+
+ ~Queuemt() {
+ }
+
+ J pop()
+ {
+ J job;
+ {
+ boost::unique_lock<boost::shared_mutex> qlock(q_mutex);
+ while (running && qsize == 0)
+ cond.wait(qlock);
+
+ if (qsize > 0)
+ {
+ job = q.front();
+ q.pop();
+ --qsize;
+ }
+ }
+ if (job)
+ cond.notify_one();
+ return job;
+
+ }
-//#include <boost/date_time/posix_time/posix_time_types.hpp>
+ void push(J job)
+ {
+ {
+ boost::unique_lock<boost::shared_mutex> lock(q_mutex);
+ q.push(job);
+ ++qsize;
+ }
+ if (qsize > maxsize)
+ maxsize = qsize;
+
+ cond.notify_one();
+ }
+ int getMaxsize()
+ {
+ return maxsize;
+ }
+ int size()
+ {
+ return qsize;
+ }
+
+private:
+ boost::shared_mutex q_mutex;
+ std::queue<J> q;
+ int maxsize;
+ volatile int qsize;
+};
+
+
+template<typename J, typename R>
+class Worker
+{
+typedef boost::packaged_task<R> PackagedTask;
+public:
+ Worker(Queuemt<J>& queue, int id, int num_workers) :
+ q(queue), tasktime(0.0), id(id), num_workers(num_workers)
+ {
+ PackagedTask task(boost::bind(&Worker<J, R>::run, this));
+ future = task.get_future();
+ boost::thread t(boost::move(task));
+ }
+
+ R run() //this is called upon thread creation
+ {
+ R wresult = 0;
+ while (isRunning())
+ {
+ J job = q.pop();
+
+ if (job)
+ {
+ timer.Reset();
+ wresult += job();
+ tasktime += timer.Elapsed();
+ }
+ }
+ return wresult;
+ }
+
+ R getResult()
+ {
+ if (!future.is_ready())
+ future.wait();
+ assert(future.is_ready());
+ return future.get();
+ }
+
+ double getTaskTime()
+ {
+ return tasktime;
+ }
+
+private:
+
+ Queuemt<J>& q;
+
+ boost::unique_future<R> future;
+
+ bool isRunning()
+ {
+ return q.running || q.size() > 0;
+ }
+
+ Timer timer;
+ double tasktime;
+ int id;
+ int num_workers;
+};
+
+template<typename J, typename R>
+class WorkerPool
+{
+typedef boost::packaged_task<R> PackagedTask;
+typedef Worker<J,R> WJR;
+typedef boost::ptr_vector<WJR> WorkerVector;
+public:
+
+ WorkerPool(int num_workers)
+ {
+ q.reset(new Queuemt<J>(running));
+ running = true;
+ for (int i = 0; i < num_workers; ++i)
+ workers.push_back( new Worker<J, R>(*q, i, num_workers) );
+ }
+
+ ~WorkerPool()
+ {
+ }
+
+ R get_result()
+ {
+ running = false;
+ q->cond.notify_all();
+ R tmp = 0;
+ double tasktime = 0.0;
+ for (typename WorkerVector::iterator it = workers.begin(); it != workers.end(); it++)
+ {
+ R res = it->getResult();
+ tmp += res;
+ //std::cerr << "tasktime: " << it->getTaskTime() << std::endl;
+ tasktime += it->getTaskTime();
+ }
+// std::cerr << " maxQ = " << q->getMaxsize() << std::endl;
+ return tmp;
+ }
+
+ void addJob(J job)
+ {
+ q->push(job);
+ }
+
+private:
+
+ WorkerVector workers;
+
+ boost::shared_ptr<Queuemt<J> > q;
+
+ bool running;
+};
+
+///////////////////
template <typename J, typename R>
class SimpleWorker
{
@@ -33,6 +242,7 @@ public:
timer.Reset();
wresult = job();
tasktime = timer.Elapsed();
+ std::cerr << tasktime << " s" << std::endl;
return wresult;
}
@@ -60,4 +270,6 @@ private:
};
-#endif
+
+
+#endif