]> arthur.barton.de Git - bup.git/blob - test/int/test_helpers.py
0a03ff336f0c6c0845438ccc31a6a310f481ab3d
[bup.git] / test / int / test_helpers.py
1
2 from __future__ import absolute_import
3 from time import tzset
4 import math, os, os.path, re, subprocess
5 from bup import helpers
6
7 from wvtest import *
8
9 from bup.compat import bytes_from_byte, bytes_from_uint, environ
10 from bup.helpers import (atomically_replaced_file, batchpipe, detect_fakeroot,
11                          grafted_path_components, mkdirp, parse_num,
12                          path_components, readpipe, stripped_path_components,
13                          shstr,
14                          utc_offset_str)
15 from buptest import no_lingering_errors, test_tempdir
16 import bup._helpers as _helpers
17
18
19 @wvtest
20 def test_parse_num():
21     with no_lingering_errors():
22         pn = parse_num
23         WVPASSEQ(pn(b'1'), 1)
24         WVPASSEQ(pn('1'), 1)
25         WVPASSEQ(pn('0'), 0)
26         WVPASSEQ(pn('1.5k'), 1536)
27         WVPASSEQ(pn('2 gb'), 2*1024*1024*1024)
28         WVPASSEQ(pn('1e+9 k'), 1000000000 * 1024)
29         WVPASSEQ(pn('-3e-3mb'), int(-0.003 * 1024 * 1024))
30
31 @wvtest
32 def test_detect_fakeroot():
33     with no_lingering_errors():
34         if b'FAKEROOTKEY' in environ:
35             WVPASS(detect_fakeroot())
36         else:
37             WVPASS(not detect_fakeroot())
38
39 @wvtest
40 def test_path_components():
41     with no_lingering_errors():
42         WVPASSEQ(path_components(b'/'), [(b'', b'/')])
43         WVPASSEQ(path_components(b'/foo'), [(b'', b'/'), (b'foo', b'/foo')])
44         WVPASSEQ(path_components(b'/foo/'), [(b'', b'/'), (b'foo', b'/foo')])
45         WVPASSEQ(path_components(b'/foo/bar'),
46                  [(b'', b'/'), (b'foo', b'/foo'), (b'bar', b'/foo/bar')])
47         WVEXCEPT(Exception, path_components, b'foo')
48
49
50 @wvtest
51 def test_stripped_path_components():
52     with no_lingering_errors():
53         WVPASSEQ(stripped_path_components(b'/', []), [(b'', b'/')])
54         WVPASSEQ(stripped_path_components(b'/', [b'']), [(b'', b'/')])
55         WVPASSEQ(stripped_path_components(b'/', [b'/']), [(b'', b'/')])
56         WVPASSEQ(stripped_path_components(b'/foo', [b'/']),
57                  [(b'', b'/'), (b'foo', b'/foo')])
58         WVPASSEQ(stripped_path_components(b'/', [b'/foo']), [(b'', b'/')])
59         WVPASSEQ(stripped_path_components(b'/foo', [b'/bar']),
60                  [(b'', b'/'), (b'foo', b'/foo')])
61         WVPASSEQ(stripped_path_components(b'/foo', [b'/foo']), [(b'', b'/foo')])
62         WVPASSEQ(stripped_path_components(b'/foo/bar', [b'/foo']),
63                  [(b'', b'/foo'), (b'bar', b'/foo/bar')])
64         WVPASSEQ(stripped_path_components(b'/foo/bar', [b'/bar', b'/foo', b'/baz']),
65                  [(b'', b'/foo'), (b'bar', b'/foo/bar')])
66         WVPASSEQ(stripped_path_components(b'/foo/bar/baz', [b'/foo/bar/baz']),
67                  [(b'', b'/foo/bar/baz')])
68         WVEXCEPT(Exception, stripped_path_components, b'foo', [])
69
70
71 @wvtest
72 def test_grafted_path_components():
73     with no_lingering_errors():
74         WVPASSEQ(grafted_path_components([(b'/chroot', b'/')], b'/foo'),
75                  [(b'', b'/'), (b'foo', b'/foo')])
76         WVPASSEQ(grafted_path_components([(b'/foo/bar', b'/')],
77                                          b'/foo/bar/baz/bax'),
78                  [(b'', b'/foo/bar'),
79                   (b'baz', b'/foo/bar/baz'),
80                   (b'bax', b'/foo/bar/baz/bax')])
81         WVPASSEQ(grafted_path_components([(b'/foo/bar/baz', b'/bax')],
82                                          b'/foo/bar/baz/1/2'),
83                  [(b'', None),
84                   (b'bax', b'/foo/bar/baz'),
85                   (b'1', b'/foo/bar/baz/1'),
86                   (b'2', b'/foo/bar/baz/1/2')])
87         WVPASSEQ(grafted_path_components([(b'/foo', b'/bar/baz/bax')],
88                                          b'/foo/bar'),
89                  [(b'', None),
90                   (b'bar', None),
91                   (b'baz', None),
92                   (b'bax', b'/foo'),
93                   (b'bar', b'/foo/bar')])
94         WVPASSEQ(grafted_path_components([(b'/foo/bar/baz', b'/a/b/c')],
95                                          b'/foo/bar/baz'),
96                  [(b'', None), (b'a', None), (b'b', None), (b'c', b'/foo/bar/baz')])
97         WVPASSEQ(grafted_path_components([(b'/', b'/a/b/c/')], b'/foo/bar'),
98                  [(b'', None), (b'a', None), (b'b', None), (b'c', b'/'),
99                   (b'foo', b'/foo'), (b'bar', b'/foo/bar')])
100         WVEXCEPT(Exception, grafted_path_components, b'foo', [])
101
102
103 @wvtest
104 def test_shstr():
105     with no_lingering_errors():
106         # Do nothing for strings and bytes
107         WVPASSEQ(shstr(b''), b'')
108         WVPASSEQ(shstr(b'1'), b'1')
109         WVPASSEQ(shstr(b'1 2'), b'1 2')
110         WVPASSEQ(shstr(b"1'2"), b"1'2")
111         WVPASSEQ(shstr(''), '')
112         WVPASSEQ(shstr('1'), '1')
113         WVPASSEQ(shstr('1 2'), '1 2')
114         WVPASSEQ(shstr("1'2"), "1'2")
115
116         # Escape parts of sequences
117         WVPASSEQ(shstr((b'1 2', b'3')), b"'1 2' 3")
118         WVPASSEQ(shstr((b"1'2", b'3')), b"'1'\"'\"'2' 3")
119         WVPASSEQ(shstr((b"'1", b'3')), b"''\"'\"'1' 3")
120         WVPASSEQ(shstr(('1 2', '3')), "'1 2' 3")
121         WVPASSEQ(shstr(("1'2", '3')), "'1'\"'\"'2' 3")
122         WVPASSEQ(shstr(("'1", '3')), "''\"'\"'1' 3")
123
124
125 @wvtest
126 def test_readpipe():
127     with no_lingering_errors():
128         x = readpipe([b'echo', b'42'])
129         WVPASSEQ(x, b'42\n')
130         try:
131             readpipe([b'bash', b'-c', b'exit 42'])
132         except Exception as ex:
133             rx = '^subprocess b?"bash -c \'exit 42\'" failed with status 42$'
134             if not re.match(rx, str(ex)):
135                 WVPASSEQ(str(ex), rx)
136
137
138 @wvtest
139 def test_batchpipe():
140     with no_lingering_errors():
141         for chunk in batchpipe([b'echo'], []):
142             WVPASS(False)
143         out = b''
144         for chunk in batchpipe([b'echo'], [b'42']):
145             out += chunk
146         WVPASSEQ(out, b'42\n')
147         try:
148             batchpipe([b'bash', b'-c'], [b'exit 42'])
149         except Exception as ex:
150             WVPASSEQ(str(ex),
151                      "subprocess 'bash -c exit 42' failed with status 42")
152         args = [str(x) for x in range(6)]
153         # Force batchpipe to break the args into batches of 3.  This
154         # approach assumes all args are the same length.
155         arg_max = \
156             helpers._argmax_base([b'echo']) + helpers._argmax_args_size(args[:3])
157         batches = batchpipe(['echo'], args, arg_max=arg_max)
158         WVPASSEQ(next(batches), b'0 1 2\n')
159         WVPASSEQ(next(batches), b'3 4 5\n')
160         WVPASSEQ(next(batches, None), None)
161         batches = batchpipe([b'echo'], [str(x) for x in range(5)], arg_max=arg_max)
162         WVPASSEQ(next(batches), b'0 1 2\n')
163         WVPASSEQ(next(batches), b'3 4\n')
164         WVPASSEQ(next(batches, None), None)
165
166
167 @wvtest
168 def test_atomically_replaced_file():
169     with no_lingering_errors():
170         with test_tempdir(b'bup-thelper-') as tmpdir:
171             target_file = os.path.join(tmpdir, b'test-atomic-write')
172
173             with atomically_replaced_file(target_file, mode='w') as f:
174                 f.write('asdf')
175                 WVPASSEQ(f.mode, 'w')
176             f = open(target_file, 'r')
177             WVPASSEQ(f.read(), 'asdf')
178
179             try:
180                 with atomically_replaced_file(target_file, mode='w') as f:
181                     f.write('wxyz')
182                     raise Exception()
183             except:
184                 pass
185             with open(target_file) as f:
186                 WVPASSEQ(f.read(), 'asdf')
187
188             with atomically_replaced_file(target_file, mode='wb') as f:
189                 f.write(os.urandom(20))
190                 WVPASSEQ(f.mode, 'wb')
191
192
193 def set_tz(tz):
194     if not tz:
195         del environ[b'TZ']
196     else:
197         environ[b'TZ'] = tz
198     tzset()
199
200
201 @wvtest
202 def test_utc_offset_str():
203     with no_lingering_errors():
204         tz = environ.get(b'TZ')
205         tzset()
206         try:
207             set_tz(b'FOO+0:00')
208             WVPASSEQ(utc_offset_str(0), b'+0000')
209             set_tz(b'FOO+1:00')
210             WVPASSEQ(utc_offset_str(0), b'-0100')
211             set_tz(b'FOO-1:00')
212             WVPASSEQ(utc_offset_str(0), b'+0100')
213             set_tz(b'FOO+3:3')
214             WVPASSEQ(utc_offset_str(0), b'-0303')
215             set_tz(b'FOO-3:3')
216             WVPASSEQ(utc_offset_str(0), b'+0303')
217             # Offset is not an integer number of minutes
218             set_tz(b'FOO+3:3:3')
219             WVPASSEQ(utc_offset_str(1), b'-0303')
220             set_tz(b'FOO-3:3:3')
221             WVPASSEQ(utc_offset_str(1), b'+0303')
222             WVPASSEQ(utc_offset_str(314159), b'+0303')
223         finally:
224             if tz:
225                 set_tz(tz)
226             else:
227                 try:
228                     set_tz(None)
229                 except KeyError:
230                     pass
231
232 @wvtest
233 def test_valid_save_name():
234     with no_lingering_errors():
235         valid = helpers.valid_save_name
236         WVPASS(valid(b'x'))
237         WVPASS(valid(b'x@'))
238         WVFAIL(valid(b'@'))
239         WVFAIL(valid(b'/'))
240         WVFAIL(valid(b'/foo'))
241         WVFAIL(valid(b'foo/'))
242         WVFAIL(valid(b'/foo/'))
243         WVFAIL(valid(b'foo//bar'))
244         WVFAIL(valid(b'.'))
245         WVFAIL(valid(b'bar.'))
246         WVFAIL(valid(b'foo@{'))
247         for x in b' ~^:?*[\\':
248             WVFAIL(valid(b'foo' + bytes_from_byte(x)))
249         for i in range(20):
250             WVFAIL(valid(b'foo' + bytes_from_uint(i)))
251         WVFAIL(valid(b'foo' + bytes_from_uint(0x7f)))
252         WVFAIL(valid(b'foo..bar'))
253         WVFAIL(valid(b'bar.lock/baz'))
254         WVFAIL(valid(b'foo/bar.lock/baz'))
255         WVFAIL(valid(b'.bar/baz'))
256         WVFAIL(valid(b'foo/.bar/baz'))