#ifndef UTIL_STREAM_CHAIN_H
#define UTIL_STREAM_CHAIN_H

#include "util/stream/block.hh"
#include "util/stream/config.hh"
#include "util/stream/multi_progress.hh"
#include "util/scoped.hh"

#include <boost/ptr_container/ptr_vector.hpp>
#include <boost/thread/thread.hpp>

#include <cstddef>

#include <assert.h>

namespace util {
template <class T> class PCQueue;
namespace stream {

class ChainConfigException : public Exception {
  public:
    ChainConfigException() throw();
    ~ChainConfigException() throw();
};

class Chain;
  
/**
 * Encapsulates a @ref PCQueue "producer queue" and a @ref PCQueue "consumer queue" within a @ref Chain "chain".
 * 
 * Specifies position in chain for Link constructor.
 */
class ChainPosition {
  public:
    const Chain &GetChain() const { return *chain_; }
  private:
    friend class Chain;
    friend class Link;
    ChainPosition(PCQueue<Block> &in, PCQueue<Block> &out, Chain *chain, MultiProgress &progress) 
      : in_(&in), out_(&out), chain_(chain), progress_(progress.Add()) {}

    PCQueue<Block> *in_, *out_;

    Chain *chain_;

    WorkerProgress progress_;
};

 
/**
 * Encapsulates a worker thread processing data at a given position in the chain.
 *
 * Each instance of this class owns one boost thread in which the worker is Run().
 */
class Thread {
  public:
    
    /**
     * Constructs a new Thread in which the provided Worker is Run().
     *
     * Position is usually ChainPosition but if there are multiple streams involved, this can be ChainPositions.
     *
     * After a call to this constructor, the provided worker will be running within a boost thread owned by the newly constructed Thread object.
     */
    template <class Position, class Worker> Thread(const Position &position, const Worker &worker)
      : thread_(boost::ref(*this), position, worker) {}

    ~Thread();

    /**
     * Launches the provided worker in this object's boost thread.
     *
     * This method is called automatically by this class's @ref Thread() "constructor".
     */
    template <class Position, class Worker> void operator()(const Position &position, Worker &worker) {
      try {
        worker.Run(position);
      } catch (const std::exception &e) {
        UnhandledException(e);
      }
    }

  private:
    void UnhandledException(const std::exception &e);

    boost::thread thread_;
};

/**
 * This resets blocks to full valid size.  Used to close the loop in Chain by recycling blocks.
 */
class Recycler {
  public:
    /**
     * Resets the blocks in the chain such that the blocks' respective valid sizes match the chain's block size.
     *
     * @see Block::SetValidSize()
     * @see Chain::BlockSize()
     */
    void Run(const ChainPosition &position);
};

extern const Recycler kRecycle;
class WriteAndRecycle;
class PWriteAndRecycle;
  
/**
 * Represents a sequence of workers, through which @ref Block "blocks" can pass.
 */
class Chain {
  private:
    template <class T, void (T::*ptr)(const ChainPosition &) = &T::Run> struct CheckForRun {
      typedef Chain type;
    };

  public:
  
    /** 
     * Constructs a configured Chain.
     * 
     * @param config Specifies how to configure the Chain.
     */
    explicit Chain(const ChainConfig &config);

    /**
     * Destructs a Chain.
     *
     * This method waits for the chain's threads to complete,
     * and frees the memory held by this chain.
     */
    ~Chain();

    void ActivateProgress() {
      assert(!Running());
      progress_.Activate();
    }

    void SetProgressTarget(uint64_t target) {
      progress_.SetTarget(target);
    }

    /**
     * Gets the number of bytes in each record of a Block.
     *
     * @see ChainConfig::entry_size
     */
    std::size_t EntrySize() const {
      return config_.entry_size;
    }
  
    /**
     * Gets the inital @ref Block::ValidSize "valid size" for @ref Block "blocks" in this chain.
     *
     * @see Block::ValidSize
     */
    std::size_t BlockSize() const {
      return block_size_;
    }

    /** Two ways to add to the chain: Add() or operator>>. */
    ChainPosition Add();

    /** 
     * Adds a new worker to this chain,
     * and runs that worker in a new Thread owned by this chain.
     * 
     * The worker must have a Run method that accepts a position argument.
     *
     * @see Thread::operator()()
     */
    template <class Worker> typename CheckForRun<Worker>::type &operator>>(const Worker &worker) {
      assert(!complete_called_);
      threads_.push_back(new Thread(Add(), worker));
      return *this;
    }

