]> arthur.barton.de Git - bup.git/blobdiff - lib/bup/t/thelpers.py
save: make --strip-path=/ a no-op
[bup.git] / lib / bup / t / thelpers.py
index fd041ffb792e3586259581dd70719ccf7b4c867a..79eb4af444dd82380aa931bcb042984462035aaf 100644 (file)
@@ -1,9 +1,43 @@
+import helpers
 import math
 import os
+import os.path
+import tempfile
+import stat
 import bup._helpers as _helpers
 from bup.helpers import *
 from wvtest import *
 
+bup_tmp = os.path.realpath('../../../t/tmp')
+mkdirp(bup_tmp)
+
+@wvtest
+def test_next():
+    # Test whatever you end up with for next() after import '*'.
+    WVPASSEQ(next(iter([]), None), None)
+    x = iter([1])
+    WVPASSEQ(next(x, None), 1)
+    WVPASSEQ(next(x, None), None)
+    x = iter([1])
+    WVPASSEQ(next(x, 'x'), 1)
+    WVPASSEQ(next(x, 'x'), 'x')
+    WVEXCEPT(StopIteration, next, iter([]))
+    x = iter([1])
+    WVPASSEQ(next(x), 1)
+    WVEXCEPT(StopIteration, next, x)
+
+
+@wvtest
+def test_fallback_next():
+    global next
+    orig = next
+    next = helpers._fallback_next
+    try:
+        test_next()
+    finally:
+        next = orig
+
+
 @wvtest
 def test_parse_num():
     pn = parse_num
@@ -36,6 +70,8 @@ def test_stripped_path_components():
     WVPASSEQ(stripped_path_components('/', []), [('', '/')])
     WVPASSEQ(stripped_path_components('/', ['']), [('', '/')])
     WVPASSEQ(stripped_path_components('/', ['/']), [('', '/')])
+    WVPASSEQ(stripped_path_components('/foo', ['/']),
+             [('', '/'), ('foo', '/foo')])
     WVPASSEQ(stripped_path_components('/', ['/foo']), [('', '/')])
     WVPASSEQ(stripped_path_components('/foo', ['/bar']),
              [('', '/'), ('foo', '/foo')])
@@ -87,3 +123,59 @@ def test_readpipe():
         readpipe(['bash', '-c', 'exit 42'])
     except Exception, ex:
         WVPASSEQ(str(ex), "subprocess 'bash -c exit 42' failed with status 42")
+
+
+@wvtest
+def test_batchpipe():
+    for chunk in batchpipe(['echo'], []):
+        WVPASS(False)
+    out = ''
+    for chunk in batchpipe(['echo'], ['42']):
+        out += chunk
+    WVPASSEQ(out, '42\n')
+    try:
+        batchpipe(['bash', '-c'], ['exit 42'])
+    except Exception, ex:
+        WVPASSEQ(str(ex), "subprocess 'bash -c exit 42' failed with status 42")
+    args = [str(x) for x in range(6)]
+    # Force batchpipe to break the args into batches of 3.  This
+    # approach assumes all args are the same length.
+    arg_max = \
+        helpers._argmax_base(['echo']) + helpers._argmax_args_size(args[:3])
+    batches = batchpipe(['echo'], args, arg_max=arg_max)
+    WVPASSEQ(next(batches), '0 1 2\n')
+    WVPASSEQ(next(batches), '3 4 5\n')
+    WVPASSEQ(next(batches, None), None)
+    batches = batchpipe(['echo'], [str(x) for x in range(5)], arg_max=arg_max)
+    WVPASSEQ(next(batches), '0 1 2\n')
+    WVPASSEQ(next(batches), '3 4\n')
+    WVPASSEQ(next(batches, None), None)
+
+
+@wvtest
+def test_atomically_replaced_file():
+    tmpdir = tempfile.mkdtemp(dir=bup_tmp, prefix='bup-thelper-')
+    target_file = os.path.join(tmpdir, 'test-atomic-write')
+    initial_failures = wvfailure_count()
+
+    with atomically_replaced_file(target_file, mode='w') as f:
+        f.write('asdf')
+        WVPASSEQ(f.mode, 'w')
+    f = open(target_file, 'r')
+    WVPASSEQ(f.read(), 'asdf')
+
+    try:
+        with atomically_replaced_file(target_file, mode='w') as f:
+            f.write('wxyz')
+            raise Exception()
+    except:
+        pass
+    with open(target_file) as f:
+        WVPASSEQ(f.read(), 'asdf')
+
+    with atomically_replaced_file(target_file, mode='wb') as f:
+        f.write(os.urandom(20))
+        WVPASSEQ(f.mode, 'wb')
+
+    if wvfailure_count() == initial_failures:
+        subprocess.call(['rm', '-rf', tmpdir])