diff options
| -rw-r--r-- | gi/pyp-topics/src/Makefile.am | 4 | ||||
| -rw-r--r-- | gi/pyp-topics/src/pyp-topics.cc | 99 | ||||
| -rw-r--r-- | gi/pyp-topics/src/pyp-topics.hh | 17 | ||||
| -rw-r--r-- | gi/pyp-topics/src/timing.h | 31 | ||||
| -rw-r--r-- | gi/pyp-topics/src/train-contexts.cc | 3 | ||||
| -rw-r--r-- | gi/pyp-topics/src/workers.hh | 62 | 
6 files changed, 172 insertions, 44 deletions
| diff --git a/gi/pyp-topics/src/Makefile.am b/gi/pyp-topics/src/Makefile.am index 681a9a0c..abfc95ac 100644 --- a/gi/pyp-topics/src/Makefile.am +++ b/gi/pyp-topics/src/Makefile.am @@ -3,10 +3,10 @@ bin_PROGRAMS = pyp-topics-train pyp-contexts-train  contexts_lexer.cc: contexts_lexer.l  	$(LEX) -s -CF -8 -o$@ $< -pyp_topics_train_SOURCES = corpus.cc gzstream.cc mt19937ar.c pyp-topics.cc train.cc contexts_lexer.cc contexts_corpus.cc +pyp_topics_train_SOURCES = corpus.cc gzstream.cc pyp-topics.cc train.cc contexts_lexer.cc contexts_corpus.cc  pyp_topics_train_LDADD = $(top_srcdir)/decoder/libcdec.a -lz -pyp_contexts_train_SOURCES = corpus.cc gzstream.cc mt19937ar.c pyp-topics.cc contexts_lexer.cc contexts_corpus.cc train-contexts.cc +pyp_contexts_train_SOURCES = corpus.cc gzstream.cc pyp-topics.cc contexts_lexer.cc contexts_corpus.cc train-contexts.cc  pyp_contexts_train_LDADD = $(top_srcdir)/decoder/libcdec.a -lz  AM_CPPFLAGS = -W -Wall -Wno-sign-compare -funroll-loops diff --git a/gi/pyp-topics/src/pyp-topics.cc b/gi/pyp-topics/src/pyp-topics.cc index 2b96816e..48ccf507 100644 --- a/gi/pyp-topics/src/pyp-topics.cc +++ b/gi/pyp-topics/src/pyp-topics.cc @@ -1,33 +1,7 @@ -#ifdef __CYGWIN__ -# ifndef _POSIX_MONOTONIC_CLOCK -#  define _POSIX_MONOTONIC_CLOCK -# endif -#endif -  #include "pyp-topics.hh" +#include "timing.h" -#include <boost/date_time/posix_time/posix_time_types.hpp> -#include <time.h> -#include <sys/time.h> -#include "clock_gettime_stub.c" - -struct Timer { -  Timer() { Reset(); } -  void Reset() -  { -    clock_gettime(CLOCK_MONOTONIC, &start_t); -  } -  double Elapsed() const { -    timespec end_t; -    clock_gettime(CLOCK_MONOTONIC, &end_t); -    const double elapsed = (end_t.tv_sec - start_t.tv_sec) -                + (end_t.tv_nsec - start_t.tv_nsec) / 1000000000.0; -    return elapsed; -  } - private: -  timespec start_t; -}; - +//#include <boost/date_time/posix_time/posix_time_types.hpp>  void PYPTopics::sample_corpus(const Corpus& corpus, int samples,                                 int freq_cutoff_start, int freq_cutoff_end,                                 int freq_cutoff_interval) { @@ -175,9 +149,10 @@ void PYPTopics::sample_corpus(const Corpus& corpus, int samples,      if (curr_sample != 0 && curr_sample % 10 == 0) {        std::cerr << " ||| time=" << (timer.Elapsed() / 10.0) << " sec/sample" << std::endl;        timer.Reset(); -      std::cerr << "     ... Resampling hyperparameters "; std::cerr.flush(); +      std::cerr << "     ... Resampling hyperparameters (" << max_threads << " threads)"; std::cerr.flush(); +        // resample the hyperparamters -      F log_p=0.0; int resample_counter=0; +      F log_p=0.0;        for (std::vector<PYPs>::iterator levelIt=m_word_pyps.begin();             levelIt != m_word_pyps.end(); ++levelIt) {          for (PYPs::iterator pypIt=levelIt->begin(); @@ -187,15 +162,23 @@ void PYPTopics::sample_corpus(const Corpus& corpus, int samples,          }        } -      resample_counter=0; -      for (PYPs::iterator pypIt=m_document_pyps.begin(); -           pypIt != m_document_pyps.end(); ++pypIt, ++resample_counter) { -        pypIt->resample_prior(); -        log_p += pypIt->log_restaurant_prob(); -        if (resample_counter++ % 10000 == 0) { -          std::cerr << "."; std::cerr.flush(); -        } +      WorkerPtrVect workers; +      for (int i = 0; i < max_threads; ++i) +      { +        JobReturnsF job = boost::bind(&PYPTopics::hresample_docs, this, max_threads, i); +        workers.push_back(new SimpleResampleWorker(job));        } +       +      WorkerPtrVect::iterator workerIt;  +      for (workerIt = workers.begin(); workerIt != workers.end(); ++workerIt) +      {  +        //std::cerr << "Retrieving worker result.."; std::cerr.flush(); +        F wresult = workerIt->getResult(); //blocks until worker done +        log_p += wresult;  +        //std::cerr << ".. got " << wresult << std::endl; std::cerr.flush(); +         +      } +        if (m_use_topic_pyp) {          m_topic_pyp.resample_prior();          log_p += m_topic_pyp.log_restaurant_prob(); @@ -221,6 +204,46 @@ void PYPTopics::sample_corpus(const Corpus& corpus, int samples,    delete [] randomDocIndices;  } +PYPTopics::F PYPTopics::hresample_docs(int num_threads, int thread_id) +{ +  int resample_counter=0; +  F log_p = 0.0; +  PYPs::iterator pypIt = m_document_pyps.begin(); +  PYPs::iterator end = m_document_pyps.end(); +  pypIt += thread_id;   +//  std::cerr << thread_id << " started " << std::endl; std::cerr.flush(); +   +  while (pypIt < end) +  { +    pypIt->resample_prior(); +    log_p += pypIt->log_restaurant_prob(); +    if (resample_counter++ % 5000 == 0) { +      std::cerr << "."; std::cerr.flush(); +    } +    pypIt += num_threads; +  } +//  std::cerr << thread_id << " did " << resample_counter << " with answer " << log_p << std::endl; std::cerr.flush(); +   +  return log_p; +} + +//PYPTopics::F PYPTopics::hresample_topics() +//{ +//  F log_p = 0.0; +//  for (std::vector<PYPs>::iterator levelIt=m_word_pyps.begin(); +//      levelIt != m_word_pyps.end(); ++levelIt) { +//    for (PYPs::iterator pypIt=levelIt->begin(); +//        pypIt != levelIt->end(); ++pypIt) { +// +//      pypIt->resample_prior(); +//      log_p += pypIt->log_restaurant_prob(); +//    } +//  } +//  //std::cerr << "topicworker has answer " << log_p << std::endl; std::cerr.flush(); +//   +// return log_p;  +//} +  void PYPTopics::decrement(const Term& term, int topic, int level) {    //std::cerr << "PYPTopics::decrement(" << term << "," << topic << "," << level << ")" << std::endl;    m_word_pyps.at(level).at(topic).decrement(term); diff --git a/gi/pyp-topics/src/pyp-topics.hh b/gi/pyp-topics/src/pyp-topics.hh index 9da49267..8097fe19 100644 --- a/gi/pyp-topics/src/pyp-topics.hh +++ b/gi/pyp-topics/src/pyp-topics.hh @@ -11,7 +11,7 @@  #include "pyp.hh"  #include "corpus.hh" - +#include "workers.hh"  class PYPTopics {  public: @@ -20,12 +20,13 @@ public:    typedef double F;  public: -  PYPTopics(int num_topics, bool use_topic_pyp=false, unsigned long seed = 0)  +  PYPTopics(int num_topics, bool use_topic_pyp=false, unsigned long seed = 0, +        int max_threads = 1)       : m_num_topics(num_topics), m_word_pyps(1),       m_topic_pyp(0.5,1.0,seed), m_use_topic_pyp(use_topic_pyp),      m_seed(seed),      uni_dist(0,1), rng(seed == 0 ? (unsigned long)this : seed),  -    rnd(rng, uni_dist) {} +    rnd(rng, uni_dist), max_threads(max_threads) {}    void sample_corpus(const Corpus& corpus, int samples,                       int freq_cutoff_start=0, int freq_cutoff_end=0,  @@ -79,6 +80,16 @@ private:    gen_type rnd; //instantiate: rnd(rng, uni_dist)                  //call: rnd() generates uniform on [0,1) +  typedef boost::function<F()> JobReturnsF; +  typedef SimpleWorker<JobReturnsF, F> SimpleResampleWorker; +  typedef boost::ptr_vector<SimpleResampleWorker> WorkerPtrVect; + +  F hresample_docs(int num_threads, int thread_id); + +//  F hresample_topics(); +   +  int max_threads; +    TermBackoffPtr m_backoff;  }; diff --git a/gi/pyp-topics/src/timing.h b/gi/pyp-topics/src/timing.h new file mode 100644 index 00000000..7543295c --- /dev/null +++ b/gi/pyp-topics/src/timing.h @@ -0,0 +1,31 @@ +#ifndef TIMING_H +#define TIMING_H + +#ifdef __CYGWIN__ +# ifndef _POSIX_MONOTONIC_CLOCK +#  define _POSIX_MONOTONIC_CLOCK +# endif +#endif + +#include <time.h> +#include <sys/time.h> +#include "clock_gettime_stub.c" + +struct Timer { +  Timer() { Reset(); } +  void Reset() +  { +    clock_gettime(CLOCK_MONOTONIC, &start_t); +  } +  double Elapsed() const { +    timespec end_t; +    clock_gettime(CLOCK_MONOTONIC, &end_t); +    const double elapsed = (end_t.tv_sec - start_t.tv_sec) +                + (end_t.tv_nsec - start_t.tv_nsec) / 1000000000.0; +    return elapsed; +  } + private: +  timespec start_t; +}; + +#endif diff --git a/gi/pyp-topics/src/train-contexts.cc b/gi/pyp-topics/src/train-contexts.cc index 8a0c8949..455e7b1e 100644 --- a/gi/pyp-topics/src/train-contexts.cc +++ b/gi/pyp-topics/src/train-contexts.cc @@ -53,6 +53,7 @@ int main(int argc, char **argv)        ("freq-cutoff-start", value<int>()->default_value(0), "initial frequency cutoff.")        ("freq-cutoff-end", value<int>()->default_value(0), "final frequency cutoff.")        ("freq-cutoff-interval", value<int>()->default_value(0), "number of iterations between frequency decrement.") +      ("max-threads", value<int>()->default_value(1), "maximum number of simultaneous threads allowed")        ;      cmdline_specific.add(config_options); @@ -79,7 +80,7 @@ int main(int argc, char **argv)    // seed the random number generator: 0 = automatic, specify value otherwise    unsigned long seed = 0;  -  PYPTopics model(vm["topics"].as<int>(), vm.count("hierarchical-topics"), seed); +  PYPTopics model(vm["topics"].as<int>(), vm.count("hierarchical-topics"), seed, vm["max-threads"].as<int>());    // read the data    BackoffGenerator* backoff_gen=0; diff --git a/gi/pyp-topics/src/workers.hh b/gi/pyp-topics/src/workers.hh new file mode 100644 index 00000000..1f496acf --- /dev/null +++ b/gi/pyp-topics/src/workers.hh @@ -0,0 +1,62 @@ +#ifndef WORKERS_HH +#define WORKERS_HH + +#include <iostream> +#include <boost/bind.hpp> +#include <boost/function.hpp> +#include <boost/thread/thread.hpp> +#include <boost/thread/mutex.hpp> +#include <boost/thread/future.hpp> + +//#include <boost/date_time/posix_time/posix_time_types.hpp> + +#include "timing.h" + +template <typename J, typename R> +class SimpleWorker +{ +typedef boost::packaged_task<R> PackagedTask; +public: +    SimpleWorker(J& job) : job(job), tasktime(0.0) +    { +        PackagedTask task(boost::bind(&SimpleWorker<J, R>::run, this)); +        future = task.get_future(); +        boost::thread t(boost::move(task)); +    } + +    R run() //this is called upon thread creation +    { +        R wresult = 0; +     +        assert(job); +        timer.Reset(); +        wresult = job(); +        tasktime = timer.Elapsed(); +        return wresult; +    } + +    R getResult() +    { +        if (!future.is_ready()) +            future.wait(); +        assert(future.is_ready()); +        return future.get(); +    } + +    double getTaskTime() +    { +        return tasktime; +    } + +private: + +    J job; + +    boost::unique_future<R> future; + +    Timer timer; +    double tasktime; + +}; + +#endif | 
