arthur.barton.de Git - bup.git/blob - lib/bup/t/thelpers.py
d7d302d074f3fb2ac5dc146c22fc6f58765de4c4
[bup.git] / lib / bup / t / thelpers.py
1
2 import helpers, math, os, os.path, stat, subprocess
3
4 from wvtest import *
5
6 from bup.helpers import (atomically_replaced_file, batchpipe, detect_fakeroot,
7                          grafted_path_components, mkdirp, parse_num,
8                          path_components, readpipe, stripped_path_components,
9                          utc_offset_str)
10 from buptest import no_lingering_errors, test_tempdir
11 import bup._helpers as _helpers
12
13
# Canonical form of the relative path '../../../t/tmp'.  Because the path
# is relative, it is resolved against whatever the working directory is
# when this module is imported.
bup_tmp = os.path.realpath('../../../t/tmp')
# NOTE(review): presumably creates the directory (and any missing
# parents) at import time -- confirm against bup.helpers.mkdirp.
mkdirp(bup_tmp)
16
@wvtest
def test_next():
    """Exercise next() as it is bound in this module.

    Covers both the default-value form and the StopIteration-raising
    form, on empty and single-item iterators.  The name is looked up at
    call time, so rebinding the module-level next changes what is tested.
    """
    with no_lingering_errors():
        # Test whatever you end up with for next() after import '*'.
        empty = iter([])
        WVPASSEQ(next(empty, None), None)

        # With a None default: the item first, then the default.
        single = iter([1])
        WVPASSEQ(next(single, None), 1)
        WVPASSEQ(next(single, None), None)

        # Same again with a non-None default.
        single = iter([1])
        WVPASSEQ(next(single, 'x'), 1)
        WVPASSEQ(next(single, 'x'), 'x')

        # Without a default, exhaustion must raise StopIteration.
        WVEXCEPT(StopIteration, next, iter([]))
        single = iter([1])
        WVPASSEQ(next(single), 1)
        WVEXCEPT(StopIteration, next, single)
32
33
@wvtest
def test_fallback_next():
    """Re-run test_next() with helpers._fallback_next bound as next.

    The module-level name is swapped for the duration of the call and
    put back afterwards, even if test_next() raises.
    """
    with no_lingering_errors():
        # Writing through globals() rebinds the same module-level name
        # that test_next() resolves when it calls next().
        module_scope = globals()
        saved_next = next
        module_scope['next'] = helpers._fallback_next
        try:
            test_next()
        finally:
            module_scope['next'] = saved_next
44
45
@wvtest
def test_parse_num():
    """Check parse_num() on plain, suffixed and exponent-style inputs."""
    kib = 1024
    with no_lingering_errors():
        # (input text, expected integer result)
        expectations = (
            ('1', 1),
            ('0', 0),
            ('1.5k', 1536),
            ('2 gb', 2 * kib * kib * kib),
            ('1e+9 k', 1000000000 * kib),
            ('-3e-3mb', int(-0.003 * kib * kib)),
        )
        for text, expected in expectations:
            WVPASSEQ(parse_num(text), expected)
56
@wvtest
def test_detect_fakeroot():
    """Check that detect_fakeroot() agrees with the FAKEROOTKEY variable."""
    with no_lingering_errors():
        key_present = os.getenv('FAKEROOTKEY')
        if not key_present:
            # No (or empty) FAKEROOTKEY: detection must report false.
            WVPASS(not detect_fakeroot())
        else:
            WVPASS(detect_fakeroot())
64
@wvtest
def test_path_components():
    """Check path_components() on absolute paths and a relative one."""
    with no_lingering_errors():
        # Each entry pairs a component name with the full path up to it.
        root = ('', '/')
        foo = ('foo', '/foo')
        foo_bar = ('bar', '/foo/bar')

        WVPASSEQ(path_components('/'), [root])
        WVPASSEQ(path_components('/foo'), [root, foo])
        # A trailing slash does not add a component.
        WVPASSEQ(path_components('/foo/'), [root, foo])
        WVPASSEQ(path_components('/foo/bar'), [root, foo, foo_bar])
        # A relative path is expected to be rejected.
        WVEXCEPT(Exception, path_components, 'foo')
