summaryrefslogtreecommitdiff
path: root/klm/util/stream/chain.hh
diff options
context:
space:
mode:
authorarmatthews <armatthe@cmu.edu>2014-10-13 14:59:23 -0400
committerarmatthews <armatthe@cmu.edu>2014-10-13 14:59:23 -0400
commitb26cda84e05d4523eee069234a975a0153bf8608 (patch)
tree61c9da4f8dd6070f27c8e81812a76fc0a8cf2d8d /klm/util/stream/chain.hh
parentcd7bc67f475fdfd07fba003ac4cca40e83944740 (diff)
parentb1ed81ef3216b212295afa76c5d20a56fb647204 (diff)
Merge branch 'master' of github.com:redpony/cdec
Diffstat (limited to 'klm/util/stream/chain.hh')
-rw-r--r--klm/util/stream/chain.hh165
1 files changed, 153 insertions, 12 deletions
diff --git a/klm/util/stream/chain.hh b/klm/util/stream/chain.hh
index 0cc83a85..50865086 100644
--- a/klm/util/stream/chain.hh
+++ b/klm/util/stream/chain.hh
@@ -1,5 +1,5 @@
-#ifndef UTIL_STREAM_CHAIN__
-#define UTIL_STREAM_CHAIN__
+#ifndef UTIL_STREAM_CHAIN_H
+#define UTIL_STREAM_CHAIN_H
#include "util/stream/block.hh"
#include "util/stream/config.hh"
@@ -24,7 +24,12 @@ class ChainConfigException : public Exception {
};
class Chain;
-// Specifies position in chain for Link constructor.
+
+/**
+ * Encapsulates a @ref PCQueue "producer queue" and a @ref PCQueue "consumer queue" within a @ref Chain "chain".
+ *
+ * Specifies position in chain for Link constructor.
+ */
class ChainPosition {
public:
const Chain &GetChain() const { return *chain_; }
@@ -41,14 +46,32 @@ class ChainPosition {
WorkerProgress progress_;
};
-// Position is usually ChainPosition but if there are multiple streams involved, this can be ChainPositions.
+
+/**
+ * Encapsulates a worker thread processing data at a given position in the chain.
+ *
+ * Each instance of this class owns one boost thread in which the worker is Run().
+ */
class Thread {
public:
+
+ /**
+ * Constructs a new Thread in which the provided Worker is Run().
+ *
+ * Position is usually ChainPosition but if there are multiple streams involved, this can be ChainPositions.
+ *
+ * After a call to this constructor, the provided worker will be running within a boost thread owned by the newly constructed Thread object.
+ */
template <class Position, class Worker> Thread(const Position &position, const Worker &worker)
: thread_(boost::ref(*this), position, worker) {}
~Thread();
+ /**
+ * Launches the provided worker in this object's boost thread.
+ *
+ * This method is called automatically by this class's @ref Thread() "constructor".
+ */
template <class Position, class Worker> void operator()(const Position &position, Worker &worker) {
try {
worker.Run(position);
@@ -63,14 +86,27 @@ class Thread {
boost::thread thread_;
};
+/**
+ * This resets blocks to full valid size. Used to close the loop in Chain by recycling blocks.
+ */
class Recycler {
public:
+ /**
+ * Resets the blocks in the chain such that the blocks' respective valid sizes match the chain's block size.
+ *
+ * @see Block::SetValidSize()
+ * @see Chain::BlockSize()
+ */
void Run(const ChainPosition &position);
};
extern const Recycler kRecycle;
class WriteAndRecycle;
-
+class PWriteAndRecycle;
+
+/**
+ * Represents a sequence of workers, through which @ref Block "blocks" can pass.
+ */
class Chain {
private:
template <class T, void (T::*ptr)(const ChainPosition &) = &T::Run> struct CheckForRun {
@@ -78,8 +114,20 @@ class Chain {
};
public:
+
+ /**
+ * Constructs a configured Chain.
+ *
+ * @param config Specifies how to configure the Chain.
+ */
explicit Chain(const ChainConfig &config);
+ /**
+ * Destructs a Chain.
+ *
+ * This method waits for the chain's threads to complete,
+ * and frees the memory held by this chain.
+ */
~Chain();
void ActivateProgress() {
@@ -91,24 +139,49 @@ class Chain {
progress_.SetTarget(target);
}
+ /**
+ * Gets the number of bytes in each record of a Block.
+ *
+ * @see ChainConfig::entry_size
+ */
std::size_t EntrySize() const {
return config_.entry_size;
}
+
+ /**
+ * Gets the inital @ref Block::ValidSize "valid size" for @ref Block "blocks" in this chain.
+ *
+ * @see Block::ValidSize
+ */
std::size_t BlockSize() const {
return block_size_;
}
- // Two ways to add to the chain: Add() or operator>>.
+ /** Two ways to add to the chain: Add() or operator>>. */
ChainPosition Add();
- // This is for adding threaded workers with a Run method.
+ /**
+ * Adds a new worker to this chain,
+ * and runs that worker in a new Thread owned by this chain.
+ *
+ * The worker must have a Run method that accepts a position argument.
+ *
+ * @see Thread::operator()()
+ */
template <class Worker> typename CheckForRun<Worker>::type &operator>>(const Worker &worker) {
assert(!complete_called_);
threads_.push_back(new Thread(Add(), worker));
return *this;
}
- // Avoid copying the worker.
+ /**
+ * Adds a new worker to this chain (but avoids copying that worker),
+ * and runs that worker in a new Thread owned by this chain.
+ *
+ * The worker must have a Run method that accepts a position argument.
+ *
+ * @see Thread::operator()()
+ */
template <class Worker> typename CheckForRun<Worker>::type &operator>>(const boost::reference_wrapper<Worker> &worker) {
assert(!complete_called_);
threads_.push_back(new Thread(Add(), worker));
@@ -122,12 +195,21 @@ class Chain {
threads_.push_back(new Thread(Complete(), kRecycle));
}
+ /**
+ * Adds a Recycler worker to this chain,
+ * and runs that worker in a new Thread owned by this chain.
+ */
Chain &operator>>(const Recycler &) {
CompleteLoop();
return *this;
}
+ /**
+ * Adds a WriteAndRecycle worker to this chain,
+ * and runs that worker in a new Thread owned by this chain.
+ */
Chain &operator>>(const WriteAndRecycle &writer);
+ Chain &operator>>(const PWriteAndRecycle &writer);
// Chains are reusable. Call Wait to wait for everything to finish and free memory.
void Wait(bool release_memory = true);
@@ -156,28 +238,87 @@ class Chain {
};
// Create the link in the worker thread using the position token.
+/**
+ * Represents a C++ style iterator over @ref Block "blocks".
+ */
class Link {
public:
+
// Either default construct and Init or just construct all at once.
+
+ /**
+ * Constructs an @ref Init "initialized" link.
+ *
+ * @see Init
+ */
+ explicit Link(const ChainPosition &position);
+
+ /**
+ * Constructs a link that must subsequently be @ref Init "initialized".
+ *
+ * @see Init
+ */
Link();
+
+ /**
+ * Initializes the link with the input @ref PCQueue "consumer queue" and output @ref PCQueue "producer queue" at a given @ref ChainPosition "position" in the @ref Chain "chain".
+ *
+ * @see Link()
+ */
void Init(const ChainPosition &position);
- explicit Link(const ChainPosition &position);
-
+ /**
+ * Destructs the link object.
+ *
+ * If necessary, this method will pass a poison block
+ * to this link's output @ref PCQueue "producer queue".
+ *
+ * @see Block::SetToPoison()
+ */
~Link();
+ /**
+ * Gets a reference to the @ref Block "block" at this link.
+ */
Block &operator*() { return current_; }
+
+ /**
+ * Gets a const reference to the @ref Block "block" at this link.
+ */
const Block &operator*() const { return current_; }
+ /**
+ * Gets a pointer to the @ref Block "block" at this link.
+ */
Block *operator->() { return &current_; }
+
+ /**
+ * Gets a const pointer to the @ref Block "block" at this link.
+ */
const Block *operator->() const { return &current_; }
+ /**
+ * Gets the link at the next @ref ChainPosition "position" in the @ref Chain "chain".
+ */
Link &operator++();
+ /**
+ * Returns true if the @ref Block "block" at this link encapsulates a valid (non-NULL) block of memory.
+ *
+ * This method is a user-defined implicit conversion function to boolean;
+ * among other things, this method enables bare instances of this class
+ * to be used as the condition of an if statement.
+ */
operator bool() const { return current_; }
+ /**
+ * @ref Block::SetToPoison() "Poisons" the @ref Block "block" at this link,
+ * and passes this now-poisoned block to this link's output @ref PCQueue "producer queue".
+ *
+ * @see Block::SetToPoison()
+ */
void Poison();
-
+
private:
Block current_;
PCQueue<Block> *in_, *out_;
@@ -195,4 +336,4 @@ inline Chain &operator>>(Chain &chain, Link &link) {
} // namespace stream
} // namespace util
-#endif // UTIL_STREAM_CHAIN__
+#endif // UTIL_STREAM_CHAIN_H