]> arthur.barton.de Git - bup.git/blob - lib/bup/t/thelpers.py
save: make --strip-path=/ a no-op
[bup.git] / lib / bup / t / thelpers.py
1 import helpers
2 import math
3 import os
4 import os.path
5 import tempfile
6 import stat
7 import bup._helpers as _helpers
8 from bup.helpers import *
9 from wvtest import *
10
11 bup_tmp = os.path.realpath('../../../t/tmp')
12 mkdirp(bup_tmp)
13
14 @wvtest
15 def test_next():
16     # Test whatever you end up with for next() after import '*'.
17     WVPASSEQ(next(iter([]), None), None)
18     x = iter([1])
19     WVPASSEQ(next(x, None), 1)
20     WVPASSEQ(next(x, None), None)
21     x = iter([1])
22     WVPASSEQ(next(x, 'x'), 1)
23     WVPASSEQ(next(x, 'x'), 'x')
24     WVEXCEPT(StopIteration, next, iter([]))
25     x = iter([1])
26     WVPASSEQ(next(x), 1)
27     WVEXCEPT(StopIteration, next, x)
28
29
30 @wvtest
31 def test_fallback_next():
32     global next
33     orig = next
34     next = helpers._fallback_next
35     try:
36         test_next()
37     finally:
38         next = orig
39
40
41 @wvtest
42 def test_parse_num():
43     pn = parse_num
44     WVPASSEQ(pn('1'), 1)
45     WVPASSEQ(pn('0'), 0)
46     WVPASSEQ(pn('1.5k'), 1536)
47     WVPASSEQ(pn('2 gb'), 2*1024*1024*1024)
48     WVPASSEQ(pn('1e+9 k'), 1000000000 * 1024)
49     WVPASSEQ(pn('-3e-3mb'), int(-0.003 * 1024 * 1024))
50
51 @wvtest
52 def test_detect_fakeroot():
53     if os.getenv('FAKEROOTKEY'):
54         WVPASS(detect_fakeroot())
55     else:
56         WVPASS(not detect_fakeroot())
57
58 @wvtest
59 def test_path_components():
60     WVPASSEQ(path_components('/'), [('', '/')])
61     WVPASSEQ(path_components('/foo'), [('', '/'), ('foo', '/foo')])
62     WVPASSEQ(path_components('/foo/'), [('', '/'), ('foo', '/foo')])
63     WVPASSEQ(path_components('/foo/bar'),
64              [('', '/'), ('foo', '/foo'), ('bar', '/foo/bar')])
65     WVEXCEPT(Exception, path_components, 'foo')
66
67
68 @wvtest
69 def test_stripped_path_components():
70     WVPASSEQ(stripped_path_components('/', []), [('', '/')])
71     WVPASSEQ(stripped_path_components('/', ['']), [('', '/')])
72     WVPASSEQ(stripped_path_components('/', ['/']), [('', '/')])
73     WVPASSEQ(stripped_path_components('/foo', ['/']),
74              [('', '/'), ('foo', '/foo')])
75     WVPASSEQ(stripped_path_components('/', ['/foo']), [('', '/')])
76     WVPASSEQ(stripped_path_components('/foo', ['/bar']),
77              [('', '/'), ('foo', '/foo')])
78     WVPASSEQ(stripped_path_components('/foo', ['/foo']), [('', '/foo')])
79     WVPASSEQ(stripped_path_components('/foo/bar', ['/foo']),
80              [('', '/foo'), ('bar', '/foo/bar')])
81     WVPASSEQ(stripped_path_components('/foo/bar', ['/bar', '/foo', '/baz']),
82              [('', '/foo'), ('bar', '/foo/bar')])
83     WVPASSEQ(stripped_path_components('/foo/bar/baz', ['/foo/bar/baz']),
84              [('', '/foo/bar/baz')])
85     WVEXCEPT(Exception, stripped_path_components, 'foo', [])
86
87
88 @wvtest
89 def test_grafted_path_components():
90     WVPASSEQ(grafted_path_components([('/chroot', '/')], '/foo'),
91              [('', '/'), ('foo', '/foo')])
92     WVPASSEQ(grafted_path_components([('/foo/bar', '/')], '/foo/bar/baz/bax'),
93              [('', '/foo/bar'),
94               ('baz', '/foo/bar/baz'),
95               ('bax', '/foo/bar/baz/bax')])
96     WVPASSEQ(grafted_path_components([('/foo/bar/baz', '/bax')],
97                                      '/foo/bar/baz/1/2'),
98              [('', None),
99               ('bax', '/foo/bar/baz'),
100               ('1', '/foo/bar/baz/1'),
101               ('2', '/foo/bar/baz/1/2')])
102     WVPASSEQ(grafted_path_components([('/foo', '/bar/baz/bax')],
103                                      '/foo/bar'),
104              [('', None),
105               ('bar', None),
106               ('baz', None),
107               ('bax', '/foo'),
108               ('bar', '/foo/bar')])
109     WVPASSEQ(grafted_path_components([('/foo/bar/baz', '/a/b/c')],
110                                      '/foo/bar/baz'),
111              [('', None), ('a', None), ('b', None), ('c', '/foo/bar/baz')])
112     WVPASSEQ(grafted_path_components([('/', '/a/b/c/')], '/foo/bar'),
113              [('', None), ('a', None), ('b', None), ('c', '/'),
114               ('foo', '/foo'), ('bar', '/foo/bar')])
115     WVEXCEPT(Exception, grafted_path_components, 'foo', [])
116
117
118 @wvtest
119 def test_readpipe():
120     x = readpipe(['echo', '42'])
121     WVPASSEQ(x, '42\n')
122     try:
123         readpipe(['bash', '-c', 'exit 42'])
124     except Exception, ex:
125         WVPASSEQ(str(ex), "subprocess 'bash -c exit 42' failed with status 42")
126
127
128 @wvtest
129 def test_batchpipe():
130     for chunk in batchpipe(['echo'], []):
131         WVPASS(False)
132     out = ''
133     for chunk in batchpipe(['echo'], ['42']):
134         out += chunk
135     WVPASSEQ(out, '42\n')
136     try:
137         batchpipe(['bash', '-c'], ['exit 42'])
138     except Exception, ex:
139         WVPASSEQ(str(ex), "subprocess 'bash -c exit 42' failed with status 42")
140     args = [str(x) for x in range(6)]
141     # Force batchpipe to break the args into batches of 3.  This
142     # approach assumes all args are the same length.
143     arg_max = \
144         helpers._argmax_base(['echo']) + helpers._argmax_args_size(args[:3])
145     batches = batchpipe(['echo'], args, arg_max=arg_max)
146     WVPASSEQ(next(batches), '0 1 2\n')
147     WVPASSEQ(next(batches), '3 4 5\n')
148     WVPASSEQ(next(batches, None), None)
149     batches = batchpipe(['echo'], [str(x) for x in range(5)], arg_max=arg_max)
150     WVPASSEQ(next(batches), '0 1 2\n')
151     WVPASSEQ(next(batches), '3 4\n')
152     WVPASSEQ(next(batches, None), None)
153
154
155 @wvtest
156 def test_atomically_replaced_file():
157     tmpdir = tempfile.mkdtemp(dir=bup_tmp, prefix='bup-thelper-')
158     target_file = os.path.join(tmpdir, 'test-atomic-write')
159     initial_failures = wvfailure_count()
160
161     with atomically_replaced_file(target_file, mode='w') as f:
162         f.write('asdf')
163         WVPASSEQ(f.mode, 'w')
164     f = open(target_file, 'r')
165     WVPASSEQ(f.read(), 'asdf')
166
167     try:
168         with atomically_replaced_file(target_file, mode='w') as f:
169             f.write('wxyz')
170             raise Exception()
171     except:
172         pass
173     with open(target_file) as f:
174         WVPASSEQ(f.read(), 'asdf')
175
176     with atomically_replaced_file(target_file, mode='wb') as f:
177         f.write(os.urandom(20))
178         WVPASSEQ(f.mode, 'wb')
179
180     if wvfailure_count() == initial_failures:
181         subprocess.call(['rm', '-rf', tmpdir])