diff options
Diffstat (limited to 'klm/util/stream')
| -rw-r--r-- | klm/util/stream/Makefile.am | 1 | ||||
| -rw-r--r-- | klm/util/stream/block.hh | 57 | ||||
| -rw-r--r-- | klm/util/stream/chain.cc | 12 | ||||
| -rw-r--r-- | klm/util/stream/chain.hh | 165 | ||||
| -rw-r--r-- | klm/util/stream/config.hh | 43 | ||||
| -rw-r--r-- | klm/util/stream/io.cc | 14 | ||||
| -rw-r--r-- | klm/util/stream/io.hh | 21 | ||||
| -rw-r--r-- | klm/util/stream/line_input.hh | 6 | ||||
| -rw-r--r-- | klm/util/stream/multi_progress.hh | 6 | ||||
| -rw-r--r-- | klm/util/stream/multi_stream.hh | 127 | ||||
| -rw-r--r-- | klm/util/stream/sort.hh | 14 | ||||
| -rw-r--r-- | klm/util/stream/stream.hh | 9 | ||||
| -rw-r--r-- | klm/util/stream/timer.hh | 6 | 
13 files changed, 433 insertions, 48 deletions
diff --git a/klm/util/stream/Makefile.am b/klm/util/stream/Makefile.am index f18cbedb..25817b50 100644 --- a/klm/util/stream/Makefile.am +++ b/klm/util/stream/Makefile.am @@ -11,6 +11,7 @@ libklm_util_stream_a_SOURCES = \    line_input.hh \    multi_progress.cc \    multi_progress.hh \ +  multi_stream.hh \    sort.hh \    stream.hh \    timer.hh diff --git a/klm/util/stream/block.hh b/klm/util/stream/block.hh index 11aa991e..aa7e28bb 100644 --- a/klm/util/stream/block.hh +++ b/klm/util/stream/block.hh @@ -1,5 +1,5 @@ -#ifndef UTIL_STREAM_BLOCK__ -#define UTIL_STREAM_BLOCK__ +#ifndef UTIL_STREAM_BLOCK_H +#define UTIL_STREAM_BLOCK_H  #include <cstddef>  #include <stdint.h> @@ -7,28 +7,77 @@  namespace util {  namespace stream { +/** + * Encapsulates a block of memory. + */  class Block {    public: +   +    /**  +     * Constructs an empty block.  +     */      Block() : mem_(NULL), valid_size_(0) {} +    /**  +     * Constructs a block that encapsulates a segment of memory. +     * +     * @param[in] mem  The segment of memory to encapsulate +     * @param[in] size The size of the memory segment in bytes +     */      Block(void *mem, std::size_t size) : mem_(mem), valid_size_(size) {} +    /** +     * Set the number of bytes in this block that should be interpreted as valid. +     * +     * @param[in] to Number of bytes +     */      void SetValidSize(std::size_t to) { valid_size_ = to; } -    // Read might fill in less than Allocated at EOF.    + +    /**  +     * Gets the number of bytes in this block that should be interpreted as valid. +     * This is important because read might fill in less than Allocated at EOF.  +     */      std::size_t ValidSize() const { return valid_size_; } +    /** Gets a void pointer to the memory underlying this block. */      void *Get() { return mem_; } + +    /** Gets a const void pointer to the memory underlying this block. */      const void *Get() const { return mem_; } +   +    /** +     * Gets a const void pointer to the end of the valid section of memory +     * encapsulated by this block. +     */      const void *ValidEnd() const {         return reinterpret_cast<const uint8_t*>(mem_) + valid_size_;      } +    /** +     * Returns true if this block encapsulates a valid (non-NULL) block of memory. +     *  +     * This method is a user-defined implicit conversion function to boolean; +     * among other things, this method enables bare instances of this class  +     * to be used as the condition of an if statement. +     */      operator bool() const { return mem_ != NULL; } +   +    /** +     * Returns true if this block is empty. +     *  +     * In other words, if Get()==NULL, this method will return true. +     */      bool operator!() const { return mem_ == NULL; }    private:      friend class Link; +   +    /** +     * Points this block's memory at NULL. +     * +     * This class defines poison as a block whose memory pointer is NULL. +     */      void SetToPoison() {        mem_ = NULL;      } @@ -40,4 +89,4 @@ class Block {  } // namespace stream  } // namespace util -#endif // UTIL_STREAM_BLOCK__ +#endif // UTIL_STREAM_BLOCK_H diff --git a/klm/util/stream/chain.cc b/klm/util/stream/chain.cc index 46708c60..4596af7a 100644 --- a/klm/util/stream/chain.cc +++ b/klm/util/stream/chain.cc @@ -59,6 +59,11 @@ Chain &Chain::operator>>(const WriteAndRecycle &writer) {    return *this;  } +Chain &Chain::operator>>(const PWriteAndRecycle &writer) { +  threads_.push_back(new Thread(Complete(), writer)); +  return *this; +} +  void Chain::Wait(bool release_memory) {    if (queues_.empty()) {      assert(threads_.empty()); @@ -126,7 +131,12 @@ Link::~Link() {  //    abort();    } else {      if (!poisoned_) { -      // Pass the poison! +      // Poison is a block whose memory pointer is NULL. +      // +      // Because we're in the else block, +      //   we know that the memory pointer of current_ is NULL. +      // +      // Pass the current (poison) block!        out_->Produce(current_);      }    } diff --git a/klm/util/stream/chain.hh b/klm/util/stream/chain.hh index 0cc83a85..50865086 100644 --- a/klm/util/stream/chain.hh +++ b/klm/util/stream/chain.hh @@ -1,5 +1,5 @@ -#ifndef UTIL_STREAM_CHAIN__ -#define UTIL_STREAM_CHAIN__ +#ifndef UTIL_STREAM_CHAIN_H +#define UTIL_STREAM_CHAIN_H  #include "util/stream/block.hh"  #include "util/stream/config.hh" @@ -24,7 +24,12 @@ class ChainConfigException : public Exception {  };  class Chain; -// Specifies position in chain for Link constructor. +   +/** + * Encapsulates a @ref PCQueue "producer queue" and a @ref PCQueue "consumer queue" within a @ref Chain "chain". + *  + * Specifies position in chain for Link constructor. + */  class ChainPosition {    public:      const Chain &GetChain() const { return *chain_; } @@ -41,14 +46,32 @@ class ChainPosition {      WorkerProgress progress_;  }; -// Position is usually ChainPosition but if there are multiple streams involved, this can be ChainPositions.   +  +/** + * Encapsulates a worker thread processing data at a given position in the chain. + * + * Each instance of this class owns one boost thread in which the worker is Run(). + */  class Thread {    public: +     +    /** +     * Constructs a new Thread in which the provided Worker is Run(). +     * +     * Position is usually ChainPosition but if there are multiple streams involved, this can be ChainPositions. +     * +     * After a call to this constructor, the provided worker will be running within a boost thread owned by the newly constructed Thread object. +     */      template <class Position, class Worker> Thread(const Position &position, const Worker &worker)        : thread_(boost::ref(*this), position, worker) {}      ~Thread(); +    /** +     * Launches the provided worker in this object's boost thread. +     * +     * This method is called automatically by this class's @ref Thread() "constructor". +     */      template <class Position, class Worker> void operator()(const Position &position, Worker &worker) {        try {          worker.Run(position); @@ -63,14 +86,27 @@ class Thread {      boost::thread thread_;  }; +/** + * This resets blocks to full valid size.  Used to close the loop in Chain by recycling blocks. + */  class Recycler {    public: +    /** +     * Resets the blocks in the chain such that the blocks' respective valid sizes match the chain's block size. +     * +     * @see Block::SetValidSize() +     * @see Chain::BlockSize() +     */      void Run(const ChainPosition &position);  };  extern const Recycler kRecycle;  class WriteAndRecycle; - +class PWriteAndRecycle; +   +/** + * Represents a sequence of workers, through which @ref Block "blocks" can pass. + */  class Chain {    private:      template <class T, void (T::*ptr)(const ChainPosition &) = &T::Run> struct CheckForRun { @@ -78,8 +114,20 @@ class Chain {      };    public: +   +    /**  +     * Constructs a configured Chain. +     *  +     * @param config Specifies how to configure the Chain. +     */      explicit Chain(const ChainConfig &config); +    /** +     * Destructs a Chain. +     * +     * This method waits for the chain's threads to complete, +     * and frees the memory held by this chain. +     */      ~Chain();      void ActivateProgress() { @@ -91,24 +139,49 @@ class Chain {        progress_.SetTarget(target);      } +    /** +     * Gets the number of bytes in each record of a Block. +     * +     * @see ChainConfig::entry_size +     */      std::size_t EntrySize() const {        return config_.entry_size;      } +   +    /** +     * Gets the inital @ref Block::ValidSize "valid size" for @ref Block "blocks" in this chain. +     * +     * @see Block::ValidSize +     */      std::size_t BlockSize() const {        return block_size_;      } -    // Two ways to add to the chain: Add() or operator>>.   +    /** Two ways to add to the chain: Add() or operator>>. */      ChainPosition Add(); -    // This is for adding threaded workers with a Run method.   +    /**  +     * Adds a new worker to this chain, +     * and runs that worker in a new Thread owned by this chain. +     *  +     * The worker must have a Run method that accepts a position argument. +     * +     * @see Thread::operator()() +     */      template <class Worker> typename CheckForRun<Worker>::type &operator>>(const Worker &worker) {        assert(!complete_called_);        threads_.push_back(new Thread(Add(), worker));        return *this;      } -    // Avoid copying the worker.   +  /**  +   * Adds a new worker to this chain (but avoids copying that worker), +   * and runs that worker in a new Thread owned by this chain. +   *  +   * The worker must have a Run method that accepts a position argument. +   * +   * @see Thread::operator()() +   */      template <class Worker> typename CheckForRun<Worker>::type &operator>>(const boost::reference_wrapper<Worker> &worker) {        assert(!complete_called_);        threads_.push_back(new Thread(Add(), worker)); @@ -122,12 +195,21 @@ class Chain {        threads_.push_back(new Thread(Complete(), kRecycle));      } +    /**  +     * Adds a Recycler worker to this chain, +     * and runs that worker in a new Thread owned by this chain. +     */      Chain &operator>>(const Recycler &) {        CompleteLoop();        return *this;      } +    /**  +     * Adds a WriteAndRecycle worker to this chain, +     * and runs that worker in a new Thread owned by this chain. +     */      Chain &operator>>(const WriteAndRecycle &writer); +    Chain &operator>>(const PWriteAndRecycle &writer);      // Chains are reusable.  Call Wait to wait for everything to finish and free memory.        void Wait(bool release_memory = true); @@ -156,28 +238,87 @@ class Chain {  };  // Create the link in the worker thread using the position token. +/**  + * Represents a C++ style iterator over @ref Block "blocks". + */  class Link {    public: +        // Either default construct and Init or just construct all at once. +   +    /** +     * Constructs an @ref Init "initialized" link. +     * +     * @see Init +     */ +    explicit Link(const ChainPosition &position); +   +    /**  +     * Constructs a link that must subsequently be @ref Init "initialized".  +     * +     * @see Init +     */      Link(); +   +    /**  +     * Initializes the link with the input @ref PCQueue "consumer queue" and output @ref PCQueue "producer queue" at a given @ref ChainPosition "position" in the @ref Chain "chain". +     * +     * @see Link() +     */      void Init(const ChainPosition &position); -    explicit Link(const ChainPosition &position); - +    /** +     * Destructs the link object. +     * +     * If necessary, this method will pass a poison block  +     * to this link's output @ref PCQueue "producer queue". +     * +     * @see Block::SetToPoison() +     */      ~Link(); +    /** +     * Gets a reference to the @ref Block "block" at this link. +     */      Block &operator*() { return current_; } + +    /** +     * Gets a const reference to the @ref Block "block" at this link. +     */      const Block &operator*() const { return current_; } +    /** +     * Gets a pointer to the @ref Block "block" at this link. +     */      Block *operator->() { return ¤t_; } +   +    /** +     * Gets a const pointer to the @ref Block "block" at this link. +     */      const Block *operator->() const { return ¤t_; } +    /** +     * Gets the link at the next @ref ChainPosition "position" in the @ref Chain "chain". +     */      Link &operator++(); +    /** +     * Returns true if the @ref Block "block" at this link encapsulates a valid (non-NULL) block of memory. +     *  +     * This method is a user-defined implicit conversion function to boolean; +     * among other things, this method enables bare instances of this class  +     * to be used as the condition of an if statement. +     */      operator bool() const { return current_; } +    /**  +     * @ref Block::SetToPoison() "Poisons" the @ref Block "block" at this link, +     * and passes this now-poisoned block to this link's output @ref PCQueue "producer queue". +     * +     * @see Block::SetToPoison() +     */      void Poison(); - +      private:      Block current_;      PCQueue<Block> *in_, *out_; @@ -195,4 +336,4 @@ inline Chain &operator>>(Chain &chain, Link &link) {  } // namespace stream  } // namespace util -#endif // UTIL_STREAM_CHAIN__ +#endif // UTIL_STREAM_CHAIN_H diff --git a/klm/util/stream/config.hh b/klm/util/stream/config.hh index 1eeb3a8a..6bad36bc 100644 --- a/klm/util/stream/config.hh +++ b/klm/util/stream/config.hh @@ -1,32 +1,63 @@ -#ifndef UTIL_STREAM_CONFIG__ -#define UTIL_STREAM_CONFIG__ +#ifndef UTIL_STREAM_CONFIG_H +#define UTIL_STREAM_CONFIG_H  #include <cstddef>  #include <string>  namespace util { namespace stream { +/** + * Represents how a chain should be configured. + */  struct ChainConfig { +   +  /** Constructs an configuration with underspecified (or default) parameters. */    ChainConfig() {} +  /**  +   * Constructs a chain configuration object. +   * +   * @param [in] in_entry_size   Number of bytes in each record. +   * @param [in] in_block_count  Number of blocks in the chain. +   * @param [in] in_total_memory Total number of bytes available to the chain. +   *             This value will be divided amongst the blocks in the chain. +   */    ChainConfig(std::size_t in_entry_size, std::size_t in_block_count, std::size_t in_total_memory)      : entry_size(in_entry_size), block_count(in_block_count), total_memory(in_total_memory) {} +  /** +   * Number of bytes in each record. +   */    std::size_t entry_size; +   +  /** +   * Number of blocks in the chain. +   */    std::size_t block_count; -  // Chain's constructor will make this a multiple of entry_size.  +   +  /**  +   * Total number of bytes available to the chain. +   * This value will be divided amongst the blocks in the chain. +   * Chain's constructor will make this a multiple of entry_size.  +   */    std::size_t total_memory;  }; +   +/** + * Represents how a sorter should be configured. + */  struct SortConfig { +   +  /** Filename prefix where temporary files should be placed. */    std::string temp_prefix; -  // Size of each input/output buffer. +  /** Size of each input/output buffer. */    std::size_t buffer_size; -  // Total memory to use when running alone. +  /** Total memory to use when running alone. */    std::size_t total_memory;  };  }} // namespaces -#endif // UTIL_STREAM_CONFIG__ +#endif // UTIL_STREAM_CONFIG_H diff --git a/klm/util/stream/io.cc b/klm/util/stream/io.cc index 0459f706..c64004c0 100644 --- a/klm/util/stream/io.cc +++ b/klm/util/stream/io.cc @@ -36,12 +36,12 @@ void PRead::Run(const ChainPosition &position) {    Link link(position);    uint64_t offset = 0;    for (; offset + block_size64 < size; offset += block_size64, ++link) { -    PReadOrThrow(file_, link->Get(), block_size, offset); +    ErsatzPRead(file_, link->Get(), block_size, offset);      link->SetValidSize(block_size);    }    // size - offset is <= block_size, so it casts to 32-bit fine.    if (size - offset) { -    PReadOrThrow(file_, link->Get(), size - offset, offset); +    ErsatzPRead(file_, link->Get(), size - offset, offset);      link->SetValidSize(size - offset);      ++link;    } @@ -62,5 +62,15 @@ void WriteAndRecycle::Run(const ChainPosition &position) {    }  } +void PWriteAndRecycle::Run(const ChainPosition &position) { +  const std::size_t block_size = position.GetChain().BlockSize(); +  uint64_t offset = 0; +  for (Link link(position); link; ++link) { +    ErsatzPWrite(file_, link->Get(), link->ValidSize(), offset); +    offset += link->ValidSize(); +    link->SetValidSize(block_size); +  } +} +  } // namespace stream  } // namespace util diff --git a/klm/util/stream/io.hh b/klm/util/stream/io.hh index 934b6b3f..8dae2cbf 100644 --- a/klm/util/stream/io.hh +++ b/klm/util/stream/io.hh @@ -1,5 +1,5 @@ -#ifndef UTIL_STREAM_IO__ -#define UTIL_STREAM_IO__ +#ifndef UTIL_STREAM_IO_H +#define UTIL_STREAM_IO_H  #include "util/exception.hh"  #include "util/file.hh" @@ -41,6 +41,8 @@ class Write {      int file_;  }; +// It's a common case that stuff is written and then recycled.  So rather than +// spawn another thread to Recycle, this combines the two roles.  class WriteAndRecycle {    public:      explicit WriteAndRecycle(int fd) : file_(fd) {} @@ -49,14 +51,23 @@ class WriteAndRecycle {      int file_;  }; +class PWriteAndRecycle { +  public: +    explicit PWriteAndRecycle(int fd) : file_(fd) {} +    void Run(const ChainPosition &position); +  private: +    int file_; +}; + +  // Reuse the same file over and over again to buffer output.    class FileBuffer {    public:      explicit FileBuffer(int fd) : file_(fd) {} -    WriteAndRecycle Sink() const { +    PWriteAndRecycle Sink() const {        util::SeekOrThrow(file_.get(), 0); -      return WriteAndRecycle(file_.get()); +      return PWriteAndRecycle(file_.get());      }      PRead Source() const { @@ -73,4 +84,4 @@ class FileBuffer {  } // namespace stream  } // namespace util -#endif // UTIL_STREAM_IO__ +#endif // UTIL_STREAM_IO_H diff --git a/klm/util/stream/line_input.hh b/klm/util/stream/line_input.hh index 86db1dd0..a870a664 100644 --- a/klm/util/stream/line_input.hh +++ b/klm/util/stream/line_input.hh @@ -1,5 +1,5 @@ -#ifndef UTIL_STREAM_LINE_INPUT__ -#define UTIL_STREAM_LINE_INPUT__ +#ifndef UTIL_STREAM_LINE_INPUT_H +#define UTIL_STREAM_LINE_INPUT_H  namespace util {namespace stream {  class ChainPosition; @@ -19,4 +19,4 @@ class LineInput {  };  }} // namespaces -#endif // UTIL_STREAM_LINE_INPUT__ +#endif // UTIL_STREAM_LINE_INPUT_H diff --git a/klm/util/stream/multi_progress.hh b/klm/util/stream/multi_progress.hh index c4dd45a9..82e698a5 100644 --- a/klm/util/stream/multi_progress.hh +++ b/klm/util/stream/multi_progress.hh @@ -1,6 +1,6 @@  /* Progress bar suitable for chains of workers */ -#ifndef UTIL_MULTI_PROGRESS__ -#define UTIL_MULTI_PROGRESS__ +#ifndef UTIL_STREAM_MULTI_PROGRESS_H +#define UTIL_STREAM_MULTI_PROGRESS_H  #include <boost/thread/mutex.hpp> @@ -87,4 +87,4 @@ class WorkerProgress {  }} // namespaces -#endif // UTIL_MULTI_PROGRESS__ +#endif // UTIL_STREAM_MULTI_PROGRESS_H diff --git a/klm/util/stream/multi_stream.hh b/klm/util/stream/multi_stream.hh new file mode 100644 index 00000000..0ee7fab6 --- /dev/null +++ b/klm/util/stream/multi_stream.hh @@ -0,0 +1,127 @@ +#ifndef UTIL_STREAM_MULTI_STREAM_H +#define UTIL_STREAM_MULTI_STREAM_H + +#include "util/fixed_array.hh" +#include "util/scoped.hh" +#include "util/stream/chain.hh" +#include "util/stream/stream.hh" + +#include <cstddef> +#include <new> + +#include <assert.h> +#include <stdlib.h> + +namespace util { namespace stream { + +class Chains; + +class ChainPositions : public util::FixedArray<util::stream::ChainPosition> { +  public: +    ChainPositions() {} + +    void Init(Chains &chains); + +    explicit ChainPositions(Chains &chains) { +      Init(chains); +    } +}; + +class Chains : public util::FixedArray<util::stream::Chain> { +  private: +    template <class T, void (T::*ptr)(const ChainPositions &) = &T::Run> struct CheckForRun { +      typedef Chains type; +    }; + +  public: +    // Must call Init. +    Chains() {} + +    explicit Chains(std::size_t limit) : util::FixedArray<util::stream::Chain>(limit) {} + +    template <class Worker> typename CheckForRun<Worker>::type &operator>>(const Worker &worker) { +      threads_.push_back(new util::stream::Thread(ChainPositions(*this), worker)); +      return *this; +    } + +    template <class Worker> typename CheckForRun<Worker>::type &operator>>(const boost::reference_wrapper<Worker> &worker) { +      threads_.push_back(new util::stream::Thread(ChainPositions(*this), worker)); +      return *this; +    } + +    Chains &operator>>(const util::stream::Recycler &recycler) { +      for (util::stream::Chain *i = begin(); i != end(); ++i)  +        *i >> recycler; +      return *this; +    } + +    void Wait(bool release_memory = true) { +      threads_.clear(); +      for (util::stream::Chain *i = begin(); i != end(); ++i) { +        i->Wait(release_memory); +      } +    } + +  private: +    boost::ptr_vector<util::stream::Thread> threads_; + +    Chains(const Chains &); +    void operator=(const Chains &); +}; + +inline void ChainPositions::Init(Chains &chains) { +  util::FixedArray<util::stream::ChainPosition>::Init(chains.size()); +  for (util::stream::Chain *i = chains.begin(); i != chains.end(); ++i) { +    // use "placement new" syntax to initalize ChainPosition in an already-allocated memory location +    new (end()) util::stream::ChainPosition(i->Add()); Constructed(); +  } +} + +inline Chains &operator>>(Chains &chains, ChainPositions &positions) { +  positions.Init(chains); +  return chains; +} + +template <class T> class GenericStreams : public util::FixedArray<T> { +  private: +    typedef util::FixedArray<T> P; +  public: +    GenericStreams() {} + +    // This puts a dummy T at the beginning (useful to algorithms that need to reference something at the beginning). +    void InitWithDummy(const ChainPositions &positions) { +      P::Init(positions.size() + 1); +      new (P::end()) T(); // use "placement new" syntax to initalize T in an already-allocated memory location +      P::Constructed(); +      for (const util::stream::ChainPosition *i = positions.begin(); i != positions.end(); ++i) { +        P::push_back(*i); +      } +    } + +    // Limit restricts to positions[0,limit) +    void Init(const ChainPositions &positions, std::size_t limit) { +      P::Init(limit); +      for (const util::stream::ChainPosition *i = positions.begin(); i != positions.begin() + limit; ++i) { +        P::push_back(*i); +      } +    } +    void Init(const ChainPositions &positions) { +      Init(positions, positions.size()); +    } + +    GenericStreams(const ChainPositions &positions) { +      Init(positions); +    } +}; + +template <class T> inline Chains &operator>>(Chains &chains, GenericStreams<T> &streams) { +  ChainPositions positions; +  chains >> positions; +  streams.Init(positions); +  return chains; +} + +typedef GenericStreams<Stream> Streams; + +}} // namespaces +#endif // UTIL_STREAM_MULTI_STREAM_H diff --git a/klm/util/stream/sort.hh b/klm/util/stream/sort.hh index 16aa6a03..9082cfdd 100644 --- a/klm/util/stream/sort.hh +++ b/klm/util/stream/sort.hh @@ -15,8 +15,8 @@   * sort.  Use a hash table for that.     */ -#ifndef UTIL_STREAM_SORT__ -#define UTIL_STREAM_SORT__ +#ifndef UTIL_STREAM_SORT_H +#define UTIL_STREAM_SORT_H  #include "util/stream/chain.hh"  #include "util/stream/config.hh" @@ -182,7 +182,7 @@ template <class Compare> class MergeQueue {              amount = remaining_;              buffer_end_ = current_ + remaining_;            } -          PReadOrThrow(fd, current_, amount, offset_); +          ErsatzPRead(fd, current_, amount, offset_);            offset_ += amount;            assert(current_ <= buffer_end_);            remaining_ -= amount; @@ -307,10 +307,10 @@ template <class Compare, class Combine> class MergingReader {        const uint64_t block_size = position.GetChain().BlockSize();        Link l(position);        for (; offset + block_size < end; ++l, offset += block_size) { -        PReadOrThrow(in_, l->Get(), block_size, offset); +        ErsatzPRead(in_, l->Get(), block_size, offset);          l->SetValidSize(block_size);        } -      PReadOrThrow(in_, l->Get(), end - offset, offset); +      ErsatzPRead(in_, l->Get(), end - offset, offset);        l->SetValidSize(end - offset);        (++l).Poison();        return; @@ -388,8 +388,10 @@ class BadSortConfig : public Exception {      ~BadSortConfig() throw() {}  }; +/** Sort */  template <class Compare, class Combine = NeverCombine> class Sort {    public: +    /** Constructs an object capable of sorting */      Sort(Chain &in, const SortConfig &config, const Compare &compare = Compare(), const Combine &combine = Combine())        : config_(config),          data_(MakeTemp(config.temp_prefix)), @@ -545,4 +547,4 @@ template <class Compare, class Combine> uint64_t BlockingSort(Chain &chain, cons  } // namespace stream  } // namespace util -#endif // UTIL_STREAM_SORT__ +#endif // UTIL_STREAM_SORT_H diff --git a/klm/util/stream/stream.hh b/klm/util/stream/stream.hh index 6ff45b82..7ea1c9f7 100644 --- a/klm/util/stream/stream.hh +++ b/klm/util/stream/stream.hh @@ -1,5 +1,5 @@ -#ifndef UTIL_STREAM_STREAM__ -#define UTIL_STREAM_STREAM__ +#ifndef UTIL_STREAM_STREAM_H +#define UTIL_STREAM_STREAM_H  #include "util/stream/chain.hh" @@ -56,6 +56,9 @@ class Stream : boost::noncopyable {        end_ = current_ + block_it_->ValidSize();      } +    // The following are pointers to raw memory +    // current_ is the current record +    // end_ is the end of the block (so we know when to move to the next block)      uint8_t *current_, *end_;      std::size_t entry_size_; @@ -71,4 +74,4 @@ inline Chain &operator>>(Chain &chain, Stream &stream) {  } // namespace stream  } // namespace util -#endif // UTIL_STREAM_STREAM__ +#endif // UTIL_STREAM_STREAM_H diff --git a/klm/util/stream/timer.hh b/klm/util/stream/timer.hh index 7e1a5885..06488a17 100644 --- a/klm/util/stream/timer.hh +++ b/klm/util/stream/timer.hh @@ -1,5 +1,5 @@ -#ifndef UTIL_STREAM_TIMER__ -#define UTIL_STREAM_TIMER__ +#ifndef UTIL_STREAM_TIMER_H +#define UTIL_STREAM_TIMER_H  // Sorry Jon, this was adding library dependencies in Moses and people complained. @@ -13,4 +13,4 @@  #define UTIL_TIMER(str)   //#endif -#endif // UTIL_STREAM_TIMER__ +#endif // UTIL_STREAM_TIMER_H  | 
