]> arthur.barton.de Git - bup.git/blob - test/int/test_helpers.py
test: add pylint and test imports
[bup.git] / test / int / test_helpers.py
1
2 from __future__ import absolute_import
3 from time import tzset
4 import os, os.path, re
5 from bup import helpers
6
7 from wvpytest import *
8
9 from bup.compat import bytes_from_byte, bytes_from_uint, environ
10 from bup.helpers import (atomically_replaced_file, batchpipe, detect_fakeroot,
11                          grafted_path_components, parse_num,
12                          path_components, readpipe, stripped_path_components,
13                          shstr,
14                          utc_offset_str)
15
16
17 def test_parse_num():
18     pn = parse_num
19     WVPASSEQ(pn(b'1'), 1)
20     WVPASSEQ(pn('1'), 1)
21     WVPASSEQ(pn('0'), 0)
22     WVPASSEQ(pn('1.5k'), 1536)
23     WVPASSEQ(pn('2 gb'), 2*1024*1024*1024)
24     WVPASSEQ(pn('1e+9 k'), 1000000000 * 1024)
25     WVPASSEQ(pn('-3e-3mb'), int(-0.003 * 1024 * 1024))
26
27 def test_detect_fakeroot():
28     if b'FAKEROOTKEY' in environ:
29         WVPASS(detect_fakeroot())
30     else:
31         WVPASS(not detect_fakeroot())
32
33 def test_path_components():
34     WVPASSEQ(path_components(b'/'), [(b'', b'/')])
35     WVPASSEQ(path_components(b'/foo'), [(b'', b'/'), (b'foo', b'/foo')])
36     WVPASSEQ(path_components(b'/foo/'), [(b'', b'/'), (b'foo', b'/foo')])
37     WVPASSEQ(path_components(b'/foo/bar'),
38              [(b'', b'/'), (b'foo', b'/foo'), (b'bar', b'/foo/bar')])
39     WVEXCEPT(Exception, path_components, b'foo')
40
41
42 def test_stripped_path_components():
43     WVPASSEQ(stripped_path_components(b'/', []), [(b'', b'/')])
44     WVPASSEQ(stripped_path_components(b'/', [b'']), [(b'', b'/')])
45     WVPASSEQ(stripped_path_components(b'/', [b'/']), [(b'', b'/')])
46     WVPASSEQ(stripped_path_components(b'/foo', [b'/']),
47              [(b'', b'/'), (b'foo', b'/foo')])
48     WVPASSEQ(stripped_path_components(b'/', [b'/foo']), [(b'', b'/')])
49     WVPASSEQ(stripped_path_components(b'/foo', [b'/bar']),
50              [(b'', b'/'), (b'foo', b'/foo')])
51     WVPASSEQ(stripped_path_components(b'/foo', [b'/foo']), [(b'', b'/foo')])
52     WVPASSEQ(stripped_path_components(b'/foo/bar', [b'/foo']),
53              [(b'', b'/foo'), (b'bar', b'/foo/bar')])
54     WVPASSEQ(stripped_path_components(b'/foo/bar', [b'/bar', b'/foo', b'/baz']),
55              [(b'', b'/foo'), (b'bar', b'/foo/bar')])
56     WVPASSEQ(stripped_path_components(b'/foo/bar/baz', [b'/foo/bar/baz']),
57              [(b'', b'/foo/bar/baz')])
58     WVEXCEPT(Exception, stripped_path_components, b'foo', [])
59
60
61 def test_grafted_path_components():
62     WVPASSEQ(grafted_path_components([(b'/chroot', b'/')], b'/foo'),
63              [(b'', b'/'), (b'foo', b'/foo')])
64     WVPASSEQ(grafted_path_components([(b'/foo/bar', b'/')],
65                                      b'/foo/bar/baz/bax'),
66              [(b'', b'/foo/bar'),
67               (b'baz', b'/foo/bar/baz'),
68               (b'bax', b'/foo/bar/baz/bax')])
69     WVPASSEQ(grafted_path_components([(b'/foo/bar/baz', b'/bax')],
70                                      b'/foo/bar/baz/1/2'),
71              [(b'', None),
72               (b'bax', b'/foo/bar/baz'),
73               (b'1', b'/foo/bar/baz/1'),
74               (b'2', b'/foo/bar/baz/1/2')])
75     WVPASSEQ(grafted_path_components([(b'/foo', b'/bar/baz/bax')],
76                                      b'/foo/bar'),
77              [(b'', None),
78               (b'bar', None),
79               (b'baz', None),
80               (b'bax', b'/foo'),
81               (b'bar', b'/foo/bar')])
82     WVPASSEQ(grafted_path_components([(b'/foo/bar/baz', b'/a/b/c')],
83                                      b'/foo/bar/baz'),
84              [(b'', None), (b'a', None), (b'b', None), (b'c', b'/foo/bar/baz')])
85     WVPASSEQ(grafted_path_components([(b'/', b'/a/b/c/')], b'/foo/bar'),
86              [(b'', None), (b'a', None), (b'b', None), (b'c', b'/'),
87               (b'foo', b'/foo'), (b'bar', b'/foo/bar')])
88     WVEXCEPT(Exception, grafted_path_components, b'foo', [])
89
90
91 def test_shstr():
92     # Do nothing for strings and bytes
93     WVPASSEQ(shstr(b''), b'')
94     WVPASSEQ(shstr(b'1'), b'1')
95     WVPASSEQ(shstr(b'1 2'), b'1 2')
96     WVPASSEQ(shstr(b"1'2"), b"1'2")
97     WVPASSEQ(shstr(''), '')
98     WVPASSEQ(shstr('1'), '1')
99     WVPASSEQ(shstr('1 2'), '1 2')
100     WVPASSEQ(shstr("1'2"), "1'2")
101
102     # Escape parts of sequences
103     WVPASSEQ(shstr((b'1 2', b'3')), b"'1 2' 3")
104     WVPASSEQ(shstr((b"1'2", b'3')), b"'1'\"'\"'2' 3")
105     WVPASSEQ(shstr((b"'1", b'3')), b"''\"'\"'1' 3")
106     WVPASSEQ(shstr(('1 2', '3')), "'1 2' 3")
107     WVPASSEQ(shstr(("1'2", '3')), "'1'\"'\"'2' 3")
108     WVPASSEQ(shstr(("'1", '3')), "''\"'\"'1' 3")
109
110
111 def test_readpipe():
112     x = readpipe([b'echo', b'42'])
113     WVPASSEQ(x, b'42\n')
114     try:
115         readpipe([b'bash', b'-c', b'exit 42'])
116     except Exception as ex:
117         rx = '^subprocess b?"bash -c \'exit 42\'" failed with status 42$'
118         if not re.match(rx, str(ex)):
119             WVPASSEQ(str(ex), rx)
120
121
122 def test_batchpipe():
123     for chunk in batchpipe([b'echo'], []):
124         WVPASS(False)
125     out = b''
126     for chunk in batchpipe([b'echo'], [b'42']):
127         out += chunk
128     WVPASSEQ(out, b'42\n')
129     try:
130         batchpipe([b'bash', b'-c'], [b'exit 42'])
131     except Exception as ex:
132         WVPASSEQ(str(ex),
133                  "subprocess 'bash -c exit 42' failed with status 42")
134     args = [str(x) for x in range(6)]
135     # Force batchpipe to break the args into batches of 3.  This
136     # approach assumes all args are the same length.
137     arg_max = \
138         helpers._argmax_base([b'echo']) + helpers._argmax_args_size(args[:3])
139     batches = batchpipe(['echo'], args, arg_max=arg_max)
140     WVPASSEQ(next(batches), b'0 1 2\n')
141     WVPASSEQ(next(batches), b'3 4 5\n')
142     WVPASSEQ(next(batches, None), None)
143     batches = batchpipe([b'echo'], [str(x) for x in range(5)], arg_max=arg_max)
144     WVPASSEQ(next(batches), b'0 1 2\n')
145     WVPASSEQ(next(batches), b'3 4\n')
146     WVPASSEQ(next(batches, None), None)
147
148
149 def test_atomically_replaced_file(tmpdir):
150     target_file = os.path.join(tmpdir, b'test-atomic-write')
151
152     with atomically_replaced_file(target_file, mode='w') as f:
153         f.write('asdf')
154         WVPASSEQ(f.mode, 'w')
155     f = open(target_file, 'r')
156     WVPASSEQ(f.read(), 'asdf')
157
158     try:
159         with atomically_replaced_file(target_file, mode='w') as f:
160             f.write('wxyz')
161             raise Exception()
162     except:
163         pass
164     with open(target_file) as f:
165         WVPASSEQ(f.read(), 'asdf')
166
167     with atomically_replaced_file(target_file, mode='wb') as f:
168         f.write(os.urandom(20))
169         WVPASSEQ(f.mode, 'wb')
170
171
172 def set_tz(tz):
173     if not tz:
174         del environ[b'TZ']
175     else:
176         environ[b'TZ'] = tz
177     tzset()
178
179
180 def test_utc_offset_str():
181     tz = environ.get(b'TZ')
182     tzset()
183     try:
184         set_tz(b'FOO+0:00')
185         WVPASSEQ(utc_offset_str(0), b'+0000')
186         set_tz(b'FOO+1:00')
187         WVPASSEQ(utc_offset_str(0), b'-0100')
188         set_tz(b'FOO-1:00')
189         WVPASSEQ(utc_offset_str(0), b'+0100')
190         set_tz(b'FOO+3:3')
191         WVPASSEQ(utc_offset_str(0), b'-0303')
192         set_tz(b'FOO-3:3')
193         WVPASSEQ(utc_offset_str(0), b'+0303')
194         # Offset is not an integer number of minutes
195         set_tz(b'FOO+3:3:3')
196         WVPASSEQ(utc_offset_str(1), b'-0303')
197         set_tz(b'FOO-3:3:3')
198         WVPASSEQ(utc_offset_str(1), b'+0303')
199         WVPASSEQ(utc_offset_str(314159), b'+0303')
200     finally:
201         if tz:
202             set_tz(tz)
203         else:
204             try:
205                 set_tz(None)
206             except KeyError:
207                 pass
208
209 def test_valid_save_name():
210     valid = helpers.valid_save_name
211     WVPASS(valid(b'x'))
212     WVPASS(valid(b'x@'))
213     WVFAIL(valid(b'@'))
214     WVFAIL(valid(b'/'))
215     WVFAIL(valid(b'/foo'))
216     WVFAIL(valid(b'foo/'))
217     WVFAIL(valid(b'/foo/'))
218     WVFAIL(valid(b'foo//bar'))
219     WVFAIL(valid(b'.'))
220     WVFAIL(valid(b'bar.'))
221     WVFAIL(valid(b'foo@{'))
222     for x in b' ~^:?*[\\':
223         WVFAIL(valid(b'foo' + bytes_from_byte(x)))
224     for i in range(20):
225         WVFAIL(valid(b'foo' + bytes_from_uint(i)))
226     WVFAIL(valid(b'foo' + bytes_from_uint(0x7f)))
227     WVFAIL(valid(b'foo..bar'))
228     WVFAIL(valid(b'bar.lock/baz'))
229     WVFAIL(valid(b'foo/bar.lock/baz'))
230     WVFAIL(valid(b'.bar/baz'))
231     WVFAIL(valid(b'foo/.bar/baz'))