]> arthur.barton.de Git - bup.git/commitdiff
Combine and speed up idx->midx and bupindex merge
authorBrandon Low <lostlogic@lostlogicx.com>
Mon, 24 Jan 2011 03:31:51 +0000 (19:31 -0800)
committerAvery Pennarun <apenwarr@gmail.com>
Wed, 26 Jan 2011 03:58:47 +0000 (19:58 -0800)
These two processes used almost identical algorithms, but were
implemented separately.  The main difference was one was ascending and
the other was descending.

This patch reverses the cmp on index.Entry so that both can share an
algorithm.

It also cuts some overhead in the algorithm by using it.next() instead of
the next() wrapper, yielding a ~6% speedup on midx generation and index merging.

Signed-off-by: Brandon Low <lostlogic@lostlogicx.com>
cmd/index-cmd.py
lib/bup/git.py
lib/bup/helpers.py
lib/bup/index.py
lib/bup/t/tindex.py

index 0ff637ab97a809d4632148d732822f1518572f9b..16114575be1b88f279eb2cb2e6e07269bb5e2b21 100755 (executable)
@@ -4,12 +4,6 @@ from bup import options, git, index, drecurse
 from bup.helpers import *
 
 
-def merge_indexes(out, r1, r2):
-    for e in index.MergeIter([r1, r2]):
-        # FIXME: shouldn't we remove deleted entries eventually?  When?
-        out.add_ixentry(e)
-
-
 class IterHelper:
     def __init__(self, l):
         self.i = iter(l)
@@ -108,7 +102,11 @@ def update_index(top, excluded_paths):
                 log('check: before merging: newfile\n')
                 check_index(wr)
             mi = index.Writer(indexfile)
-            merge_indexes(mi, ri, wr)
+
+            for e in index.merge(ri, wr):
+                # FIXME: shouldn't we remove deleted entries eventually?  When?
+                mi.add_ixentry(e)
+
             ri.close()
             mi.close()
             wr.close()
index fc48e51ee8eee4122aa3ae175bc433827de188e5..cd1682afc009b2f3b09d46fb6a8cf0b00e0c3cf7 100644 (file)
@@ -2,7 +2,7 @@
 bup repositories are in Git format. This library allows us to
 interact with the Git data structures.
 """
-import os, sys, zlib, time, subprocess, struct, stat, re, tempfile, heapq
+import os, sys, zlib, time, subprocess, struct, stat, re, tempfile
 from bup.helpers import *
 from bup import _helpers, path
 
@@ -500,28 +500,13 @@ def open_idx(filename):
 
 def idxmerge(idxlist, final_progress=True):
     """Generate a list of all the objects reachable in a PackIdxList."""
-    total = sum(len(i) for i in idxlist)
-    iters = (iter(i) for i in idxlist)
-    heap = [(next(it), it) for it in iters]
-    heapq.heapify(heap)
-    count = 0
-    last = None
-    while heap:
-        if (count % 10024) == 0:
-            progress('Reading indexes: %.2f%% (%d/%d)\r'
-                     % (count*100.0/total, count, total))
-        (e, it) = heap[0]
-        if e != last:
-            yield e
-            last = e
-        count += 1
-        e = next(it)
-        if e:
-            heapq.heapreplace(heap, (e, it))
-        else:
-            heapq.heappop(heap)
-    if final_progress:
-        log('Reading indexes: %.2f%% (%d/%d), done.\n' % (100, total, total))
+    def pfunc(count, total):
+        progress('Reading indexes: %.2f%% (%d/%d)\r'
+                 % (count*100.0/total, count, total))
+    def pfinal(count, total):
+        if final_progress:
+            log('Reading indexes: %.2f%% (%d/%d), done.\n' % (100, total, total))
+    return merge_iter(idxlist, 10024, pfunc, pfinal)
 
 
 def _make_objcache():
index 36b95602e9c8f2088ec9fd55d9693f47619b1d4f..8a4da97056caf558a3d1aa39c0898f59329b1b4f 100644 (file)
@@ -1,6 +1,7 @@
 """Helper functions and classes for bup."""
 
 import sys, os, pwd, subprocess, errno, socket, select, mmap, stat, re
+import heapq, operator
 from bup import _version
 
 # This function should really be in helpers, not in bup.options.  But we
@@ -87,6 +88,36 @@ def next(it):
         return None
 
 
+def merge_iter(iters, pfreq, pfunc, pfinal, key=None):
+    if key:
+        samekey = lambda e, pe: getattr(e, key) == getattr(pe, key, None)
+    else:
+        samekey = operator.eq
+    count = 0
+    total = sum(len(it) for it in iters)
+    iters = (iter(it) for it in iters)
+    heap = ((next(it),it) for it in iters)
+    heap = [(e,it) for e,it in heap if e]
+
+    heapq.heapify(heap)
+    pe = None
+    while heap:
+        if not count % pfreq:
+            pfunc(count, total)
+        e, it = heap[0]
+        if not samekey(e, pe):
+            pe = e
+            yield e
+        count += 1
+        try:
+            e = it.next() # Don't use next() function, it's too expensive
+        except StopIteration:
+            heapq.heappop(heap) # remove current
+        else:
+            heapq.heapreplace(heap, (e, it)) # shift current to new location
+    pfinal(count, total)
+
+
 def unlink(f):
     """Delete a file at path 'f' if it currently exists.
 
