#ifndef UTIL_THREAD_POOL__ #define UTIL_THREAD_POOL__ #include "util/pcqueue.hh" #include <boost/ptr_container/ptr_vector.hpp> #include <boost/optional.hpp> #include <boost/thread.hpp> #include <iostream> #include <stdlib.h> namespace util { template <class HandlerT> class Worker : boost::noncopyable { public: typedef HandlerT Handler; typedef typename Handler::Request Request; template <class Construct> Worker(PCQueue<Request> &in, Construct &construct, Request &poison) : in_(in), handler_(construct), thread_(boost::ref(*this)), poison_(poison) {} // Only call from thread. void operator()() { Request request; while (1) { in_.Consume(request); if (request == poison_) return; try { (*handler_)(request); } catch(std::exception &e) { std::cerr << "Handler threw " << e.what() << std::endl; abort(); } catch(...) { std::cerr << "Handler threw an exception, dropping request" << std::endl; abort(); } } } void Join() { thread_.join(); } private: PCQueue<Request> &in_; boost::optional<Handler> handler_; boost::thread thread_; Request poison_; }; template <class HandlerT> class ThreadPool : boost::noncopyable { public: typedef HandlerT Handler; typedef typename Handler::Request Request; template <class Construct> ThreadPool(size_t queue_length, size_t workers, Construct handler_construct, Request poison) : in_(queue_length), poison_(poison) { for (size_t i = 0; i < workers; ++i) { workers_.push_back(new Worker<Handler>(in_, handler_construct, poison)); } } ~ThreadPool() { for (size_t i = 0; i < workers_.size(); ++i) { Produce(poison_); } for (typename boost::ptr_vector<Worker<Handler> >::iterator i = workers_.begin(); i != workers_.end(); ++i) { i->Join(); } } void Produce(const Request &request) { in_.Produce(request); } // For adding to the queue. PCQueue<Request> &In() { return in_; } private: PCQueue<Request> in_; boost::ptr_vector<Worker<Handler> > workers_; Request poison_; }; } // namespace util #endif // UTIL_THREAD_POOL__