]> arthur.barton.de Git - bup.git/commitdiff
Support incremental backups to a remote server.
authorAvery Pennarun <apenwarr@gmail.com>
Sun, 3 Jan 2010 11:17:30 +0000 (06:17 -0500)
committerAvery Pennarun <apenwarr@gmail.com>
Sun, 3 Jan 2010 11:18:48 +0000 (06:18 -0500)
We now cache the server's packfile indexes locally, so we know which objects
he does and doesn't have.  That way we can send him a packfile with only the
ones he's missing.

cmd-split supports this now, but cmd-save still doesn't support remote
servers.

The -n option (set a ref correctly) doesn't work yet either.

cmd-server.py
cmd-split.py
git.py
helpers.py

index 35f98a3a199a4910b89a7b581267e16a8e62638c..a535fdf3f5fd13b86ed92f4599e751338fff5528 100755 (executable)
@@ -1,22 +1,40 @@
 #!/usr/bin/env python
-import sys, struct
+import sys, struct, mmap
 import options, git
 from helpers import *
 
 
-def receive_objects(f):
+def list_indexes(conn):
+    for f in os.listdir(git.repo('objects/pack')):
+        if f.endswith('.idx'):
+            conn.write('%s\n' % f)
+    conn.ok()
+
+
+def send_index(conn, name):
+    assert(name.find('/') < 0)
+    assert(name.endswith('.idx'))
+    idx = git.PackIndex(git.repo('objects/pack/%s' % name))
+    conn.write(struct.pack('!I', len(idx.map)))
+    conn.write(idx.map)
+    conn.ok()
+    
+            
+def receive_objects(conn):
     w = git.PackWriter()
     while 1:
-        ns = f.read(4)
+        ns = conn.read(4)
         if not ns:
             w.abort()
             raise Exception('object read: expected length header, got EOF\n')
         n = struct.unpack('!I', ns)[0]
         #log('expecting %d bytes\n' % n)
         if not n:
+            log('bup server: received %d object%s.\n' 
+                % (w.count, w.count!=1 and "s" or ''))
             w.close()
             return
-        buf = f.read(n)
+        buf = conn.read(n)  # object sizes in bup are reasonably small
         #log('read %d bytes\n' % n)
         if len(buf) < n:
             w.abort()
@@ -38,8 +56,9 @@ if extra:
 
 log('bup server: reading from stdin.\n')
 
-f = sys.stdin
-lr = linereader(f)
+# FIXME: this protocol is totally lame and not at all future-proof
+conn = Conn(sys.stdin, sys.stdout)
+lr = linereader(conn)
 for _line in lr:
     line = _line.strip()
     if not line:
@@ -47,13 +66,19 @@ for _line in lr:
     log('bup server: command: %r\n' % line)
     if line == 'quit':
         break
-    elif line == 'set-dir':
-        git.repodir = lr.next()
+    elif line.startswith('set-dir '):
+        git.repodir = line[8:]
         git.check_repo_or_die()
         log('bup server: bupdir is %r\n' % git.repodir)
+        conn.ok()
+    elif line == 'list-indexes':
+        list_indexes(conn)
+    elif line.startswith('send-index '):
+        send_index(conn, line[11:])
     elif line == 'receive-objects':
         git.check_repo_or_die()
-        receive_objects(f)
+        receive_objects(conn)
+        conn.ok()
     else:
         raise Exception('unknown server command: %r\n' % line)
 
index b013e6d3a08950a9a21e18fd62761d7568a94693..a165d946428708be1c142887529a2f3dc16ba0c1 100755 (executable)
@@ -1,5 +1,5 @@
 #!/usr/bin/env python
-import sys, time, re
+import sys, time, re, struct
 import hashsplit, git, options
 from helpers import *
 
@@ -32,20 +32,61 @@ start_time = time.time()
 def server_connect(remote):
     rs = remote.split(':', 1)
     if len(rs) == 1:
