From 0b9031042500d45a098762f0a930bd6a66a58fac Mon Sep 17 00:00:00 2001 From: Kenneth Heafield Date: Fri, 18 Jan 2013 17:12:51 +0000 Subject: KenLM dffafbf with lmplz source (but not built) --- klm/util/pcqueue.hh | 105 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 105 insertions(+) create mode 100644 klm/util/pcqueue.hh (limited to 'klm/util/pcqueue.hh') diff --git a/klm/util/pcqueue.hh b/klm/util/pcqueue.hh new file mode 100644 index 00000000..3df8749b --- /dev/null +++ b/klm/util/pcqueue.hh @@ -0,0 +1,105 @@ +#ifndef UTIL_PCQUEUE__ +#define UTIL_PCQUEUE__ + +#include +#include +#include +#include + +#include + +namespace util { + +inline void WaitSemaphore (boost::interprocess::interprocess_semaphore &on) { + while (1) { + try { + on.wait(); + break; + } + catch (boost::interprocess::interprocess_exception &e) { + if (e.get_native_error() != EINTR) throw; + } + } +} + +/* Producer consumer queue safe for multiple producers and multiple consumers. + * T must be default constructable and have operator=. + * The value is copied twice for Consume(T &out) or three times for Consume(), + * so larger objects should be passed via pointer. + * Strong exception guarantee if operator= throws. Undefined if semaphores throw. + */ +template class PCQueue : boost::noncopyable { + public: + explicit PCQueue(size_t size) + : empty_(size), used_(0), + storage_(new T[size]), + end_(storage_.get() + size), + produce_at_(storage_.get()), + consume_at_(storage_.get()) {} + + // Add a value to the queue. + void Produce(const T &val) { + WaitSemaphore(empty_); + { + boost::unique_lock produce_lock(produce_at_mutex_); + try { + *produce_at_ = val; + } + catch (...) { + empty_.post(); + throw; + } + if (++produce_at_ == end_) produce_at_ = storage_.get(); + } + used_.post(); + } + + // Consume a value, assigning it to out. + T& Consume(T &out) { + WaitSemaphore(used_); + { + boost::unique_lock consume_lock(consume_at_mutex_); + try { + out = *consume_at_; + } + catch (...) { + used_.post(); + throw; + } + if (++consume_at_ == end_) consume_at_ = storage_.get(); + } + empty_.post(); + return out; + } + + // Convenience version of Consume that copies the value to return. + // The other version is faster. + T Consume() { + T ret; + Consume(ret); + return ret; + } + + private: + // Number of empty spaces in storage_. + boost::interprocess::interprocess_semaphore empty_; + // Number of occupied spaces in storage_. + boost::interprocess::interprocess_semaphore used_; + + boost::scoped_array storage_; + + T *const end_; + + // Index for next write in storage_. + T *produce_at_; + boost::mutex produce_at_mutex_; + + // Index for next read from storage_. + T *consume_at_; + boost::mutex consume_at_mutex_; + +}; + +} // namespace util + +#endif // UTIL_PCQUEUE__ -- cgit v1.2.3