summaryrefslogtreecommitdiff
path: root/klm/util
diff options
context:
space:
mode:
Diffstat (limited to 'klm/util')
-rw-r--r--klm/util/Makefile.am18
-rw-r--r--klm/util/bit_packing.hh6
-rw-r--r--klm/util/cat_compressed_main.cc47
-rw-r--r--klm/util/ersatz_progress.hh6
-rw-r--r--klm/util/exception.hh9
-rw-r--r--klm/util/fake_ofstream.hh13
-rw-r--r--klm/util/file.cc142
-rw-r--r--klm/util/file.hh49
-rw-r--r--klm/util/file_piece.cc7
-rw-r--r--klm/util/file_piece.hh29
-rw-r--r--klm/util/fixed_array.hh153
-rw-r--r--klm/util/getopt.hh6
-rw-r--r--klm/util/have.hh6
-rw-r--r--klm/util/joint_sort.hh6
-rw-r--r--klm/util/mmap.cc69
-rw-r--r--klm/util/mmap.hh88
-rw-r--r--klm/util/multi_intersection.hh6
-rw-r--r--klm/util/murmur_hash.hh6
-rw-r--r--klm/util/parallel_read.cc69
-rw-r--r--klm/util/parallel_read.hh16
-rw-r--r--klm/util/pcqueue.hh9
-rw-r--r--klm/util/pool.hh6
-rw-r--r--klm/util/probing_hash_table.hh11
-rw-r--r--klm/util/proxy_iterator.hh6
-rw-r--r--klm/util/read_compressed.cc417
-rw-r--r--klm/util/read_compressed.hh10
-rw-r--r--klm/util/read_compressed_test.cc5
-rw-r--r--klm/util/scoped.cc4
-rw-r--r--klm/util/scoped.hh129
-rw-r--r--klm/util/sized_iterator.hh6
-rw-r--r--klm/util/sorted_uniform.hh27
-rw-r--r--klm/util/stream/Makefile.am1
-rw-r--r--klm/util/stream/block.hh57
-rw-r--r--klm/util/stream/chain.cc12
-rw-r--r--klm/util/stream/chain.hh165
-rw-r--r--klm/util/stream/config.hh43
-rw-r--r--klm/util/stream/io.cc14
-rw-r--r--klm/util/stream/io.hh21
-rw-r--r--klm/util/stream/line_input.hh6
-rw-r--r--klm/util/stream/multi_progress.hh6
-rw-r--r--klm/util/stream/multi_stream.hh127
-rw-r--r--klm/util/stream/sort.hh14
-rw-r--r--klm/util/stream/stream.hh9
-rw-r--r--klm/util/stream/timer.hh6
-rw-r--r--klm/util/string_piece.cc3
-rw-r--r--klm/util/string_piece.hh1
-rw-r--r--klm/util/string_piece_hash.hh6
-rw-r--r--klm/util/thread_pool.hh16
-rw-r--r--klm/util/tokenize_piece.hh16
-rw-r--r--klm/util/unistd.hh22
-rw-r--r--klm/util/usage.cc4
-rw-r--r--klm/util/usage.hh6
52 files changed, 1417 insertions, 519 deletions
diff --git a/klm/util/Makefile.am b/klm/util/Makefile.am
index 5e650af7..5db6e340 100644
--- a/klm/util/Makefile.am
+++ b/klm/util/Makefile.am
@@ -1,21 +1,21 @@
-#noinst_PROGRAMS = \
+noinst_PROGRAMS = cat_compressed
+
+cat_compressed_SOURCES = cat_compressed_main.cc
+cat_compressed_LDADD = libklm_util.a
+
+#TESTS = \
# file_piece_test \
# joint_sort_test \
# key_value_packing_test \
# probing_hash_table_test \
# sorted_uniform_test
-
-#TESTS = \
# file_piece_test \
# joint_sort_test \
# key_value_packing_test \
# probing_hash_table_test \
# sorted_uniform_test
-#file_piece_test_SOURCES = file_piece_test.cc
-#file_piece_test_LDADD = libklm_util.a
-
noinst_LIBRARIES = libklm_util.a
libklm_util_a_SOURCES = \
@@ -30,6 +30,8 @@ libklm_util_a_SOURCES = \
file.hh \
file_piece.cc \
file_piece.hh \
+ fixed_array.hh \
+ getopt.c \
getopt.hh \
have.hh \
joint_sort.hh \
@@ -38,6 +40,8 @@ libklm_util_a_SOURCES = \
multi_intersection.hh \
murmur_hash.cc \
murmur_hash.hh \
+ parallel_read.cc \
+ parallel_read.hh \
pcqueue.hh \
pool.cc \
pool.hh \
@@ -54,7 +58,9 @@ libklm_util_a_SOURCES = \
string_piece_hash.hh \
thread_pool.hh \
tokenize_piece.hh \
+ unistd.hh \
usage.cc \
usage.hh
AM_CPPFLAGS = -W -Wall -I$(top_srcdir)/klm -I$(top_srcdir)/klm/util/double-conversion
+
diff --git a/klm/util/bit_packing.hh b/klm/util/bit_packing.hh
index dcbd814c..1e34d9ab 100644
--- a/klm/util/bit_packing.hh
+++ b/klm/util/bit_packing.hh
@@ -1,5 +1,5 @@
-#ifndef UTIL_BIT_PACKING__
-#define UTIL_BIT_PACKING__
+#ifndef UTIL_BIT_PACKING_H
+#define UTIL_BIT_PACKING_H
/* Bit-level packing routines
*
@@ -183,4 +183,4 @@ struct BitAddress {
} // namespace util
-#endif // UTIL_BIT_PACKING__
+#endif // UTIL_BIT_PACKING_H
diff --git a/klm/util/cat_compressed_main.cc b/klm/util/cat_compressed_main.cc
new file mode 100644
index 00000000..2b4d7292
--- /dev/null
+++ b/klm/util/cat_compressed_main.cc
@@ -0,0 +1,47 @@
+// Like cat but interprets compressed files.
+#include "util/file.hh"
+#include "util/read_compressed.hh"
+
+#include <string.h>
+#include <iostream>
+
+namespace {
+const std::size_t kBufSize = 16384;
+void Copy(util::ReadCompressed &from, int to) {
+ util::scoped_malloc buffer(util::MallocOrThrow(kBufSize));
+ while (std::size_t amount = from.Read(buffer.get(), kBufSize)) {
+ util::WriteOrThrow(to, buffer.get(), amount);
+ }
+}
+} // namespace
+
+int main(int argc, char *argv[]) {
+ // Lane Schwartz likes -h and --help
+ for (int i = 1; i < argc; ++i) {
+ char *arg = argv[i];
+ if (!strcmp(arg, "--")) break;
+ if (!strcmp(arg, "-h") || !strcmp(arg, "--help")) {
+ std::cerr <<
+ "A cat implementation that interprets compressed files.\n"
+ "Usage: " << argv[0] << " [file1] [file2] ...\n"
+ "If no file is provided, then stdin is read.\n";
+ return 1;
+ }
+ }
+
+ try {
+ if (argc == 1) {
+ util::ReadCompressed in(0);
+ Copy(in, 1);
+ } else {
+ for (int i = 1; i < argc; ++i) {
+ util::ReadCompressed in(util::OpenReadOrThrow(argv[i]));
+ Copy(in, 1);
+ }
+ }
+ } catch (const std::exception &e) {
+ std::cerr << e.what() << std::endl;
+ return 2;
+ }
+ return 0;
+}
diff --git a/klm/util/ersatz_progress.hh b/klm/util/ersatz_progress.hh
index b94399a8..535dbde2 100644
--- a/klm/util/ersatz_progress.hh
+++ b/klm/util/ersatz_progress.hh
@@ -1,5 +1,5 @@
-#ifndef UTIL_ERSATZ_PROGRESS__
-#define UTIL_ERSATZ_PROGRESS__
+#ifndef UTIL_ERSATZ_PROGRESS_H
+#define UTIL_ERSATZ_PROGRESS_H
#include <iostream>
#include <string>
@@ -55,4 +55,4 @@ class ErsatzProgress {
} // namespace util
-#endif // UTIL_ERSATZ_PROGRESS__
+#endif // UTIL_ERSATZ_PROGRESS_H
diff --git a/klm/util/exception.hh b/klm/util/exception.hh
index 0298272b..4e50a6f3 100644
--- a/klm/util/exception.hh
+++ b/klm/util/exception.hh
@@ -1,5 +1,5 @@
-#ifndef UTIL_EXCEPTION__
-#define UTIL_EXCEPTION__
+#ifndef UTIL_EXCEPTION_H
+#define UTIL_EXCEPTION_H
#include <exception>
#include <limits>
@@ -83,6 +83,9 @@ template <class Except, class Data> typename Except::template ExceptionTag<Excep
#define UTIL_THROW(Exception, Modify) \
UTIL_THROW_BACKEND(NULL, Exception, , Modify);
+#define UTIL_THROW2(Modify) \
+ UTIL_THROW_BACKEND(NULL, util::Exception, , Modify);
+
#if __GNUC__ >= 3
#define UTIL_UNLIKELY(x) __builtin_expect (!!(x), 0)
#else
@@ -143,4 +146,4 @@ inline std::size_t CheckOverflow(uint64_t value) {
} // namespace util
-#endif // UTIL_EXCEPTION__
+#endif // UTIL_EXCEPTION_H
diff --git a/klm/util/fake_ofstream.hh b/klm/util/fake_ofstream.hh
index bcdebe45..eefb1edc 100644
--- a/klm/util/fake_ofstream.hh
+++ b/klm/util/fake_ofstream.hh
@@ -2,6 +2,9 @@
* Does not support many data types. Currently, it's targeted at writing ARPA
* files quickly.
*/
+#ifndef UTIL_FAKE_OFSTREAM_H
+#define UTIL_FAKE_OFSTREAM_H
+
#include "util/double-conversion/double-conversion.h"
#include "util/double-conversion/utils.h"
#include "util/file.hh"
@@ -17,7 +20,8 @@ class FakeOFStream {
static const std::size_t kOutBuf = 1048576;
// Does not take ownership of out.
- explicit FakeOFStream(int out)
+ // Allows default constructor, but must call SetFD.
+ explicit FakeOFStream(int out = -1)
: buf_(util::MallocOrThrow(kOutBuf)),
builder_(static_cast<char*>(buf_.get()), kOutBuf),
// Mostly the default but with inf instead. And no flags.
@@ -28,6 +32,11 @@ class FakeOFStream {
if (buf_.get()) Flush();
}
+ void SetFD(int to) {
+ if (builder_.position()) Flush();
+ fd_ = to;
+ }
+
FakeOFStream &operator<<(float value) {
// Odd, but this is the largest number found in the comments.
EnsureRemaining(double_conversion::DoubleToStringConverter::kMaxPrecisionDigits + 8);
@@ -92,3 +101,5 @@ class FakeOFStream {
};
} // namespace
+
+#endif
diff --git a/klm/util/file.cc b/klm/util/file.cc
index 51eaf972..aa61cf9a 100644
--- a/klm/util/file.cc
+++ b/klm/util/file.cc
@@ -5,28 +5,29 @@
#include "util/exception.hh"
+#include <algorithm>
#include <cstdlib>
#include <cstdio>
-#include <sstream>
#include <iostream>
+#include <limits>
+#include <sstream>
+
#include <assert.h>
#include <errno.h>
+#include <limits.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <stdint.h>
-#if defined __MINGW32__
+#if defined(__MINGW32__)
#include <windows.h>
#include <unistd.h>
#warning "The file functions on MinGW have not been tested for file sizes above 2^31 - 1. Please read https://stackoverflow.com/questions/12539488/determine-64-bit-file-size-in-c-on-mingw-32-bit and fix"
#elif defined(_WIN32) || defined(_WIN64)
#include <windows.h>
#include <io.h>
-#include <algorithm>
-#include <limits.h>
-#include <limits>
#else
#include <unistd.h>
#endif
@@ -40,9 +41,9 @@ scoped_fd::~scoped_fd() {
}
}
-scoped_FILE::~scoped_FILE() {
- if (file_ && std::fclose(file_)) {
- std::cerr << "Could not close file " << std::endl;
+void scoped_FILE_closer::Close(std::FILE *file) {
+ if (file && std::fclose(file)) {
+ std::cerr << "Could not close file " << file << std::endl;
std::abort();
}
}
@@ -111,7 +112,7 @@ uint64_t SizeOrThrow(int fd) {
void ResizeOrThrow(int fd, uint64_t to) {
#if defined __MINGW32__
- // Does this handle 64-bit?
+ // Does this handle 64-bit?
int ret = ftruncate
#elif defined(_WIN32) || defined(_WIN64)
errno_t ret = _chsize_s
@@ -128,8 +129,10 @@ namespace {
std::size_t GuardLarge(std::size_t size) {
// The following operating systems have broken read/write/pread/pwrite that
// only supports up to 2^31.
+ // OS X man pages claim to support 64-bit, but Kareem M. Darwish had problems
+ // building with larger files, so APPLE is also here.
#if defined(_WIN32) || defined(_WIN64) || defined(__APPLE__) || defined(OS_ANDROID) || defined(__MINGW32__)
- return std::min(static_cast<std::size_t>(static_cast<unsigned>(-1)), size);
+ return size < INT_MAX ? size : INT_MAX;
#else
return size;
#endif
@@ -172,13 +175,44 @@ std::size_t ReadOrEOF(int fd, void *to_void, std::size_t amount) {
return amount;
}
-void PReadOrThrow(int fd, void *to_void, std::size_t size, uint64_t off) {
- uint8_t *to = static_cast<uint8_t*>(to_void);
+void WriteOrThrow(int fd, const void *data_void, std::size_t size) {
+ const uint8_t *data = static_cast<const uint8_t*>(data_void);
+ while (size) {
#if defined(_WIN32) || defined(_WIN64)
- UTIL_THROW(Exception, "This pread implementation for windows is broken. Please send me a patch that does not change the file pointer. Atomically. Or send me an implementation of pwrite that is allowed to change the file pointer but can be called concurrently with pread.");
- const std::size_t kMaxDWORD = static_cast<std::size_t>(4294967295UL);
+ int ret;
+#else
+ ssize_t ret;
#endif
- for (;size ;) {
+ errno = 0;
+ do {
+ ret =
+#if defined(_WIN32) || defined(_WIN64)
+ _write
+#else
+ write
+#endif
+ (fd, data, GuardLarge(size));
+ } while (ret == -1 && errno == EINTR);
+ UTIL_THROW_IF_ARG(ret < 1, FDException, (fd), "while writing " << size << " bytes");
+ data += ret;
+ size -= ret;
+ }
+}
+
+void WriteOrThrow(FILE *to, const void *data, std::size_t size) {
+ if (!size) return;
+ UTIL_THROW_IF(1 != std::fwrite(data, size, 1, to), ErrnoException, "Short write; requested size " << size);
+}
+
+#if defined(_WIN32) || defined(_WIN64)
+namespace {
+const std::size_t kMaxDWORD = static_cast<std::size_t>(4294967295UL);
+} // namespace
+#endif
+
+void ErsatzPRead(int fd, void *to_void, std::size_t size, uint64_t off) {
+ uint8_t *to = static_cast<uint8_t*>(to_void);
+ while (size) {
#if defined(_WIN32) || defined(_WIN64)
/* BROKEN: changes file pointer. Even if you save it and change it back, it won't be safe to use concurrently with write() or read() which lmplz does. */
// size_t might be 64-bit. DWORD is always 32.
@@ -192,16 +226,15 @@ void PReadOrThrow(int fd, void *to_void, std::size_t size, uint64_t off) {
#else
ssize_t ret;
errno = 0;
- do {
- ret =
+ ret =
#ifdef OS_ANDROID
- pread64
+ pread64
#else
- pread
+ pread
#endif
- (fd, to, GuardLarge(size), off);
- } while (ret == -1 && errno == EINTR);
+ (fd, to, GuardLarge(size), off);
if (ret <= 0) {
+ if (ret == -1 && errno == EINTR) continue;
UTIL_THROW_IF(ret == 0, EndOfFileException, " for reading " << size << " bytes at " << off << " from " << NameFromFD(fd));
UTIL_THROW_ARG(FDException, (fd), "while reading " << size << " bytes at offset " << off);
}
@@ -212,34 +245,41 @@ void PReadOrThrow(int fd, void *to_void, std::size_t size, uint64_t off) {
}
}
-void WriteOrThrow(int fd, const void *data_void, std::size_t size) {
- const uint8_t *data = static_cast<const uint8_t*>(data_void);
- while (size) {
+void ErsatzPWrite(int fd, const void *from_void, std::size_t size, uint64_t off) {
+ const uint8_t *from = static_cast<const uint8_t*>(from_void);
+ while(size) {
#if defined(_WIN32) || defined(_WIN64)
- int ret;
+ /* Changes file pointer. Even if you save it and change it back, it won't be safe to use concurrently with write() or read() */
+ // size_t might be 64-bit. DWORD is always 32.
+ DWORD writing = static_cast<DWORD>(std::min<std::size_t>(kMaxDWORD, size));
+ DWORD ret;
+ OVERLAPPED overlapped;
+ memset(&overlapped, 0, sizeof(OVERLAPPED));
+ overlapped.Offset = static_cast<DWORD>(off);
+ overlapped.OffsetHigh = static_cast<DWORD>(off >> 32);
+ UTIL_THROW_IF(!WriteFile((HANDLE)_get_osfhandle(fd), from, writing, &ret, &overlapped), Exception, "WriteFile failed for offset " << off);
#else
ssize_t ret;
-#endif
errno = 0;
- do {
- ret =
-#if defined(_WIN32) || defined(_WIN64)
- _write
+ ret =
+#ifdef OS_ANDROID
+ pwrite64
#else
- write
+ pwrite
+#endif
+ (fd, from, GuardLarge(size), off);
+ if (ret <= 0) {
+ if (ret == -1 && errno == EINTR) continue;
+ UTIL_THROW_IF(ret == 0, EndOfFileException, " for writing " << size << " bytes at " << off << " from " << NameFromFD(fd));
+ UTIL_THROW_ARG(FDException, (fd), "while writing " << size << " bytes at offset " << off);
+ }
#endif
- (fd, data, GuardLarge(size));
- } while (ret == -1 && errno == EINTR);
- UTIL_THROW_IF_ARG(ret < 1, FDException, (fd), "while writing " << size << " bytes");
- data += ret;
size -= ret;
+ off += ret;
+ from += ret;
}
}
-void WriteOrThrow(FILE *to, const void *data, std::size_t size) {
- if (!size) return;
- UTIL_THROW_IF(1 != std::fwrite(data, size, 1, to), ErrnoException, "Short write; requested size " << size);
-}
void FSyncOrThrow(int fd) {
// Apparently windows doesn't have fsync?
@@ -443,8 +483,8 @@ void NormalizeTempPrefix(std::string &base) {
) base += '/';
}
-int MakeTemp(const std::string &base) {
- std::string name(base);
+int MakeTemp(const StringPiece &base) {
+ std::string name(base.data(), base.size());
name += "XXXXXX";
name.push_back(0);
int ret;
@@ -452,7 +492,7 @@ int MakeTemp(const std::string &base) {
return ret;
}
-std::FILE *FMakeTemp(const std::string &base) {
+std::FILE *FMakeTemp(const StringPiece &base) {
util::scoped_fd file(MakeTemp(base));
return FDOpenOrThrow(file);
}
@@ -478,14 +518,18 @@ bool TryName(int fd, std::string &out) {
if (-1 == lstat(name.c_str(), &sb))
return false;
out.resize(sb.st_size + 1);
- ssize_t ret = readlink(name.c_str(), &out[0], sb.st_size + 1);
- if (-1 == ret)
- return false;
- if (ret > sb.st_size) {
- // Increased in size?!
- return false;
+ // lstat gave us a size, but I've seen it grow, possibly due to symlinks on top of symlinks.
+ while (true) {
+ ssize_t ret = readlink(name.c_str(), &out[0], out.size());
+ if (-1 == ret)
+ return false;
+ if ((size_t)ret < out.size()) {
+ out.resize(ret);
+ break;
+ }
+ // Exponential growth.
+ out.resize(out.size() * 2);
}
- out.resize(ret);
// Don't use the non-file names.
if (!out.empty() && out[0] != '/')
return false;
diff --git a/klm/util/file.hh b/klm/util/file.hh
index be88431d..7204b6a0 100644
--- a/klm/util/file.hh
+++ b/klm/util/file.hh
@@ -1,7 +1,9 @@
-#ifndef UTIL_FILE__
-#define UTIL_FILE__
+#ifndef UTIL_FILE_H
+#define UTIL_FILE_H
#include "util/exception.hh"
+#include "util/scoped.hh"
+#include "util/string_piece.hh"
#include <cstddef>
#include <cstdio>
@@ -41,29 +43,10 @@ class scoped_fd {
scoped_fd &operator=(const scoped_fd &);
};
-class scoped_FILE {
- public:
- explicit scoped_FILE(std::FILE *file = NULL) : file_(file) {}
-
- ~scoped_FILE();
-
- std::FILE *get() { return file_; }
- const std::FILE *get() const { return file_; }
-
- void reset(std::FILE *to = NULL) {
- scoped_FILE other(file_);
- file_ = to;
- }
-
- std::FILE *release() {
- std::FILE *ret = file_;
- file_ = NULL;
- return ret;
- }
-
- private:
- std::FILE *file_;
+struct scoped_FILE_closer {
+ static void Close(std::FILE *file);
};
+typedef scoped<std::FILE, scoped_FILE_closer> scoped_FILE;
/* Thrown for any operation where the fd is known. */
class FDException : public ErrnoException {
@@ -106,12 +89,20 @@ void ResizeOrThrow(int fd, uint64_t to);
std::size_t PartialRead(int fd, void *to, std::size_t size);
void ReadOrThrow(int fd, void *to, std::size_t size);
std::size_t ReadOrEOF(int fd, void *to_void, std::size_t size);
-// Positioned: unix only for now.
-void PReadOrThrow(int fd, void *to, std::size_t size, uint64_t off);
void WriteOrThrow(int fd, const void *data_void, std::size_t size);
void WriteOrThrow(FILE *to, const void *data, std::size_t size);
+/* These call pread/pwrite in a loop. However, on Windows they call ReadFile/
+ * WriteFile which changes the file pointer. So it's safe to call ErsatzPRead
+ * and ErsatzPWrite concurrently (or any combination thereof). But it changes
+ * the file pointer on windows, so it's not safe to call concurrently with
+ * anything that uses the implicit file pointer e.g. the Read/Write functions
+ * above.
+ */
+void ErsatzPRead(int fd, void *to, std::size_t size, uint64_t off);
+void ErsatzPWrite(int fd, const void *data_void, std::size_t size, uint64_t off);
+
void FSyncOrThrow(int fd);
// Seeking
@@ -125,8 +116,8 @@ std::FILE *FDOpenReadOrThrow(scoped_fd &file);
// Temporary files
// Append a / if base is a directory.
void NormalizeTempPrefix(std::string &base);
-int MakeTemp(const std::string &prefix);
-std::FILE *FMakeTemp(const std::string &prefix);
+int MakeTemp(const StringPiece &prefix);
+std::FILE *FMakeTemp(const StringPiece &prefix);
// dup an fd.
int DupOrThrow(int fd);
@@ -139,4 +130,4 @@ std::string NameFromFD(int fd);
} // namespace util
-#endif // UTIL_FILE__
+#endif // UTIL_FILE_H
diff --git a/klm/util/file_piece.cc b/klm/util/file_piece.cc
index 9c7e00c4..4aaa250e 100644
--- a/klm/util/file_piece.cc
+++ b/klm/util/file_piece.cc
@@ -84,6 +84,13 @@ StringPiece FilePiece::ReadLine(char delim) {
}
}
+bool FilePiece::ReadLineOrEOF(StringPiece &to, char delim) {
+ try {
+ to = ReadLine(delim);
+ } catch (const util::EndOfFileException &e) { return false; }
+ return true;
+}
+
float FilePiece::ReadFloat() {
return ReadNumber<float>();
}
diff --git a/klm/util/file_piece.hh b/klm/util/file_piece.hh
index ed3dc5ad..5495ddcc 100644
--- a/klm/util/file_piece.hh
+++ b/klm/util/file_piece.hh
@@ -1,5 +1,5 @@
-#ifndef UTIL_FILE_PIECE__
-#define UTIL_FILE_PIECE__
+#ifndef UTIL_FILE_PIECE_H
+#define UTIL_FILE_PIECE_H
#include "util/ersatz_progress.hh"
#include "util/exception.hh"
@@ -56,10 +56,33 @@ class FilePiece {
return Consume(FindDelimiterOrEOF(delim));
}
+ // Read word until the line or file ends.
+ bool ReadWordSameLine(StringPiece &to, const bool *delim = kSpaces) {
+ assert(delim[static_cast<unsigned char>('\n')]);
+ // Skip non-enter spaces.
+ for (; ; ++position_) {
+ if (position_ == position_end_) {
+ try {
+ Shift();
+ } catch (const util::EndOfFileException &e) { return false; }
+ // And break out at end of file.
+ if (position_ == position_end_) return false;
+ }
+ if (!delim[static_cast<unsigned char>(*position_)]) break;
+ if (*position_ == '\n') return false;
+ }
+ // We can't be at the end of file because there's at least one character open.
+ to = Consume(FindDelimiterOrEOF(delim));
+ return true;
+ }
+
// Unlike ReadDelimited, this includes leading spaces and consumes the delimiter.
// It is similar to getline in that way.
StringPiece ReadLine(char delim = '\n');
+ // Doesn't throw EndOfFileException, just returns false.
+ bool ReadLineOrEOF(StringPiece &to, char delim = '\n');
+
float ReadFloat();
double ReadDouble();
long int ReadLong();
@@ -132,4 +155,4 @@ class FilePiece {
} // namespace util
-#endif // UTIL_FILE_PIECE__
+#endif // UTIL_FILE_PIECE_H
diff --git a/klm/util/fixed_array.hh b/klm/util/fixed_array.hh
new file mode 100644
index 00000000..416b92f4
--- /dev/null
+++ b/klm/util/fixed_array.hh
@@ -0,0 +1,153 @@
+#ifndef UTIL_FIXED_ARRAY_H
+#define UTIL_FIXED_ARRAY_H
+
+#include "util/scoped.hh"
+
+#include <cstddef>
+
+#include <assert.h>
+#include <stdlib.h>
+
+namespace util {
+
+/**
+ * Defines a fixed-size collection.
+ *
+ * Ever want an array of things by they don't have a default constructor or are
+ * non-copyable? FixedArray allows constructing one at a time.
+ */
+template <class T> class FixedArray {
+ public:
+ /** Initialize with a given size bound but do not construct the objects. */
+ explicit FixedArray(std::size_t limit) {
+ Init(limit);
+ }
+
+ /**
+ * Constructs an instance, but does not initialize it.
+ *
+ * Any objects constructed in this manner must be subsequently @ref FixedArray::Init() "initialized" prior to use.
+ *
+ * @see FixedArray::Init()
+ */
+ FixedArray()
+ : newed_end_(NULL)
+#ifndef NDEBUG
+ , allocated_end_(NULL)
+#endif
+ {}
+
+ /**
+ * Initialize with a given size bound but do not construct the objects.
+ *
+ * This method is responsible for allocating memory.
+ * Objects stored in this array will be constructed in a location within this allocated memory.
+ */
+ void Init(std::size_t count) {
+ assert(!block_.get());
+ block_.reset(malloc(sizeof(T) * count));
+ if (!block_.get()) throw std::bad_alloc();
+ newed_end_ = begin();
+#ifndef NDEBUG
+ allocated_end_ = begin() + count;
+#endif
+ }
+
+ /**
+ * Constructs a copy of the provided array.
+ *
+ * @param from Array whose elements should be copied into this newly-constructed data structure.
+ */
+ FixedArray(const FixedArray &from) {
+ std::size_t size = from.newed_end_ - static_cast<const T*>(from.block_.get());
+ Init(size);
+ for (std::size_t i = 0; i < size; ++i) {
+ push_back(from[i]);
+ }
+ }
+
+ /**
+ * Frees the memory held by this object.
+ */
+ ~FixedArray() { clear(); }
+
+ /** Gets a pointer to the first object currently stored in this data structure. */
+ T *begin() { return static_cast<T*>(block_.get()); }
+
+ /** Gets a const pointer to the last object currently stored in this data structure. */
+ const T *begin() const { return static_cast<const T*>(block_.get()); }
+
+ /** Gets a pointer to the last object currently stored in this data structure. */
+ T *end() { return newed_end_; }
+
+ /** Gets a const pointer to the last object currently stored in this data structure. */
+ const T *end() const { return newed_end_; }
+
+ /** Gets a reference to the last object currently stored in this data structure. */
+ T &back() { return *(end() - 1); }
+
+ /** Gets a const reference to the last object currently stored in this data structure. */
+ const T &back() const { return *(end() - 1); }
+
+ /** Gets the number of objects currently stored in this data structure. */
+ std::size_t size() const { return end() - begin(); }
+
+ /** Returns true if there are no objects currently stored in this data structure. */
+ bool empty() const { return begin() == end(); }
+
+ /**
+ * Gets a reference to the object with index i currently stored in this data structure.
+ *
+ * @param i Index of the object to reference
+ */
+ T &operator[](std::size_t i) { return begin()[i]; }
+
+ /**
+ * Gets a const reference to the object with index i currently stored in this data structure.
+ *
+ * @param i Index of the object to reference
+ */
+ const T &operator[](std::size_t i) const { return begin()[i]; }
+
+ /**
+ * Constructs a new object using the provided parameter,
+ * and stores it in this data structure.
+ *
+ * The memory backing the constructed object is managed by this data structure.
+ */
+ template <class C> void push_back(const C &c) {
+ new (end()) T(c); // use "placement new" syntax to initalize T in an already-allocated memory location
+ Constructed();
+ }
+
+ /**
+ * Removes all elements from this array.
+ */
+ void clear() {
+ for (T *i = begin(); i != end(); ++i)
+ i->~T();
+ newed_end_ = begin();
+ }
+
+ protected:
+ // Always call Constructed after successful completion of new.
+ void Constructed() {
+ ++newed_end_;
+#ifndef NDEBUG
+ assert(newed_end_ <= allocated_end_);
+#endif
+ }
+
+ private:
+ util::scoped_malloc block_;
+
+ T *newed_end_;
+
+#ifndef NDEBUG
+ T *allocated_end_;
+#endif
+};
+
+} // namespace util
+
+#endif // UTIL_FIXED_ARRAY_H
diff --git a/klm/util/getopt.hh b/klm/util/getopt.hh
index 6ad97732..50eab56f 100644
--- a/klm/util/getopt.hh
+++ b/klm/util/getopt.hh
@@ -11,8 +11,8 @@ Code given out at the 1985 UNIFORUM conference in Dallas.
#endif
#ifndef __GNUC__
-#ifndef _WINGETOPT_H_
-#define _WINGETOPT_H_
+#ifndef UTIL_GETOPT_H
+#define UTIL_GETOPT_H
#ifdef __cplusplus
extern "C" {
@@ -28,6 +28,6 @@ extern int getopt(int argc, char **argv, char *opts);
}
#endif
-#endif /* _GETOPT_H_ */
+#endif /* UTIL_GETOPT_H */
#endif /* __GNUC__ */
diff --git a/klm/util/have.hh b/klm/util/have.hh
index 6e18529d..dc3f6330 100644
--- a/klm/util/have.hh
+++ b/klm/util/have.hh
@@ -1,6 +1,6 @@
/* Optional packages. You might want to integrate this with your build system e.g. config.h from ./configure. */
-#ifndef UTIL_HAVE__
-#define UTIL_HAVE__
+#ifndef UTIL_HAVE_H
+#define UTIL_HAVE_H
#ifdef HAVE_CONFIG_H
#include "config.h"
@@ -10,4 +10,4 @@
//#define HAVE_ICU
#endif
-#endif // UTIL_HAVE__
+#endif // UTIL_HAVE_H
diff --git a/klm/util/joint_sort.hh b/klm/util/joint_sort.hh
index b1ec48e2..de4b554f 100644
--- a/klm/util/joint_sort.hh
+++ b/klm/util/joint_sort.hh
@@ -1,5 +1,5 @@
-#ifndef UTIL_JOINT_SORT__
-#define UTIL_JOINT_SORT__
+#ifndef UTIL_JOINT_SORT_H
+#define UTIL_JOINT_SORT_H
/* A terrifying amount of C++ to coax std::sort into soring one range while
* also permuting another range the same way.
@@ -143,4 +143,4 @@ template <class KeyIter, class ValueIter> void JointSort(const KeyIter &key_begi
} // namespace util
-#endif // UTIL_JOINT_SORT__
+#endif // UTIL_JOINT_SORT_H
diff --git a/klm/util/mmap.cc b/klm/util/mmap.cc
index cee6a970..a3c8a022 100644
--- a/klm/util/mmap.cc
+++ b/klm/util/mmap.cc
@@ -6,6 +6,7 @@
#include "util/exception.hh"
#include "util/file.hh"
+#include "util/parallel_read.hh"
#include "util/scoped.hh"
#include <iostream>
@@ -40,7 +41,7 @@ void SyncOrThrow(void *start, size_t length) {
#if defined(_WIN32) || defined(_WIN64)
UTIL_THROW_IF(!::FlushViewOfFile(start, length), ErrnoException, "Failed to sync mmap");
#else
- UTIL_THROW_IF(msync(start, length, MS_SYNC), ErrnoException, "Failed to sync mmap");
+ UTIL_THROW_IF(length && msync(start, length, MS_SYNC), ErrnoException, "Failed to sync mmap");
#endif
}
@@ -154,6 +155,10 @@ void MapRead(LoadMethod method, int fd, uint64_t offset, std::size_t size, scope
SeekOrThrow(fd, offset);
ReadOrThrow(fd, out.get(), size);
break;
+ case PARALLEL_READ:
+ out.reset(MallocOrThrow(size), size, scoped_memory::MALLOC_ALLOCATED);
+ ParallelRead(fd, out.get(), size, offset);
+ break;
}
}
@@ -189,4 +194,66 @@ void *MapZeroedWrite(const char *name, std::size_t size, scoped_fd &file) {
}
}
+Rolling::Rolling(const Rolling &copy_from, uint64_t increase) {
+ *this = copy_from;
+ IncreaseBase(increase);
+}
+
+Rolling &Rolling::operator=(const Rolling &copy_from) {
+ fd_ = copy_from.fd_;
+ file_begin_ = copy_from.file_begin_;
+ file_end_ = copy_from.file_end_;
+ for_write_ = copy_from.for_write_;
+ block_ = copy_from.block_;
+ read_bound_ = copy_from.read_bound_;
+
+ current_begin_ = 0;
+ if (copy_from.IsPassthrough()) {
+ current_end_ = copy_from.current_end_;
+ ptr_ = copy_from.ptr_;
+ } else {
+ // Force call on next mmap.
+ current_end_ = 0;
+ ptr_ = NULL;
+ }
+ return *this;
+}
+
+Rolling::Rolling(int fd, bool for_write, std::size_t block, std::size_t read_bound, uint64_t offset, uint64_t amount) {
+ current_begin_ = 0;
+ current_end_ = 0;
+ fd_ = fd;
+ file_begin_ = offset;
+ file_end_ = offset + amount;
+ for_write_ = for_write;
+ block_ = block;
+ read_bound_ = read_bound;
+}
+
+void *Rolling::ExtractNonRolling(scoped_memory &out, uint64_t index, std::size_t size) {
+ out.reset();
+ if (IsPassthrough()) return static_cast<uint8_t*>(get()) + index;
+ uint64_t offset = index + file_begin_;
+ // Round down to multiple of page size.
+ uint64_t cruft = offset % static_cast<uint64_t>(SizePage());
+ std::size_t map_size = static_cast<std::size_t>(size + cruft);
+ out.reset(MapOrThrow(map_size, for_write_, kFileFlags, true, fd_, offset - cruft), map_size, scoped_memory::MMAP_ALLOCATED);
+ return static_cast<uint8_t*>(out.get()) + static_cast<std::size_t>(cruft);
+}
+
+void Rolling::Roll(uint64_t index) {
+ assert(!IsPassthrough());
+ std::size_t amount;
+ if (file_end_ - (index + file_begin_) > static_cast<uint64_t>(block_)) {
+ amount = block_;
+ current_end_ = index + amount - read_bound_;
+ } else {
+ amount = file_end_ - (index + file_begin_);
+ current_end_ = index + amount;
+ }
+ ptr_ = static_cast<uint8_t*>(ExtractNonRolling(mem_, index, amount)) - index;
+
+ current_begin_ = index;
+}
+
} // namespace util
diff --git a/klm/util/mmap.hh b/klm/util/mmap.hh
index b218c4d1..9b1e120f 100644
--- a/klm/util/mmap.hh
+++ b/klm/util/mmap.hh
@@ -1,8 +1,9 @@
-#ifndef UTIL_MMAP__
-#define UTIL_MMAP__
+#ifndef UTIL_MMAP_H
+#define UTIL_MMAP_H
// Utilities for mmaped files.
#include <cstddef>
+#include <limits>
#include <stdint.h>
#include <sys/types.h>
@@ -52,6 +53,9 @@ class scoped_memory {
public:
typedef enum {MMAP_ALLOCATED, ARRAY_ALLOCATED, MALLOC_ALLOCATED, NONE_ALLOCATED} Alloc;
+ scoped_memory(void *data, std::size_t size, Alloc source)
+ : data_(data), size_(size), source_(source) {}
+
scoped_memory() : data_(NULL), size_(0), source_(NONE_ALLOCATED) {}
~scoped_memory() { reset(); }
@@ -72,7 +76,6 @@ class scoped_memory {
void call_realloc(std::size_t to);
private:
-
void *data_;
std::size_t size_;
@@ -90,7 +93,9 @@ typedef enum {
// Populate on Linux. malloc and read on non-Linux.
POPULATE_OR_READ,
// malloc and read.
- READ
+ READ,
+ // malloc and read in parallel (recommended for Lustre)
+ PARALLEL_READ,
} LoadMethod;
extern const int kFileFlags;
@@ -109,6 +114,79 @@ void *MapZeroedWrite(const char *name, std::size_t size, scoped_fd &file);
// msync wrapper
void SyncOrThrow(void *start, size_t length);
+// Forward rolling memory map with no overlap.
+class Rolling {
+ public:
+ Rolling() {}
+
+ explicit Rolling(void *data) { Init(data); }
+
+ Rolling(const Rolling &copy_from, uint64_t increase = 0);
+ Rolling &operator=(const Rolling &copy_from);
+
+ // For an actual rolling mmap.
+ explicit Rolling(int fd, bool for_write, std::size_t block, std::size_t read_bound, uint64_t offset, uint64_t amount);
+
+ // For a static mapping
+ void Init(void *data) {
+ ptr_ = data;
+ current_end_ = std::numeric_limits<uint64_t>::max();
+ current_begin_ = 0;
+ // Mark as a pass-through.
+ fd_ = -1;
+ }
+
+ void IncreaseBase(uint64_t by) {
+ file_begin_ += by;
+ ptr_ = static_cast<uint8_t*>(ptr_) + by;
+ if (!IsPassthrough()) current_end_ = 0;
+ }
+
+ void DecreaseBase(uint64_t by) {
+ file_begin_ -= by;
+ ptr_ = static_cast<uint8_t*>(ptr_) - by;
+ if (!IsPassthrough()) current_end_ = 0;
+ }
+
+ void *ExtractNonRolling(scoped_memory &out, uint64_t index, std::size_t size);
+
+ // Returns base pointer
+ void *get() const { return ptr_; }
+
+ // Returns base pointer.
+ void *CheckedBase(uint64_t index) {
+ if (index >= current_end_ || index < current_begin_) {
+ Roll(index);
+ }
+ return ptr_;
+ }
+
+ // Returns indexed pointer.
+ void *CheckedIndex(uint64_t index) {
+ return static_cast<uint8_t*>(CheckedBase(index)) + index;
+ }
+
+ private:
+ void Roll(uint64_t index);
+
+ // True if this is just a thin wrapper on a pointer.
+ bool IsPassthrough() const { return fd_ == -1; }
+
+ void *ptr_;
+ uint64_t current_begin_;
+ uint64_t current_end_;
+
+ scoped_memory mem_;
+
+ int fd_;
+ uint64_t file_begin_;
+ uint64_t file_end_;
+
+ bool for_write_;
+ std::size_t block_;
+ std::size_t read_bound_;
+};
+
} // namespace util
-#endif // UTIL_MMAP__
+#endif // UTIL_MMAP_H
diff --git a/klm/util/multi_intersection.hh b/klm/util/multi_intersection.hh
index 04678352..2955acc7 100644
--- a/klm/util/multi_intersection.hh
+++ b/klm/util/multi_intersection.hh
@@ -1,5 +1,5 @@
-#ifndef UTIL_MULTI_INTERSECTION__
-#define UTIL_MULTI_INTERSECTION__
+#ifndef UTIL_MULTI_INTERSECTION_H
+#define UTIL_MULTI_INTERSECTION_H
#include <boost/optional.hpp>
#include <boost/range/iterator_range.hpp>
@@ -77,4 +77,4 @@ template <class Iterator, class Output> void AllIntersection(std::vector<boost::
} // namespace util
-#endif // UTIL_MULTI_INTERSECTION__
+#endif // UTIL_MULTI_INTERSECTION_H
diff --git a/klm/util/murmur_hash.hh b/klm/util/murmur_hash.hh
index 4891833e..f17157cd 100644
--- a/klm/util/murmur_hash.hh
+++ b/klm/util/murmur_hash.hh
@@ -1,5 +1,5 @@
-#ifndef UTIL_MURMUR_HASH__
-#define UTIL_MURMUR_HASH__
+#ifndef UTIL_MURMUR_HASH_H
+#define UTIL_MURMUR_HASH_H
#include <cstddef>
#include <stdint.h>
@@ -15,4 +15,4 @@ uint64_t MurmurHashNative(const void * key, std::size_t len, uint64_t seed = 0);
} // namespace util
-#endif // UTIL_MURMUR_HASH__
+#endif // UTIL_MURMUR_HASH_H
diff --git a/klm/util/parallel_read.cc b/klm/util/parallel_read.cc
new file mode 100644
index 00000000..6435eb84
--- /dev/null
+++ b/klm/util/parallel_read.cc
@@ -0,0 +1,69 @@
+#include "util/parallel_read.hh"
+
+#include "util/file.hh"
+
+#ifdef WITH_THREADS
+#include "util/thread_pool.hh"
+
+namespace util {
+namespace {
+
+class Reader {
+ public:
+ explicit Reader(int fd) : fd_(fd) {}
+
+ struct Request {
+ void *to;
+ std::size_t size;
+ uint64_t offset;
+
+ bool operator==(const Request &other) const {
+ return (to == other.to) && (size == other.size) && (offset == other.offset);
+ }
+ };
+
+ void operator()(const Request &request) {
+ util::ErsatzPRead(fd_, request.to, request.size, request.offset);
+ }
+
+ private:
+ int fd_;
+};
+
+} // namespace
+
+void ParallelRead(int fd, void *to, std::size_t amount, uint64_t offset) {
+ Reader::Request poison;
+ poison.to = NULL;
+ poison.size = 0;
+ poison.offset = 0;
+ unsigned threads = boost::thread::hardware_concurrency();
+ if (!threads) threads = 2;
+ ThreadPool<Reader> pool(2 /* don't need much of a queue */, threads, fd, poison);
+ const std::size_t kBatch = 1ULL << 25; // 32 MB
+ Reader::Request request;
+ request.to = to;
+ request.size = kBatch;
+ request.offset = offset;
+ for (; amount > kBatch; amount -= kBatch) {
+ pool.Produce(request);
+ request.to = reinterpret_cast<uint8_t*>(request.to) + kBatch;
+ request.offset += kBatch;
+ }
+ request.size = amount;
+ if (request.size) {
+ pool.Produce(request);
+ }
+}
+
+} // namespace util
+
+#else // WITH_THREADS
+
+namespace util {
+void ParallelRead(int fd, void *to, std::size_t amount, uint64_t offset) {
+ util::ErsatzPRead(fd, to, amount, offset);
+}
+} // namespace util
+
+#endif
diff --git a/klm/util/parallel_read.hh b/klm/util/parallel_read.hh
new file mode 100644
index 00000000..1e96e790
--- /dev/null
+++ b/klm/util/parallel_read.hh
@@ -0,0 +1,16 @@
+#ifndef UTIL_PARALLEL_READ__
+#define UTIL_PARALLEL_READ__
+
+/* Read pieces of a file in parallel. This has a very specific use case:
+ * reading files from Lustre is CPU bound so multiple threads actually
+ * increases throughput. Speed matters when an LM takes a terabyte.
+ */
+
+#include <cstddef>
+#include <stdint.h>
+
+namespace util {
+void ParallelRead(int fd, void *to, std::size_t amount, uint64_t offset);
+} // namespace util
+
+#endif // UTIL_PARALLEL_READ__
diff --git a/klm/util/pcqueue.hh b/klm/util/pcqueue.hh
index 07e4146f..d2ffee77 100644
--- a/klm/util/pcqueue.hh
+++ b/klm/util/pcqueue.hh
@@ -1,5 +1,5 @@
-#ifndef UTIL_PCQUEUE__
-#define UTIL_PCQUEUE__
+#ifndef UTIL_PCQUEUE_H
+#define UTIL_PCQUEUE_H
#include "util/exception.hh"
@@ -72,7 +72,8 @@ inline void WaitSemaphore (Semaphore &on) {
#endif // __APPLE__
-/* Producer consumer queue safe for multiple producers and multiple consumers.
+/**
+ * Producer consumer queue safe for multiple producers and multiple consumers.
* T must be default constructable and have operator=.
* The value is copied twice for Consume(T &out) or three times for Consume(),
* so larger objects should be passed via pointer.
@@ -152,4 +153,4 @@ template <class T> class PCQueue : boost::noncopyable {
} // namespace util
-#endif // UTIL_PCQUEUE__
+#endif // UTIL_PCQUEUE_H
diff --git a/klm/util/pool.hh b/klm/util/pool.hh
index 72f8a0c8..89e793d7 100644
--- a/klm/util/pool.hh
+++ b/klm/util/pool.hh
@@ -1,8 +1,8 @@
// Very simple pool. It can only allocate memory. And all of the memory it
// allocates must be freed at the same time.
-#ifndef UTIL_POOL__
-#define UTIL_POOL__
+#ifndef UTIL_POOL_H
+#define UTIL_POOL_H
#include <vector>
@@ -42,4 +42,4 @@ class Pool {
} // namespace util
-#endif // UTIL_POOL__
+#endif // UTIL_POOL_H
diff --git a/klm/util/probing_hash_table.hh b/klm/util/probing_hash_table.hh
index 38524806..ea228dd9 100644
--- a/klm/util/probing_hash_table.hh
+++ b/klm/util/probing_hash_table.hh
@@ -1,5 +1,5 @@
-#ifndef UTIL_PROBING_HASH_TABLE__
-#define UTIL_PROBING_HASH_TABLE__
+#ifndef UTIL_PROBING_HASH_TABLE_H
+#define UTIL_PROBING_HASH_TABLE_H
#include "util/exception.hh"
#include "util/scoped.hh"
@@ -258,6 +258,10 @@ template <class EntryT, class HashT, class EqualT = std::equal_to<typename Entry
private:
typedef ProbingHashTable<EntryT, HashT, EqualT> Backend;
public:
+ static std::size_t MemUsage(std::size_t size, float multiplier = 1.5) {
+ return Backend::Size(size, multiplier);
+ }
+
typedef EntryT Entry;
typedef typename Entry::Key Key;
typedef const Entry *ConstIterator;
@@ -268,6 +272,7 @@ template <class EntryT, class HashT, class EqualT = std::equal_to<typename Entry
AutoProbing(std::size_t initial_size = 10, const Key &invalid = Key(), const Hash &hash_func = Hash(), const Equal &equal_func = Equal()) :
allocated_(Backend::Size(initial_size, 1.5)), mem_(util::MallocOrThrow(allocated_)), backend_(mem_.get(), allocated_, invalid, hash_func, equal_func) {
threshold_ = initial_size * 1.2;
+ Clear();
}
// Assumes that the key is unique. Multiple insertions won't cause a failure, just inconsistent lookup.
@@ -323,4 +328,4 @@ template <class EntryT, class HashT, class EqualT = std::equal_to<typename Entry
} // namespace util
-#endif // UTIL_PROBING_HASH_TABLE__
+#endif // UTIL_PROBING_HASH_TABLE_H
diff --git a/klm/util/proxy_iterator.hh b/klm/util/proxy_iterator.hh
index a2810a47..8aa697bf 100644
--- a/klm/util/proxy_iterator.hh
+++ b/klm/util/proxy_iterator.hh
@@ -1,5 +1,5 @@
-#ifndef UTIL_PROXY_ITERATOR__
-#define UTIL_PROXY_ITERATOR__
+#ifndef UTIL_PROXY_ITERATOR_H
+#define UTIL_PROXY_ITERATOR_H
#include <cstddef>
#include <iterator>
@@ -98,4 +98,4 @@ template <class Proxy> ProxyIterator<Proxy> operator+(std::ptrdiff_t amount, con
} // namespace util
-#endif // UTIL_PROXY_ITERATOR__
+#endif // UTIL_PROXY_ITERATOR_H
diff --git a/klm/util/read_compressed.cc b/klm/util/read_compressed.cc
index b62a6e83..cee98040 100644
--- a/klm/util/read_compressed.cc
+++ b/klm/util/read_compressed.cc
@@ -49,6 +49,8 @@ class ReadBase {
thunk.internal_.reset(with);
}
+ ReadBase *Current(ReadCompressed &thunk) { return thunk.internal_.get(); }
+
static uint64_t &ReadCount(ReadCompressed &thunk) {
return thunk.raw_amount_;
}
@@ -56,6 +58,8 @@ class ReadBase {
namespace {
+ReadBase *ReadFactory(int fd, uint64_t &raw_amount, const void *already_data, std::size_t already_size, bool require_compressed);
+
// Completed file that other classes can thunk to.
class Complete : public ReadBase {
public:
@@ -80,7 +84,7 @@ class Uncompressed : public ReadBase {
class UncompressedWithHeader : public ReadBase {
public:
- UncompressedWithHeader(int fd, void *already_data, std::size_t already_size) : fd_(fd) {
+ UncompressedWithHeader(int fd, const void *already_data, std::size_t already_size) : fd_(fd) {
assert(already_size);
buf_.reset(malloc(already_size));
if (!buf_.get()) throw std::bad_alloc();
@@ -91,6 +95,7 @@ class UncompressedWithHeader : public ReadBase {
std::size_t Read(void *to, std::size_t amount, ReadCompressed &thunk) {
assert(buf_.get());
+ assert(remain_ != end_);
std::size_t sending = std::min<std::size_t>(amount, end_ - remain_);
memcpy(to, remain_, sending);
remain_ += sending;
@@ -108,23 +113,51 @@ class UncompressedWithHeader : public ReadBase {
scoped_fd fd_;
};
-#ifdef HAVE_ZLIB
-class GZip : public ReadBase {
+static const std::size_t kInputBuffer = 16384;
+
+template <class Compression> class StreamCompressed : public ReadBase {
+ public:
+ StreamCompressed(int fd, const void *already_data, std::size_t already_size)
+ : file_(fd),
+ in_buffer_(MallocOrThrow(kInputBuffer)),
+ back_(memcpy(in_buffer_.get(), already_data, already_size), already_size) {}
+
+ std::size_t Read(void *to, std::size_t amount, ReadCompressed &thunk) {
+ if (amount == 0) return 0;
+ back_.SetOutput(to, amount);
+ do {
+ if (!back_.Stream().avail_in) ReadInput(thunk);
+ if (!back_.Process()) {
+ // reached end, at least for the compressed portion.
+ std::size_t ret = static_cast<const uint8_t *>(static_cast<void*>(back_.Stream().next_out)) - static_cast<const uint8_t*>(to);
+ ReplaceThis(ReadFactory(file_.release(), ReadCount(thunk), back_.Stream().next_in, back_.Stream().avail_in, true), thunk);
+ if (ret) return ret;
+ // We did not read anything this round, so clients might think EOF. Transfer responsibility to the next reader.
+ return Current(thunk)->Read(to, amount, thunk);
+ }
+ } while (back_.Stream().next_out == to);
+ return static_cast<const uint8_t*>(static_cast<void*>(back_.Stream().next_out)) - static_cast<const uint8_t*>(to);
+ }
+
private:
- static const std::size_t kInputBuffer = 16384;
+ void ReadInput(ReadCompressed &thunk) {
+ assert(!back_.Stream().avail_in);
+ std::size_t got = ReadOrEOF(file_.get(), in_buffer_.get(), kInputBuffer);
+ back_.SetInput(in_buffer_.get(), got);
+ ReadCount(thunk) += got;
+ }
+
+ scoped_fd file_;
+ scoped_malloc in_buffer_;
+
+ Compression back_;
+};
+
+#ifdef HAVE_ZLIB
+class GZip {
public:
- GZip(int fd, void *already_data, std::size_t already_size)
- : file_(fd), in_buffer_(malloc(kInputBuffer)) {
- if (!in_buffer_.get()) throw std::bad_alloc();
- assert(already_size < kInputBuffer);
- if (already_size) {
- memcpy(in_buffer_.get(), already_data, already_size);
- stream_.next_in = static_cast<Bytef *>(in_buffer_.get());
- stream_.avail_in = already_size;
- stream_.avail_in += ReadOrEOF(file_.get(), static_cast<uint8_t*>(in_buffer_.get()) + already_size, kInputBuffer - already_size);
- } else {
- stream_.avail_in = 0;
- }
+ GZip(const void *base, std::size_t amount) {
+ SetInput(base, amount);
stream_.zalloc = Z_NULL;
stream_.zfree = Z_NULL;
stream_.opaque = Z_NULL;
@@ -141,227 +174,154 @@ class GZip : public ReadBase {
}
}
- std::size_t Read(void *to, std::size_t amount, ReadCompressed &thunk) {
- if (amount == 0) return 0;
+ void SetOutput(void *to, std::size_t amount) {
stream_.next_out = static_cast<Bytef*>(to);
stream_.avail_out = std::min<std::size_t>(std::numeric_limits<uInt>::max(), amount);
- do {
- if (!stream_.avail_in) ReadInput(thunk);
- int result = inflate(&stream_, 0);
- switch (result) {
- case Z_OK:
- break;
- case Z_STREAM_END:
- {
- std::size_t ret = static_cast<uint8_t*>(stream_.next_out) - static_cast<uint8_t*>(to);
- ReplaceThis(new Complete(), thunk);
- return ret;
- }
- case Z_ERRNO:
- UTIL_THROW(ErrnoException, "zlib error");
- default:
- UTIL_THROW(GZException, "zlib encountered " << (stream_.msg ? stream_.msg : "an error ") << " code " << result);
- }
- } while (stream_.next_out == to);
- return static_cast<uint8_t*>(stream_.next_out) - static_cast<uint8_t*>(to);
}
- private:
- void ReadInput(ReadCompressed &thunk) {
- assert(!stream_.avail_in);
- stream_.next_in = static_cast<Bytef *>(in_buffer_.get());
- stream_.avail_in = ReadOrEOF(file_.get(), in_buffer_.get(), kInputBuffer);
- ReadCount(thunk) += stream_.avail_in;
+ void SetInput(const void *base, std::size_t amount) {
+ assert(amount < static_cast<std::size_t>(std::numeric_limits<uInt>::max()));
+ stream_.next_in = const_cast<Bytef*>(static_cast<const Bytef*>(base));
+ stream_.avail_in = amount;
}
- scoped_fd file_;
- scoped_malloc in_buffer_;
+ const z_stream &Stream() const { return stream_; }
+
+ bool Process() {
+ int result = inflate(&stream_, 0);
+ switch (result) {
+ case Z_OK:
+ return true;
+ case Z_STREAM_END:
+ return false;
+ case Z_ERRNO:
+ UTIL_THROW(ErrnoException, "zlib error");
+ default:
+ UTIL_THROW(GZException, "zlib encountered " << (stream_.msg ? stream_.msg : "an error ") << " code " << result);
+ }
+ }
+
+ private:
z_stream stream_;
};
#endif // HAVE_ZLIB
-const uint8_t kBZMagic[3] = {'B', 'Z', 'h'};
-
#ifdef HAVE_BZLIB
-class BZip : public ReadBase {
+class BZip {
public:
- BZip(int fd, void *already_data, std::size_t already_size) {
- scoped_fd hold(fd);
- closer_.reset(FDOpenReadOrThrow(hold));
- file_ = NULL;
- Open(already_data, already_size);
+ BZip(const void *base, std::size_t amount) {
+ memset(&stream_, 0, sizeof(stream_));
+ SetInput(base, amount);
+ HandleError(BZ2_bzDecompressInit(&stream_, 0, 0));
}
- BZip(FILE *file, void *already_data, std::size_t already_size) {
- closer_.reset(file);
- file_ = NULL;
- Open(already_data, already_size);
+ ~BZip() {
+ try {
+ HandleError(BZ2_bzDecompressEnd(&stream_));
+ } catch (const std::exception &e) {
+ std::cerr << e.what() << std::endl;
+ abort();
+ }
}
- ~BZip() {
- Close(file_);
+ bool Process() {
+ int ret = BZ2_bzDecompress(&stream_);
+ if (ret == BZ_STREAM_END) return false;
+ HandleError(ret);
+ return true;
}
- std::size_t Read(void *to, std::size_t amount, ReadCompressed &thunk) {
- assert(file_);
- int bzerror = BZ_OK;
- int ret = BZ2_bzRead(&bzerror, file_, to, std::min<std::size_t>(static_cast<std::size_t>(INT_MAX), amount));
- long pos = ftell(closer_.get());
- if (pos != -1) ReadCount(thunk) = pos;
- switch (bzerror) {
- case BZ_STREAM_END:
- /* bzip2 files can be concatenated by e.g. pbzip2. Annoyingly, the
- * library doesn't handle this internally. This gets the trailing
- * data, grows it up to magic as needed, validates the magic, and
- * reopens.
- */
- {
- bzerror = BZ_OK;
- void *trailing_data;
- int trailing_size;
- BZ2_bzReadGetUnused(&bzerror, file_, &trailing_data, &trailing_size);
- UTIL_THROW_IF(bzerror != BZ_OK, BZException, "bzip2 error in BZ2_bzReadGetUnused " << BZ2_bzerror(file_, &bzerror) << " code " << bzerror);
- std::string trailing(static_cast<const char*>(trailing_data), trailing_size);
- Close(file_);
-
- if (trailing_size < (int)sizeof(kBZMagic)) {
- trailing.resize(sizeof(kBZMagic));
- if (1 != fread(&trailing[trailing_size], sizeof(kBZMagic) - trailing_size, 1, closer_.get())) {
- UTIL_THROW_IF(trailing_size, BZException, "File has trailing cruft");
- // Legitimate end of file.
- ReplaceThis(new Complete(), thunk);
- return ret;
- }
- }
- UTIL_THROW_IF(memcmp(trailing.data(), kBZMagic, sizeof(kBZMagic)), BZException, "Trailing cruft is not another bzip2 stream");
- Open(&trailing[0], trailing.size());
- }
- return ret;
- case BZ_OK:
- return ret;
- default:
- UTIL_THROW(BZException, "bzip2 error " << BZ2_bzerror(file_, &bzerror) << " code " << bzerror);
- }
+ void SetOutput(void *base, std::size_t amount) {
+ stream_.next_out = static_cast<char*>(base);
+ stream_.avail_out = std::min<std::size_t>(std::numeric_limits<unsigned int>::max(), amount);
}
+ void SetInput(const void *base, std::size_t amount) {
+ stream_.next_in = const_cast<char*>(static_cast<const char*>(base));
+ stream_.avail_in = amount;
+ }
+
+ const bz_stream &Stream() const { return stream_; }
+
private:
- void Open(void *already_data, std::size_t already_size) {
- assert(!file_);
- int bzerror = BZ_OK;
- file_ = BZ2_bzReadOpen(&bzerror, closer_.get(), 0, 0, already_data, already_size);
- switch (bzerror) {
+ void HandleError(int value) {
+ switch(value) {
case BZ_OK:
return;
case BZ_CONFIG_ERROR:
- UTIL_THROW(BZException, "Looks like bzip2 was miscompiled.");
+ UTIL_THROW(BZException, "bzip2 seems to be miscompiled.");
case BZ_PARAM_ERROR:
- UTIL_THROW(BZException, "Parameter error");
- case BZ_IO_ERROR:
- UTIL_THROW(BZException, "IO error reading file");
+ UTIL_THROW(BZException, "bzip2 Parameter error");
+ case BZ_DATA_ERROR:
+ UTIL_THROW(BZException, "bzip2 detected a corrupt file");
+ case BZ_DATA_ERROR_MAGIC:
+ UTIL_THROW(BZException, "bzip2 detected bad magic bytes. Perhaps this was not a bzip2 file after all?");
case BZ_MEM_ERROR:
throw std::bad_alloc();
default:
- UTIL_THROW(BZException, "Unknown bzip2 error code " << bzerror);
+ UTIL_THROW(BZException, "Unknown bzip2 error code " << value);
}
- assert(file_);
}
- static void Close(BZFILE *&file) {
- if (file == NULL) return;
- int bzerror = BZ_OK;
- BZ2_bzReadClose(&bzerror, file);
- if (bzerror != BZ_OK) {
- std::cerr << "bz2 readclose error number " << bzerror << std::endl;
- abort();
- }
- file = NULL;
- }
-
- scoped_FILE closer_;
- BZFILE *file_;
+ bz_stream stream_;
};
#endif // HAVE_BZLIB
#ifdef HAVE_XZLIB
-class XZip : public ReadBase {
- private:
- static const std::size_t kInputBuffer = 16384;
+class XZip {
public:
- XZip(int fd, void *already_data, std::size_t already_size)
- : file_(fd), in_buffer_(malloc(kInputBuffer)), stream_(), action_(LZMA_RUN) {
- if (!in_buffer_.get()) throw std::bad_alloc();
- assert(already_size < kInputBuffer);
- if (already_size) {
- memcpy(in_buffer_.get(), already_data, already_size);
- stream_.next_in = static_cast<const uint8_t*>(in_buffer_.get());
- stream_.avail_in = already_size;
- stream_.avail_in += ReadOrEOF(file_.get(), static_cast<uint8_t*>(in_buffer_.get()) + already_size, kInputBuffer - already_size);
- } else {
- stream_.avail_in = 0;
- }
- stream_.allocator = NULL;
- lzma_ret ret = lzma_stream_decoder(&stream_, UINT64_MAX, LZMA_CONCATENATED);
- switch (ret) {
- case LZMA_OK:
- break;
- case LZMA_MEM_ERROR:
- UTIL_THROW(ErrnoException, "xz open error");
- default:
- UTIL_THROW(XZException, "xz error code " << ret);
- }
+ XZip(const void *base, std::size_t amount)
+ : stream_(), action_(LZMA_RUN) {
+ memset(&stream_, 0, sizeof(stream_));
+ SetInput(base, amount);
+ HandleError(lzma_stream_decoder(&stream_, UINT64_MAX, 0));
}
~XZip() {
lzma_end(&stream_);
}
- std::size_t Read(void *to, std::size_t amount, ReadCompressed &thunk) {
- if (amount == 0) return 0;
- stream_.next_out = static_cast<uint8_t*>(to);
+ void SetOutput(void *base, std::size_t amount) {
+ stream_.next_out = static_cast<uint8_t*>(base);
stream_.avail_out = amount;
- do {
- if (!stream_.avail_in) ReadInput(thunk);
- lzma_ret status = lzma_code(&stream_, action_);
- switch (status) {
- case LZMA_OK:
- break;
- case LZMA_STREAM_END:
- UTIL_THROW_IF(action_ != LZMA_FINISH, XZException, "Input not finished yet.");
- {
- std::size_t ret = static_cast<uint8_t*>(stream_.next_out) - static_cast<uint8_t*>(to);
- ReplaceThis(new Complete(), thunk);
- return ret;
- }
- case LZMA_MEM_ERROR:
- throw std::bad_alloc();
- case LZMA_FORMAT_ERROR:
- UTIL_THROW(XZException, "xzlib says file format not recognized");
- case LZMA_OPTIONS_ERROR:
- UTIL_THROW(XZException, "xzlib says unsupported compression options");
- case LZMA_DATA_ERROR:
- UTIL_THROW(XZException, "xzlib says this file is corrupt");
- case LZMA_BUF_ERROR:
- UTIL_THROW(XZException, "xzlib says unexpected end of input");
- default:
- UTIL_THROW(XZException, "unrecognized xzlib error " << status);
- }
- } while (stream_.next_out == to);
- return static_cast<uint8_t*>(stream_.next_out) - static_cast<uint8_t*>(to);
+ }
+
+ void SetInput(const void *base, std::size_t amount) {
+ stream_.next_in = static_cast<const uint8_t*>(base);
+ stream_.avail_in = amount;
+ if (!amount) action_ = LZMA_FINISH;
+ }
+
+ const lzma_stream &Stream() const { return stream_; }
+
+ bool Process() {
+ lzma_ret status = lzma_code(&stream_, action_);
+ if (status == LZMA_STREAM_END) return false;
+ HandleError(status);
+ return true;
}
private:
- void ReadInput(ReadCompressed &thunk) {
- assert(!stream_.avail_in);
- stream_.next_in = static_cast<const uint8_t*>(in_buffer_.get());
- stream_.avail_in = ReadOrEOF(file_.get(), in_buffer_.get(), kInputBuffer);
- if (!stream_.avail_in) action_ = LZMA_FINISH;
- ReadCount(thunk) += stream_.avail_in;
+ void HandleError(lzma_ret value) {
+ switch (value) {
+ case LZMA_OK:
+ return;
+ case LZMA_MEM_ERROR:
+ throw std::bad_alloc();
+ case LZMA_FORMAT_ERROR:
+ UTIL_THROW(XZException, "xzlib says file format not recognized");
+ case LZMA_OPTIONS_ERROR:
+ UTIL_THROW(XZException, "xzlib says unsupported compression options");
+ case LZMA_DATA_ERROR:
+ UTIL_THROW(XZException, "xzlib says this file is corrupt");
+ case LZMA_BUF_ERROR:
+ UTIL_THROW(XZException, "xzlib says unexpected end of input");
+ default:
+ UTIL_THROW(XZException, "unrecognized xzlib error " << value);
+ }
}
- scoped_fd file_;
- scoped_malloc in_buffer_;
lzma_stream stream_;
-
lzma_action action_;
};
#endif // HAVE_XZLIB
@@ -384,66 +344,67 @@ class IStreamReader : public ReadBase {
};
enum MagicResult {
- UNKNOWN, GZIP, BZIP, XZIP
+ UTIL_UNKNOWN, UTIL_GZIP, UTIL_BZIP, UTIL_XZIP
};
-MagicResult DetectMagic(const void *from_void) {
+MagicResult DetectMagic(const void *from_void, std::size_t length) {
const uint8_t *header = static_cast<const uint8_t*>(from_void);
- if (header[0] == 0x1f && header[1] == 0x8b) {
- return GZIP;
+ if (length >= 2 && header[0] == 0x1f && header[1] == 0x8b) {
+ return UTIL_GZIP;
}
- if (!memcmp(header, kBZMagic, sizeof(kBZMagic))) {
- return BZIP;
+ const uint8_t kBZMagic[3] = {'B', 'Z', 'h'};
+ if (length >= sizeof(kBZMagic) && !memcmp(header, kBZMagic, sizeof(kBZMagic))) {
+ return UTIL_BZIP;
}
const uint8_t kXZMagic[6] = { 0xFD, '7', 'z', 'X', 'Z', 0x00 };
- if (!memcmp(header, kXZMagic, sizeof(kXZMagic))) {
- return XZIP;
+ if (length >= sizeof(kXZMagic) && !memcmp(header, kXZMagic, sizeof(kXZMagic))) {
+ return UTIL_XZIP;
}
- return UNKNOWN;
+ return UTIL_UNKNOWN;
}
-ReadBase *ReadFactory(int fd, uint64_t &raw_amount) {
+ReadBase *ReadFactory(int fd, uint64_t &raw_amount, const void *already_data, const std::size_t already_size, bool require_compressed) {
scoped_fd hold(fd);
- unsigned char header[ReadCompressed::kMagicSize];
- raw_amount = ReadOrEOF(fd, header, ReadCompressed::kMagicSize);
- if (!raw_amount)
- return new Uncompressed(hold.release());
- if (raw_amount != ReadCompressed::kMagicSize)
- return new UncompressedWithHeader(hold.release(), header, raw_amount);
- switch (DetectMagic(header)) {
- case GZIP:
+ std::string header(reinterpret_cast<const char*>(already_data), already_size);
+ if (header.size() < ReadCompressed::kMagicSize) {
+ std::size_t original = header.size();
+ header.resize(ReadCompressed::kMagicSize);
+ std::size_t got = ReadOrEOF(fd, &header[original], ReadCompressed::kMagicSize - original);
+ raw_amount += got;
+ header.resize(original + got);
+ }
+ if (header.empty()) {
+ return new Complete();
+ }
+ switch (DetectMagic(&header[0], header.size())) {
+ case UTIL_GZIP:
#ifdef HAVE_ZLIB
- return new GZip(hold.release(), header, ReadCompressed::kMagicSize);
+ return new StreamCompressed<GZip>(hold.release(), header.data(), header.size());
#else
UTIL_THROW(CompressedException, "This looks like a gzip file but gzip support was not compiled in.");
#endif
- case BZIP:
+ case UTIL_BZIP:
#ifdef HAVE_BZLIB
- return new BZip(hold.release(), header, ReadCompressed::kMagicSize);
+ return new StreamCompressed<BZip>(hold.release(), &header[0], header.size());
#else
- UTIL_THROW(CompressedException, "This looks like a bzip file (it begins with BZ), but bzip support was not compiled in.");
+ UTIL_THROW(CompressedException, "This looks like a bzip file (it begins with BZh), but bzip support was not compiled in.");
#endif
- case XZIP:
+ case UTIL_XZIP:
#ifdef HAVE_XZLIB
- return new XZip(hold.release(), header, ReadCompressed::kMagicSize);
+ return new StreamCompressed<XZip>(hold.release(), header.data(), header.size());
#else
UTIL_THROW(CompressedException, "This looks like an xz file, but xz support was not compiled in.");
#endif
- case UNKNOWN:
- break;
- }
- try {
- SeekOrThrow(fd, 0);
- } catch (const util::ErrnoException &e) {
- return new UncompressedWithHeader(hold.release(), header, ReadCompressed::kMagicSize);
+ default:
+ UTIL_THROW_IF(require_compressed, CompressedException, "Uncompressed data detected after a compresssed file. This could be supported but usually indicates an error.");
+ return new UncompressedWithHeader(hold.release(), header.data(), header.size());
}
- return new Uncompressed(hold.release());
}
} // namespace
bool ReadCompressed::DetectCompressedMagic(const void *from_void) {
- return DetectMagic(from_void) != UNKNOWN;
+ return DetectMagic(from_void, kMagicSize) != UTIL_UNKNOWN;
}
ReadCompressed::ReadCompressed(int fd) {
@@ -459,8 +420,9 @@ ReadCompressed::ReadCompressed() {}
ReadCompressed::~ReadCompressed() {}
void ReadCompressed::Reset(int fd) {
+ raw_amount_ = 0;
internal_.reset();
- internal_.reset(ReadFactory(fd, raw_amount_));
+ internal_.reset(ReadFactory(fd, raw_amount_, NULL, 0, false));
}
void ReadCompressed::Reset(std::istream &in) {
@@ -472,4 +434,15 @@ std::size_t ReadCompressed::Read(void *to, std::size_t amount) {
return internal_->Read(to, amount, *this);
}
+std::size_t ReadCompressed::ReadOrEOF(void *const to_in, std::size_t amount) {
+ uint8_t *to = reinterpret_cast<uint8_t*>(to_in);
+ while (amount) {
+ std::size_t got = Read(to, amount);
+ if (!got) break;
+ to += got;
+ amount -= got;
+ }
+ return to - reinterpret_cast<uint8_t*>(to_in);
+}
+
} // namespace util
diff --git a/klm/util/read_compressed.hh b/klm/util/read_compressed.hh
index 8b54c9e8..767ee94b 100644
--- a/klm/util/read_compressed.hh
+++ b/klm/util/read_compressed.hh
@@ -1,5 +1,5 @@
-#ifndef UTIL_READ_COMPRESSED__
-#define UTIL_READ_COMPRESSED__
+#ifndef UTIL_READ_COMPRESSED_H
+#define UTIL_READ_COMPRESSED_H
#include "util/exception.hh"
#include "util/scoped.hh"
@@ -62,6 +62,10 @@ class ReadCompressed {
std::size_t Read(void *to, std::size_t amount);
+ // Repeatedly call read to fill a buffer unless EOF is hit.
+ // Return number of bytes read.
+ std::size_t ReadOrEOF(void *const to, std::size_t amount);
+
uint64_t RawAmount() const { return raw_amount_; }
private:
@@ -78,4 +82,4 @@ class ReadCompressed {
} // namespace util
-#endif // UTIL_READ_COMPRESSED__
+#endif // UTIL_READ_COMPRESSED_H
diff --git a/klm/util/read_compressed_test.cc b/klm/util/read_compressed_test.cc
index 50450a02..301e8f4b 100644
--- a/klm/util/read_compressed_test.cc
+++ b/klm/util/read_compressed_test.cc
@@ -113,6 +113,11 @@ BOOST_AUTO_TEST_CASE(ReadXZ) {
}
#endif
+#ifdef HAVE_ZLIB
+BOOST_AUTO_TEST_CASE(AppendGZ) {
+}
+#endif
+
BOOST_AUTO_TEST_CASE(IStream) {
std::string name(WriteRandom());
std::fstream stream(name.c_str(), std::ios::in);
diff --git a/klm/util/scoped.cc b/klm/util/scoped.cc
index 6c5b0c2d..de1d9e94 100644
--- a/klm/util/scoped.cc
+++ b/klm/util/scoped.cc
@@ -32,10 +32,6 @@ void *CallocOrThrow(std::size_t requested) {
return InspectAddr(std::calloc(1, requested), requested, "calloc");
}
-scoped_malloc::~scoped_malloc() {
- std::free(p_);
-}
-
void scoped_malloc::call_realloc(std::size_t requested) {
p_ = InspectAddr(std::realloc(p_, requested), requested, "realloc");
}
diff --git a/klm/util/scoped.hh b/klm/util/scoped.hh
index b642d064..60c36c36 100644
--- a/klm/util/scoped.hh
+++ b/klm/util/scoped.hh
@@ -1,9 +1,10 @@
-#ifndef UTIL_SCOPED__
-#define UTIL_SCOPED__
+#ifndef UTIL_SCOPED_H
+#define UTIL_SCOPED_H
/* Other scoped objects in the style of scoped_ptr. */
#include "util/exception.hh"
#include <cstddef>
+#include <cstdlib>
namespace util {
@@ -16,89 +17,93 @@ class MallocException : public ErrnoException {
void *MallocOrThrow(std::size_t requested);
void *CallocOrThrow(std::size_t requested);
-class scoped_malloc {
+/* Unfortunately, defining the operator* for void * makes the compiler complain.
+ * So scoped is specialized to void. This includes the functionality common to
+ * both, namely everything except reference.
+ */
+template <class T, class Closer> class scoped_base {
public:
- scoped_malloc() : p_(NULL) {}
+ explicit scoped_base(T *p = NULL) : p_(p) {}
- scoped_malloc(void *p) : p_(p) {}
+ ~scoped_base() { Closer::Close(p_); }
- ~scoped_malloc();
-
- void reset(void *p = NULL) {
- scoped_malloc other(p_);
+ void reset(T *p = NULL) {
+ scoped_base other(p_);
p_ = p;
}
- void call_realloc(std::size_t to);
-
- void *get() { return p_; }
- const void *get() const { return p_; }
-
- private:
- void *p_;
-
- scoped_malloc(const scoped_malloc &);
- scoped_malloc &operator=(const scoped_malloc &);
-};
-
-// Hat tip to boost.
-template <class T> class scoped_array {
- public:
- explicit scoped_array(T *content = NULL) : c_(content) {}
-
- ~scoped_array() { delete [] c_; }
-
- T *get() { return c_; }
- const T* get() const { return c_; }
+ T *get() { return p_; }
+ const T *get() const { return p_; }
- T &operator*() { return *c_; }
- const T&operator*() const { return *c_; }
+ T *operator->() { return p_; }
+ const T *operator->() const { return p_; }
- T &operator[](std::size_t idx) { return c_[idx]; }
- const T &operator[](std::size_t idx) const { return c_[idx]; }
-
- void reset(T *to = NULL) {
- scoped_array<T> other(c_);
- c_ = to;
+ T *release() {
+ T *ret = p_;
+ p_ = NULL;
+ return ret;
}
- private:
- T *c_;
+ protected:
+ T *p_;
- scoped_array(const scoped_array &);
- void operator=(const scoped_array &);
+ private:
+ scoped_base(const scoped_base &);
+ scoped_base &operator=(const scoped_base &);
};
-template <class T> class scoped_ptr {
+template <class T, class Closer> class scoped : public scoped_base<T, Closer> {
public:
- explicit scoped_ptr(T *content = NULL) : c_(content) {}
+ explicit scoped(T *p = NULL) : scoped_base<T, Closer>(p) {}
- ~scoped_ptr() { delete c_; }
+ T &operator*() { return *scoped_base<T, Closer>::p_; }
+ const T&operator*() const { return *scoped_base<T, Closer>::p_; }
+};
- T *get() { return c_; }
- const T* get() const { return c_; }
+template <class Closer> class scoped<void, Closer> : public scoped_base<void, Closer> {
+ public:
+ explicit scoped(void *p = NULL) : scoped_base<void, Closer>(p) {}
+};
- T &operator*() { return *c_; }
- const T&operator*() const { return *c_; }
+/* Closer for c functions like std::free and cmph cleanup functions */
+template <class T, void (*clean)(T*)> struct scoped_c_forward {
+ static void Close(T *p) { clean(p); }
+};
+// Call a C function to delete stuff
+template <class T, void (*clean)(T*)> class scoped_c : public scoped<T, scoped_c_forward<T, clean> > {
+ public:
+ explicit scoped_c(T *p = NULL) : scoped<T, scoped_c_forward<T, clean> >(p) {}
+};
- T *operator->() { return c_; }
- const T*operator->() const { return c_; }
+class scoped_malloc : public scoped_c<void, std::free> {
+ public:
+ explicit scoped_malloc(void *p = NULL) : scoped_c<void, std::free>(p) {}
- T &operator[](std::size_t idx) { return c_[idx]; }
- const T &operator[](std::size_t idx) const { return c_[idx]; }
+ void call_realloc(std::size_t to);
+};
- void reset(T *to = NULL) {
- scoped_ptr<T> other(c_);
- c_ = to;
- }
+/* scoped_array using delete[] */
+struct scoped_delete_array_forward {
+ template <class T> static void Close(T *p) { delete [] p; }
+};
+// Hat tip to boost.
+template <class T> class scoped_array : public scoped<T, scoped_delete_array_forward> {
+ public:
+ explicit scoped_array(T *p = NULL) : scoped<T, scoped_delete_array_forward>(p) {}
- private:
- T *c_;
+ T &operator[](std::size_t idx) { return scoped<T, scoped_delete_array_forward>::p_[idx]; }
+ const T &operator[](std::size_t idx) const { return scoped<T, scoped_delete_array_forward>::p_[idx]; }
+};
- scoped_ptr(const scoped_ptr &);
- void operator=(const scoped_ptr &);
+/* scoped_ptr using delete. If only there were a template typedef. */
+struct scoped_delete_forward {
+ template <class T> static void Close(T *p) { delete p; }
+};
+template <class T> class scoped_ptr : public scoped<T, scoped_delete_forward> {
+ public:
+ explicit scoped_ptr(T *p = NULL) : scoped<T, scoped_delete_forward>(p) {}
};
} // namespace util
-#endif // UTIL_SCOPED__
+#endif // UTIL_SCOPED_H
diff --git a/klm/util/sized_iterator.hh b/klm/util/sized_iterator.hh
index a72657b5..75f6886f 100644
--- a/klm/util/sized_iterator.hh
+++ b/klm/util/sized_iterator.hh
@@ -1,5 +1,5 @@
-#ifndef UTIL_SIZED_ITERATOR__
-#define UTIL_SIZED_ITERATOR__
+#ifndef UTIL_SIZED_ITERATOR_H
+#define UTIL_SIZED_ITERATOR_H
#include "util/proxy_iterator.hh"
@@ -117,4 +117,4 @@ template <class Delegate, class Proxy = SizedProxy> class SizedCompare : public
};
} // namespace util
-#endif // UTIL_SIZED_ITERATOR__
+#endif // UTIL_SIZED_ITERATOR_H
diff --git a/klm/util/sorted_uniform.hh b/klm/util/sorted_uniform.hh
index 7700d9e6..a3f6d021 100644
--- a/klm/util/sorted_uniform.hh
+++ b/klm/util/sorted_uniform.hh
@@ -1,5 +1,5 @@
-#ifndef UTIL_SORTED_UNIFORM__
-#define UTIL_SORTED_UNIFORM__
+#ifndef UTIL_SORTED_UNIFORM_H
+#define UTIL_SORTED_UNIFORM_H
#include <algorithm>
#include <cstddef>
@@ -101,27 +101,6 @@ template <class Iterator, class Accessor, class Pivot> bool SortedUniformFind(co
return BoundedSortedUniformFind<Iterator, Accessor, Pivot>(accessor, begin, below, end, above, key, out);
}
-// May return begin - 1.
-template <class Iterator, class Accessor> Iterator BinaryBelow(
- const Accessor &accessor,
- Iterator begin,
- Iterator end,
- const typename Accessor::Key key) {
- while (end > begin) {
- Iterator pivot(begin + (end - begin) / 2);
- typename Accessor::Key mid(accessor(pivot));
- if (mid < key) {
- begin = pivot + 1;
- } else if (mid > key) {
- end = pivot;
- } else {
- for (++pivot; (pivot < end) && accessor(pivot) == mid; ++pivot) {}
- return pivot - 1;
- }
- }
- return begin - 1;
-}
-
} // namespace util
-#endif // UTIL_SORTED_UNIFORM__
+#endif // UTIL_SORTED_UNIFORM_H
diff --git a/klm/util/stream/Makefile.am b/klm/util/stream/Makefile.am
index f18cbedb..25817b50 100644
--- a/klm/util/stream/Makefile.am
+++ b/klm/util/stream/Makefile.am
@@ -11,6 +11,7 @@ libklm_util_stream_a_SOURCES = \
line_input.hh \
multi_progress.cc \
multi_progress.hh \
+ multi_stream.hh \
sort.hh \
stream.hh \
timer.hh
diff --git a/klm/util/stream/block.hh b/klm/util/stream/block.hh
index 11aa991e..aa7e28bb 100644
--- a/klm/util/stream/block.hh
+++ b/klm/util/stream/block.hh
@@ -1,5 +1,5 @@
-#ifndef UTIL_STREAM_BLOCK__
-#define UTIL_STREAM_BLOCK__
+#ifndef UTIL_STREAM_BLOCK_H
+#define UTIL_STREAM_BLOCK_H
#include <cstddef>
#include <stdint.h>
@@ -7,28 +7,77 @@
namespace util {
namespace stream {
+/**
+ * Encapsulates a block of memory.
+ */
class Block {
public:
+
+ /**
+ * Constructs an empty block.
+ */
Block() : mem_(NULL), valid_size_(0) {}
+ /**
+ * Constructs a block that encapsulates a segment of memory.
+ *
+ * @param[in] mem The segment of memory to encapsulate
+ * @param[in] size The size of the memory segment in bytes
+ */
Block(void *mem, std::size_t size) : mem_(mem), valid_size_(size) {}
+ /**
+ * Set the number of bytes in this block that should be interpreted as valid.
+ *
+ * @param[in] to Number of bytes
+ */
void SetValidSize(std::size_t to) { valid_size_ = to; }
- // Read might fill in less than Allocated at EOF.
+
+ /**
+ * Gets the number of bytes in this block that should be interpreted as valid.
+ * This is important because read might fill in less than Allocated at EOF.
+ */
std::size_t ValidSize() const { return valid_size_; }
+ /** Gets a void pointer to the memory underlying this block. */
void *Get() { return mem_; }
+
+ /** Gets a const void pointer to the memory underlying this block. */
const void *Get() const { return mem_; }
+
+ /**
+ * Gets a const void pointer to the end of the valid section of memory
+ * encapsulated by this block.
+ */
const void *ValidEnd() const {
return reinterpret_cast<const uint8_t*>(mem_) + valid_size_;
}
+ /**
+ * Returns true if this block encapsulates a valid (non-NULL) block of memory.
+ *
+ * This method is a user-defined implicit conversion function to boolean;
+ * among other things, this method enables bare instances of this class
+ * to be used as the condition of an if statement.
+ */
operator bool() const { return mem_ != NULL; }
+
+ /**
+ * Returns true if this block is empty.
+ *
+ * In other words, if Get()==NULL, this method will return true.
+ */
bool operator!() const { return mem_ == NULL; }
private:
friend class Link;
+
+ /**
+ * Points this block's memory at NULL.
+ *
+ * This class defines poison as a block whose memory pointer is NULL.
+ */
void SetToPoison() {
mem_ = NULL;
}
@@ -40,4 +89,4 @@ class Block {
} // namespace stream
} // namespace util
-#endif // UTIL_STREAM_BLOCK__
+#endif // UTIL_STREAM_BLOCK_H
diff --git a/klm/util/stream/chain.cc b/klm/util/stream/chain.cc
index 46708c60..4596af7a 100644
--- a/klm/util/stream/chain.cc
+++ b/klm/util/stream/chain.cc
@@ -59,6 +59,11 @@ Chain &Chain::operator>>(const WriteAndRecycle &writer) {
return *this;
}
+Chain &Chain::operator>>(const PWriteAndRecycle &writer) {
+ threads_.push_back(new Thread(Complete(), writer));
+ return *this;
+}
+
void Chain::Wait(bool release_memory) {
if (queues_.empty()) {
assert(threads_.empty());
@@ -126,7 +131,12 @@ Link::~Link() {
// abort();
} else {
if (!poisoned_) {
- // Pass the poison!
+ // Poison is a block whose memory pointer is NULL.
+ //
+ // Because we're in the else block,
+ // we know that the memory pointer of current_ is NULL.
+ //
+ // Pass the current (poison) block!
out_->Produce(current_);
}
}
diff --git a/klm/util/stream/chain.hh b/klm/util/stream/chain.hh
index 0cc83a85..50865086 100644
--- a/klm/util/stream/chain.hh
+++ b/klm/util/stream/chain.hh
@@ -1,5 +1,5 @@
-#ifndef UTIL_STREAM_CHAIN__
-#define UTIL_STREAM_CHAIN__
+#ifndef UTIL_STREAM_CHAIN_H
+#define UTIL_STREAM_CHAIN_H
#include "util/stream/block.hh"
#include "util/stream/config.hh"
@@ -24,7 +24,12 @@ class ChainConfigException : public Exception {
};
class Chain;
-// Specifies position in chain for Link constructor.
+
+/**
+ * Encapsulates a @ref PCQueue "producer queue" and a @ref PCQueue "consumer queue" within a @ref Chain "chain".
+ *
+ * Specifies position in chain for Link constructor.
+ */
class ChainPosition {
public:
const Chain &GetChain() const { return *chain_; }
@@ -41,14 +46,32 @@ class ChainPosition {
WorkerProgress progress_;
};
-// Position is usually ChainPosition but if there are multiple streams involved, this can be ChainPositions.
+
+/**
+ * Encapsulates a worker thread processing data at a given position in the chain.
+ *
+ * Each instance of this class owns one boost thread in which the worker is Run().
+ */
class Thread {
public:
+
+ /**
+ * Constructs a new Thread in which the provided Worker is Run().
+ *
+ * Position is usually ChainPosition but if there are multiple streams involved, this can be ChainPositions.
+ *
+ * After a call to this constructor, the provided worker will be running within a boost thread owned by the newly constructed Thread object.
+ */
template <class Position, class Worker> Thread(const Position &position, const Worker &worker)
: thread_(boost::ref(*this), position, worker) {}
~Thread();
+ /**
+ * Launches the provided worker in this object's boost thread.
+ *
+ * This method is called automatically by this class's @ref Thread() "constructor".
+ */
template <class Position, class Worker> void operator()(const Position &position, Worker &worker) {
try {
worker.Run(position);
@@ -63,14 +86,27 @@ class Thread {
boost::thread thread_;
};
+/**
+ * This resets blocks to full valid size. Used to close the loop in Chain by recycling blocks.
+ */
class Recycler {
public:
+ /**
+ * Resets the blocks in the chain such that the blocks' respective valid sizes match the chain's block size.
+ *
+ * @see Block::SetValidSize()
+ * @see Chain::BlockSize()
+ */
void Run(const ChainPosition &position);
};
extern const Recycler kRecycle;
class WriteAndRecycle;
-
+class PWriteAndRecycle;
+
+/**
+ * Represents a sequence of workers, through which @ref Block "blocks" can pass.
+ */
class Chain {
private:
template <class T, void (T::*ptr)(const ChainPosition &) = &T::Run> struct CheckForRun {
@@ -78,8 +114,20 @@ class Chain {
};
public:
+
+ /**
+ * Constructs a configured Chain.
+ *
+ * @param config Specifies how to configure the Chain.
+ */
explicit Chain(const ChainConfig &config);
+ /**
+ * Destructs a Chain.
+ *
+ * This method waits for the chain's threads to complete,
+ * and frees the memory held by this chain.
+ */
~Chain();
void ActivateProgress() {
@@ -91,24 +139,49 @@ class Chain {
progress_.SetTarget(target);
}
+ /**
+ * Gets the number of bytes in each record of a Block.
+ *
+ * @see ChainConfig::entry_size
+ */
std::size_t EntrySize() const {
return config_.entry_size;
}
+
+ /**
+ * Gets the inital @ref Block::ValidSize "valid size" for @ref Block "blocks" in this chain.
+ *
+ * @see Block::ValidSize
+ */
std::size_t BlockSize() const {
return block_size_;
}
- // Two ways to add to the chain: Add() or operator>>.
+ /** Two ways to add to the chain: Add() or operator>>. */
ChainPosition Add();
- // This is for adding threaded workers with a Run method.
+ /**
+ * Adds a new worker to this chain,
+ * and runs that worker in a new Thread owned by this chain.
+ *
+ * The worker must have a Run method that accepts a position argument.
+ *
+ * @see Thread::operator()()
+ */
template <class Worker> typename CheckForRun<Worker>::type &operator>>(const Worker &worker) {
assert(!complete_called_);
threads_.push_back(new Thread(Add(), worker));
return *this;
}
- // Avoid copying the worker.
+ /**
+ * Adds a new worker to this chain (but avoids copying that worker),
+ * and runs that worker in a new Thread owned by this chain.
+ *
+ * The worker must have a Run method that accepts a position argument.
+ *
+ * @see Thread::operator()()
+ */
template <class Worker> typename CheckForRun<Worker>::type &operator>>(const boost::reference_wrapper<Worker> &worker) {
assert(!complete_called_);
threads_.push_back(new Thread(Add(), worker));
@@ -122,12 +195,21 @@ class Chain {
threads_.push_back(new Thread(Complete(), kRecycle));
}
+ /**
+ * Adds a Recycler worker to this chain,
+ * and runs that worker in a new Thread owned by this chain.
+ */
Chain &operator>>(const Recycler &) {
CompleteLoop();
return *this;
}
+ /**
+ * Adds a WriteAndRecycle worker to this chain,
+ * and runs that worker in a new Thread owned by this chain.
+ */
Chain &operator>>(const WriteAndRecycle &writer);
+ Chain &operator>>(const PWriteAndRecycle &writer);
// Chains are reusable. Call Wait to wait for everything to finish and free memory.
void Wait(bool release_memory = true);
@@ -156,28 +238,87 @@ class Chain {
};
// Create the link in the worker thread using the position token.
+/**
+ * Represents a C++ style iterator over @ref Block "blocks".
+ */
class Link {
public:
+
// Either default construct and Init or just construct all at once.
+
+ /**
+ * Constructs an @ref Init "initialized" link.
+ *
+ * @see Init
+ */
+ explicit Link(const ChainPosition &position);
+
+ /**
+ * Constructs a link that must subsequently be @ref Init "initialized".
+ *
+ * @see Init
+ */
Link();
+
+ /**
+ * Initializes the link with the input @ref PCQueue "consumer queue" and output @ref PCQueue "producer queue" at a given @ref ChainPosition "position" in the @ref Chain "chain".
+ *
+ * @see Link()
+ */
void Init(const ChainPosition &position);
- explicit Link(const ChainPosition &position);
-
+ /**
+ * Destructs the link object.
+ *
+ * If necessary, this method will pass a poison block
+ * to this link's output @ref PCQueue "producer queue".
+ *
+ * @see Block::SetToPoison()
+ */
~Link();
+ /**
+ * Gets a reference to the @ref Block "block" at this link.
+ */
Block &operator*() { return current_; }
+
+ /**
+ * Gets a const reference to the @ref Block "block" at this link.
+ */
const Block &operator*() const { return current_; }
+ /**
+ * Gets a pointer to the @ref Block "block" at this link.
+ */
Block *operator->() { return &current_; }
+
+ /**
+ * Gets a const pointer to the @ref Block "block" at this link.
+ */
const Block *operator->() const { return &current_; }
+ /**
+ * Gets the link at the next @ref ChainPosition "position" in the @ref Chain "chain".
+ */
Link &operator++();
+ /**
+ * Returns true if the @ref Block "block" at this link encapsulates a valid (non-NULL) block of memory.
+ *
+ * This method is a user-defined implicit conversion function to boolean;
+ * among other things, this method enables bare instances of this class
+ * to be used as the condition of an if statement.
+ */
operator bool() const { return current_; }
+ /**
+ * @ref Block::SetToPoison() "Poisons" the @ref Block "block" at this link,
+ * and passes this now-poisoned block to this link's output @ref PCQueue "producer queue".
+ *
+ * @see Block::SetToPoison()
+ */
void Poison();
-
+
private:
Block current_;
PCQueue<Block> *in_, *out_;
@@ -195,4 +336,4 @@ inline Chain &operator>>(Chain &chain, Link &link) {
} // namespace stream
} // namespace util
-#endif // UTIL_STREAM_CHAIN__
+#endif // UTIL_STREAM_CHAIN_H
diff --git a/klm/util/stream/config.hh b/klm/util/stream/config.hh
index 1eeb3a8a..6bad36bc 100644
--- a/klm/util/stream/config.hh
+++ b/klm/util/stream/config.hh
@@ -1,32 +1,63 @@
-#ifndef UTIL_STREAM_CONFIG__
-#define UTIL_STREAM_CONFIG__
+#ifndef UTIL_STREAM_CONFIG_H
+#define UTIL_STREAM_CONFIG_H
#include <cstddef>
#include <string>
namespace util { namespace stream {
+/**
+ * Represents how a chain should be configured.
+ */
struct ChainConfig {
+
+ /** Constructs an configuration with underspecified (or default) parameters. */
ChainConfig() {}
+ /**
+ * Constructs a chain configuration object.
+ *
+ * @param [in] in_entry_size Number of bytes in each record.
+ * @param [in] in_block_count Number of blocks in the chain.
+ * @param [in] in_total_memory Total number of bytes available to the chain.
+ * This value will be divided amongst the blocks in the chain.
+ */
ChainConfig(std::size_t in_entry_size, std::size_t in_block_count, std::size_t in_total_memory)
: entry_size(in_entry_size), block_count(in_block_count), total_memory(in_total_memory) {}
+ /**
+ * Number of bytes in each record.
+ */
std::size_t entry_size;
+
+ /**
+ * Number of blocks in the chain.
+ */
std::size_t block_count;
- // Chain's constructor will make this a multiple of entry_size.
+
+ /**
+ * Total number of bytes available to the chain.
+ * This value will be divided amongst the blocks in the chain.
+ * Chain's constructor will make this a multiple of entry_size.
+ */
std::size_t total_memory;
};
+
+/**
+ * Represents how a sorter should be configured.
+ */
struct SortConfig {
+
+ /** Filename prefix where temporary files should be placed. */
std::string temp_prefix;
- // Size of each input/output buffer.
+ /** Size of each input/output buffer. */
std::size_t buffer_size;
- // Total memory to use when running alone.
+ /** Total memory to use when running alone. */
std::size_t total_memory;
};
}} // namespaces
-#endif // UTIL_STREAM_CONFIG__
+#endif // UTIL_STREAM_CONFIG_H
diff --git a/klm/util/stream/io.cc b/klm/util/stream/io.cc
index 0459f706..c64004c0 100644
--- a/klm/util/stream/io.cc
+++ b/klm/util/stream/io.cc
@@ -36,12 +36,12 @@ void PRead::Run(const ChainPosition &position) {
Link link(position);
uint64_t offset = 0;
for (; offset + block_size64 < size; offset += block_size64, ++link) {
- PReadOrThrow(file_, link->Get(), block_size, offset);
+ ErsatzPRead(file_, link->Get(), block_size, offset);
link->SetValidSize(block_size);
}
// size - offset is <= block_size, so it casts to 32-bit fine.
if (size - offset) {
- PReadOrThrow(file_, link->Get(), size - offset, offset);
+ ErsatzPRead(file_, link->Get(), size - offset, offset);
link->SetValidSize(size - offset);
++link;
}
@@ -62,5 +62,15 @@ void WriteAndRecycle::Run(const ChainPosition &position) {
}
}
+void PWriteAndRecycle::Run(const ChainPosition &position) {
+ const std::size_t block_size = position.GetChain().BlockSize();
+ uint64_t offset = 0;
+ for (Link link(position); link; ++link) {
+ ErsatzPWrite(file_, link->Get(), link->ValidSize(), offset);
+ offset += link->ValidSize();
+ link->SetValidSize(block_size);
+ }
+}
+
} // namespace stream
} // namespace util
diff --git a/klm/util/stream/io.hh b/klm/util/stream/io.hh
index 934b6b3f..8dae2cbf 100644
--- a/klm/util/stream/io.hh
+++ b/klm/util/stream/io.hh
@@ -1,5 +1,5 @@
-#ifndef UTIL_STREAM_IO__
-#define UTIL_STREAM_IO__
+#ifndef UTIL_STREAM_IO_H
+#define UTIL_STREAM_IO_H
#include "util/exception.hh"
#include "util/file.hh"
@@ -41,6 +41,8 @@ class Write {
int file_;
};
+// It's a common case that stuff is written and then recycled. So rather than
+// spawn another thread to Recycle, this combines the two roles.
class WriteAndRecycle {
public:
explicit WriteAndRecycle(int fd) : file_(fd) {}
@@ -49,14 +51,23 @@ class WriteAndRecycle {
int file_;
};
+class PWriteAndRecycle {
+ public:
+ explicit PWriteAndRecycle(int fd) : file_(fd) {}
+ void Run(const ChainPosition &position);
+ private:
+ int file_;
+};
+
+
// Reuse the same file over and over again to buffer output.
class FileBuffer {
public:
explicit FileBuffer(int fd) : file_(fd) {}
- WriteAndRecycle Sink() const {
+ PWriteAndRecycle Sink() const {
util::SeekOrThrow(file_.get(), 0);
- return WriteAndRecycle(file_.get());
+ return PWriteAndRecycle(file_.get());
}
PRead Source() const {
@@ -73,4 +84,4 @@ class FileBuffer {
} // namespace stream
} // namespace util
-#endif // UTIL_STREAM_IO__
+#endif // UTIL_STREAM_IO_H
diff --git a/klm/util/stream/line_input.hh b/klm/util/stream/line_input.hh
index 86db1dd0..a870a664 100644
--- a/klm/util/stream/line_input.hh
+++ b/klm/util/stream/line_input.hh
@@ -1,5 +1,5 @@
-#ifndef UTIL_STREAM_LINE_INPUT__
-#define UTIL_STREAM_LINE_INPUT__
+#ifndef UTIL_STREAM_LINE_INPUT_H
+#define UTIL_STREAM_LINE_INPUT_H
namespace util {namespace stream {
class ChainPosition;
@@ -19,4 +19,4 @@ class LineInput {
};
}} // namespaces
-#endif // UTIL_STREAM_LINE_INPUT__
+#endif // UTIL_STREAM_LINE_INPUT_H
diff --git a/klm/util/stream/multi_progress.hh b/klm/util/stream/multi_progress.hh
index c4dd45a9..82e698a5 100644
--- a/klm/util/stream/multi_progress.hh
+++ b/klm/util/stream/multi_progress.hh
@@ -1,6 +1,6 @@
/* Progress bar suitable for chains of workers */
-#ifndef UTIL_MULTI_PROGRESS__
-#define UTIL_MULTI_PROGRESS__
+#ifndef UTIL_STREAM_MULTI_PROGRESS_H
+#define UTIL_STREAM_MULTI_PROGRESS_H
#include <boost/thread/mutex.hpp>
@@ -87,4 +87,4 @@ class WorkerProgress {
}} // namespaces
-#endif // UTIL_MULTI_PROGRESS__
+#endif // UTIL_STREAM_MULTI_PROGRESS_H
diff --git a/klm/util/stream/multi_stream.hh b/klm/util/stream/multi_stream.hh
new file mode 100644
index 00000000..0ee7fab6
--- /dev/null
+++ b/klm/util/stream/multi_stream.hh
@@ -0,0 +1,127 @@
+#ifndef UTIL_STREAM_MULTI_STREAM_H
+#define UTIL_STREAM_MULTI_STREAM_H
+
+#include "util/fixed_array.hh"
+#include "util/scoped.hh"
+#include "util/stream/chain.hh"
+#include "util/stream/stream.hh"
+
+#include <cstddef>
+#include <new>
+
+#include <assert.h>
+#include <stdlib.h>
+
+namespace util { namespace stream {
+
+class Chains;
+
+class ChainPositions : public util::FixedArray<util::stream::ChainPosition> {
+ public:
+ ChainPositions() {}
+
+ void Init(Chains &chains);
+
+ explicit ChainPositions(Chains &chains) {
+ Init(chains);
+ }
+};
+
+class Chains : public util::FixedArray<util::stream::Chain> {
+ private:
+ template <class T, void (T::*ptr)(const ChainPositions &) = &T::Run> struct CheckForRun {
+ typedef Chains type;
+ };
+
+ public:
+ // Must call Init.
+ Chains() {}
+
+ explicit Chains(std::size_t limit) : util::FixedArray<util::stream::Chain>(limit) {}
+
+ template <class Worker> typename CheckForRun<Worker>::type &operator>>(const Worker &worker) {
+ threads_.push_back(new util::stream::Thread(ChainPositions(*this), worker));
+ return *this;
+ }
+
+ template <class Worker> typename CheckForRun<Worker>::type &operator>>(const boost::reference_wrapper<Worker> &worker) {
+ threads_.push_back(new util::stream::Thread(ChainPositions(*this), worker));
+ return *this;
+ }
+
+ Chains &operator>>(const util::stream::Recycler &recycler) {
+ for (util::stream::Chain *i = begin(); i != end(); ++i)
+ *i >> recycler;
+ return *this;
+ }
+
+ void Wait(bool release_memory = true) {
+ threads_.clear();
+ for (util::stream::Chain *i = begin(); i != end(); ++i) {
+ i->Wait(release_memory);
+ }
+ }
+
+ private:
+ boost::ptr_vector<util::stream::Thread> threads_;
+
+ Chains(const Chains &);
+ void operator=(const Chains &);
+};
+
+inline void ChainPositions::Init(Chains &chains) {
+ util::FixedArray<util::stream::ChainPosition>::Init(chains.size());
+ for (util::stream::Chain *i = chains.begin(); i != chains.end(); ++i) {
+ // use "placement new" syntax to initalize ChainPosition in an already-allocated memory location
+ new (end()) util::stream::ChainPosition(i->Add()); Constructed();
+ }
+}
+
+inline Chains &operator>>(Chains &chains, ChainPositions &positions) {
+ positions.Init(chains);
+ return chains;
+}
+
+template <class T> class GenericStreams : public util::FixedArray<T> {
+ private:
+ typedef util::FixedArray<T> P;
+ public:
+ GenericStreams() {}
+
+ // This puts a dummy T at the beginning (useful to algorithms that need to reference something at the beginning).
+ void InitWithDummy(const ChainPositions &positions) {
+ P::Init(positions.size() + 1);
+ new (P::end()) T(); // use "placement new" syntax to initalize T in an already-allocated memory location
+ P::Constructed();
+ for (const util::stream::ChainPosition *i = positions.begin(); i != positions.end(); ++i) {
+ P::push_back(*i);
+ }
+ }
+
+ // Limit restricts to positions[0,limit)
+ void Init(const ChainPositions &positions, std::size_t limit) {
+ P::Init(limit);
+ for (const util::stream::ChainPosition *i = positions.begin(); i != positions.begin() + limit; ++i) {
+ P::push_back(*i);
+ }
+ }
+ void Init(const ChainPositions &positions) {
+ Init(positions, positions.size());
+ }
+
+ GenericStreams(const ChainPositions &positions) {
+ Init(positions);
+ }
+};
+
+template <class T> inline Chains &operator>>(Chains &chains, GenericStreams<T> &streams) {
+ ChainPositions positions;
+ chains >> positions;
+ streams.Init(positions);
+ return chains;
+}
+
+typedef GenericStreams<Stream> Streams;
+
+}} // namespaces
+#endif // UTIL_STREAM_MULTI_STREAM_H
diff --git a/klm/util/stream/sort.hh b/klm/util/stream/sort.hh
index 16aa6a03..9082cfdd 100644
--- a/klm/util/stream/sort.hh
+++ b/klm/util/stream/sort.hh
@@ -15,8 +15,8 @@
* sort. Use a hash table for that.
*/
-#ifndef UTIL_STREAM_SORT__
-#define UTIL_STREAM_SORT__
+#ifndef UTIL_STREAM_SORT_H
+#define UTIL_STREAM_SORT_H
#include "util/stream/chain.hh"
#include "util/stream/config.hh"
@@ -182,7 +182,7 @@ template <class Compare> class MergeQueue {
amount = remaining_;
buffer_end_ = current_ + remaining_;
}
- PReadOrThrow(fd, current_, amount, offset_);
+ ErsatzPRead(fd, current_, amount, offset_);
offset_ += amount;
assert(current_ <= buffer_end_);
remaining_ -= amount;
@@ -307,10 +307,10 @@ template <class Compare, class Combine> class MergingReader {
const uint64_t block_size = position.GetChain().BlockSize();
Link l(position);
for (; offset + block_size < end; ++l, offset += block_size) {
- PReadOrThrow(in_, l->Get(), block_size, offset);
+ ErsatzPRead(in_, l->Get(), block_size, offset);
l->SetValidSize(block_size);
}
- PReadOrThrow(in_, l->Get(), end - offset, offset);
+ ErsatzPRead(in_, l->Get(), end - offset, offset);
l->SetValidSize(end - offset);
(++l).Poison();
return;
@@ -388,8 +388,10 @@ class BadSortConfig : public Exception {
~BadSortConfig() throw() {}
};
+/** Sort */
template <class Compare, class Combine = NeverCombine> class Sort {
public:
+ /** Constructs an object capable of sorting */
Sort(Chain &in, const SortConfig &config, const Compare &compare = Compare(), const Combine &combine = Combine())
: config_(config),
data_(MakeTemp(config.temp_prefix)),
@@ -545,4 +547,4 @@ template <class Compare, class Combine> uint64_t BlockingSort(Chain &chain, cons
} // namespace stream
} // namespace util
-#endif // UTIL_STREAM_SORT__
+#endif // UTIL_STREAM_SORT_H
diff --git a/klm/util/stream/stream.hh b/klm/util/stream/stream.hh
index 6ff45b82..7ea1c9f7 100644
--- a/klm/util/stream/stream.hh
+++ b/klm/util/stream/stream.hh
@@ -1,5 +1,5 @@
-#ifndef UTIL_STREAM_STREAM__
-#define UTIL_STREAM_STREAM__
+#ifndef UTIL_STREAM_STREAM_H
+#define UTIL_STREAM_STREAM_H
#include "util/stream/chain.hh"
@@ -56,6 +56,9 @@ class Stream : boost::noncopyable {
end_ = current_ + block_it_->ValidSize();
}
+ // The following are pointers to raw memory
+ // current_ is the current record
+ // end_ is the end of the block (so we know when to move to the next block)
uint8_t *current_, *end_;
std::size_t entry_size_;
@@ -71,4 +74,4 @@ inline Chain &operator>>(Chain &chain, Stream &stream) {
} // namespace stream
} // namespace util
-#endif // UTIL_STREAM_STREAM__
+#endif // UTIL_STREAM_STREAM_H
diff --git a/klm/util/stream/timer.hh b/klm/util/stream/timer.hh
index 7e1a5885..06488a17 100644
--- a/klm/util/stream/timer.hh
+++ b/klm/util/stream/timer.hh
@@ -1,5 +1,5 @@
-#ifndef UTIL_STREAM_TIMER__
-#define UTIL_STREAM_TIMER__
+#ifndef UTIL_STREAM_TIMER_H
+#define UTIL_STREAM_TIMER_H
// Sorry Jon, this was adding library dependencies in Moses and people complained.
@@ -13,4 +13,4 @@
#define UTIL_TIMER(str)
//#endif
-#endif // UTIL_STREAM_TIMER__
+#endif // UTIL_STREAM_TIMER_H
diff --git a/klm/util/string_piece.cc b/klm/util/string_piece.cc
index 973091c4..62694a35 100644
--- a/klm/util/string_piece.cc
+++ b/klm/util/string_piece.cc
@@ -1,2 +1 @@
-// this has been moved to utils/ in cdec
-
+// moved to cdec/utils
diff --git a/klm/util/string_piece.hh b/klm/util/string_piece.hh
index 696ca084..a49779aa 100644
--- a/klm/util/string_piece.hh
+++ b/klm/util/string_piece.hh
@@ -1,2 +1 @@
#include "utils/string_piece.hh"
-
diff --git a/klm/util/string_piece_hash.hh b/klm/util/string_piece_hash.hh
index f206b1d8..5c8c525e 100644
--- a/klm/util/string_piece_hash.hh
+++ b/klm/util/string_piece_hash.hh
@@ -1,5 +1,5 @@
-#ifndef UTIL_STRING_PIECE_HASH__
-#define UTIL_STRING_PIECE_HASH__
+#ifndef UTIL_STRING_PIECE_HASH_H
+#define UTIL_STRING_PIECE_HASH_H
#include "util/string_piece.hh"
@@ -40,4 +40,4 @@ template <class T> typename T::iterator FindStringPiece(T &t, const StringPiece
#endif
}
-#endif // UTIL_STRING_PIECE_HASH__
+#endif // UTIL_STRING_PIECE_HASH_H
diff --git a/klm/util/thread_pool.hh b/klm/util/thread_pool.hh
index 84e257ea..d1a883a0 100644
--- a/klm/util/thread_pool.hh
+++ b/klm/util/thread_pool.hh
@@ -1,5 +1,5 @@
-#ifndef UTIL_THREAD_POOL__
-#define UTIL_THREAD_POOL__
+#ifndef UTIL_THREAD_POOL_H
+#define UTIL_THREAD_POOL_H
#include "util/pcqueue.hh"
@@ -18,8 +18,8 @@ template <class HandlerT> class Worker : boost::noncopyable {
typedef HandlerT Handler;
typedef typename Handler::Request Request;
- template <class Construct> Worker(PCQueue<Request> &in, Construct &construct, Request &poison)
- : in_(in), handler_(construct), thread_(boost::ref(*this)), poison_(poison) {}
+ template <class Construct> Worker(PCQueue<Request> &in, Construct &construct, const Request &poison)
+ : in_(in), handler_(construct), poison_(poison), thread_(boost::ref(*this)) {}
// Only call from thread.
void operator()() {
@@ -30,7 +30,7 @@ template <class HandlerT> class Worker : boost::noncopyable {
try {
(*handler_)(request);
}
- catch(std::exception &e) {
+ catch(const std::exception &e) {
std::cerr << "Handler threw " << e.what() << std::endl;
abort();
}
@@ -49,10 +49,10 @@ template <class HandlerT> class Worker : boost::noncopyable {
PCQueue<Request> &in_;
boost::optional<Handler> handler_;
+
+ const Request poison_;
boost::thread thread_;
-
- Request poison_;
};
template <class HandlerT> class ThreadPool : boost::noncopyable {
@@ -92,4 +92,4 @@ template <class HandlerT> class ThreadPool : boost::noncopyable {
} // namespace util
-#endif // UTIL_THREAD_POOL__
+#endif // UTIL_THREAD_POOL_H
diff --git a/klm/util/tokenize_piece.hh b/klm/util/tokenize_piece.hh
index 24eae8fb..908c8daf 100644
--- a/klm/util/tokenize_piece.hh
+++ b/klm/util/tokenize_piece.hh
@@ -1,5 +1,5 @@
-#ifndef UTIL_TOKENIZE_PIECE__
-#define UTIL_TOKENIZE_PIECE__
+#ifndef UTIL_TOKENIZE_PIECE_H
+#define UTIL_TOKENIZE_PIECE_H
#include "util/exception.hh"
#include "util/string_piece.hh"
@@ -7,7 +7,8 @@
#include <boost/iterator/iterator_facade.hpp>
#include <algorithm>
-#include <iostream>
+
+#include <string.h>
namespace util {
@@ -71,6 +72,13 @@ class BoolCharacter {
return StringPiece(in.data() + in.size(), 0);
}
+ template <unsigned Length> static void Build(const char (&characters)[Length], bool (&out)[256]) {
+ memset(out, 0, sizeof(out));
+ for (const char *i = characters; i != characters + Length; ++i) {
+ out[static_cast<unsigned char>(*i)] = true;
+ }
+ }
+
private:
const bool *delimiter_;
};
@@ -140,4 +148,4 @@ template <class Find, bool SkipEmpty = false> class TokenIter : public boost::it
} // namespace util
-#endif // UTIL_TOKENIZE_PIECE__
+#endif // UTIL_TOKENIZE_PIECE_H
diff --git a/klm/util/unistd.hh b/klm/util/unistd.hh
new file mode 100644
index 00000000..0379c491
--- /dev/null
+++ b/klm/util/unistd.hh
@@ -0,0 +1,22 @@
+#ifndef UTIL_UNISTD_H
+#define UTIL_UNISTD_H
+
+#if defined(_WIN32) || defined(_WIN64)
+
+// Windows doesn't define <unistd.h>
+//
+// So we define what we need here instead:
+//
+#define STDIN_FILENO=0
+#define STDOUT_FILENO=1
+
+
+#else // Huzzah for POSIX!
+
+#include <unistd.h>
+
+#endif
+
+
+
+#endif // UTIL_UNISTD_H
diff --git a/klm/util/usage.cc b/klm/util/usage.cc
index e68d7c7c..2a4aa47d 100644
--- a/klm/util/usage.cc
+++ b/klm/util/usage.cc
@@ -30,6 +30,8 @@ typedef struct
DWORDLONG ullAvailVirtual;
DWORDLONG ullAvailExtendedVirtual;
} lMEMORYSTATUSEX;
+// Is this really supposed to be defined like this?
+typedef int WINBOOL;
typedef WINBOOL (WINAPI *PFN_MS_EX) (lMEMORYSTATUSEX*);
#else
#include <sys/resource.h>
@@ -196,7 +198,7 @@ uint64_t GuessPhysicalMemory() {
#if defined(_WIN32) || defined(_WIN64)
{ /* this works on windows */
PFN_MS_EX pfnex;
- HMODULE h = GetModuleHandle ("kernel32.dll");
+ HMODULE h = GetModuleHandle (TEXT("kernel32.dll"));
if (!h)
return 0;
diff --git a/klm/util/usage.hh b/klm/util/usage.hh
index da53b9e3..e578b0a6 100644
--- a/klm/util/usage.hh
+++ b/klm/util/usage.hh
@@ -1,5 +1,5 @@
-#ifndef UTIL_USAGE__
-#define UTIL_USAGE__
+#ifndef UTIL_USAGE_H
+#define UTIL_USAGE_H
#include <cstddef>
#include <iosfwd>
#include <string>
@@ -18,4 +18,4 @@ uint64_t GuessPhysicalMemory();
// Parse a size like unix sort. Sadly, this means the default multiplier is K.
uint64_t ParseSize(const std::string &arg);
} // namespace util
-#endif // UTIL_USAGE__
+#endif // UTIL_USAGE_H