diff options
author | Wu, Ke <wuke@cs.umd.edu> | 2014-12-17 16:11:38 -0500 |
---|---|---|
committer | Wu, Ke <wuke@cs.umd.edu> | 2014-12-17 16:11:38 -0500 |
commit | 7468e8d85e99b4619442c7afaf4a0d92870111bb (patch) | |
tree | a6f17da7c69048c8900260b5490bb9d8611be3bb /klm/util/stream/chain.hh | |
parent | b6dd5a683db9dda2d634dd2fdb76606819594901 (diff) | |
parent | 1a79175f9a101d46cf27ca921213d5dd9300518f (diff) |
Merge with upstream
Diffstat (limited to 'klm/util/stream/chain.hh')
-rw-r--r-- | klm/util/stream/chain.hh | 165 |
1 files changed, 153 insertions, 12 deletions
diff --git a/klm/util/stream/chain.hh b/klm/util/stream/chain.hh index 0cc83a85..50865086 100644 --- a/klm/util/stream/chain.hh +++ b/klm/util/stream/chain.hh @@ -1,5 +1,5 @@ -#ifndef UTIL_STREAM_CHAIN__ -#define UTIL_STREAM_CHAIN__ +#ifndef UTIL_STREAM_CHAIN_H +#define UTIL_STREAM_CHAIN_H #include "util/stream/block.hh" #include "util/stream/config.hh" @@ -24,7 +24,12 @@ class ChainConfigException : public Exception { }; class Chain; -// Specifies position in chain for Link constructor. + +/** + * Encapsulates a @ref PCQueue "producer queue" and a @ref PCQueue "consumer queue" within a @ref Chain "chain". + * + * Specifies position in chain for Link constructor. + */ class ChainPosition { public: const Chain &GetChain() const { return *chain_; } @@ -41,14 +46,32 @@ class ChainPosition { WorkerProgress progress_; }; -// Position is usually ChainPosition but if there are multiple streams involved, this can be ChainPositions. + +/** + * Encapsulates a worker thread processing data at a given position in the chain. + * + * Each instance of this class owns one boost thread in which the worker is Run(). + */ class Thread { public: + + /** + * Constructs a new Thread in which the provided Worker is Run(). + * + * Position is usually ChainPosition but if there are multiple streams involved, this can be ChainPositions. + * + * After a call to this constructor, the provided worker will be running within a boost thread owned by the newly constructed Thread object. + */ template <class Position, class Worker> Thread(const Position &position, const Worker &worker) : thread_(boost::ref(*this), position, worker) {} ~Thread(); + /** + * Launches the provided worker in this object's boost thread. + * + * This method is called automatically by this class's @ref Thread() "constructor". + */ template <class Position, class Worker> void operator()(const Position &position, Worker &worker) { try { worker.Run(position); @@ -63,14 +86,27 @@ class Thread { boost::thread thread_; }; +/** + * This resets blocks to full valid size. Used to close the loop in Chain by recycling blocks. + */ class Recycler { public: + /** + * Resets the blocks in the chain such that the blocks' respective valid sizes match the chain's block size. + * + * @see Block::SetValidSize() + * @see Chain::BlockSize() + */ void Run(const ChainPosition &position); }; extern const Recycler kRecycle; class WriteAndRecycle; - +class PWriteAndRecycle; + +/** + * Represents a sequence of workers, through which @ref Block "blocks" can pass. + */ class Chain { private: template <class T, void (T::*ptr)(const ChainPosition &) = &T::Run> struct CheckForRun { @@ -78,8 +114,20 @@ class Chain { }; public: + + /** + * Constructs a configured Chain. + * + * @param config Specifies how to configure the Chain. + */ explicit Chain(const ChainConfig &config); + /** + * Destructs a Chain. + * + * This method waits for the chain's threads to complete, + * and frees the memory held by this chain. + */ ~Chain(); void ActivateProgress() { @@ -91,24 +139,49 @@ class Chain { progress_.SetTarget(target); } + /** + * Gets the number of bytes in each record of a Block. + * + * @see ChainConfig::entry_size + */ std::size_t EntrySize() const { return config_.entry_size; } + + /** + * Gets the inital @ref Block::ValidSize "valid size" for @ref Block "blocks" in this chain. + * + * @see Block::ValidSize + */ std::size_t BlockSize() const { return block_size_; } - // Two ways to add to the chain: Add() or operator>>. + /** Two ways to add to the chain: Add() or operator>>. */ ChainPosition Add(); - // This is for adding threaded workers with a Run method. + /** + * Adds a new worker to this chain, + * and runs that worker in a new Thread owned by this chain. + * + * The worker must have a Run method that accepts a position argument. + * + * @see Thread::operator()() + */ template <class Worker> typename CheckForRun<Worker>::type &operator>>(const Worker &worker) { assert(!complete_called_); threads_.push_back(new Thread(Add(), worker)); return *this; } - // Avoid copying the worker. + /** + * Adds a new worker to this chain (but avoids copying that worker), + * and runs that worker in a new Thread owned by this chain. + * + * The worker must have a Run method that accepts a position argument. + * + * @see Thread::operator()() + */ template <class Worker> typename CheckForRun<Worker>::type &operator>>(const boost::reference_wrapper<Worker> &worker) { assert(!complete_called_); threads_.push_back(new Thread(Add(), worker)); @@ -122,12 +195,21 @@ class Chain { threads_.push_back(new Thread(Complete(), kRecycle)); } + /** + * Adds a Recycler worker to this chain, + * and runs that worker in a new Thread owned by this chain. + */ Chain &operator>>(const Recycler &) { CompleteLoop(); return *this; } + /** + * Adds a WriteAndRecycle worker to this chain, + * and runs that worker in a new Thread owned by this chain. + */ Chain &operator>>(const WriteAndRecycle &writer); + Chain &operator>>(const PWriteAndRecycle &writer); // Chains are reusable. Call Wait to wait for everything to finish and free memory. void Wait(bool release_memory = true); @@ -156,28 +238,87 @@ class Chain { }; // Create the link in the worker thread using the position token. +/** + * Represents a C++ style iterator over @ref Block "blocks". + */ class Link { public: + // Either default construct and Init or just construct all at once. + + /** + * Constructs an @ref Init "initialized" link. + * + * @see Init + */ + explicit Link(const ChainPosition &position); + + /** + * Constructs a link that must subsequently be @ref Init "initialized". + * + * @see Init + */ Link(); + + /** + * Initializes the link with the input @ref PCQueue "consumer queue" and output @ref PCQueue "producer queue" at a given @ref ChainPosition "position" in the @ref Chain "chain". + * + * @see Link() + */ void Init(const ChainPosition &position); - explicit Link(const ChainPosition &position); - + /** + * Destructs the link object. + * + * If necessary, this method will pass a poison block + * to this link's output @ref PCQueue "producer queue". + * + * @see Block::SetToPoison() + */ ~Link(); + /** + * Gets a reference to the @ref Block "block" at this link. + */ Block &operator*() { return current_; } + + /** + * Gets a const reference to the @ref Block "block" at this link. + */ const Block &operator*() const { return current_; } + /** + * Gets a pointer to the @ref Block "block" at this link. + */ Block *operator->() { return ¤t_; } + + /** + * Gets a const pointer to the @ref Block "block" at this link. + */ const Block *operator->() const { return ¤t_; } + /** + * Gets the link at the next @ref ChainPosition "position" in the @ref Chain "chain". + */ Link &operator++(); + /** + * Returns true if the @ref Block "block" at this link encapsulates a valid (non-NULL) block of memory. + * + * This method is a user-defined implicit conversion function to boolean; + * among other things, this method enables bare instances of this class + * to be used as the condition of an if statement. + */ operator bool() const { return current_; } + /** + * @ref Block::SetToPoison() "Poisons" the @ref Block "block" at this link, + * and passes this now-poisoned block to this link's output @ref PCQueue "producer queue". + * + * @see Block::SetToPoison() + */ void Poison(); - + private: Block current_; PCQueue<Block> *in_, *out_; @@ -195,4 +336,4 @@ inline Chain &operator>>(Chain &chain, Link &link) { } // namespace stream } // namespace util -#endif // UTIL_STREAM_CHAIN__ +#endif // UTIL_STREAM_CHAIN_H |