diff options
author | Patrick Simianer <p@simianer.de> | 2013-01-21 12:29:43 +0100 |
---|---|---|
committer | Patrick Simianer <p@simianer.de> | 2013-01-21 12:29:43 +0100 |
commit | 0d23f8aecbfaf982cd165ebfc2a1611cefcc7275 (patch) | |
tree | 8eafa6ea43224ff70635cadd4d6f027d28f4986f /klm/util/thread_pool.hh | |
parent | dbc66cd3944321961c5e11d5254fd914f05a98ad (diff) | |
parent | 7cac43b858f3b681555bf0578f54b1f822c43207 (diff) |
Merge remote-tracking branch 'upstream/master'
Diffstat (limited to 'klm/util/thread_pool.hh')
-rw-r--r-- | klm/util/thread_pool.hh | 95 |
1 files changed, 95 insertions, 0 deletions
diff --git a/klm/util/thread_pool.hh b/klm/util/thread_pool.hh new file mode 100644 index 00000000..84e257ea --- /dev/null +++ b/klm/util/thread_pool.hh @@ -0,0 +1,95 @@ +#ifndef UTIL_THREAD_POOL__ +#define UTIL_THREAD_POOL__ + +#include "util/pcqueue.hh" + +#include <boost/ptr_container/ptr_vector.hpp> +#include <boost/optional.hpp> +#include <boost/thread.hpp> + +#include <iostream> + +#include <stdlib.h> + +namespace util { + +template <class HandlerT> class Worker : boost::noncopyable { + public: + typedef HandlerT Handler; + typedef typename Handler::Request Request; + + template <class Construct> Worker(PCQueue<Request> &in, Construct &construct, Request &poison) + : in_(in), handler_(construct), thread_(boost::ref(*this)), poison_(poison) {} + + // Only call from thread. + void operator()() { + Request request; + while (1) { + in_.Consume(request); + if (request == poison_) return; + try { + (*handler_)(request); + } + catch(std::exception &e) { + std::cerr << "Handler threw " << e.what() << std::endl; + abort(); + } + catch(...) { + std::cerr << "Handler threw an exception, dropping request" << std::endl; + abort(); + } + } + } + + void Join() { + thread_.join(); + } + + private: + PCQueue<Request> &in_; + + boost::optional<Handler> handler_; + + boost::thread thread_; + + Request poison_; +}; + +template <class HandlerT> class ThreadPool : boost::noncopyable { + public: + typedef HandlerT Handler; + typedef typename Handler::Request Request; + + template <class Construct> ThreadPool(size_t queue_length, size_t workers, Construct handler_construct, Request poison) : in_(queue_length), poison_(poison) { + for (size_t i = 0; i < workers; ++i) { + workers_.push_back(new Worker<Handler>(in_, handler_construct, poison)); + } + } + + ~ThreadPool() { + for (size_t i = 0; i < workers_.size(); ++i) { + Produce(poison_); + } + for (typename boost::ptr_vector<Worker<Handler> >::iterator i = workers_.begin(); i != workers_.end(); ++i) { + i->Join(); + } + } + + void Produce(const Request &request) { + in_.Produce(request); + } + + // For adding to the queue. + PCQueue<Request> &In() { return in_; } + + private: + PCQueue<Request> in_; + + boost::ptr_vector<Worker<Handler> > workers_; + + Request poison_; +}; + +} // namespace util + +#endif // UTIL_THREAD_POOL__ |