]> arthur.barton.de Git - bup.git/blobdiff - lib/bup/t/thelpers.py
get: adjust for python 3 and test there
[bup.git] / lib / bup / t / thelpers.py
index 9d23eae1facd8c0c7e3234674fa09d2763f289be..17ee6357debac8960c9130a2ce6a5f1856a4ee17 100644 (file)
@@ -1,52 +1,29 @@
 
-import helpers, math, os, os.path, stat, subprocess
+from __future__ import absolute_import
+from time import tzset
+import helpers, math, os, os.path, re, subprocess
 
 from wvtest import *
 
+from bup.compat import bytes_from_byte, bytes_from_uint, environ
 from bup.helpers import (atomically_replaced_file, batchpipe, detect_fakeroot,
                          grafted_path_components, mkdirp, parse_num,
                          path_components, readpipe, stripped_path_components,
+                         shstr,
                          utc_offset_str)
 from buptest import no_lingering_errors, test_tempdir
 import bup._helpers as _helpers
 
 
-bup_tmp = os.path.realpath('../../../t/tmp')
+bup_tmp = os.path.realpath(b'../../../t/tmp')
 mkdirp(bup_tmp)
 
-@wvtest
-def test_next():
-    with no_lingering_errors():
-        # Test whatever you end up with for next() after import '*'.
-        WVPASSEQ(next(iter([]), None), None)
-        x = iter([1])
-        WVPASSEQ(next(x, None), 1)
-        WVPASSEQ(next(x, None), None)
-        x = iter([1])
-        WVPASSEQ(next(x, 'x'), 1)
-        WVPASSEQ(next(x, 'x'), 'x')
-        WVEXCEPT(StopIteration, next, iter([]))
-        x = iter([1])
-        WVPASSEQ(next(x), 1)
-        WVEXCEPT(StopIteration, next, x)
-
-
-@wvtest
-def test_fallback_next():
-    with no_lingering_errors():
-        global next
-        orig = next
-        next = helpers._fallback_next
-        try:
-            test_next()
-        finally:
-            next = orig
-
 
 @wvtest
 def test_parse_num():
     with no_lingering_errors():
         pn = parse_num
+        WVPASSEQ(pn(b'1'), 1)
         WVPASSEQ(pn('1'), 1)
         WVPASSEQ(pn('0'), 0)
         WVPASSEQ(pn('1.5k'), 1536)
@@ -57,7 +34,7 @@ def test_parse_num():
 @wvtest
 def test_detect_fakeroot():
     with no_lingering_errors():
-        if os.getenv('FAKEROOTKEY'):
+        if b'FAKEROOTKEY' in environ:
             WVPASS(detect_fakeroot())
         else:
             WVPASS(not detect_fakeroot())
@@ -65,90 +42,113 @@ def test_detect_fakeroot():
 @wvtest
 def test_path_components():
     with no_lingering_errors():
-        WVPASSEQ(path_components('/'), [('', '/')])
-        WVPASSEQ(path_components('/foo'), [('', '/'), ('foo', '/foo')])
-        WVPASSEQ(path_components('/foo/'), [('', '/'), ('foo', '/foo')])
-        WVPASSEQ(path_components('/foo/bar'),
-                 [('', '/'), ('foo', '/foo'), ('bar', '/foo/bar')])
-        WVEXCEPT(Exception, path_components, 'foo')
+        WVPASSEQ(path_components(b'/'), [(b'', b'/')])
+        WVPASSEQ(path_components(b'/foo'), [(b'', b'/'), (b'foo', b'/foo')])
+        WVPASSEQ(path_components(b'/foo/'), [(b'', b'/'), (b'foo', b'/foo')])
+        WVPASSEQ(path_components(b'/foo/bar'),
+                 [(b'', b'/'), (b'foo', b'/foo'), (b'bar', b'/foo/bar')])
+        WVEXCEPT(Exception, path_components, b'foo')
 
 
 @wvtest
 def test_stripped_path_components():
     with no_lingering_errors():
