]> arthur.barton.de Git - bup.git/blob - lib/bup/t/thelpers.py
428970352716e593dcaf7cb20d4627a3963b2540
[bup.git] / lib / bup / t / thelpers.py
1
2 from __future__ import absolute_import
3 from time import tzset
4 import helpers, math, os, os.path, re, subprocess
5
6 from wvtest import *
7
8 from bup.compat import bytes_from_byte, bytes_from_uint, environ
9 from bup.helpers import (atomically_replaced_file, batchpipe, detect_fakeroot,
10                          grafted_path_components, mkdirp, parse_num,
11                          path_components, readpipe, stripped_path_components,
12                          utc_offset_str)
13 from buptest import no_lingering_errors, test_tempdir
14 import bup._helpers as _helpers
15
16
# Scratch directory used by this test module.  The location is given
# relative to the current working directory and resolved through
# os.path.realpath(), so it depends on where the tests are run from.
bup_tmp = os.path.realpath(b'../../../t/tmp')
# Create the directory at import time.
# NOTE(review): presumably mkdirp() tolerates an already existing
# directory -- confirm against bup.helpers.
mkdirp(bup_tmp)
19
20
@wvtest
def test_parse_num():
    """Check parse_num() on plain numbers, unit suffixes and exponents."""
    with no_lingering_errors():
        # (input, expected value).  The expected values show that the
        # unit suffixes are binary multiples (1.5k == 1536).
        expectations = (
            (b'1', 1),
            ('1', 1),
            ('0', 0),
            ('1.5k', 1536),
            ('2 gb', 2*1024*1024*1024),
            ('1e+9 k', 1000000000 * 1024),
            ('-3e-3mb', int(-0.003 * 1024 * 1024)),
        )
        for text, expected in expectations:
            WVPASSEQ(parse_num(text), expected)
32
@wvtest
def test_detect_fakeroot():
    """detect_fakeroot() must agree with the presence of FAKEROOTKEY."""
    with no_lingering_errors():
        key_present = b'FAKEROOTKEY' in environ
        if not key_present:
            # No key in the environment: detection must come back false.
            WVPASS(not detect_fakeroot())
        else:
            WVPASS(detect_fakeroot())
40
@wvtest
def test_path_components():
    """path_components() yields (name, full_path) pairs for absolute paths."""
    with no_lingering_errors():
        root = (b'', b'/')
        foo = (b'foo', b'/foo')
        # (path, expected components); a trailing slash changes nothing.
        expectations = (
            (b'/', [root]),
            (b'/foo', [root, foo]),
            (b'/foo/', [root, foo]),
            (b'/foo/bar', [root, foo, (b'bar', b'/foo/bar')]),
        )
        for path, expected in expectations:
            WVPASSEQ(path_components(path), expected)
        # A relative path must be rejected.
        WVEXCEPT(Exception, path_components, b'foo')
50
51
@wvtest
def test_stripped_path_components():
    """Check stripped_path_components() against a table of prefix lists."""
    with no_lingering_errors():
        # (path, strip prefixes, expected components)
        expectations = (
            (b'/', [], [(b'', b'/')]),
            (b'/', [b''], [(b'', b'/')]),
            (b'/', [b'/'], [(b'', b'/')]),
            (b'/foo', [b'/'], [(b'', b'/'), (b'foo', b'/foo')]),
            (b'/', [b'/foo'], [(b'', b'/')]),
            (b'/foo', [b'/bar'], [(b'', b'/'), (b'foo', b'/foo')]),
            (b'/foo', [b'/foo'], [(b'', b'/foo')]),
            (b'/foo/bar', [b'/foo'],
             [(b'', b'/foo'), (b'bar', b'/foo/bar')]),
            (b'/foo/bar', [b'/bar', b'/foo', b'/baz'],
             [(b'', b'/foo'), (b'bar', b'/foo/bar')]),
            (b'/foo/bar/baz', [b'/foo/bar/baz'],
             [(b'', b'/foo/bar/baz')]),
        )
        for path, prefixes, expected in expectations:
            WVPASSEQ(stripped_path_components(path, prefixes), expected)
        # A relative path must be rejected.
        WVEXCEPT(Exception, stripped_path_components, b'foo', [])
