-import os, math
-import bup._helpers as _helpers
-from bup.helpers import *
+from __future__ import absolute_import
+import helpers, math, os, os.path, stat, subprocess
+
from wvtest import *
+from bup.helpers import (atomically_replaced_file, batchpipe, detect_fakeroot,
+ grafted_path_components, mkdirp, parse_num,
+ path_components, readpipe, stripped_path_components,
+ utc_offset_str)
+from buptest import no_lingering_errors, test_tempdir
+import bup._helpers as _helpers
+
+
+bup_tmp = os.path.realpath('../../../t/tmp')
+mkdirp(bup_tmp)
+
+
@wvtest
def test_parse_num():
- pn = parse_num
- WVPASSEQ(pn('1'), 1)
- WVPASSEQ(pn('0'), 0)
- WVPASSEQ(pn('1.5k'), 1536)
- WVPASSEQ(pn('2 gb'), 2*1024*1024*1024)
- WVPASSEQ(pn('1e+9 k'), 1000000000 * 1024)
- WVPASSEQ(pn('-3e-3mb'), int(-0.003 * 1024 * 1024))
-
+ with no_lingering_errors():
+ pn = parse_num
+ WVPASSEQ(pn('1'), 1)
+ WVPASSEQ(pn('0'), 0)
+ WVPASSEQ(pn('1.5k'), 1536)
+ WVPASSEQ(pn('2 gb'), 2*1024*1024*1024)
+ WVPASSEQ(pn('1e+9 k'), 1000000000 * 1024)
+ WVPASSEQ(pn('-3e-3mb'), int(-0.003 * 1024 * 1024))
@wvtest
def test_detect_fakeroot():
- if os.getenv('FAKEROOTKEY'):
- WVPASS(detect_fakeroot())
- else:
- WVPASS(not detect_fakeroot())
-
-
-def _test_fstime():
- def approx_eq(x, y):
- return math.fabs(x - y) < 1 / 10e8
- def ts_eq_ish(x, y):
- return approx_eq(x[0], y[0]) and approx_eq(x[1], y[1])
- def fst_eq_ish(x, y):
- return approx_eq(x.approx_secs(), y.approx_secs())
- def s_ns_eq_ish(fst, s, ns):
- (fst_s, fst_ns) = fst.secs_nsecs()
- return approx_eq(fst_s, s) and approx_eq(fst_ns, ns)
- from_secs = FSTime.from_secs
- from_ts = FSTime.from_timespec
- WVPASS(from_secs(0) == from_secs(0))
- WVPASS(from_secs(0) < from_secs(1))
- WVPASS(from_secs(-1) < from_secs(1))
- WVPASS(from_secs(1) > from_secs(0))
- WVPASS(from_secs(1) > from_secs(-1))
-
- WVPASS(fst_eq_ish(from_secs(0), from_ts((0, 0))))
- WVPASS(fst_eq_ish(from_secs(1), from_ts((1, 0))))
- WVPASS(fst_eq_ish(from_secs(-1), from_ts((-1, 0))))
- WVPASS(fst_eq_ish(from_secs(-0.5), from_ts((-1, 10**9 / 2))))
- WVPASS(fst_eq_ish(from_secs(-1.5), from_ts((-2, 10**9 / 2))))
- WVPASS(fst_eq_ish(from_secs(1 / 10e8), from_ts((0, 1))))
- WVPASS(fst_eq_ish(from_secs(-1 / 10e8), from_ts((-1, 10**9 - 1))))
-
- WVPASS(ts_eq_ish(from_secs(0).to_timespec(), (0, 0)))
- WVPASS(ts_eq_ish(from_secs(1).to_timespec(), (1, 0)))
- WVPASS(ts_eq_ish(from_secs(-1).to_timespec(), (-1, 0)))
- WVPASS(ts_eq_ish(from_secs(-0.5).to_timespec(), (-1, 10**9 / 2)))
- WVPASS(ts_eq_ish(from_secs(-1.5).to_timespec(), (-2, 10**9 / 2)))
- WVPASS(ts_eq_ish(from_secs(1 / 10e8).to_timespec(), (0, 1)))
- WVPASS(ts_eq_ish(from_secs(-1 / 10e8).to_timespec(), (-1, 10**9 - 1)))
-
- WVPASS(s_ns_eq_ish(from_secs(0), 0, 0))
- WVPASS(s_ns_eq_ish(from_secs(1), 1, 0))
- WVPASS(s_ns_eq_ish(from_secs(-1), -1, 0))
- WVPASS(s_ns_eq_ish(from_secs(-0.5), 0, - 10**9 / 2))
- WVPASS(s_ns_eq_ish(from_secs(-1.5), -1, - 10**9 / 2))
- WVPASS(s_ns_eq_ish(from_secs(-1 / 10e8), 0, -1))
+ with no_lingering_errors():
+ if os.getenv('FAKEROOTKEY'):
+ WVPASS(detect_fakeroot())
+ else:
+ WVPASS(not detect_fakeroot())
+
+@wvtest
+def test_path_components():
+ with no_lingering_errors():
+ WVPASSEQ(path_components('/'), [('', '/')])
+ WVPASSEQ(path_components('/foo'), [('', '/'), ('foo', '/foo')])
+ WVPASSEQ(path_components('/foo/'), [('', '/'), ('foo', '/foo')])
+ WVPASSEQ(path_components('/foo/bar'),
+ [('', '/'), ('foo', '/foo'), ('bar', '/foo/bar')])
+ WVEXCEPT(Exception, path_components, 'foo')
+
+
+@wvtest
+def test_stripped_path_components():
+ with no_lingering_errors():
+ WVPASSEQ(stripped_path_components('/', []), [('', '/')])
+ WVPASSEQ(stripped_path_components('/', ['']), [('', '/')])
+ WVPASSEQ(stripped_path_components('/', ['/']), [('', '/')])
+ WVPASSEQ(stripped_path_components('/foo', ['/']),
+ [('', '/'), ('foo', '/foo')])
+ WVPASSEQ(stripped_path_components('/', ['/foo']), [('', '/')])
+ WVPASSEQ(stripped_path_components('/foo', ['/bar']),
+ [('', '/'), ('foo', '/foo')])
+ WVPASSEQ(stripped_path_components('/foo', ['/foo']), [('', '/foo')])
+ WVPASSEQ(stripped_path_components('/foo/bar', ['/foo']),
+ [('', '/foo'), ('bar', '/foo/bar')])
+ WVPASSEQ(stripped_path_components('/foo/bar', ['/bar', '/foo', '/baz']),
+ [('', '/foo'), ('bar', '/foo/bar')])
+ WVPASSEQ(stripped_path_components('/foo/bar/baz', ['/foo/bar/baz']),
+ [('', '/foo/bar/baz')])
+ WVEXCEPT(Exception, stripped_path_components, 'foo', [])
+
+
+@wvtest
+def test_grafted_path_components():
+ with no_lingering_errors():
+ WVPASSEQ(grafted_path_components([('/chroot', '/')], '/foo'),
+ [('', '/'), ('foo', '/foo')])
+ WVPASSEQ(grafted_path_components([('/foo/bar', '/')],
+ '/foo/bar/baz/bax'),
+ [('', '/foo/bar'),
+ ('baz', '/foo/bar/baz'),
+ ('bax', '/foo/bar/baz/bax')])
+ WVPASSEQ(grafted_path_components([('/foo/bar/baz', '/bax')],
+ '/foo/bar/baz/1/2'),
+ [('', None),
+ ('bax', '/foo/bar/baz'),
+ ('1', '/foo/bar/baz/1'),
+ ('2', '/foo/bar/baz/1/2')])
+ WVPASSEQ(grafted_path_components([('/foo', '/bar/baz/bax')],
+ '/foo/bar'),
+ [('', None),
+ ('bar', None),
+ ('baz', None),
+ ('bax', '/foo'),
+ ('bar', '/foo/bar')])
+ WVPASSEQ(grafted_path_components([('/foo/bar/baz', '/a/b/c')],
+ '/foo/bar/baz'),
+ [('', None), ('a', None), ('b', None), ('c', '/foo/bar/baz')])
+ WVPASSEQ(grafted_path_components([('/', '/a/b/c/')], '/foo/bar'),
+ [('', None), ('a', None), ('b', None), ('c', '/'),
+ ('foo', '/foo'), ('bar', '/foo/bar')])
+ WVEXCEPT(Exception, grafted_path_components, 'foo', [])
+
@wvtest
-def test_fstime():
- _test_fstime();
- if _helpers.lstat: # Also test native python timestamp rep since we can.
- orig_lstat = _helpers.lstat
+def test_readpipe():
+ with no_lingering_errors():
+ x = readpipe(['echo', '42'])
+ WVPASSEQ(x, '42\n')
try:
- _helpers.lstat = None
- _test_fstime();
+ readpipe(['bash', '-c', 'exit 42'])
+ except Exception as ex:
+ WVPASSEQ(str(ex),
+ "subprocess 'bash -c exit 42' failed with status 42")
+
+
+@wvtest
+def test_batchpipe():
+ with no_lingering_errors():
+ for chunk in batchpipe(['echo'], []):
+ WVPASS(False)
+ out = ''
+ for chunk in batchpipe(['echo'], ['42']):
+ out += chunk
+ WVPASSEQ(out, '42\n')
+ try:
+ batchpipe(['bash', '-c'], ['exit 42'])
+ except Exception as ex:
+ WVPASSEQ(str(ex),
+ "subprocess 'bash -c exit 42' failed with status 42")
+ args = [str(x) for x in range(6)]
+ # Force batchpipe to break the args into batches of 3. This
+ # approach assumes all args are the same length.
+ arg_max = \
+ helpers._argmax_base(['echo']) + helpers._argmax_args_size(args[:3])
+ batches = batchpipe(['echo'], args, arg_max=arg_max)
+ WVPASSEQ(next(batches), '0 1 2\n')
+ WVPASSEQ(next(batches), '3 4 5\n')
+ WVPASSEQ(next(batches, None), None)
+ batches = batchpipe(['echo'], [str(x) for x in range(5)], arg_max=arg_max)
+ WVPASSEQ(next(batches), '0 1 2\n')
+ WVPASSEQ(next(batches), '3 4\n')
+ WVPASSEQ(next(batches, None), None)
+
+
+@wvtest
+def test_atomically_replaced_file():
+ with no_lingering_errors():
+ with test_tempdir('bup-thelper-') as tmpdir:
+ target_file = os.path.join(tmpdir, 'test-atomic-write')
+
+ with atomically_replaced_file(target_file, mode='w') as f:
+ f.write('asdf')
+ WVPASSEQ(f.mode, 'w')
+ f = open(target_file, 'r')
+ WVPASSEQ(f.read(), 'asdf')
+
+ try:
+ with atomically_replaced_file(target_file, mode='w') as f:
+ f.write('wxyz')
+ raise Exception()
+ except:
+ pass
+ with open(target_file) as f:
+ WVPASSEQ(f.read(), 'asdf')
+
+ with atomically_replaced_file(target_file, mode='wb') as f:
+ f.write(os.urandom(20))
+ WVPASSEQ(f.mode, 'wb')
+
+
+@wvtest
+def test_utc_offset_str():
+ with no_lingering_errors():
+ tz = os.environ.get('TZ')
+ try:
+ os.environ['TZ'] = 'FOO+0:00'
+ WVPASSEQ(utc_offset_str(0), '+0000')
+ os.environ['TZ'] = 'FOO+1:00'
+ WVPASSEQ(utc_offset_str(0), '-0100')
+ os.environ['TZ'] = 'FOO-1:00'
+ WVPASSEQ(utc_offset_str(0), '+0100')
+ os.environ['TZ'] = 'FOO+3:3'
+ WVPASSEQ(utc_offset_str(0), '-0303')
+ os.environ['TZ'] = 'FOO-3:3'
+ WVPASSEQ(utc_offset_str(0), '+0303')
+ # Offset is not an integer number of minutes
+ os.environ['TZ'] = 'FOO+3:3:3'
+ WVPASSEQ(utc_offset_str(1), '-0303')
+ os.environ['TZ'] = 'FOO-3:3:3'
+ WVPASSEQ(utc_offset_str(1), '+0303')
+ WVPASSEQ(utc_offset_str(314159), '+0303')
finally:
- _helpers.lstat = orig_lstat
+ if tz:
+ os.environ['TZ'] = tz
+ else:
+ try:
+ del os.environ['TZ']
+ except KeyError:
+ pass
+
+@wvtest
+def test_valid_save_name():
+ with no_lingering_errors():
+ valid = helpers.valid_save_name
+ WVPASS(valid('x'))
+ WVPASS(valid('x@'))
+ WVFAIL(valid('@'))
+ WVFAIL(valid('/'))
+ WVFAIL(valid('/foo'))
+ WVFAIL(valid('foo/'))
+ WVFAIL(valid('/foo/'))
+ WVFAIL(valid('foo//bar'))
+ WVFAIL(valid('.'))
+ WVFAIL(valid('bar.'))
+ WVFAIL(valid('foo@{'))
+ for x in ' ~^:?*[\\':
+ WVFAIL(valid('foo' + x))
+ for i in range(20):
+ WVFAIL(valid('foo' + chr(i)))
+ WVFAIL(valid('foo' + chr(0x7f)))
+ WVFAIL(valid('foo..bar'))
+ WVFAIL(valid('bar.lock/baz'))
+ WVFAIL(valid('foo/bar.lock/baz'))
+ WVFAIL(valid('.bar/baz'))
+ WVFAIL(valid('foo/.bar/baz'))