diff options
Diffstat (limited to 'klm/util/mmap.cc')
-rw-r--r-- | klm/util/mmap.cc | 69 |
1 files changed, 68 insertions, 1 deletions
diff --git a/klm/util/mmap.cc b/klm/util/mmap.cc index cee6a970..a3c8a022 100644 --- a/klm/util/mmap.cc +++ b/klm/util/mmap.cc @@ -6,6 +6,7 @@ #include "util/exception.hh" #include "util/file.hh" +#include "util/parallel_read.hh" #include "util/scoped.hh" #include <iostream> @@ -40,7 +41,7 @@ void SyncOrThrow(void *start, size_t length) { #if defined(_WIN32) || defined(_WIN64) UTIL_THROW_IF(!::FlushViewOfFile(start, length), ErrnoException, "Failed to sync mmap"); #else - UTIL_THROW_IF(msync(start, length, MS_SYNC), ErrnoException, "Failed to sync mmap"); + UTIL_THROW_IF(length && msync(start, length, MS_SYNC), ErrnoException, "Failed to sync mmap"); #endif } @@ -154,6 +155,10 @@ void MapRead(LoadMethod method, int fd, uint64_t offset, std::size_t size, scope SeekOrThrow(fd, offset); ReadOrThrow(fd, out.get(), size); break; + case PARALLEL_READ: + out.reset(MallocOrThrow(size), size, scoped_memory::MALLOC_ALLOCATED); + ParallelRead(fd, out.get(), size, offset); + break; } } @@ -189,4 +194,66 @@ void *MapZeroedWrite(const char *name, std::size_t size, scoped_fd &file) { } } +Rolling::Rolling(const Rolling ©_from, uint64_t increase) { + *this = copy_from; + IncreaseBase(increase); +} + +Rolling &Rolling::operator=(const Rolling ©_from) { + fd_ = copy_from.fd_; + file_begin_ = copy_from.file_begin_; + file_end_ = copy_from.file_end_; + for_write_ = copy_from.for_write_; + block_ = copy_from.block_; + read_bound_ = copy_from.read_bound_; + + current_begin_ = 0; + if (copy_from.IsPassthrough()) { + current_end_ = copy_from.current_end_; + ptr_ = copy_from.ptr_; + } else { + // Force call on next mmap. + current_end_ = 0; + ptr_ = NULL; + } + return *this; +} + +Rolling::Rolling(int fd, bool for_write, std::size_t block, std::size_t read_bound, uint64_t offset, uint64_t amount) { + current_begin_ = 0; + current_end_ = 0; + fd_ = fd; + file_begin_ = offset; + file_end_ = offset + amount; + for_write_ = for_write; + block_ = block; + read_bound_ = read_bound; +} + +void *Rolling::ExtractNonRolling(scoped_memory &out, uint64_t index, std::size_t size) { + out.reset(); + if (IsPassthrough()) return static_cast<uint8_t*>(get()) + index; + uint64_t offset = index + file_begin_; + // Round down to multiple of page size. + uint64_t cruft = offset % static_cast<uint64_t>(SizePage()); + std::size_t map_size = static_cast<std::size_t>(size + cruft); + out.reset(MapOrThrow(map_size, for_write_, kFileFlags, true, fd_, offset - cruft), map_size, scoped_memory::MMAP_ALLOCATED); + return static_cast<uint8_t*>(out.get()) + static_cast<std::size_t>(cruft); +} + +void Rolling::Roll(uint64_t index) { + assert(!IsPassthrough()); + std::size_t amount; + if (file_end_ - (index + file_begin_) > static_cast<uint64_t>(block_)) { + amount = block_; + current_end_ = index + amount - read_bound_; + } else { + amount = file_end_ - (index + file_begin_); + current_end_ = index + amount; + } + ptr_ = static_cast<uint8_t*>(ExtractNonRolling(mem_, index, amount)) - index; + + current_begin_ = index; +} + } // namespace util |