  /** 
   * Adds a new worker to this chain (but avoids copying that worker),
   * and runs that worker in a new Thread owned by this chain.
   * 
   * The worker must have a Run method that accepts a position argument.
   *
   * @see Thread::operator()()
   */
    template <class Worker> typename CheckForRun<Worker>::type &operator>>(const boost::reference_wrapper<Worker> &worker) {
      assert(!complete_called_);
      threads_.push_back(new Thread(Add(), worker));
      return *this;
    }

    // Note that Link and Stream also define operator>> outside this class.  

    // To complete the loop, call CompleteLoop(), >> kRecycle, or the destructor.  
    void CompleteLoop() {
      threads_.push_back(new Thread(Complete(), kRecycle));
    }

    /** 
     * Adds a Recycler worker to this chain,
     * and runs that worker in a new Thread owned by this chain.
     */
    Chain &operator>>(const Recycler &) {
      CompleteLoop();
      return *this;
    }

    /** 
     * Adds a WriteAndRecycle worker to this chain,
     * and runs that worker in a new Thread owned by this chain.
     */
    Chain &operator>>(const WriteAndRecycle &writer);
    Chain &operator>>(const PWriteAndRecycle &writer);

    // Chains are reusable.  Call Wait to wait for everything to finish and free memory.  
    void Wait(bool release_memory = true);

    // Waits for the current chain to complete (if any) then starts again.  
    void Start();

    bool Running() const { return !queues_.empty(); }

  private:
    ChainPosition Complete();

    ChainConfig config_;

    std::size_t block_size_;

    scoped_malloc memory_;

    boost::ptr_vector<PCQueue<Block> > queues_;

    bool complete_called_;

    boost::ptr_vector<Thread> threads_;

    MultiProgress progress_;
};

// Create the link in the worker thread using the position token.
/** 
 * Represents a C++ style iterator over @ref Block "blocks".
 */
class Link {
  public:
  
    // Either default construct and Init or just construct all at once.
  
    /**
     * Constructs an @ref Init "initialized" link.
     *
     * @see Init
     */
    explicit Link(const ChainPosition &position);
  
    /** 
     * Constructs a link that must subsequently be @ref Init "initialized". 
     *
     * @see Init
     */
    Link();
  
    /** 
     * Initializes the link with the input @ref PCQueue "consumer queue" and output @ref PCQueue "producer queue" at a given @ref ChainPosition "position" in the @ref Chain "chain".
     *
     * @see Link()
     */
    void Init(const ChainPosition &position);

    /**
     * Destructs the link object.
     *
     * If necessary, this method will pass a poison block 
     * to this link's output @ref PCQueue "producer queue".
     *
     * @see Block::SetToPoison()
     */
    ~Link();

    /**
     * Gets a reference to the @ref Block "block" at this link.
     */
    Block &operator*() { return current_; }

    /**
     * Gets a const reference to the @ref Block "block" at this link.
     */
    const Block &operator*() const { return current_; }

    /**
     * Gets a pointer to the @ref Block "block" at this link.
     */
    Block *operator->() { return &current_; }
  
    /**
     * Gets a const pointer to the @ref Block "block" at this link.
     */
    const Block *operator->() const { return &current_; }

    /**
     * Gets the link at the next @ref ChainPosition "position" in the @ref Chain "chain".
     */
    Link &operator++();

    /**
     * Returns true if the @ref Block "block" at this link encapsulates a valid (non-NULL) block of memory.
     * 
     * This method is a user-defined implicit conversion function to boolean;
     * among other things, this method enables bare instances of this class 
     * to be used as the condition of an if statement.
     */
    operator bool() const { return current_; }

    /** 
     * @ref Block::SetToPoison() "Poisons" the @ref Block "block" at this link,
     * and passes this now-poisoned block to this link's output @ref PCQueue "producer queue".
     *
     * @see Block::SetToPoison()
     */
    void Poison();
  
  private:
    Block current_;
    PCQueue<Block> *in_, *out_;
 
    bool poisoned_;

    WorkerProgress progress_;
};

inline Chain &operator>>(Chain &chain, Link &link) {
  link.Init(chain.Add());
  return chain;
}

} // namespace stream
} // namespace util

#endif // UTIL_STREAM_CHAIN_H