-        WVPASSEQ(stripped_path_components('/', []), [('', '/')])
-        WVPASSEQ(stripped_path_components('/', ['']), [('', '/')])
-        WVPASSEQ(stripped_path_components('/', ['/']), [('', '/')])
-        WVPASSEQ(stripped_path_components('/foo', ['/']),
-                 [('', '/'), ('foo', '/foo')])
-        WVPASSEQ(stripped_path_components('/', ['/foo']), [('', '/')])
-        WVPASSEQ(stripped_path_components('/foo', ['/bar']),
-                 [('', '/'), ('foo', '/foo')])
-        WVPASSEQ(stripped_path_components('/foo', ['/foo']), [('', '/foo')])
-        WVPASSEQ(stripped_path_components('/foo/bar', ['/foo']),
-                 [('', '/foo'), ('bar', '/foo/bar')])
-        WVPASSEQ(stripped_path_components('/foo/bar', ['/bar', '/foo', '/baz']),
-                 [('', '/foo'), ('bar', '/foo/bar')])
-        WVPASSEQ(stripped_path_components('/foo/bar/baz', ['/foo/bar/baz']),
-                 [('', '/foo/bar/baz')])
-        WVEXCEPT(Exception, stripped_path_components, 'foo', [])
+        WVPASSEQ(stripped_path_components(b'/', []), [(b'', b'/')])
+        WVPASSEQ(stripped_path_components(b'/', [b'']), [(b'', b'/')])
+        WVPASSEQ(stripped_path_components(b'/', [b'/']), [(b'', b'/')])
+        WVPASSEQ(stripped_path_components(b'/foo', [b'/']),
+                 [(b'', b'/'), (b'foo', b'/foo')])
+        WVPASSEQ(stripped_path_components(b'/', [b'/foo']), [(b'', b'/')])
+        WVPASSEQ(stripped_path_components(b'/foo', [b'/bar']),
+                 [(b'', b'/'), (b'foo', b'/foo')])
+        WVPASSEQ(stripped_path_components(b'/foo', [b'/foo']), [(b'', b'/foo')])
+        WVPASSEQ(stripped_path_components(b'/foo/bar', [b'/foo']),
+                 [(b'', b'/foo'), (b'bar', b'/foo/bar')])
+        WVPASSEQ(stripped_path_components(b'/foo/bar', [b'/bar', b'/foo', b'/baz']),
+                 [(b'', b'/foo'), (b'bar', b'/foo/bar')])
+        WVPASSEQ(stripped_path_components(b'/foo/bar/baz', [b'/foo/bar/baz']),
+                 [(b'', b'/foo/bar/baz')])
+        WVEXCEPT(Exception, stripped_path_components, b'foo', [])
 
 
 @wvtest
 def test_grafted_path_components():
     with no_lingering_errors():