71
72
@wvtest
def test_grafted_path_components():
    """Check grafted_path_components() against a table of graft points.

    In the expected results, components that exist only because of the
    graft target carry None as their path.
    """
    with no_lingering_errors():
        # (graft points as (old, new) pairs, path, expected components)
        expectations = (
            ([(b'/chroot', b'/')], b'/foo',
             [(b'', b'/'), (b'foo', b'/foo')]),
            ([(b'/foo/bar', b'/')], b'/foo/bar/baz/bax',
             [(b'', b'/foo/bar'),
              (b'baz', b'/foo/bar/baz'),
              (b'bax', b'/foo/bar/baz/bax')]),
            ([(b'/foo/bar/baz', b'/bax')], b'/foo/bar/baz/1/2',
             [(b'', None),
              (b'bax', b'/foo/bar/baz'),
              (b'1', b'/foo/bar/baz/1'),
              (b'2', b'/foo/bar/baz/1/2')]),
            ([(b'/foo', b'/bar/baz/bax')], b'/foo/bar',
             [(b'', None),
              (b'bar', None),
              (b'baz', None),
              (b'bax', b'/foo'),
              (b'bar', b'/foo/bar')]),
            ([(b'/foo/bar/baz', b'/a/b/c')], b'/foo/bar/baz',
             [(b'', None), (b'a', None), (b'b', None),
              (b'c', b'/foo/bar/baz')]),
            ([(b'/', b'/a/b/c/')], b'/foo/bar',
             [(b'', None), (b'a', None), (b'b', None), (b'c', b'/'),
              (b'foo', b'/foo'), (b'bar', b'/foo/bar')]),
        )
        for graft_points, path, expected in expectations:
            WVPASSEQ(grafted_path_components(graft_points, path), expected)
        # NOTE(review): every call above passes the graft list first and
        # the path second, but this one passes (b'foo', []).  Confirm the
        # exception seen here is the intended one before reordering.
        WVEXCEPT(Exception, grafted_path_components, b'foo', [])
103
104
@wvtest
def test_readpipe():
    """Check readpipe() output capture and its failure report.

    A successful command must return its stdout; a command exiting with
    a non-zero status must raise an exception whose message names the
    command and the status.
    """
    with no_lingering_errors():
        x = readpipe([b'echo', b'42'])
        WVPASSEQ(x, b'42\n')
        # The optional "b" presumably allows for a bytes repr in the
        # message -- the pattern itself is unchanged from the original.
        expected = "^subprocess b?'bash -c exit 42' failed with status 42$"
        try:
            readpipe([b'bash', b'-c', b'exit 42'])
        except Exception as ex:
            if not re.match(expected, str(ex)):
                # Report the mismatch through WVPASSEQ so that the actual
                # message shows up in the test output.
                WVPASSEQ(str(ex), expected)
        else:
            # The command exits with status 42, so not raising at all is
            # a failure; previously this case passed silently.
            WVPASS(False)
117
118
119
@wvtest
def test_batchpipe():
    """Check batchpipe() output, failure reporting and argument batching."""
    with no_lingering_errors():
        # With no args nothing must be produced at all.
        for chunk in batchpipe([b'echo'], []):
            WVPASS(False)
        out = b''
        for chunk in batchpipe([b'echo'], [b'42']):
            out += chunk
        WVPASSEQ(out, b'42\n')
        # Same message check as test_readpipe: the optional "b" presumably
        # allows for a bytes repr of the command in the message.
        expected = "^subprocess b?'bash -c exit 42' failed with status 42$"
        try:
            # The result of batchpipe() is consumed with for/next()
            # everywhere else in this test, so merely calling it would
            # presumably never run the command; iterate to force it.
            for chunk in batchpipe([b'bash', b'-c'], [b'exit 42']):
                pass
        except Exception as ex:
            if not re.match(expected, str(ex)):
                # Report the mismatch through WVPASSEQ so that the actual
                # message shows up in the test output.
                WVPASSEQ(str(ex), expected)
        else:
            # A command exiting with status 42 must not go unnoticed.
            WVPASS(False)
        args = [str(x) for x in range(6)]
        # Force batchpipe to break the args into batches of 3.  This
        # approach assumes all args are the same length.
        arg_max = \
            helpers._argmax_base([b'echo']) + helpers._argmax_args_size(args[:3])
        batches = batchpipe(['echo'], args, arg_max=arg_max)
        WVPASSEQ(next(batches), b'0 1 2\n')
        WVPASSEQ(next(batches), b'3 4 5\n')
        WVPASSEQ(next(batches, None), None)
        batches = batchpipe([b'echo'], [str(x) for x in range(5)], arg_max=arg_max)
        WVPASSEQ(next(batches), b'0 1 2\n')
        WVPASSEQ(next(batches), b'3 4\n')
        WVPASSEQ(next(batches, None), None)