index 348b73d8ea45a1ce16ff973161a8edd19cd4fcc2..72a7296ea4fda43e299662873fbc77aa8947ff89 100644 (file)
@@ -160,9 +160,9 @@ class Entry:
         return not self.ctime
 
     def __cmp__(a, b):
-        return (cmp(a.name, b.name)
-                or -cmp(a.is_valid(), b.is_valid())
-                or -cmp(a.is_fake(), b.is_fake()))
+        return (cmp(b.name, a.name)
+                or cmp(a.is_valid(), b.is_valid())
+                or cmp(a.is_fake(), b.is_fake()))
 
     def write(self, f):
         f.write(self.basename + '\0' + self.packed())
@@ -452,36 +452,9 @@ def reduce_paths(paths):
     paths.sort(reverse=True)
     return paths
 
-
-class MergeIter:
-    def __init__(self, iters):
-        self.iters = iters
-
-    def __len__(self):
-        # FIXME: doesn't remove duplicated entries between iters.
-        # That only happens for parent directories, but will mean the
-        # actual iteration returns fewer entries than this function counts.
-        return sum(len(it) for it in self.iters)
-
-    def __iter__(self):
-        total = len(self)
-        l = [iter(it) for it in self.iters]
-        l = [(next(it),it) for it in l]
-        l = filter(lambda x: x[0], l)
-        count = 0
-        lastname = None
-        while l:
-            if not (count % 1024):
-                progress('bup: merging indexes (%d/%d)\r' % (count, total))
-            l.sort()
-            (e,it) = l.pop()
-            if not e:
-                continue
-            if e.name != lastname:
-                yield e
-                lastname = e.name
-            n = next(it)
-            if n:
-                l.append((n,it))
-            count += 1
+def merge(*iters):
+    def pfunc(count, total):
+        progress('bup: merging indexes (%d/%d)\r' % (count, total))
+    def pfinal(count, total):
         log('bup: merging indexes (%d/%d), done.\n' % (count, total))
+    return merge_iter(iters, 1024, pfunc, pfinal, key='name')
index 1fee408c4c512889b8b66a5471555f0fe22d53d4..4dd1411084be6bda2899a10dd92975cb00772941 100644 (file)
@@ -84,8 +84,7 @@ def index_dirty():
     r3all = [e.name for e in r3]
     WVPASSEQ(r3all,
              ['/a/c/n/3', '/a/c/n/', '/a/c/', '/a/', '/'])
-    m = index.MergeIter([r2,r1,r3])
-    all = [e.name for e in m]
+    all = [e.name for e in index.merge(r2, r1, r3)]
     WVPASSEQ(all,
              ['/a/c/n/3', '/a/c/n/', '/a/c/',
               '/a/b/x', '/a/b/n/2', '/a/b/n/', '/a/b/c',
@@ -96,27 +95,27 @@ def index_dirty():
     print [hex(e.flags) for e in r1]
     WVPASSEQ([e.name for e in r1 if e.is_valid()], r1all)
     WVPASSEQ([e.name for e in r1 if not e.is_valid()], [])
-    WVPASSEQ([e.name for e in m if not e.is_valid()],
+    WVPASSEQ([e.name for e in index.merge(r2, r1, r3) if not e.is_valid()],
              ['/a/c/n/3', '/a/c/n/', '/a/c/',
               '/a/b/n/2', '/a/b/n/', '/a/b/', '/a/', '/'])
 
     expect_invalid = ['/'] + r2all + r3all
     expect_real = (set(r1all) - set(r2all) - set(r3all)) \
                     | set(['/a/b/n/2', '/a/c/n/3'])
-    dump(m)
-    for e in m:
+    dump(index.merge(r2, r1, r3))
+    for e in index.merge(r2, r1, r3):
         print e.name, hex(e.flags), e.ctime
         eiv = e.name in expect_invalid
         er  = e.name in expect_real
         WVPASSEQ(eiv, not e.is_valid())
         WVPASSEQ(er, e.is_real())
     fake_validate(r2, r3)
-    dump(m)
-    WVPASSEQ([e.name for e in m if not e.is_valid()], [])
+    dump(index.merge(r2, r1, r3))
+    WVPASSEQ([e.name for e in index.merge(r2, r1, r3) if not e.is_valid()], [])
     
-    e = eget(m, '/a/b/c')
+    e = eget(index.merge(r2, r1, r3), '/a/b/c')
     e.invalidate()
     e.repack()
-    dump(m)
-    WVPASSEQ([e.name for e in m if not e.is_valid()],
+    dump(index.merge(r2, r1, r3))
+    WVPASSEQ([e.name for e in index.merge(r2, r1, r3) if not e.is_valid()],
              ['/a/b/c', '/a/b/', '/a/', '/'])