summaryrefslogtreecommitdiff
path: root/klm/util/stream/multi_stream.hh
blob: 0ee7fab6fbb9374c2ce919be7cc65965322e49fc (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
#ifndef UTIL_STREAM_MULTI_STREAM_H
#define UTIL_STREAM_MULTI_STREAM_H

#include "util/fixed_array.hh"
#include "util/scoped.hh"
#include "util/stream/chain.hh"
#include "util/stream/stream.hh"

#include <cstddef>
#include <new>

#include <assert.h>
#include <stdlib.h>

namespace util { namespace stream {

class Chains;

class ChainPositions : public util::FixedArray<util::stream::ChainPosition> {
  public:
    ChainPositions() {}

    void Init(Chains &chains);

    explicit ChainPositions(Chains &chains) {
      Init(chains);
    }
};

class Chains : public util::FixedArray<util::stream::Chain> {
  private:
    template <class T, void (T::*ptr)(const ChainPositions &) = &T::Run> struct CheckForRun {
      typedef Chains type;
    };

  public:
    // Must call Init.
    Chains() {}

    explicit Chains(std::size_t limit) : util::FixedArray<util::stream::Chain>(limit) {}

    template <class Worker> typename CheckForRun<Worker>::type &operator>>(const Worker &worker) {
      threads_.push_back(new util::stream::Thread(ChainPositions(*this), worker));
      return *this;
    }

    template <class Worker> typename CheckForRun<Worker>::type &operator>>(const boost::reference_wrapper<Worker> &worker) {
      threads_.push_back(new util::stream::Thread(ChainPositions(*this), worker));
      return *this;
    }

    Chains &operator>>(const util::stream::Recycler &recycler) {
      for (util::stream::Chain *i = begin(); i != end(); ++i) 
        *i >> recycler;
      return *this;
    }

    void Wait(bool release_memory = true) {
      threads_.clear();
      for (util::stream::Chain *i = begin(); i != end(); ++i) {
        i->Wait(release_memory);
      }
    }

  private:
    boost::ptr_vector<util::stream::Thread> threads_;

    Chains(const Chains &);
    void operator=(const Chains &);
};

inline void ChainPositions::Init(Chains &chains) {
  util::FixedArray<util::stream::ChainPosition>::Init(chains.size());
  for (util::stream::Chain *i = chains.begin(); i != chains.end(); ++i) {
    // use "placement new" syntax to initalize ChainPosition in an already-allocated memory location
    new (end()) util::stream::ChainPosition(i->Add()); Constructed();
  }
}

inline Chains &operator>>(Chains &chains, ChainPositions &positions) {
  positions.Init(chains);
  return chains;
}

template <class T> class GenericStreams : public util::FixedArray<T> {
  private:
    typedef util::FixedArray<T> P;
  public:
    GenericStreams() {}

    // This puts a dummy T at the beginning (useful to algorithms that need to reference something at the beginning).
    void InitWithDummy(const ChainPositions &positions) {
      P::Init(positions.size() + 1);
      new (P::end()) T(); // use "placement new" syntax to initalize T in an already-allocated memory location
      P::Constructed();
      for (const util::stream::ChainPosition *i = positions.begin(); i != positions.end(); ++i) {
        P::push_back(*i);
      }
    }

    // Limit restricts to positions[0,limit)
    void Init(const ChainPositions &positions, std::size_t limit) {
      P::Init(limit);
      for (const util::stream::ChainPosition *i = positions.begin(); i != positions.begin() + limit; ++i) {
        P::push_back(*i);
      }
    }
    void Init(const ChainPositions &positions) {
      Init(positions, positions.size());
    }

    GenericStreams(const ChainPositions &positions) {
      Init(positions);
    }
};

template <class T> inline Chains &operator>>(Chains &chains, GenericStreams<T> &streams) {
  ChainPositions positions;
  chains >> positions;
  streams.Init(positions);
  return chains;
}

typedef GenericStreams<Stream> Streams;

}} // namespaces
#endif // UTIL_STREAM_MULTI_STREAM_H