-        WVPASSEQ(grafted_path_components([('/chroot', '/')], '/foo'),
-                 [('', '/'), ('foo', '/foo')])
-        WVPASSEQ(grafted_path_components([('/foo/bar', '/')],
-                                         '/foo/bar/baz/bax'),
-                 [('', '/foo/bar'),
-                  ('baz', '/foo/bar/baz'),
-                  ('bax', '/foo/bar/baz/bax')])
-        WVPASSEQ(grafted_path_components([('/foo/bar/baz', '/bax')],
-                                         '/foo/bar/baz/1/2'),
-                 [('', None),
-                  ('bax', '/foo/bar/baz'),
-                  ('1', '/foo/bar/baz/1'),
-                  ('2', '/foo/bar/baz/1/2')])
-        WVPASSEQ(grafted_path_components([('/foo', '/bar/baz/bax')],
-                                         '/foo/bar'),
-                 [('', None),
-                  ('bar', None),
-                  ('baz', None),
-                  ('bax', '/foo'),
-                  ('bar', '/foo/bar')])
-        WVPASSEQ(grafted_path_components([('/foo/bar/baz', '/a/b/c')],
-                                         '/foo/bar/baz'),
-                 [('', None), ('a', None), ('b', None), ('c', '/foo/bar/baz')])
-        WVPASSEQ(grafted_path_components([('/', '/a/b/c/')], '/foo/bar'),
-                 [('', None), ('a', None), ('b', None), ('c', '/'),
-                  ('foo', '/foo'), ('bar', '/foo/bar')])
-        WVEXCEPT(Exception, grafted_path_components, 'foo', [])
+        WVPASSEQ(grafted_path_components([(b'/chroot', b'/')], b'/foo'),
+                 [(b'', b'/'), (b'foo', b'/foo')])
+        WVPASSEQ(grafted_path_components([(b'/foo/bar', b'/')],
+                                         b'/foo/bar/baz/bax'),
+                 [(b'', b'/foo/bar'),
+                  (b'baz', b'/foo/bar/baz'),
+                  (b'bax', b'/foo/bar/baz/bax')])
+        WVPASSEQ(grafted_path_components([(b'/foo/bar/baz', b'/bax')],
+                                         b'/foo/bar/baz/1/2'),
+                 [(b'', None),
+                  (b'bax', b'/foo/bar/baz'),
+                  (b'1', b'/foo/bar/baz/1'),
+                  (b'2', b'/foo/bar/baz/1/2')])
+        WVPASSEQ(grafted_path_components([(b'/foo', b'/bar/baz/bax')],
+                                         b'/foo/bar'),
+                 [(b'', None),
+                  (b'bar', None),
+                  (b'baz', None),
+                  (b'bax', b'/foo'),
+                  (b'bar', b'/foo/bar')])
+        WVPASSEQ(grafted_path_components([(b'/foo/bar/baz', b'/a/b/c')],
+                                         b'/foo/bar/baz'),
+                 [(b'', None), (b'a', None), (b'b', None), (b'c', b'/foo/bar/baz')])
+        WVPASSEQ(grafted_path_components([(b'/', b'/a/b/c/')], b'/foo/bar'),
+                 [(b'', None), (b'a', None), (b'b', None), (b'c', b'/'),
+                  (b'foo', b'/foo'), (b'bar', b'/foo/bar')])
+        WVEXCEPT(Exception, grafted_path_components, b'foo', [])
+
+
+@wvtest
+def test_shstr():
+    with no_lingering_errors():
+        # Do nothing for strings and bytes
+        WVPASSEQ(shstr(b''), b'')
+        WVPASSEQ(shstr(b'1'), b'1')
+        WVPASSEQ(shstr(b'1 2'), b'1 2')
+        WVPASSEQ(shstr(b"1'2"), b"1'2")
+        WVPASSEQ(shstr(''), '')
+        WVPASSEQ(shstr('1'), '1')
+        WVPASSEQ(shstr('1 2'), '1 2')
+        WVPASSEQ(shstr("1'2"), "1'2")
+
+        # Escape parts of sequences
+        WVPASSEQ(shstr((b'1 2', b'3')), b"'1 2' 3")
+        WVPASSEQ(shstr((b"1'2", b'3')), b"'1'\"'\"'2' 3")
+        WVPASSEQ(shstr((b"'1", b'3')), b"''\"'\"'1' 3")
+        WVPASSEQ(shstr(('1 2', '3')), "'1 2' 3")
+        WVPASSEQ(shstr(("1'2", '3')), "'1'\"'\"'2' 3")
+        WVPASSEQ(shstr(("'1", '3')), "''\"'\"'1' 3")
 
 
 @wvtest
 def test_readpipe():
     with no_lingering_errors():
-        x = readpipe(['echo', '42'])
-        WVPASSEQ(x, '42\n')
+        x = readpipe([b'echo', b'42'])
+        WVPASSEQ(x, b'42\n')
         try:
-            readpipe(['bash', '-c', 'exit 42'])
+            readpipe([b'bash', b'-c', b'exit 42'])
         except Exception as ex:
-            WVPASSEQ(str(ex),
-                     "subprocess 'bash -c exit 42' failed with status 42")
+            rx = '^subprocess b?"bash -c \'exit 42\'" failed with status 42$'
+            if not re.match(rx, str(ex)):
+                WVPASSEQ(str(ex), rx)
 
 
 @wvtest
 def test_batchpipe():
     with no_lingering_errors():
-        for chunk in batchpipe(['echo'], []):
+        for chunk in batchpipe([b'echo'], []):
             WVPASS(False)
