summaryrefslogtreecommitdiff
path: root/klm/util/thread_pool.hh
diff options
context:
space:
mode:
authorPatrick Simianer <p@simianer.de>2013-01-21 12:29:43 +0100
committerPatrick Simianer <p@simianer.de>2013-01-21 12:29:43 +0100
commit0d23f8aecbfaf982cd165ebfc2a1611cefcc7275 (patch)
tree8eafa6ea43224ff70635cadd4d6f027d28f4986f /klm/util/thread_pool.hh
parentdbc66cd3944321961c5e11d5254fd914f05a98ad (diff)
parent7cac43b858f3b681555bf0578f54b1f822c43207 (diff)
Merge remote-tracking branch 'upstream/master'
Diffstat (limited to 'klm/util/thread_pool.hh')
-rw-r--r--klm/util/thread_pool.hh95
1 files changed, 95 insertions, 0 deletions
diff --git a/klm/util/thread_pool.hh b/klm/util/thread_pool.hh
new file mode 100644
index 00000000..84e257ea
--- /dev/null
+++ b/klm/util/thread_pool.hh
@@ -0,0 +1,95 @@
+#ifndef UTIL_THREAD_POOL__
+#define UTIL_THREAD_POOL__
+
+#include "util/pcqueue.hh"
+
+#include <boost/ptr_container/ptr_vector.hpp>
+#include <boost/optional.hpp>
+#include <boost/thread.hpp>
+
+#include <iostream>
+
+#include <stdlib.h>
+
+namespace util {
+
+template <class HandlerT> class Worker : boost::noncopyable {
+ public:
+ typedef HandlerT Handler;
+ typedef typename Handler::Request Request;
+
+ template <class Construct> Worker(PCQueue<Request> &in, Construct &construct, Request &poison)
+ : in_(in), handler_(construct), thread_(boost::ref(*this)), poison_(poison) {}
+
+ // Only call from thread.
+ void operator()() {
+ Request request;
+ while (1) {
+ in_.Consume(request);
+ if (request == poison_) return;
+ try {
+ (*handler_)(request);
+ }
+ catch(std::exception &e) {
+ std::cerr << "Handler threw " << e.what() << std::endl;
+ abort();
+ }
+ catch(...) {
+ std::cerr << "Handler threw an exception, dropping request" << std::endl;
+ abort();
+ }
+ }
+ }
+
+ void Join() {
+ thread_.join();
+ }
+
+ private:
+ PCQueue<Request> &in_;
+
+ boost::optional<Handler> handler_;
+
+ boost::thread thread_;
+
+ Request poison_;
+};
+
+template <class HandlerT> class ThreadPool : boost::noncopyable {
+ public:
+ typedef HandlerT Handler;
+ typedef typename Handler::Request Request;
+
+ template <class Construct> ThreadPool(size_t queue_length, size_t workers, Construct handler_construct, Request poison) : in_(queue_length), poison_(poison) {
+ for (size_t i = 0; i < workers; ++i) {
+ workers_.push_back(new Worker<Handler>(in_, handler_construct, poison));
+ }
+ }
+
+ ~ThreadPool() {
+ for (size_t i = 0; i < workers_.size(); ++i) {
+ Produce(poison_);
+ }
+ for (typename boost::ptr_vector<Worker<Handler> >::iterator i = workers_.begin(); i != workers_.end(); ++i) {
+ i->Join();
+ }
+ }
+
+ void Produce(const Request &request) {
+ in_.Produce(request);
+ }
+
+ // For adding to the queue.
+ PCQueue<Request> &In() { return in_; }
+
+ private:
+ PCQueue<Request> in_;
+
+ boost::ptr_vector<Worker<Handler> > workers_;
+
+ Request poison_;
+};
+
+} // namespace util
+
+#endif // UTIL_THREAD_POOL__