74
75
@wvtest
def test_stripped_path_components():
    """Check stripped_path_components() against various strip prefixes."""
    with no_lingering_errors():
        spc = stripped_path_components
        root = ('', '/')
        foo = ('foo', '/foo')
        foo_as_root = ('', '/foo')
        bar_under_foo = ('bar', '/foo/bar')

        # Expectations where the leading component stays ('', '/').
        WVPASSEQ(spc('/', []), [root])
        WVPASSEQ(spc('/', ['']), [root])
        WVPASSEQ(spc('/', ['/']), [root])
        WVPASSEQ(spc('/foo', ['/']), [root, foo])
        WVPASSEQ(spc('/', ['/foo']), [root])
        WVPASSEQ(spc('/foo', ['/bar']), [root, foo])

        # Expectations where a matching prefix becomes the '' component.
        WVPASSEQ(spc('/foo', ['/foo']), [foo_as_root])
        WVPASSEQ(spc('/foo/bar', ['/foo']), [foo_as_root, bar_under_foo])
        WVPASSEQ(spc('/foo/bar', ['/bar', '/foo', '/baz']),
                 [foo_as_root, bar_under_foo])
        WVPASSEQ(spc('/foo/bar/baz', ['/foo/bar/baz']),
                 [('', '/foo/bar/baz')])

        # A relative path is expected to be rejected.
        WVEXCEPT(Exception, spc, 'foo', [])
95
96
@wvtest
def test_grafted_path_components():
    """Check grafted_path_components() on a table of graft/path cases."""
    with no_lingering_errors():
        # (graft points, path, expected components)
        cases = (
            ([('/chroot', '/')],
             '/foo',
             [('', '/'), ('foo', '/foo')]),
            ([('/foo/bar', '/')],
             '/foo/bar/baz/bax',
             [('', '/foo/bar'),
              ('baz', '/foo/bar/baz'),
              ('bax', '/foo/bar/baz/bax')]),
            ([('/foo/bar/baz', '/bax')],
             '/foo/bar/baz/1/2',
             [('', None),
              ('bax', '/foo/bar/baz'),
              ('1', '/foo/bar/baz/1'),
              ('2', '/foo/bar/baz/1/2')]),
            ([('/foo', '/bar/baz/bax')],
             '/foo/bar',
             [('', None),
              ('bar', None),
              ('baz', None),
              ('bax', '/foo'),
              ('bar', '/foo/bar')]),
            ([('/foo/bar/baz', '/a/b/c')],
             '/foo/bar/baz',
             [('', None), ('a', None), ('b', None), ('c', '/foo/bar/baz')]),
            ([('/', '/a/b/c/')],
             '/foo/bar',
             [('', None), ('a', None), ('b', None), ('c', '/'),
              ('foo', '/foo'), ('bar', '/foo/bar')]),
        )
        for graft_points, path, expected in cases:
            WVPASSEQ(grafted_path_components(graft_points, path), expected)

        # This argument combination is expected to raise.
        WVEXCEPT(Exception, grafted_path_components, 'foo', [])
127
128
@wvtest
def test_readpipe():
    """Check readpipe() output capture and its failure report.

    A command that exits non-zero must raise an exception carrying the
    expected message; not raising at all is treated as a test failure.
    """
    with no_lingering_errors():
        x = readpipe(['echo', '42'])
        WVPASSEQ(x, '42\n')
        try:
            readpipe(['bash', '-c', 'exit 42'])
        except Exception as ex:
            WVPASSEQ(str(ex),
                     "subprocess 'bash -c exit 42' failed with status 42")
        else:
            # Without this branch a readpipe() that ignored the exit
            # status would let the test pass without checking anything.
            WVPASS(False)
139
140
@wvtest
def test_batchpipe():
    """Check batchpipe() output, failure reporting and argument batching."""
    with no_lingering_errors():
        # No args: nothing should be yielded at all.
        for chunk in batchpipe(['echo'], []):
            WVPASS(False)
        out = ''.join(batchpipe(['echo'], ['42']))
        WVPASSEQ(out, '42\n')
        try:
            # The result is driven with next() further down, so it is an
            # iterator.  NOTE(review): assumed to be lazy (a generator);
            # if so, merely calling batchpipe() never runs the command,
            # so it has to be consumed for the failure to surface.
            for chunk in batchpipe(['bash', '-c'], ['exit 42']):
                pass
        except Exception as ex:
            WVPASSEQ(str(ex),
                     "subprocess 'bash -c exit 42' failed with status 42")
        else:
            # A failing command that raises nothing is itself a failure.
            WVPASS(False)
        args = [str(x) for x in range(6)]
        # Force batchpipe to break the args into batches of 3.  This
        # approach assumes all args are the same length.
        arg_max = \
            helpers._argmax_base(['echo']) + helpers._argmax_args_size(args[:3])
        batches = batchpipe(['echo'], args, arg_max=arg_max)
        WVPASSEQ(next(batches), '0 1 2\n')
        WVPASSEQ(next(batches), '3 4 5\n')
        WVPASSEQ(next(batches, None), None)
        # Five args: the final batch holds the two left over.
        batches = batchpipe(['echo'], [str(x) for x in range(5)], arg_max=arg_max)
        WVPASSEQ(next(batches), '0 1 2\n')
        WVPASSEQ(next(batches), '3 4\n')
        WVPASSEQ(next(batches, None), None)
