summaryrefslogtreecommitdiff
path: root/gi/pyp-topics/src
diff options
context:
space:
mode:
authorbothameister <bothameister@ec762483-ff6d-05da-a07a-a48fb63a330f>2010-07-06 22:38:21 +0000
committerbothameister <bothameister@ec762483-ff6d-05da-a07a-a48fb63a330f>2010-07-06 22:38:21 +0000
commitcbb77ac458cb031890953035df5e53ea0fcb2031 (patch)
tree2c70939eb92a7c02153b3e8edd85199b3fa0b77f /gi/pyp-topics/src
parentccfe14cad31d75bb606485b247fe583f2310e76c (diff)
Added simple multi-threading during hyperparameter resampling. Added cmdarg for controlling number of threads. Moved Timer to its own header file. Cleaned up Makefile.am
git-svn-id: https://ws10smt.googlecode.com/svn/trunk@170 ec762483-ff6d-05da-a07a-a48fb63a330f
Diffstat (limited to 'gi/pyp-topics/src')
-rw-r--r--gi/pyp-topics/src/Makefile.am4
-rw-r--r--gi/pyp-topics/src/pyp-topics.cc99
-rw-r--r--gi/pyp-topics/src/pyp-topics.hh17
-rw-r--r--gi/pyp-topics/src/timing.h31
-rw-r--r--gi/pyp-topics/src/train-contexts.cc3
-rw-r--r--gi/pyp-topics/src/workers.hh62
6 files changed, 172 insertions, 44 deletions
diff --git a/gi/pyp-topics/src/Makefile.am b/gi/pyp-topics/src/Makefile.am
index 681a9a0c..abfc95ac 100644
--- a/gi/pyp-topics/src/Makefile.am
+++ b/gi/pyp-topics/src/Makefile.am
@@ -3,10 +3,10 @@ bin_PROGRAMS = pyp-topics-train pyp-contexts-train
contexts_lexer.cc: contexts_lexer.l
$(LEX) -s -CF -8 -o$@ $<
-pyp_topics_train_SOURCES = corpus.cc gzstream.cc mt19937ar.c pyp-topics.cc train.cc contexts_lexer.cc contexts_corpus.cc
+pyp_topics_train_SOURCES = corpus.cc gzstream.cc pyp-topics.cc train.cc contexts_lexer.cc contexts_corpus.cc
pyp_topics_train_LDADD = $(top_srcdir)/decoder/libcdec.a -lz
-pyp_contexts_train_SOURCES = corpus.cc gzstream.cc mt19937ar.c pyp-topics.cc contexts_lexer.cc contexts_corpus.cc train-contexts.cc
+pyp_contexts_train_SOURCES = corpus.cc gzstream.cc pyp-topics.cc contexts_lexer.cc contexts_corpus.cc train-contexts.cc
pyp_contexts_train_LDADD = $(top_srcdir)/decoder/libcdec.a -lz
AM_CPPFLAGS = -W -Wall -Wno-sign-compare -funroll-loops
diff --git a/gi/pyp-topics/src/pyp-topics.cc b/gi/pyp-topics/src/pyp-topics.cc
index 2b96816e..48ccf507 100644
--- a/gi/pyp-topics/src/pyp-topics.cc
+++ b/gi/pyp-topics/src/pyp-topics.cc
@@ -1,33 +1,7 @@
-#ifdef __CYGWIN__
-# ifndef _POSIX_MONOTONIC_CLOCK
-# define _POSIX_MONOTONIC_CLOCK
-# endif
-#endif
-
#include "pyp-topics.hh"
+#include "timing.h"
-#include <boost/date_time/posix_time/posix_time_types.hpp>
-#include <time.h>
-#include <sys/time.h>
-#include "clock_gettime_stub.c"
-
-struct Timer {
- Timer() { Reset(); }
- void Reset()
- {
- clock_gettime(CLOCK_MONOTONIC, &start_t);
- }
- double Elapsed() const {
- timespec end_t;
- clock_gettime(CLOCK_MONOTONIC, &end_t);
- const double elapsed = (end_t.tv_sec - start_t.tv_sec)
- + (end_t.tv_nsec - start_t.tv_nsec) / 1000000000.0;
- return elapsed;
- }
- private:
- timespec start_t;
-};
-
+//#include <boost/date_time/posix_time/posix_time_types.hpp>
void PYPTopics::sample_corpus(const Corpus& corpus, int samples,
int freq_cutoff_start, int freq_cutoff_end,
int freq_cutoff_interval) {
@@ -175,9 +149,10 @@ void PYPTopics::sample_corpus(const Corpus& corpus, int samples,
if (curr_sample != 0 && curr_sample % 10 == 0) {
std::cerr << " ||| time=" << (timer.Elapsed() / 10.0) << " sec/sample" << std::endl;
timer.Reset();
- std::cerr << " ... Resampling hyperparameters "; std::cerr.flush();
+ std::cerr << " ... Resampling hyperparameters (" << max_threads << " threads)"; std::cerr.flush();
+
// resample the hyperparamters
- F log_p=0.0; int resample_counter=0;
+ F log_p=0.0;
for (std::vector<PYPs>::iterator levelIt=m_word_pyps.begin();
levelIt != m_word_pyps.end(); ++levelIt) {
for (PYPs::iterator pypIt=levelIt->begin();
@@ -187,15 +162,23 @@ void PYPTopics::sample_corpus(const Corpus& corpus, int samples,
}
}
- resample_counter=0;
- for (PYPs::iterator pypIt=m_document_pyps.begin();
- pypIt != m_document_pyps.end(); ++pypIt, ++resample_counter) {
- pypIt->resample_prior();
- log_p += pypIt->log_restaurant_prob();
- if (resample_counter++ % 10000 == 0) {
- std::cerr << "."; std::cerr.flush();
- }
+ WorkerPtrVect workers;
+ for (int i = 0; i < max_threads; ++i)
+ {
+ JobReturnsF job = boost::bind(&PYPTopics::hresample_docs, this, max_threads, i);
+ workers.push_back(new SimpleResampleWorker(job));
}
+
+ WorkerPtrVect::iterator workerIt;
+ for (workerIt = workers.begin(); workerIt != workers.end(); ++workerIt)
+ {
+ //std::cerr << "Retrieving worker result.."; std::cerr.flush();
+ F wresult = workerIt->getResult(); //blocks until worker done
+ log_p += wresult;
+ //std::cerr << ".. got " << wresult << std::endl; std::cerr.flush();
+
+ }
+
if (m_use_topic_pyp) {
m_topic_pyp.resample_prior();
log_p += m_topic_pyp.log_restaurant_prob();
@@ -221,6 +204,46 @@ void PYPTopics::sample_corpus(const Corpus& corpus, int samples,
delete [] randomDocIndices;
}
+PYPTopics::F PYPTopics::hresample_docs(int num_threads, int thread_id)
+{
+ int resample_counter=0;
+ F log_p = 0.0;
+ PYPs::iterator pypIt = m_document_pyps.begin();
+ PYPs::iterator end = m_document_pyps.end();
+ pypIt += thread_id;
+// std::cerr << thread_id << " started " << std::endl; std::cerr.flush();
+
+ while (pypIt < end)
+ {
+ pypIt->resample_prior();
+ log_p += pypIt->log_restaurant_prob();
+ if (resample_counter++ % 5000 == 0) {
+ std::cerr << "."; std::cerr.flush();
+ }
+ pypIt += num_threads;
+ }
+// std::cerr << thread_id << " did " << resample_counter << " with answer " << log_p << std::endl; std::cerr.flush();
+
+ return log_p;
+}
+
+//PYPTopics::F PYPTopics::hresample_topics()
+//{
+// F log_p = 0.0;
+// for (std::vector<PYPs>::iterator levelIt=m_word_pyps.begin();
+// levelIt != m_word_pyps.end(); ++levelIt) {
+// for (PYPs::iterator pypIt=levelIt->begin();
+// pypIt != levelIt->end(); ++pypIt) {
+//
+// pypIt->resample_prior();
+// log_p += pypIt->log_restaurant_prob();
+// }
+// }
+// //std::cerr << "topicworker has answer " << log_p << std::endl; std::cerr.flush();
+//
+// return log_p;
+//}
+
void PYPTopics::decrement(const Term& term, int topic, int level) {
//std::cerr << "PYPTopics::decrement(" << term << "," << topic << "," << level << ")" << std::endl;
m_word_pyps.at(level).at(topic).decrement(term);
diff --git a/gi/pyp-topics/src/pyp-topics.hh b/gi/pyp-topics/src/pyp-topics.hh
index 9da49267..8097fe19 100644
--- a/gi/pyp-topics/src/pyp-topics.hh
+++ b/gi/pyp-topics/src/pyp-topics.hh
@@ -11,7 +11,7 @@
#include "pyp.hh"
#include "corpus.hh"
-
+#include "workers.hh"
class PYPTopics {
public:
@@ -20,12 +20,13 @@ public:
typedef double F;
public:
- PYPTopics(int num_topics, bool use_topic_pyp=false, unsigned long seed = 0)
+ PYPTopics(int num_topics, bool use_topic_pyp=false, unsigned long seed = 0,
+ int max_threads = 1)
: m_num_topics(num_topics), m_word_pyps(1),
m_topic_pyp(0.5,1.0,seed), m_use_topic_pyp(use_topic_pyp),
m_seed(seed),
uni_dist(0,1), rng(seed == 0 ? (unsigned long)this : seed),
- rnd(rng, uni_dist) {}
+ rnd(rng, uni_dist), max_threads(max_threads) {}
void sample_corpus(const Corpus& corpus, int samples,
int freq_cutoff_start=0, int freq_cutoff_end=0,
@@ -79,6 +80,16 @@ private:
gen_type rnd; //instantiate: rnd(rng, uni_dist)
//call: rnd() generates uniform on [0,1)
+ typedef boost::function<F()> JobReturnsF;
+ typedef SimpleWorker<JobReturnsF, F> SimpleResampleWorker;
+ typedef boost::ptr_vector<SimpleResampleWorker> WorkerPtrVect;
+
+ F hresample_docs(int num_threads, int thread_id);
+
+// F hresample_topics();
+
+ int max_threads;
+
TermBackoffPtr m_backoff;
};
diff --git a/gi/pyp-topics/src/timing.h b/gi/pyp-topics/src/timing.h
new file mode 100644
index 00000000..7543295c
--- /dev/null
+++ b/gi/pyp-topics/src/timing.h
@@ -0,0 +1,31 @@
+#ifndef TIMING_H
+#define TIMING_H
+
+#ifdef __CYGWIN__
+# ifndef _POSIX_MONOTONIC_CLOCK
+# define _POSIX_MONOTONIC_CLOCK
+# endif
+#endif
+
+#include <time.h>
+#include <sys/time.h>
+#include "clock_gettime_stub.c"
+
+struct Timer {
+ Timer() { Reset(); }
+ void Reset()
+ {
+ clock_gettime(CLOCK_MONOTONIC, &start_t);
+ }
+ double Elapsed() const {
+ timespec end_t;
+ clock_gettime(CLOCK_MONOTONIC, &end_t);
+ const double elapsed = (end_t.tv_sec - start_t.tv_sec)
+ + (end_t.tv_nsec - start_t.tv_nsec) / 1000000000.0;
+ return elapsed;
+ }
+ private:
+ timespec start_t;
+};
+
+#endif
diff --git a/gi/pyp-topics/src/train-contexts.cc b/gi/pyp-topics/src/train-contexts.cc
index 8a0c8949..455e7b1e 100644
--- a/gi/pyp-topics/src/train-contexts.cc
+++ b/gi/pyp-topics/src/train-contexts.cc
@@ -53,6 +53,7 @@ int main(int argc, char **argv)
("freq-cutoff-start", value<int>()->default_value(0), "initial frequency cutoff.")
("freq-cutoff-end", value<int>()->default_value(0), "final frequency cutoff.")
("freq-cutoff-interval", value<int>()->default_value(0), "number of iterations between frequency decrement.")
+ ("max-threads", value<int>()->default_value(1), "maximum number of simultaneous threads allowed")
;
cmdline_specific.add(config_options);
@@ -79,7 +80,7 @@ int main(int argc, char **argv)
// seed the random number generator: 0 = automatic, specify value otherwise
unsigned long seed = 0;
- PYPTopics model(vm["topics"].as<int>(), vm.count("hierarchical-topics"), seed);
+ PYPTopics model(vm["topics"].as<int>(), vm.count("hierarchical-topics"), seed, vm["max-threads"].as<int>());
// read the data
BackoffGenerator* backoff_gen=0;
diff --git a/gi/pyp-topics/src/workers.hh b/gi/pyp-topics/src/workers.hh
new file mode 100644
index 00000000..1f496acf
--- /dev/null
+++ b/gi/pyp-topics/src/workers.hh
@@ -0,0 +1,62 @@
+#ifndef WORKERS_HH
+#define WORKERS_HH
+
+#include <iostream>
+#include <boost/bind.hpp>
+#include <boost/function.hpp>
+#include <boost/thread/thread.hpp>
+#include <boost/thread/mutex.hpp>
+#include <boost/thread/future.hpp>
+
+//#include <boost/date_time/posix_time/posix_time_types.hpp>
+
+#include "timing.h"
+
+template <typename J, typename R>
+class SimpleWorker
+{
+typedef boost::packaged_task<R> PackagedTask;
+public:
+ SimpleWorker(J& job) : job(job), tasktime(0.0)
+ {
+ PackagedTask task(boost::bind(&SimpleWorker<J, R>::run, this));
+ future = task.get_future();
+ boost::thread t(boost::move(task));
+ }
+
+ R run() //this is called upon thread creation
+ {
+ R wresult = 0;
+
+ assert(job);
+ timer.Reset();
+ wresult = job();
+ tasktime = timer.Elapsed();
+ return wresult;
+ }
+
+ R getResult()
+ {
+ if (!future.is_ready())
+ future.wait();
+ assert(future.is_ready());
+ return future.get();
+ }
+
+ double getTaskTime()
+ {
+ return tasktime;
+ }
+
+private:
+
+ J job;
+
+ boost::unique_future<R> future;
+
+ Timer timer;
+ double tasktime;
+
+};
+
+#endif