-        out = ''
-        for chunk in batchpipe(['echo'], ['42']):
+        out = b''
+        for chunk in batchpipe([b'echo'], [b'42']):
             out += chunk
-        WVPASSEQ(out, '42\n')
+        WVPASSEQ(out, b'42\n')
         try:
-            batchpipe(['bash', '-c'], ['exit 42'])
+            batchpipe([b'bash', b'-c'], [b'exit 42'])
         except Exception as ex:
             WVPASSEQ(str(ex),
                      "subprocess 'bash -c exit 42' failed with status 42")
@@ -156,69 +156,79 @@ def test_batchpipe():
         # Force batchpipe to break the args into batches of 3.  This
         # approach assumes all args are the same length.
         arg_max = \
-            helpers._argmax_base(['echo']) + helpers._argmax_args_size(args[:3])
+            helpers._argmax_base([b'echo']) + helpers._argmax_args_size(args[:3])
         batches = batchpipe(['echo'], args, arg_max=arg_max)
-        WVPASSEQ(next(batches), '0 1 2\n')
-        WVPASSEQ(next(batches), '3 4 5\n')
+        WVPASSEQ(next(batches), b'0 1 2\n')
+        WVPASSEQ(next(batches), b'3 4 5\n')
         WVPASSEQ(next(batches, None), None)
-        batches = batchpipe(['echo'], [str(x) for x in range(5)], arg_max=arg_max)
-        WVPASSEQ(next(batches), '0 1 2\n')
-        WVPASSEQ(next(batches), '3 4\n')
+        batches = batchpipe([b'echo'], [str(x) for x in range(5)], arg_max=arg_max)
+        WVPASSEQ(next(batches), b'0 1 2\n')
+        WVPASSEQ(next(batches), b'3 4\n')
         WVPASSEQ(next(batches, None), None)
 
 
 @wvtest
 def test_atomically_replaced_file():
-    with no_lingering_errors(), test_tempdir('bup-thelper-') as tmpdir:
-        target_file = os.path.join(tmpdir, 'test-atomic-write')
-
-        with atomically_replaced_file(target_file, mode='w') as f:
-            f.write('asdf')
-            WVPASSEQ(f.mode, 'w')
-        f = open(target_file, 'r')
-        WVPASSEQ(f.read(), 'asdf')
+    with no_lingering_errors():
+        with test_tempdir(b'bup-thelper-') as tmpdir:
+            target_file = os.path.join(tmpdir, b'test-atomic-write')
 
-        try:
             with atomically_replaced_file(target_file, mode='w') as f:
-                f.write('wxyz')
-                raise Exception()
-        except:
-            pass
-        with open(target_file) as f:
+                f.write('asdf')
+                WVPASSEQ(f.mode, 'w')
+            f = open(target_file, 'r')
             WVPASSEQ(f.read(), 'asdf')
 
-        with atomically_replaced_file(target_file, mode='wb') as f:
-            f.write(os.urandom(20))
-            WVPASSEQ(f.mode, 'wb')
+            try:
+                with atomically_replaced_file(target_file, mode='w') as f:
+                    f.write('wxyz')
+                    raise Exception()
+            except:
+                pass
+            with open(target_file) as f:
+                WVPASSEQ(f.read(), 'asdf')
+
+            with atomically_replaced_file(target_file, mode='wb') as f:
+                f.write(os.urandom(20))
+                WVPASSEQ(f.mode, 'wb')
+
+
+def set_tz(tz):
+    if not tz:
+        del environ[b'TZ']
+    else:
+        environ[b'TZ'] = tz
+    tzset()
 
 
 @wvtest
 def test_utc_offset_str():
     with no_lingering_errors():
-        tz = os.environ.get('TZ')
+        tz = environ.get(b'TZ')
+        tzset()
         try:
