-import sys, math
-from bup import git, _hashsplit
-from bup.helpers import *
-BLOB_LWM = 8192*2
-BLOB_MAX = BLOB_LWM*2
-BLOB_HWM = 1024*1024
+from __future__ import absolute_import
+import io, math, os
+
+from bup import _helpers, compat, helpers
+from bup.compat import buffer, join_bytes
+from bup.helpers import sc_page_size
+
+
+_fmincore = getattr(helpers, 'fmincore', None)
+
+BLOB_MAX = 8192*4 # 8192 is the "typical" blob size for bupsplit
+BLOB_READ_SIZE = 1024*1024
MAX_PER_TREE = 256
progress_callback = None
-max_pack_size = 1000*1000*1000 # larger packs will slow down pruning
-max_pack_objects = 200*1000 # cache memory usage is about 83 bytes per object
fanout = 16
+GIT_MODE_FILE = 0o100644
+GIT_MODE_TREE = 0o40000
+GIT_MODE_SYMLINK = 0o120000
+
+# The purpose of this type of buffer is to avoid copying on peek(), get(),
+# and eat(). We do copy the buffer contents on put(), but that should
+# be ok if we always only put() large amounts of data at a time.
class Buf:
def __init__(self):
- self.data = ''
+ self.data = b''
self.start = 0
def put(self, s):
if s:
- self.data = buffer(self.data, self.start) + s
+ self.data = join_bytes(buffer(self.data, self.start), s)
self.start = 0
    def peek(self, count):
-        return buffer(self.data, self.start, count)
+        if count <= 256:
+            return self.data[self.start : self.start + count]
+        return buffer(self.data, self.start, count)
    def eat(self, count):
        self.start += count
    def get(self, count):
-        v = buffer(self.data, self.start, count)
+        if count <= 256:
+            v = self.data[self.start : self.start + count]
+        else:
+            v = buffer(self.data, self.start, count)
        self.start += count
        return v
    def used(self):
        return len(self.data) - self.start
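+
+# A minimal usage sketch (illustrative only, not part of the module):
+#
+#   buf = Buf()
+#   buf.put(b'hello world')
+#   assert bytes(buf.peek(5)) == b'hello'  # small reads return a copied slice
+#   buf.eat(6)                             # drop b'hello ' without copying
+#   assert bytes(buf.get(5)) == b'world'
+#   assert buf.used() == 0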
-def splitbuf(buf):
- b = buf.peek(buf.used())
- (ofs, bits) = _hashsplit.splitbuf(b)
- if ofs:
- buf.eat(ofs)
- return (buffer(b, 0, ofs), bits)
- return (None, 0)
+def _fadvise_pages_done(fd, first_page, count):
+ assert(first_page >= 0)
+ assert(count >= 0)
+ if count > 0:
+ _helpers.fadvise_done(fd,
+ first_page * sc_page_size,
+ count * sc_page_size)
+
+
+def _nonresident_page_regions(status_bytes, incore_mask, max_region_len=None):
+ """Return (start_page, count) pairs in ascending start_page order for
+ each contiguous region of nonresident pages indicated by the
+ mincore() status_bytes. Limit the number of pages in each region
+ to max_region_len."""
+ assert(max_region_len is None or max_region_len > 0)
+ start = None
+ for i, x in enumerate(status_bytes):
+ in_core = x & incore_mask
+ if start is None:
+ if not in_core:
+ start = i
+ else:
+ count = i - start
+ if in_core:
+ yield (start, count)
+ start = None
+ elif max_region_len and count >= max_region_len:
+ yield (start, count)
+ start = i
+ if start is not None:
+ yield (start, len(status_bytes) - start)
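+
+# For example (hypothetical mincore output): with status_bytes
+# bytearray([1, 0, 0, 1]) and incore_mask 1, pages 1-2 form the only
+# nonresident run, so the generator yields just (1, 2); with
+# max_region_len=1 the same input yields (1, 1) and then (2, 1).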
+
+
+def _uncache_ours_upto(fd, offset, first_region, remaining_regions):
+ """Uncache the pages of fd indicated by first_region and
+ remaining_regions that are before offset, where each region is a
+ (start_page, count) pair. The final region must have a start_page
+ of None."""
+ rstart, rlen = first_region
+ while rstart is not None and (rstart + rlen) * sc_page_size <= offset:
+ _fadvise_pages_done(fd, rstart, rlen)
+ rstart, rlen = next(remaining_regions, (None, None))
+ return (rstart, rlen)
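+
+# For instance (assuming 4096-byte pages): given the regions (0, 2) and
+# then (5, 1), a call with offset=8192 uncaches pages 0-1, whose bytes
+# end exactly at 8192, and returns (5, 1) unconsumed because that region
+# ends past the offset.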
-def blobiter(files):
- for f in files:
+def readfile_iter(files, progress=None):
+ for filenum,f in enumerate(files):
ofs = 0
+        b = b''
+ fd = rpr = rstart = rlen = None
+ if _fmincore and hasattr(f, 'fileno'):
+ try:
+ fd = f.fileno()
+ except io.UnsupportedOperation:
+ pass
+ if fd:
+ mcore = _fmincore(fd)
+ if mcore:
+                    max_chunk = max(1, (8 * 1024 * 1024) // sc_page_size)
+ rpr = _nonresident_page_regions(mcore, helpers.MINCORE_INCORE,
+ max_chunk)
+ rstart, rlen = next(rpr, (None, None))
while 1:
- fadvise_done(f, max(0, ofs - 1024*1024))
- b = f.read(BLOB_HWM)
+ if progress:
+ progress(filenum, len(b))
+ b = f.read(BLOB_READ_SIZE)
ofs += len(b)
+ if rpr:
+ rstart, rlen = _uncache_ours_upto(fd, ofs, (rstart, rlen), rpr)
if not b:
- fadvise_done(f, ofs)
break
yield b
+ if rpr:
+ rstart, rlen = _uncache_ours_upto(fd, ofs, (rstart, rlen), rpr)
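+
+# Usage sketch (hypothetical path and consumer): each yielded block is at
+# most BLOB_READ_SIZE bytes, and pages this read faulted in are dropped
+# from the cache as we pass them when fmincore is available:
+#
+#   f = open_noatime(b'/some/big/file')
+#   for block in readfile_iter([f]):
+#       consume(block)  # consume() is a placeholder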
-def drainbuf(buf, finalize):
+def _splitbuf(buf, basebits, fanbits):
while 1:
- (blob, bits) = splitbuf(buf)
- if blob:
- yield (blob, bits)
+ b = buf.peek(buf.used())
+ (ofs, bits) = _helpers.splitbuf(b)
+ if ofs:
+ if ofs > BLOB_MAX:
+ ofs = BLOB_MAX
+ level = 0
+ else:
+ level = (bits-basebits)//fanbits # integer division
+ buf.eat(ofs)
+ yield buffer(b, 0, ofs), level
else:
break
- if buf.used() > BLOB_MAX:
+ while buf.used() >= BLOB_MAX:
# limit max blob size
- yield (buf.get(buf.used()), 0)
- elif finalize and buf.used():
- yield (buf.get(buf.used()), 0)
+ yield buf.get(BLOB_MAX), 0
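+
+# Level arithmetic, for example: with the default 13 blobbits (8K average
+# blobs) and fanout=16 (so fanbits=4), a split point whose rolling
+# checksum exposes 21 bits gets level (21-13)//4 == 2, while chunks
+# force-split at BLOB_MAX always get level 0.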
-def hashsplit_iter(files):
- assert(BLOB_HWM > BLOB_MAX)
+def _hashsplit_iter(files, progress):
+ assert(BLOB_READ_SIZE > BLOB_MAX)
+ basebits = _helpers.blobbits()
+ fanbits = int(math.log(fanout or 128, 2))
buf = Buf()
- fi = blobiter(files)
- while 1:
- for i in drainbuf(buf, finalize=False):
- yield i
- while buf.used() < BLOB_HWM:
- bnew = next(fi)
- if not bnew:
- # eof
- for i in drainbuf(buf, finalize=True):
- yield i
- return
- buf.put(bnew)
+ for inblock in readfile_iter(files, progress):
+ buf.put(inblock)
+ for buf_and_level in _splitbuf(buf, basebits, fanbits):
+ yield buf_and_level
+ if buf.used():
+ yield buf.get(buf.used()), 0
+
+
+def _hashsplit_iter_keep_boundaries(files, progress):
+ for real_filenum,f in enumerate(files):
+ if progress:
+ def prog(filenum, nbytes):
+ # the inner _hashsplit_iter doesn't know the real file count,
+ # so we'll replace it here.
+ return progress(real_filenum, nbytes)
+ else:
+ prog = None
+ for buf_and_level in _hashsplit_iter([f], progress=prog):
+ yield buf_and_level
+
+
+def hashsplit_iter(files, keep_boundaries, progress):
+ if keep_boundaries:
+ return _hashsplit_iter_keep_boundaries(files, progress)
+ else:
+ return _hashsplit_iter(files, progress)
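+
+# Usage sketch (hypothetical file objects f1, f2): with
+# keep_boundaries=False the files are split as one concatenated stream;
+# with True the splitter restarts at each file boundary:
+#
+#   for blob, level in hashsplit_iter([f1, f2], False, None):
+#       store(blob)  # store() is a placeholder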
total_split = 0
-def _split_to_blobs(w, files):
+def split_to_blobs(makeblob, files, keep_boundaries, progress):
global total_split
- for (blob, bits) in hashsplit_iter(files):
- sha = w.new_blob(blob)
+ for (blob, level) in hashsplit_iter(files, keep_boundaries, progress):
+ sha = makeblob(blob)
total_split += len(blob)
- if w.outbytes >= max_pack_size or w.count >= max_pack_objects:
- w.breakpoint()
if progress_callback:
progress_callback(len(blob))
- yield (sha, len(blob), bits)
+ yield (sha, len(blob), level)
def _make_shalist(l):
ofs = 0
+ l = list(l)
+    total = sum(size for mode,sha,size in l)
+ vlen = len(b'%x' % total)
shalist = []
for (mode, sha, size) in l:
- shalist.append((mode, '%016x' % ofs, sha))
+ shalist.append((mode, b'%0*x' % (vlen,ofs), sha))
ofs += size
- total = ofs
+ assert(ofs == total)
return (shalist, total)
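+
+# For example (hypothetical shas s1, s2): given entries of sizes 10 and
+# 20, total is 30 == 0x1e, so vlen is 2 and the entries are named b'00'
+# and b'0a' -- each tree entry is named by its starting offset in
+# zero-padded hex.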
-def _squish(w, stacks, n):
+def _squish(maketree, stacks, n):
i = 0
- while i<n or len(stacks[i]) > MAX_PER_TREE:
+ while i < n or len(stacks[i]) >= MAX_PER_TREE:
while len(stacks) <= i+1:
stacks.append([])
if len(stacks[i]) == 1:
stacks[i+1] += stacks[i]
elif stacks[i]:
(shalist, size) = _make_shalist(stacks[i])
- tree = w.new_tree(shalist)
- stacks[i+1].append(('40000', tree, size))
+ tree = maketree(shalist)
+ stacks[i+1].append((GIT_MODE_TREE, tree, size))
stacks[i] = []
i += 1
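+
+# Rough illustration (hypothetical entries): if stacks == [[e1, e2], []]
+# and the next blob split at level 1, _squish(maketree, stacks, 1) folds
+# [e1, e2] into one subtree, leaving stacks == [[], [t]] where t is a
+# (GIT_MODE_TREE, tree_sha, combined_size) entry.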
-def split_to_shalist(w, files):
- sl = _split_to_blobs(w, files)
+def split_to_shalist(makeblob, maketree, files,
+ keep_boundaries, progress=None):
+ sl = split_to_blobs(makeblob, files, keep_boundaries, progress)
+ assert(fanout != 0)
if not fanout:
shal = []
- for (sha,size,bits) in sl:
- shal.append(('100644', sha, size))
+ for (sha,size,level) in sl:
+ shal.append((GIT_MODE_FILE, sha, size))
return _make_shalist(shal)[0]
else:
- base_bits = _hashsplit.blobbits()
- fanout_bits = int(math.log(fanout, 2))
- def bits_to_idx(n):
- assert(n >= base_bits)
- return (n - base_bits)/fanout_bits
stacks = [[]]
- for (sha,size,bits) in sl:
- assert(bits <= 32)
- stacks[0].append(('100644', sha, size))
- if bits > base_bits:
- _squish(w, stacks, bits_to_idx(bits))
+ for (sha,size,level) in sl:
+ stacks[0].append((GIT_MODE_FILE, sha, size))
+ _squish(maketree, stacks, level)
#log('stacks: %r\n' % [len(i) for i in stacks])
- _squish(w, stacks, len(stacks)-1)
+ _squish(maketree, stacks, len(stacks)-1)
#log('stacks: %r\n' % [len(i) for i in stacks])
return _make_shalist(stacks[-1])[0]
-def split_to_blob_or_tree(w, files):
- shalist = list(split_to_shalist(w, files))
+def split_to_blob_or_tree(makeblob, maketree, files,
+ keep_boundaries, progress=None):
+ shalist = list(split_to_shalist(makeblob, maketree,
+ files, keep_boundaries, progress))
if len(shalist) == 1:
return (shalist[0][0], shalist[0][2])
elif len(shalist) == 0:
- return ('100644', w.new_blob(''))
+ return (GIT_MODE_FILE, makeblob(b''))
else:
- return ('40000', w.new_tree(shalist))
+ return (GIT_MODE_TREE, maketree(shalist))
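+
+# Usage sketch: makeblob and maketree are typically bound methods of a
+# git.PackWriter, matching the w.new_blob/w.new_tree calls removed above:
+#
+#   mode, sha = split_to_blob_or_tree(w.new_blob, w.new_tree, [f],
+#                                     keep_boundaries=False)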
def open_noatime(name):
- fd = _hashsplit.open_noatime(name)
+ fd = _helpers.open_noatime(name)
try:
return os.fdopen(fd, 'rb', 1024*1024)
    except:
        try:
            os.close(fd)
        except:
            pass
        raise
-
-
-def fadvise_done(f, ofs):
- assert(ofs >= 0)
- if ofs > 0:
- _hashsplit.fadvise_done(f.fileno(), ofs)