summaryrefslogtreecommitdiff
path: root/klm/util/stream
diff options
context:
space:
mode:
authorWu, Ke <wuke@cs.umd.edu>2014-12-17 16:11:38 -0500
committerWu, Ke <wuke@cs.umd.edu>2014-12-17 16:11:38 -0500
commit7468e8d85e99b4619442c7afaf4a0d92870111bb (patch)
treea6f17da7c69048c8900260b5490bb9d8611be3bb /klm/util/stream
parentb6dd5a683db9dda2d634dd2fdb76606819594901 (diff)
parent1a79175f9a101d46cf27ca921213d5dd9300518f (diff)
Merge with upstream
Diffstat (limited to 'klm/util/stream')
-rw-r--r--klm/util/stream/Makefile.am1
-rw-r--r--klm/util/stream/block.hh57
-rw-r--r--klm/util/stream/chain.cc12
-rw-r--r--klm/util/stream/chain.hh165
-rw-r--r--klm/util/stream/config.hh43
-rw-r--r--klm/util/stream/io.cc14
-rw-r--r--klm/util/stream/io.hh21
-rw-r--r--klm/util/stream/line_input.hh6
-rw-r--r--klm/util/stream/multi_progress.hh6
-rw-r--r--klm/util/stream/multi_stream.hh127
-rw-r--r--klm/util/stream/sort.hh14
-rw-r--r--klm/util/stream/stream.hh9
-rw-r--r--klm/util/stream/timer.hh6
13 files changed, 433 insertions, 48 deletions
diff --git a/klm/util/stream/Makefile.am b/klm/util/stream/Makefile.am
index f18cbedb..25817b50 100644
--- a/klm/util/stream/Makefile.am
+++ b/klm/util/stream/Makefile.am
@@ -11,6 +11,7 @@ libklm_util_stream_a_SOURCES = \
line_input.hh \
multi_progress.cc \
multi_progress.hh \
+ multi_stream.hh \
sort.hh \
stream.hh \
timer.hh
diff --git a/klm/util/stream/block.hh b/klm/util/stream/block.hh
index 11aa991e..aa7e28bb 100644
--- a/klm/util/stream/block.hh
+++ b/klm/util/stream/block.hh
@@ -1,5 +1,5 @@
-#ifndef UTIL_STREAM_BLOCK__
-#define UTIL_STREAM_BLOCK__
+#ifndef UTIL_STREAM_BLOCK_H
+#define UTIL_STREAM_BLOCK_H
#include <cstddef>
#include <stdint.h>
@@ -7,28 +7,77 @@
namespace util {
namespace stream {
+/**
+ * Encapsulates a block of memory.
+ */
class Block {
public:
+
+ /**
+ * Constructs an empty block.
+ */
Block() : mem_(NULL), valid_size_(0) {}
+ /**
+ * Constructs a block that encapsulates a segment of memory.
+ *
+ * @param[in] mem The segment of memory to encapsulate
+ * @param[in] size The size of the memory segment in bytes
+ */
Block(void *mem, std::size_t size) : mem_(mem), valid_size_(size) {}
+ /**
+ * Set the number of bytes in this block that should be interpreted as valid.
+ *
+ * @param[in] to Number of bytes
+ */
void SetValidSize(std::size_t to) { valid_size_ = to; }
- // Read might fill in less than Allocated at EOF.
+
+ /**
+ * Gets the number of bytes in this block that should be interpreted as valid.
+ * This is important because read might fill in less than Allocated at EOF.
+ */
std::size_t ValidSize() const { return valid_size_; }
+ /** Gets a void pointer to the memory underlying this block. */
void *Get() { return mem_; }
+
+ /** Gets a const void pointer to the memory underlying this block. */
const void *Get() const { return mem_; }
+
+ /**
+ * Gets a const void pointer to the end of the valid section of memory
+ * encapsulated by this block.
+ */
const void *ValidEnd() const {
return reinterpret_cast<const uint8_t*>(mem_) + valid_size_;
}
+ /**
+ * Returns true if this block encapsulates a valid (non-NULL) block of memory.
+ *
+ * This method is a user-defined implicit conversion function to boolean;
+ * among other things, this method enables bare instances of this class
+ * to be used as the condition of an if statement.
+ */
operator bool() const { return mem_ != NULL; }
+
+ /**
+ * Returns true if this block is empty.
+ *
+ * In other words, if Get()==NULL, this method will return true.
+ */
bool operator!() const { return mem_ == NULL; }
private:
friend class Link;
+
+ /**
+ * Points this block's memory at NULL.
+ *
+ * This class defines poison as a block whose memory pointer is NULL.
+ */
void SetToPoison() {
mem_ = NULL;
}
@@ -40,4 +89,4 @@ class Block {
} // namespace stream
} // namespace util
-#endif // UTIL_STREAM_BLOCK__
+#endif // UTIL_STREAM_BLOCK_H
diff --git a/klm/util/stream/chain.cc b/klm/util/stream/chain.cc
index 46708c60..4596af7a 100644
--- a/klm/util/stream/chain.cc
+++ b/klm/util/stream/chain.cc
@@ -59,6 +59,11 @@ Chain &Chain::operator>>(const WriteAndRecycle &writer) {
return *this;
}
+Chain &Chain::operator>>(const PWriteAndRecycle &writer) {
+ threads_.push_back(new Thread(Complete(), writer));
+ return *this;
+}
+
void Chain::Wait(bool release_memory) {
if (queues_.empty()) {
assert(threads_.empty());
@@ -126,7 +131,12 @@ Link::~Link() {
// abort();
} else {
if (!poisoned_) {
- // Pass the poison!
+ // Poison is a block whose memory pointer is NULL.
+ //
+ // Because we're in the else block,
+ // we know that the memory pointer of current_ is NULL.
+ //
+ // Pass the current (poison) block!
out_->Produce(current_);
}
}
diff --git a/klm/util/stream/chain.hh b/klm/util/stream/chain.hh
index 0cc83a85..50865086 100644
--- a/klm/util/stream/chain.hh
+++ b/klm/util/stream/chain.hh
@@ -1,5 +1,5 @@
-#ifndef UTIL_STREAM_CHAIN__
-#define UTIL_STREAM_CHAIN__
+#ifndef UTIL_STREAM_CHAIN_H
+#define UTIL_STREAM_CHAIN_H
#include "util/stream/block.hh"
#include "util/stream/config.hh"
@@ -24,7 +24,12 @@ class ChainConfigException : public Exception {
};
class Chain;
-// Specifies position in chain for Link constructor.
+
+/**
+ * Encapsulates a @ref PCQueue "producer queue" and a @ref PCQueue "consumer queue" within a @ref Chain "chain".
+ *
+ * Specifies position in chain for Link constructor.
+ */
class ChainPosition {
public:
const Chain &GetChain() const { return *chain_; }
@@ -41,14 +46,32 @@ class ChainPosition {
WorkerProgress progress_;
};
-// Position is usually ChainPosition but if there are multiple streams involved, this can be ChainPositions.
+
+/**
+ * Encapsulates a worker thread processing data at a given position in the chain.
+ *
+ * Each instance of this class owns one boost thread in which the worker is Run().
+ */
class Thread {
public:
+
+ /**
+ * Constructs a new Thread in which the provided Worker is Run().
+ *
+ * Position is usually ChainPosition but if there are multiple streams involved, this can be ChainPositions.
+ *
+ * After a call to this constructor, the provided worker will be running within a boost thread owned by the newly constructed Thread object.
+ */
template <class Position, class Worker> Thread(const Position &position, const Worker &worker)
: thread_(boost::ref(*this), position, worker) {}
~Thread();
+ /**
+ * Launches the provided worker in this object's boost thread.
+ *
+ * This method is called automatically by this class's @ref Thread() "constructor".
+ */
template <class Position, class Worker> void operator()(const Position &position, Worker &worker) {
try {
worker.Run(position);
@@ -63,14 +86,27 @@ class Thread {
boost::thread thread_;
};
+/**
+ * This resets blocks to full valid size. Used to close the loop in Chain by recycling blocks.
+ */
class Recycler {
public:
+ /**
+ * Resets the blocks in the chain such that the blocks' respective valid sizes match the chain's block size.
+ *
+ * @see Block::SetValidSize()
+ * @see Chain::BlockSize()
+ */
void Run(const ChainPosition &position);
};
extern const Recycler kRecycle;
class WriteAndRecycle;
-
+class PWriteAndRecycle;
+
+/**
+ * Represents a sequence of workers, through which @ref Block "blocks" can pass.
+ */
class Chain {
private:
template <class T, void (T::*ptr)(const ChainPosition &) = &T::Run> struct CheckForRun {
@@ -78,8 +114,20 @@ class Chain {
};
public:
+
+ /**
+ * Constructs a configured Chain.
+ *
+ * @param config Specifies how to configure the Chain.
+ */
explicit Chain(const ChainConfig &config);
+ /**
+ * Destructs a Chain.
+ *
+ * This method waits for the chain's threads to complete,
+ * and frees the memory held by this chain.
+ */
~Chain();
void ActivateProgress() {
@@ -91,24 +139,49 @@ class Chain {
progress_.SetTarget(target);
}
+ /**
+ * Gets the number of bytes in each record of a Block.
+ *
+ * @see ChainConfig::entry_size
+ */
std::size_t EntrySize() const {
return config_.entry_size;
}
+
+ /**
+ * Gets the inital @ref Block::ValidSize "valid size" for @ref Block "blocks" in this chain.
+ *
+ * @see Block::ValidSize
+ */
std::size_t BlockSize() const {
return block_size_;
}
- // Two ways to add to the chain: Add() or operator>>.
+ /** Two ways to add to the chain: Add() or operator>>. */
ChainPosition Add();
- // This is for adding threaded workers with a Run method.
+ /**
+ * Adds a new worker to this chain,
+ * and runs that worker in a new Thread owned by this chain.
+ *
+ * The worker must have a Run method that accepts a position argument.
+ *
+ * @see Thread::operator()()
+ */
template <class Worker> typename CheckForRun<Worker>::type &operator>>(const Worker &worker) {
assert(!complete_called_);
threads_.push_back(new Thread(Add(), worker));
return *this;
}
- // Avoid copying the worker.
+ /**
+ * Adds a new worker to this chain (but avoids copying that worker),
+ * and runs that worker in a new Thread owned by this chain.
+ *
+ * The worker must have a Run method that accepts a position argument.
+ *
+ * @see Thread::operator()()
+ */
template <class Worker> typename CheckForRun<Worker>::type &operator>>(const boost::reference_wrapper<Worker> &worker) {
assert(!complete_called_);
threads_.push_back(new Thread(Add(), worker));
@@ -122,12 +195,21 @@ class Chain {
threads_.push_back(new Thread(Complete(), kRecycle));
}
+ /**
+ * Adds a Recycler worker to this chain,
+ * and runs that worker in a new Thread owned by this chain.
+ */
Chain &operator>>(const Recycler &) {
CompleteLoop();
return *this;
}
+ /**
+ * Adds a WriteAndRecycle worker to this chain,
+ * and runs that worker in a new Thread owned by this chain.
+ */
Chain &operator>>(const WriteAndRecycle &writer);
+ Chain &operator>>(const PWriteAndRecycle &writer);
// Chains are reusable. Call Wait to wait for everything to finish and free memory.
void Wait(bool release_memory = true);
@@ -156,28 +238,87 @@ class Chain {
};
// Create the link in the worker thread using the position token.
+/**
+ * Represents a C++ style iterator over @ref Block "blocks".
+ */
class Link {
public:
+
// Either default construct and Init or just construct all at once.
+
+ /**
+ * Constructs an @ref Init "initialized" link.
+ *
+ * @see Init
+ */
+ explicit Link(const ChainPosition &position);
+
+ /**
+ * Constructs a link that must subsequently be @ref Init "initialized".
+ *
+ * @see Init
+ */
Link();
+
+ /**
+ * Initializes the link with the input @ref PCQueue "consumer queue" and output @ref PCQueue "producer queue" at a given @ref ChainPosition "position" in the @ref Chain "chain".
+ *
+ * @see Link()
+ */
void Init(const ChainPosition &position);
- explicit Link(const ChainPosition &position);
-
+ /**
+ * Destructs the link object.
+ *
+ * If necessary, this method will pass a poison block
+ * to this link's output @ref PCQueue "producer queue".
+ *
+ * @see Block::SetToPoison()
+ */
~Link();
+ /**
+ * Gets a reference to the @ref Block "block" at this link.
+ */
Block &operator*() { return current_; }
+
+ /**
+ * Gets a const reference to the @ref Block "block" at this link.
+ */
const Block &operator*() const { return current_; }
+ /**
+ * Gets a pointer to the @ref Block "block" at this link.
+ */
Block *operator->() { return &current_; }
+
+ /**
+ * Gets a const pointer to the @ref Block "block" at this link.
+ */
const Block *operator->() const { return &current_; }
+ /**
+ * Gets the link at the next @ref ChainPosition "position" in the @ref Chain "chain".
+ */
Link &operator++();
+ /**
+ * Returns true if the @ref Block "block" at this link encapsulates a valid (non-NULL) block of memory.
+ *
+ * This method is a user-defined implicit conversion function to boolean;
+ * among other things, this method enables bare instances of this class
+ * to be used as the condition of an if statement.
+ */
operator bool() const { return current_; }
+ /**
+ * @ref Block::SetToPoison() "Poisons" the @ref Block "block" at this link,
+ * and passes this now-poisoned block to this link's output @ref PCQueue "producer queue".
+ *
+ * @see Block::SetToPoison()
+ */
void Poison();
-
+
private:
Block current_;
PCQueue<Block> *in_, *out_;
@@ -195,4 +336,4 @@ inline Chain &operator>>(Chain &chain, Link &link) {
} // namespace stream
} // namespace util
-#endif // UTIL_STREAM_CHAIN__
+#endif // UTIL_STREAM_CHAIN_H
diff --git a/klm/util/stream/config.hh b/klm/util/stream/config.hh
index 1eeb3a8a..6bad36bc 100644
--- a/klm/util/stream/config.hh
+++ b/klm/util/stream/config.hh
@@ -1,32 +1,63 @@
-#ifndef UTIL_STREAM_CONFIG__
-#define UTIL_STREAM_CONFIG__
+#ifndef UTIL_STREAM_CONFIG_H
+#define UTIL_STREAM_CONFIG_H
#include <cstddef>
#include <string>
namespace util { namespace stream {
+/**
+ * Represents how a chain should be configured.
+ */
struct ChainConfig {
+
+ /** Constructs an configuration with underspecified (or default) parameters. */
ChainConfig() {}
+ /**
+ * Constructs a chain configuration object.
+ *
+ * @param [in] in_entry_size Number of bytes in each record.
+ * @param [in] in_block_count Number of blocks in the chain.
+ * @param [in] in_total_memory Total number of bytes available to the chain.
+ * This value will be divided amongst the blocks in the chain.
+ */
ChainConfig(std::size_t in_entry_size, std::size_t in_block_count, std::size_t in_total_memory)
: entry_size(in_entry_size), block_count(in_block_count), total_memory(in_total_memory) {}
+ /**
+ * Number of bytes in each record.
+ */
std::size_t entry_size;
+
+ /**
+ * Number of blocks in the chain.
+ */
std::size_t block_count;
- // Chain's constructor will make this a multiple of entry_size.
+
+ /**
+ * Total number of bytes available to the chain.
+ * This value will be divided amongst the blocks in the chain.
+ * Chain's constructor will make this a multiple of entry_size.
+ */
std::size_t total_memory;
};
+
+/**
+ * Represents how a sorter should be configured.
+ */
struct SortConfig {
+
+ /** Filename prefix where temporary files should be placed. */
std::string temp_prefix;
- // Size of each input/output buffer.
+ /** Size of each input/output buffer. */
std::size_t buffer_size;
- // Total memory to use when running alone.
+ /** Total memory to use when running alone. */
std::size_t total_memory;
};
}} // namespaces
-#endif // UTIL_STREAM_CONFIG__
+#endif // UTIL_STREAM_CONFIG_H
diff --git a/klm/util/stream/io.cc b/klm/util/stream/io.cc
index 0459f706..c64004c0 100644
--- a/klm/util/stream/io.cc
+++ b/klm/util/stream/io.cc
@@ -36,12 +36,12 @@ void PRead::Run(const ChainPosition &position) {
Link link(position);
uint64_t offset = 0;
for (; offset + block_size64 < size; offset += block_size64, ++link) {
- PReadOrThrow(file_, link->Get(), block_size, offset);
+ ErsatzPRead(file_, link->Get(), block_size, offset);
link->SetValidSize(block_size);
}
// size - offset is <= block_size, so it casts to 32-bit fine.
if (size - offset) {
- PReadOrThrow(file_, link->Get(), size - offset, offset);
+ ErsatzPRead(file_, link->Get(), size - offset, offset);
link->SetValidSize(size - offset);
++link;
}
@@ -62,5 +62,15 @@ void WriteAndRecycle::Run(const ChainPosition &position) {
}
}
+void PWriteAndRecycle::Run(const ChainPosition &position) {
+ const std::size_t block_size = position.GetChain().BlockSize();
+ uint64_t offset = 0;
+ for (Link link(position); link; ++link) {
+ ErsatzPWrite(file_, link->Get(), link->ValidSize(), offset);
+ offset += link->ValidSize();
+ link->SetValidSize(block_size);
+ }
+}
+
} // namespace stream
} // namespace util
diff --git a/klm/util/stream/io.hh b/klm/util/stream/io.hh
index 934b6b3f..8dae2cbf 100644
--- a/klm/util/stream/io.hh
+++ b/klm/util/stream/io.hh
@@ -1,5 +1,5 @@
-#ifndef UTIL_STREAM_IO__
-#define UTIL_STREAM_IO__
+#ifndef UTIL_STREAM_IO_H
+#define UTIL_STREAM_IO_H
#include "util/exception.hh"
#include "util/file.hh"
@@ -41,6 +41,8 @@ class Write {
int file_;
};
+// It's a common case that stuff is written and then recycled. So rather than
+// spawn another thread to Recycle, this combines the two roles.
class WriteAndRecycle {
public:
explicit WriteAndRecycle(int fd) : file_(fd) {}
@@ -49,14 +51,23 @@ class WriteAndRecycle {
int file_;
};
+class PWriteAndRecycle {
+ public:
+ explicit PWriteAndRecycle(int fd) : file_(fd) {}
+ void Run(const ChainPosition &position);
+ private:
+ int file_;
+};
+
+
// Reuse the same file over and over again to buffer output.
class FileBuffer {
public:
explicit FileBuffer(int fd) : file_(fd) {}
- WriteAndRecycle Sink() const {
+ PWriteAndRecycle Sink() const {
util::SeekOrThrow(file_.get(), 0);
- return WriteAndRecycle(file_.get());
+ return PWriteAndRecycle(file_.get());
}
PRead Source() const {
@@ -73,4 +84,4 @@ class FileBuffer {
} // namespace stream
} // namespace util
-#endif // UTIL_STREAM_IO__
+#endif // UTIL_STREAM_IO_H
diff --git a/klm/util/stream/line_input.hh b/klm/util/stream/line_input.hh
index 86db1dd0..a870a664 100644
--- a/klm/util/stream/line_input.hh
+++ b/klm/util/stream/line_input.hh
@@ -1,5 +1,5 @@
-#ifndef UTIL_STREAM_LINE_INPUT__
-#define UTIL_STREAM_LINE_INPUT__
+#ifndef UTIL_STREAM_LINE_INPUT_H
+#define UTIL_STREAM_LINE_INPUT_H
namespace util {namespace stream {
class ChainPosition;
@@ -19,4 +19,4 @@ class LineInput {
};
}} // namespaces
-#endif // UTIL_STREAM_LINE_INPUT__
+#endif // UTIL_STREAM_LINE_INPUT_H
diff --git a/klm/util/stream/multi_progress.hh b/klm/util/stream/multi_progress.hh
index c4dd45a9..82e698a5 100644
--- a/klm/util/stream/multi_progress.hh
+++ b/klm/util/stream/multi_progress.hh
@@ -1,6 +1,6 @@
/* Progress bar suitable for chains of workers */
-#ifndef UTIL_MULTI_PROGRESS__
-#define UTIL_MULTI_PROGRESS__
+#ifndef UTIL_STREAM_MULTI_PROGRESS_H
+#define UTIL_STREAM_MULTI_PROGRESS_H
#include <boost/thread/mutex.hpp>
@@ -87,4 +87,4 @@ class WorkerProgress {
}} // namespaces
-#endif // UTIL_MULTI_PROGRESS__
+#endif // UTIL_STREAM_MULTI_PROGRESS_H
diff --git a/klm/util/stream/multi_stream.hh b/klm/util/stream/multi_stream.hh
new file mode 100644
index 00000000..0ee7fab6
--- /dev/null
+++ b/klm/util/stream/multi_stream.hh
@@ -0,0 +1,127 @@
+#ifndef UTIL_STREAM_MULTI_STREAM_H
+#define UTIL_STREAM_MULTI_STREAM_H
+
+#include "util/fixed_array.hh"
+#include "util/scoped.hh"
+#include "util/stream/chain.hh"
+#include "util/stream/stream.hh"
+
+#include <cstddef>
+#include <new>
+
+#include <assert.h>
+#include <stdlib.h>
+
+namespace util { namespace stream {
+
+class Chains;
+
+class ChainPositions : public util::FixedArray<util::stream::ChainPosition> {
+ public:
+ ChainPositions() {}
+
+ void Init(Chains &chains);
+
+ explicit ChainPositions(Chains &chains) {
+ Init(chains);
+ }
+};
+
+class Chains : public util::FixedArray<util::stream::Chain> {
+ private:
+ template <class T, void (T::*ptr)(const ChainPositions &) = &T::Run> struct CheckForRun {
+ typedef Chains type;
+ };
+
+ public:
+ // Must call Init.
+ Chains() {}
+
+ explicit Chains(std::size_t limit) : util::FixedArray<util::stream::Chain>(limit) {}
+
+ template <class Worker> typename CheckForRun<Worker>::type &operator>>(const Worker &worker) {
+ threads_.push_back(new util::stream::Thread(ChainPositions(*this), worker));
+ return *this;
+ }
+
+ template <class Worker> typename CheckForRun<Worker>::type &operator>>(const boost::reference_wrapper<Worker> &worker) {
+ threads_.push_back(new util::stream::Thread(ChainPositions(*this), worker));
+ return *this;
+ }
+
+ Chains &operator>>(const util::stream::Recycler &recycler) {
+ for (util::stream::Chain *i = begin(); i != end(); ++i)
+ *i >> recycler;
+ return *this;
+ }
+
+ void Wait(bool release_memory = true) {
+ threads_.clear();
+ for (util::stream::Chain *i = begin(); i != end(); ++i) {
+ i->Wait(release_memory);
+ }
+ }
+
+ private:
+ boost::ptr_vector<util::stream::Thread> threads_;
+
+ Chains(const Chains &);
+ void operator=(const Chains &);
+};
+
+inline void ChainPositions::Init(Chains &chains) {
+ util::FixedArray<util::stream::ChainPosition>::Init(chains.size());
+ for (util::stream::Chain *i = chains.begin(); i != chains.end(); ++i) {
+ // use "placement new" syntax to initalize ChainPosition in an already-allocated memory location
+ new (end()) util::stream::ChainPosition(i->Add()); Constructed();
+ }
+}
+
+inline Chains &operator>>(Chains &chains, ChainPositions &positions) {
+ positions.Init(chains);
+ return chains;
+}
+
+template <class T> class GenericStreams : public util::FixedArray<T> {
+ private:
+ typedef util::FixedArray<T> P;
+ public:
+ GenericStreams() {}
+
+ // This puts a dummy T at the beginning (useful to algorithms that need to reference something at the beginning).
+ void InitWithDummy(const ChainPositions &positions) {
+ P::Init(positions.size() + 1);
+ new (P::end()) T(); // use "placement new" syntax to initalize T in an already-allocated memory location
+ P::Constructed();
+ for (const util::stream::ChainPosition *i = positions.begin(); i != positions.end(); ++i) {
+ P::push_back(*i);
+ }
+ }
+
+ // Limit restricts to positions[0,limit)
+ void Init(const ChainPositions &positions, std::size_t limit) {
+ P::Init(limit);
+ for (const util::stream::ChainPosition *i = positions.begin(); i != positions.begin() + limit; ++i) {
+ P::push_back(*i);
+ }
+ }
+ void Init(const ChainPositions &positions) {
+ Init(positions, positions.size());
+ }
+
+ GenericStreams(const ChainPositions &positions) {
+ Init(positions);
+ }
+};
+
+template <class T> inline Chains &operator>>(Chains &chains, GenericStreams<T> &streams) {
+ ChainPositions positions;
+ chains >> positions;
+ streams.Init(positions);
+ return chains;
+}
+
+typedef GenericStreams<Stream> Streams;
+
+}} // namespaces
+#endif // UTIL_STREAM_MULTI_STREAM_H
diff --git a/klm/util/stream/sort.hh b/klm/util/stream/sort.hh
index 16aa6a03..9082cfdd 100644
--- a/klm/util/stream/sort.hh
+++ b/klm/util/stream/sort.hh
@@ -15,8 +15,8 @@
* sort. Use a hash table for that.
*/
-#ifndef UTIL_STREAM_SORT__
-#define UTIL_STREAM_SORT__
+#ifndef UTIL_STREAM_SORT_H
+#define UTIL_STREAM_SORT_H
#include "util/stream/chain.hh"
#include "util/stream/config.hh"
@@ -182,7 +182,7 @@ template <class Compare> class MergeQueue {
amount = remaining_;
buffer_end_ = current_ + remaining_;
}
- PReadOrThrow(fd, current_, amount, offset_);
+ ErsatzPRead(fd, current_, amount, offset_);
offset_ += amount;
assert(current_ <= buffer_end_);
remaining_ -= amount;
@@ -307,10 +307,10 @@ template <class Compare, class Combine> class MergingReader {
const uint64_t block_size = position.GetChain().BlockSize();
Link l(position);
for (; offset + block_size < end; ++l, offset += block_size) {
- PReadOrThrow(in_, l->Get(), block_size, offset);
+ ErsatzPRead(in_, l->Get(), block_size, offset);
l->SetValidSize(block_size);
}
- PReadOrThrow(in_, l->Get(), end - offset, offset);
+ ErsatzPRead(in_, l->Get(), end - offset, offset);
l->SetValidSize(end - offset);
(++l).Poison();
return;
@@ -388,8 +388,10 @@ class BadSortConfig : public Exception {
~BadSortConfig() throw() {}
};
+/** Sort */
template <class Compare, class Combine = NeverCombine> class Sort {
public:
+ /** Constructs an object capable of sorting */
Sort(Chain &in, const SortConfig &config, const Compare &compare = Compare(), const Combine &combine = Combine())
: config_(config),
data_(MakeTemp(config.temp_prefix)),
@@ -545,4 +547,4 @@ template <class Compare, class Combine> uint64_t BlockingSort(Chain &chain, cons
} // namespace stream
} // namespace util
-#endif // UTIL_STREAM_SORT__
+#endif // UTIL_STREAM_SORT_H
diff --git a/klm/util/stream/stream.hh b/klm/util/stream/stream.hh
index 6ff45b82..7ea1c9f7 100644
--- a/klm/util/stream/stream.hh
+++ b/klm/util/stream/stream.hh
@@ -1,5 +1,5 @@
-#ifndef UTIL_STREAM_STREAM__
-#define UTIL_STREAM_STREAM__
+#ifndef UTIL_STREAM_STREAM_H
+#define UTIL_STREAM_STREAM_H
#include "util/stream/chain.hh"
@@ -56,6 +56,9 @@ class Stream : boost::noncopyable {
end_ = current_ + block_it_->ValidSize();
}
+ // The following are pointers to raw memory
+ // current_ is the current record
+ // end_ is the end of the block (so we know when to move to the next block)
uint8_t *current_, *end_;
std::size_t entry_size_;
@@ -71,4 +74,4 @@ inline Chain &operator>>(Chain &chain, Stream &stream) {
} // namespace stream
} // namespace util
-#endif // UTIL_STREAM_STREAM__
+#endif // UTIL_STREAM_STREAM_H
diff --git a/klm/util/stream/timer.hh b/klm/util/stream/timer.hh
index 7e1a5885..06488a17 100644
--- a/klm/util/stream/timer.hh
+++ b/klm/util/stream/timer.hh
@@ -1,5 +1,5 @@
-#ifndef UTIL_STREAM_TIMER__
-#define UTIL_STREAM_TIMER__
+#ifndef UTIL_STREAM_TIMER_H
+#define UTIL_STREAM_TIMER_H
// Sorry Jon, this was adding library dependencies in Moses and people complained.
@@ -13,4 +13,4 @@
#define UTIL_TIMER(str)
//#endif
-#endif // UTIL_STREAM_TIMER__
+#endif // UTIL_STREAM_TIMER_H