summaryrefslogtreecommitdiff
path: root/klm/util/thread_pool.hh
diff options
context:
space:
mode:
authorPaul Baltescu <pauldb89@gmail.com>2013-02-21 14:13:55 +0000
committerPaul Baltescu <pauldb89@gmail.com>2013-02-21 14:13:55 +0000
commitbca26d953a774b8efca12f30407390b3f5eef9d0 (patch)
treefe922de5c89b1844f677d550dcc24e87edd67a55 /klm/util/thread_pool.hh
parent54a1c0e2bde259e3acc9c0a8ec8da3c7704e80ca (diff)
parent95c364f2cb002241c4a62bedb1c5ef6f1e9a7f22 (diff)
Merge branch 'master' of https://github.com/pauldb89/cdec
Diffstat (limited to 'klm/util/thread_pool.hh')
-rw-r--r--klm/util/thread_pool.hh95
1 files changed, 95 insertions, 0 deletions
diff --git a/klm/util/thread_pool.hh b/klm/util/thread_pool.hh
new file mode 100644
index 00000000..84e257ea
--- /dev/null
+++ b/klm/util/thread_pool.hh
@@ -0,0 +1,95 @@
+#ifndef UTIL_THREAD_POOL__
+#define UTIL_THREAD_POOL__
+
+#include "util/pcqueue.hh"
+
+#include <boost/ptr_container/ptr_vector.hpp>
+#include <boost/optional.hpp>
+#include <boost/thread.hpp>
+
+#include <iostream>
+
+#include <stdlib.h>
+
+namespace util {
+
+template <class HandlerT> class Worker : boost::noncopyable {
+ public:
+ typedef HandlerT Handler;
+ typedef typename Handler::Request Request;
+
+ template <class Construct> Worker(PCQueue<Request> &in, Construct &construct, Request &poison)
+ : in_(in), handler_(construct), thread_(boost::ref(*this)), poison_(poison) {}
+
+ // Only call from thread.
+ void operator()() {
+ Request request;
+ while (1) {
+ in_.Consume(request);
+ if (request == poison_) return;
+ try {
+ (*handler_)(request);
+ }
+ catch(std::exception &e) {
+ std::cerr << "Handler threw " << e.what() << std::endl;
+ abort();
+ }
+ catch(...) {
+ std::cerr << "Handler threw an exception, dropping request" << std::endl;
+ abort();
+ }
+ }
+ }
+
+ void Join() {
+ thread_.join();
+ }
+
+ private:
+ PCQueue<Request> &in_;
+
+ boost::optional<Handler> handler_;
+
+ boost::thread thread_;
+
+ Request poison_;
+};
+
+template <class HandlerT> class ThreadPool : boost::noncopyable {
+ public:
+ typedef HandlerT Handler;
+ typedef typename Handler::Request Request;
+
+ template <class Construct> ThreadPool(size_t queue_length, size_t workers, Construct handler_construct, Request poison) : in_(queue_length), poison_(poison) {
+ for (size_t i = 0; i < workers; ++i) {
+ workers_.push_back(new Worker<Handler>(in_, handler_construct, poison));
+ }
+ }
+
+ ~ThreadPool() {
+ for (size_t i = 0; i < workers_.size(); ++i) {
+ Produce(poison_);
+ }
+ for (typename boost::ptr_vector<Worker<Handler> >::iterator i = workers_.begin(); i != workers_.end(); ++i) {
+ i->Join();
+ }
+ }
+
+ void Produce(const Request &request) {
+ in_.Produce(request);
+ }
+
+ // For adding to the queue.
+ PCQueue<Request> &In() { return in_; }
+
+ private:
+ PCQueue<Request> in_;
+
+ boost::ptr_vector<Worker<Handler> > workers_;
+
+ Request poison_;
+};
+
+} // namespace util
+
+#endif // UTIL_THREAD_POOL__