diff options
| author | armatthews <armatthe@cmu.edu> | 2014-10-13 14:59:23 -0400 | 
|---|---|---|
| committer | armatthews <armatthe@cmu.edu> | 2014-10-13 14:59:23 -0400 | 
| commit | b26cda84e05d4523eee069234a975a0153bf8608 (patch) | |
| tree | 61c9da4f8dd6070f27c8e81812a76fc0a8cf2d8d /klm/util/parallel_read.cc | |
| parent | cd7bc67f475fdfd07fba003ac4cca40e83944740 (diff) | |
| parent | b1ed81ef3216b212295afa76c5d20a56fb647204 (diff) | |
Merge branch 'master' of github.com:redpony/cdec
Diffstat (limited to 'klm/util/parallel_read.cc')
| -rw-r--r-- | klm/util/parallel_read.cc | 69 | 
1 files changed, 69 insertions, 0 deletions
| diff --git a/klm/util/parallel_read.cc b/klm/util/parallel_read.cc new file mode 100644 index 00000000..6435eb84 --- /dev/null +++ b/klm/util/parallel_read.cc @@ -0,0 +1,69 @@ +#include "util/parallel_read.hh" + +#include "util/file.hh" + +#ifdef WITH_THREADS +#include "util/thread_pool.hh" + +namespace util { +namespace { + +class Reader { +  public: +    explicit Reader(int fd) : fd_(fd) {} + +    struct Request { +      void *to; +      std::size_t size; +      uint64_t offset; + +      bool operator==(const Request &other) const { +        return (to == other.to) && (size == other.size) && (offset == other.offset); +      } +    }; + +    void operator()(const Request &request) { +      util::ErsatzPRead(fd_, request.to, request.size, request.offset); +    } + +  private: +    int fd_; +}; + +} // namespace + +void ParallelRead(int fd, void *to, std::size_t amount, uint64_t offset) { +  Reader::Request poison; +  poison.to = NULL; +  poison.size = 0; +  poison.offset = 0; +  unsigned threads = boost::thread::hardware_concurrency(); +  if (!threads) threads = 2; +  ThreadPool<Reader> pool(2 /* don't need much of a queue */, threads, fd, poison); +  const std::size_t kBatch = 1ULL << 25; // 32 MB +  Reader::Request request; +  request.to = to; +  request.size = kBatch; +  request.offset = offset; +  for (; amount > kBatch; amount -= kBatch) { +    pool.Produce(request); +    request.to = reinterpret_cast<uint8_t*>(request.to) + kBatch; +    request.offset += kBatch; +  } +  request.size = amount; +  if (request.size) { +    pool.Produce(request); +  } +} + +} // namespace util + +#else // WITH_THREADS + +namespace util { +void ParallelRead(int fd, void *to, std::size_t amount, uint64_t offset) { + util::ErsatzPRead(fd, to, amount, offset); +} +} // namespace util + +#endif | 