168
169
@wvtest
def test_atomically_replaced_file():
    """Check atomically_replaced_file() on success, failure and binary mode.

    A completed write must be visible in the target, a write aborted by
    an exception must leave the previous content in place, and the file
    object must report the mode it was requested with.
    """
    with no_lingering_errors():
        with test_tempdir('bup-thelper-') as tmpdir:
            target_file = os.path.join(tmpdir, 'test-atomic-write')

            with atomically_replaced_file(target_file, mode='w') as f:
                f.write('asdf')
                WVPASSEQ(f.mode, 'w')
            # Read back through a context manager so the handle is
            # closed rather than left open for the rest of the test.
            with open(target_file, 'r') as f:
                WVPASSEQ(f.read(), 'asdf')

            try:
                with atomically_replaced_file(target_file, mode='w') as f:
                    f.write('wxyz')
                    raise Exception()
            except Exception:
                # Only the deliberate failure above is swallowed;
                # KeyboardInterrupt and SystemExit still propagate.
                pass
            with open(target_file) as f:
                WVPASSEQ(f.read(), 'asdf')

            with atomically_replaced_file(target_file, mode='wb') as f:
                f.write(os.urandom(20))
                WVPASSEQ(f.mode, 'wb')
194
195
@wvtest
def test_utc_offset_str():
    """Check utc_offset_str() under a series of TZ settings.

    The caller's TZ environment variable is saved first and restored
    afterwards, including the case where it was set to an empty string.
    """
    with no_lingering_errors():
        tz = os.environ.get('TZ')
        try:
            # POSIX-style TZ offsets count westward from UTC, so the
            # expected sign is the opposite of the one in the TZ string.
            os.environ['TZ'] = 'FOO+0:00'
            WVPASSEQ(utc_offset_str(0), '+0000')
            os.environ['TZ'] = 'FOO+1:00'
            WVPASSEQ(utc_offset_str(0), '-0100')
            os.environ['TZ'] = 'FOO-1:00'
            WVPASSEQ(utc_offset_str(0), '+0100')
            os.environ['TZ'] = 'FOO+3:3'
            WVPASSEQ(utc_offset_str(0), '-0303')
            os.environ['TZ'] = 'FOO-3:3'
            WVPASSEQ(utc_offset_str(0), '+0303')
            # Offset is not an integer number of minutes
            os.environ['TZ'] = 'FOO+3:3:3'
            WVPASSEQ(utc_offset_str(1), '-0303')
            os.environ['TZ'] = 'FOO-3:3:3'
            WVPASSEQ(utc_offset_str(1), '+0303')
            WVPASSEQ(utc_offset_str(314159), '+0303')
        finally:
            # Compare against None rather than truthiness: a TZ that was
            # set to '' beforehand must be restored, not removed.
            if tz is not None:
                os.environ['TZ'] = tz
            else:
                os.environ.pop('TZ', None)
225
@wvtest
def test_valid_save_name():
    """Check helpers.valid_save_name() on acceptable and forbidden names."""
    with no_lingering_errors():
        valid = helpers.valid_save_name
        WVPASS(valid('x'))
        WVPASS(valid('x@'))
        WVFAIL(valid('@'))
        WVFAIL(valid('/'))
        WVFAIL(valid('/foo'))
        WVFAIL(valid('foo/'))
        WVFAIL(valid('/foo/'))
        WVFAIL(valid('foo//bar'))
        WVFAIL(valid('.'))
        WVFAIL(valid('bar.'))
        WVFAIL(valid('foo@{'))
        for x in ' ~^:?*[\\':
            WVFAIL(valid('foo' + x))
        # git-check-ref-format forbids every ASCII control character,
        # i.e. bytes below octal 040 (0x20), plus DEL (0x7f).
        # NOTE(review): assumes valid_save_name follows that rule --
        # confirm against its implementation.
        for i in range(0x20):
            WVFAIL(valid('foo' + chr(i)))
        WVFAIL(valid('foo' + chr(0x7f)))
        WVFAIL(valid('foo..bar'))
        WVFAIL(valid('bar.lock/baz'))
        WVFAIL(valid('foo/bar.lock/baz'))
        WVFAIL(valid('.bar/baz'))
        WVFAIL(valid('foo/.bar/baz'))