+import helpers
import math
import os
import bup._helpers as _helpers
from bup.helpers import *
from wvtest import *
+
+@wvtest
+def test_next():
+ # Test whatever you end up with for next() after import '*'.
+ WVPASSEQ(next(iter([]), None), None)
+ x = iter([1])
+ WVPASSEQ(next(x, None), 1)
+ WVPASSEQ(next(x, None), None)
+ x = iter([1])
+ WVPASSEQ(next(x, 'x'), 1)
+ WVPASSEQ(next(x, 'x'), 'x')
+ WVEXCEPT(StopIteration, next, iter([]))
+ x = iter([1])
+ WVPASSEQ(next(x), 1)
+ WVEXCEPT(StopIteration, next, x)
+
+
+@wvtest
+def test_fallback_next():
+ global next
+ orig = next
+ next = helpers._fallback_next
+ try:
+ test_next()
+ finally:
+ next = orig
+
+
@wvtest
def test_parse_num():
pn = parse_num
WVPASS(not detect_fakeroot())
@wvtest
-def test_strip_path():
- prefix = "/var/backup/daily.0/localhost"
- empty_prefix = ""
- non_matching_prefix = "/home"
- path = "/var/backup/daily.0/localhost/etc/"
+def test_path_components():
+ WVPASSEQ(path_components('/'), [('', '/')])
+ WVPASSEQ(path_components('/foo'), [('', '/'), ('foo', '/foo')])
+ WVPASSEQ(path_components('/foo/'), [('', '/'), ('foo', '/foo')])
+ WVPASSEQ(path_components('/foo/bar'),
+ [('', '/'), ('foo', '/foo'), ('bar', '/foo/bar')])
+ WVEXCEPT(Exception, path_components, 'foo')
- WVPASSEQ(strip_path(prefix, path), '/etc')
- WVPASSEQ(strip_path(empty_prefix, path), path)
- WVPASSEQ(strip_path(non_matching_prefix, path), path)
- WVEXCEPT(Exception, strip_path, None, path)
@wvtest
-def test_strip_base_path():
- path = "/var/backup/daily.0/localhost/etc/"
- base_paths = ["/var", "/var/backup", "/var/backup/daily.0/localhost"]
- WVPASSEQ(strip_base_path(path, base_paths), '/etc')
+def test_stripped_path_components():
+ WVPASSEQ(stripped_path_components('/', []), [('', '/')])
+ WVPASSEQ(stripped_path_components('/', ['']), [('', '/')])
+ WVPASSEQ(stripped_path_components('/', ['/']), [('', '/')])
+ WVPASSEQ(stripped_path_components('/', ['/foo']), [('', '/')])
+ WVPASSEQ(stripped_path_components('/foo', ['/bar']),
+ [('', '/'), ('foo', '/foo')])
+ WVPASSEQ(stripped_path_components('/foo', ['/foo']), [('', '/foo')])
+ WVPASSEQ(stripped_path_components('/foo/bar', ['/foo']),
+ [('', '/foo'), ('bar', '/foo/bar')])
+ WVPASSEQ(stripped_path_components('/foo/bar', ['/bar', '/foo', '/baz']),
+ [('', '/foo'), ('bar', '/foo/bar')])
+ WVPASSEQ(stripped_path_components('/foo/bar/baz', ['/foo/bar/baz']),
+ [('', '/foo/bar/baz')])
+ WVEXCEPT(Exception, stripped_path_components, 'foo', [])
-@wvtest
-def test_strip_symlinked_base_path():
- tmpdir = os.path.join(os.getcwd(),"test_strip_symlinked_base_path.tmp")
- symlink_src = os.path.join(tmpdir, "private", "var")
- symlink_dst = os.path.join(tmpdir, "var")
- path = os.path.join(symlink_dst, "a")
- os.mkdir(tmpdir)
- os.mkdir(os.path.join(tmpdir, "private"))
- os.mkdir(symlink_src)
- os.symlink(symlink_src, symlink_dst)
+@wvtest
+def test_grafted_path_components():
+ WVPASSEQ(grafted_path_components([('/chroot', '/')], '/foo'),
+ [('', '/'), ('foo', '/foo')])
+ WVPASSEQ(grafted_path_components([('/foo/bar', '/')], '/foo/bar/baz/bax'),
+ [('', '/foo/bar'),
+ ('baz', '/foo/bar/baz'),
+ ('bax', '/foo/bar/baz/bax')])
+ WVPASSEQ(grafted_path_components([('/foo/bar/baz', '/bax')],
+ '/foo/bar/baz/1/2'),
+ [('', None),
+ ('bax', '/foo/bar/baz'),
+ ('1', '/foo/bar/baz/1'),
+ ('2', '/foo/bar/baz/1/2')])
+ WVPASSEQ(grafted_path_components([('/foo', '/bar/baz/bax')],
+ '/foo/bar'),
+ [('', None),
+ ('bar', None),
+ ('baz', None),
+ ('bax', '/foo'),
+ ('bar', '/foo/bar')])
+ WVPASSEQ(grafted_path_components([('/foo/bar/baz', '/a/b/c')],
+ '/foo/bar/baz'),
+ [('', None), ('a', None), ('b', None), ('c', '/foo/bar/baz')])
+ WVPASSEQ(grafted_path_components([('/', '/a/b/c/')], '/foo/bar'),
+ [('', None), ('a', None), ('b', None), ('c', '/'),
+ ('foo', '/foo'), ('bar', '/foo/bar')])
+ WVEXCEPT(Exception, grafted_path_components, 'foo', [])
- result = strip_base_path(path, [symlink_dst])
- os.remove(symlink_dst)
- os.rmdir(symlink_src)
- os.rmdir(os.path.join(tmpdir, "private"))
- os.rmdir(tmpdir)
+@wvtest
+def test_readpipe():
+ x = readpipe(['echo', '42'])
+ WVPASSEQ(x, '42\n')
+ try:
+ readpipe(['bash', '-c', 'exit 42'])
+ except Exception, ex:
+ WVPASSEQ(str(ex), "subprocess 'bash -c exit 42' failed with status 42")
- WVPASSEQ(result, "/a")
@wvtest
-def test_graft_path():
- middle_matching_old_path = "/user"
- non_matching_old_path = "/usr"
- matching_old_path = "/home"
- matching_full_path = "/home/user"
- new_path = "/opt"
-
- all_graft_points = [(middle_matching_old_path, new_path),
- (non_matching_old_path, new_path),
- (matching_old_path, new_path)]
-
- path = "/home/user/"
-
- WVPASSEQ(graft_path([(middle_matching_old_path, new_path)], path),
- "/home/user")
- WVPASSEQ(graft_path([(non_matching_old_path, new_path)], path),
- "/home/user")
- WVPASSEQ(graft_path([(matching_old_path, new_path)], path), "/opt/user")
- WVPASSEQ(graft_path(all_graft_points, path), "/opt/user")
- WVPASSEQ(graft_path([(matching_full_path, new_path)], path),
- "/opt")
+def test_batchpipe():
+ for chunk in batchpipe(['echo'], []):
+ WVPASS(False)
+ out = ''
+ for chunk in batchpipe(['echo'], ['42']):
+ out += chunk
+ WVPASSEQ(out, '42\n')
+ try:
+ batchpipe(['bash', '-c'], ['exit 42'])
+ except Exception, ex:
+ WVPASSEQ(str(ex), "subprocess 'bash -c exit 42' failed with status 42")
+ args = [str(x) for x in range(6)]
+ # Force batchpipe to break the args into batches of 3. This
+ # approach assumes all args are the same length.
+ arg_max = \
+ helpers._argmax_base(['echo']) + helpers._argmax_args_size(args[:3])
+ batches = batchpipe(['echo'], args, arg_max=arg_max)
+ WVPASSEQ(next(batches), '0 1 2\n')
+ WVPASSEQ(next(batches), '3 4 5\n')
+ WVPASSEQ(next(batches, None), None)
+ batches = batchpipe(['echo'], [str(x) for x in range(5)], arg_max=arg_max)
+ WVPASSEQ(next(batches), '0 1 2\n')
+ WVPASSEQ(next(batches), '3 4\n')
+ WVPASSEQ(next(batches, None), None)