client-server: only retrieve index files when actually needed.
authorAvery Pennarun <apenwarr@gmail.com>
Mon, 11 Jan 2010 23:19:29 +0000 (18:19 -0500)
committerAvery Pennarun <apenwarr@gmail.com>
Tue, 12 Jan 2010 03:30:51 +0000 (22:30 -0500)
A busy server could end up with a *large* number of index files, mostly
referring to objects from other clients.  Downloading all the indexes not only
wastes bandwidth, but causes a more insidious problem: small servers end up
having to mmap a huge number of large index files, which sucks lots of RAM.

In general, the RAM on a server is roughly proportional to the disk space on
that server.  So it's okay for larger clients to need more RAM in order
to complete a backup.  However, it's not okay for the existence of larger
clients to make smaller clients suffer.  Hopefully this change will settle
it a bit.
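
A toy, in-memory sketch of the flow this commit introduces (ToyServer /
ToyClient and their methods are illustrative stand-ins, not bup's classes
or wire protocol): the server only *suggests* the name of an index it
already has when a duplicate object arrives, and the client then fetches
just that one index instead of pre-downloading all of them.

    class ToyServer:
        def __init__(self, packs):
            # packs: {'pack-1.idx': set_of_shas_already_on_the_server, ...}
            self.packs = packs
            self.suggested = set()
            self.stored = []

        def receive(self, sha, payload):
            # return an index name to suggest, or None if the object was handled
            for idxname, shas in self.packs.items():
                if sha in shas:
                    if idxname not in self.suggested:
                        self.suggested.add(idxname)
                        return idxname        # 'index %s\n' on the real wire
                    return None               # already suggested; just drop it
            self.stored.append((sha, payload))  # genuinely new object
            return None

    class ToyClient:
        def __init__(self, server):
            self.server = server
            self.cached_idx = {}              # local cache of downloaded indexes

        def send(self, sha, payload):
            suggestion = self.server.receive(sha, payload)
            if suggestion:
                # the real client suspends receive-objects (0xffffffff marker),
                # runs send-index for this one file, then resumes and refreshes
                # its MultiPackIndex object cache
                self.cached_idx[suggestion] = self.server.packs[suggestion]

    server = ToyServer({'pack-1.idx': {'aa' * 20}})
    client = ToyClient(server)
    client.send('aa' * 20, 'already on the server')   # triggers one suggestion
    client.send('bb' * 20, 'a new object')            # stored normally
    assert 'pack-1.idx' in client.cached_idx
    assert len(server.stored) == 1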

client.py
cmd-index.py
cmd-server.py
git.py
helpers.py
t/test.sh
t/tgit.py

index 7be235a9a5720955f39d40eb3c497d4c60dfec9f..7b24a4345ce9d75172dea810450258b948b44b32 100644 (file)
--- a/client.py
+++ b/client.py
@@ -1,15 +1,16 @@
-import re, struct, errno
+import re, struct, errno, select
 import git
 from helpers import *
 from subprocess import Popen, PIPE
 
+
 class ClientError(Exception):
     pass
 
+
 class Client:
     def __init__(self, remote, create=False):
         self._busy = None
-        self._indexes_synced = 0
         self.p = None
         self.conn = None
         rs = remote.split(':', 1)
@@ -53,6 +54,7 @@ class Client:
             else:
                 conn.write('set-dir %s\n' % dir)
             self.check_ok()
+        self.sync_indexes_del()
 
     def __del__(self):
         try:
@@ -94,12 +96,11 @@ class Client:
     def _not_busy(self):
         self._busy = None
 
-    def sync_indexes(self):
+    def sync_indexes_del(self):
         self.check_busy()
         conn = self.conn
         conn.write('list-indexes\n')
         packdir = git.repo('objects/pack')
-        mkdirp(self.cachedir)
         all = {}
         needed = {}
         for line in linereader(conn):
@@ -111,41 +112,53 @@ class Client:
                 needed[line] = 1
         self.check_ok()
 
+        mkdirp(self.cachedir)
         for f in os.listdir(self.cachedir):
             if f.endswith('.idx') and not f in all:
                 log('pruning old index: %r\n' % f)
                 os.unlink(os.path.join(self.cachedir, f))
 
-        # FIXME this should be pipelined: request multiple indexes at a time, or
-        # we waste lots of network turnarounds.
-        for name in needed.keys():
-            log('requesting %r\n' % name)
-            conn.write('send-index %s\n' % name)
-            n = struct.unpack('!I', conn.read(4))[0]
-            assert(n)
-            log('   expect %d bytes\n' % n)
-            fn = os.path.join(self.cachedir, name)
-            f = open(fn + '.tmp', 'w')
-            for b in chunkyreader(conn, n):
-                f.write(b)
-            self.check_ok()
-            f.close()
-            os.rename(fn + '.tmp', fn)
-
-        self._indexes_synced = 1
+    def sync_index(self, name):
+        #log('requesting %r\n' % name)
+        mkdirp(self.cachedir)
+        self.conn.write('send-index %s\n' % name)
+        n = struct.unpack('!I', self.conn.read(4))[0]
+        assert(n)
+        log('   expect %d bytes\n' % n)
+        fn = os.path.join(self.cachedir, name)
+        f = open(fn + '.tmp', 'w')
+        for b in chunkyreader(self.conn, n):
+            f.write(b)
+        self.check_ok()
+        f.close()
+        os.rename(fn + '.tmp', fn)
 
     def _make_objcache(self):
         ob = self._busy
         self._busy = None
-        self.sync_indexes()
+        #self.sync_indexes()
         self._busy = ob
         return git.MultiPackIndex(self.cachedir)
 
+    def _suggest_pack(self, indexname):
+        log('received index suggestion: %s\n' % indexname)
+        ob = self._busy
+        if ob:
+            assert(ob == 'receive-objects')
+            self._busy = None
+            self.conn.write('\xff\xff\xff\xff')  # suspend receive-objects
+            self.conn.drain_and_check_ok()
+        self.sync_index(indexname)
+        if ob:
+            self.conn.write('receive-objects\n')
+            self._busy = ob
+
     def new_packwriter(self):
         self.check_busy()
         self._busy = 'receive-objects'
         return PackWriter_Remote(self.conn,
                                  objcache_maker = self._make_objcache,
+                                 suggest_pack = self._suggest_pack,
                                  onclose = self._not_busy)
 
     def read_ref(self, refname):
@@ -179,10 +192,11 @@ class Client:
 
 
 class PackWriter_Remote(git.PackWriter):
-    def __init__(self, conn, objcache_maker=None, onclose=None):
+    def __init__(self, conn, objcache_maker, suggest_pack, onclose):
         git.PackWriter.__init__(self, objcache_maker)
         self.file = conn
         self.filename = 'remote socket'
+        self.suggest_pack = suggest_pack
         self.onclose = onclose
         self._packopen = False
 
@@ -201,6 +215,8 @@ class PackWriter_Remote(git.PackWriter):
             self.objcache = None
             if self.onclose:
                 self.onclose()
+            if self.suggest_pack:
+                self.suggest_pack(id)
             return id
 
     def close(self):
@@ -221,4 +237,10 @@ class PackWriter_Remote(git.PackWriter):
         self.outbytes += len(data)
         self.count += 1
 
-
+        if self.file.has_input():
+            line = self.file.readline().strip()
+            assert(line.startswith('index '))
+            idxname = line[6:]
+            if self.suggest_pack:
+                self.suggest_pack(idxname)
+                self.objcache.refresh()
index c3b6ce9f3f7e79f7147f59783196eb7b946a1109..47302c615ec5be72da134afff19e8a7ce0c5e34a 100755 (executable)
--- a/cmd-index.py
+++ b/cmd-index.py
@@ -219,7 +219,8 @@ if opt.update:
 
 if opt['print'] or opt.status or opt.modified:
     for (name, ent) in index.Reader(indexfile).filter(extra or ['']):
-        if opt.modified and ent.flags & index.IX_HASHVALID:
+        if opt.modified and (ent.flags & index.IX_HASHVALID
+                             or stat.S_ISDIR(ent.mode)):
             continue
         line = ''
         if opt.status:
index 5de5c9972882b43d947574e72dc33c6250c59a05..09622f70fb8896f1c80c7d35065054ff203c3c1b 100755 (executable)
--- a/cmd-server.py
+++ b/cmd-server.py
@@ -3,6 +3,8 @@ import sys, struct, mmap
 import options, git
 from helpers import *
 
+suspended_w = None
+
 
 def init_dir(conn, arg):
     git.init_repo(arg)
@@ -32,11 +34,17 @@ def send_index(conn, name):
     conn.write(struct.pack('!I', len(idx.map)))
     conn.write(idx.map)
     conn.ok()
-    
-            
+
+
 def receive_objects(conn, junk):
+    global suspended_w
     git.check_repo_or_die()
-    w = git.PackWriter()
+    suggested = {}
+    if suspended_w:
+        w = suspended_w
+        suspended_w = None
+    else:
+        w = git.PackWriter()
     while 1:
         ns = conn.read(4)
         if not ns:
@@ -52,13 +60,30 @@ def receive_objects(conn, junk):
             conn.write('%s.idx\n' % name)
             conn.ok()
             return
+        elif n == 0xffffffff:
+            log('bup server: receive-objects suspended.\n')
+            suspended_w = w
+            conn.ok()
+            return
+            
         buf = conn.read(n)  # object sizes in bup are reasonably small
         #log('read %d bytes\n' % n)
         if len(buf) < n:
             w.abort()
             raise Exception('object read: expected %d bytes, got %d\n'
                             % (n, len(buf)))
-        w._raw_write([buf])
+        (type, content) = git._decode_packobj(buf)
+        sha = git.calc_hash(type, content)
+        oldpack = w.exists(sha)
+        if oldpack:
+            assert(oldpack.endswith('.idx'))
+            (dir,name) = os.path.split(oldpack)
+            if not (name in suggested):
+                log("bup server: suggesting index %s\n" % name)
+                conn.write('index %s\n' % name)
+                suggested[name] = 1
+        else:
+            w._raw_write([buf])
     # NOTREACHED
 
 
diff --git a/git.py b/git.py
index 288f87553f28d30211b402d0fa8571c9970e196d..780634d0d1770ae6ddad5f12efaf427ae0db315e 100644 (file)
--- a/git.py
+++ b/git.py
@@ -5,6 +5,9 @@ verbose = 0
 home_repodir = os.path.expanduser('~/.bup')
 repodir = None
 
+_typemap =  { 'blob':3, 'tree':2, 'commit':1, 'tag':4 }
+_typermap = { 3:'blob', 2:'tree', 1:'commit', 4:'tag' }
+
 
 class GitError(Exception):
     pass
@@ -20,6 +23,62 @@ def repo(sub = ''):
     return os.path.join(repodir, sub)
 
 
+def _encode_packobj(type, content):
+    szout = ''
+    sz = len(content)
+    szbits = (sz & 0x0f) | (_typemap[type]<<4)
+    sz >>= 4
+    while 1:
+        if sz: szbits |= 0x80
+        szout += chr(szbits)
+        if not sz:
+            break
+        szbits = sz & 0x7f
+        sz >>= 7
+    z = zlib.compressobj(1)
+    yield szout
+    yield z.compress(content)
+    yield z.flush()
+
+
+def _encode_looseobj(type, content):
+    z = zlib.compressobj(1)
+    yield z.compress('%s %d\0' % (type, len(content)))
+    yield z.compress(content)
+    yield z.flush()
+
+
+def _decode_looseobj(buf):
+    assert(buf);
+    s = zlib.decompress(buf)
+    i = s.find('\0')
+    assert(i > 0)
+    l = s[:i].split(' ')
+    type = l[0]
+    sz = int(l[1])
+    content = s[i+1:]
+    assert(type in _typemap)
+    assert(sz == len(content))
+    return (type, content)
+
+
+def _decode_packobj(buf):
+    assert(buf)
+    c = ord(buf[0])
+    type = _typermap[(c & 0x70) >> 4]
+    sz = c & 0x0f
+    shift = 4
+    i = 0
+    while c & 0x80:
+        i += 1
+        c = ord(buf[i])
+        sz |= (c & 0x7f) << shift
+        shift += 7
+        if not (c & 0x80):
+            break
+    return (type, zlib.decompress(buf[i+1:]))
+
+
 class PackIndex:
     def __init__(self, filename):
         self.name = filename
@@ -71,17 +130,24 @@ class PackIndex:
         return None
 
     def exists(self, hash):
-        return (self._idx_from_hash(hash) != None) and True or None
+        return hash and (self._idx_from_hash(hash) != None) and True or None
 
 
+_mpi_count = 0
 class MultiPackIndex:
     def __init__(self, dir):
+        global _mpi_count
+        assert(_mpi_count == 0)
+        _mpi_count += 1
         self.dir = dir
         self.also = {}
         self.packs = []
-        for f in os.listdir(self.dir):
-            if f.endswith('.idx'):
-                self.packs.append(PackIndex(os.path.join(self.dir, f)))
+        self.refresh()
+
+    def __del__(self):
+        global _mpi_count
+        _mpi_count -= 1
+        assert(_mpi_count == 0)
 
     def exists(self, hash):
         if hash in self.also:
@@ -91,9 +157,18 @@ class MultiPackIndex:
             if p.exists(hash):
                 # reorder so most recently used packs are searched first
                 self.packs = [p] + self.packs[:i] + self.packs[i+1:]
-                return True
+                return p.name
         return None
 
+    def refresh(self):
+        d = dict([(p.name, 1) for p in self.packs])
+        if os.path.exists(self.dir):
+            for f in os.listdir(self.dir):
+                full = os.path.join(self.dir, f)
+                if f.endswith('.idx') and not d.get(full):
+                    self.packs.append(PackIndex(full))
+        #log('MultiPackIndex: using %d packs.\n' % len(self.packs))
+
     def add(self, hash):
         self.also[hash] = 1
 
@@ -116,7 +191,6 @@ def _shalist_sort_key(ent):
         return name
 
 
-_typemap = dict(blob=3, tree=2, commit=1, tag=8)
 class PackWriter:
     def __init__(self, objcache_maker=None):
         self.count = 0
@@ -154,25 +228,7 @@ class PackWriter:
     def _write(self, bin, type, content):
         if verbose:
             log('>')
-
-        out = []
-
-        sz = len(content)
-        szbits = (sz & 0x0f) | (_typemap[type]<<4)
-        sz >>= 4
-        while 1:
-            if sz: szbits |= 0x80
-            out.append(chr(szbits))
-            if not sz:
-                break
-            szbits = sz & 0x7f
-            sz >>= 7
-
-        z = zlib.compressobj(1)
-        out.append(z.compress(content))
-        out.append(z.flush())
-
-        self._raw_write(out)
+        self._raw_write(_encode_packobj(type, content))
         return bin
 
     def breakpoint(self):
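
The _encode_packobj/_decode_packobj pair factored out above writes git's pack
object header: the first byte carries the low 4 bits of the size plus 3 type
bits, each continuation byte carries 7 more size bits with the high bit set
while more follow, and the payload is zlib-compressed. A rough standalone
re-derivation of that header layout (Python 3 bytes, one-shot zlib.compress
instead of the streaming compressobj the commit uses; not the bup code itself):

    import zlib

    # standalone re-derivation of the pack object header, not bup's code
    TYPEMAP  = {'blob': 3, 'tree': 2, 'commit': 1, 'tag': 4}
    TYPERMAP = {v: k for k, v in TYPEMAP.items()}

    def encode_packobj(type, content):
        # header: low 4 size bits + 3 type bits in the first byte, then
        # 7 size bits per continuation byte (MSB set while more follow)
        sz = len(content)
        hdr = bytearray()
        byte = (sz & 0x0f) | (TYPEMAP[type] << 4)
        sz >>= 4
        while sz:
            hdr.append(byte | 0x80)
            byte = sz & 0x7f
            sz >>= 7
        hdr.append(byte)
        return bytes(hdr) + zlib.compress(content, 1)

    def decode_packobj(buf):
        c = buf[0]
        type = TYPERMAP[(c & 0x70) >> 4]
        sz, shift, i = c & 0x0f, 4, 0
        while c & 0x80:                  # walk the size continuation bytes
            i += 1
            c = buf[i]
            sz |= (c & 0x7f) << shift
            shift += 7
        # sz is the uncompressed size; the zlib payload follows the header
        return (type, zlib.decompress(buf[i+1:]))

    assert decode_packobj(encode_packobj('blob', b'hello world')) == ('blob', b'hello world')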
index 54b099c5b3e8a0c8b8e06f4beec0b10723146816..b5b1d7dc8291b09f266d60772ab7ffc95c99f692 100644 (file)
--- a/helpers.py
+++ b/helpers.py
@@ -1,4 +1,4 @@
-import sys, os, pwd, subprocess, errno, socket
+import sys, os, pwd, subprocess, errno, socket, select
 
 
 def log(s):
@@ -71,9 +71,30 @@ class Conn:
         #log('%d writing: %d bytes\n' % (os.getpid(), len(data)))
         self.outp.write(data)
 
+    def has_input(self):
+        [rl, wl, xl] = select.select([self.inp.fileno()], [], [], 0)
+        if rl:
+            assert(rl[0] == self.inp.fileno())
+            return True
+        else:
+            return None
+
     def ok(self):
         self.write('\nok\n')
 
+    def drain_and_check_ok(self):
+        self.outp.flush()
+        rl = ''
+        for rl in linereader(self.inp):
+            #log('%d got line: %r\n' % (os.getpid(), rl))
+            if not rl:  # empty line
+                continue
+            elif rl == 'ok':
+                return True
+            else:
+                pass # ignore line
+        # NOTREACHED
+
     def check_ok(self):
         self.outp.flush()
         rl = ''
index 4472956a58b6fb3d14a7d7a0800120b48dd607ca..9bb524f8ba2a314ad9ae7253ceabab6bff3fa91a 100755 (executable)
--- a/t/test.sh
+++ b/t/test.sh
@@ -50,16 +50,15 @@ A $D/d/
   $D/b
 M $D/a
 A $D/"
+
+# FIXME: currently directories are never marked unmodified, so -m just skips
+# them.  Eventually, we should actually store the hashes of completed
+# directories, at which time the output of -m will change, so we'll have to
+# change this test too.
 WVPASSEQ "$(cd $D && bup index -m .)" \
-"./d/e/
-./d/
-./a
-./"
+"./a"
 WVPASSEQ "$(cd $D && bup index -m)" \
-"d/e/
-d/
-a
-./"
+"a"
 WVPASSEQ "$(cd $D && bup index -s .)" "$(cd $D && bup index -s .)"
 
 
index bb895d97e80dddc17983e3fb8bf6cffb44f2e993..43079b60276b006077c6fa05e255c7d3aabe22e0 100644 (file)
--- a/t/tgit.py
+++ b/t/tgit.py
@@ -3,6 +3,22 @@ from wvtest import *
 from helpers import *
 
 
+@wvtest
+def testencode():
+    s = 'hello world'
+    looseb = ''.join(git._encode_looseobj('blob', s))
+    looset = ''.join(git._encode_looseobj('tree', s))
+    loosec = ''.join(git._encode_looseobj('commit', s))
+    packb = ''.join(git._encode_packobj('blob', s))
+    packt = ''.join(git._encode_packobj('tree', s))
+    packc = ''.join(git._encode_packobj('commit', s))
+    WVPASSEQ(git._decode_looseobj(looseb), ('blob', s))
+    WVPASSEQ(git._decode_looseobj(looset), ('tree', s))
+    WVPASSEQ(git._decode_looseobj(loosec), ('commit', s))
+    WVPASSEQ(git._decode_packobj(packb), ('blob', s))
+    WVPASSEQ(git._decode_packobj(packt), ('tree', s))
+    WVPASSEQ(git._decode_packobj(packc), ('commit', s))
+
 @wvtest
 def testpacks():
     git.init_repo('pybuptest.tmp')