]> arthur.barton.de Git - bup.git/blobdiff - lib/bup/t/thelpers.py
Read ARG_MAX directly via os.sysconf('SC_ARG_MAX').
[bup.git] / lib / bup / t / thelpers.py
index 8fec0f55df91c468629785228fd4564918f214c5..b5427076dddc9e93ffd882af8a4ef9aa744408e0 100644 (file)
@@ -1,9 +1,38 @@
-import os, math
+import helpers
+import math
+import os
 import bup._helpers as _helpers
-
 from bup.helpers import *
 from wvtest import *
 
+
+@wvtest
+def test_next():
+    # Test whatever you end up with for next() after import '*'.
+    WVPASSEQ(next(iter([]), None), None)
+    x = iter([1])
+    WVPASSEQ(next(x, None), 1)
+    WVPASSEQ(next(x, None), None)
+    x = iter([1])
+    WVPASSEQ(next(x, 'x'), 1)
+    WVPASSEQ(next(x, 'x'), 'x')
+    WVEXCEPT(StopIteration, next, iter([]))
+    x = iter([1])
+    WVPASSEQ(next(x), 1)
+    WVEXCEPT(StopIteration, next, x)
+
+
+@wvtest
+def test_fallback_next():
+    global next
+    orig = next
+    next = helpers._fallback_next
+    try:
+        test_next()
+    finally:
+        next = orig
+
+
 @wvtest
 def test_parse_num():
     pn = parse_num
@@ -14,7 +43,6 @@ def test_parse_num():
     WVPASSEQ(pn('1e+9 k'), 1000000000 * 1024)
     WVPASSEQ(pn('-3e-3mb'), int(-0.003 * 1024 * 1024))
 
-
 @wvtest
 def test_detect_fakeroot():
     if os.getenv('FAKEROOTKEY'):
@@ -22,55 +50,96 @@ def test_detect_fakeroot():
     else:
         WVPASS(not detect_fakeroot())
 
+@wvtest
+def test_path_components():
+    WVPASSEQ(path_components('/'), [('', '/')])
+    WVPASSEQ(path_components('/foo'), [('', '/'), ('foo', '/foo')])
+    WVPASSEQ(path_components('/foo/'), [('', '/'), ('foo', '/foo')])
+    WVPASSEQ(path_components('/foo/bar'),
+             [('', '/'), ('foo', '/foo'), ('bar', '/foo/bar')])
+    WVEXCEPT(Exception, path_components, 'foo')
+
+
+@wvtest
+def test_stripped_path_components():
+    WVPASSEQ(stripped_path_components('/', []), [('', '/')])
+    WVPASSEQ(stripped_path_components('/', ['']), [('', '/')])
+    WVPASSEQ(stripped_path_components('/', ['/']), [('', '/')])
+    WVPASSEQ(stripped_path_components('/', ['/foo']), [('', '/')])
+    WVPASSEQ(stripped_path_components('/foo', ['/bar']),
+             [('', '/'), ('foo', '/foo')])
+    WVPASSEQ(stripped_path_components('/foo', ['/foo']), [('', '/foo')])
+    WVPASSEQ(stripped_path_components('/foo/bar', ['/foo']),
+             [('', '/foo'), ('bar', '/foo/bar')])
+    WVPASSEQ(stripped_path_components('/foo/bar', ['/bar', '/foo', '/baz']),
+             [('', '/foo'), ('bar', '/foo/bar')])
+    WVPASSEQ(stripped_path_components('/foo/bar/baz', ['/foo/bar/baz']),
+             [('', '/foo/bar/baz')])
+    WVEXCEPT(Exception, stripped_path_components, 'foo', [])
+
+
+@wvtest
+def test_grafted_path_components():
+    WVPASSEQ(grafted_path_components([('/chroot', '/')], '/foo'),
+             [('', '/'), ('foo', '/foo')])
+    WVPASSEQ(grafted_path_components([('/foo/bar', '/')], '/foo/bar/baz/bax'),
+             [('', '/foo/bar'),
+              ('baz', '/foo/bar/baz'),
+              ('bax', '/foo/bar/baz/bax')])
+    WVPASSEQ(grafted_path_components([('/foo/bar/baz', '/bax')],
+                                     '/foo/bar/baz/1/2'),
+             [('', None),
+              ('bax', '/foo/bar/baz'),
+              ('1', '/foo/bar/baz/1'),
+              ('2', '/foo/bar/baz/1/2')])
+    WVPASSEQ(grafted_path_components([('/foo', '/bar/baz/bax')],
+                                     '/foo/bar'),
+             [('', None),
+              ('bar', None),
+              ('baz', None),
+              ('bax', '/foo'),
+              ('bar', '/foo/bar')])
+    WVPASSEQ(grafted_path_components([('/foo/bar/baz', '/a/b/c')],
+                                     '/foo/bar/baz'),
+             [('', None), ('a', None), ('b', None), ('c', '/foo/bar/baz')])
+    WVPASSEQ(grafted_path_components([('/', '/a/b/c/')], '/foo/bar'),
+             [('', None), ('a', None), ('b', None), ('c', '/'),
+              ('foo', '/foo'), ('bar', '/foo/bar')])
+    WVEXCEPT(Exception, grafted_path_components, 'foo', [])
+
+
+@wvtest
+def test_readpipe():
+    x = readpipe(['echo', '42'])
+    WVPASSEQ(x, '42\n')
+    try:
+        readpipe(['bash', '-c', 'exit 42'])
+    except Exception, ex:
+        WVPASSEQ(str(ex), "subprocess 'bash -c exit 42' failed with status 42")
 
