X-Git-Url: https://arthur.barton.de/cgi-bin/gitweb.cgi?p=bup.git;a=blobdiff_plain;f=lib%2Fbup%2Fhashsplit.py;h=dc3a538fe52d329537a3021c289dca16c9692de7;hp=d73e9f689acb27662241ff6af0e8d2b98b509566;hb=bf67f94dd4f4096de4eee07a7dc377d6c889a016;hpb=5570b95f0519dfa7c2ce78413131d4433738cc8f

diff --git a/lib/bup/hashsplit.py b/lib/bup/hashsplit.py
index d73e9f6..dc3a538 100644
--- a/lib/bup/hashsplit.py
+++ b/lib/bup/hashsplit.py
@@ -1,24 +1,35 @@
-import math
-from bup import _helpers
-from bup.helpers import *
-BLOB_LWM = 8192*2
-BLOB_MAX = BLOB_LWM*2
-BLOB_HWM = 1024*1024
+from __future__ import absolute_import
+import io, math, os
+
+from bup import _helpers, compat, helpers
+from bup.compat import buffer, join_bytes
+from bup.helpers import sc_page_size
+
+
+_fmincore = getattr(helpers, 'fmincore', None)
+
+BLOB_MAX = 8192*4 # 8192 is the "typical" blob size for bupsplit
+BLOB_READ_SIZE = 1024*1024
 MAX_PER_TREE = 256
 progress_callback = None
-max_pack_size = 1000*1000*1000 # larger packs will slow down pruning
-max_pack_objects = 200*1000 # cache memory usage is about 83 bytes per object
 fanout = 16
 
+GIT_MODE_FILE = 0o100644
+GIT_MODE_TREE = 0o40000
+GIT_MODE_SYMLINK = 0o120000
+
+# The purpose of this type of buffer is to avoid copying on peek(), get(),
+# and eat(). We do copy the buffer contents on put(), but that should
+# be ok if we always only put() large amounts of data at a time.
 class Buf:
     def __init__(self):
-        self.data = ''
+        self.data = b''
         self.start = 0
 
     def put(self, s):
         if s:
-            self.data = buffer(self.data, self.start) + s
+            self.data = join_bytes(buffer(self.data, self.start), s)
             self.start = 0
 
     def peek(self, count):
@@ -28,7 +39,10 @@ class Buf:
         self.start += count
 
     def get(self, count):
-        v = buffer(self.data, self.start, count)
+        if count <= 256:
+            v = self.data[self.start : self.start + count]
+        else:
+            v = buffer(self.data, self.start, count)
         self.start += count
         return v
 
@@ -36,143 +50,203 @@ class Buf:
         return len(self.data) - self.start
 
 
-def splitbuf(buf):
-    b = buf.peek(buf.used())
-    (ofs, bits) = _helpers.splitbuf(b)
-    if ofs:
-        buf.eat(ofs)
-        return (buffer(b, 0, ofs), bits)
-    return (None, 0)
+def _fadvise_pages_done(fd, first_page, count):
+    assert(first_page >= 0)
+    assert(count >= 0)
+    if count > 0:
+        _helpers.fadvise_done(fd,
+                              first_page * sc_page_size,
+                              count * sc_page_size)
 
 
-def blobiter(files):
-    for f in files:
+def _nonresident_page_regions(status_bytes, incore_mask, max_region_len=None):
+    """Return (start_page, count) pairs in ascending start_page order for
+    each contiguous region of nonresident pages indicated by the
+    mincore() status_bytes. Limit the number of pages in each region
+    to max_region_len."""
+    assert(max_region_len is None or max_region_len > 0)
+    start = None
+    for i, x in enumerate(status_bytes):
+        in_core = x & incore_mask
+        if start is None:
+            if not in_core:
+                start = i
+        else:
+            count = i - start
+            if in_core:
+                yield (start, count)
+                start = None
+            elif max_region_len and count >= max_region_len:
+                yield (start, count)
+                start = i
+    if start is not None:
+        yield (start, len(status_bytes) - start)
+
+
+def _uncache_ours_upto(fd, offset, first_region, remaining_regions):
+    """Uncache the pages of fd indicated by first_region and
+    remaining_regions that are before offset, where each region is a
+    (start_page, count) pair. The final region must have a start_page
+    of None."""
+    rstart, rlen = first_region
+    while rstart is not None and (rstart + rlen) * sc_page_size <= offset:
+        _fadvise_pages_done(fd, rstart, rlen)
+        rstart, rlen = next(remaining_regions, (None, None))
+    return (rstart, rlen)
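+
+# For example, given mincore status bytes bytearray((1, 0, 0, 1)) and an
+# incore_mask of 1, _nonresident_page_regions() yields the single region
+# (1, 2) (pages 1 and 2 are not resident), and _uncache_ours_upto() will
+# fadvise those pages away once the read offset has moved past them.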
+
+
+def readfile_iter(files, progress=None):
+    for filenum,f in enumerate(files):
         ofs = 0
+        b = ''
+        fd = rpr = rstart = rlen = None
+        if _fmincore and hasattr(f, 'fileno'):
+            try:
+                fd = f.fileno()
+            except io.UnsupportedOperation:
+                pass
+            if fd:
+                mcore = _fmincore(fd)
+                if mcore:
+                    max_chunk = max(1, (8 * 1024 * 1024) / sc_page_size)
+                    rpr = _nonresident_page_regions(mcore, helpers.MINCORE_INCORE,
+                                                    max_chunk)
+                    rstart, rlen = next(rpr, (None, None))
         while 1:
-            fadvise_done(f, max(0, ofs - 1024*1024))
-            b = f.read(BLOB_HWM)
+            if progress:
+                progress(filenum, len(b))
+            b = f.read(BLOB_READ_SIZE)
             ofs += len(b)
+            if rpr:
+                rstart, rlen = _uncache_ours_upto(fd, ofs, (rstart, rlen), rpr)
             if not b:
-                fadvise_done(f, ofs)
                 break
             yield b
+        if rpr:
+            rstart, rlen = _uncache_ours_upto(fd, ofs, (rstart, rlen), rpr)
 
 
-def drainbuf(buf, finalize):
+def _splitbuf(buf, basebits, fanbits):
     while 1:
-        (blob, bits) = splitbuf(buf)
-        if blob:
-            yield (blob, bits)
+        b = buf.peek(buf.used())
+        (ofs, bits) = _helpers.splitbuf(b)
+        if ofs:
+            if ofs > BLOB_MAX:
+                ofs = BLOB_MAX
+                level = 0
+            else:
+                level = (bits-basebits)//fanbits # integer division
+            buf.eat(ofs)
+            yield buffer(b, 0, ofs), level
         else:
             break
-    if buf.used() > BLOB_MAX:
+    while buf.used() >= BLOB_MAX:
         # limit max blob size
-        yield (buf.get(buf.used()), 0)
-    elif finalize and buf.used():
-        yield (buf.get(buf.used()), 0)
+        yield buf.get(BLOB_MAX), 0
 
 
-def _hashsplit_iter(files):
-    assert(BLOB_HWM > BLOB_MAX)
+def _hashsplit_iter(files, progress):
+    assert(BLOB_READ_SIZE > BLOB_MAX)
+    basebits = _helpers.blobbits()
+    fanbits = int(math.log(fanout or 128, 2))
     buf = Buf()
-    fi = blobiter(files)
-    while 1:
-        for i in drainbuf(buf, finalize=False):
-            yield i
-        while buf.used() < BLOB_HWM:
-            bnew = next(fi)
-            if not bnew:
-                # eof
-                for i in drainbuf(buf, finalize=True):
-                    yield i
-                return
-            buf.put(bnew)
+    for inblock in readfile_iter(files, progress):
+        buf.put(inblock)
+        for buf_and_level in _splitbuf(buf, basebits, fanbits):
+            yield buf_and_level
+    if buf.used():
+        yield buf.get(buf.used()), 0
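+
+# Note: with bup's default of 13 blobbits (8192-byte "typical" chunks) and
+# fanout 16, fanbits is 4, so a chunk whose split offset was found with
+# bits=21 is yielded at level (21-13)//4 = 2; oversized chunks cut at
+# BLOB_MAX are yielded at level 0.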
 
 
-def _hashsplit_iter_keep_boundaries(files):
-    for f in files:
-        for i in _hashsplit_iter([f]):
-            yield i
+def _hashsplit_iter_keep_boundaries(files, progress):
+    for real_filenum,f in enumerate(files):
+        if progress:
+            def prog(filenum, nbytes):
+                # the inner _hashsplit_iter doesn't know the real file count,
+                # so we'll replace it here.
+                return progress(real_filenum, nbytes)
+        else:
+            prog = None
+        for buf_and_level in _hashsplit_iter([f], progress=prog):
+            yield buf_and_level
 
 
-def hashsplit_iter(files, keep_boundaries):
+def hashsplit_iter(files, keep_boundaries, progress):
     if keep_boundaries:
-        return _hashsplit_iter_keep_boundaries(files)
+        return _hashsplit_iter_keep_boundaries(files, progress)
     else:
-        return _hashsplit_iter(files)
+        return _hashsplit_iter(files, progress)
 
 
 total_split = 0
-def _split_to_blobs(w, files, keep_boundaries):
+def split_to_blobs(makeblob, files, keep_boundaries, progress):
     global total_split
-    for (blob, bits) in hashsplit_iter(files, keep_boundaries):
-        sha = w.new_blob(blob)
+    for (blob, level) in hashsplit_iter(files, keep_boundaries, progress):
+        sha = makeblob(blob)
         total_split += len(blob)
-        if w.outbytes >= max_pack_size or w.count >= max_pack_objects:
-            w.breakpoint()
         if progress_callback:
             progress_callback(len(blob))
-        yield (sha, len(blob), bits)
+        yield (sha, len(blob), level)
 
 
 def _make_shalist(l):
     ofs = 0
+    l = list(l)
+    total = sum(size for mode,sha,size, in l)
+    vlen = len(b'%x' % total)
     shalist = []
     for (mode, sha, size) in l:
-        shalist.append((mode, '%016x' % ofs, sha))
+        shalist.append((mode, b'%0*x' % (vlen,ofs), sha))
         ofs += size
-    total = ofs
+    assert(ofs == total)
     return (shalist, total)
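+
+# For example, three chunks of sizes 10, 20, and 30 give total = 60 = 0x3c,
+# so vlen is 2 and the entries are named b'00', b'0a', and b'1e', i.e. the
+# hex offset of each chunk within the reassembled file.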
 
 
-def _squish(w, stacks, n):
+def _squish(maketree, stacks, n):
     i = 0
-    while i < n or len(stacks[i]) > MAX_PER_TREE:
+    while i < n or len(stacks[i]) >= MAX_PER_TREE:
         while len(stacks) <= i+1:
             stacks.append([])
         if len(stacks[i]) == 1:
             stacks[i+1] += stacks[i]
         elif stacks[i]:
             (shalist, size) = _make_shalist(stacks[i])
-            tree = w.new_tree(shalist)
-            stacks[i+1].append(('40000', tree, size))
+            tree = maketree(shalist)
+            stacks[i+1].append((GIT_MODE_TREE, tree, size))
             stacks[i] = []
         i += 1
 
 
-def split_to_shalist(w, files, keep_boundaries):
-    sl = _split_to_blobs(w, files, keep_boundaries)
+def split_to_shalist(makeblob, maketree, files,
+                     keep_boundaries, progress=None):
+    sl = split_to_blobs(makeblob, files, keep_boundaries, progress)
+    assert(fanout != 0)
     if not fanout:
         shal = []
-        for (sha,size,bits) in sl:
-            shal.append(('100644', sha, size))
+        for (sha,size,level) in sl:
+            shal.append((GIT_MODE_FILE, sha, size))
         return _make_shalist(shal)[0]
     else:
-        base_bits = _helpers.blobbits()
-        fanout_bits = int(math.log(fanout, 2))
-        def bits_to_idx(n):
-            assert(n >= base_bits)
-            return (n - base_bits)/fanout_bits
         stacks = [[]]
-        for (sha,size,bits) in sl:
-            assert(bits <= 32)
-            stacks[0].append(('100644', sha, size))
-            if bits > base_bits:
-                _squish(w, stacks, bits_to_idx(bits))
+        for (sha,size,level) in sl:
+            stacks[0].append((GIT_MODE_FILE, sha, size))
+            _squish(maketree, stacks, level)
         #log('stacks: %r\n' % [len(i) for i in stacks])
-        _squish(w, stacks, len(stacks)-1)
+        _squish(maketree, stacks, len(stacks)-1)
         #log('stacks: %r\n' % [len(i) for i in stacks])
         return _make_shalist(stacks[-1])[0]
 
 
-def split_to_blob_or_tree(w, files, keep_boundaries):
-    shalist = list(split_to_shalist(w, files, keep_boundaries))
+def split_to_blob_or_tree(makeblob, maketree, files,
+                          keep_boundaries, progress=None):
+    shalist = list(split_to_shalist(makeblob, maketree,
+                                    files, keep_boundaries, progress))
     if len(shalist) == 1:
         return (shalist[0][0], shalist[0][2])
     elif len(shalist) == 0:
-        return ('100644', w.new_blob(''))
+        return (GIT_MODE_FILE, makeblob(b''))
     else:
-        return ('40000', w.new_tree(shalist))
+        return (GIT_MODE_TREE, maketree(shalist))
 
 
 def open_noatime(name):
@@ -185,9 +259,3 @@ def open_noatime(name):
         except:
             pass
         raise
-
-
-def fadvise_done(f, ofs):
-    assert(ofs >= 0)
-    if ofs > 0 and hasattr(f, 'fileno'):
-        _helpers.fadvise_done(f.fileno(), ofs)
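
A minimal sketch of how the new interface might be driven (hypothetical
stand-ins for makeblob/maketree; bup itself passes the packfile writer's
new_blob and new_tree methods, which this diff removes direct calls to):

    import hashlib
    from bup import hashsplit

    def makeblob(data):
        # Stand-in: hash the chunk; a real makeblob stores a git blob
        # and returns its id.
        return hashlib.sha1(data).digest()

    def maketree(shalist):
        # Stand-in: hash the (mode, name, id) entries; a real maketree
        # stores a git tree and returns its id.
        return hashlib.sha1(b''.join(sha for mode, name, sha in shalist)).digest()

    with open('some-file', 'rb') as f:
        mode, oid = hashsplit.split_to_blob_or_tree(makeblob, maketree, [f],
                                                    keep_boundaries=False)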