summaryrefslogtreecommitdiff
path: root/klm/util/parallel_read.cc
blob: 6435eb84375fa8eb24b8bfced85fcad632e2ab47 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
#include "util/parallel_read.hh"

#include "util/file.hh"

#ifdef WITH_THREADS
#include "util/thread_pool.hh"

namespace util {
namespace {

// Handler type instantiated as ThreadPool<Reader> by ParallelRead. Each
// instance remembers one file descriptor and services read requests on it.
class Reader {
  public:
    // One unit of work: copy `size` bytes found at file position `offset`
    // into the buffer beginning at `to`.
    struct Request {
      void *to;
      std::size_t size;
      uint64_t offset;

      // Two requests are equal only when all three fields match.
      // NOTE(review): presumably this is what lets the pool recognise the
      // poison request passed to its constructor -- confirm in thread_pool.hh.
      bool operator==(const Request &other) const {
        if (to != other.to) return false;
        if (size != other.size) return false;
        return offset == other.offset;
      }
    };

    explicit Reader(int fd) : fd_(fd) {}

    // Service a single request by forwarding it to ErsatzPRead on the stored
    // descriptor.
    void operator()(const Request &request) {
      util::ErsatzPRead(fd_, request.to, request.size, request.offset);
    }

  private:
    // Descriptor every request handled by this object reads from.
    int fd_;
};

} // namespace

// Read `amount` bytes starting at file position `offset` of `fd` into the
// buffer at `to`. The range is cut into batches of at most 32 MB; every batch
// becomes one Reader::Request handed to a ThreadPool<Reader>.
//
// The number of workers is capped at the number of batches, so a small read
// does not start threads that could never receive a request, and a zero-byte
// read returns without building a pool at all.
//
// NOTE(review): this relies on ThreadPool's destructor to finish all produced
// requests before the function returns -- confirm in util/thread_pool.hh.
void ParallelRead(int fd, void *to, std::size_t amount, uint64_t offset) {
  // Nothing to read: no request would be produced, so skip pool setup.
  if (!amount) return;

  const std::size_t kBatch = 1ULL << 25; // 32 MB
  // How many requests the code below will produce. Written as quotient plus
  // remainder flag so it cannot overflow for amounts near the size_t maximum.
  const std::size_t batches = amount / kBatch + (amount % kBatch ? 1 : 0);

  unsigned threads = boost::thread::hardware_concurrency();
  // hardware_concurrency() reports 0 when the count is unavailable.
  if (!threads) threads = 2;
  // More workers than batches would only create idle threads.
  if (batches < threads) threads = static_cast<unsigned>(batches);

  // Sentinel request given to the pool. Every real request produced below has
  // a non-zero size, so none of them can compare equal to it.
  Reader::Request poison;
  poison.to = NULL;
  poison.size = 0;
  poison.offset = 0;
  ThreadPool<Reader> pool(2 /* don't need much of a queue */, threads, fd, poison);

  Reader::Request request;
  request.to = to;
  request.size = kBatch;
  request.offset = offset;
  // Full-size batches; the strict comparison leaves between 1 and kBatch
  // bytes for the final request.
  for (; amount > kBatch; amount -= kBatch) {
    pool.Produce(request);
    request.to = reinterpret_cast<uint8_t*>(request.to) + kBatch;
    request.offset += kBatch;
  }
  // Final request covers whatever is left (never zero, given the early return).
  request.size = amount;
  if (request.size) {
    pool.Produce(request);
  }
}

} // namespace util

#else // WITH_THREADS

namespace util {
// Build without WITH_THREADS: there is no pool, so the whole range is read by
// one ErsatzPRead call on the calling thread.
void ParallelRead(int fd, void *to, std::size_t amount, uint64_t offset) {
  util::ErsatzPRead(fd, to, amount, offset);
}
} // namespace util

#endif