#ifndef UTIL_STREAM_CHAIN_H #define UTIL_STREAM_CHAIN_H #include "util/stream/block.hh" #include "util/stream/config.hh" #include "util/stream/multi_progress.hh" #include "util/scoped.hh" #include <boost/ptr_container/ptr_vector.hpp> #include <boost/thread/thread.hpp> #include <cstddef> #include <assert.h> namespace util { template <class T> class PCQueue; namespace stream { class ChainConfigException : public Exception { public: ChainConfigException() throw(); ~ChainConfigException() throw(); }; class Chain; /** * Encapsulates a @ref PCQueue "producer queue" and a @ref PCQueue "consumer queue" within a @ref Chain "chain". * * Specifies position in chain for Link constructor. */ class ChainPosition { public: const Chain &GetChain() const { return *chain_; } private: friend class Chain; friend class Link; ChainPosition(PCQueue<Block> &in, PCQueue<Block> &out, Chain *chain, MultiProgress &progress) : in_(&in), out_(&out), chain_(chain), progress_(progress.Add()) {} PCQueue<Block> *in_, *out_; Chain *chain_; WorkerProgress progress_; }; /** * Encapsulates a worker thread processing data at a given position in the chain. * * Each instance of this class owns one boost thread in which the worker is Run(). */ class Thread { public: /** * Constructs a new Thread in which the provided Worker is Run(). * * Position is usually ChainPosition but if there are multiple streams involved, this can be ChainPositions. * * After a call to this constructor, the provided worker will be running within a boost thread owned by the newly constructed Thread object. */ template <class Position, class Worker> Thread(const Position &position, const Worker &worker) : thread_(boost::ref(*this), position, worker) {} ~Thread(); /** * Launches the provided worker in this object's boost thread. * * This method is called automatically by this class's @ref Thread() "constructor". */ template <class Position, class Worker> void operator()(const Position &position, Worker &worker) { try { worker.Run(position); } catch (const std::exception &e) { UnhandledException(e); } } private: void UnhandledException(const std::exception &e); boost::thread thread_; }; /** * This resets blocks to full valid size. Used to close the loop in Chain by recycling blocks. */ class Recycler { public: /** * Resets the blocks in the chain such that the blocks' respective valid sizes match the chain's block size. * * @see Block::SetValidSize() * @see Chain::BlockSize() */ void Run(const ChainPosition &position); }; extern const Recycler kRecycle; class WriteAndRecycle; class PWriteAndRecycle; /** * Represents a sequence of workers, through which @ref Block "blocks" can pass. */ class Chain { private: template <class T, void (T::*ptr)(const ChainPosition &) = &T::Run> struct CheckForRun { typedef Chain type; }; public: /** * Constructs a configured Chain. * * @param config Specifies how to configure the Chain. */ explicit Chain(const ChainConfig &config); /** * Destructs a Chain. * * This method waits for the chain's threads to complete, * and frees the memory held by this chain. */ ~Chain(); void ActivateProgress() { assert(!Running()); progress_.Activate(); } void SetProgressTarget(uint64_t target) { progress_.SetTarget(target); } /** * Gets the number of bytes in each record of a Block. * * @see ChainConfig::entry_size */ std::size_t EntrySize() const { return config_.entry_size; } /** * Gets the inital @ref Block::ValidSize "valid size" for @ref Block "blocks" in this chain. * * @see Block::ValidSize */ std::size_t BlockSize() const { return block_size_; } /** Two ways to add to the chain: Add() or operator>>. */ ChainPosition Add(); /** * Adds a new worker to this chain, * and runs that worker in a new Thread owned by this chain. * * The worker must have a Run method that accepts a position argument. * * @see Thread::operator()() */ template <class Worker> typename CheckForRun<Worker>::type &operator>>(const Worker &worker) { assert(!complete_called_); threads_.push_back(new Thread(Add(), worker)); return *this; } /** * Adds a new worker to this chain (but avoids copying that worker), * and runs that worker in a new Thread owned by this chain. * * The worker must have a Run method that accepts a position argument. * * @see Thread::operator()() */ template <class Worker> typename CheckForRun<Worker>::type &operator>>(const boost::reference_wrapper<Worker> &worker) { assert(!complete_called_); threads_.push_back(new Thread(Add(), worker)); return *this; } // Note that Link and Stream also define operator>> outside this class. // To complete the loop, call CompleteLoop(), >> kRecycle, or the destructor. void CompleteLoop() { threads_.push_back(new Thread(Complete(), kRecycle)); } /** * Adds a Recycler worker to this chain, * and runs that worker in a new Thread owned by this chain. */ Chain &operator>>(const Recycler &) { CompleteLoop(); return *this; } /** * Adds a WriteAndRecycle worker to this chain, * and runs that worker in a new Thread owned by this chain. */ Chain &operator>>(const WriteAndRecycle &writer); Chain &operator>>(const PWriteAndRecycle &writer); // Chains are reusable. Call Wait to wait for everything to finish and free memory. void Wait(bool release_memory = true); // Waits for the current chain to complete (if any) then starts again. void Start(); bool Running() const { return !queues_.empty(); } private: ChainPosition Complete(); ChainConfig config_; std::size_t block_size_; scoped_malloc memory_; boost::ptr_vector<PCQueue<Block> > queues_; bool complete_called_; boost::ptr_vector<Thread> threads_; MultiProgress progress_; }; // Create the link in the worker thread using the position token. /** * Represents a C++ style iterator over @ref Block "blocks". */ class Link { public: // Either default construct and Init or just construct all at once. /** * Constructs an @ref Init "initialized" link. * * @see Init */ explicit Link(const ChainPosition &position); /** * Constructs a link that must subsequently be @ref Init "initialized". * * @see Init */ Link(); /** * Initializes the link with the input @ref PCQueue "consumer queue" and output @ref PCQueue "producer queue" at a given @ref ChainPosition "position" in the @ref Chain "chain". * * @see Link() */ void Init(const ChainPosition &position); /** * Destructs the link object. * * If necessary, this method will pass a poison block * to this link's output @ref PCQueue "producer queue". * * @see Block::SetToPoison() */ ~Link(); /** * Gets a reference to the @ref Block "block" at this link. */ Block &operator*() { return current_; } /** * Gets a const reference to the @ref Block "block" at this link. */ const Block &operator*() const { return current_; } /** * Gets a pointer to the @ref Block "block" at this link. */ Block *operator->() { return ¤t_; } /** * Gets a const pointer to the @ref Block "block" at this link. */ const Block *operator->() const { return ¤t_; } /** * Gets the link at the next @ref ChainPosition "position" in the @ref Chain "chain". */ Link &operator++(); /** * Returns true if the @ref Block "block" at this link encapsulates a valid (non-NULL) block of memory. * * This method is a user-defined implicit conversion function to boolean; * among other things, this method enables bare instances of this class * to be used as the condition of an if statement. */ operator bool() const { return current_; } /** * @ref Block::SetToPoison() "Poisons" the @ref Block "block" at this link, * and passes this now-poisoned block to this link's output @ref PCQueue "producer queue". * * @see Block::SetToPoison() */ void Poison(); private: Block current_; PCQueue<Block> *in_, *out_; bool poisoned_; WorkerProgress progress_; }; inline Chain &operator>>(Chain &chain, Link &link) { link.Init(chain.Add()); return chain; } } // namespace stream } // namespace util #endif // UTIL_STREAM_CHAIN_H