147
148
@wvtest
def test_atomically_replaced_file():
    """Check atomically_replaced_file() on success, failure and binary mode.

    The target must receive the new content when the block completes,
    must keep its previous content when the block raises, and the
    temporary file must be opened with the requested mode.
    """
    with no_lingering_errors():
        with test_tempdir(b'bup-thelper-') as tmpdir:
            target_file = os.path.join(tmpdir, b'test-atomic-write')

            with atomically_replaced_file(target_file, mode='w') as f:
                f.write('asdf')
                WVPASSEQ(f.mode, 'w')
            # Read back through a context manager so that the handle is
            # closed instead of being leaked.
            with open(target_file, 'r') as f:
                WVPASSEQ(f.read(), 'asdf')

            try:
                with atomically_replaced_file(target_file, mode='w') as f:
                    f.write('wxyz')
                    raise Exception()
            except Exception:
                # Only the deliberate failure above is expected here; a
                # bare except would also hide KeyboardInterrupt/SystemExit.
                pass
            # The aborted write must not have touched the target.
            with open(target_file) as f:
                WVPASSEQ(f.read(), 'asdf')

            with atomically_replaced_file(target_file, mode='wb') as f:
                f.write(os.urandom(20))
                WVPASSEQ(f.mode, 'wb')
173
174
def set_tz(tz):
    """Set or clear TZ in the environment, then call tzset().

    A true *tz* is stored under b'TZ'.  A false *tz* (for example None)
    deletes the b'TZ' entry instead; callers in this file guard that
    deletion against KeyError for the case where TZ is not set.
    """
    if tz:
        environ[b'TZ'] = tz
    else:
        del environ[b'TZ']
    # Make the C library pick up the changed environment.
    tzset()
181
182
@wvtest
def test_utc_offset_str():
    """Check utc_offset_str() under a series of TZ settings.

    The caller's TZ value is saved first and restored afterwards,
    whether or not the checks succeed.
    """
    with no_lingering_errors():
        saved_tz = environ.get(b'TZ')
        tzset()
        # (TZ value, ((timestamp, expected offset string), ...)).  The
        # sign in a POSIX TZ value is the opposite of the UTC offset,
        # which is why FOO+1:00 is expected to give -0100.
        scenarios = (
            (b'FOO+0:00', ((0, b'+0000'),)),
            (b'FOO+1:00', ((0, b'-0100'),)),
            (b'FOO-1:00', ((0, b'+0100'),)),
            (b'FOO+3:3', ((0, b'-0303'),)),
            (b'FOO-3:3', ((0, b'+0303'),)),
            # Offset is not an integer number of minutes
            (b'FOO+3:3:3', ((1, b'-0303'),)),
            (b'FOO-3:3:3', ((1, b'+0303'), (314159, b'+0303'))),
        )
        try:
            for zone, checks in scenarios:
                set_tz(zone)
                for when, expected in checks:
                    WVPASSEQ(utc_offset_str(when), expected)
        finally:
            if not saved_tz:
                # TZ was unset (or empty) before the test: clear it again.
                # set_tz() deletes the key, which fails if it is absent.
                try:
                    set_tz(None)
                except KeyError:
                    pass
            else:
                set_tz(saved_tz)
213
@wvtest
def test_valid_save_name():
    """Check helpers.valid_save_name() on accepted and rejected names."""
    with no_lingering_errors():
        is_valid = helpers.valid_save_name
        for name in (b'x', b'x@'):
            WVPASS(is_valid(name))
        for name in (b'@', b'/', b'/foo', b'foo/', b'/foo/', b'foo//bar',
                     b'.', b'bar.', b'foo@{'):
            WVFAIL(is_valid(name))
        # Names ending in one of these characters are rejected.
        for ch in b' ~^:?*[\\':
            WVFAIL(is_valid(b'foo' + bytes_from_byte(ch)))
        # Names ending in a byte value below 20, or in 0x7f, are rejected.
        for code in range(20):
            WVFAIL(is_valid(b'foo' + bytes_from_uint(code)))
        WVFAIL(is_valid(b'foo' + bytes_from_uint(0x7f)))
        for name in (b'foo..bar', b'bar.lock/baz', b'foo/bar.lock/baz',
                     b'.bar/baz', b'foo/.bar/baz'):
            WVFAIL(is_valid(name))