]> arthur.barton.de Git - bup.git/blob - lib/bup/t/thelpers.py
Don't import * from helpers
[bup.git] / lib / bup / t / thelpers.py
1 import helpers
2 import math
3 import os
4 import os.path
5 import tempfile
6 import stat
7 import subprocess
8
9 from bup.helpers import (atomically_replaced_file, batchpipe, detect_fakeroot,
10                          grafted_path_components, mkdirp, parse_num,
11                          path_components, readpipe, stripped_path_components,
12                          utc_offset_str)
13 import bup._helpers as _helpers
14
15 from wvtest import *
16
17
18 bup_tmp = os.path.realpath('../../../t/tmp')
19 mkdirp(bup_tmp)
20
21 @wvtest
22 def test_next():
23     # Test whatever you end up with for next() after import '*'.
24     WVPASSEQ(next(iter([]), None), None)
25     x = iter([1])
26     WVPASSEQ(next(x, None), 1)
27     WVPASSEQ(next(x, None), None)
28     x = iter([1])
29     WVPASSEQ(next(x, 'x'), 1)
30     WVPASSEQ(next(x, 'x'), 'x')
31     WVEXCEPT(StopIteration, next, iter([]))
32     x = iter([1])
33     WVPASSEQ(next(x), 1)
34     WVEXCEPT(StopIteration, next, x)
35
36
37 @wvtest
38 def test_fallback_next():
39     global next
40     orig = next
41     next = helpers._fallback_next
42     try:
43         test_next()
44     finally:
45         next = orig
46
47
48 @wvtest
49 def test_parse_num():
50     pn = parse_num
51     WVPASSEQ(pn('1'), 1)
52     WVPASSEQ(pn('0'), 0)
53     WVPASSEQ(pn('1.5k'), 1536)
54     WVPASSEQ(pn('2 gb'), 2*1024*1024*1024)
55     WVPASSEQ(pn('1e+9 k'), 1000000000 * 1024)
56     WVPASSEQ(pn('-3e-3mb'), int(-0.003 * 1024 * 1024))
57
58 @wvtest
59 def test_detect_fakeroot():
60     if os.getenv('FAKEROOTKEY'):
61         WVPASS(detect_fakeroot())
62     else:
63         WVPASS(not detect_fakeroot())
64
65 @wvtest
66 def test_path_components():
67     WVPASSEQ(path_components('/'), [('', '/')])
68     WVPASSEQ(path_components('/foo'), [('', '/'), ('foo', '/foo')])
69     WVPASSEQ(path_components('/foo/'), [('', '/'), ('foo', '/foo')])
70     WVPASSEQ(path_components('/foo/bar'),
71              [('', '/'), ('foo', '/foo'), ('bar', '/foo/bar')])
72     WVEXCEPT(Exception, path_components, 'foo')
73
74
75 @wvtest
76 def test_stripped_path_components():
77     WVPASSEQ(stripped_path_components('/', []), [('', '/')])
78     WVPASSEQ(stripped_path_components('/', ['']), [('', '/')])
79     WVPASSEQ(stripped_path_components('/', ['/']), [('', '/')])
80     WVPASSEQ(stripped_path_components('/foo', ['/']),
81              [('', '/'), ('foo', '/foo')])
82     WVPASSEQ(stripped_path_components('/', ['/foo']), [('', '/')])
83     WVPASSEQ(stripped_path_components('/foo', ['/bar']),
84              [('', '/'), ('foo', '/foo')])
85     WVPASSEQ(stripped_path_components('/foo', ['/foo']), [('', '/foo')])
86     WVPASSEQ(stripped_path_components('/foo/bar', ['/foo']),
87              [('', '/foo'), ('bar', '/foo/bar')])
88     WVPASSEQ(stripped_path_components('/foo/bar', ['/bar', '/foo', '/baz']),
89              [('', '/foo'), ('bar', '/foo/bar')])
90     WVPASSEQ(stripped_path_components('/foo/bar/baz', ['/foo/bar/baz']),
91              [('', '/foo/bar/baz')])
92     WVEXCEPT(Exception, stripped_path_components, 'foo', [])
93
94
95 @wvtest
96 def test_grafted_path_components():
97     WVPASSEQ(grafted_path_components([('/chroot', '/')], '/foo'),
98              [('', '/'), ('foo', '/foo')])
99     WVPASSEQ(grafted_path_components([('/foo/bar', '/')], '/foo/bar/baz/bax'),
100              [('', '/foo/bar'),
101               ('baz', '/foo/bar/baz'),
102               ('bax', '/foo/bar/baz/bax')])
103     WVPASSEQ(grafted_path_components([('/foo/bar/baz', '/bax')],
104                                      '/foo/bar/baz/1/2'),
105              [('', None),
106               ('bax', '/foo/bar/baz'),
107               ('1', '/foo/bar/baz/1'),
108               ('2', '/foo/bar/baz/1/2')])
109     WVPASSEQ(grafted_path_components([('/foo', '/bar/baz/bax')],
110                                      '/foo/bar'),
111              [('', None),
112               ('bar', None),
113               ('baz', None),
114               ('bax', '/foo'),
115               ('bar', '/foo/bar')])
116     WVPASSEQ(grafted_path_components([('/foo/bar/baz', '/a/b/c')],
117                                      '/foo/bar/baz'),
118              [('', None), ('a', None), ('b', None), ('c', '/foo/bar/baz')])
119     WVPASSEQ(grafted_path_components([('/', '/a/b/c/')], '/foo/bar'),
120              [('', None), ('a', None), ('b', None), ('c', '/'),
121               ('foo', '/foo'), ('bar', '/foo/bar')])
122     WVEXCEPT(Exception, grafted_path_components, 'foo', [])
123
124
125 @wvtest
126 def test_readpipe():
127     x = readpipe(['echo', '42'])
128     WVPASSEQ(x, '42\n')
129     try:
130         readpipe(['bash', '-c', 'exit 42'])
131     except Exception as ex:
132         WVPASSEQ(str(ex), "subprocess 'bash -c exit 42' failed with status 42")
133
134
135 @wvtest
136 def test_batchpipe():
137     for chunk in batchpipe(['echo'], []):
138         WVPASS(False)
139     out = ''
140     for chunk in batchpipe(['echo'], ['42']):
141         out += chunk
142     WVPASSEQ(out, '42\n')
143     try:
144         batchpipe(['bash', '-c'], ['exit 42'])
145     except Exception as ex:
146         WVPASSEQ(str(ex), "subprocess 'bash -c exit 42' failed with status 42")
147     args = [str(x) for x in range(6)]
148     # Force batchpipe to break the args into batches of 3.  This
149     # approach assumes all args are the same length.
150     arg_max = \
151         helpers._argmax_base(['echo']) + helpers._argmax_args_size(args[:3])
152     batches = batchpipe(['echo'], args, arg_max=arg_max)
153     WVPASSEQ(next(batches), '0 1 2\n')
154     WVPASSEQ(next(batches), '3 4 5\n')
155     WVPASSEQ(next(batches, None), None)
156     batches = batchpipe(['echo'], [str(x) for x in range(5)], arg_max=arg_max)
157     WVPASSEQ(next(batches), '0 1 2\n')
158     WVPASSEQ(next(batches), '3 4\n')
159     WVPASSEQ(next(batches, None), None)
160
161
162 @wvtest
163 def test_atomically_replaced_file():
164     tmpdir = tempfile.mkdtemp(dir=bup_tmp, prefix='bup-thelper-')
165     target_file = os.path.join(tmpdir, 'test-atomic-write')
166     initial_failures = wvfailure_count()
167
168     with atomically_replaced_file(target_file, mode='w') as f:
169         f.write('asdf')
170         WVPASSEQ(f.mode, 'w')
171     f = open(target_file, 'r')
172     WVPASSEQ(f.read(), 'asdf')
173
174     try:
175         with atomically_replaced_file(target_file, mode='w') as f:
176             f.write('wxyz')
177             raise Exception()
178     except:
179         pass
180     with open(target_file) as f:
181         WVPASSEQ(f.read(), 'asdf')
182
183     with atomically_replaced_file(target_file, mode='wb') as f:
184         f.write(os.urandom(20))
185         WVPASSEQ(f.mode, 'wb')
186
187     if wvfailure_count() == initial_failures:
188         subprocess.call(['rm', '-rf', tmpdir])
189
190
191 @wvtest
192 def test_utc_offset_str():
193     tz = os.environ.get('TZ')
194     try:
195         os.environ['TZ'] = 'FOO+0:00'
196         WVPASSEQ(utc_offset_str(0), '+0000')
197         os.environ['TZ'] = 'FOO+1:00'
198         WVPASSEQ(utc_offset_str(0), '-0100')
199         os.environ['TZ'] = 'FOO-1:00'
200         WVPASSEQ(utc_offset_str(0), '+0100')
201         os.environ['TZ'] = 'FOO+3:3'
202         WVPASSEQ(utc_offset_str(0), '-0303')
203         os.environ['TZ'] = 'FOO-3:3'
204         WVPASSEQ(utc_offset_str(0), '+0303')
205         # Offset is not an integer number of minutes
206         os.environ['TZ'] = 'FOO+3:3:3'
207         WVPASSEQ(utc_offset_str(1), '-0303')
208         os.environ['TZ'] = 'FOO-3:3:3'
209         WVPASSEQ(utc_offset_str(1), '+0303')
210         WVPASSEQ(utc_offset_str(314159), '+0303')
211     finally:
212         if tz:
213             os.environ['TZ'] = tz
214         else:
215             try:
216                 del os.environ['TZ']
217             except KeyError:
218                 pass