-            os.environ['TZ'] = 'FOO+0:00'
-            WVPASSEQ(utc_offset_str(0), '+0000')
-            os.environ['TZ'] = 'FOO+1:00'
-            WVPASSEQ(utc_offset_str(0), '-0100')
-            os.environ['TZ'] = 'FOO-1:00'
-            WVPASSEQ(utc_offset_str(0), '+0100')
-            os.environ['TZ'] = 'FOO+3:3'
-            WVPASSEQ(utc_offset_str(0), '-0303')
-            os.environ['TZ'] = 'FOO-3:3'
-            WVPASSEQ(utc_offset_str(0), '+0303')
+            set_tz(b'FOO+0:00')
+            WVPASSEQ(utc_offset_str(0), b'+0000')
+            set_tz(b'FOO+1:00')
+            WVPASSEQ(utc_offset_str(0), b'-0100')
+            set_tz(b'FOO-1:00')
+            WVPASSEQ(utc_offset_str(0), b'+0100')
+            set_tz(b'FOO+3:3')
+            WVPASSEQ(utc_offset_str(0), b'-0303')
+            set_tz(b'FOO-3:3')
+            WVPASSEQ(utc_offset_str(0), b'+0303')
             # Offset is not an integer number of minutes
-            os.environ['TZ'] = 'FOO+3:3:3'
-            WVPASSEQ(utc_offset_str(1), '-0303')
-            os.environ['TZ'] = 'FOO-3:3:3'
-            WVPASSEQ(utc_offset_str(1), '+0303')
-            WVPASSEQ(utc_offset_str(314159), '+0303')
+            set_tz(b'FOO+3:3:3')
+            WVPASSEQ(utc_offset_str(1), b'-0303')
+            set_tz(b'FOO-3:3:3')
+            WVPASSEQ(utc_offset_str(1), b'+0303')
+            WVPASSEQ(utc_offset_str(314159), b'+0303')
         finally:
             if tz:
-                os.environ['TZ'] = tz
+                set_tz(tz)
             else:
                 try:
-                    del os.environ['TZ']
+                    set_tz(None)
                 except KeyError:
                     pass
 
@@ -226,24 +236,24 @@ def test_utc_offset_str():
 def test_valid_save_name():
     with no_lingering_errors():
         valid = helpers.valid_save_name
-        WVPASS(valid('x'))
-        WVPASS(valid('x@'))
-        WVFAIL(valid('@'))
-        WVFAIL(valid('/'))
-        WVFAIL(valid('/foo'))
-        WVFAIL(valid('foo/'))
-        WVFAIL(valid('/foo/'))
-        WVFAIL(valid('foo//bar'))
-        WVFAIL(valid('.'))
-        WVFAIL(valid('bar.'))
-        WVFAIL(valid('foo@{'))
-        for x in ' ~^:?*[\\':
-            WVFAIL(valid('foo' + x))
+        WVPASS(valid(b'x'))
+        WVPASS(valid(b'x@'))
+        WVFAIL(valid(b'@'))
+        WVFAIL(valid(b'/'))
+        WVFAIL(valid(b'/foo'))
+        WVFAIL(valid(b'foo/'))
+        WVFAIL(valid(b'/foo/'))
+        WVFAIL(valid(b'foo//bar'))
+        WVFAIL(valid(b'.'))
+        WVFAIL(valid(b'bar.'))
+        WVFAIL(valid(b'foo@{'))
+        for x in b' ~^:?*[\\':
+            WVFAIL(valid(b'foo' + bytes_from_byte(x)))
         for i in range(20):
-            WVFAIL(valid('foo' + chr(i)))
-        WVFAIL(valid('foo' + chr(0x7f)))
-        WVFAIL(valid('foo..bar'))
-        WVFAIL(valid('bar.lock/baz'))
-        WVFAIL(valid('foo/bar.lock/baz'))
-        WVFAIL(valid('.bar/baz'))
-        WVFAIL(valid('foo/.bar/baz'))
+            WVFAIL(valid(b'foo' + bytes_from_uint(i)))
+        WVFAIL(valid(b'foo' + bytes_from_uint(0x7f)))
+        WVFAIL(valid(b'foo..bar'))
+        WVFAIL(valid(b'bar.lock/baz'))
+        WVFAIL(valid(b'foo/bar.lock/baz'))
+        WVFAIL(valid(b'.bar/baz'))
+        WVFAIL(valid(b'foo/.bar/baz'))