-def _test_fstime():
-    def approx_eq(x, y):
-        return math.fabs(x - y) < 1 / 10e8
-    def ts_eq_ish(x, y):
-        return approx_eq(x[0], y[0]) and approx_eq(x[1], y[1])
-    def fst_eq_ish(x, y):
-        return approx_eq(x.approx_secs(), y.approx_secs())
-    def s_ns_eq_ish(fst, s, ns):
-        (fst_s, fst_ns) = fst.secs_nsecs()
-        return approx_eq(fst_s, s) and approx_eq(fst_ns, ns)
-    from_secs = FSTime.from_secs
-    from_ts = FSTime.from_timespec
-    WVPASS(from_secs(0) == from_secs(0))
-    WVPASS(from_secs(0) < from_secs(1))
-    WVPASS(from_secs(-1) < from_secs(1))
-    WVPASS(from_secs(1) > from_secs(0))
-    WVPASS(from_secs(1) > from_secs(-1))
-
-    WVPASS(fst_eq_ish(from_secs(0), from_ts((0, 0))))
-    WVPASS(fst_eq_ish(from_secs(1), from_ts((1, 0))))
-    WVPASS(fst_eq_ish(from_secs(-1), from_ts((-1, 0))))
-    WVPASS(fst_eq_ish(from_secs(-0.5), from_ts((-1, 10**9 / 2))))
-    WVPASS(fst_eq_ish(from_secs(-1.5), from_ts((-2, 10**9 / 2))))
-    WVPASS(fst_eq_ish(from_secs(1 / 10e8), from_ts((0, 1))))
-    WVPASS(fst_eq_ish(from_secs(-1 / 10e8), from_ts((-1, 10**9 - 1))))
-
-    WVPASS(ts_eq_ish(from_secs(0).to_timespec(), (0, 0)))
-    WVPASS(ts_eq_ish(from_secs(1).to_timespec(), (1, 0)))
-    WVPASS(ts_eq_ish(from_secs(-1).to_timespec(), (-1, 0)))
-    WVPASS(ts_eq_ish(from_secs(-0.5).to_timespec(), (-1, 10**9 / 2)))
-    WVPASS(ts_eq_ish(from_secs(-1.5).to_timespec(), (-2, 10**9 / 2)))
-    WVPASS(ts_eq_ish(from_secs(1 / 10e8).to_timespec(), (0, 1)))
-    WVPASS(ts_eq_ish(from_secs(-1 / 10e8).to_timespec(), (-1, 10**9 - 1)))
-
-    WVPASS(s_ns_eq_ish(from_secs(0), 0, 0))
-    WVPASS(s_ns_eq_ish(from_secs(1), 1, 0))
-    WVPASS(s_ns_eq_ish(from_secs(-1), -1, 0))
-    WVPASS(s_ns_eq_ish(from_secs(-0.5), 0, - 10**9 / 2))
-    WVPASS(s_ns_eq_ish(from_secs(-1.5), -1, - 10**9 / 2))
-    WVPASS(s_ns_eq_ish(from_secs(-1 / 10e8), 0, -1))
 
 @wvtest
-def test_fstime():
-    _test_fstime();
-    if _helpers.lstat: # Also test native python timestamp rep since we can.
-        orig_lstat = _helpers.lstat
-        try:
-            _helpers.lstat = None
-            _test_fstime();
-        finally:
-            _helpers.lstat = orig_lstat
+def test_batchpipe():
+    for chunk in batchpipe(['echo'], []):
+        WVPASS(False)
+    out = ''
+    for chunk in batchpipe(['echo'], ['42']):
+        out += chunk
+    WVPASSEQ(out, '42\n')
+    try:
+        batchpipe(['bash', '-c'], ['exit 42'])
+    except Exception, ex:
+        WVPASSEQ(str(ex), "subprocess 'bash -c exit 42' failed with status 42")
+    args = [str(x) for x in range(6)]
+    # Force batchpipe to break the args into batches of 3.  This
+    # approach assumes all args are the same length.
+    arg_max = \
+        helpers._argmax_base(['echo']) + helpers._argmax_args_size(args[:3])
+    batches = batchpipe(['echo'], args, arg_max=arg_max)
+    WVPASSEQ(next(batches), '0 1 2\n')
+    WVPASSEQ(next(batches), '3 4 5\n')
+    WVPASSEQ(next(batches, None), None)
+    batches = batchpipe(['echo'], [str(x) for x in range(5)], arg_max=arg_max)
+    WVPASSEQ(next(batches), '0 1 2\n')
+    WVPASSEQ(next(batches), '3 4\n')
+    WVPASSEQ(next(batches, None), None)