]> arthur.barton.de Git - bup.git/commitdiff
Eject our pages after save via fmincore
authorRob Browning <rlb@defaultvalue.org>
Sat, 30 May 2015 16:18:26 +0000 (11:18 -0500)
committerRob Browning <rlb@defaultvalue.org>
Sat, 20 Jun 2015 15:55:54 +0000 (10:55 -0500)
Use fmincore and fadvise_done to eject only pages that have become
resident during our save traversal (i.e. via hashsplitting) in
batches (currently 8MB or one VM page, whichever's larger).

Hopefully this will work better than either universal ejection (our
previous behavior), or no ejection (which dramatically slows down save
operations on some systems, perhaps due to competition with access to
the indexes, etc.

Thanks to Nimen Nachname for the initial suggestion, and thanks to Tilo
Schwarz for reporting bugs in a previous version of the patch, and for
noting that we shouldn't wait until the end of a large region before
starting to eject it.

Signed-off-by: Rob Browning <rlb@defaultvalue.org>
Tested-by: Rob Browning <rlb@defaultvalue.org>
Tested-by: Ben Wiederhake <Ben.Wiederhake@gmail.com>
Tested-by: Tilo Schwarz <mail@tilo-schwarz.de>
[rlb@defaultvalue.org: bup_fmincore: add missing malloc result check,
 and missing free(result) when munmap fails to
 437fedd07ee327c14b11cb19f6c0519ef1e50884]
Signed-off-by: Rob Browning <rlb@defaultvalue.org>
Tested-by: Rob Browning <rlb@defaultvalue.org>
lib/bup/hashsplit.py
lib/bup/t/thashsplit.py

index 048347b2fdd312be787c7cfd3dbf7d6eb9820446..3ea8e17d872c926c32eda7d104b78380632fc348 100644 (file)
@@ -2,6 +2,13 @@ import math
 from bup import _helpers
 from bup.helpers import *
 
+try:
+    _fmincore = _helpers.fmincore
+except AttributeError, e:
+    _fmincore = None
+
+_page_size = os.sysconf("SC_PAGE_SIZE")
+
 BLOB_MAX = 8192*4   # 8192 is the "typical" blob size for bupsplit
 BLOB_READ_SIZE = 1024*1024
 MAX_PER_TREE = 256
@@ -41,28 +48,71 @@ class Buf:
         return len(self.data) - self.start
 
 
-def _fadvise_done(f, ofs, len):
-    assert(ofs >= 0)
-    assert(len >= 0)
-    if len > 0 and hasattr(f, 'fileno'):
-        _helpers.fadvise_done(f.fileno(), ofs, len)
+def _fadvise_pages_done(fd, first_page, count):
+    assert(first_page >= 0)
+    assert(count >= 0)
+    if count > 0:
+        _helpers.fadvise_done(fd, first_page * _page_size, count * _page_size)
+
+
+def _nonresident_page_regions(status_bytes, max_region_len=None):
+    """Return (start_page, count) pairs in ascending start_page order for
+    each contiguous region of nonresident pages indicated by the
+    mincore() status_bytes.  Limit the number of pages in each region
+    to max_region_len."""
+    assert(max_region_len is None or max_region_len > 0)
+    start = None
+    for i, x in enumerate(status_bytes):
+        in_core = ord(x) & 1
+        if start is None:
+            if not in_core:
+                start = i
+        else:
+            count = i - start
+            if in_core:
+                yield (start, count)
+                start = None
+            elif max_region_len and count >= max_region_len:
+                yield (start, count)
+                start = i
+    if start is not None:
+        yield (start, len(status_bytes) - start)
+
+
+def _uncache_ours_upto(fd, offset, first_region, remaining_regions):
+    """Uncache the pages of fd indicated by first_region and
+    remaining_regions that are before offset, where each region is a
+    (start_page, count) pair.  The final region must have a start_page
+    of None."""
+    rstart, rlen = first_region
+    while rstart is not None and (rstart + rlen) * _page_size <= offset:
+        _fadvise_pages_done(fd, rstart, rlen)
+        rstart, rlen = next(remaining_regions, (None, None))
+    return (rstart, rlen)
 
 
 def readfile_iter(files, progress=None):
     for filenum,f in enumerate(files):
         ofs = 0
         b = ''
+        fd = rpr = rstart = rlen = None
+        if _fmincore and hasattr(f, 'fileno'):
+            fd = f.fileno()
+            max_chunk = max(1, (8 * 1024 * 1024) / _page_size)
+            rpr = _nonresident_page_regions(_helpers.fmincore(fd), max_chunk)
+            rstart, rlen = next(rpr, (None, None))
         while 1:
             if progress:
                 progress(filenum, len(b))
             b = f.read(BLOB_READ_SIZE)
             ofs += len(b)
-            # Warning: ofs == 0 means 'done with the whole file'
-            # This will only happen here when the file is empty
-            _fadvise_done(f, 0, ofs)
+            if rpr:
+                rstart, rlen = _uncache_ours_upto(fd, ofs, (rstart, rlen), rpr)
             if not b:
                 break
             yield b
+        if rpr:
+            rstart, rlen = _uncache_ours_upto(fd, ofs, (rstart, rlen), rpr)
 
 
 def _splitbuf(buf, basebits, fanbits):
index 62d973e6410a51ca55e3d4ff6b86709388c3c511..8b52acf5c2f9aa9ce1e67c0a8e3b009479b8f282 100644 (file)
@@ -2,6 +2,80 @@ from bup import hashsplit, _helpers
 from wvtest import *
 from cStringIO import StringIO
 
+def nr_regions(x, max_count=None):
+    return list(hashsplit._nonresident_page_regions(''.join(map(chr, x)),
+                                                    max_count))
+
+@wvtest
+def test_nonresident_page_regions():
+    WVPASSEQ(nr_regions([]), [])
+    WVPASSEQ(nr_regions([1]), [])
+    WVPASSEQ(nr_regions([0]), [(0, 1)])
+    WVPASSEQ(nr_regions([1, 0]), [(1, 1)])
+    WVPASSEQ(nr_regions([0, 0]), [(0, 2)])
+    WVPASSEQ(nr_regions([1, 0, 1]), [(1, 1)])
+    WVPASSEQ(nr_regions([1, 0, 0]), [(1, 2)])
+    WVPASSEQ(nr_regions([0, 1, 0]), [(0, 1), (2, 1)])
+    WVPASSEQ(nr_regions([0, 0, 1, 1, 1, 0, 0, 0, 1, 0, 0]),
+             [(0, 2), (5, 3), (9, 2)])
+    WVPASSEQ(nr_regions([2, 42, 3, 101]), [(0, 2)])
+    # Test limit
+    WVPASSEQ(nr_regions([0, 0, 0], None), [(0, 3)])
+    WVPASSEQ(nr_regions([0, 0, 0], 1), [(0, 1), (1, 1), (2, 1)])
+    WVPASSEQ(nr_regions([0, 0, 0], 2), [(0, 2), (2, 1)])
+    WVPASSEQ(nr_regions([0, 0, 0], 3), [(0, 3)])
+    WVPASSEQ(nr_regions([0, 0, 0], 4), [(0, 3)])
+    WVPASSEQ(nr_regions([0, 0, 1], None), [(0, 2)])
+    WVPASSEQ(nr_regions([0, 0, 1], 1), [(0, 1), (1, 1)])
+    WVPASSEQ(nr_regions([0, 0, 1], 2), [(0, 2)])
+    WVPASSEQ(nr_regions([0, 0, 1], 3), [(0, 2)])
+    WVPASSEQ(nr_regions([1, 0, 0], None), [(1, 2)])
+    WVPASSEQ(nr_regions([1, 0, 0], 1), [(1, 1), (2, 1)])
+    WVPASSEQ(nr_regions([1, 0, 0], 2), [(1, 2)])
+    WVPASSEQ(nr_regions([1, 0, 0], 3), [(1, 2)])
+    WVPASSEQ(nr_regions([1, 0, 0, 0, 1], None), [(1, 3)])
+    WVPASSEQ(nr_regions([1, 0, 0, 0, 1], 1), [(1, 1), (2, 1), (3, 1)])
+    WVPASSEQ(nr_regions([1, 0, 0, 0, 1], 2), [(1, 2), (3, 1)])
+    WVPASSEQ(nr_regions([1, 0, 0, 0, 1], 3), [(1, 3)])
+    WVPASSEQ(nr_regions([1, 0, 0, 0, 1], 4), [(1, 3)])
+
+
+@wvtest
+def test_uncache_ours_upto():
+    history = []
+    def mock_fadvise_pages_done(f, ofs, len):
+        history.append((f, ofs, len))
+
+    uncache_upto = hashsplit._uncache_ours_upto
+    page_size = os.sysconf("SC_PAGE_SIZE")
+    orig_pages_done = hashsplit._fadvise_pages_done
+    try:
+        hashsplit._fadvise_pages_done = mock_fadvise_pages_done
+        history = []
+        uncache_upto(42, 0, (0, 1), iter([]))
+        WVPASSEQ([], history)
+        uncache_upto(42, page_size, (0, 1), iter([]))
+        WVPASSEQ([(42, 0, 1)], history)
+        history = []
+        uncache_upto(42, page_size, (0, 3), iter([(5, 2)]))
+        WVPASSEQ([], history)
+        uncache_upto(42, 2 * page_size, (0, 3), iter([(5, 2)]))
+        WVPASSEQ([], history)
+        uncache_upto(42, 3 * page_size, (0, 3), iter([(5, 2)]))
+        WVPASSEQ([(42, 0, 3)], history)
+        history = []
+        uncache_upto(42, 5 * page_size, (0, 3), iter([(5, 2)]))
+        WVPASSEQ([(42, 0, 3)], history)
+        history = []
+        uncache_upto(42, 6 * page_size, (0, 3), iter([(5, 2)]))
+        WVPASSEQ([(42, 0, 3)], history)
+        history = []
+        uncache_upto(42, 7 * page_size, (0, 3), iter([(5, 2)]))
+        WVPASSEQ([(42, 0, 3), (42, 5, 2)], history)
+    finally:
+        hashsplit._fadvise_pages_done = orig_pages_done
+
+
 @wvtest
 def test_rolling_sums():
     WVPASS(_helpers.selftest())