summaryrefslogtreecommitdiff
path: root/klm/util/stream/stream.hh
blob: 7ea1c9f700f36a1f3c15d196a498d6f6a20cc61e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
#ifndef UTIL_STREAM_STREAM_H
#define UTIL_STREAM_STREAM_H

#include "util/stream/chain.hh"

#include <boost/noncopyable.hpp>

#include <assert.h>
#include <stdint.h>

namespace util {
namespace stream {

class Stream : boost::noncopyable {
  public:
    Stream() : current_(NULL), end_(NULL) {}

    void Init(const ChainPosition &position) {
      entry_size_ = position.GetChain().EntrySize();
      block_size_ = position.GetChain().BlockSize();
      block_it_.Init(position);
      StartBlock();
    }

    explicit Stream(const ChainPosition &position) {
      Init(position);
    }

    operator bool() const { return current_ != NULL; }
    bool operator!() const { return current_ == NULL; }

    const void *Get() const { return current_; }
    void *Get() { return current_; }

    void Poison() {
      block_it_->SetValidSize(current_ - static_cast<uint8_t*>(block_it_->Get()));
      ++block_it_;
      block_it_.Poison();
    }
    
    Stream &operator++() {
      assert(*this);
      assert(current_ < end_);
      current_ += entry_size_;
      if (current_ == end_) {
        ++block_it_;
        StartBlock();
      }
      return *this;
    }

  private:
    void StartBlock() {
      for (; block_it_ && !block_it_->ValidSize(); ++block_it_) {}
      current_ = static_cast<uint8_t*>(block_it_->Get());
      end_ = current_ + block_it_->ValidSize();
    }

    // The following are pointers to raw memory
    // current_ is the current record
    // end_ is the end of the block (so we know when to move to the next block)
    uint8_t *current_, *end_;

    std::size_t entry_size_;
    std::size_t block_size_;

    Link block_it_;
};

inline Chain &operator>>(Chain &chain, Stream &stream) {
  stream.Init(chain.Add());
  return chain;
}

} // namespace stream
} // namespace util
#endif // UTIL_STREAM_STREAM_H