diff options
Diffstat (limited to 'klm/util/stream/multi_stream.hh')
-rw-r--r-- | klm/util/stream/multi_stream.hh | 127 |
1 files changed, 127 insertions, 0 deletions
diff --git a/klm/util/stream/multi_stream.hh b/klm/util/stream/multi_stream.hh new file mode 100644 index 00000000..0ee7fab6 --- /dev/null +++ b/klm/util/stream/multi_stream.hh @@ -0,0 +1,127 @@ +#ifndef UTIL_STREAM_MULTI_STREAM_H +#define UTIL_STREAM_MULTI_STREAM_H + +#include "util/fixed_array.hh" +#include "util/scoped.hh" +#include "util/stream/chain.hh" +#include "util/stream/stream.hh" + +#include <cstddef> +#include <new> + +#include <assert.h> +#include <stdlib.h> + +namespace util { namespace stream { + +class Chains; + +class ChainPositions : public util::FixedArray<util::stream::ChainPosition> { + public: + ChainPositions() {} + + void Init(Chains &chains); + + explicit ChainPositions(Chains &chains) { + Init(chains); + } +}; + +class Chains : public util::FixedArray<util::stream::Chain> { + private: + template <class T, void (T::*ptr)(const ChainPositions &) = &T::Run> struct CheckForRun { + typedef Chains type; + }; + + public: + // Must call Init. + Chains() {} + + explicit Chains(std::size_t limit) : util::FixedArray<util::stream::Chain>(limit) {} + + template <class Worker> typename CheckForRun<Worker>::type &operator>>(const Worker &worker) { + threads_.push_back(new util::stream::Thread(ChainPositions(*this), worker)); + return *this; + } + + template <class Worker> typename CheckForRun<Worker>::type &operator>>(const boost::reference_wrapper<Worker> &worker) { + threads_.push_back(new util::stream::Thread(ChainPositions(*this), worker)); + return *this; + } + + Chains &operator>>(const util::stream::Recycler &recycler) { + for (util::stream::Chain *i = begin(); i != end(); ++i) + *i >> recycler; + return *this; + } + + void Wait(bool release_memory = true) { + threads_.clear(); + for (util::stream::Chain *i = begin(); i != end(); ++i) { + i->Wait(release_memory); + } + } + + private: + boost::ptr_vector<util::stream::Thread> threads_; + + Chains(const Chains &); + void operator=(const Chains &); +}; + +inline void ChainPositions::Init(Chains &chains) { + util::FixedArray<util::stream::ChainPosition>::Init(chains.size()); + for (util::stream::Chain *i = chains.begin(); i != chains.end(); ++i) { + // use "placement new" syntax to initalize ChainPosition in an already-allocated memory location + new (end()) util::stream::ChainPosition(i->Add()); Constructed(); + } +} + +inline Chains &operator>>(Chains &chains, ChainPositions &positions) { + positions.Init(chains); + return chains; +} + +template <class T> class GenericStreams : public util::FixedArray<T> { + private: + typedef util::FixedArray<T> P; + public: + GenericStreams() {} + + // This puts a dummy T at the beginning (useful to algorithms that need to reference something at the beginning). + void InitWithDummy(const ChainPositions &positions) { + P::Init(positions.size() + 1); + new (P::end()) T(); // use "placement new" syntax to initalize T in an already-allocated memory location + P::Constructed(); + for (const util::stream::ChainPosition *i = positions.begin(); i != positions.end(); ++i) { + P::push_back(*i); + } + } + + // Limit restricts to positions[0,limit) + void Init(const ChainPositions &positions, std::size_t limit) { + P::Init(limit); + for (const util::stream::ChainPosition *i = positions.begin(); i != positions.begin() + limit; ++i) { + P::push_back(*i); + } + } + void Init(const ChainPositions &positions) { + Init(positions, positions.size()); + } + + GenericStreams(const ChainPositions &positions) { + Init(positions); + } +}; + +template <class T> inline Chains &operator>>(Chains &chains, GenericStreams<T> &streams) { + ChainPositions positions; + chains >> positions; + streams.Init(positions); + return chains; +} + +typedef GenericStreams<Stream> Streams; + +}} // namespaces +#endif // UTIL_STREAM_MULTI_STREAM_H |