-        p = subprocess.Popen(['bup', 'server', '-d', opt.remote],
+        (host, dir) = ('NONE', remote)
+        p = subprocess.Popen(['bup', 'server'],
                              stdin=subprocess.PIPE, stdout=subprocess.PIPE)
     else:
         (host, dir) = rs
         p = subprocess.Popen(['ssh', host, '--', 'bup', 'server'],
                              stdin=subprocess.PIPE, stdout=subprocess.PIPE)
-        dir = re.sub(r'[\r\n]', ' ', dir)
-        p.stdin.write('set-dir\n%s\n' % dir)
-    return p
+    conn = Conn(p.stdout, p.stdin)
+    dir = re.sub(r'[\r\n]', ' ', dir)
+    conn.write('set-dir %s\n' % dir)
+    conn.check_ok()
+    
+    conn.write('list-indexes\n')
+    cachedir = git.repo('index-cache/%s' % re.sub(r'[^@:\w]', '_',
+                                                  "%s:%s" % (host, dir)))
+    packdir = git.repo('objects/pack')
+    mkdirp(cachedir)
+    all = {}
+    needed = {}
+    for line in linereader(conn):
+        if not line:
+            break
+        all[line] = 1
+        assert(line.find('/') < 0)
+        if (not os.path.exists(os.path.join(cachedir, line)) and
+            not os.path.exists(os.path.join(packdir, line))):
+                needed[line] = 1
+    conn.check_ok()
+                
+    for f in os.listdir(cachedir):
+        if f.endswith('.idx') and not f in all:
+            log('pruning old index: %r\n' % f)
+            os.unlink(os.path.join(cachedir, f))
+            
+    # FIXME this should be pipelined: request multiple indexes at a time, or
+    # we waste lots of network turnarounds.
+    for name in needed.keys():
+        log('requesting %r\n' % name)
+        conn.write('send-index %s\n' % name)
+        n = struct.unpack('!I', conn.read(4))[0]
+        assert(n)
+        log('   expect %d bytes\n' % n)
+        fn = os.path.join(cachedir, name)
+        f = open(fn + '.tmp', 'w')
+        for b in chunkyreader(conn, n):
+            f.write(b)
+        conn.check_ok()
+        f.close()
+        os.rename(fn + '.tmp', fn)
+    return (p, conn, cachedir)
 
 if opt.remote:
-    p = server_connect(opt.remote)
-    p.stdin.write('receive-objects\n')
-    w = git.PackWriter_Remote(p.stdin)
+    (p, conn, cachedir) = server_connect(opt.remote)
+    conn.write('receive-objects\n')
+    w = git.PackWriter_Remote(conn, objcache = git.MultiPackIndex(cachedir))
 else:
     w = git.PackWriter()
     
diff --git a/git.py b/git.py
index a5fcbe201496f7b29f89ffb3488339971f7e3b16..4f9c5e4379342a69304724edfaa1f8444ab87c91 100644 (file)
--- a/git.py
+++ b/git.py
@@ -5,6 +5,10 @@ verbose = 0
 repodir = os.environ.get('BUP_DIR', '.git')
 
 def repo(sub = ''):
+    global repodir
+    gd = os.path.join(repodir, '.git')
+    if os.path.exists(gd):
+        repodir = gd
     return os.path.join(repodir, sub)
 
 
@@ -235,15 +239,17 @@ class PackWriter:
 
 
 class PackWriter_Remote(PackWriter):
-    def __init__(self, file, objcache=None):
+    def __init__(self, conn, objcache=None):
         PackWriter.__init__(self, objcache)
-        self.file = file
+        self.file = conn
         self.filename = 'remote socket'
 
+    def _open(self):
+        assert(not "can't reopen a PackWriter_Remote")
+
     def close(self):
         if self.file:
             self.file.write('\0\0\0\0')
-            self.file.flush()
         self.file = None
 
     def _raw_write(self, datalist):
@@ -293,5 +299,5 @@ def init_repo():
 
 def check_repo_or_die():
     if not os.path.isdir(repo('objects/pack/.')):
-        log('error: %r is not a git repository\n' % repo())
+        log('error: %r is not a bup/git repository\n' % repo())
         exit(15)
index 1f89ce58c5c3ea8072d0ffeef928c0c3a51cb300..c07cd7b5c6eb8035b4adcdc1c4fa907a7abe7cc3 100644 (file)
@@ -1,10 +1,20 @@
-import sys, os, pwd, subprocess
+import sys, os, pwd, subprocess, errno
 
 
 def log(s):
     sys.stderr.write(s)
 
 
+def mkdirp(d):
+    try:
+        os.makedirs(d)
+    except OSError, e:
+        if e.errno == errno.EEXIST:
+            pass
+        else:
+            raise
+
+
 def readpipe(argv):
     p = subprocess.Popen(argv, stdout=subprocess.PIPE)
     r = p.stdout.read()
@@ -47,9 +57,48 @@ def hostname():
     return _hostname or 'localhost'
 
 
+class Conn:
+    def __init__(self, inp, outp):
+        self.inp = inp
+        self.outp = outp
+
+    def read(self, size):
+        self.outp.flush()
+        return self.inp.read(size)
+
+    def readline(self):
+        self.outp.flush()
+        return self.inp.readline()
+
+    def write(self, data):
+        self.outp.write(data)
+
+    def ok(self):
+        self.write('\nok\n')
+
+    def check_ok(self):
+        self.outp.flush()
+        for rl in linereader(self.inp):
+            if not rl:
+                continue
+            elif rl == 'ok':
+                return True
+            else:
+                raise Exception('expected "ok", got %r' % rl)
+
+
 def linereader(f):
     while 1:
         line = f.readline()
         if not line:
             break
         yield line[:-1]
+
+
+def chunkyreader(f, count):
+    while count > 0:
+        b = f.read(min(count, 65536))
+        if not b:
+            raise IOError('EOF with %d bytes remaining' % count)
+        yield b
+        count -= len(b)