]> arthur.barton.de Git - bup.git/blob - lib/bup/t/thelpers.py
c71cbb71a448b71257aec81649c583bda34d5ab6
[bup.git] / lib / bup / t / thelpers.py
1
2 from __future__ import absolute_import
3 from time import tzset
4 import helpers, math, os, os.path, re, subprocess
5
6 from wvtest import *
7
8 from bup.compat import bytes_from_byte, bytes_from_uint, environ
9 from bup.helpers import (atomically_replaced_file, batchpipe, detect_fakeroot,
10                          grafted_path_components, mkdirp, parse_num,
11                          path_components, readpipe, stripped_path_components,
12                          shstr,
13                          utc_offset_str)
14 from buptest import no_lingering_errors, test_tempdir
15 import bup._helpers as _helpers
16
17
18 bup_tmp = os.path.realpath(b'../../../t/tmp')
19 mkdirp(bup_tmp)
20
21
22 @wvtest
23 def test_parse_num():
24     with no_lingering_errors():
25         pn = parse_num
26         WVPASSEQ(pn(b'1'), 1)
27         WVPASSEQ(pn('1'), 1)
28         WVPASSEQ(pn('0'), 0)
29         WVPASSEQ(pn('1.5k'), 1536)
30         WVPASSEQ(pn('2 gb'), 2*1024*1024*1024)
31         WVPASSEQ(pn('1e+9 k'), 1000000000 * 1024)
32         WVPASSEQ(pn('-3e-3mb'), int(-0.003 * 1024 * 1024))
33
34 @wvtest
35 def test_detect_fakeroot():
36     with no_lingering_errors():
37         if b'FAKEROOTKEY' in environ:
38             WVPASS(detect_fakeroot())
39         else:
40             WVPASS(not detect_fakeroot())
41
42 @wvtest
43 def test_path_components():
44     with no_lingering_errors():
45         WVPASSEQ(path_components(b'/'), [(b'', b'/')])
46         WVPASSEQ(path_components(b'/foo'), [(b'', b'/'), (b'foo', b'/foo')])
47         WVPASSEQ(path_components(b'/foo/'), [(b'', b'/'), (b'foo', b'/foo')])
48         WVPASSEQ(path_components(b'/foo/bar'),
49                  [(b'', b'/'), (b'foo', b'/foo'), (b'bar', b'/foo/bar')])
50         WVEXCEPT(Exception, path_components, b'foo')
51
52
53 @wvtest
54 def test_stripped_path_components():
55     with no_lingering_errors():
56         WVPASSEQ(stripped_path_components(b'/', []), [(b'', b'/')])
57         WVPASSEQ(stripped_path_components(b'/', [b'']), [(b'', b'/')])
58         WVPASSEQ(stripped_path_components(b'/', [b'/']), [(b'', b'/')])
59         WVPASSEQ(stripped_path_components(b'/foo', [b'/']),
60                  [(b'', b'/'), (b'foo', b'/foo')])
61         WVPASSEQ(stripped_path_components(b'/', [b'/foo']), [(b'', b'/')])
62         WVPASSEQ(stripped_path_components(b'/foo', [b'/bar']),
63                  [(b'', b'/'), (b'foo', b'/foo')])
64         WVPASSEQ(stripped_path_components(b'/foo', [b'/foo']), [(b'', b'/foo')])
65         WVPASSEQ(stripped_path_components(b'/foo/bar', [b'/foo']),
66                  [(b'', b'/foo'), (b'bar', b'/foo/bar')])
67         WVPASSEQ(stripped_path_components(b'/foo/bar', [b'/bar', b'/foo', b'/baz']),
68                  [(b'', b'/foo'), (b'bar', b'/foo/bar')])
69         WVPASSEQ(stripped_path_components(b'/foo/bar/baz', [b'/foo/bar/baz']),
70                  [(b'', b'/foo/bar/baz')])
71         WVEXCEPT(Exception, stripped_path_components, b'foo', [])
72
73
74 @wvtest
75 def test_grafted_path_components():
76     with no_lingering_errors():
77         WVPASSEQ(grafted_path_components([(b'/chroot', b'/')], b'/foo'),
78                  [(b'', b'/'), (b'foo', b'/foo')])
79         WVPASSEQ(grafted_path_components([(b'/foo/bar', b'/')],
80                                          b'/foo/bar/baz/bax'),
81                  [(b'', b'/foo/bar'),
82                   (b'baz', b'/foo/bar/baz'),
83                   (b'bax', b'/foo/bar/baz/bax')])
84         WVPASSEQ(grafted_path_components([(b'/foo/bar/baz', b'/bax')],
85                                          b'/foo/bar/baz/1/2'),
86                  [(b'', None),
87                   (b'bax', b'/foo/bar/baz'),
88                   (b'1', b'/foo/bar/baz/1'),
89                   (b'2', b'/foo/bar/baz/1/2')])
90         WVPASSEQ(grafted_path_components([(b'/foo', b'/bar/baz/bax')],
91                                          b'/foo/bar'),
92                  [(b'', None),
93                   (b'bar', None),
94                   (b'baz', None),
95                   (b'bax', b'/foo'),
96                   (b'bar', b'/foo/bar')])
97         WVPASSEQ(grafted_path_components([(b'/foo/bar/baz', b'/a/b/c')],
98                                          b'/foo/bar/baz'),
99                  [(b'', None), (b'a', None), (b'b', None), (b'c', b'/foo/bar/baz')])
100         WVPASSEQ(grafted_path_components([(b'/', b'/a/b/c/')], b'/foo/bar'),
101                  [(b'', None), (b'a', None), (b'b', None), (b'c', b'/'),
102                   (b'foo', b'/foo'), (b'bar', b'/foo/bar')])
103         WVEXCEPT(Exception, grafted_path_components, b'foo', [])
104
105
106 @wvtest
107 def test_shstr():
108     with no_lingering_errors():
109         # Do nothing for strings and bytes
110         WVPASSEQ(shstr(b''), b'')
111         WVPASSEQ(shstr(b'1'), b'1')
112         WVPASSEQ(shstr(b'1 2'), b'1 2')
113         WVPASSEQ(shstr(b"1'2"), b"1'2")
114         WVPASSEQ(shstr(''), '')
115         WVPASSEQ(shstr('1'), '1')
116         WVPASSEQ(shstr('1 2'), '1 2')
117         WVPASSEQ(shstr("1'2"), "1'2")
118
119         # Escape parts of sequences
120         WVPASSEQ(shstr((b'1 2', b'3')), b"'1 2' 3")
121         WVPASSEQ(shstr((b"1'2", b'3')), b"'1'\"'\"'2' 3")
122         WVPASSEQ(shstr((b"'1", b'3')), b"''\"'\"'1' 3")
123         WVPASSEQ(shstr(('1 2', '3')), "'1 2' 3")
124         WVPASSEQ(shstr(("1'2", '3')), "'1'\"'\"'2' 3")
125         WVPASSEQ(shstr(("'1", '3')), "''\"'\"'1' 3")
126
127
128 @wvtest
129 def test_readpipe():
130     with no_lingering_errors():
131         x = readpipe([b'echo', b'42'])
132         WVPASSEQ(x, b'42\n')
133         try:
134             readpipe([b'bash', b'-c', b'exit 42'])
135         except Exception as ex:
136             if not re.match("^subprocess b?'bash -c exit 42' failed with status 42$",
137                             str(ex)):
138                 WVPASSEQ(str(ex),
139                          "^subprocess b?'bash -c exit 42' failed with status 42$")
140
141
142
143 @wvtest
144 def test_batchpipe():
145     with no_lingering_errors():
146         for chunk in batchpipe([b'echo'], []):
147             WVPASS(False)
148         out = b''
149         for chunk in batchpipe([b'echo'], [b'42']):
150             out += chunk
151         WVPASSEQ(out, b'42\n')
152         try:
153             batchpipe([b'bash', b'-c'], [b'exit 42'])
154         except Exception as ex:
155             WVPASSEQ(str(ex),
156                      "subprocess 'bash -c exit 42' failed with status 42")
157         args = [str(x) for x in range(6)]
158         # Force batchpipe to break the args into batches of 3.  This
159         # approach assumes all args are the same length.
160         arg_max = \
161             helpers._argmax_base([b'echo']) + helpers._argmax_args_size(args[:3])
162         batches = batchpipe(['echo'], args, arg_max=arg_max)
163         WVPASSEQ(next(batches), b'0 1 2\n')
164         WVPASSEQ(next(batches), b'3 4 5\n')
165         WVPASSEQ(next(batches, None), None)
166         batches = batchpipe([b'echo'], [str(x) for x in range(5)], arg_max=arg_max)
167         WVPASSEQ(next(batches), b'0 1 2\n')
168         WVPASSEQ(next(batches), b'3 4\n')
169         WVPASSEQ(next(batches, None), None)
170
171
172 @wvtest
173 def test_atomically_replaced_file():
174     with no_lingering_errors():
175         with test_tempdir(b'bup-thelper-') as tmpdir:
176             target_file = os.path.join(tmpdir, b'test-atomic-write')
177
178             with atomically_replaced_file(target_file, mode='w') as f:
179                 f.write('asdf')
180                 WVPASSEQ(f.mode, 'w')
181             f = open(target_file, 'r')
182             WVPASSEQ(f.read(), 'asdf')
183
184             try:
185                 with atomically_replaced_file(target_file, mode='w') as f:
186                     f.write('wxyz')
187                     raise Exception()
188             except:
189                 pass
190             with open(target_file) as f:
191                 WVPASSEQ(f.read(), 'asdf')
192
193             with atomically_replaced_file(target_file, mode='wb') as f:
194                 f.write(os.urandom(20))
195                 WVPASSEQ(f.mode, 'wb')
196
197
198 def set_tz(tz):
199     if not tz:
200         del environ[b'TZ']
201     else:
202         environ[b'TZ'] = tz
203     tzset()
204
205
206 @wvtest
207 def test_utc_offset_str():
208     with no_lingering_errors():
209         tz = environ.get(b'TZ')
210         tzset()
211         try:
212             set_tz(b'FOO+0:00')
213             WVPASSEQ(utc_offset_str(0), b'+0000')
214             set_tz(b'FOO+1:00')
215             WVPASSEQ(utc_offset_str(0), b'-0100')
216             set_tz(b'FOO-1:00')
217             WVPASSEQ(utc_offset_str(0), b'+0100')
218             set_tz(b'FOO+3:3')
219             WVPASSEQ(utc_offset_str(0), b'-0303')
220             set_tz(b'FOO-3:3')
221             WVPASSEQ(utc_offset_str(0), b'+0303')
222             # Offset is not an integer number of minutes
223             set_tz(b'FOO+3:3:3')
224             WVPASSEQ(utc_offset_str(1), b'-0303')
225             set_tz(b'FOO-3:3:3')
226             WVPASSEQ(utc_offset_str(1), b'+0303')
227             WVPASSEQ(utc_offset_str(314159), b'+0303')
228         finally:
229             if tz:
230                 set_tz(tz)
231             else:
232                 try:
233                     set_tz(None)
234                 except KeyError:
235                     pass
236
237 @wvtest
238 def test_valid_save_name():
239     with no_lingering_errors():
240         valid = helpers.valid_save_name
241         WVPASS(valid(b'x'))
242         WVPASS(valid(b'x@'))
243         WVFAIL(valid(b'@'))
244         WVFAIL(valid(b'/'))
245         WVFAIL(valid(b'/foo'))
246         WVFAIL(valid(b'foo/'))
247         WVFAIL(valid(b'/foo/'))
248         WVFAIL(valid(b'foo//bar'))
249         WVFAIL(valid(b'.'))
250         WVFAIL(valid(b'bar.'))
251         WVFAIL(valid(b'foo@{'))
252         for x in b' ~^:?*[\\':
253             WVFAIL(valid(b'foo' + bytes_from_byte(x)))
254         for i in range(20):
255             WVFAIL(valid(b'foo' + bytes_from_uint(i)))
256         WVFAIL(valid(b'foo' + bytes_from_uint(0x7f)))
257         WVFAIL(valid(b'foo..bar'))
258         WVFAIL(valid(b'bar.lock/baz'))
259         WVFAIL(valid(b'foo/bar.lock/baz'))
260         WVFAIL(valid(b'.bar/baz'))
261         WVFAIL(valid(b'foo/.bar/baz'))