#include "util/stream/chain.hh" #include "util/stream/io.hh" #include "util/exception.hh" #include "util/pcqueue.hh" #include <cstdlib> #include <new> #include <iostream> #include <stdint.h> #include <stdlib.h> namespace util { namespace stream { ChainConfigException::ChainConfigException() throw() { *this << "Chain configured with "; } ChainConfigException::~ChainConfigException() throw() {} Thread::~Thread() { thread_.join(); } void Thread::UnhandledException(const std::exception &e) { std::cerr << e.what() << std::endl; abort(); } void Recycler::Run(const ChainPosition &position) { for (Link l(position); l; ++l) { l->SetValidSize(position.GetChain().BlockSize()); } } const Recycler kRecycle = Recycler(); Chain::Chain(const ChainConfig &config) : config_(config), complete_called_(false) { UTIL_THROW_IF(!config.entry_size, ChainConfigException, "zero-size entries."); UTIL_THROW_IF(!config.block_count, ChainConfigException, "block count zero"); UTIL_THROW_IF(config.total_memory < config.entry_size * config.block_count, ChainConfigException, config.total_memory << " total memory, too small for " << config.block_count << " blocks of containing entries of size " << config.entry_size); // Round down block size to a multiple of entry size. block_size_ = config.total_memory / (config.block_count * config.entry_size) * config.entry_size; } Chain::~Chain() { Wait(); } ChainPosition Chain::Add() { if (!Running()) Start(); PCQueue<Block> &in = queues_.back(); queues_.push_back(new PCQueue<Block>(config_.block_count)); return ChainPosition(in, queues_.back(), this, progress_); } Chain &Chain::operator>>(const WriteAndRecycle &writer) { threads_.push_back(new Thread(Complete(), writer)); return *this; } void Chain::Wait(bool release_memory) { if (queues_.empty()) { assert(threads_.empty()); return; // Nothing to wait for. } if (!complete_called_) CompleteLoop(); threads_.clear(); for (std::size_t i = 0; queues_.front().Consume(); ++i) { if (i == config_.block_count) { std::cerr << "Chain ending without poison." << std::endl; abort(); } } queues_.clear(); progress_.Finished(); complete_called_ = false; if (release_memory) memory_.reset(); } void Chain::Start() { Wait(false); if (!memory_.get()) { // Allocate memory. assert(threads_.empty()); assert(queues_.empty()); std::size_t malloc_size = block_size_ * config_.block_count; memory_.reset(MallocOrThrow(malloc_size)); } // This queue can accomodate all blocks. queues_.push_back(new PCQueue<Block>(config_.block_count)); // Populate the lead queue with blocks. uint8_t *base = static_cast<uint8_t*>(memory_.get()); for (std::size_t i = 0; i < config_.block_count; ++i) { queues_.front().Produce(Block(base, block_size_)); base += block_size_; } } ChainPosition Chain::Complete() { assert(Running()); UTIL_THROW_IF(complete_called_, util::Exception, "CompleteLoop() called twice"); complete_called_ = true; return ChainPosition(queues_.back(), queues_.front(), this, progress_); } Link::Link() : in_(NULL), out_(NULL), poisoned_(true) {} void Link::Init(const ChainPosition &position) { UTIL_THROW_IF(in_, util::Exception, "Link::Init twice"); in_ = position.in_; out_ = position.out_; poisoned_ = false; progress_ = position.progress_; in_->Consume(current_); } Link::Link(const ChainPosition &position) : in_(NULL) { Init(position); } Link::~Link() { if (current_) { // Probably an exception unwinding. std::cerr << "Last input should have been poison." << std::endl; // abort(); } else { if (!poisoned_) { // Pass the poison! out_->Produce(current_); } } } Link &Link::operator++() { assert(current_); progress_ += current_.ValidSize(); out_->Produce(current_); in_->Consume(current_); if (!current_) { poisoned_ = true; out_->Produce(current_); } return *this; } void Link::Poison() { assert(!poisoned_); current_.SetToPoison(); out_->Produce(current_); poisoned_ = true; } } // namespace stream } // namespace util