#ifndef LM_BUILDER_MULTI_STREAM__
#define LM_BUILDER_MULTI_STREAM__

#include "lm/builder/ngram_stream.hh"
#include "util/scoped.hh"
#include "util/stream/chain.hh"

#include <cstddef>
#include <new>

#include <assert.h>
#include <stdlib.h>

namespace lm { namespace builder {

// Array whose storage is malloc'ed exactly once (by Init) and whose elements
// are constructed one at a time, in place, at end().  Storage is never
// reallocated, so elements stay at the address where they were constructed.
//
// The capacity passed to Init is not recorded: callers are responsible for
// never constructing more than that many elements.
template <class T> class FixedArray {
  public:
    // Allocate room for count elements.  No element is constructed yet.
    // Throws std::bad_alloc if the storage cannot be obtained.
    explicit FixedArray(std::size_t count) : newed_end_(NULL) {
      Init(count);
    }

    // Create an array without storage.  Init must be called before any
    // element is added.
    FixedArray() : newed_end_(NULL) {}

    // Allocate room for count elements.  May be called only on an array that
    // has no storage yet (checked by assert).  Throws std::bad_alloc on failure.
    void Init(std::size_t count) {
      assert(!block_.get());
      // Refuse counts whose size in bytes would wrap around std::size_t and
      // silently under-allocate.
      if (count > static_cast<std::size_t>(-1) / sizeof(T)) throw std::bad_alloc();
      const std::size_t bytes = sizeof(T) * count;
      // malloc(0) is allowed to return NULL.  Ask for at least one byte so an
      // empty array is not reported as an allocation failure.
      block_.reset(malloc(bytes ? bytes : 1));
      if (!block_.get()) throw std::bad_alloc();
      newed_end_ = begin();
    }

    // Deep copy: allocates room for exactly from.size() elements and
    // copy-constructs each of them in order.
    FixedArray(const FixedArray &from) : newed_end_(NULL) {
      Init(from.size());
      try {
        for (const T *i = from.begin(); i != from.end(); ++i) {
          new (end()) T(*i);
          Constructed();
        }
      } catch (...) {
        // The destructor does not run for a partially constructed object, so
        // destroy the elements copied so far before propagating.
        clear();
        throw;
      }
    }

    // Destroys the constructed elements; the storage is released by block_.
    ~FixedArray() { clear(); }

    T *begin() { return static_cast<T*>(block_.get()); }
    const T *begin() const { return static_cast<const T*>(block_.get()); }
    // One past the last constructed element, which is also where the next
    // element gets placement-new'ed.  Always call Constructed after
    // successful completion of new.
    T *end() { return newed_end_; }
    const T *end() const { return newed_end_; }

    // Last constructed element.  The array must not be empty.
    T &back() { return *(end() - 1); }
    const T &back() const { return *(end() - 1); }

    // Number of constructed elements (not the allocated capacity).
    std::size_t size() const { return end() - begin(); }
    bool empty() const { return begin() == end(); }

    // Unchecked element access.
    T &operator[](std::size_t i) { return begin()[i]; }
    const T &operator[](std::size_t i) const { return begin()[i]; }

    // Construct a new element at the end from c, which may be any type T has
    // a constructor for.  The caller must ensure there is room left.
    template <class C> void push_back(const C &c) {
      new (end()) T(c);
      Constructed();
    }

    // Destroy every constructed element, front to back.  The storage is kept,
    // so elements can be added again afterwards.
    void clear() {
      for (T *i = begin(); i != end(); ++i)
        i->~T();
      newed_end_ = begin();
    }

  protected:
    // Record that one more element has been constructed at end().
    void Constructed() {
      ++newed_end_;
    }

  private:
    // Raw storage for the elements.
    util::scoped_malloc block_;

    // One past the last constructed element; NULL until Init is called.
    T *newed_end_;
};

class Chains;

// One util::stream::ChainPosition per chain of a Chains object.
class ChainPositions : public FixedArray<util::stream::ChainPosition> {
  public:
    // Start without storage; fill in later with Init or `chains >> positions`.
    ChainPositions() {}

    // Allocate and fill from chains in a single step.
    explicit ChainPositions(Chains &chains) {
      this->Init(chains);
    }

    // Defined after Chains, which is only forward-declared at this point.
    // Note this hides FixedArray::Init(std::size_t).
    void Init(Chains &chains);
};

// A fixed-capacity collection of util::stream::Chain objects that can be
// driven together: workers and recyclers are attached to all chains at once
// with operator>>.
class Chains : public FixedArray<util::stream::Chain> {
  private:
    // Can only be instantiated with its default argument when T has a
    // non-const member `void Run(const ChainPositions &)`.  The worker
    // overloads of operator>> use it as their return type, so they are only
    // candidates for such worker types.
    template <class T, void (T::*ptr)(const ChainPositions &) = &T::Run> struct CheckForRun {
      typedef Chains type;
    };

