summaryrefslogtreecommitdiff
path: root/klm/util/parallel_read.cc
diff options
context:
space:
mode:
authorarmatthews <armatthe@cmu.edu>2014-10-13 14:59:23 -0400
committerarmatthews <armatthe@cmu.edu>2014-10-13 14:59:23 -0400
commit9a06ff1465eb3477ac3d1e92ab52e7eae40316a8 (patch)
tree808c266a3f510d00f37cd19c3f1da91d8fc683f7 /klm/util/parallel_read.cc
parente51da099233df0a384b04fe5908b30e44040d13e (diff)
parentd3e2ec203a5cf550320caa8023ac3dd103b0be7d (diff)
Merge branch 'master' of github.com:redpony/cdec
Diffstat (limited to 'klm/util/parallel_read.cc')
-rw-r--r--klm/util/parallel_read.cc69
1 files changed, 69 insertions, 0 deletions
diff --git a/klm/util/parallel_read.cc b/klm/util/parallel_read.cc
new file mode 100644
index 00000000..6435eb84
--- /dev/null
+++ b/klm/util/parallel_read.cc
@@ -0,0 +1,69 @@
+#include "util/parallel_read.hh"
+
+#include "util/file.hh"
+
+#ifdef WITH_THREADS
+#include "util/thread_pool.hh"
+
+namespace util {
+namespace {
+
+class Reader {
+ public:
+ explicit Reader(int fd) : fd_(fd) {}
+
+ struct Request {
+ void *to;
+ std::size_t size;
+ uint64_t offset;
+
+ bool operator==(const Request &other) const {
+ return (to == other.to) && (size == other.size) && (offset == other.offset);
+ }
+ };
+
+ void operator()(const Request &request) {
+ util::ErsatzPRead(fd_, request.to, request.size, request.offset);
+ }
+
+ private:
+ int fd_;
+};
+
+} // namespace
+
+void ParallelRead(int fd, void *to, std::size_t amount, uint64_t offset) {
+ Reader::Request poison;
+ poison.to = NULL;
+ poison.size = 0;
+ poison.offset = 0;
+ unsigned threads = boost::thread::hardware_concurrency();
+ if (!threads) threads = 2;
+ ThreadPool<Reader> pool(2 /* don't need much of a queue */, threads, fd, poison);
+ const std::size_t kBatch = 1ULL << 25; // 32 MB
+ Reader::Request request;
+ request.to = to;
+ request.size = kBatch;
+ request.offset = offset;
+ for (; amount > kBatch; amount -= kBatch) {
+ pool.Produce(request);
+ request.to = reinterpret_cast<uint8_t*>(request.to) + kBatch;
+ request.offset += kBatch;
+ }
+ request.size = amount;
+ if (request.size) {
+ pool.Produce(request);
+ }
+}
+
+} // namespace util
+
+#else // WITH_THREADS
+
+namespace util {
+void ParallelRead(int fd, void *to, std::size_t amount, uint64_t offset) {
+ util::ErsatzPRead(fd, to, amount, offset);
+}
+} // namespace util
+
+#endif