diff options
author | Paul Baltescu <pauldb89@gmail.com> | 2013-02-21 14:13:55 +0000 |
---|---|---|
committer | Paul Baltescu <pauldb89@gmail.com> | 2013-02-21 14:13:55 +0000 |
commit | bca26d953a774b8efca12f30407390b3f5eef9d0 (patch) | |
tree | fe922de5c89b1844f677d550dcc24e87edd67a55 /klm/util/stream | |
parent | 54a1c0e2bde259e3acc9c0a8ec8da3c7704e80ca (diff) | |
parent | 95c364f2cb002241c4a62bedb1c5ef6f1e9a7f22 (diff) |
Merge branch 'master' of https://github.com/pauldb89/cdec
Diffstat (limited to 'klm/util/stream')
-rw-r--r-- | klm/util/stream/Makefile.am | 20 | ||||
-rw-r--r-- | klm/util/stream/block.hh | 43 | ||||
-rw-r--r-- | klm/util/stream/chain.cc | 155 | ||||
-rw-r--r-- | klm/util/stream/chain.hh | 198 | ||||
-rw-r--r-- | klm/util/stream/config.hh | 32 | ||||
-rw-r--r-- | klm/util/stream/io.cc | 66 | ||||
-rw-r--r-- | klm/util/stream/io.hh | 76 | ||||
-rw-r--r-- | klm/util/stream/io_test.cc | 38 | ||||
-rw-r--r-- | klm/util/stream/line_input.cc | 52 | ||||
-rw-r--r-- | klm/util/stream/line_input.hh | 22 | ||||
-rw-r--r-- | klm/util/stream/multi_progress.cc | 86 | ||||
-rw-r--r-- | klm/util/stream/multi_progress.hh | 90 | ||||
-rw-r--r-- | klm/util/stream/sort.hh | 548 | ||||
-rw-r--r-- | klm/util/stream/sort_test.cc | 62 | ||||
-rw-r--r-- | klm/util/stream/stream.hh | 74 | ||||
-rw-r--r-- | klm/util/stream/stream_test.cc | 35 | ||||
-rw-r--r-- | klm/util/stream/timer.hh | 16 |
17 files changed, 1613 insertions, 0 deletions
diff --git a/klm/util/stream/Makefile.am b/klm/util/stream/Makefile.am new file mode 100644 index 00000000..f18cbedb --- /dev/null +++ b/klm/util/stream/Makefile.am @@ -0,0 +1,20 @@ +noinst_LIBRARIES = libklm_util_stream.a + +libklm_util_stream_a_SOURCES = \ + block.hh \ + chain.cc \ + chain.hh \ + config.hh \ + io.cc \ + io.hh \ + line_input.cc \ + line_input.hh \ + multi_progress.cc \ + multi_progress.hh \ + sort.hh \ + stream.hh \ + timer.hh + +AM_CPPFLAGS = -W -Wall -I$(top_srcdir)/klm + +#-I$(top_srcdir)/klm/util/double-conversion diff --git a/klm/util/stream/block.hh b/klm/util/stream/block.hh new file mode 100644 index 00000000..11aa991e --- /dev/null +++ b/klm/util/stream/block.hh @@ -0,0 +1,43 @@ +#ifndef UTIL_STREAM_BLOCK__ +#define UTIL_STREAM_BLOCK__ + +#include <cstddef> +#include <stdint.h> + +namespace util { +namespace stream { + +class Block { + public: + Block() : mem_(NULL), valid_size_(0) {} + + Block(void *mem, std::size_t size) : mem_(mem), valid_size_(size) {} + + void SetValidSize(std::size_t to) { valid_size_ = to; } + // Read might fill in less than Allocated at EOF. + std::size_t ValidSize() const { return valid_size_; } + + void *Get() { return mem_; } + const void *Get() const { return mem_; } + + const void *ValidEnd() const { + return reinterpret_cast<const uint8_t*>(mem_) + valid_size_; + } + + operator bool() const { return mem_ != NULL; } + bool operator!() const { return mem_ == NULL; } + + private: + friend class Link; + void SetToPoison() { + mem_ = NULL; + } + + void *mem_; + std::size_t valid_size_; +}; + +} // namespace stream +} // namespace util + +#endif // UTIL_STREAM_BLOCK__ diff --git a/klm/util/stream/chain.cc b/klm/util/stream/chain.cc new file mode 100644 index 00000000..46708c60 --- /dev/null +++ b/klm/util/stream/chain.cc @@ -0,0 +1,155 @@ +#include "util/stream/chain.hh" + +#include "util/stream/io.hh" + +#include "util/exception.hh" +#include "util/pcqueue.hh" + +#include <cstdlib> +#include <new> +#include <iostream> + +#include <stdint.h> +#include <stdlib.h> + +namespace util { +namespace stream { + +ChainConfigException::ChainConfigException() throw() { *this << "Chain configured with "; } +ChainConfigException::~ChainConfigException() throw() {} + +Thread::~Thread() { + thread_.join(); +} + +void Thread::UnhandledException(const std::exception &e) { + std::cerr << e.what() << std::endl; + abort(); +} + +void Recycler::Run(const ChainPosition &position) { + for (Link l(position); l; ++l) { + l->SetValidSize(position.GetChain().BlockSize()); + } +} + +const Recycler kRecycle = Recycler(); + +Chain::Chain(const ChainConfig &config) : config_(config), complete_called_(false) { + UTIL_THROW_IF(!config.entry_size, ChainConfigException, "zero-size entries."); + UTIL_THROW_IF(!config.block_count, ChainConfigException, "block count zero"); + UTIL_THROW_IF(config.total_memory < config.entry_size * config.block_count, ChainConfigException, config.total_memory << " total memory, too small for " << config.block_count << " blocks of containing entries of size " << config.entry_size); + // Round down block size to a multiple of entry size. + block_size_ = config.total_memory / (config.block_count * config.entry_size) * config.entry_size; +} + +Chain::~Chain() { + Wait(); +} + +ChainPosition Chain::Add() { + if (!Running()) Start(); + PCQueue<Block> &in = queues_.back(); + queues_.push_back(new PCQueue<Block>(config_.block_count)); + return ChainPosition(in, queues_.back(), this, progress_); +} + +Chain &Chain::operator>>(const WriteAndRecycle &writer) { + threads_.push_back(new Thread(Complete(), writer)); + return *this; +} + +void Chain::Wait(bool release_memory) { + if (queues_.empty()) { + assert(threads_.empty()); + return; // Nothing to wait for. + } + if (!complete_called_) CompleteLoop(); + threads_.clear(); + for (std::size_t i = 0; queues_.front().Consume(); ++i) { + if (i == config_.block_count) { + std::cerr << "Chain ending without poison." << std::endl; + abort(); + } + } + queues_.clear(); + progress_.Finished(); + complete_called_ = false; + if (release_memory) memory_.reset(); +} + +void Chain::Start() { + Wait(false); + if (!memory_.get()) { + // Allocate memory. + assert(threads_.empty()); + assert(queues_.empty()); + std::size_t malloc_size = block_size_ * config_.block_count; + memory_.reset(MallocOrThrow(malloc_size)); + } + // This queue can accomodate all blocks. + queues_.push_back(new PCQueue<Block>(config_.block_count)); + // Populate the lead queue with blocks. + uint8_t *base = static_cast<uint8_t*>(memory_.get()); + for (std::size_t i = 0; i < config_.block_count; ++i) { + queues_.front().Produce(Block(base, block_size_)); + base += block_size_; + } +} + +ChainPosition Chain::Complete() { + assert(Running()); + UTIL_THROW_IF(complete_called_, util::Exception, "CompleteLoop() called twice"); + complete_called_ = true; + return ChainPosition(queues_.back(), queues_.front(), this, progress_); +} + +Link::Link() : in_(NULL), out_(NULL), poisoned_(true) {} + +void Link::Init(const ChainPosition &position) { + UTIL_THROW_IF(in_, util::Exception, "Link::Init twice"); + in_ = position.in_; + out_ = position.out_; + poisoned_ = false; + progress_ = position.progress_; + in_->Consume(current_); +} + +Link::Link(const ChainPosition &position) : in_(NULL) { + Init(position); +} + +Link::~Link() { + if (current_) { + // Probably an exception unwinding. + std::cerr << "Last input should have been poison." << std::endl; +// abort(); + } else { + if (!poisoned_) { + // Pass the poison! + out_->Produce(current_); + } + } +} + +Link &Link::operator++() { + assert(current_); + progress_ += current_.ValidSize(); + out_->Produce(current_); + in_->Consume(current_); + if (!current_) { + poisoned_ = true; + out_->Produce(current_); + } + return *this; +} + +void Link::Poison() { + assert(!poisoned_); + current_.SetToPoison(); + out_->Produce(current_); + poisoned_ = true; +} + +} // namespace stream +} // namespace util diff --git a/klm/util/stream/chain.hh b/klm/util/stream/chain.hh new file mode 100644 index 00000000..154b9b33 --- /dev/null +++ b/klm/util/stream/chain.hh @@ -0,0 +1,198 @@ +#ifndef UTIL_STREAM_CHAIN__ +#define UTIL_STREAM_CHAIN__ + +#include "util/stream/block.hh" +#include "util/stream/config.hh" +#include "util/stream/multi_progress.hh" +#include "util/scoped.hh" + +#include <boost/ptr_container/ptr_vector.hpp> +#include <boost/thread/thread.hpp> + +#include <cstddef> + +#include <assert.h> + +namespace util { +template <class T> class PCQueue; +namespace stream { + +class ChainConfigException : public Exception { + public: + ChainConfigException() throw(); + ~ChainConfigException() throw(); +}; + +class Chain; +// Specifies position in chain for Link constructor. +class ChainPosition { + public: + const Chain &GetChain() const { return *chain_; } + private: + friend class Chain; + friend class Link; + ChainPosition(PCQueue<Block> &in, PCQueue<Block> &out, Chain *chain, MultiProgress &progress) + : in_(&in), out_(&out), chain_(chain), progress_(progress.Add()) {} + + PCQueue<Block> *in_, *out_; + + Chain *chain_; + + WorkerProgress progress_; +}; + +// Position is usually ChainPosition but if there are multiple streams involved, this can be ChainPositions. +class Thread { + public: + template <class Position, class Worker> Thread(const Position &position, const Worker &worker) + : thread_(boost::ref(*this), position, worker) {} + + ~Thread(); + + template <class Position, class Worker> void operator()(const Position &position, Worker &worker) { + try { + worker.Run(position); + } catch (const std::exception &e) { + UnhandledException(e); + } + } + + private: + void UnhandledException(const std::exception &e); + + boost::thread thread_; +}; + +class Recycler { + public: + void Run(const ChainPosition &position); +}; + +extern const Recycler kRecycle; +class WriteAndRecycle; + +class Chain { + private: + template <class T, void (T::*ptr)(const ChainPosition &) = &T::Run> struct CheckForRun { + typedef Chain type; + }; + + public: + explicit Chain(const ChainConfig &config); + + ~Chain(); + + void ActivateProgress() { + assert(!Running()); + progress_.Activate(); + } + + void SetProgressTarget(uint64_t target) { + progress_.SetTarget(target); + } + + std::size_t EntrySize() const { + return config_.entry_size; + } + std::size_t BlockSize() const { + return block_size_; + } + + // Two ways to add to the chain: Add() or operator>>. + ChainPosition Add(); + + // This is for adding threaded workers with a Run method. + template <class Worker> typename CheckForRun<Worker>::type &operator>>(const Worker &worker) { + assert(!complete_called_); + threads_.push_back(new Thread(Add(), worker)); + return *this; + } + + // Avoid copying the worker. + template <class Worker> typename CheckForRun<Worker>::type &operator>>(const boost::reference_wrapper<Worker> &worker) { + assert(!complete_called_); + threads_.push_back(new Thread(Add(), worker)); + return *this; + } + + // Note that Link and Stream also define operator>> outside this class. + + // To complete the loop, call CompleteLoop(), >> kRecycle, or the destructor. + void CompleteLoop() { + threads_.push_back(new Thread(Complete(), kRecycle)); + } + + Chain &operator>>(const Recycler &recycle) { + CompleteLoop(); + return *this; + } + + Chain &operator>>(const WriteAndRecycle &writer); + + // Chains are reusable. Call Wait to wait for everything to finish and free memory. + void Wait(bool release_memory = true); + + // Waits for the current chain to complete (if any) then starts again. + void Start(); + + bool Running() const { return !queues_.empty(); } + + private: + ChainPosition Complete(); + + ChainConfig config_; + + std::size_t block_size_; + + scoped_malloc memory_; + + boost::ptr_vector<PCQueue<Block> > queues_; + + bool complete_called_; + + boost::ptr_vector<Thread> threads_; + + MultiProgress progress_; +}; + +// Create the link in the worker thread using the position token. +class Link { + public: + // Either default construct and Init or just construct all at once. + Link(); + void Init(const ChainPosition &position); + + explicit Link(const ChainPosition &position); + + ~Link(); + + Block &operator*() { return current_; } + const Block &operator*() const { return current_; } + + Block *operator->() { return ¤t_; } + const Block *operator->() const { return ¤t_; } + + Link &operator++(); + + operator bool() const { return current_; } + + void Poison(); + + private: + Block current_; + PCQueue<Block> *in_, *out_; + + bool poisoned_; + + WorkerProgress progress_; +}; + +inline Chain &operator>>(Chain &chain, Link &link) { + link.Init(chain.Add()); + return chain; +} + +} // namespace stream +} // namespace util + +#endif // UTIL_STREAM_CHAIN__ diff --git a/klm/util/stream/config.hh b/klm/util/stream/config.hh new file mode 100644 index 00000000..1eeb3a8a --- /dev/null +++ b/klm/util/stream/config.hh @@ -0,0 +1,32 @@ +#ifndef UTIL_STREAM_CONFIG__ +#define UTIL_STREAM_CONFIG__ + +#include <cstddef> +#include <string> + +namespace util { namespace stream { + +struct ChainConfig { + ChainConfig() {} + + ChainConfig(std::size_t in_entry_size, std::size_t in_block_count, std::size_t in_total_memory) + : entry_size(in_entry_size), block_count(in_block_count), total_memory(in_total_memory) {} + + std::size_t entry_size; + std::size_t block_count; + // Chain's constructor will make this a multiple of entry_size. + std::size_t total_memory; +}; + +struct SortConfig { + std::string temp_prefix; + + // Size of each input/output buffer. + std::size_t buffer_size; + + // Total memory to use when running alone. + std::size_t total_memory; +}; + +}} // namespaces +#endif // UTIL_STREAM_CONFIG__ diff --git a/klm/util/stream/io.cc b/klm/util/stream/io.cc new file mode 100644 index 00000000..0459f706 --- /dev/null +++ b/klm/util/stream/io.cc @@ -0,0 +1,66 @@ +#include "util/stream/io.hh" + +#include "util/file.hh" +#include "util/stream/chain.hh" + +#include <cstddef> + +namespace util { +namespace stream { + +ReadSizeException::ReadSizeException() throw() {} +ReadSizeException::~ReadSizeException() throw() {} + +void Read::Run(const ChainPosition &position) { + const std::size_t block_size = position.GetChain().BlockSize(); + const std::size_t entry_size = position.GetChain().EntrySize(); + for (Link link(position); link; ++link) { + std::size_t got = util::ReadOrEOF(file_, link->Get(), block_size); + UTIL_THROW_IF(got % entry_size, ReadSizeException, "File ended with " << got << " bytes, not a multiple of " << entry_size << "."); + if (got == 0) { + link.Poison(); + return; + } else { + link->SetValidSize(got); + } + } +} + +void PRead::Run(const ChainPosition &position) { + scoped_fd owner; + if (own_) owner.reset(file_); + const uint64_t size = SizeOrThrow(file_); + UTIL_THROW_IF(size % static_cast<uint64_t>(position.GetChain().EntrySize()), ReadSizeException, "File size " << file_ << " size is " << size << " not a multiple of " << position.GetChain().EntrySize()); + const std::size_t block_size = position.GetChain().BlockSize(); + const uint64_t block_size64 = static_cast<uint64_t>(block_size); + Link link(position); + uint64_t offset = 0; + for (; offset + block_size64 < size; offset += block_size64, ++link) { + PReadOrThrow(file_, link->Get(), block_size, offset); + link->SetValidSize(block_size); + } + // size - offset is <= block_size, so it casts to 32-bit fine. + if (size - offset) { + PReadOrThrow(file_, link->Get(), size - offset, offset); + link->SetValidSize(size - offset); + ++link; + } + link.Poison(); +} + +void Write::Run(const ChainPosition &position) { + for (Link link(position); link; ++link) { + WriteOrThrow(file_, link->Get(), link->ValidSize()); + } +} + +void WriteAndRecycle::Run(const ChainPosition &position) { + const std::size_t block_size = position.GetChain().BlockSize(); + for (Link link(position); link; ++link) { + WriteOrThrow(file_, link->Get(), link->ValidSize()); + link->SetValidSize(block_size); + } +} + +} // namespace stream +} // namespace util diff --git a/klm/util/stream/io.hh b/klm/util/stream/io.hh new file mode 100644 index 00000000..934b6b3f --- /dev/null +++ b/klm/util/stream/io.hh @@ -0,0 +1,76 @@ +#ifndef UTIL_STREAM_IO__ +#define UTIL_STREAM_IO__ + +#include "util/exception.hh" +#include "util/file.hh" + +namespace util { +namespace stream { + +class ChainPosition; + +class ReadSizeException : public util::Exception { + public: + ReadSizeException() throw(); + ~ReadSizeException() throw(); +}; + +class Read { + public: + explicit Read(int fd) : file_(fd) {} + void Run(const ChainPosition &position); + private: + int file_; +}; + +// Like read but uses pread so that the file can be accessed from multiple threads. +class PRead { + public: + explicit PRead(int fd, bool take_own = false) : file_(fd), own_(take_own) {} + void Run(const ChainPosition &position); + private: + int file_; + bool own_; +}; + +class Write { + public: + explicit Write(int fd) : file_(fd) {} + void Run(const ChainPosition &position); + private: + int file_; +}; + +class WriteAndRecycle { + public: + explicit WriteAndRecycle(int fd) : file_(fd) {} + void Run(const ChainPosition &position); + private: + int file_; +}; + +// Reuse the same file over and over again to buffer output. +class FileBuffer { + public: + explicit FileBuffer(int fd) : file_(fd) {} + + WriteAndRecycle Sink() const { + util::SeekOrThrow(file_.get(), 0); + return WriteAndRecycle(file_.get()); + } + + PRead Source() const { + return PRead(file_.get()); + } + + uint64_t Size() const { + return SizeOrThrow(file_.get()); + } + + private: + scoped_fd file_; +}; + +} // namespace stream +} // namespace util +#endif // UTIL_STREAM_IO__ diff --git a/klm/util/stream/io_test.cc b/klm/util/stream/io_test.cc new file mode 100644 index 00000000..82108335 --- /dev/null +++ b/klm/util/stream/io_test.cc @@ -0,0 +1,38 @@ +#include "util/stream/io.hh" + +#include "util/stream/chain.hh" +#include "util/file.hh" + +#define BOOST_TEST_MODULE IOTest +#include <boost/test/unit_test.hpp> + +#include <unistd.h> + +namespace util { namespace stream { namespace { + +BOOST_AUTO_TEST_CASE(CopyFile) { + std::string temps("io_test_temp"); + + scoped_fd in(MakeTemp(temps)); + for (uint64_t i = 0; i < 100000; ++i) { + WriteOrThrow(in.get(), &i, sizeof(uint64_t)); + } + SeekOrThrow(in.get(), 0); + scoped_fd out(MakeTemp(temps)); + + ChainConfig config; + config.entry_size = 8; + config.total_memory = 1024; + config.block_count = 10; + + Chain(config) >> PRead(in.get()) >> Write(out.get()); + + SeekOrThrow(out.get(), 0); + for (uint64_t i = 0; i < 100000; ++i) { + uint64_t got; + ReadOrThrow(out.get(), &got, sizeof(uint64_t)); + BOOST_CHECK_EQUAL(i, got); + } +} + +}}} // namespaces diff --git a/klm/util/stream/line_input.cc b/klm/util/stream/line_input.cc new file mode 100644 index 00000000..dafa5020 --- /dev/null +++ b/klm/util/stream/line_input.cc @@ -0,0 +1,52 @@ +#include "util/stream/line_input.hh" + +#include "util/exception.hh" +#include "util/file.hh" +#include "util/read_compressed.hh" +#include "util/stream/chain.hh" + +#include <algorithm> +#include <vector> + +namespace util { namespace stream { + +void LineInput::Run(const ChainPosition &position) { + ReadCompressed reader(fd_); + // Holding area for beginning of line to be placed in next block. + std::vector<char> carry; + + for (Link block(position); ; ++block) { + char *to = static_cast<char*>(block->Get()); + char *begin = to; + char *end = to + position.GetChain().BlockSize(); + std::copy(carry.begin(), carry.end(), to); + to += carry.size(); + while (to != end) { + std::size_t got = reader.Read(to, end - to); + if (!got) { + // EOF + block->SetValidSize(to - begin); + ++block; + block.Poison(); + return; + } + to += got; + } + + // Find the last newline. + char *newline; + for (newline = to - 1; ; --newline) { + UTIL_THROW_IF(newline < begin, Exception, "Did not find a newline in " << position.GetChain().BlockSize() << " bytes of input of " << NameFromFD(fd_) << ". Is this a text file?"); + if (*newline == '\n') break; + } + + // Copy everything after the last newline to the carry. + carry.clear(); + carry.resize(to - (newline + 1)); + std::copy(newline + 1, to, &*carry.begin()); + + block->SetValidSize(newline + 1 - begin); + } +} + +}} // namespaces diff --git a/klm/util/stream/line_input.hh b/klm/util/stream/line_input.hh new file mode 100644 index 00000000..86db1dd0 --- /dev/null +++ b/klm/util/stream/line_input.hh @@ -0,0 +1,22 @@ +#ifndef UTIL_STREAM_LINE_INPUT__ +#define UTIL_STREAM_LINE_INPUT__ +namespace util {namespace stream { + +class ChainPosition; + +/* Worker that reads input into blocks, ensuring that blocks contain whole + * lines. Assumes that the maximum size of a line is less than the block size + */ +class LineInput { + public: + // Takes ownership upon thread execution. + explicit LineInput(int fd); + + void Run(const ChainPosition &position); + + private: + int fd_; +}; + +}} // namespaces +#endif // UTIL_STREAM_LINE_INPUT__ diff --git a/klm/util/stream/multi_progress.cc b/klm/util/stream/multi_progress.cc new file mode 100644 index 00000000..8ba10386 --- /dev/null +++ b/klm/util/stream/multi_progress.cc @@ -0,0 +1,86 @@ +#include "util/stream/multi_progress.hh" + +// TODO: merge some functionality with the simple progress bar? +#include "util/ersatz_progress.hh" + +#include <iostream> +#include <limits> + +#include <string.h> + +#if !defined(_WIN32) && !defined(_WIN64) +#include <unistd.h> +#endif + +namespace util { namespace stream { + +namespace { +const char kDisplayCharacters[] = "-+*#0123456789"; + +uint64_t Next(unsigned char stone, uint64_t complete) { + return (static_cast<uint64_t>(stone + 1) * complete + MultiProgress::kWidth - 1) / MultiProgress::kWidth; +} + +} // namespace + +MultiProgress::MultiProgress() : active_(false), complete_(std::numeric_limits<uint64_t>::max()), character_handout_(0) {} + +MultiProgress::~MultiProgress() { + if (active_ && complete_ != std::numeric_limits<uint64_t>::max()) + std::cerr << '\n'; +} + +void MultiProgress::Activate() { + active_ = +#if !defined(_WIN32) && !defined(_WIN64) + // Is stderr a terminal? + (isatty(2) == 1) +#else + true +#endif + ; +} + +void MultiProgress::SetTarget(uint64_t complete) { + if (!active_) return; + complete_ = complete; + if (!complete) complete_ = 1; + memset(display_, 0, sizeof(display_)); + character_handout_ = 0; + std::cerr << kProgressBanner; +} + +WorkerProgress MultiProgress::Add() { + if (!active_) + return WorkerProgress(std::numeric_limits<uint64_t>::max(), *this, '\0'); + std::size_t character_index; + { + boost::unique_lock<boost::mutex> lock(mutex_); + character_index = character_handout_++; + if (character_handout_ == sizeof(kDisplayCharacters) - 1) + character_handout_ = 0; + } + return WorkerProgress(Next(0, complete_), *this, kDisplayCharacters[character_index]); +} + +void MultiProgress::Finished() { + if (!active_ || complete_ == std::numeric_limits<uint64_t>::max()) return; + std::cerr << '\n'; + complete_ = std::numeric_limits<uint64_t>::max(); +} + +void MultiProgress::Milestone(WorkerProgress &worker) { + if (!active_ || complete_ == std::numeric_limits<uint64_t>::max()) return; + unsigned char stone = std::min(static_cast<uint64_t>(kWidth), worker.current_ * kWidth / complete_); + for (char *i = &display_[worker.stone_]; i < &display_[stone]; ++i) { + *i = worker.character_; + } + worker.next_ = Next(stone, complete_); + worker.stone_ = stone; + { + boost::unique_lock<boost::mutex> lock(mutex_); + std::cerr << '\r' << display_ << std::flush; + } +} + +}} // namespaces diff --git a/klm/util/stream/multi_progress.hh b/klm/util/stream/multi_progress.hh new file mode 100644 index 00000000..c4dd45a9 --- /dev/null +++ b/klm/util/stream/multi_progress.hh @@ -0,0 +1,90 @@ +/* Progress bar suitable for chains of workers */ +#ifndef UTIL_MULTI_PROGRESS__ +#define UTIL_MULTI_PROGRESS__ + +#include <boost/thread/mutex.hpp> + +#include <cstddef> + +#include <stdint.h> + +namespace util { namespace stream { + +class WorkerProgress; + +class MultiProgress { + public: + static const unsigned char kWidth = 100; + + MultiProgress(); + + ~MultiProgress(); + + // Turns on showing (requires SetTarget too). + void Activate(); + + void SetTarget(uint64_t complete); + + WorkerProgress Add(); + + void Finished(); + + private: + friend class WorkerProgress; + void Milestone(WorkerProgress &worker); + + bool active_; + + uint64_t complete_; + + boost::mutex mutex_; + + // \0 at the end. + char display_[kWidth + 1]; + + std::size_t character_handout_; + + MultiProgress(const MultiProgress &); + MultiProgress &operator=(const MultiProgress &); +}; + +class WorkerProgress { + public: + // Default contrutor must be initialized with operator= later. + WorkerProgress() : parent_(NULL) {} + + // Not threadsafe for the same worker by default. + WorkerProgress &operator++() { + if (++current_ >= next_) { + parent_->Milestone(*this); + } + return *this; + } + + WorkerProgress &operator+=(uint64_t amount) { + current_ += amount; + if (current_ >= next_) { + parent_->Milestone(*this); + } + return *this; + } + + private: + friend class MultiProgress; + WorkerProgress(uint64_t next, MultiProgress &parent, char character) + : current_(0), next_(next), parent_(&parent), stone_(0), character_(character) {} + + uint64_t current_, next_; + + MultiProgress *parent_; + + // Previous milestone reached. + unsigned char stone_; + + // Character to display in bar. + char character_; +}; + +}} // namespaces + +#endif // UTIL_MULTI_PROGRESS__ diff --git a/klm/util/stream/sort.hh b/klm/util/stream/sort.hh new file mode 100644 index 00000000..16aa6a03 --- /dev/null +++ b/klm/util/stream/sort.hh @@ -0,0 +1,548 @@ +/* Usage: + * Sort<Compare> sorter(temp, compare); + * Chain(config) >> Read(file) >> sorter.Unsorted(); + * Stream stream; + * Chain chain(config) >> sorter.Sorted(internal_config, lazy_config) >> stream; + * + * Note that sorter must outlive any threads that use Unsorted or Sorted. + * + * Combiners take the form: + * bool operator()(void *into, const void *option, const Compare &compare) const + * which returns true iff a combination happened. The sorting algorithm + * guarantees compare(into, option). But it does not guarantee + * compare(option, into). + * Currently, combining is only done in merge steps, not during on-the-fly + * sort. Use a hash table for that. + */ + +#ifndef UTIL_STREAM_SORT__ +#define UTIL_STREAM_SORT__ + +#include "util/stream/chain.hh" +#include "util/stream/config.hh" +#include "util/stream/io.hh" +#include "util/stream/stream.hh" +#include "util/stream/timer.hh" + +#include "util/file.hh" +#include "util/scoped.hh" +#include "util/sized_iterator.hh" + +#include <algorithm> +#include <iostream> +#include <queue> +#include <string> + +namespace util { +namespace stream { + +struct NeverCombine { + template <class Compare> bool operator()(const void *, const void *, const Compare &) const { + return false; + } +}; + +// Manage the offsets of sorted blocks in a file. +class Offsets { + public: + explicit Offsets(int fd) : log_(fd) { + Reset(); + } + + int File() const { return log_; } + + void Append(uint64_t length) { + if (!length) return; + ++block_count_; + if (length == cur_.length) { + ++cur_.run; + return; + } + WriteOrThrow(log_, &cur_, sizeof(Entry)); + cur_.length = length; + cur_.run = 1; + } + + void FinishedAppending() { + WriteOrThrow(log_, &cur_, sizeof(Entry)); + SeekOrThrow(log_, sizeof(Entry)); // Skip 0,0 at beginning. + cur_.run = 0; + if (block_count_) { + ReadOrThrow(log_, &cur_, sizeof(Entry)); + assert(cur_.length); + assert(cur_.run); + } + } + + uint64_t RemainingBlocks() const { return block_count_; } + + uint64_t TotalOffset() const { return output_sum_; } + + uint64_t PeekSize() const { + return cur_.length; + } + + uint64_t NextSize() { + assert(block_count_); + uint64_t ret = cur_.length; + output_sum_ += ret; + + --cur_.run; + --block_count_; + if (!cur_.run && block_count_) { + ReadOrThrow(log_, &cur_, sizeof(Entry)); + assert(cur_.length); + assert(cur_.run); + } + return ret; + } + + void Reset() { + SeekOrThrow(log_, 0); + ResizeOrThrow(log_, 0); + cur_.length = 0; + cur_.run = 0; + block_count_ = 0; + output_sum_ = 0; + } + + private: + int log_; + + struct Entry { + uint64_t length; + uint64_t run; + }; + Entry cur_; + + uint64_t block_count_; + + uint64_t output_sum_; +}; + +// A priority queue of entries backed by file buffers +template <class Compare> class MergeQueue { + public: + MergeQueue(int fd, std::size_t buffer_size, std::size_t entry_size, const Compare &compare) + : queue_(Greater(compare)), in_(fd), buffer_size_(buffer_size), entry_size_(entry_size) {} + + void Push(void *base, uint64_t offset, uint64_t amount) { + queue_.push(Entry(base, in_, offset, amount, buffer_size_)); + } + + const void *Top() const { + return queue_.top().Current(); + } + + void Pop() { + Entry top(queue_.top()); + queue_.pop(); + if (top.Increment(in_, buffer_size_, entry_size_)) + queue_.push(top); + } + + std::size_t Size() const { + return queue_.size(); + } + + bool Empty() const { + return queue_.empty(); + } + + private: + // Priority queue contains these entries. + class Entry { + public: + Entry() {} + + Entry(void *base, int fd, uint64_t offset, uint64_t amount, std::size_t buf_size) { + offset_ = offset; + remaining_ = amount; + buffer_end_ = static_cast<uint8_t*>(base) + buf_size; + Read(fd, buf_size); + } + + bool Increment(int fd, std::size_t buf_size, std::size_t entry_size) { + current_ += entry_size; + if (current_ != buffer_end_) return true; + return Read(fd, buf_size); + } + + const void *Current() const { return current_; } + + private: + bool Read(int fd, std::size_t buf_size) { + current_ = buffer_end_ - buf_size; + std::size_t amount; + if (static_cast<uint64_t>(buf_size) < remaining_) { + amount = buf_size; + } else if (!remaining_) { + return false; + } else { + amount = remaining_; + buffer_end_ = current_ + remaining_; + } + PReadOrThrow(fd, current_, amount, offset_); + offset_ += amount; + assert(current_ <= buffer_end_); + remaining_ -= amount; + return true; + } + + // Buffer + uint8_t *current_, *buffer_end_; + // File + uint64_t remaining_, offset_; + }; + + // Wrapper comparison function for queue entries. + class Greater : public std::binary_function<const Entry &, const Entry &, bool> { + public: + explicit Greater(const Compare &compare) : compare_(compare) {} + + bool operator()(const Entry &first, const Entry &second) const { + return compare_(second.Current(), first.Current()); + } + + private: + const Compare compare_; + }; + + typedef std::priority_queue<Entry, std::vector<Entry>, Greater> Queue; + Queue queue_; + + const int in_; + const std::size_t buffer_size_; + const std::size_t entry_size_; +}; + +/* A worker object that merges. If the number of pieces to merge exceeds the + * arity, it outputs multiple sorted blocks, recording to out_offsets. + * However, users will only every see a single sorted block out output because + * Sort::Sorted insures the arity is higher than the number of pieces before + * returning this. + */ +template <class Compare, class Combine> class MergingReader { + public: + MergingReader(int in, Offsets *in_offsets, Offsets *out_offsets, std::size_t buffer_size, std::size_t total_memory, const Compare &compare, const Combine &combine) : + compare_(compare), combine_(combine), + in_(in), + in_offsets_(in_offsets), out_offsets_(out_offsets), + buffer_size_(buffer_size), total_memory_(total_memory) {} + + void Run(const ChainPosition &position) { + Run(position, false); + } + + void Run(const ChainPosition &position, bool assert_one) { + // Special case: nothing to read. + if (!in_offsets_->RemainingBlocks()) { + Link l(position); + l.Poison(); + return; + } + // If there's just one entry, just read. + if (in_offsets_->RemainingBlocks() == 1) { + // Sequencing is important. + uint64_t offset = in_offsets_->TotalOffset(); + uint64_t amount = in_offsets_->NextSize(); + ReadSingle(offset, amount, position); + if (out_offsets_) out_offsets_->Append(amount); + return; + } + + Stream str(position); + scoped_malloc buffer(MallocOrThrow(total_memory_)); + uint8_t *const buffer_end = static_cast<uint8_t*>(buffer.get()) + total_memory_; + + const std::size_t entry_size = position.GetChain().EntrySize(); + + while (in_offsets_->RemainingBlocks()) { + // Use bigger buffers if there's less remaining. + uint64_t per_buffer = static_cast<uint64_t>(std::max<std::size_t>( + buffer_size_, + static_cast<std::size_t>((static_cast<uint64_t>(total_memory_) / in_offsets_->RemainingBlocks())))); + per_buffer -= per_buffer % entry_size; + assert(per_buffer); + + // Populate queue. + MergeQueue<Compare> queue(in_, per_buffer, entry_size, compare_); + for (uint8_t *buf = static_cast<uint8_t*>(buffer.get()); + in_offsets_->RemainingBlocks() && (buf + std::min(per_buffer, in_offsets_->PeekSize()) <= buffer_end);) { + uint64_t offset = in_offsets_->TotalOffset(); + uint64_t size = in_offsets_->NextSize(); + queue.Push(buf, offset, size); + buf += static_cast<std::size_t>(std::min<uint64_t>(size, per_buffer)); + } + // This shouldn't happen but it's probably better to die than loop indefinitely. + if (queue.Size() < 2 && in_offsets_->RemainingBlocks()) { + std::cerr << "Bug in sort implementation: not merging at least two stripes." << std::endl; + abort(); + } + if (assert_one && in_offsets_->RemainingBlocks()) { + std::cerr << "Bug in sort implementation: should only be one merge group for lazy sort" << std::endl; + abort(); + } + + uint64_t written = 0; + // Merge including combiner support. + memcpy(str.Get(), queue.Top(), entry_size); + for (queue.Pop(); !queue.Empty(); queue.Pop()) { + if (!combine_(str.Get(), queue.Top(), compare_)) { + ++written; ++str; + memcpy(str.Get(), queue.Top(), entry_size); + } + } + ++written; ++str; + if (out_offsets_) + out_offsets_->Append(written * entry_size); + } + str.Poison(); + } + + private: + void ReadSingle(uint64_t offset, const uint64_t size, const ChainPosition &position) { + // Special case: only one to read. + const uint64_t end = offset + size; + const uint64_t block_size = position.GetChain().BlockSize(); + Link l(position); + for (; offset + block_size < end; ++l, offset += block_size) { + PReadOrThrow(in_, l->Get(), block_size, offset); + l->SetValidSize(block_size); + } + PReadOrThrow(in_, l->Get(), end - offset, offset); + l->SetValidSize(end - offset); + (++l).Poison(); + return; + } + + Compare compare_; + Combine combine_; + + int in_; + + protected: + Offsets *in_offsets_; + + private: + Offsets *out_offsets_; + + std::size_t buffer_size_; + std::size_t total_memory_; +}; + +// The lazy step owns the remaining files. This keeps track of them. +template <class Compare, class Combine> class OwningMergingReader : public MergingReader<Compare, Combine> { + private: + typedef MergingReader<Compare, Combine> P; + public: + OwningMergingReader(int data, const Offsets &offsets, std::size_t buffer, std::size_t lazy, const Compare &compare, const Combine &combine) + : P(data, NULL, NULL, buffer, lazy, compare, combine), + data_(data), + offsets_(offsets) {} + + void Run(const ChainPosition &position) { + P::in_offsets_ = &offsets_; + scoped_fd data(data_); + scoped_fd offsets_file(offsets_.File()); + P::Run(position, true); + } + + private: + int data_; + Offsets offsets_; +}; + +// Don't use this directly. Worker that sorts blocks. +template <class Compare> class BlockSorter { + public: + BlockSorter(Offsets &offsets, const Compare &compare) : + offsets_(&offsets), compare_(compare) {} + + void Run(const ChainPosition &position) { + const std::size_t entry_size = position.GetChain().EntrySize(); + for (Link link(position); link; ++link) { + // Record the size of each block in a separate file. + offsets_->Append(link->ValidSize()); + void *end = static_cast<uint8_t*>(link->Get()) + link->ValidSize(); +#if defined(_WIN32) || defined(_WIN64) + std::stable_sort +#else + std::sort +#endif + (SizedIt(link->Get(), entry_size), + SizedIt(end, entry_size), + compare_); + } + offsets_->FinishedAppending(); + } + + private: + Offsets *offsets_; + SizedCompare<Compare> compare_; +}; + +class BadSortConfig : public Exception { + public: + BadSortConfig() throw() {} + ~BadSortConfig() throw() {} +}; + +template <class Compare, class Combine = NeverCombine> class Sort { + public: + Sort(Chain &in, const SortConfig &config, const Compare &compare = Compare(), const Combine &combine = Combine()) + : config_(config), + data_(MakeTemp(config.temp_prefix)), + offsets_file_(MakeTemp(config.temp_prefix)), offsets_(offsets_file_.get()), + compare_(compare), combine_(combine), + entry_size_(in.EntrySize()) { + UTIL_THROW_IF(!entry_size_, BadSortConfig, "Sorting entries of size 0"); + // Make buffer_size a multiple of the entry_size. + config_.buffer_size -= config_.buffer_size % entry_size_; + UTIL_THROW_IF(!config_.buffer_size, BadSortConfig, "Sort buffer too small"); + UTIL_THROW_IF(config_.total_memory < config_.buffer_size * 4, BadSortConfig, "Sorting memory " << config_.total_memory << " is too small for four buffers (two read and two write)."); + in >> BlockSorter<Compare>(offsets_, compare_) >> WriteAndRecycle(data_.get()); + } + + uint64_t Size() const { + return SizeOrThrow(data_.get()); + } + + // Do merge sort, terminating when lazy merge could be done with the + // specified memory. Return the minimum memory necessary to do lazy merge. + std::size_t Merge(std::size_t lazy_memory) { + if (offsets_.RemainingBlocks() <= 1) return 0; + const uint64_t lazy_arity = std::max<uint64_t>(1, lazy_memory / config_.buffer_size); + uint64_t size = Size(); + /* No overflow because + * offsets_.RemainingBlocks() * config_.buffer_size <= lazy_memory || + * size < lazy_memory + */ + if (offsets_.RemainingBlocks() <= lazy_arity || size <= static_cast<uint64_t>(lazy_memory)) + return std::min<std::size_t>(size, offsets_.RemainingBlocks() * config_.buffer_size); + + scoped_fd data2(MakeTemp(config_.temp_prefix)); + int fd_in = data_.get(), fd_out = data2.get(); + scoped_fd offsets2_file(MakeTemp(config_.temp_prefix)); + Offsets offsets2(offsets2_file.get()); + Offsets *offsets_in = &offsets_, *offsets_out = &offsets2; + + // Double buffered writing. + ChainConfig chain_config; + chain_config.entry_size = entry_size_; + chain_config.block_count = 2; + chain_config.total_memory = config_.buffer_size * 2; + Chain chain(chain_config); + + while (offsets_in->RemainingBlocks() > lazy_arity) { + if (size <= static_cast<uint64_t>(lazy_memory)) break; + std::size_t reading_memory = config_.total_memory - 2 * config_.buffer_size; + if (size < static_cast<uint64_t>(reading_memory)) { + reading_memory = static_cast<std::size_t>(size); + } + SeekOrThrow(fd_in, 0); + chain >> + MergingReader<Compare, Combine>( + fd_in, + offsets_in, offsets_out, + config_.buffer_size, + reading_memory, + compare_, combine_) >> + WriteAndRecycle(fd_out); + chain.Wait(); + offsets_out->FinishedAppending(); + ResizeOrThrow(fd_in, 0); + offsets_in->Reset(); + std::swap(fd_in, fd_out); + std::swap(offsets_in, offsets_out); + size = SizeOrThrow(fd_in); + } + + SeekOrThrow(fd_in, 0); + if (fd_in == data2.get()) { + data_.reset(data2.release()); + offsets_file_.reset(offsets2_file.release()); + offsets_ = offsets2; + } + if (offsets_.RemainingBlocks() <= 1) return 0; + // No overflow because the while loop exited. + return std::min(size, offsets_.RemainingBlocks() * static_cast<uint64_t>(config_.buffer_size)); + } + + // Output to chain, using this amount of memory, maximum, for lazy merge + // sort. + void Output(Chain &out, std::size_t lazy_memory) { + Merge(lazy_memory); + out.SetProgressTarget(Size()); + out >> OwningMergingReader<Compare, Combine>(data_.get(), offsets_, config_.buffer_size, lazy_memory, compare_, combine_); + data_.release(); + offsets_file_.release(); + } + + /* If a pipeline step is reading sorted input and writing to a different + * sort order, then there's a trade-off between using RAM to read lazily + * (avoiding copying the file) and using RAM to increase block size and, + * therefore, decrease the number of merge sort passes in the next + * iteration. + * + * Merge sort takes log_{arity}(pieces) passes. Thus, each time the chain + * block size is multiplied by arity, the number of output passes decreases + * by one. Up to a constant, then, log_{arity}(chain) is the number of + * passes saved. Chain simply divides the memory evenly over all blocks. + * + * Lazy sort saves this many passes (up to a constant) + * log_{arity}((memory-lazy)/block_count) + 1 + * Non-lazy sort saves this many passes (up to the same constant): + * log_{arity}(memory/block_count) + * Add log_{arity}(block_count) to both: + * log_{arity}(memory-lazy) + 1 versus log_{arity}(memory) + * Take arity to the power of both sizes (arity > 1) + * (memory - lazy)*arity versus memory + * Solve for lazy + * lazy = memory * (arity - 1) / arity + */ + std::size_t DefaultLazy() { + float arity = static_cast<float>(config_.total_memory / config_.buffer_size); + return static_cast<std::size_t>(static_cast<float>(config_.total_memory) * (arity - 1.0) / arity); + } + + // Same as Output with default lazy memory setting. + void Output(Chain &out) { + Output(out, DefaultLazy()); + } + + // Completely merge sort and transfer ownership to the caller. + int StealCompleted() { + // Merge all the way. + Merge(0); + SeekOrThrow(data_.get(), 0); + offsets_file_.reset(); + return data_.release(); + } + + private: + SortConfig config_; + + scoped_fd data_; + + scoped_fd offsets_file_; + Offsets offsets_; + + const Compare compare_; + const Combine combine_; + const std::size_t entry_size_; +}; + +// returns bytes to be read on demand. +template <class Compare, class Combine> uint64_t BlockingSort(Chain &chain, const SortConfig &config, const Compare &compare = Compare(), const Combine &combine = NeverCombine()) { + Sort<Compare, Combine> sorter(chain, config, compare, combine); + chain.Wait(true); + uint64_t size = sorter.Size(); + sorter.Output(chain); + return size; +} + +} // namespace stream +} // namespace util + +#endif // UTIL_STREAM_SORT__ diff --git a/klm/util/stream/sort_test.cc b/klm/util/stream/sort_test.cc new file mode 100644 index 00000000..fd7705cd --- /dev/null +++ b/klm/util/stream/sort_test.cc @@ -0,0 +1,62 @@ +#include "util/stream/sort.hh" + +#define BOOST_TEST_MODULE SortTest +#include <boost/test/unit_test.hpp> + +#include <algorithm> + +#include <unistd.h> + +namespace util { namespace stream { namespace { + +struct CompareUInt64 : public std::binary_function<const void *, const void *, bool> { + bool operator()(const void *first, const void *second) const { + return *static_cast<const uint64_t*>(first) < *reinterpret_cast<const uint64_t*>(second); + } +}; + +const uint64_t kSize = 100000; + +struct Putter { + Putter(std::vector<uint64_t> &shuffled) : shuffled_(shuffled) {} + + void Run(const ChainPosition &position) { + Stream put_shuffled(position); + for (uint64_t i = 0; i < shuffled_.size(); ++i, ++put_shuffled) { + *static_cast<uint64_t*>(put_shuffled.Get()) = shuffled_[i]; + } + put_shuffled.Poison(); + } + std::vector<uint64_t> &shuffled_; +}; + +BOOST_AUTO_TEST_CASE(FromShuffled) { + std::vector<uint64_t> shuffled; + shuffled.reserve(kSize); + for (uint64_t i = 0; i < kSize; ++i) { + shuffled.push_back(i); + } + std::random_shuffle(shuffled.begin(), shuffled.end()); + + ChainConfig config; + config.entry_size = 8; + config.total_memory = 800; + config.block_count = 3; + + SortConfig merge_config; + merge_config.temp_prefix = "sort_test_temp"; + merge_config.buffer_size = 800; + merge_config.total_memory = 3300; + + Chain chain(config); + chain >> Putter(shuffled); + BlockingSort(chain, merge_config, CompareUInt64(), NeverCombine()); + Stream sorted; + chain >> sorted >> kRecycle; + for (uint64_t i = 0; i < kSize; ++i, ++sorted) { + BOOST_CHECK_EQUAL(i, *static_cast<const uint64_t*>(sorted.Get())); + } + BOOST_CHECK(!sorted); +} + +}}} // namespaces diff --git a/klm/util/stream/stream.hh b/klm/util/stream/stream.hh new file mode 100644 index 00000000..6ff45b82 --- /dev/null +++ b/klm/util/stream/stream.hh @@ -0,0 +1,74 @@ +#ifndef UTIL_STREAM_STREAM__ +#define UTIL_STREAM_STREAM__ + +#include "util/stream/chain.hh" + +#include <boost/noncopyable.hpp> + +#include <assert.h> +#include <stdint.h> + +namespace util { +namespace stream { + +class Stream : boost::noncopyable { + public: + Stream() : current_(NULL), end_(NULL) {} + + void Init(const ChainPosition &position) { + entry_size_ = position.GetChain().EntrySize(); + block_size_ = position.GetChain().BlockSize(); + block_it_.Init(position); + StartBlock(); + } + + explicit Stream(const ChainPosition &position) { + Init(position); + } + + operator bool() const { return current_ != NULL; } + bool operator!() const { return current_ == NULL; } + + const void *Get() const { return current_; } + void *Get() { return current_; } + + void Poison() { + block_it_->SetValidSize(current_ - static_cast<uint8_t*>(block_it_->Get())); + ++block_it_; + block_it_.Poison(); + } + + Stream &operator++() { + assert(*this); + assert(current_ < end_); + current_ += entry_size_; + if (current_ == end_) { + ++block_it_; + StartBlock(); + } + return *this; + } + + private: + void StartBlock() { + for (; block_it_ && !block_it_->ValidSize(); ++block_it_) {} + current_ = static_cast<uint8_t*>(block_it_->Get()); + end_ = current_ + block_it_->ValidSize(); + } + + uint8_t *current_, *end_; + + std::size_t entry_size_; + std::size_t block_size_; + + Link block_it_; +}; + +inline Chain &operator>>(Chain &chain, Stream &stream) { + stream.Init(chain.Add()); + return chain; +} + +} // namespace stream +} // namespace util +#endif // UTIL_STREAM_STREAM__ diff --git a/klm/util/stream/stream_test.cc b/klm/util/stream/stream_test.cc new file mode 100644 index 00000000..6575d50d --- /dev/null +++ b/klm/util/stream/stream_test.cc @@ -0,0 +1,35 @@ +#include "util/stream/io.hh" + +#include "util/stream/stream.hh" +#include "util/file.hh" + +#define BOOST_TEST_MODULE StreamTest +#include <boost/test/unit_test.hpp> + +#include <unistd.h> + +namespace util { namespace stream { namespace { + +BOOST_AUTO_TEST_CASE(StreamTest) { + scoped_fd in(MakeTemp("io_test_temp")); + for (uint64_t i = 0; i < 100000; ++i) { + WriteOrThrow(in.get(), &i, sizeof(uint64_t)); + } + SeekOrThrow(in.get(), 0); + + ChainConfig config; + config.entry_size = 8; + config.total_memory = 100; + config.block_count = 12; + + Stream s; + Chain chain(config); + chain >> Read(in.get()) >> s >> kRecycle; + uint64_t i = 0; + for (; s; ++s, ++i) { + BOOST_CHECK_EQUAL(i, *static_cast<const uint64_t*>(s.Get())); + } + BOOST_CHECK_EQUAL(100000ULL, i); +} + +}}} // namespaces diff --git a/klm/util/stream/timer.hh b/klm/util/stream/timer.hh new file mode 100644 index 00000000..7e1a5885 --- /dev/null +++ b/klm/util/stream/timer.hh @@ -0,0 +1,16 @@ +#ifndef UTIL_STREAM_TIMER__ +#define UTIL_STREAM_TIMER__ + +// Sorry Jon, this was adding library dependencies in Moses and people complained. + +/*#include <boost/version.hpp> + +#if BOOST_VERSION >= 104800 +#include <boost/timer/timer.hpp> +#define UTIL_TIMER(str) boost::timer::auto_cpu_timer timer(std::cerr, 1, (str)) +#else +//#warning Using Boost older than 1.48. Timing information will not be available.*/ +#define UTIL_TIMER(str) +//#endif + +#endif // UTIL_STREAM_TIMER__ |