/** Basic thread-pool tools using Boost.Thread. (Jan Botha, 7/2010) --Simple usage-- Use SimpleWorker. Example, call a function that returns an int in a new thread: typedef boost::function<int()> JobType; JobType job = boost::bind(funcname); //or boost::bind(&class::funcname, this) for a member function SimpleWorker<JobType, int> worker(job); int result = worker.getResult(); //blocks until result is ready --Extended usage-- Use WorkerPool, which uses Queuemt (a synchronized queue) and Worker. Example: (same context and typedef WorkerPool<JobType, int> pool(num_threads); JobType job = ... pool.addJob(job); ... pool.get_result(); //blocks until all workers are done, returns the some of their results. Jobs added to a WorkerPool need to be the same type. A WorkerPool instance should not be reused (e.g. adding jobs) after calling get_result(). */ #ifndef WORKERS_HH #define WORKERS_HH #include <iostream> #include <boost/bind.hpp> #include <boost/function.hpp> #include <queue> #include <boost/ptr_container/ptr_vector.hpp> #include <boost/thread/thread.hpp> #include <boost/thread/mutex.hpp> #include <boost/thread/shared_mutex.hpp> #include <boost/thread/future.hpp> #include <boost/thread/condition.hpp> #include <boost/date_time/posix_time/posix_time_types.hpp> #include "timing.h" /** Implements a synchronized queue*/ template<typename J> class Queuemt { public: boost::condition_variable_any cond; const bool& running; Queuemt() { } Queuemt(const bool& running) : running(running), maxsize(0), qsize(0) { } ~Queuemt() { } J pop() { J job; { boost::unique_lock<boost::shared_mutex> qlock(q_mutex); while (running && qsize == 0) cond.wait(qlock); if (qsize > 0) { job = q.front(); q.pop(); --qsize; } } if (job) cond.notify_one(); return job; } void push(J job) { { boost::unique_lock<boost::shared_mutex> lock(q_mutex); q.push(job); ++qsize; } if (qsize > maxsize) maxsize = qsize; cond.notify_one(); } int getMaxsize() { return maxsize; } int size() { return qsize; } private: boost::shared_mutex q_mutex; std::queue<J> q; int maxsize; volatile int qsize; }; template<typename J, typename R> class Worker { typedef boost::packaged_task<R> PackagedTask; public: Worker(Queuemt<J>& queue, int id, int num_workers) : q(queue), tasktime(0.0), id(id), num_workers(num_workers) { PackagedTask task(boost::bind(&Worker<J, R>::run, this)); future = task.get_future(); boost::thread t(boost::move(task)); } R run() //this is called upon thread creation { R wresult = 0; while (isRunning()) { J job = q.pop(); if (job) { timer.Reset(); wresult += job(); tasktime += timer.Elapsed(); } } return wresult; } R getResult() { if (!future.is_ready()) future.wait(); assert(future.is_ready()); return future.get(); } double getTaskTime() { return tasktime; } private: Queuemt<J>& q; boost::unique_future<R> future; bool isRunning() { return q.running || q.size() > 0; } Timer timer; double tasktime; int id; int num_workers; }; template<typename J, typename R> class WorkerPool { typedef boost::packaged_task<R> PackagedTask; typedef Worker<J,R> WJR; typedef boost::ptr_vector<WJR> WorkerVector; public: WorkerPool(int num_workers) { q.reset(new Queuemt<J>(running)); running = true; for (int i = 0; i < num_workers; ++i) workers.push_back( new Worker<J, R>(*q, i, num_workers) ); } ~WorkerPool() { } R get_result() { running = false; q->cond.notify_all(); R tmp = 0; double tasktime = 0.0; for (typename WorkerVector::iterator it = workers.begin(); it != workers.end(); it++) { R res = it->getResult(); tmp += res; //std::cerr << "tasktime: " << it->getTaskTime() << std::endl; tasktime += it->getTaskTime(); } // std::cerr << " maxQ = " << q->getMaxsize() << std::endl; return tmp; } void addJob(J job) { q->push(job); } private: WorkerVector workers; boost::shared_ptr<Queuemt<J> > q; bool running; }; /////////////////// template <typename J, typename R> class SimpleWorker { typedef boost::packaged_task<R> PackagedTask; public: SimpleWorker(J& job) : job(job), tasktime(0.0) { PackagedTask task(boost::bind(&SimpleWorker<J, R>::run, this)); future = task.get_future(); boost::thread t(boost::move(task)); } R run() //this is called upon thread creation { R wresult = 0; assert(job); timer.Reset(); wresult = job(); tasktime = timer.Elapsed(); std::cerr << tasktime << " s" << std::endl; return wresult; } R getResult() { if (!future.is_ready()) future.wait(); assert(future.is_ready()); return future.get(); } double getTaskTime() { return tasktime; } private: J job; boost::unique_future<R> future; Timer timer; double tasktime; }; #endif