X-Git-Url: https://arthur.barton.de/cgi-bin/gitweb.cgi?a=blobdiff_plain;f=lib%2Fbup%2Ft%2Fthelpers.py;h=e0ef21fd5098c25963b55ef7eb1858b99ac6d084;hb=c40b3dd5fd74e72024fbaad3daf5a958aefa1c54;hp=8fec0f55df91c468629785228fd4564918f214c5;hpb=15d2df2ac2395cba98ca4f4cc79a0d7bea516de0;p=bup.git diff --git a/lib/bup/t/thelpers.py b/lib/bup/t/thelpers.py index 8fec0f5..e0ef21f 100644 --- a/lib/bup/t/thelpers.py +++ b/lib/bup/t/thelpers.py @@ -1,76 +1,223 @@ -import os, math -import bup._helpers as _helpers -from bup.helpers import * +from __future__ import absolute_import +import helpers, math, os, os.path, stat, subprocess + from wvtest import * +from bup.helpers import (atomically_replaced_file, batchpipe, detect_fakeroot, + grafted_path_components, mkdirp, parse_num, + path_components, readpipe, stripped_path_components, + utc_offset_str) +from buptest import no_lingering_errors, test_tempdir +import bup._helpers as _helpers + + +bup_tmp = os.path.realpath('../../../t/tmp') +mkdirp(bup_tmp) + + @wvtest def test_parse_num(): - pn = parse_num - WVPASSEQ(pn('1'), 1) - WVPASSEQ(pn('0'), 0) - WVPASSEQ(pn('1.5k'), 1536) - WVPASSEQ(pn('2 gb'), 2*1024*1024*1024) - WVPASSEQ(pn('1e+9 k'), 1000000000 * 1024) - WVPASSEQ(pn('-3e-3mb'), int(-0.003 * 1024 * 1024)) - + with no_lingering_errors(): + pn = parse_num + WVPASSEQ(pn('1'), 1) + WVPASSEQ(pn('0'), 0) + WVPASSEQ(pn('1.5k'), 1536) + WVPASSEQ(pn('2 gb'), 2*1024*1024*1024) + WVPASSEQ(pn('1e+9 k'), 1000000000 * 1024) + WVPASSEQ(pn('-3e-3mb'), int(-0.003 * 1024 * 1024)) @wvtest def test_detect_fakeroot(): - if os.getenv('FAKEROOTKEY'): - WVPASS(detect_fakeroot()) - else: - WVPASS(not detect_fakeroot()) - - -def _test_fstime(): - def approx_eq(x, y): - return math.fabs(x - y) < 1 / 10e8 - def ts_eq_ish(x, y): - return approx_eq(x[0], y[0]) and approx_eq(x[1], y[1]) - def fst_eq_ish(x, y): - return approx_eq(x.approx_secs(), y.approx_secs()) - def s_ns_eq_ish(fst, s, ns): - (fst_s, fst_ns) = fst.secs_nsecs() - return approx_eq(fst_s, s) and approx_eq(fst_ns, ns) - from_secs = FSTime.from_secs - from_ts = FSTime.from_timespec - WVPASS(from_secs(0) == from_secs(0)) - WVPASS(from_secs(0) < from_secs(1)) - WVPASS(from_secs(-1) < from_secs(1)) - WVPASS(from_secs(1) > from_secs(0)) - WVPASS(from_secs(1) > from_secs(-1)) - - WVPASS(fst_eq_ish(from_secs(0), from_ts((0, 0)))) - WVPASS(fst_eq_ish(from_secs(1), from_ts((1, 0)))) - WVPASS(fst_eq_ish(from_secs(-1), from_ts((-1, 0)))) - WVPASS(fst_eq_ish(from_secs(-0.5), from_ts((-1, 10**9 / 2)))) - WVPASS(fst_eq_ish(from_secs(-1.5), from_ts((-2, 10**9 / 2)))) - WVPASS(fst_eq_ish(from_secs(1 / 10e8), from_ts((0, 1)))) - WVPASS(fst_eq_ish(from_secs(-1 / 10e8), from_ts((-1, 10**9 - 1)))) - - WVPASS(ts_eq_ish(from_secs(0).to_timespec(), (0, 0))) - WVPASS(ts_eq_ish(from_secs(1).to_timespec(), (1, 0))) - WVPASS(ts_eq_ish(from_secs(-1).to_timespec(), (-1, 0))) - WVPASS(ts_eq_ish(from_secs(-0.5).to_timespec(), (-1, 10**9 / 2))) - WVPASS(ts_eq_ish(from_secs(-1.5).to_timespec(), (-2, 10**9 / 2))) - WVPASS(ts_eq_ish(from_secs(1 / 10e8).to_timespec(), (0, 1))) - WVPASS(ts_eq_ish(from_secs(-1 / 10e8).to_timespec(), (-1, 10**9 - 1))) - - WVPASS(s_ns_eq_ish(from_secs(0), 0, 0)) - WVPASS(s_ns_eq_ish(from_secs(1), 1, 0)) - WVPASS(s_ns_eq_ish(from_secs(-1), -1, 0)) - WVPASS(s_ns_eq_ish(from_secs(-0.5), 0, - 10**9 / 2)) - WVPASS(s_ns_eq_ish(from_secs(-1.5), -1, - 10**9 / 2)) - WVPASS(s_ns_eq_ish(from_secs(-1 / 10e8), 0, -1)) + with no_lingering_errors(): + if os.getenv('FAKEROOTKEY'): + WVPASS(detect_fakeroot()) + else: + WVPASS(not detect_fakeroot()) + +@wvtest +def test_path_components(): + with no_lingering_errors(): + WVPASSEQ(path_components('/'), [('', '/')]) + WVPASSEQ(path_components('/foo'), [('', '/'), ('foo', '/foo')]) + WVPASSEQ(path_components('/foo/'), [('', '/'), ('foo', '/foo')]) + WVPASSEQ(path_components('/foo/bar'), + [('', '/'), ('foo', '/foo'), ('bar', '/foo/bar')]) + WVEXCEPT(Exception, path_components, 'foo') + + +@wvtest +def test_stripped_path_components(): + with no_lingering_errors(): + WVPASSEQ(stripped_path_components('/', []), [('', '/')]) + WVPASSEQ(stripped_path_components('/', ['']), [('', '/')]) + WVPASSEQ(stripped_path_components('/', ['/']), [('', '/')]) + WVPASSEQ(stripped_path_components('/foo', ['/']), + [('', '/'), ('foo', '/foo')]) + WVPASSEQ(stripped_path_components('/', ['/foo']), [('', '/')]) + WVPASSEQ(stripped_path_components('/foo', ['/bar']), + [('', '/'), ('foo', '/foo')]) + WVPASSEQ(stripped_path_components('/foo', ['/foo']), [('', '/foo')]) + WVPASSEQ(stripped_path_components('/foo/bar', ['/foo']), + [('', '/foo'), ('bar', '/foo/bar')]) + WVPASSEQ(stripped_path_components('/foo/bar', ['/bar', '/foo', '/baz']), + [('', '/foo'), ('bar', '/foo/bar')]) + WVPASSEQ(stripped_path_components('/foo/bar/baz', ['/foo/bar/baz']), + [('', '/foo/bar/baz')]) + WVEXCEPT(Exception, stripped_path_components, 'foo', []) + + +@wvtest +def test_grafted_path_components(): + with no_lingering_errors(): + WVPASSEQ(grafted_path_components([('/chroot', '/')], '/foo'), + [('', '/'), ('foo', '/foo')]) + WVPASSEQ(grafted_path_components([('/foo/bar', '/')], + '/foo/bar/baz/bax'), + [('', '/foo/bar'), + ('baz', '/foo/bar/baz'), + ('bax', '/foo/bar/baz/bax')]) + WVPASSEQ(grafted_path_components([('/foo/bar/baz', '/bax')], + '/foo/bar/baz/1/2'), + [('', None), + ('bax', '/foo/bar/baz'), + ('1', '/foo/bar/baz/1'), + ('2', '/foo/bar/baz/1/2')]) + WVPASSEQ(grafted_path_components([('/foo', '/bar/baz/bax')], + '/foo/bar'), + [('', None), + ('bar', None), + ('baz', None), + ('bax', '/foo'), + ('bar', '/foo/bar')]) + WVPASSEQ(grafted_path_components([('/foo/bar/baz', '/a/b/c')], + '/foo/bar/baz'), + [('', None), ('a', None), ('b', None), ('c', '/foo/bar/baz')]) + WVPASSEQ(grafted_path_components([('/', '/a/b/c/')], '/foo/bar'), + [('', None), ('a', None), ('b', None), ('c', '/'), + ('foo', '/foo'), ('bar', '/foo/bar')]) + WVEXCEPT(Exception, grafted_path_components, 'foo', []) + @wvtest -def test_fstime(): - _test_fstime(); - if _helpers.lstat: # Also test native python timestamp rep since we can. - orig_lstat = _helpers.lstat +def test_readpipe(): + with no_lingering_errors(): + x = readpipe(['echo', '42']) + WVPASSEQ(x, '42\n') try: - _helpers.lstat = None - _test_fstime(); + readpipe(['bash', '-c', 'exit 42']) + except Exception as ex: + WVPASSEQ(str(ex), + "subprocess 'bash -c exit 42' failed with status 42") + + +@wvtest +def test_batchpipe(): + with no_lingering_errors(): + for chunk in batchpipe(['echo'], []): + WVPASS(False) + out = '' + for chunk in batchpipe(['echo'], ['42']): + out += chunk + WVPASSEQ(out, '42\n') + try: + batchpipe(['bash', '-c'], ['exit 42']) + except Exception as ex: + WVPASSEQ(str(ex), + "subprocess 'bash -c exit 42' failed with status 42") + args = [str(x) for x in range(6)] + # Force batchpipe to break the args into batches of 3. This + # approach assumes all args are the same length. + arg_max = \ + helpers._argmax_base(['echo']) + helpers._argmax_args_size(args[:3]) + batches = batchpipe(['echo'], args, arg_max=arg_max) + WVPASSEQ(next(batches), '0 1 2\n') + WVPASSEQ(next(batches), '3 4 5\n') + WVPASSEQ(next(batches, None), None) + batches = batchpipe(['echo'], [str(x) for x in range(5)], arg_max=arg_max) + WVPASSEQ(next(batches), '0 1 2\n') + WVPASSEQ(next(batches), '3 4\n') + WVPASSEQ(next(batches, None), None) + + +@wvtest +def test_atomically_replaced_file(): + with no_lingering_errors(): + with test_tempdir('bup-thelper-') as tmpdir: + target_file = os.path.join(tmpdir, 'test-atomic-write') + + with atomically_replaced_file(target_file, mode='w') as f: + f.write('asdf') + WVPASSEQ(f.mode, 'w') + f = open(target_file, 'r') + WVPASSEQ(f.read(), 'asdf') + + try: + with atomically_replaced_file(target_file, mode='w') as f: + f.write('wxyz') + raise Exception() + except: + pass + with open(target_file) as f: + WVPASSEQ(f.read(), 'asdf') + + with atomically_replaced_file(target_file, mode='wb') as f: + f.write(os.urandom(20)) + WVPASSEQ(f.mode, 'wb') + + +@wvtest +def test_utc_offset_str(): + with no_lingering_errors(): + tz = os.environ.get('TZ') + try: + os.environ['TZ'] = 'FOO+0:00' + WVPASSEQ(utc_offset_str(0), '+0000') + os.environ['TZ'] = 'FOO+1:00' + WVPASSEQ(utc_offset_str(0), '-0100') + os.environ['TZ'] = 'FOO-1:00' + WVPASSEQ(utc_offset_str(0), '+0100') + os.environ['TZ'] = 'FOO+3:3' + WVPASSEQ(utc_offset_str(0), '-0303') + os.environ['TZ'] = 'FOO-3:3' + WVPASSEQ(utc_offset_str(0), '+0303') + # Offset is not an integer number of minutes + os.environ['TZ'] = 'FOO+3:3:3' + WVPASSEQ(utc_offset_str(1), '-0303') + os.environ['TZ'] = 'FOO-3:3:3' + WVPASSEQ(utc_offset_str(1), '+0303') + WVPASSEQ(utc_offset_str(314159), '+0303') finally: - _helpers.lstat = orig_lstat + if tz: + os.environ['TZ'] = tz + else: + try: + del os.environ['TZ'] + except KeyError: + pass + +@wvtest +def test_valid_save_name(): + with no_lingering_errors(): + valid = helpers.valid_save_name + WVPASS(valid('x')) + WVPASS(valid('x@')) + WVFAIL(valid('@')) + WVFAIL(valid('/')) + WVFAIL(valid('/foo')) + WVFAIL(valid('foo/')) + WVFAIL(valid('/foo/')) + WVFAIL(valid('foo//bar')) + WVFAIL(valid('.')) + WVFAIL(valid('bar.')) + WVFAIL(valid('foo@{')) + for x in ' ~^:?*[\\': + WVFAIL(valid('foo' + x)) + for i in range(20): + WVFAIL(valid('foo' + chr(i))) + WVFAIL(valid('foo' + chr(0x7f))) + WVFAIL(valid('foo..bar')) + WVFAIL(valid('bar.lock/baz')) + WVFAIL(valid('foo/bar.lock/baz')) + WVFAIL(valid('.bar/baz')) + WVFAIL(valid('foo/.bar/baz'))