hashsplit: replace join_bytes with cat_bytes
diff --git a/lib/bup/hashsplit.py b/lib/bup/hashsplit.py
index 5de6a3fa1870e580fe1cd89e402c2d2b6fe64aa5..0ae6acdb669a624a58688e3bc760ba01d0a35ddd 100644
--- a/lib/bup/hashsplit.py
+++ b/lib/bup/hashsplit.py
@@ -1,34 +1,53 @@
-import math
-from bup import _helpers
-from bup.helpers import *
 
-BLOB_LWM = 8192*2
-BLOB_MAX = BLOB_LWM*2
-BLOB_HWM = 1024*1024
+from __future__ import absolute_import
+import io, math, os
+
+from bup import _helpers, compat, helpers
+from bup._helpers import cat_bytes
+from bup.compat import buffer, py_maj
+from bup.helpers import sc_page_size
+
+
+_fmincore = getattr(helpers, 'fmincore', None)
+
+BLOB_MAX = 8192*4   # 8192 is the "typical" blob size for bupsplit
+BLOB_READ_SIZE = 1024*1024
 MAX_PER_TREE = 256
 progress_callback = None
-max_pack_size = 1000*1000*1000  # larger packs will slow down pruning
-max_pack_objects = 200*1000  # cache memory usage is about 83 bytes per object
 fanout = 16
 
+GIT_MODE_FILE = 0o100644
+GIT_MODE_TREE = 0o40000
+GIT_MODE_SYMLINK = 0o120000
+
+# The purpose of this type of buffer is to avoid copying on peek(), get(),
+# and eat().  We do copy the buffer contents on put(), but that should
+# be ok if we always only put() large amounts of data at a time.
 class Buf:
     def __init__(self):
-        self.data = ''
+        self.data = b''
         self.start = 0
 
     def put(self, s):
         if s:
-            self.data = buffer(self.data, self.start) + s
+            remaining = len(self.data) - self.start
+            self.data = cat_bytes(self.data, self.start, remaining,
+                                  s, 0, len(s))
             self.start = 0
             
     def peek(self, count):
+        if count <= 256:
+            return self.data[self.start : self.start + count]
         return buffer(self.data, self.start, count)
     
     def eat(self, count):
         self.start += count
 
     def get(self, count):
-        v = buffer(self.data, self.start, count)
+        if count <= 256:
+            v = self.data[self.start : self.start + count]
+        else:
+            v = buffer(self.data, self.start, count)
         self.start += count
         return v
 
@@ -36,60 +55,112 @@ class Buf:
         return len(self.data) - self.start
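
For illustration, a minimal sketch of how the buffer behaves (a hypothetical driver, assuming the Buf class above; cat_bytes joins two byte ranges in a single pass):

buf = Buf()
buf.put(b'x' * 8192)           # put() copies once, via cat_bytes
head = buf.peek(16)            # small reads return a real bytes slice
buf.eat(16)                    # only advances buf.start; nothing is copied
rest = buf.get(buf.used())     # large reads return a zero-copy buffer view
assert len(rest) == 8192 - 16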
 
 
-def splitbuf(buf):
-    b = buf.peek(buf.used())
-    (ofs, bits) = _helpers.splitbuf(b)
-    if ofs:
-        buf.eat(ofs)
-        return (buffer(b, 0, ofs), bits)
-    return (None, 0)
+def _fadvise_pages_done(fd, first_page, count):
+    assert(first_page >= 0)
+    assert(count >= 0)
+    if count > 0:
+        _helpers.fadvise_done(fd,
+                              first_page * sc_page_size,
+                              count * sc_page_size)
+
+
+def _nonresident_page_regions(status_bytes, incore_mask, max_region_len=None):
+    """Return (start_page, count) pairs in ascending start_page order for
+    each contiguous region of nonresident pages indicated by the
+    mincore() status_bytes.  Limit the number of pages in each region
+    to max_region_len."""
+    assert(max_region_len is None or max_region_len > 0)
+    start = None
+    for i, x in enumerate(status_bytes):
+        in_core = x & incore_mask
+        if start is None:
+            if not in_core:
+                start = i
+        else:
+            count = i - start
+            if in_core:
+                yield (start, count)
+                start = None
+            elif max_region_len and count >= max_region_len:
+                yield (start, count)
+                start = i
+    if start is not None:
+        yield (start, len(status_bytes) - start)
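
A worked example of the region scan under Python 3, assuming an incore mask of 1 (the usual value of helpers.MINCORE_INCORE): five status bytes where pages 1-2 and page 4 are nonresident:

status = bytes([1, 0, 0, 1, 0])         # one mincore() status byte per page
regions = list(_nonresident_page_regions(status, incore_mask=1))
assert regions == [(1, 2), (4, 1)]      # (start_page, count) pairs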
+
+
+def _uncache_ours_upto(fd, offset, first_region, remaining_regions):
+    """Uncache the pages of fd indicated by first_region and
+    remaining_regions that are before offset, where each region is a
+    (start_page, count) pair.  The final region must have a start_page
+    of None."""
+    rstart, rlen = first_region
+    while rstart is not None and (rstart + rlen) * sc_page_size <= offset:
+        _fadvise_pages_done(fd, rstart, rlen)
+        rstart, rlen = next(remaining_regions, (None, None))
+    return (rstart, rlen)
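
A hypothetical driver for the helper above, using a scratch file so fadvise_done has a real descriptor to act on. Regions that lie entirely below the offset are flushed; the first still-pending region is returned:

import os, tempfile

fd, path = tempfile.mkstemp()            # scratch file, illustrative only
os.write(fd, b'\0' * (4 * sc_page_size))
pending = iter([(0, 1), (1, 2), (3, 1)])
first = next(pending)
first = _uncache_ours_upto(fd, 3 * sc_page_size, first, pending)
assert first == (3, 1)                   # (0,1) and (1,2) lay below the offset
os.close(fd)
os.unlink(path)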
 
 
-def blobiter(files, progress=None):
+def readfile_iter(files, progress=None):
     for filenum,f in enumerate(files):
         ofs = 0
         b = ''
+        fd = rpr = rstart = rlen = None
+        if _fmincore and hasattr(f, 'fileno'):
+            try:
+                fd = f.fileno()
+            except io.UnsupportedOperation:
+                pass
+            if fd:
+                mcore = _fmincore(fd)
+                if mcore:
+                    max_chunk = max(1, (8 * 1024 * 1024) // sc_page_size)  # integer division
+                    rpr = _nonresident_page_regions(mcore, helpers.MINCORE_INCORE,
+                                                    max_chunk)
+                    rstart, rlen = next(rpr, (None, None))
         while 1:
             if progress:
                 progress(filenum, len(b))
-            fadvise_done(f, max(0, ofs - 1024*1024))
-            b = f.read(BLOB_HWM)
+            b = f.read(BLOB_READ_SIZE)
             ofs += len(b)
+            if rpr:
+                rstart, rlen = _uncache_ours_upto(fd, ofs, (rstart, rlen), rpr)
             if not b:
-                fadvise_done(f, ofs)
                 break
             yield b
+        if rpr:
+            rstart, rlen = _uncache_ours_upto(fd, ofs, (rstart, rlen), rpr)
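
Putting it together, a usage sketch (the path is illustrative; any binary file object works): chunks are at most BLOB_READ_SIZE bytes, and progress is invoked before each read with the size of the previous chunk:

def on_progress(filenum, nbytes):
    print('file %d: read %d bytes' % (filenum, nbytes))

with open('/tmp/big.dat', 'rb') as f:
    for chunk in readfile_iter([f], progress=on_progress):
        pass                             # hand each chunk to the splitter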
 
 
-def drainbuf(buf, finalize):
+def _splitbuf(buf, basebits, fanbits):
     while 1:
-        (blob, bits) = splitbuf(buf)
-        if blob:
-            yield (blob, bits)
+        b = buf.peek(buf.used())
+        (ofs, bits) = _helpers.splitbuf(b)
+        if ofs:
+            if ofs > BLOB_MAX:
+                ofs = BLOB_MAX
+                level = 0
+            else:
+                level = (bits-basebits)//fanbits  # integer division
+            buf.eat(ofs)
+            yield buffer(b, 0, ofs), level
         else:
             break
-    if buf.used() > BLOB_MAX:
+    while buf.used() >= BLOB_MAX:
         # limit max blob size
-        yield (buf.get(buf.used()), 0)
-    elif finalize and buf.used():
-        yield (buf.get(buf.used()), 0)
+        yield buf.get(BLOB_MAX), 0
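
The level arithmetic, worked through under bup's usual parameters (blobbits() == 13, fanout == 16, hence fanbits == 4): a split point whose rolling checksum matched 21 low bits lifts the entry two levels up the tree:

import math

basebits = 13                        # what _helpers.blobbits() returns by default
fanbits = int(math.log(16, 2))       # fanout == 16  ->  4
bits = 21                            # reported by _helpers.splitbuf for this chunk
assert (bits - basebits) // fanbits == 2   # this blob closes two tree levels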
 
 
 def _hashsplit_iter(files, progress):
-    assert(BLOB_HWM > BLOB_MAX)
+    assert(BLOB_READ_SIZE > BLOB_MAX)
+    basebits = _helpers.blobbits()
+    fanbits = int(math.log(fanout or 128, 2))
     buf = Buf()
-    fi = blobiter(files, progress)
-    while 1:
-        for i in drainbuf(buf, finalize=False):
-            yield i
-        while buf.used() < BLOB_HWM:
-            bnew = next(fi)
-            if not bnew:
-                # eof
-                for i in drainbuf(buf, finalize=True):
-                    yield i
-                return
-            buf.put(bnew)
+    for inblock in readfile_iter(files, progress):
+        buf.put(inblock)
+        for buf_and_level in _splitbuf(buf, basebits, fanbits):
+            yield buf_and_level
+    if buf.used():
+        yield buf.get(buf.used()), 0
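
A minimal sketch of the iterator's contract (illustrative path): every yielded blob is at most BLOB_MAX bytes, and the trailing partial chunk comes out at level 0:

with open('/tmp/big.dat', 'rb') as f:
    sizes = [(len(blob), level) for blob, level in _hashsplit_iter([f], None)]
assert all(size <= BLOB_MAX for size, level in sizes)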
 
 
 def _hashsplit_iter_keep_boundaries(files, progress):
@@ -101,8 +172,8 @@ def _hashsplit_iter_keep_boundaries(files, progress):
                 return progress(real_filenum, nbytes)
         else:
             prog = None
-        for i in _hashsplit_iter([f], progress=prog):
-            yield i
+        for buf_and_level in _hashsplit_iter([f], progress=prog):
+            yield buf_and_level
 
 
 def hashsplit_iter(files, keep_boundaries, progress):
@@ -113,76 +184,74 @@ def hashsplit_iter(files, keep_boundaries, progress):
 
 
 total_split = 0
-def _split_to_blobs(w, files, keep_boundaries, progress):
+def split_to_blobs(makeblob, files, keep_boundaries, progress):
     global total_split
-    for (blob, bits) in hashsplit_iter(files, keep_boundaries, progress):
-        sha = w.new_blob(blob)
+    for (blob, level) in hashsplit_iter(files, keep_boundaries, progress):
+        sha = makeblob(blob)
         total_split += len(blob)
-        if w.outbytes >= max_pack_size or w.count >= max_pack_objects:
-            w.breakpoint()
         if progress_callback:
             progress_callback(len(blob))
-        yield (sha, len(blob), bits)
+        yield (sha, len(blob), level)
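
split_to_blobs now needs only a callable that stores a blob and returns its id. A stand-in makeblob, hashing data the way git names a blob object (a sketch of the interface, not bup's PackWriter):

import hashlib

def makeblob(data):
    # stand-in for git.PackWriter.new_blob: sha1 over a git blob header + data
    return hashlib.sha1(b'blob %d\0' % len(data) + bytes(data)).digest()

with open('/tmp/big.dat', 'rb') as f:    # illustrative path
    for sha, size, level in split_to_blobs(makeblob, [f], False, None):
        pass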
 
 
 def _make_shalist(l):
     ofs = 0
+    l = list(l)
+    total = sum(size for mode,sha,size in l)
+    vlen = len(b'%x' % total)
     shalist = []
     for (mode, sha, size) in l:
-        shalist.append((mode, '%016x' % ofs, sha))
+        shalist.append((mode, b'%0*x' % (vlen,ofs), sha))
         ofs += size
-    total = ofs
+    assert(ofs == total)
     return (shalist, total)
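
Tree entry names are each member's starting offset, zero-padded to the width of the total, so the entries sort correctly and the name width encodes the file length. A small worked example:

entries = [(GIT_MODE_FILE, b'\xaa' * 20, 0x1000),
           (GIT_MODE_FILE, b'\xbb' * 20, 0x234)]
shalist, total = _make_shalist(entries)
assert total == 0x1234
assert [name for mode, name, sha in shalist] == [b'0000', b'1000']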
 
 
-def _squish(w, stacks, n):
+def _squish(maketree, stacks, n):
     i = 0
-    while i<n or len(stacks[i]) > MAX_PER_TREE:
+    while i < n or len(stacks[i]) >= MAX_PER_TREE:
         while len(stacks) <= i+1:
             stacks.append([])
         if len(stacks[i]) == 1:
             stacks[i+1] += stacks[i]
         elif stacks[i]:
             (shalist, size) = _make_shalist(stacks[i])
-            tree = w.new_tree(shalist)
-            stacks[i+1].append(('40000', tree, size))
+            tree = maketree(shalist)
+            stacks[i+1].append((GIT_MODE_TREE, tree, size))
         stacks[i] = []
         i += 1
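
One subtlety worth making visible: a level holding exactly one entry is promoted as-is rather than wrapped in a one-entry tree, so maketree is never called for it. A hypothetical stand-in shows this:

calls = []
def maketree(shalist):
    calls.append(shalist)                # stand-in for git.PackWriter.new_tree
    return b'\x00' * 20

stacks = [[(GIT_MODE_FILE, b'\x11' * 20, 100)], []]
_squish(maketree, stacks, 1)
assert stacks == [[], [(GIT_MODE_FILE, b'\x11' * 20, 100)]]
assert calls == []                       # single entries are promoted, not treed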
 
 
-def split_to_shalist(w, files, keep_boundaries, progress=None):
-    sl = _split_to_blobs(w, files, keep_boundaries, progress)
+def split_to_shalist(makeblob, maketree, files,
+                     keep_boundaries, progress=None):
+    sl = split_to_blobs(makeblob, files, keep_boundaries, progress)
+    assert(fanout != 0)
     if not fanout:
         shal = []
-        for (sha,size,bits) in sl:
-            shal.append(('100644', sha, size))
+        for (sha,size,level) in sl:
+            shal.append((GIT_MODE_FILE, sha, size))
         return _make_shalist(shal)[0]
     else:
-        base_bits = _helpers.blobbits()
-        fanout_bits = int(math.log(fanout, 2))
-        def bits_to_idx(n):
-            assert(n >= base_bits)
-            return (n - base_bits)/fanout_bits
         stacks = [[]]
-        for (sha,size,bits) in sl:
-            assert(bits <= 32)
-            stacks[0].append(('100644', sha, size))
-            if bits > base_bits:
-                _squish(w, stacks, bits_to_idx(bits))
+        for (sha,size,level) in sl:
+            stacks[0].append((GIT_MODE_FILE, sha, size))
+            _squish(maketree, stacks, level)
         #log('stacks: %r\n' % [len(i) for i in stacks])
-        _squish(w, stacks, len(stacks)-1)
+        _squish(maketree, stacks, len(stacks)-1)
         #log('stacks: %r\n' % [len(i) for i in stacks])
         return _make_shalist(stacks[-1])[0]
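
End to end, split_to_shalist returns the shalist of the root tree. With the makeblob and maketree stand-ins sketched above, a call could look like:

with open('/tmp/big.dat', 'rb') as f:    # illustrative path
    root = split_to_shalist(makeblob, maketree, [f], keep_boundaries=False)
for mode, name, sha in root:
    assert mode in (GIT_MODE_FILE, GIT_MODE_TREE)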
 
 
-def split_to_blob_or_tree(w, files, keep_boundaries):
-    shalist = list(split_to_shalist(w, files, keep_boundaries))
+def split_to_blob_or_tree(makeblob, maketree, files,
+                          keep_boundaries, progress=None):
+    shalist = list(split_to_shalist(makeblob, maketree,
+                                    files, keep_boundaries, progress))
     if len(shalist) == 1:
         return (shalist[0][0], shalist[0][2])
     elif len(shalist) == 0:
-        return ('100644', w.new_blob(''))
+        return (GIT_MODE_FILE, makeblob(b''))
     else:
-        return ('40000', w.new_tree(shalist))
+        return (GIT_MODE_TREE, maketree(shalist))
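
The three outcomes, spelled out: a single entry collapses to that blob or tree, empty input becomes an empty blob, and anything else becomes a tree. With the stand-ins above, the empty case is easy to check:

import io

mode, sha = split_to_blob_or_tree(makeblob, maketree, [io.BytesIO(b'')],
                                  keep_boundaries=False)
assert mode == GIT_MODE_FILE
assert sha == makeblob(b'')              # empty input yields the empty blob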
 
 
 def open_noatime(name):
@@ -195,9 +264,3 @@ def open_noatime(name):
         except:
             pass
         raise
-
-
-def fadvise_done(f, ofs):
-    assert(ofs >= 0)
-    if ofs > 0 and hasattr(f, 'fileno'):
-        _helpers.fadvise_done(f.fileno(), ofs)