1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
|
#include "util/parallel_read.hh"
#include "util/file.hh"
#ifdef WITH_THREADS
#include "util/thread_pool.hh"
namespace util {
namespace {
// Worker functor used with ThreadPool: every call performs one read on the
// file descriptor captured at construction by forwarding to util::ErsatzPRead.
class Reader {
  public:
    // One unit of work: destination pointer, byte count and file offset.
    struct Request {
      void *to;
      std::size_t size;
      uint64_t offset;

      // Member-wise equality over all three fields.
      // NOTE(review): presumably what ThreadPool uses to recognise the poison
      // request passed to its constructor -- confirm in util/thread_pool.hh.
      bool operator==(const Request &other) const {
        if (to != other.to) return false;
        if (size != other.size) return false;
        return offset == other.offset;
      }
    };

    // Remember the descriptor that all subsequent requests will read from.
    explicit Reader(int fd) : fd_(fd) {}

    // Execute a single request against the stored descriptor.
    void operator()(const Request &request) {
      util::ErsatzPRead(fd_, request.to, request.size, request.offset);
    }

  private:
    int fd_;
};
} // namespace
void ParallelRead(int fd, void *to, std::size_t amount, uint64_t offset) {
Reader::Request poison;
poison.to = NULL;
poison.size = 0;
poison.offset = 0;
unsigned threads = boost::thread::hardware_concurrency();
if (!threads) threads = 2;
ThreadPool<Reader> pool(2 /* don't need much of a queue */, threads, fd, poison);
const std::size_t kBatch = 1ULL << 25; // 32 MB
Reader::Request request;
request.to = to;
request.size = kBatch;
request.offset = offset;
for (; amount > kBatch; amount -= kBatch) {
pool.Produce(request);
request.to = reinterpret_cast<uint8_t*>(request.to) + kBatch;
request.offset += kBatch;
}
request.size = amount;
if (request.size) {
pool.Produce(request);
}
}
} // namespace util
#else // WITH_THREADS
namespace util {
// Fallback used when WITH_THREADS is not defined: the whole request is
// forwarded unchanged to util::ErsatzPRead as a single call, so this variant
// creates no thread pool and does no batching.
//   fd     - file descriptor passed through to ErsatzPRead
//   to     - destination pointer passed through to ErsatzPRead
//   amount - byte count passed through to ErsatzPRead
//   offset - file offset passed through to ErsatzPRead
void ParallelRead(int fd, void *to, std::size_t amount, uint64_t offset) {
util::ErsatzPRead(fd, to, amount, offset);
}
} // namespace util
#endif
|