From 53b57e5fcb1980d49c22d1fd53d992cee3b2797a Mon Sep 17 00:00:00 2001 From: Rob Browning Date: Sat, 23 Oct 2021 12:59:06 -0500 Subject: [PATCH 1/1] git.packwriter: explicitly manage lifetimes Explicitly close all packwriters, instead of relying on __del__. Use "with ..." context management in most cases. This is in preparation for removing __del__. Rework _end to handle *all* cleanup and to ensure that we always release the relevant resources, whether closing normally, or aborting. This means that we only have two places to consider with respect to resource management: __init__ and _end. Signed-off-by: Rob Browning Tested-by: Rob Browning --- lib/bup/client.py | 7 ++ lib/bup/cmd/save.py | 35 +++++---- lib/bup/cmd/server.py | 152 ++++++++++++++++++++++------------------ lib/bup/cmd/split.py | 7 +- lib/bup/gc.py | 7 +- lib/bup/git.py | 87 ++++++++++------------- lib/bup/helpers.py | 11 +++ lib/bup/rm.py | 8 +-- test/int/test_client.py | 45 +++++------- test/int/test_git.py | 73 ++++++++++--------- 10 files changed, 219 insertions(+), 213 deletions(-) diff --git a/lib/bup/client.py b/lib/bup/client.py index 178eda5..637737b 100644 --- a/lib/bup/client.py +++ b/lib/bup/client.py @@ -480,7 +480,9 @@ class Client: return result +# FIXME: disentangle this (stop inheriting) from PackWriter class PackWriter_Remote(git.PackWriter): + def __init__(self, conn, objcache_maker, suggest_packs, onopen, onclose, ensure_busy, @@ -502,12 +504,16 @@ class PackWriter_Remote(git.PackWriter): self._bwcount = 0 self._bwtime = time.time() + # __enter__ and __exit__ are inherited + def _open(self): if not self._packopen: self.onopen() self._packopen = True def _end(self, run_midx=True): + # Called by other PackWriter methods like breakpoint(). 
+ # Must not close the connection (self.file) assert(run_midx) # We don't support this via remote yet if self._packopen and self.file: self.file.write(b'\0\0\0\0') @@ -518,6 +524,7 @@ class PackWriter_Remote(git.PackWriter): return None def close(self): + # Called by inherited __exit__ id = self._end() self.file = None return id diff --git a/lib/bup/cmd/save.py b/lib/bup/cmd/save.py index ee78a87..bd6ce29 100755 --- a/lib/bup/cmd/save.py +++ b/lib/bup/cmd/save.py @@ -511,26 +511,25 @@ def main(argv): else: w = cli.new_packwriter(compression_level=opt.compress) - sys.stdout.flush() - out = byte_stream(sys.stdout) + with w: + sys.stdout.flush() + out = byte_stream(sys.stdout) - if opt.name: - refname = b'refs/heads/%s' % opt.name - parent = repo.read_ref(refname) - else: - refname = parent = None - - tree = save_tree(opt, w) - if opt.tree: - out.write(hexlify(tree)) - out.write(b'\n') - if opt.commit or opt.name: - commit = commit_tree(tree, parent, opt.date, argv, w) - if opt.commit: - out.write(hexlify(commit)) - out.write(b'\n') + if opt.name: + refname = b'refs/heads/%s' % opt.name + parent = repo.read_ref(refname) + else: + refname = parent = None - w.close() + tree = save_tree(opt, w) + if opt.tree: + out.write(hexlify(tree)) + out.write(b'\n') + if opt.commit or opt.name: + commit = commit_tree(tree, parent, opt.date, argv, w) + if opt.commit: + out.write(hexlify(commit)) + out.write(b'\n') # packwriter must be closed before we can update the ref if opt.name: diff --git a/lib/bup/cmd/server.py b/lib/bup/cmd/server.py index 3574428..56b6d44 100755 --- a/lib/bup/cmd/server.py +++ b/lib/bup/cmd/server.py @@ -4,9 +4,10 @@ from binascii import hexlify, unhexlify import os, struct, subprocess, sys from bup import options, git, vfs, vint -from bup.compat import environ, hexstr -from bup.helpers import (Conn, debug1, debug2, linereader, lines_until_sentinel, - log) +from bup.compat import environ, hexstr, pending_raise +from bup.helpers \ + import (Conn, debug1, 
debug2, finalized, linereader, lines_until_sentinel, + log) from bup.io import byte_stream, path_msg from bup.repo import LocalRepo @@ -81,60 +82,70 @@ def send_index(conn, name): def receive_objects_v2(conn, junk): global suspended_w _init_session() - suggested = set() if suspended_w: w = suspended_w suspended_w = None + elif dumb_server_mode: + w = git.PackWriter(objcache_maker=None) else: - if dumb_server_mode: - w = git.PackWriter(objcache_maker=None) - else: - w = git.PackWriter() - while 1: - ns = conn.read(4) - if not ns: - w.abort() - raise Exception('object read: expected length header, got EOF\n') - n = struct.unpack('!I', ns)[0] - #debug2('expecting %d bytes\n' % n) - if not n: - debug1('bup server: received %d object%s.\n' - % (w.count, w.count!=1 and "s" or '')) - fullpath = w.close(run_midx=not dumb_server_mode) - if fullpath: - (dir, name) = os.path.split(fullpath) - conn.write(b'%s.idx\n' % name) - conn.ok() - return - elif n == 0xffffffff: - debug2('bup server: receive-objects suspended.\n') - suspended_w = w - conn.ok() - return - - shar = conn.read(20) - crcr = struct.unpack('!I', conn.read(4))[0] - n -= 20 + 4 - buf = conn.read(n) # object sizes in bup are reasonably small - #debug2('read %d bytes\n' % n) - _check(w, n, len(buf), 'object read: expected %d bytes, got %d\n') - if not dumb_server_mode: - oldpack = w.exists(shar, want_source=True) - if oldpack: - assert(not oldpack == True) - assert(oldpack.endswith(b'.idx')) - (dir,name) = os.path.split(oldpack) - if not (name in suggested): - debug1("bup server: suggesting index %s\n" - % git.shorten_hash(name).decode('ascii')) - debug1("bup server: because of object %s\n" - % hexstr(shar)) - conn.write(b'index %s\n' % name) - suggested.add(name) - continue - nw, crc = w._raw_write((buf,), sha=shar) - _check(w, crcr, crc, 'object read: expected crc %d, got %d\n') - # NOTREACHED + w = git.PackWriter() + try: + suggested = set() + while 1: + ns = conn.read(4) + if not ns: + w.abort() + raise 
Exception('object read: expected length header, got EOF') + n = struct.unpack('!I', ns)[0] + #debug2('expecting %d bytes\n' % n) + if not n: + debug1('bup server: received %d object%s.\n' + % (w.count, w.count!=1 and "s" or '')) + fullpath = w.close(run_midx=not dumb_server_mode) + w = None + if fullpath: + dir, name = os.path.split(fullpath) + conn.write(b'%s.idx\n' % name) + conn.ok() + return + elif n == 0xffffffff: + debug2('bup server: receive-objects suspending\n') + conn.ok() + suspended_w = w + w = None + return + + shar = conn.read(20) + crcr = struct.unpack('!I', conn.read(4))[0] + n -= 20 + 4 + buf = conn.read(n) # object sizes in bup are reasonably small + #debug2('read %d bytes\n' % n) + _check(w, n, len(buf), 'object read: expected %d bytes, got %d\n') + if not dumb_server_mode: + oldpack = w.exists(shar, want_source=True) + if oldpack: + assert(not oldpack == True) + assert(oldpack.endswith(b'.idx')) + (dir,name) = os.path.split(oldpack) + if not (name in suggested): + debug1("bup server: suggesting index %s\n" + % git.shorten_hash(name).decode('ascii')) + debug1("bup server: because of object %s\n" + % hexstr(shar)) + conn.write(b'index %s\n' % name) + suggested.add(name) + continue + nw, crc = w._raw_write((buf,), sha=shar) + _check(w, crcr, crc, 'object read: expected crc %d, got %d\n') + # py2: this clause is unneeded with py3 + except BaseException as ex: + with pending_raise(ex): + if w: + w, w_tmp = None, w + w_tmp.close() + finally: + if w: w.close() + assert False # should be unreachable def _check(w, expected, actual, msg): @@ -278,9 +289,10 @@ commands = { } def main(argv): + global suspended_w + o = options.Options(optspec) opt, flags, extra = o.parse_bytes(argv[1:]) - if extra: o.fatal('no arguments expected') @@ -291,21 +303,21 @@ def main(argv): sys.stdout.flush() conn = Conn(byte_stream(sys.stdin), byte_stream(sys.stdout)) lr = linereader(conn) - for _line in lr: - line = _line.strip() - if not line: - continue - debug1('bup server: 
command: %r\n' % line) - words = line.split(b' ', 1) - cmd = words[0] - rest = len(words)>1 and words[1] or b'' - if cmd == b'quit': - break - else: - cmd = commands.get(cmd) - if cmd: - cmd(conn, rest) + with finalized(None, lambda _: suspended_w and suspended_w.close()): + for _line in lr: + line = _line.strip() + if not line: + continue + debug1('bup server: command: %r\n' % line) + words = line.split(b' ', 1) + cmd = words[0] + rest = len(words)>1 and words[1] or b'' + if cmd == b'quit': + break else: - raise Exception('unknown server command: %r\n' % line) - + cmd = commands.get(cmd) + if cmd: + cmd(conn, rest) + else: + raise Exception('unknown server command: %r\n' % line) debug1('bup server: done\n') diff --git a/lib/bup/cmd/split.py b/lib/bup/cmd/split.py index 1ffe44d..59ad46c 100755 --- a/lib/bup/cmd/split.py +++ b/lib/bup/cmd/split.py @@ -250,10 +250,9 @@ def main(argv): max_pack_size=opt.max_pack_size, max_pack_objects=opt.max_pack_objects) - commit = split(opt, files, oldref, out, pack_writer) - - if pack_writer: - pack_writer.close() + # packwriter creation must be last command in each if clause above + with pack_writer: + commit = split(opt, files, oldref, out, pack_writer) # pack_writer must be closed before we can update the ref if refname: diff --git a/lib/bup/gc.py b/lib/bup/gc.py index b757034..e8425ff 100644 --- a/lib/bup/gc.py +++ b/lib/bup/gc.py @@ -209,10 +209,9 @@ def sweep(live_objects, existing_count, cat_pipe, threshold, compression, except BaseException as ex: with pending_raise(ex): writer.abort() - - # This will finally run midx. - # Can only change refs (if needed) after this. - writer.close() + finally: + # This will finally run midx. + writer.close() remove_stale_files(None) # In case we didn't write to the writer. 
diff --git a/lib/bup/git.py b/lib/bup/git.py index 90618c4..fbcf481 100644 --- a/lib/bup/git.py +++ b/lib/bup/git.py @@ -22,6 +22,7 @@ from bup.io import path_msg from bup.helpers import (Sha1, add_error, chunkyreader, debug1, debug2, exo, fdatasync, + finalized, log, merge_dict, merge_iter, @@ -818,12 +819,6 @@ class PackWriter: self.breakpoint() return sha - def breakpoint(self): - """Clear byte and object counts and return the last processed id.""" - id = self._end(self.run_midx) - self.outbytes = self.count = 0 - return id - def _require_objcache(self): if self.objcache is None and self.objcache_maker: self.objcache = self.objcache_maker() @@ -872,36 +867,26 @@ class PackWriter: msg) return self.maybe_write(b'commit', content) - def abort(self): - """Remove the pack file from disk.""" - f = self.file - if f: - pfd = self.parentfd - self.file = None - self.parentfd = None - self.idx = None - try: - try: - os.unlink(self.filename + b'.pack') - finally: - f.close() - finally: - if pfd is not None: - os.close(pfd) + def _end(self, run_midx=True, abort=False): + # Ignores run_midx during abort + if not self.file: + return None + self.file, f = None, self.file + self.idx, idx = None, self.idx + self.parentfd, pfd, = None, self.parentfd + self.objcache = None - def _end(self, run_midx=True): - f = self.file - if not f: return None - self.file = None - try: - self.objcache = None - idx = self.idx - self.idx = None + with finalized(pfd, lambda x: x is not None and os.close(x)), \ + f: + + if abort: + os.unlink(self.filename + b'.pack') + return None # update object count f.seek(8) cp = struct.pack('!i', self.count) - assert(len(cp) == 4) + assert len(cp) == 4 f.write(cp) # calculate the pack sha1sum @@ -911,29 +896,33 @@ class PackWriter: sum.update(b) packbin = sum.digest() f.write(packbin) + f.flush() fdatasync(f.fileno()) - finally: f.close() - idx.write(self.filename + b'.idx', packbin) - nameprefix = os.path.join(self.repo_dir, - b'objects/pack/pack-' + 
hexlify(packbin)) - if os.path.exists(self.filename + b'.map'): - os.unlink(self.filename + b'.map') - os.rename(self.filename + b'.pack', nameprefix + b'.pack') - os.rename(self.filename + b'.idx', nameprefix + b'.idx') - try: - os.fsync(self.parentfd) - finally: - os.close(self.parentfd) - - if run_midx: - auto_midx(os.path.join(self.repo_dir, b'objects/pack')) + idx.write(self.filename + b'.idx', packbin) + nameprefix = os.path.join(self.repo_dir, + b'objects/pack/pack-' + hexlify(packbin)) + if os.path.exists(self.filename + b'.map'): + os.unlink(self.filename + b'.map') + os.rename(self.filename + b'.pack', nameprefix + b'.pack') + os.rename(self.filename + b'.idx', nameprefix + b'.idx') + os.fsync(pfd) + if run_midx: + auto_midx(os.path.join(self.repo_dir, b'objects/pack')) + if self.on_pack_finish: + self.on_pack_finish(nameprefix) + return nameprefix - if self.on_pack_finish: - self.on_pack_finish(nameprefix) + def abort(self): + """Remove the pack file from disk.""" + self._end(abort=True) - return nameprefix + def breakpoint(self): + """Clear byte and object counts and return the last processed id.""" + id = self._end(self.run_midx) + self.outbytes = self.count = 0 + return id def close(self, run_midx=True): """Close the pack file and move it to its definitive path.""" diff --git a/lib/bup/helpers.py b/lib/bup/helpers.py index 93fa9e1..8a8a342 100644 --- a/lib/bup/helpers.py +++ b/lib/bup/helpers.py @@ -27,6 +27,17 @@ class Nonlocal: pass +@contextmanager +def finalized(enter_result=None, finalize=None): + assert finalize + try: + yield enter_result + except BaseException as ex: + with pending_raise(ex): + finalize(enter_result) + finalize(enter_result) + + sc_page_size = os.sysconf('SC_PAGE_SIZE') assert(sc_page_size > 0) diff --git a/lib/bup/rm.py b/lib/bup/rm.py index 844bdf1..9345c9f 100644 --- a/lib/bup/rm.py +++ b/lib/bup/rm.py @@ -116,11 +116,9 @@ def bup_rm(repo, paths, compression=6, verbosity=None): assert(saves) updated_refs[b'refs/heads/' + 
branch] = rm_saves(saves, writer) except BaseException as ex: - if writer: - with pending_raise(ex): - writer.abort() - if writer: - # Must close before we can update the ref(s) below. + with pending_raise(ex): + writer.abort() + finally: writer.close() # Only update the refs here, at the very end, so that if something diff --git a/test/int/test_client.py b/test/int/test_client.py index 605cffb..17e3a41 100644 --- a/test/int/test_client.py +++ b/test/int/test_client.py @@ -23,34 +23,31 @@ IDX_PAT = b'/*.idx' def test_server_split_with_indexes(tmpdir): environ[b'BUP_DIR'] = bupdir = tmpdir git.init_repo(bupdir) - lw = git.PackWriter() - with client.Client(bupdir, create=True) as c: - rw = c.new_packwriter() - + with git.PackWriter() as lw, \ + client.Client(bupdir, create=True) as c, \ + c.new_packwriter() as rw: lw.new_blob(s1) lw.close() rw.new_blob(s2) rw.breakpoint() rw.new_blob(s1) - rw.close() def test_multiple_suggestions(tmpdir): environ[b'BUP_DIR'] = bupdir = tmpdir git.init_repo(bupdir) - lw = git.PackWriter() - lw.new_blob(s1) - lw.close() - lw = git.PackWriter() - lw.new_blob(s2) - lw.close() + with git.PackWriter() as lw: + lw.new_blob(s1) + with git.PackWriter() as lw: + lw.new_blob(s2) assert len(glob.glob(git.repo(b'objects/pack'+IDX_PAT))) == 2 - with client.Client(bupdir, create=True) as c: + with client.Client(bupdir, create=True) as c, \ + c.new_packwriter() as rw: + assert len(glob.glob(c.cachedir+IDX_PAT)) == 0 - rw = c.new_packwriter() s1sha = rw.new_blob(s1) assert rw.exists(s1sha) s2sha = rw.new_blob(s2) @@ -92,8 +89,7 @@ def test_multiple_suggestions(tmpdir): assert rw.objcache.exists(s2sha) rw.new_blob(s3) assert len(glob.glob(c.cachedir+IDX_PAT)) == 2 - rw.close() - assert len(glob.glob(c.cachedir+IDX_PAT)) == 3 + assert len(glob.glob(c.cachedir+IDX_PAT)) == 3 def test_dumb_client_server(tmpdir): @@ -101,33 +97,30 @@ def test_dumb_client_server(tmpdir): git.init_repo(bupdir) open(git.repo(b'bup-dumb-server'), 'w').close() - lw = 
git.PackWriter() - lw.new_blob(s1) - lw.close() + with git.PackWriter() as lw: + lw.new_blob(s1) - with client.Client(bupdir, create=True) as c: - rw = c.new_packwriter() + with client.Client(bupdir, create=True) as c, \ + c.new_packwriter() as rw: assert len(glob.glob(c.cachedir+IDX_PAT)) == 1 rw.new_blob(s1) assert len(glob.glob(c.cachedir+IDX_PAT)) == 1 rw.new_blob(s2) - rw.close() - assert len(glob.glob(c.cachedir+IDX_PAT)) == 2 + assert len(glob.glob(c.cachedir+IDX_PAT)) == 2 def test_midx_refreshing(tmpdir): environ[b'BUP_DIR'] = bupdir = tmpdir git.init_repo(bupdir) - with client.Client(bupdir, create=True) as c: - rw = c.new_packwriter() + with client.Client(bupdir, create=True) as c, \ + c.new_packwriter() as rw: rw.new_blob(s1) p1base = rw.breakpoint() p1name = os.path.join(c.cachedir, p1base) s1sha = rw.new_blob(s1) # should not be written; it's already in p1 s2sha = rw.new_blob(s2) p2base = rw.close() - p2name = os.path.join(c.cachedir, p2base) - del rw + p2name = os.path.join(c.cachedir, p2base) pi = git.PackIdxList(bupdir + b'/objects/pack') assert len(pi.packs) == 2 diff --git a/test/int/test_git.py b/test/int/test_git.py index 339f858..ee65177 100644 --- a/test/int/test_git.py +++ b/test/int/test_git.py @@ -117,18 +117,18 @@ def test_packs(tmpdir): git.init_repo(bupdir) git.verbose = 1 - w = git.PackWriter() - w.new_blob(os.urandom(100)) - w.new_blob(os.urandom(100)) - w.abort() - - w = git.PackWriter() - hashes = [] - nobj = 1000 - for i in range(nobj): - hashes.append(w.new_blob(b'%d' % i)) - log('\n') - nameprefix = w.close() + with git.PackWriter() as w: + w.new_blob(os.urandom(100)) + w.new_blob(os.urandom(100)) + w.abort() + + with git.PackWriter() as w: + hashes = [] + nobj = 1000 + for i in range(nobj): + hashes.append(w.new_blob(b'%d' % i)) + log('\n') + nameprefix = w.close() print(repr(nameprefix)) WVPASS(os.path.exists(nameprefix + b'.pack')) WVPASS(os.path.exists(nameprefix + b'.idx')) @@ -163,11 +163,11 @@ def 
test_pack_name_lookup(tmpdir): hashes = [] for start in range(0,28,2): - w = git.PackWriter() - for i in range(start, start+2): - hashes.append(w.new_blob(b'%d' % i)) - log('\n') - idxnames.append(os.path.basename(w.close() + b'.idx')) + with git.PackWriter() as w: + for i in range(start, start+2): + hashes.append(w.new_blob(b'%d' % i)) + log('\n') + idxnames.append(os.path.basename(w.close() + b'.idx')) r = git.PackIdxList(packdir) WVPASSEQ(len(r.packs), 2) @@ -346,31 +346,30 @@ def test_new_commit(tmpdir): git.init_repo(bupdir) git.verbose = 1 - w = git.PackWriter() - tree = os.urandom(20) - parent = os.urandom(20) - author_name = b'Author' - author_mail = b'author@somewhere' - adate_sec = 1439657836 - cdate_sec = adate_sec + 1 - committer_name = b'Committer' - committer_mail = b'committer@somewhere' - adate_tz_sec = cdate_tz_sec = None - commit = w.new_commit(tree, parent, - b'%s <%s>' % (author_name, author_mail), - adate_sec, adate_tz_sec, - b'%s <%s>' % (committer_name, committer_mail), - cdate_sec, cdate_tz_sec, - b'There is a small mailbox here') - adate_tz_sec = -60 * 60 - cdate_tz_sec = 120 * 60 - commit_off = w.new_commit(tree, parent, + with git.PackWriter() as w: + tree = os.urandom(20) + parent = os.urandom(20) + author_name = b'Author' + author_mail = b'author@somewhere' + adate_sec = 1439657836 + cdate_sec = adate_sec + 1 + committer_name = b'Committer' + committer_mail = b'committer@somewhere' + adate_tz_sec = cdate_tz_sec = None + commit = w.new_commit(tree, parent, b'%s <%s>' % (author_name, author_mail), adate_sec, adate_tz_sec, b'%s <%s>' % (committer_name, committer_mail), cdate_sec, cdate_tz_sec, b'There is a small mailbox here') - w.close() + adate_tz_sec = -60 * 60 + cdate_tz_sec = 120 * 60 + commit_off = w.new_commit(tree, parent, + b'%s <%s>' % (author_name, author_mail), + adate_sec, adate_tz_sec, + b'%s <%s>' % (committer_name, committer_mail), + cdate_sec, cdate_tz_sec, + b'There is a small mailbox here') commit_items = 
git.get_commit_items(hexlify(commit), git.cp()) local_author_offset = localtime(adate_sec).tm_gmtoff -- 2.39.2