]> arthur.barton.de Git - bup.git/blob - test/int/test_helpers.py
tests: partially convert to pytest
[bup.git] / test / int / test_helpers.py
1
2 from __future__ import absolute_import
3 from time import tzset
4 import math, os, os.path, re, subprocess
5 from bup import helpers
6
7 from wvpytest import *
8
9 from bup.compat import bytes_from_byte, bytes_from_uint, environ
10 from bup.helpers import (atomically_replaced_file, batchpipe, detect_fakeroot,
11                          grafted_path_components, mkdirp, parse_num,
12                          path_components, readpipe, stripped_path_components,
13                          shstr,
14                          utc_offset_str)
15 import bup._helpers as _helpers
16
17
18 def test_parse_num():
19     pn = parse_num
20     WVPASSEQ(pn(b'1'), 1)
21     WVPASSEQ(pn('1'), 1)
22     WVPASSEQ(pn('0'), 0)
23     WVPASSEQ(pn('1.5k'), 1536)
24     WVPASSEQ(pn('2 gb'), 2*1024*1024*1024)
25     WVPASSEQ(pn('1e+9 k'), 1000000000 * 1024)
26     WVPASSEQ(pn('-3e-3mb'), int(-0.003 * 1024 * 1024))
27
28 def test_detect_fakeroot():
29     if b'FAKEROOTKEY' in environ:
30         WVPASS(detect_fakeroot())
31     else:
32         WVPASS(not detect_fakeroot())
33
34 def test_path_components():
35     WVPASSEQ(path_components(b'/'), [(b'', b'/')])
36     WVPASSEQ(path_components(b'/foo'), [(b'', b'/'), (b'foo', b'/foo')])
37     WVPASSEQ(path_components(b'/foo/'), [(b'', b'/'), (b'foo', b'/foo')])
38     WVPASSEQ(path_components(b'/foo/bar'),
39              [(b'', b'/'), (b'foo', b'/foo'), (b'bar', b'/foo/bar')])
40     WVEXCEPT(Exception, path_components, b'foo')
41
42
43 def test_stripped_path_components():
44     WVPASSEQ(stripped_path_components(b'/', []), [(b'', b'/')])
45     WVPASSEQ(stripped_path_components(b'/', [b'']), [(b'', b'/')])
46     WVPASSEQ(stripped_path_components(b'/', [b'/']), [(b'', b'/')])
47     WVPASSEQ(stripped_path_components(b'/foo', [b'/']),
48              [(b'', b'/'), (b'foo', b'/foo')])
49     WVPASSEQ(stripped_path_components(b'/', [b'/foo']), [(b'', b'/')])
50     WVPASSEQ(stripped_path_components(b'/foo', [b'/bar']),
51              [(b'', b'/'), (b'foo', b'/foo')])
52     WVPASSEQ(stripped_path_components(b'/foo', [b'/foo']), [(b'', b'/foo')])
53     WVPASSEQ(stripped_path_components(b'/foo/bar', [b'/foo']),
54              [(b'', b'/foo'), (b'bar', b'/foo/bar')])
55     WVPASSEQ(stripped_path_components(b'/foo/bar', [b'/bar', b'/foo', b'/baz']),
56              [(b'', b'/foo'), (b'bar', b'/foo/bar')])
57     WVPASSEQ(stripped_path_components(b'/foo/bar/baz', [b'/foo/bar/baz']),
58              [(b'', b'/foo/bar/baz')])
59     WVEXCEPT(Exception, stripped_path_components, b'foo', [])
60
61
62 def test_grafted_path_components():
63     WVPASSEQ(grafted_path_components([(b'/chroot', b'/')], b'/foo'),
64              [(b'', b'/'), (b'foo', b'/foo')])
65     WVPASSEQ(grafted_path_components([(b'/foo/bar', b'/')],
66                                      b'/foo/bar/baz/bax'),
67              [(b'', b'/foo/bar'),
68               (b'baz', b'/foo/bar/baz'),
69               (b'bax', b'/foo/bar/baz/bax')])
70     WVPASSEQ(grafted_path_components([(b'/foo/bar/baz', b'/bax')],
71                                      b'/foo/bar/baz/1/2'),
72              [(b'', None),
73               (b'bax', b'/foo/bar/baz'),
74               (b'1', b'/foo/bar/baz/1'),
75               (b'2', b'/foo/bar/baz/1/2')])
76     WVPASSEQ(grafted_path_components([(b'/foo', b'/bar/baz/bax')],
77                                      b'/foo/bar'),
78              [(b'', None),
79               (b'bar', None),
80               (b'baz', None),
81               (b'bax', b'/foo'),
82               (b'bar', b'/foo/bar')])
83     WVPASSEQ(grafted_path_components([(b'/foo/bar/baz', b'/a/b/c')],
84                                      b'/foo/bar/baz'),
85              [(b'', None), (b'a', None), (b'b', None), (b'c', b'/foo/bar/baz')])
86     WVPASSEQ(grafted_path_components([(b'/', b'/a/b/c/')], b'/foo/bar'),
87              [(b'', None), (b'a', None), (b'b', None), (b'c', b'/'),
88               (b'foo', b'/foo'), (b'bar', b'/foo/bar')])
89     WVEXCEPT(Exception, grafted_path_components, b'foo', [])
90
91
92 def test_shstr():
93     # Do nothing for strings and bytes
94     WVPASSEQ(shstr(b''), b'')
95     WVPASSEQ(shstr(b'1'), b'1')
96     WVPASSEQ(shstr(b'1 2'), b'1 2')
97     WVPASSEQ(shstr(b"1'2"), b"1'2")
98     WVPASSEQ(shstr(''), '')
99     WVPASSEQ(shstr('1'), '1')
100     WVPASSEQ(shstr('1 2'), '1 2')
101     WVPASSEQ(shstr("1'2"), "1'2")
102
103     # Escape parts of sequences
104     WVPASSEQ(shstr((b'1 2', b'3')), b"'1 2' 3")
105     WVPASSEQ(shstr((b"1'2", b'3')), b"'1'\"'\"'2' 3")
106     WVPASSEQ(shstr((b"'1", b'3')), b"''\"'\"'1' 3")
107     WVPASSEQ(shstr(('1 2', '3')), "'1 2' 3")
108     WVPASSEQ(shstr(("1'2", '3')), "'1'\"'\"'2' 3")
109     WVPASSEQ(shstr(("'1", '3')), "''\"'\"'1' 3")
110
111
112 def test_readpipe():
113     x = readpipe([b'echo', b'42'])
114     WVPASSEQ(x, b'42\n')
115     try:
116         readpipe([b'bash', b'-c', b'exit 42'])
117     except Exception as ex:
118         rx = '^subprocess b?"bash -c \'exit 42\'" failed with status 42$'
119         if not re.match(rx, str(ex)):
120             WVPASSEQ(str(ex), rx)
121
122
123 def test_batchpipe():
124     for chunk in batchpipe([b'echo'], []):
125         WVPASS(False)
126     out = b''
127     for chunk in batchpipe([b'echo'], [b'42']):
128         out += chunk
129     WVPASSEQ(out, b'42\n')
130     try:
131         batchpipe([b'bash', b'-c'], [b'exit 42'])
132     except Exception as ex:
133         WVPASSEQ(str(ex),
134                  "subprocess 'bash -c exit 42' failed with status 42")
135     args = [str(x) for x in range(6)]
136     # Force batchpipe to break the args into batches of 3.  This
137     # approach assumes all args are the same length.
138     arg_max = \
139         helpers._argmax_base([b'echo']) + helpers._argmax_args_size(args[:3])
140     batches = batchpipe(['echo'], args, arg_max=arg_max)
141     WVPASSEQ(next(batches), b'0 1 2\n')
142     WVPASSEQ(next(batches), b'3 4 5\n')
143     WVPASSEQ(next(batches, None), None)
144     batches = batchpipe([b'echo'], [str(x) for x in range(5)], arg_max=arg_max)
145     WVPASSEQ(next(batches), b'0 1 2\n')
146     WVPASSEQ(next(batches), b'3 4\n')
147     WVPASSEQ(next(batches, None), None)
148
149
150 def test_atomically_replaced_file(tmpdir):
151     target_file = os.path.join(tmpdir, b'test-atomic-write')
152
153     with atomically_replaced_file(target_file, mode='w') as f:
154         f.write('asdf')
155         WVPASSEQ(f.mode, 'w')
156     f = open(target_file, 'r')
157     WVPASSEQ(f.read(), 'asdf')
158
159     try:
160         with atomically_replaced_file(target_file, mode='w') as f:
161             f.write('wxyz')
162             raise Exception()
163     except:
164         pass
165     with open(target_file) as f:
166         WVPASSEQ(f.read(), 'asdf')
167
168     with atomically_replaced_file(target_file, mode='wb') as f:
169         f.write(os.urandom(20))
170         WVPASSEQ(f.mode, 'wb')
171
172
173 def set_tz(tz):
174     if not tz:
175         del environ[b'TZ']
176     else:
177         environ[b'TZ'] = tz
178     tzset()
179
180
181 def test_utc_offset_str():
182     tz = environ.get(b'TZ')
183     tzset()
184     try:
185         set_tz(b'FOO+0:00')
186         WVPASSEQ(utc_offset_str(0), b'+0000')
187         set_tz(b'FOO+1:00')
188         WVPASSEQ(utc_offset_str(0), b'-0100')
189         set_tz(b'FOO-1:00')
190         WVPASSEQ(utc_offset_str(0), b'+0100')
191         set_tz(b'FOO+3:3')
192         WVPASSEQ(utc_offset_str(0), b'-0303')
193         set_tz(b'FOO-3:3')
194         WVPASSEQ(utc_offset_str(0), b'+0303')
195         # Offset is not an integer number of minutes
196         set_tz(b'FOO+3:3:3')
197         WVPASSEQ(utc_offset_str(1), b'-0303')
198         set_tz(b'FOO-3:3:3')
199         WVPASSEQ(utc_offset_str(1), b'+0303')
200         WVPASSEQ(utc_offset_str(314159), b'+0303')
201     finally:
202         if tz:
203             set_tz(tz)
204         else:
205             try:
206                 set_tz(None)
207             except KeyError:
208                 pass
209
210 def test_valid_save_name():
211     valid = helpers.valid_save_name
212     WVPASS(valid(b'x'))
213     WVPASS(valid(b'x@'))
214     WVFAIL(valid(b'@'))
215     WVFAIL(valid(b'/'))
216     WVFAIL(valid(b'/foo'))
217     WVFAIL(valid(b'foo/'))
218     WVFAIL(valid(b'/foo/'))
219     WVFAIL(valid(b'foo//bar'))
220     WVFAIL(valid(b'.'))
221     WVFAIL(valid(b'bar.'))
222     WVFAIL(valid(b'foo@{'))
223     for x in b' ~^:?*[\\':
224         WVFAIL(valid(b'foo' + bytes_from_byte(x)))
225     for i in range(20):
226         WVFAIL(valid(b'foo' + bytes_from_uint(i)))
227     WVFAIL(valid(b'foo' + bytes_from_uint(0x7f)))
228     WVFAIL(valid(b'foo..bar'))
229     WVFAIL(valid(b'bar.lock/baz'))
230     WVFAIL(valid(b'foo/bar.lock/baz'))
231     WVFAIL(valid(b'.bar/baz'))
232     WVFAIL(valid(b'foo/.bar/baz'))