summaryrefslogtreecommitdiff
path: root/gi/pyp-topics/src/workers.hh
diff options
context:
space:
mode:
authorbothameister <bothameister@ec762483-ff6d-05da-a07a-a48fb63a330f>2010-07-13 03:33:36 +0000
committerbothameister <bothameister@ec762483-ff6d-05da-a07a-a48fb63a330f>2010-07-13 03:33:36 +0000
commit84d7b76d0896f9be3a51df3b49d557df0dd3781a (patch)
treed261f70f2af11ae753389ee9f7af532c7064b369 /gi/pyp-topics/src/workers.hh
parentbded9a46cb3a27b8049f74e9948be783ae6ec42a (diff)
added queue mechanism to parallelization of hyperparam resampling; new program argument 'num_jobs' to control granularity.
git-svn-id: https://ws10smt.googlecode.com/svn/trunk@232 ec762483-ff6d-05da-a07a-a48fb63a330f
Diffstat (limited to 'gi/pyp-topics/src/workers.hh')
-rw-r--r--gi/pyp-topics/src/workers.hh220
1 files changed, 216 insertions, 4 deletions
diff --git a/gi/pyp-topics/src/workers.hh b/gi/pyp-topics/src/workers.hh
index 55424c8d..95b18947 100644
--- a/gi/pyp-topics/src/workers.hh
+++ b/gi/pyp-topics/src/workers.hh
@@ -1,18 +1,227 @@
+/**
+ Basic thread-pool tools using Boost.Thread.
+ (Jan Botha, 7/2010)
+
+ --Simple usage--
+ Use SimpleWorker.
+ Example, call a function that returns an int in a new thread:
+ typedef boost::function<int()> JobType;
+ JobType job = boost::bind(funcname);
+ //or boost::bind(&class::funcname, this) for a member function
+ SimpleWorker<JobType, int> worker(job);
+ int result = worker.getResult(); //blocks until result is ready
+
+ --Extended usage--
+ Use WorkerPool, which uses Queuemt (a synchronized queue) and Worker.
+ Example:
+ (same context and typedef
+ WorkerPool<JobType, int> pool(num_threads);
+ JobType job = ...
+ pool.addJob(job);
+ ...
+ pool.get_result(); //blocks until all workers are done, returns the some of their results.
+
+ Jobs added to a WorkerPool need to be the same type. A WorkerPool instance should not be reused (e.g. adding jobs) after calling get_result().
+*/
+
#ifndef WORKERS_HH
#define WORKERS_HH
-#include "timing.h"
-
#include <iostream>
#include <boost/bind.hpp>
#include <boost/function.hpp>
+#include <queue>
+#include <boost/ptr_container/ptr_vector.hpp>
#include <boost/thread/thread.hpp>
#include <boost/thread/mutex.hpp>
+#include <boost/thread/shared_mutex.hpp>
#include <boost/thread/future.hpp>
+#include <boost/thread/condition.hpp>
+
+#include <boost/date_time/posix_time/posix_time_types.hpp>
+#include "timing.h"
+
+/** Implements a synchronized queue*/
+template<typename J>
+class Queuemt
+{
+
+public:
+ boost::condition_variable_any cond;
+ const bool& running;
+
+ Queuemt() { }
+ Queuemt(const bool& running) : running(running), maxsize(0), qsize(0)
+ {
+ }
+
+ ~Queuemt() {
+ }
+
+ J pop()
+ {
+ J job;
+ {
+ boost::unique_lock<boost::shared_mutex> qlock(q_mutex);
+ while (running && qsize == 0)
+ cond.wait(qlock);
+
+ if (qsize > 0)
+ {
+ job = q.front();
+ q.pop();
+ --qsize;
+ }
+ }
+ if (job)
+ cond.notify_one();
+ return job;
+
+ }
-//#include <boost/date_time/posix_time/posix_time_types.hpp>
+ void push(J job)
+ {
+ {
+ boost::unique_lock<boost::shared_mutex> lock(q_mutex);
+ q.push(job);
+ ++qsize;
+ }
+ if (qsize > maxsize)
+ maxsize = qsize;
+
+ cond.notify_one();
+ }
+ int getMaxsize()
+ {
+ return maxsize;
+ }
+ int size()
+ {
+ return qsize;
+ }
+
+private:
+ boost::shared_mutex q_mutex;
+ std::queue<J> q;
+ int maxsize;
+ volatile int qsize;
+};
+
+
+template<typename J, typename R>
+class Worker
+{
+typedef boost::packaged_task<R> PackagedTask;
+public:
+ Worker(Queuemt<J>& queue, int id, int num_workers) :
+ q(queue), tasktime(0.0), id(id), num_workers(num_workers)
+ {
+ PackagedTask task(boost::bind(&Worker<J, R>::run, this));
+ future = task.get_future();
+ boost::thread t(boost::move(task));
+ }
+
+ R run() //this is called upon thread creation
+ {
+ R wresult = 0;
+ while (isRunning())
+ {
+ J job = q.pop();
+
+ if (job)
+ {
+ timer.Reset();
+ wresult += job();
+ tasktime += timer.Elapsed();
+ }
+ }
+ return wresult;
+ }
+
+ R getResult()
+ {
+ if (!future.is_ready())
+ future.wait();
+ assert(future.is_ready());
+ return future.get();
+ }
+
+ double getTaskTime()
+ {
+ return tasktime;
+ }
+
+private:
+
+ Queuemt<J>& q;
+
+ boost::unique_future<R> future;
+
+ bool isRunning()
+ {
+ return q.running || q.size() > 0;
+ }
+
+ Timer timer;
+ double tasktime;
+ int id;
+ int num_workers;
+};
+
+template<typename J, typename R>
+class WorkerPool
+{
+typedef boost::packaged_task<R> PackagedTask;
+typedef Worker<J,R> WJR;
+typedef boost::ptr_vector<WJR> WorkerVector;
+public:
+
+ WorkerPool(int num_workers)
+ {
+ q.reset(new Queuemt<J>(running));
+ running = true;
+ for (int i = 0; i < num_workers; ++i)
+ workers.push_back( new Worker<J, R>(*q, i, num_workers) );
+ }
+
+ ~WorkerPool()
+ {
+ }
+
+ R get_result()
+ {
+ running = false;
+ q->cond.notify_all();
+ R tmp = 0;
+ double tasktime = 0.0;
+ for (typename WorkerVector::iterator it = workers.begin(); it != workers.end(); it++)
+ {
+ R res = it->getResult();
+ tmp += res;
+ //std::cerr << "tasktime: " << it->getTaskTime() << std::endl;
+ tasktime += it->getTaskTime();
+ }
+// std::cerr << " maxQ = " << q->getMaxsize() << std::endl;
+ return tmp;
+ }
+
+ void addJob(J job)
+ {
+ q->push(job);
+ }
+
+private:
+
+ WorkerVector workers;
+
+ boost::shared_ptr<Queuemt<J> > q;
+
+ bool running;
+};
+
+///////////////////
template <typename J, typename R>
class SimpleWorker
{
@@ -33,6 +242,7 @@ public:
timer.Reset();
wresult = job();
tasktime = timer.Elapsed();
+ std::cerr << tasktime << " s" << std::endl;
return wresult;
}
@@ -60,4 +270,6 @@ private:
};
-#endif
+
+
+#endif