X-Git-Url: https://arthur.barton.de/gitweb/?a=blobdiff_plain;f=lib%2Fbup%2Fclient.py;h=cad6e28b75145c338ab5102f0a1f4800a3ab1c18;hb=d4100c81448484b8ea222fb78757bf003e181cf1;hp=0bb5219c4f9bc6952a0031cefb5da69cfdae3aec;hpb=0fdc630b347e9c9c07752ebe5ea41606b2a774cd;p=bup.git diff --git a/lib/bup/client.py b/lib/bup/client.py index 0bb5219..cad6e28 100644 --- a/lib/bup/client.py +++ b/lib/bup/client.py @@ -1,6 +1,13 @@ -import re, struct, errno, time, zlib + +from __future__ import absolute_import +import errno, os, re, struct, sys, time, zlib + from bup import git, ssh -from bup.helpers import * +from bup.compat import range +from bup.helpers import (Conn, atomically_replaced_file, chunkyreader, debug1, + debug2, linereader, lines_until_sentinel, + mkdirp, progress, qprogress) + bwlimit = None @@ -21,7 +28,7 @@ def _raw_write_bwlimit(f, buf, bwcount, bwtime): # the average back up to bwlimit - that will risk overflowing the # outbound queue, which defeats the purpose. So if we fall behind # by more than one block delay, we shouldn't ever try to catch up. - for i in xrange(0,len(buf),4096): + for i in range(0,len(buf),4096): now = time.time() next = max(now, bwtime + 1.0*bwcount/bwlimit) time.sleep(next-now) @@ -40,7 +47,8 @@ def parse_remote(remote): url_match = re.match( '%s(?:%s%s)?%s' % (protocol, host, port, path), remote, re.I) if url_match: - assert(url_match.group(1) in ('ssh', 'bup', 'file')) + if not url_match.group(1) in ('ssh', 'bup', 'file'): + raise ClientError, 'unexpected protocol: %s' % url_match.group(1) return url_match.group(1,3,4,5) else: rs = remote.split(':', 1) @@ -65,6 +73,7 @@ class Client: if is_reverse: self.pout = os.fdopen(3, 'rb') self.pin = os.fdopen(4, 'wb') + self.conn = Conn(self.pout, self.pin) else: if self.protocol in ('ssh', 'file'): try: @@ -72,14 +81,17 @@ class Client: self.p = ssh.connect(self.host, self.port, 'server') self.pout = self.p.stdout self.pin = self.p.stdin - except OSError, e: + self.conn = Conn(self.pout, self.pin) + except OSError as e: raise ClientError, 'connect: %s' % e, sys.exc_info()[2] elif self.protocol == 'bup': self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.sock.connect((self.host, self.port or 1982)) - self.pout = self.sock.makefile('rb') - self.pin = self.sock.makefile('wb') - self.conn = Conn(self.pout, self.pin) + self.sock.connect((self.host, atoi(self.port) or 1982)) + self.sockw = self.sock.makefile('wb') + self.conn = DemuxConn(self.sock.fileno(), self.sockw) + self._available_commands = self._get_available_commands() + self._require_command('init-dir') + self._require_command('set-dir') if self.dir: self.dir = re.sub(r'[\r\n]', ' ', self.dir) if create: @@ -92,7 +104,7 @@ class Client: def __del__(self): try: self.close() - except IOError, e: + except IOError as e: if e.errno == errno.EPIPE: pass else: @@ -101,10 +113,14 @@ class Client: def close(self): if self.conn and not self._busy: self.conn.write('quit\n') - if self.pin and self.pout: + if self.pin: self.pin.close() - while self.pout.read(65536): - pass + if self.sock and self.sockw: + self.sockw.close() + self.sock.shutdown(socket.SHUT_WR) + if self.conn: + self.conn.close() + if self.pout: self.pout.close() if self.sock: self.sock.close() @@ -124,7 +140,7 @@ class Client: % rv) try: return self.conn.check_ok() - except Exception, e: + except Exception as e: raise ClientError, e, sys.exc_info()[2] def check_busy(self): @@ -138,7 +154,39 @@ class Client: def _not_busy(self): self._busy = None + def _get_available_commands(self): + self.check_busy() + self._busy = 'help' + conn = self.conn + conn.write('help\n') + result = set() + line = self.conn.readline() + if not line == 'Commands:\n': + raise ClientError('unexpected help header ' + repr(line)) + while True: + line = self.conn.readline() + if line == '\n': + break + if not line.startswith(' '): + raise ClientError('unexpected help line ' + repr(line)) + cmd = line.strip() + if not cmd: + raise ClientError('unexpected help line ' + repr(line)) + result.add(cmd) + # FIXME: confusing + not_ok = self.check_ok() + if not_ok: + raise not_ok + self._not_busy() + return frozenset(result) + + def _require_command(self, name): + if name not in self._available_commands: + raise ClientError('server does not appear to provide %s command' + % name) + def sync_indexes(self): + self._require_command('list-indexes') self.check_busy() conn = self.conn mkdirp(self.cachedir) @@ -172,26 +220,27 @@ class Client: self.sync_index(idx) git.auto_midx(self.cachedir) - def sync_index(self, name): + self._require_command('send-index') #debug1('requesting %r\n' % name) self.check_busy() mkdirp(self.cachedir) + fn = os.path.join(self.cachedir, name) + if os.path.exists(fn): + msg = "won't request existing .idx, try `bup bloom --check %s`" % fn + raise ClientError(msg) self.conn.write('send-index %s\n' % name) n = struct.unpack('!I', self.conn.read(4))[0] assert(n) - fn = os.path.join(self.cachedir, name) - f = open(fn + '.tmp', 'w') - count = 0 - progress('Receiving index from server: %d/%d\r' % (count, n)) - for b in chunkyreader(self.conn, n): - f.write(b) - count += len(b) + with atomically_replaced_file(fn, 'w') as f: + count = 0 progress('Receiving index from server: %d/%d\r' % (count, n)) - progress('Receiving index from server: %d/%d, done.\n' % (count, n)) - self.check_ok() - f.close() - os.rename(fn + '.tmp', fn) + for b in chunkyreader(self.conn, n): + f.write(b) + count += len(b) + qprogress('Receiving index from server: %d/%d\r' % (count, n)) + progress('Receiving index from server: %d/%d, done.\n' % (count, n)) + self.check_ok() def _make_objcache(self): return git.PackIdxList(self.cachedir) @@ -208,15 +257,18 @@ class Client: debug2('%s\n' % line) if line.startswith('index '): idx = line[6:] - debug1('client: received index suggestion: %s\n' % idx) + debug1('client: received index suggestion: %s\n' + % git.shorten_hash(idx)) suggested.append(idx) else: assert(line.endswith('.idx')) - debug1('client: completed writing pack, idx: %s\n' % line) + debug1('client: completed writing pack, idx: %s\n' + % git.shorten_hash(line)) suggested.append(line) self.check_ok() if ob: self._busy = None + idx = None for idx in suggested: self.sync_index(idx) git.auto_midx(self.cachedir) @@ -225,7 +277,9 @@ class Client: self.conn.write('%s\n' % ob) return idx - def new_packwriter(self): + def new_packwriter(self, compression_level=1, + max_pack_size=None, max_pack_objects=None): + self._require_command('receive-objects-v2') self.check_busy() def _set_busy(): self._busy = 'receive-objects-v2' @@ -235,9 +289,13 @@ class Client: suggest_packs = self._suggest_packs, onopen = _set_busy, onclose = self._not_busy, - ensure_busy = self.ensure_busy) + ensure_busy = self.ensure_busy, + compression_level=compression_level, + max_pack_size=max_pack_size, + max_pack_objects=max_pack_objects) def read_ref(self, refname): + self._require_command('read-ref') self.check_busy() self.conn.write('read-ref %s\n' % refname) r = self.conn.readline().strip() @@ -249,31 +307,148 @@ class Client: return None # nonexistent ref def update_ref(self, refname, newval, oldval): + self._require_command('update-ref') self.check_busy() self.conn.write('update-ref %s\n%s\n%s\n' % (refname, newval.encode('hex'), (oldval or '').encode('hex'))) self.check_ok() - def cat(self, id): + def join(self, id): + self._require_command('join') self.check_busy() - self._busy = 'cat' + self._busy = 'join' + # Send 'cat' so we'll work fine with older versions self.conn.write('cat %s\n' % re.sub(r'[\n\r]', '_', id)) while 1: sz = struct.unpack('!I', self.conn.read(4))[0] if not sz: break yield self.conn.read(sz) + # FIXME: ok to assume the only NotOk is a KerError? (it is true atm) e = self.check_ok() self._not_busy() if e: raise KeyError(str(e)) + def cat_batch(self, refs): + self._require_command('cat-batch') + self.check_busy() + self._busy = 'cat-batch' + conn = self.conn + conn.write('cat-batch\n') + # FIXME: do we want (only) binary protocol? + for ref in refs: + assert ref + assert '\n' not in ref + conn.write(ref) + conn.write('\n') + conn.write('\n') + for ref in refs: + info = conn.readline() + if info == 'missing\n': + yield None, None, None, None + continue + if not (info and info.endswith('\n')): + raise ClientError('Hit EOF while looking for object info: %r' + % info) + oidx, oid_t, size = info.split(' ') + size = int(size) + cr = chunkyreader(conn, size) + yield oidx, oid_t, size, cr + detritus = next(cr, None) + if detritus: + raise ClientError('unexpected leftover data ' + repr(detritus)) + # FIXME: confusing + not_ok = self.check_ok() + if not_ok: + raise not_ok + self._not_busy() + + def refs(self, patterns=None, limit_to_heads=False, limit_to_tags=False): + patterns = patterns or tuple() + self._require_command('refs') + self.check_busy() + self._busy = 'refs' + conn = self.conn + conn.write('refs %s %s\n' % (1 if limit_to_heads else 0, + 1 if limit_to_tags else 0)) + for pattern in patterns: + assert '\n' not in pattern + conn.write(pattern) + conn.write('\n') + conn.write('\n') + for line in lines_until_sentinel(conn, '\n', ClientError): + line = line[:-1] + oidx, name = line.split(' ') + if len(oidx) != 40: + raise ClientError('Invalid object fingerprint in %r' % line) + if not name: + raise ClientError('Invalid reference name in %r' % line) + yield name, oidx.decode('hex') + # FIXME: confusing + not_ok = self.check_ok() + if not_ok: + raise not_ok + self._not_busy() + + def rev_list(self, refs, count=None, parse=None, format=None): + self._require_command('rev-list') + assert (count is None) or (isinstance(count, Integral)) + if format: + assert '\n' not in format + assert parse + for ref in refs: + assert ref + assert '\n' not in ref + self.check_busy() + self._busy = 'rev-list' + conn = self.conn + conn.write('rev-list\n') + if count is not None: + conn.write(str(count)) + conn.write('\n') + if format: + conn.write(format) + conn.write('\n') + for ref in refs: + conn.write(ref) + conn.write('\n') + conn.write('\n') + if not format: + for _ in range(len(refs)): + line = conn.readline() + if not line: + raise ClientError('unexpected EOF') + line = line.strip() + assert len(line) == 40 + yield line + else: + for _ in range(len(refs)): + line = conn.readline() + if not line: + raise ClientError('unexpected EOF') + if not line.startswith('commit '): + raise ClientError('unexpected line ' + repr(line)) + yield line[7:].strip(), parse(conn) + # FIXME: confusing + not_ok = self.check_ok() + if not_ok: + raise not_ok + self._not_busy() + class PackWriter_Remote(git.PackWriter): def __init__(self, conn, objcache_maker, suggest_packs, onopen, onclose, - ensure_busy): - git.PackWriter.__init__(self, objcache_maker) + ensure_busy, + compression_level=1, + max_pack_size=None, + max_pack_objects=None): + git.PackWriter.__init__(self, + objcache_maker=objcache_maker, + compression_level=compression_level, + max_pack_size=max_pack_size, + max_pack_objects=max_pack_objects) self.file = conn self.filename = 'remote socket' self.suggest_packs = suggest_packs @@ -289,7 +464,8 @@ class PackWriter_Remote(git.PackWriter): self.onopen() self._packopen = True - def _end(self): + def _end(self, run_midx=True): + assert(run_midx) # We don't support this via remote yet if self._packopen and self.file: self.file.write('\0\0\0\0') self._packopen = False @@ -303,7 +479,7 @@ class PackWriter_Remote(git.PackWriter): return id def abort(self): - raise GitError("don't know how to abort remote pack writing") + raise ClientError("don't know how to abort remote pack writing") def _raw_write(self, datalist, sha): assert(self.file) @@ -318,9 +494,12 @@ class PackWriter_Remote(git.PackWriter): sha, struct.pack('!I', crc), data)) - (self._bwcount, self._bwtime) = \ - _raw_write_bwlimit(self.file, outbuf, self._bwcount, self._bwtime) - self.outbytes += len(data) - 20 - 4 # Don't count sha1+crc + try: + (self._bwcount, self._bwtime) = _raw_write_bwlimit( + self.file, outbuf, self._bwcount, self._bwtime) + except IOError as e: + raise ClientError, e, sys.exc_info()[2] + self.outbytes += len(data) self.count += 1 if self.file.has_input():