#ifndef LM_BUILDER_MULTI_STREAM__ #define LM_BUILDER_MULTI_STREAM__ #include "lm/builder/ngram_stream.hh" #include "util/scoped.hh" #include "util/stream/chain.hh" #include #include #include #include namespace lm { namespace builder { template class FixedArray { public: explicit FixedArray(std::size_t count) { Init(count); } FixedArray() : newed_end_(NULL) {} void Init(std::size_t count) { assert(!block_.get()); block_.reset(malloc(sizeof(T) * count)); if (!block_.get()) throw std::bad_alloc(); newed_end_ = begin(); } FixedArray(const FixedArray &from) { std::size_t size = from.newed_end_ - static_cast(from.block_.get()); Init(size); for (std::size_t i = 0; i < size; ++i) { new(end()) T(from[i]); Constructed(); } } ~FixedArray() { clear(); } T *begin() { return static_cast(block_.get()); } const T *begin() const { return static_cast(block_.get()); } // Always call Constructed after successful completion of new. T *end() { return newed_end_; } const T *end() const { return newed_end_; } T &back() { return *(end() - 1); } const T &back() const { return *(end() - 1); } std::size_t size() const { return end() - begin(); } bool empty() const { return begin() == end(); } T &operator[](std::size_t i) { return begin()[i]; } const T &operator[](std::size_t i) const { return begin()[i]; } template void push_back(const C &c) { new (end()) T(c); Constructed(); } void clear() { for (T *i = begin(); i != end(); ++i) i->~T(); newed_end_ = begin(); } protected: void Constructed() { ++newed_end_; } private: util::scoped_malloc block_; T *newed_end_; }; class Chains; class ChainPositions : public FixedArray { public: ChainPositions() {} void Init(Chains &chains); explicit ChainPositions(Chains &chains) { Init(chains); } }; class Chains : public FixedArray { private: template struct CheckForRun { typedef Chains type; }; public: explicit Chains(std::size_t limit) : FixedArray(limit) {} template typename CheckForRun::type &operator>>(const Worker &worker) { threads_.push_back(new util::stream::Thread(ChainPositions(*this), worker)); return *this; } template typename CheckForRun::type &operator>>(const boost::reference_wrapper &worker) { threads_.push_back(new util::stream::Thread(ChainPositions(*this), worker)); return *this; } Chains &operator>>(const util::stream::Recycler &recycler) { for (util::stream::Chain *i = begin(); i != end(); ++i) *i >> recycler; return *this; } void Wait(bool release_memory = true) { threads_.clear(); for (util::stream::Chain *i = begin(); i != end(); ++i) { i->Wait(release_memory); } } private: boost::ptr_vector threads_; Chains(const Chains &); void operator=(const Chains &); }; inline void ChainPositions::Init(Chains &chains) { FixedArray::Init(chains.size()); for (util::stream::Chain *i = chains.begin(); i != chains.end(); ++i) { new (end()) util::stream::ChainPosition(i->Add()); Constructed(); } } inline Chains &operator>>(Chains &chains, ChainPositions &positions) { positions.Init(chains); return chains; } class NGramStreams : public FixedArray { public: NGramStreams() {} // This puts a dummy NGramStream at the beginning (useful to algorithms that need to reference something at the beginning). void InitWithDummy(const ChainPositions &positions) { FixedArray::Init(positions.size() + 1); new (end()) NGramStream(); Constructed(); for (const util::stream::ChainPosition *i = positions.begin(); i != positions.end(); ++i) { push_back(*i); } } // Limit restricts to positions[0,limit) void Init(const ChainPositions &positions, std::size_t limit) { FixedArray::Init(limit); for (const util::stream::ChainPosition *i = positions.begin(); i != positions.begin() + limit; ++i) { push_back(*i); } } void Init(const ChainPositions &positions) { Init(positions, positions.size()); } NGramStreams(const ChainPositions &positions) { Init(positions); } }; inline Chains &operator>>(Chains &chains, NGramStreams &streams) { ChainPositions positions; chains >> positions; streams.Init(positions); return chains; } }} // namespaces #endif // LM_BUILDER_MULTI_STREAM__