diff options
author | Wu, Ke <wuke@cs.umd.edu> | 2014-12-17 16:15:13 -0500 |
---|---|---|
committer | Wu, Ke <wuke@cs.umd.edu> | 2014-12-17 16:15:13 -0500 |
commit | 6829a0bc624b02ebefc79f8cf9ec89d7d64a7c30 (patch) | |
tree | 125dfb20f73342873476c793995397b26fd202dd /klm/util/parallel_read.cc | |
parent | b455a108a21f4ba5a58ab1bc53a8d2bf4d829067 (diff) | |
parent | 7468e8d85e99b4619442c7afaf4a0d92870111bb (diff) |
Merge branch 'const_reorder_2' into softsyn_2
Diffstat (limited to 'klm/util/parallel_read.cc')
-rw-r--r-- | klm/util/parallel_read.cc | 69 |
1 files changed, 69 insertions, 0 deletions
diff --git a/klm/util/parallel_read.cc b/klm/util/parallel_read.cc new file mode 100644 index 00000000..6435eb84 --- /dev/null +++ b/klm/util/parallel_read.cc @@ -0,0 +1,69 @@ +#include "util/parallel_read.hh" + +#include "util/file.hh" + +#ifdef WITH_THREADS +#include "util/thread_pool.hh" + +namespace util { +namespace { + +class Reader { + public: + explicit Reader(int fd) : fd_(fd) {} + + struct Request { + void *to; + std::size_t size; + uint64_t offset; + + bool operator==(const Request &other) const { + return (to == other.to) && (size == other.size) && (offset == other.offset); + } + }; + + void operator()(const Request &request) { + util::ErsatzPRead(fd_, request.to, request.size, request.offset); + } + + private: + int fd_; +}; + +} // namespace + +void ParallelRead(int fd, void *to, std::size_t amount, uint64_t offset) { + Reader::Request poison; + poison.to = NULL; + poison.size = 0; + poison.offset = 0; + unsigned threads = boost::thread::hardware_concurrency(); + if (!threads) threads = 2; + ThreadPool<Reader> pool(2 /* don't need much of a queue */, threads, fd, poison); + const std::size_t kBatch = 1ULL << 25; // 32 MB + Reader::Request request; + request.to = to; + request.size = kBatch; + request.offset = offset; + for (; amount > kBatch; amount -= kBatch) { + pool.Produce(request); + request.to = reinterpret_cast<uint8_t*>(request.to) + kBatch; + request.offset += kBatch; + } + request.size = amount; + if (request.size) { + pool.Produce(request); + } +} + +} // namespace util + +#else // WITH_THREADS + +namespace util { +void ParallelRead(int fd, void *to, std::size_t amount, uint64_t offset) { + util::ErsatzPRead(fd, to, amount, offset); +} +} // namespace util + +#endif |