diff options
Diffstat (limited to 'gi/pyp-topics')
| -rw-r--r-- | gi/pyp-topics/src/pyp-topics.cc | 105 | ||||
| -rw-r--r-- | gi/pyp-topics/src/pyp-topics.hh | 12 | ||||
| -rw-r--r-- | gi/pyp-topics/src/train-contexts.cc | 6 | ||||
| -rw-r--r-- | gi/pyp-topics/src/workers.hh | 220 | 
4 files changed, 279 insertions, 64 deletions
| diff --git a/gi/pyp-topics/src/pyp-topics.cc b/gi/pyp-topics/src/pyp-topics.cc index 76f95b2a..e528a923 100644 --- a/gi/pyp-topics/src/pyp-topics.cc +++ b/gi/pyp-topics/src/pyp-topics.cc @@ -15,6 +15,7 @@ void PYPTopics::sample_corpus(const Corpus& corpus, int samples,    std::cerr << "\n Training with " << m_word_pyps.size()-1 << " backoff level"      << (m_word_pyps.size()==2 ? ":" : "s:") << std::endl; +    for (int i=0; i<(int)m_word_pyps.size(); ++i)    {      m_word_pyps.at(i).reserve(m_num_topics); @@ -76,6 +77,10 @@ void PYPTopics::sample_corpus(const Corpus& corpus, int samples,    for (int i = 0; i < corpus.num_documents(); ++i)  	  randomDocIndices[i] = i; +  if (num_jobs < max_threads) +    num_jobs = max_threads; +  int job_incr = (int) ( (float)m_document_pyps.size() / float(num_jobs) ); +    // Sampling phase    for (int curr_sample=0; curr_sample < samples; ++curr_sample) {      if (freq_cutoff_interval > 0 && curr_sample != 1 @@ -149,33 +154,38 @@ void PYPTopics::sample_corpus(const Corpus& corpus, int samples,      if (curr_sample != 0 && curr_sample % 10 == 0) {        std::cerr << " ||| time=" << (timer.Elapsed() / 10.0) << " sec/sample" << std::endl;        timer.Reset(); -      std::cerr << "     ... Resampling hyperparameters (" << max_threads << " threads)"; std::cerr.flush(); - +      std::cerr << "     ... Resampling hyperparameters ("; +              // resample the hyperparamters        F log_p=0.0; -      for (std::vector<PYPs>::iterator levelIt=m_word_pyps.begin(); -           levelIt != m_word_pyps.end(); ++levelIt) { -        for (PYPs::iterator pypIt=levelIt->begin(); -             pypIt != levelIt->end(); ++pypIt) { -          pypIt->resample_prior(); -          log_p += pypIt->log_restaurant_prob(); -        } -      } - -      WorkerPtrVect workers; -      for (int i = 0; i < max_threads; ++i) -      { -        JobReturnsF job = boost::bind(&PYPTopics::hresample_docs, this, max_threads, i); -        workers.push_back(new SimpleResampleWorker(job)); +      if (max_threads == 1) +      {  +        std::cerr << "1 thread)" << std::endl; std::cerr.flush(); +        log_p += hresample_topics(); +        log_p += hresample_docs(0, m_document_pyps.size());        } +      else +      { //parallelize +        std::cerr << max_threads << " threads, " << num_jobs << " jobs)" << std::endl; std::cerr.flush(); +         +        WorkerPool<JobReturnsF, F> pool(max_threads);  +        int i=0, sz = m_document_pyps.size(); +        //documents... +        while (i <= sz - 2*job_incr) +        {     +          JobReturnsF job = boost::bind(&PYPTopics::hresample_docs, this, i, i+job_incr); +          pool.addJob(job); +          i += job_incr; +        } +        //  do all remaining documents +        JobReturnsF job = boost::bind(&PYPTopics::hresample_docs, this, i,sz); +        pool.addJob(job); +         +        //topics... +        JobReturnsF topics_job = boost::bind(&PYPTopics::hresample_topics, this); +        pool.addJob(topics_job); -      WorkerPtrVect::iterator workerIt; -      for (workerIt = workers.begin(); workerIt != workers.end(); ++workerIt) -      { -        //std::cerr << "Retrieving worker result.."; std::cerr.flush(); -        F wresult = workerIt->getResult(); //blocks until worker done -        log_p += wresult; -        //std::cerr << ".. got " << wresult << std::endl; std::cerr.flush(); +        log_p += pool.get_result(); //blocks        } @@ -204,45 +214,38 @@ void PYPTopics::sample_corpus(const Corpus& corpus, int samples,    delete [] randomDocIndices;  } -PYPTopics::F PYPTopics::hresample_docs(int num_threads, int thread_id) +PYPTopics::F PYPTopics::hresample_docs(int start, int end)  {    int resample_counter=0;    F log_p = 0.0; -  PYPs::iterator pypIt = m_document_pyps.begin(); -  PYPs::iterator end = m_document_pyps.end(); -  pypIt += thread_id; -//  std::cerr << thread_id << " started " << std::endl; std::cerr.flush(); - -  while (pypIt < end) +  assert(start >= 0); +  assert(end >= 0); +  assert(start <= end); +  for (int i=start; i < end; ++i)    { -    pypIt->resample_prior(); -    log_p += pypIt->log_restaurant_prob(); +    m_document_pyps[i].resample_prior(); +    log_p += m_document_pyps[i].log_restaurant_prob();      if (resample_counter++ % 5000 == 0) {        std::cerr << "."; std::cerr.flush();      } -    pypIt += num_threads;    } -//  std::cerr << thread_id << " did " << resample_counter << " with answer " << log_p << std::endl; std::cerr.flush(); -    return log_p;  } -//PYPTopics::F PYPTopics::hresample_topics() -//{ -//  F log_p = 0.0; -//  for (std::vector<PYPs>::iterator levelIt=m_word_pyps.begin(); -//      levelIt != m_word_pyps.end(); ++levelIt) { -//    for (PYPs::iterator pypIt=levelIt->begin(); -//        pypIt != levelIt->end(); ++pypIt) { -// -//      pypIt->resample_prior(); -//      log_p += pypIt->log_restaurant_prob(); -//    } -//  } -//  //std::cerr << "topicworker has answer " << log_p << std::endl; std::cerr.flush(); -// -// return log_p; -//} +PYPTopics::F PYPTopics::hresample_topics() +{ +  F log_p = 0.0; +  for (std::vector<PYPs>::iterator levelIt=m_word_pyps.begin(); +      levelIt != m_word_pyps.end(); ++levelIt) { +    for (PYPs::iterator pypIt=levelIt->begin(); +        pypIt != levelIt->end(); ++pypIt) { + +      pypIt->resample_prior(); +      log_p += pypIt->log_restaurant_prob(); +    } +  } +  return log_p; +}  void PYPTopics::decrement(const Term& term, int topic, int level) {    //std::cerr << "PYPTopics::decrement(" << term << "," << topic << "," << level << ")" << std::endl; diff --git a/gi/pyp-topics/src/pyp-topics.hh b/gi/pyp-topics/src/pyp-topics.hh index 8097fe19..5e1fc6d6 100644 --- a/gi/pyp-topics/src/pyp-topics.hh +++ b/gi/pyp-topics/src/pyp-topics.hh @@ -21,12 +21,12 @@ public:  public:    PYPTopics(int num_topics, bool use_topic_pyp=false, unsigned long seed = 0, -        int max_threads = 1)  +        int max_threads = 1, int num_jobs = 1)       : m_num_topics(num_topics), m_word_pyps(1),       m_topic_pyp(0.5,1.0,seed), m_use_topic_pyp(use_topic_pyp),      m_seed(seed),      uni_dist(0,1), rng(seed == 0 ? (unsigned long)this : seed),  -    rnd(rng, uni_dist), max_threads(max_threads) {} +    rnd(rng, uni_dist), max_threads(max_threads), num_jobs(num_jobs) {}    void sample_corpus(const Corpus& corpus, int samples,                       int freq_cutoff_start=0, int freq_cutoff_end=0,  @@ -81,15 +81,13 @@ private:                  //call: rnd() generates uniform on [0,1)    typedef boost::function<F()> JobReturnsF; -  typedef SimpleWorker<JobReturnsF, F> SimpleResampleWorker; -  typedef boost::ptr_vector<SimpleResampleWorker> WorkerPtrVect; -  F hresample_docs(int num_threads, int thread_id); +  F hresample_docs(int start, int end); //does i in [start, end) -//  F hresample_topics(); +  F hresample_topics();    int max_threads; - +  int num_jobs;    TermBackoffPtr m_backoff;  }; diff --git a/gi/pyp-topics/src/train-contexts.cc b/gi/pyp-topics/src/train-contexts.cc index 455e7b1e..a673bf4e 100644 --- a/gi/pyp-topics/src/train-contexts.cc +++ b/gi/pyp-topics/src/train-contexts.cc @@ -54,6 +54,7 @@ int main(int argc, char **argv)        ("freq-cutoff-end", value<int>()->default_value(0), "final frequency cutoff.")        ("freq-cutoff-interval", value<int>()->default_value(0), "number of iterations between frequency decrement.")        ("max-threads", value<int>()->default_value(1), "maximum number of simultaneous threads allowed") +      ("num-jobs", value<int>()->default_value(1), "allows finer control over parallelization")        ;      cmdline_specific.add(config_options); @@ -77,10 +78,11 @@ int main(int argc, char **argv)      cerr << "Please specify a file containing the data." << endl;      return 1;    } - +  assert(vm["max-threads"].as<int>() > 0); +  assert(vm["num-jobs"].as<int>() > -1);    // seed the random number generator: 0 = automatic, specify value otherwise    unsigned long seed = 0;  -  PYPTopics model(vm["topics"].as<int>(), vm.count("hierarchical-topics"), seed, vm["max-threads"].as<int>()); +  PYPTopics model(vm["topics"].as<int>(), vm.count("hierarchical-topics"), seed, vm["max-threads"].as<int>(), vm["num-jobs"].as<int>());    // read the data    BackoffGenerator* backoff_gen=0; diff --git a/gi/pyp-topics/src/workers.hh b/gi/pyp-topics/src/workers.hh index 55424c8d..95b18947 100644 --- a/gi/pyp-topics/src/workers.hh +++ b/gi/pyp-topics/src/workers.hh @@ -1,18 +1,227 @@ +/** +  Basic thread-pool tools using Boost.Thread. +  (Jan Botha, 7/2010) + +  --Simple usage-- +  Use SimpleWorker. +    Example, call a function that returns an int in a new thread: +    typedef boost::function<int()> JobType; +    JobType job = boost::bind(funcname); +      //or boost::bind(&class::funcname, this) for a member function +    SimpleWorker<JobType, int> worker(job); +    int result = worker.getResult(); //blocks until result is ready + +  --Extended usage-- +  Use WorkerPool, which uses Queuemt (a synchronized queue) and Worker. +  Example: +    (same context and typedef +    WorkerPool<JobType, int> pool(num_threads); +    JobType job = ... +    pool.addJob(job); +    ... +    pool.get_result(); //blocks until all workers are done, returns the some of their results.   +     +    Jobs added to a WorkerPool need to be the same type. A WorkerPool instance should not be reused (e.g. adding jobs) after calling get_result().  +*/ +  #ifndef WORKERS_HH  #define WORKERS_HH -#include "timing.h" -  #include <iostream>  #include <boost/bind.hpp>  #include <boost/function.hpp> +#include <queue> +#include <boost/ptr_container/ptr_vector.hpp>  #include <boost/thread/thread.hpp>  #include <boost/thread/mutex.hpp> +#include <boost/thread/shared_mutex.hpp>  #include <boost/thread/future.hpp> +#include <boost/thread/condition.hpp> + +#include <boost/date_time/posix_time/posix_time_types.hpp> +#include "timing.h" + +/** Implements a synchronized queue*/ +template<typename J> +class Queuemt +{ + +public: +    boost::condition_variable_any cond; +    const bool& running; + +    Queuemt() { } +    Queuemt(const bool& running) : running(running), maxsize(0), qsize(0)  +    {  +    } + +    ~Queuemt() {  +     } + +    J pop() +    { +        J job; +        { +            boost::unique_lock<boost::shared_mutex> qlock(q_mutex); +            while (running && qsize == 0) +                cond.wait(qlock); + +            if (qsize > 0) +            { +                job = q.front(); +                q.pop(); +                --qsize;       +            } +        } +        if (job) +            cond.notify_one(); +        return job; + +    } -//#include <boost/date_time/posix_time/posix_time_types.hpp> +    void push(J job) +    { +        { +            boost::unique_lock<boost::shared_mutex> lock(q_mutex); +            q.push(job); +            ++qsize; +        } +        if (qsize > maxsize) +            maxsize = qsize; +         +        cond.notify_one(); +    } +    int getMaxsize() +    { +        return maxsize; +    } +    int size() +    { +        return qsize; +    } + +private: +    boost::shared_mutex q_mutex; +    std::queue<J> q; +    int maxsize; +    volatile int qsize; +}; + + +template<typename J, typename R> +class Worker +{ +typedef boost::packaged_task<R> PackagedTask; +public: +    Worker(Queuemt<J>& queue, int id, int num_workers) :   +      q(queue), tasktime(0.0), id(id), num_workers(num_workers) +    { +        PackagedTask task(boost::bind(&Worker<J, R>::run, this)); +        future = task.get_future(); +        boost::thread t(boost::move(task)); +    } + +    R run() //this is called upon thread creation +    { +        R wresult = 0; +        while (isRunning()) +        { +            J job = q.pop(); + +            if (job) +            { +                timer.Reset(); +                wresult += job(); +                tasktime += timer.Elapsed(); +            } +        } +        return wresult; +    } + +    R getResult() +    { +        if (!future.is_ready()) +            future.wait(); +        assert(future.is_ready()); +        return future.get(); +    } + +    double getTaskTime() +    { +        return tasktime; +    } + +private: + +    Queuemt<J>& q; + +    boost::unique_future<R> future; + +    bool isRunning() +    { +        return q.running || q.size() > 0; +    } +     +    Timer timer; +    double tasktime; +    int id; +    int num_workers; +}; + +template<typename J, typename R> +class WorkerPool +{ +typedef boost::packaged_task<R> PackagedTask; +typedef Worker<J,R> WJR; +typedef boost::ptr_vector<WJR> WorkerVector; +public: + +    WorkerPool(int num_workers) +    { +        q.reset(new Queuemt<J>(running)); +        running = true; +        for (int i = 0; i < num_workers; ++i) +            workers.push_back( new Worker<J, R>(*q, i, num_workers) ); +    } + +    ~WorkerPool() +    { +    } + +    R get_result() +    { +        running = false; +        q->cond.notify_all(); +        R tmp = 0; +        double tasktime = 0.0; +        for (typename WorkerVector::iterator it = workers.begin(); it != workers.end(); it++) +        { +            R res = it->getResult(); +            tmp += res; +            //std::cerr << "tasktime: " << it->getTaskTime() << std::endl;  +            tasktime += it->getTaskTime(); +        } +//        std::cerr << " maxQ = " << q->getMaxsize() << std::endl; +        return tmp; +    } + +    void addJob(J job) +    { +        q->push(job); +    } + +private: + +    WorkerVector workers; + +    boost::shared_ptr<Queuemt<J> > q; + +    bool running; +}; + +///////////////////  template <typename J, typename R>  class SimpleWorker  { @@ -33,6 +242,7 @@ public:          timer.Reset();          wresult = job();          tasktime = timer.Elapsed(); +        std::cerr << tasktime << " s" << std::endl;           return wresult;      } @@ -60,4 +270,6 @@ private:  }; -#endif + + +#endif  | 