  public:
    // Reserve room for up to limit chains; none are constructed yet.
    explicit Chains(std::size_t limit) : FixedArray<util::stream::Chain>(limit) {}

    // Attach a worker (passed by value semantics) to every chain: a new
    // thread is created with a fresh ChainPositions for this object.
    template <class Worker> typename CheckForRun<Worker>::type &operator>>(const Worker &worker) {
      return StartWorker(worker);
    }

    // Same as above, but the worker is handed over wrapped in boost::ref.
    template <class Worker> typename CheckForRun<Worker>::type &operator>>(const boost::reference_wrapper<Worker> &worker) {
      return StartWorker(worker);
    }

    // Apply the recycler to each chain in order.
    Chains &operator>>(const util::stream::Recycler &recycler) {
      util::stream::Chain *const stop = end();
      for (util::stream::Chain *chain = begin(); chain != stop; ++chain) {
        *chain >> recycler;
      }
      return *this;
    }

    // Destroy all worker threads owned by this object, then call
    // Wait(release_memory) on every chain in order.
    void Wait(bool release_memory = true) {
      threads_.clear();
      util::stream::Chain *const stop = end();
      for (util::stream::Chain *chain = begin(); chain != stop; ++chain) {
        chain->Wait(release_memory);
      }
    }

  private:
    // Shared body of the worker overloads of operator>>.
    template <class Callable> Chains &StartWorker(const Callable &callable) {
      threads_.push_back(new util::stream::Thread(ChainPositions(*this), callable));
      return *this;
    }

    // Threads started through operator>>; owned here.
    boost::ptr_vector<util::stream::Thread> threads_;

    // Not copyable: declared but intentionally left undefined.
    Chains(const Chains &);
    void operator=(const Chains &);
};

// Allocate one slot per chain, then fill each slot with the ChainPosition
// returned by calling Add() on the corresponding chain, in order.
inline void ChainPositions::Init(Chains &chains) {
  typedef util::stream::ChainPosition Position;
  FixedArray<Position>::Init(chains.size());
  util::stream::Chain *const stop = chains.end();
  for (util::stream::Chain *chain = chains.begin(); chain != stop; ++chain) {
    new (end()) Position(chain->Add());
    Constructed();
  }
}

// Stream-style spelling of positions.Init(chains).  Returns chains so that
// further >> operations can be appended to the same expression.
inline Chains &operator>>(Chains &chains, ChainPositions &positions) {
  positions.Init(chains);
  return chains;
}

// An array of NGramStream objects, each built from one ChainPosition.
// The Init overloads here hide FixedArray::Init(std::size_t), which is why
// the base version is called with explicit qualification.
class NGramStreams : public FixedArray<NGramStream> {
  public:
    // Start without storage; call one of the Init functions before use.
    NGramStreams() {}

    // This puts a dummy NGramStream at the beginning (useful to algorithms that need to reference something at the beginning).
    // The dummy is default-constructed; it is followed by one stream per
    // entry of positions, so size() ends up as positions.size() + 1.
    void InitWithDummy(const ChainPositions &positions) {
      FixedArray<NGramStream>::Init(positions.size() + 1);
      new (end()) NGramStream(); Constructed();
      for (const util::stream::ChainPosition *i = positions.begin(); i != positions.end(); ++i) {
        push_back(*i);
      }
    }

    // Limit restricts to positions[0,limit)
    // limit must not exceed positions.size(); anything larger would read past
    // the constructed positions, so it is rejected by assert in debug builds.
    void Init(const ChainPositions &positions, std::size_t limit) {
      assert(limit <= positions.size());
      FixedArray<NGramStream>::Init(limit);
      for (const util::stream::ChainPosition *i = positions.begin(); i != positions.begin() + limit; ++i) {
        push_back(*i);
      }
    }
    // Build one stream for every entry of positions.
    void Init(const ChainPositions &positions) {
      Init(positions, positions.size());
    }

    // Equivalent to default construction followed by Init(positions).
    NGramStreams(const ChainPositions &positions) {
      Init(positions);
    }
};

// Obtain a fresh set of positions from chains and build one NGramStream on
// each of them.  Returns chains so that further >> operations can follow.
inline Chains &operator>>(Chains &chains, NGramStreams &streams) {
  // The constructor runs ChainPositions::Init(chains), the same call that
  // `chains >> positions` makes.
  ChainPositions positions(chains);
  streams.Init(positions);
  return chains;
}

}} // namespaces
#endif // LM_BUILDER_MULTI_STREAM__