X-Git-Url: https://arthur.barton.de/gitweb/?a=blobdiff_plain;f=lib%2Fbup%2Ft%2Ftmetadata.py;h=be9c7729dce0ea898b2a3fde293fa8ffab79e97f;hb=a35c571a85a25a7b50532089e77aa83a58986548;hp=126e1d7d0162a06b6200d49c9668d1effa91635f;hpb=598ca81985f602fdeaa938de665a3525c385ba7f;p=bup.git diff --git a/lib/bup/t/tmetadata.py b/lib/bup/t/tmetadata.py index 126e1d7..be9c772 100644 --- a/lib/bup/t/tmetadata.py +++ b/lib/bup/t/tmetadata.py @@ -1,155 +1,171 @@ + +from __future__ import absolute_import, print_function import errno, glob, grp, pwd, stat, tempfile, subprocess -import bup.helpers as helpers -from bup import git, metadata, vfs -from bup.helpers import clear_errors, detect_fakeroot, is_superuser, realpath + from wvtest import * + +from bup import git, metadata +from bup import vfs +from bup.compat import range +from bup.helpers import clear_errors, detect_fakeroot, is_superuser, resolve_parent +from bup.repo import LocalRepo from bup.xstat import utime, lutime +from buptest import no_lingering_errors, test_tempdir +import bup.helpers as helpers -top_dir = '../../..' -bup_tmp = os.path.realpath('../../../t/tmp') -bup_path = top_dir + '/bup' +top_dir = b'../../..' +bup_tmp = os.path.realpath(b'../../../t/tmp') +bup_path = top_dir + b'/bup' start_dir = os.getcwd() def ex(*cmd): try: - cmd_str = ' '.join(cmd) - print >> sys.stderr, cmd_str + cmd_str = b' '.join(cmd) + print(cmd_str, file=sys.stderr) rc = subprocess.call(cmd) if rc < 0: - print >> sys.stderr, 'terminated by signal', - rc + print('terminated by signal', - rc, file=sys.stderr) sys.exit(1) elif rc > 0: - print >> sys.stderr, 'returned exit status', rc + print('returned exit status', rc, file=sys.stderr) sys.exit(1) - except OSError, e: - print >> sys.stderr, 'subprocess call failed:', e + except OSError as e: + print('subprocess call failed:', e, file=sys.stderr) sys.exit(1) def setup_testfs(): assert(sys.platform.startswith('linux')) # Set up testfs with user_xattr, etc. - if subprocess.call(['modprobe', 'loop']) != 0: + if subprocess.call([b'modprobe', b'loop']) != 0: return False - subprocess.call(['umount', 'testfs']) - ex('dd', 'if=/dev/zero', 'of=testfs.img', 'bs=1M', 'count=32') - ex('mke2fs', '-F', '-j', '-m', '0', 'testfs.img') - ex('rm', '-rf', 'testfs') - os.mkdir('testfs') - ex('mount', '-o', 'loop,acl,user_xattr', 'testfs.img', 'testfs') + subprocess.call([b'umount', b'testfs']) + ex(b'dd', b'if=/dev/zero', b'of=testfs.img', b'bs=1M', b'count=32') + ex(b'mke2fs', b'-F', b'-j', b'-m', b'0', b'testfs.img') + ex(b'rm', b'-rf', b'testfs') + os.mkdir(b'testfs') + ex(b'mount', b'-o', b'loop,acl,user_xattr', b'testfs.img', b'testfs') # Hide, so that tests can't create risks. - os.chown('testfs', 0, 0) - os.chmod('testfs', 0700) + os.chown(b'testfs', 0, 0) + os.chmod(b'testfs', 0o700) return True def cleanup_testfs(): - subprocess.call(['umount', 'testfs']) - helpers.unlink('testfs.img') + subprocess.call([b'umount', b'testfs']) + helpers.unlink(b'testfs.img') @wvtest def test_clean_up_archive_path(): - cleanup = metadata._clean_up_path_for_archive - WVPASSEQ(cleanup('foo'), 'foo') - WVPASSEQ(cleanup('/foo'), 'foo') - WVPASSEQ(cleanup('///foo'), 'foo') - WVPASSEQ(cleanup('/foo/bar'), 'foo/bar') - WVPASSEQ(cleanup('foo/./bar'), 'foo/bar') - WVPASSEQ(cleanup('/foo/./bar'), 'foo/bar') - WVPASSEQ(cleanup('/foo/./bar/././baz'), 'foo/bar/baz') - WVPASSEQ(cleanup('/foo/./bar///././baz'), 'foo/bar/baz') - WVPASSEQ(cleanup('//./foo/./bar///././baz/.///'), 'foo/bar/baz/') - WVPASSEQ(cleanup('./foo/./.bar'), 'foo/.bar') - WVPASSEQ(cleanup('./foo/.'), 'foo') - WVPASSEQ(cleanup('./foo/..'), '.') - WVPASSEQ(cleanup('//./..//.../..//.'), '.') - WVPASSEQ(cleanup('//./..//..././/.'), '...') - WVPASSEQ(cleanup('/////.'), '.') - WVPASSEQ(cleanup('/../'), '.') - WVPASSEQ(cleanup(''), '.') + with no_lingering_errors(): + cleanup = metadata._clean_up_path_for_archive + WVPASSEQ(cleanup(b'foo'), b'foo') + WVPASSEQ(cleanup(b'/foo'), b'foo') + WVPASSEQ(cleanup(b'///foo'), b'foo') + WVPASSEQ(cleanup(b'/foo/bar'), b'foo/bar') + WVPASSEQ(cleanup(b'foo/./bar'), b'foo/bar') + WVPASSEQ(cleanup(b'/foo/./bar'), b'foo/bar') + WVPASSEQ(cleanup(b'/foo/./bar/././baz'), b'foo/bar/baz') + WVPASSEQ(cleanup(b'/foo/./bar///././baz'), b'foo/bar/baz') + WVPASSEQ(cleanup(b'//./foo/./bar///././baz/.///'), b'foo/bar/baz/') + WVPASSEQ(cleanup(b'./foo/./.bar'), b'foo/.bar') + WVPASSEQ(cleanup(b'./foo/.'), b'foo') + WVPASSEQ(cleanup(b'./foo/..'), b'.') + WVPASSEQ(cleanup(b'//./..//.../..//.'), b'.') + WVPASSEQ(cleanup(b'//./..//..././/.'), b'...') + WVPASSEQ(cleanup(b'/////.'), b'.') + WVPASSEQ(cleanup(b'/../'), b'.') + WVPASSEQ(cleanup(b''), b'.') @wvtest def test_risky_path(): - risky = metadata._risky_path - WVPASS(risky('/foo')) - WVPASS(risky('///foo')) - WVPASS(risky('/../foo')) - WVPASS(risky('../foo')) - WVPASS(risky('foo/..')) - WVPASS(risky('foo/../')) - WVPASS(risky('foo/../bar')) - WVFAIL(risky('foo')) - WVFAIL(risky('foo/')) - WVFAIL(risky('foo///')) - WVFAIL(risky('./foo')) - WVFAIL(risky('foo/.')) - WVFAIL(risky('./foo/.')) - WVFAIL(risky('foo/bar')) - WVFAIL(risky('foo/./bar')) + with no_lingering_errors(): + risky = metadata._risky_path + WVPASS(risky(b'/foo')) + WVPASS(risky(b'///foo')) + WVPASS(risky(b'/../foo')) + WVPASS(risky(b'../foo')) + WVPASS(risky(b'foo/..')) + WVPASS(risky(b'foo/../')) + WVPASS(risky(b'foo/../bar')) + WVFAIL(risky(b'foo')) + WVFAIL(risky(b'foo/')) + WVFAIL(risky(b'foo///')) + WVFAIL(risky(b'./foo')) + WVFAIL(risky(b'foo/.')) + WVFAIL(risky(b'./foo/.')) + WVFAIL(risky(b'foo/bar')) + WVFAIL(risky(b'foo/./bar')) @wvtest def test_clean_up_extract_path(): - cleanup = metadata._clean_up_extract_path - WVPASSEQ(cleanup('/foo'), 'foo') - WVPASSEQ(cleanup('///foo'), 'foo') - WVFAIL(cleanup('/../foo')) - WVFAIL(cleanup('../foo')) - WVFAIL(cleanup('foo/..')) - WVFAIL(cleanup('foo/../')) - WVFAIL(cleanup('foo/../bar')) - WVPASSEQ(cleanup('foo'), 'foo') - WVPASSEQ(cleanup('foo/'), 'foo/') - WVPASSEQ(cleanup('foo///'), 'foo///') - WVPASSEQ(cleanup('./foo'), './foo') - WVPASSEQ(cleanup('foo/.'), 'foo/.') - WVPASSEQ(cleanup('./foo/.'), './foo/.') - WVPASSEQ(cleanup('foo/bar'), 'foo/bar') - WVPASSEQ(cleanup('foo/./bar'), 'foo/./bar') - WVPASSEQ(cleanup('/'), '.') - WVPASSEQ(cleanup('./'), './') - WVPASSEQ(cleanup('///foo/bar'), 'foo/bar') - WVPASSEQ(cleanup('///foo/bar'), 'foo/bar') + with no_lingering_errors(): + cleanup = metadata._clean_up_extract_path + WVPASSEQ(cleanup(b'/foo'), b'foo') + WVPASSEQ(cleanup(b'///foo'), b'foo') + WVFAIL(cleanup(b'/../foo')) + WVFAIL(cleanup(b'../foo')) + WVFAIL(cleanup(b'foo/..')) + WVFAIL(cleanup(b'foo/../')) + WVFAIL(cleanup(b'foo/../bar')) + WVPASSEQ(cleanup(b'foo'), b'foo') + WVPASSEQ(cleanup(b'foo/'), b'foo/') + WVPASSEQ(cleanup(b'foo///'), b'foo///') + WVPASSEQ(cleanup(b'./foo'), b'./foo') + WVPASSEQ(cleanup(b'foo/.'), b'foo/.') + WVPASSEQ(cleanup(b'./foo/.'), b'./foo/.') + WVPASSEQ(cleanup(b'foo/bar'), b'foo/bar') + WVPASSEQ(cleanup(b'foo/./bar'), b'foo/./bar') + WVPASSEQ(cleanup(b'/'), b'.') + WVPASSEQ(cleanup(b'./'), b'./') + WVPASSEQ(cleanup(b'///foo/bar'), b'foo/bar') + WVPASSEQ(cleanup(b'///foo/bar'), b'foo/bar') @wvtest def test_metadata_method(): - initial_failures = wvfailure_count() - tmpdir = tempfile.mkdtemp(dir=bup_tmp, prefix='bup-tmetadata-') - bup_dir = tmpdir + '/bup' - data_path = tmpdir + '/foo' - os.mkdir(data_path) - ex('touch', data_path + '/file') - ex('ln', '-s', 'file', data_path + '/symlink') - test_time1 = 13 * 1000000000 - test_time2 = 42 * 1000000000 - utime(data_path + '/file', (0, test_time1)) - lutime(data_path + '/symlink', (0, 0)) - utime(data_path, (0, test_time2)) - ex(bup_path, '-d', bup_dir, 'init') - ex(bup_path, '-d', bup_dir, 'index', '-v', data_path) - ex(bup_path, '-d', bup_dir, 'save', '-tvvn', 'test', data_path) - git.check_repo_or_die(bup_dir) - top = vfs.RefList(None) - n = top.lresolve('/test/latest' + realpath(data_path)) - m = n.metadata() - WVPASS(m.mtime == test_time2) - WVPASS(len(n.subs()) == 2) - WVPASS(n.name == 'foo') - WVPASS(set([x.name for x in n.subs()]) == set(['file', 'symlink'])) - for sub in n: - if sub.name == 'file': - m = sub.metadata() - WVPASS(m.mtime == test_time1) - elif sub.name == 'symlink': - m = sub.metadata() - WVPASS(m.mtime == 0) - if wvfailure_count() == initial_failures: - subprocess.call(['rm', '-rf', tmpdir]) + with no_lingering_errors(): + with test_tempdir(b'bup-tmetadata-') as tmpdir: + bup_dir = tmpdir + b'/bup' + data_path = tmpdir + b'/foo' + os.mkdir(data_path) + ex(b'touch', data_path + b'/file') + ex(b'ln', b'-s', b'file', data_path + b'/symlink') + test_time1 = 13 * 1000000000 + test_time2 = 42 * 1000000000 + utime(data_path + b'/file', (0, test_time1)) + lutime(data_path + b'/symlink', (0, 0)) + utime(data_path, (0, test_time2)) + ex(bup_path, b'-d', bup_dir, b'init') + ex(bup_path, b'-d', bup_dir, b'index', b'-v', data_path) + ex(bup_path, b'-d', bup_dir, b'save', b'-tvvn', b'test', data_path) + git.check_repo_or_die(bup_dir) + repo = LocalRepo() + resolved = vfs.resolve(repo, + b'/test/latest' + resolve_parent(data_path), + follow=False) + leaf_name, leaf_item = resolved[-1] + m = leaf_item.meta + WVPASS(m.mtime == test_time2) + WVPASS(leaf_name == b'foo') + contents = tuple(vfs.contents(repo, leaf_item)) + WVPASS(len(contents) == 3) + WVPASSEQ(frozenset(name for name, item in contents), + frozenset((b'.', b'file', b'symlink'))) + for name, item in contents: + if name == b'file': + m = item.meta + WVPASS(m.mtime == test_time1) + elif name == b'symlink': + m = item.meta + WVPASSEQ(m.symlink_target, b'file') + WVPASSEQ(m.size, 4) + WVPASSEQ(m.mtime, 0) def _first_err(): @@ -160,25 +176,22 @@ def _first_err(): @wvtest def test_from_path_error(): - initial_failures = wvfailure_count() if is_superuser() or detect_fakeroot(): return - tmpdir = tempfile.mkdtemp(dir=bup_tmp, prefix='bup-tmetadata-') - path = tmpdir + '/foo' - os.mkdir(path) - m = metadata.from_path(path, archive_path=path, save_symlinks=True) - WVPASSEQ(m.path, path) - os.chmod(path, 000) - metadata.from_path(path, archive_path=path, save_symlinks=True) - if metadata.get_linux_file_attr: - print >> sys.stderr, 'saved_errors:', helpers.saved_errors - WVPASS(len(helpers.saved_errors) == 1) - errmsg = _first_err() - WVPASS(errmsg.startswith('read Linux attr')) - clear_errors() - if wvfailure_count() == initial_failures: - subprocess.call(['chmod', '-R', 'u+rwX', tmpdir]) - subprocess.call(['rm', '-rf', tmpdir]) + with no_lingering_errors(): + with test_tempdir(b'bup-tmetadata-') as tmpdir: + path = tmpdir + b'/foo' + os.mkdir(path) + m = metadata.from_path(path, archive_path=path, save_symlinks=True) + WVPASSEQ(m.path, path) + os.chmod(path, 0o000) + metadata.from_path(path, archive_path=path, save_symlinks=True) + if metadata.get_linux_file_attr: + print('saved_errors:', helpers.saved_errors, file=sys.stderr) + WVPASS(len(helpers.saved_errors) == 1) + errmsg = _first_err() + WVPASS(errmsg.startswith('read Linux attr')) + clear_errors() def _linux_attr_supported(path): @@ -187,7 +200,7 @@ def _linux_attr_supported(path): return False try: metadata.get_linux_file_attr(path) - except OSError, e: + except OSError as e: if e.errno in (errno.ENOTTY, errno.ENOSYS, errno.EOPNOTSUPP): return False else: @@ -197,73 +210,75 @@ def _linux_attr_supported(path): @wvtest def test_apply_to_path_restricted_access(): - initial_failures = wvfailure_count() if is_superuser() or detect_fakeroot(): return if sys.platform.startswith('cygwin'): return # chmod 000 isn't effective. - tmpdir = tempfile.mkdtemp(dir=bup_tmp, prefix='bup-tmetadata-') - parent = tmpdir + '/foo' - path = parent + '/bar' - os.mkdir(parent) - os.mkdir(path) - clear_errors() - m = metadata.from_path(path, archive_path=path, save_symlinks=True) - WVPASSEQ(m.path, path) - os.chmod(parent, 000) - m.apply_to_path(path) - print >> sys.stderr, 'saved_errors:', helpers.saved_errors - expected_errors = ['utime: '] - if m.linux_attr and _linux_attr_supported(tmpdir): - expected_errors.append('Linux chattr: ') - if metadata.xattr and m.linux_xattr: - expected_errors.append("xattr.set '") - WVPASS(len(helpers.saved_errors) == len(expected_errors)) - for i in xrange(len(expected_errors)): - WVPASS(str(helpers.saved_errors[i]).startswith(expected_errors[i])) - clear_errors() - if wvfailure_count() == initial_failures: - subprocess.call(['chmod', '-R', 'u+rwX', tmpdir]) - subprocess.call(['rm', '-rf', tmpdir]) + with no_lingering_errors(): + with test_tempdir(b'bup-tmetadata-') as tmpdir: + parent = tmpdir + b'/foo' + path = parent + b'/bar' + os.mkdir(parent) + os.mkdir(path) + clear_errors() + if metadata.xattr: + try: + metadata.xattr.set(path, b'user.buptest', b'bup') + except: + print("failed to set test xattr") + # ignore any failures here - maybe FS cannot do it + pass + m = metadata.from_path(path, archive_path=path, save_symlinks=True) + WVPASSEQ(m.path, path) + os.chmod(parent, 0o000) + m.apply_to_path(path) + print('saved_errors:', helpers.saved_errors, file=sys.stderr) + expected_errors = ['utime: '] + if m.linux_attr and _linux_attr_supported(tmpdir): + expected_errors.append('Linux chattr: ') + if metadata.xattr and m.linux_xattr: + expected_errors.append("xattr.set ") + WVPASS(len(helpers.saved_errors) == len(expected_errors)) + for i in range(len(expected_errors)): + WVPASS(str(helpers.saved_errors[i]).startswith(expected_errors[i])) + clear_errors() @wvtest def test_restore_over_existing_target(): - initial_failures = wvfailure_count() - tmpdir = tempfile.mkdtemp(dir=bup_tmp, prefix='bup-tmetadata-') - path = tmpdir + '/foo' - os.mkdir(path) - dir_m = metadata.from_path(path, archive_path=path, save_symlinks=True) - os.rmdir(path) - open(path, 'w').close() - file_m = metadata.from_path(path, archive_path=path, save_symlinks=True) - # Restore dir over file. - WVPASSEQ(dir_m.create_path(path, create_symlinks=True), None) - WVPASS(stat.S_ISDIR(os.stat(path).st_mode)) - # Restore dir over dir. - WVPASSEQ(dir_m.create_path(path, create_symlinks=True), None) - WVPASS(stat.S_ISDIR(os.stat(path).st_mode)) - # Restore file over dir. - WVPASSEQ(file_m.create_path(path, create_symlinks=True), None) - WVPASS(stat.S_ISREG(os.stat(path).st_mode)) - # Restore file over file. - WVPASSEQ(file_m.create_path(path, create_symlinks=True), None) - WVPASS(stat.S_ISREG(os.stat(path).st_mode)) - # Restore file over non-empty dir. - os.remove(path) - os.mkdir(path) - open(path + '/bar', 'w').close() - WVEXCEPT(Exception, file_m.create_path, path, create_symlinks=True) - # Restore dir over non-empty dir. - os.remove(path + '/bar') - os.mkdir(path + '/bar') - WVEXCEPT(Exception, dir_m.create_path, path, create_symlinks=True) - if wvfailure_count() == initial_failures: - subprocess.call(['rm', '-rf', tmpdir]) - - -from bup.metadata import posix1e -if not posix1e: + with no_lingering_errors(): + with test_tempdir(b'bup-tmetadata-') as tmpdir: + path = tmpdir + b'/foo' + os.mkdir(path) + dir_m = metadata.from_path(path, archive_path=path, save_symlinks=True) + os.rmdir(path) + open(path, 'w').close() + file_m = metadata.from_path(path, archive_path=path, save_symlinks=True) + # Restore dir over file. + WVPASSEQ(dir_m.create_path(path, create_symlinks=True), None) + WVPASS(stat.S_ISDIR(os.stat(path).st_mode)) + # Restore dir over dir. + WVPASSEQ(dir_m.create_path(path, create_symlinks=True), None) + WVPASS(stat.S_ISDIR(os.stat(path).st_mode)) + # Restore file over dir. + WVPASSEQ(file_m.create_path(path, create_symlinks=True), None) + WVPASS(stat.S_ISREG(os.stat(path).st_mode)) + # Restore file over file. + WVPASSEQ(file_m.create_path(path, create_symlinks=True), None) + WVPASS(stat.S_ISREG(os.stat(path).st_mode)) + # Restore file over non-empty dir. + os.remove(path) + os.mkdir(path) + open(path + b'/bar', 'w').close() + WVEXCEPT(Exception, file_m.create_path, path, create_symlinks=True) + # Restore dir over non-empty dir. + os.remove(path + b'/bar') + os.mkdir(path + b'/bar') + WVEXCEPT(Exception, dir_m.create_path, path, create_symlinks=True) + + +from bup.metadata import read_acl +if not read_acl: @wvtest def POSIX1E_ACL_SUPPORT_IS_MISSING(): pass @@ -271,6 +286,10 @@ if not posix1e: from bup.metadata import xattr if xattr: + def remove_selinux(attrs): + return list(filter(lambda i: not i in (b'security.selinux', ), + attrs)) + @wvtest def test_handling_of_incorrect_existing_linux_xattrs(): if not is_superuser() or detect_fakeroot(): @@ -279,23 +298,23 @@ if xattr: if not setup_testfs(): WVMSG('unable to load loop module; skipping dependent tests') return - for f in glob.glob('testfs/*'): - ex('rm', '-rf', f) - path = 'testfs/foo' + for f in glob.glob(b'testfs/*'): + ex(b'rm', b'-rf', f) + path = b'testfs/foo' open(path, 'w').close() - xattr.set(path, 'foo', 'bar', namespace=xattr.NS_USER) + xattr.set(path, b'foo', b'bar', namespace=xattr.NS_USER) m = metadata.from_path(path, archive_path=path, save_symlinks=True) - xattr.set(path, 'baz', 'bax', namespace=xattr.NS_USER) + xattr.set(path, b'baz', b'bax', namespace=xattr.NS_USER) m.apply_to_path(path, restore_numeric_ids=False) - WVPASSEQ(xattr.list(path), ['user.foo']) - WVPASSEQ(xattr.get(path, 'user.foo'), 'bar') - xattr.set(path, 'foo', 'baz', namespace=xattr.NS_USER) + WVPASSEQ(remove_selinux(xattr.list(path)), [b'user.foo']) + WVPASSEQ(xattr.get(path, b'user.foo'), b'bar') + xattr.set(path, b'foo', b'baz', namespace=xattr.NS_USER) m.apply_to_path(path, restore_numeric_ids=False) - WVPASSEQ(xattr.list(path), ['user.foo']) - WVPASSEQ(xattr.get(path, 'user.foo'), 'bar') - xattr.remove(path, 'foo', namespace=xattr.NS_USER) + WVPASSEQ(remove_selinux(xattr.list(path)), [b'user.foo']) + WVPASSEQ(xattr.get(path, b'user.foo'), b'bar') + xattr.remove(path, b'foo', namespace=xattr.NS_USER) m.apply_to_path(path, restore_numeric_ids=False) - WVPASSEQ(xattr.list(path), ['user.foo']) - WVPASSEQ(xattr.get(path, 'user.foo'), 'bar') + WVPASSEQ(remove_selinux(xattr.list(path)), [b'user.foo']) + WVPASSEQ(xattr.get(path, b'user.foo'), b'bar') os.chdir(start_dir) cleanup_testfs()