#ifndef UTIL_PCQUEUE__
#define UTIL_PCQUEUE__

#include "util/exception.hh"

#include <boost/interprocess/sync/interprocess_semaphore.hpp>
#include <boost/scoped_array.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/utility.hpp>

#include <errno.h>

#ifdef __APPLE__
#include <mach/semaphore.h>
#include <mach/task.h>
#include <mach/mach_traps.h>
#include <mach/mach.h>
#endif // __APPLE__

namespace util {

/* On OS X Mavericks, Boost.Interprocess semaphores were failing with
 * "Function not implemented."  So this is my own wrapper around the Mach
 * kernel semaphore APIs.
 */
#ifdef __APPLE__

// Evaluates a Mach kernel call once and passes "result is not KERN_SUCCESS"
// to UTIL_THROW_IF, with Exception as the type and a fixed message.
#define MACH_CALL(call) \
  UTIL_THROW_IF((call) != KERN_SUCCESS, Exception, "Mach call failure")

class Semaphore {
  public:
    explicit Semaphore(int value) : task_(mach_task_self()) {
      MACH_CALL(semaphore_create(task_, &back_, SYNC_POLICY_FIFO, value));
    }

    ~Semaphore() {
      MACH_CALL(semaphore_destroy(task_, back_));
    }

    void wait() {
      MACH_CALL(semaphore_wait(back_));
    }

    void post() {
      MACH_CALL(semaphore_signal(back_));
    }

  private:
    semaphore_t back_;
    task_t task_;
};

// Apple flavor of WaitSemaphore: simply forwards to Semaphore::wait().
// Exists so callers can use the same free function on every platform.
inline void WaitSemaphore(Semaphore &semaphore) { semaphore.wait(); }

#else
// When __APPLE__ is not defined, Semaphore is boost's interprocess semaphore.
typedef boost::interprocess::interprocess_semaphore Semaphore;

// Wait on a semaphore, retrying whenever the wait fails with native error
// EINTR.  Any other interprocess_exception is rethrown unchanged.
inline void WaitSemaphore(Semaphore &on) {
  for (;;) {
    try {
      on.wait();
      return;
    } catch (const boost::interprocess::interprocess_exception &interrupted) {
      if (EINTR != interrupted.get_native_error()) throw;
      // EINTR: loop around and wait again.
    }
  }
}

#endif // __APPLE__

/* Bounded producer/consumer queue that is safe for multiple producers and
 * multiple consumers.
 * T must be default constructible and assignable with operator=.
 * The value is copied twice for Consume(T &out) or three times for Consume(),
 * so larger objects should be passed via pointer.
 * Strong exception guarantee if operator= throws.  Undefined if semaphores
 * throw.
 */
template <class T> class PCQueue : boost::noncopyable {
 public:
  // Allocate a ring of size default-constructed slots, all counted as empty.
  explicit PCQueue(size_t size)
   : empty_(size),
     used_(0),
     storage_(new T[size]),
     end_(storage_.get() + size),
     produce_at_(storage_.get()),
     consume_at_(storage_.get()) {}

  // Add a value to the queue, waiting first for an empty slot.
  void Produce(const T &val) {
    WaitSemaphore(empty_);
    {
      boost::mutex::scoped_lock guard(produce_at_mutex_);
      try {
        *produce_at_ = val;
      } catch (...) {
        // Nothing was stored, so give the reserved slot back before rethrowing.
        empty_.post();
        throw;
      }
      // Advance the write position, wrapping at the end of the ring.
      ++produce_at_;
      if (produce_at_ == end_) produce_at_ = storage_.get();
    }
    used_.post();
  }

  // Consume a value, assigning it to out.  Waits first for an occupied slot.
  // Returns out.
  T& Consume(T &out) {
    WaitSemaphore(used_);
    {
      boost::mutex::scoped_lock guard(consume_at_mutex_);
      try {
        out = *consume_at_;
      } catch (...) {
        // The slot was not taken, so mark it occupied again before rethrowing.
        used_.post();
        throw;
      }
      // Advance the read position, wrapping at the end of the ring.
      ++consume_at_;
      if (consume_at_ == end_) consume_at_ = storage_.get();
    }
    empty_.post();
    return out;
  }

  // Convenience version of Consume that returns the value by copy.
  // The reference-taking version is faster.
  T Consume() {
    T value;
    Consume(value);
    return value;
  }

 private:
  // Counts the empty slots in storage_.
  Semaphore empty_;
  // Counts the occupied slots in storage_.
  Semaphore used_;

  // The ring of slots.
  boost::scoped_array<T> storage_;

  // One past the last slot of storage_.
  T *const end_;

  // Slot the next Produce writes to; changed only under produce_at_mutex_.
  T *produce_at_;
  boost::mutex produce_at_mutex_;

  // Slot the next Consume reads from; changed only under consume_at_mutex_.
  T *consume_at_;
  boost::mutex consume_at_mutex_;

};

} // namespace util

#endif // UTIL_PCQUEUE__