diff options
author | armatthews <armatthe@cmu.edu> | 2014-10-13 14:59:23 -0400 |
---|---|---|
committer | armatthews <armatthe@cmu.edu> | 2014-10-13 14:59:23 -0400 |
commit | b26cda84e05d4523eee069234a975a0153bf8608 (patch) | |
tree | 61c9da4f8dd6070f27c8e81812a76fc0a8cf2d8d /klm/util/parallel_read.cc | |
parent | cd7bc67f475fdfd07fba003ac4cca40e83944740 (diff) | |
parent | b1ed81ef3216b212295afa76c5d20a56fb647204 (diff) |
Merge branch 'master' of github.com:redpony/cdec
Diffstat (limited to 'klm/util/parallel_read.cc')
-rw-r--r-- | klm/util/parallel_read.cc | 69 |
1 files changed, 69 insertions, 0 deletions
diff --git a/klm/util/parallel_read.cc b/klm/util/parallel_read.cc new file mode 100644 index 00000000..6435eb84 --- /dev/null +++ b/klm/util/parallel_read.cc @@ -0,0 +1,69 @@ +#include "util/parallel_read.hh" + +#include "util/file.hh" + +#ifdef WITH_THREADS +#include "util/thread_pool.hh" + +namespace util { +namespace { + +class Reader { + public: + explicit Reader(int fd) : fd_(fd) {} + + struct Request { + void *to; + std::size_t size; + uint64_t offset; + + bool operator==(const Request &other) const { + return (to == other.to) && (size == other.size) && (offset == other.offset); + } + }; + + void operator()(const Request &request) { + util::ErsatzPRead(fd_, request.to, request.size, request.offset); + } + + private: + int fd_; +}; + +} // namespace + +void ParallelRead(int fd, void *to, std::size_t amount, uint64_t offset) { + Reader::Request poison; + poison.to = NULL; + poison.size = 0; + poison.offset = 0; + unsigned threads = boost::thread::hardware_concurrency(); + if (!threads) threads = 2; + ThreadPool<Reader> pool(2 /* don't need much of a queue */, threads, fd, poison); + const std::size_t kBatch = 1ULL << 25; // 32 MB + Reader::Request request; + request.to = to; + request.size = kBatch; + request.offset = offset; + for (; amount > kBatch; amount -= kBatch) { + pool.Produce(request); + request.to = reinterpret_cast<uint8_t*>(request.to) + kBatch; + request.offset += kBatch; + } + request.size = amount; + if (request.size) { + pool.Produce(request); + } +} + +} // namespace util + +#else // WITH_THREADS + +namespace util { +void ParallelRead(int fd, void *to, std::size_t amount, uint64_t offset) { + util::ErsatzPRead(fd, to, amount, offset); +} +} // namespace util + +#endif |