summaryrefslogtreecommitdiff
path: root/klm/util/mmap.cc
diff options
context:
space:
mode:
Diffstat (limited to 'klm/util/mmap.cc')
-rw-r--r--klm/util/mmap.cc69
1 files changed, 68 insertions, 1 deletions
diff --git a/klm/util/mmap.cc b/klm/util/mmap.cc
index cee6a970..a3c8a022 100644
--- a/klm/util/mmap.cc
+++ b/klm/util/mmap.cc
@@ -6,6 +6,7 @@
#include "util/exception.hh"
#include "util/file.hh"
+#include "util/parallel_read.hh"
#include "util/scoped.hh"
#include <iostream>
@@ -40,7 +41,7 @@ void SyncOrThrow(void *start, size_t length) {
#if defined(_WIN32) || defined(_WIN64)
UTIL_THROW_IF(!::FlushViewOfFile(start, length), ErrnoException, "Failed to sync mmap");
#else
- UTIL_THROW_IF(msync(start, length, MS_SYNC), ErrnoException, "Failed to sync mmap");
+ UTIL_THROW_IF(length && msync(start, length, MS_SYNC), ErrnoException, "Failed to sync mmap");
#endif
}
@@ -154,6 +155,10 @@ void MapRead(LoadMethod method, int fd, uint64_t offset, std::size_t size, scope
SeekOrThrow(fd, offset);
ReadOrThrow(fd, out.get(), size);
break;
+ case PARALLEL_READ:
+ out.reset(MallocOrThrow(size), size, scoped_memory::MALLOC_ALLOCATED);
+ ParallelRead(fd, out.get(), size, offset);
+ break;
}
}
@@ -189,4 +194,66 @@ void *MapZeroedWrite(const char *name, std::size_t size, scoped_fd &file) {
}
}
+Rolling::Rolling(const Rolling &copy_from, uint64_t increase) {
+ *this = copy_from;
+ IncreaseBase(increase);
+}
+
+Rolling &Rolling::operator=(const Rolling &copy_from) {
+ fd_ = copy_from.fd_;
+ file_begin_ = copy_from.file_begin_;
+ file_end_ = copy_from.file_end_;
+ for_write_ = copy_from.for_write_;
+ block_ = copy_from.block_;
+ read_bound_ = copy_from.read_bound_;
+
+ current_begin_ = 0;
+ if (copy_from.IsPassthrough()) {
+ current_end_ = copy_from.current_end_;
+ ptr_ = copy_from.ptr_;
+ } else {
+ // Force call on next mmap.
+ current_end_ = 0;
+ ptr_ = NULL;
+ }
+ return *this;
+}
+
+Rolling::Rolling(int fd, bool for_write, std::size_t block, std::size_t read_bound, uint64_t offset, uint64_t amount) {
+ current_begin_ = 0;
+ current_end_ = 0;
+ fd_ = fd;
+ file_begin_ = offset;
+ file_end_ = offset + amount;
+ for_write_ = for_write;
+ block_ = block;
+ read_bound_ = read_bound;
+}
+
+void *Rolling::ExtractNonRolling(scoped_memory &out, uint64_t index, std::size_t size) {
+ out.reset();
+ if (IsPassthrough()) return static_cast<uint8_t*>(get()) + index;
+ uint64_t offset = index + file_begin_;
+ // Round down to multiple of page size.
+ uint64_t cruft = offset % static_cast<uint64_t>(SizePage());
+ std::size_t map_size = static_cast<std::size_t>(size + cruft);
+ out.reset(MapOrThrow(map_size, for_write_, kFileFlags, true, fd_, offset - cruft), map_size, scoped_memory::MMAP_ALLOCATED);
+ return static_cast<uint8_t*>(out.get()) + static_cast<std::size_t>(cruft);
+}
+
+void Rolling::Roll(uint64_t index) {
+ assert(!IsPassthrough());
+ std::size_t amount;
+ if (file_end_ - (index + file_begin_) > static_cast<uint64_t>(block_)) {
+ amount = block_;
+ current_end_ = index + amount - read_bound_;
+ } else {
+ amount = file_end_ - (index + file_begin_);
+ current_end_ = index + amount;
+ }
+ ptr_ = static_cast<uint8_t*>(ExtractNonRolling(mem_, index, amount)) - index;
+
+ current_begin_ = index;
+}
+
} // namespace util