]> arthur.barton.de Git - bup.git/blob - lib/bup/t/thelpers.py
helpers: add close_fds to exo
[bup.git] / lib / bup / t / thelpers.py
1
2 from __future__ import absolute_import
3 from time import tzset
4 import helpers, math, os, os.path, re, subprocess
5
6 from wvtest import *
7
8 from bup.compat import bytes_from_byte, bytes_from_uint, environ
9 from bup.helpers import (atomically_replaced_file, batchpipe, detect_fakeroot,
10                          grafted_path_components, mkdirp, parse_num,
11                          path_components, readpipe, stripped_path_components,
12                          shstr,
13                          utc_offset_str)
14 from buptest import no_lingering_errors, test_tempdir
15 import bup._helpers as _helpers
16
17
18 bup_tmp = os.path.realpath(b'../../../t/tmp')
19 mkdirp(bup_tmp)
20
21
22 @wvtest
23 def test_parse_num():
24     with no_lingering_errors():
25         pn = parse_num
26         WVPASSEQ(pn(b'1'), 1)
27         WVPASSEQ(pn('1'), 1)
28         WVPASSEQ(pn('0'), 0)
29         WVPASSEQ(pn('1.5k'), 1536)
30         WVPASSEQ(pn('2 gb'), 2*1024*1024*1024)
31         WVPASSEQ(pn('1e+9 k'), 1000000000 * 1024)
32         WVPASSEQ(pn('-3e-3mb'), int(-0.003 * 1024 * 1024))
33
34 @wvtest
35 def test_detect_fakeroot():
36     with no_lingering_errors():
37         if b'FAKEROOTKEY' in environ:
38             WVPASS(detect_fakeroot())
39         else:
40             WVPASS(not detect_fakeroot())
41
42 @wvtest
43 def test_path_components():
44     with no_lingering_errors():
45         WVPASSEQ(path_components(b'/'), [(b'', b'/')])
46         WVPASSEQ(path_components(b'/foo'), [(b'', b'/'), (b'foo', b'/foo')])
47         WVPASSEQ(path_components(b'/foo/'), [(b'', b'/'), (b'foo', b'/foo')])
48         WVPASSEQ(path_components(b'/foo/bar'),
49                  [(b'', b'/'), (b'foo', b'/foo'), (b'bar', b'/foo/bar')])
50         WVEXCEPT(Exception, path_components, b'foo')
51
52
53 @wvtest
54 def test_stripped_path_components():
55     with no_lingering_errors():
56         WVPASSEQ(stripped_path_components(b'/', []), [(b'', b'/')])
57         WVPASSEQ(stripped_path_components(b'/', [b'']), [(b'', b'/')])
58         WVPASSEQ(stripped_path_components(b'/', [b'/']), [(b'', b'/')])
59         WVPASSEQ(stripped_path_components(b'/foo', [b'/']),
60                  [(b'', b'/'), (b'foo', b'/foo')])
61         WVPASSEQ(stripped_path_components(b'/', [b'/foo']), [(b'', b'/')])
62         WVPASSEQ(stripped_path_components(b'/foo', [b'/bar']),
63                  [(b'', b'/'), (b'foo', b'/foo')])
64         WVPASSEQ(stripped_path_components(b'/foo', [b'/foo']), [(b'', b'/foo')])
65         WVPASSEQ(stripped_path_components(b'/foo/bar', [b'/foo']),
66                  [(b'', b'/foo'), (b'bar', b'/foo/bar')])
67         WVPASSEQ(stripped_path_components(b'/foo/bar', [b'/bar', b'/foo', b'/baz']),
68                  [(b'', b'/foo'), (b'bar', b'/foo/bar')])
69         WVPASSEQ(stripped_path_components(b'/foo/bar/baz', [b'/foo/bar/baz']),
70                  [(b'', b'/foo/bar/baz')])
71         WVEXCEPT(Exception, stripped_path_components, b'foo', [])
72
73
74 @wvtest
75 def test_grafted_path_components():
76     with no_lingering_errors():
77         WVPASSEQ(grafted_path_components([(b'/chroot', b'/')], b'/foo'),
78                  [(b'', b'/'), (b'foo', b'/foo')])
79         WVPASSEQ(grafted_path_components([(b'/foo/bar', b'/')],
80                                          b'/foo/bar/baz/bax'),
81                  [(b'', b'/foo/bar'),
82                   (b'baz', b'/foo/bar/baz'),
83                   (b'bax', b'/foo/bar/baz/bax')])
84         WVPASSEQ(grafted_path_components([(b'/foo/bar/baz', b'/bax')],
85                                          b'/foo/bar/baz/1/2'),
86                  [(b'', None),
87                   (b'bax', b'/foo/bar/baz'),
88                   (b'1', b'/foo/bar/baz/1'),
89                   (b'2', b'/foo/bar/baz/1/2')])
90         WVPASSEQ(grafted_path_components([(b'/foo', b'/bar/baz/bax')],
91                                          b'/foo/bar'),
92                  [(b'', None),
93                   (b'bar', None),
94                   (b'baz', None),
95                   (b'bax', b'/foo'),
96                   (b'bar', b'/foo/bar')])
97         WVPASSEQ(grafted_path_components([(b'/foo/bar/baz', b'/a/b/c')],
98                                          b'/foo/bar/baz'),
99                  [(b'', None), (b'a', None), (b'b', None), (b'c', b'/foo/bar/baz')])
100         WVPASSEQ(grafted_path_components([(b'/', b'/a/b/c/')], b'/foo/bar'),
101                  [(b'', None), (b'a', None), (b'b', None), (b'c', b'/'),
102                   (b'foo', b'/foo'), (b'bar', b'/foo/bar')])
103         WVEXCEPT(Exception, grafted_path_components, b'foo', [])
104
105
106 @wvtest
107 def test_shstr():
108     with no_lingering_errors():
109         # Do nothing for strings and bytes
110         WVPASSEQ(shstr(b''), b'')
111         WVPASSEQ(shstr(b'1'), b'1')
112         WVPASSEQ(shstr(b'1 2'), b'1 2')
113         WVPASSEQ(shstr(b"1'2"), b"1'2")
114         WVPASSEQ(shstr(''), '')
115         WVPASSEQ(shstr('1'), '1')
116         WVPASSEQ(shstr('1 2'), '1 2')
117         WVPASSEQ(shstr("1'2"), "1'2")
118
119         # Escape parts of sequences
120         WVPASSEQ(shstr((b'1 2', b'3')), b"'1 2' 3")
121         WVPASSEQ(shstr((b"1'2", b'3')), b"'1'\"'\"'2' 3")
122         WVPASSEQ(shstr((b"'1", b'3')), b"''\"'\"'1' 3")
123         WVPASSEQ(shstr(('1 2', '3')), "'1 2' 3")
124         WVPASSEQ(shstr(("1'2", '3')), "'1'\"'\"'2' 3")
125         WVPASSEQ(shstr(("'1", '3')), "''\"'\"'1' 3")
126
127
128 @wvtest
129 def test_readpipe():
130     with no_lingering_errors():
131         x = readpipe([b'echo', b'42'])
132         WVPASSEQ(x, b'42\n')
133         try:
134             readpipe([b'bash', b'-c', b'exit 42'])
135         except Exception as ex:
136             rx = '^subprocess b?"bash -c \'exit 42\'" failed with status 42$'
137             if not re.match(rx, str(ex)):
138                 WVPASSEQ(str(ex), rx)
139
140
141 @wvtest
142 def test_batchpipe():
143     with no_lingering_errors():
144         for chunk in batchpipe([b'echo'], []):
145             WVPASS(False)
146         out = b''
147         for chunk in batchpipe([b'echo'], [b'42']):
148             out += chunk
149         WVPASSEQ(out, b'42\n')
150         try:
151             batchpipe([b'bash', b'-c'], [b'exit 42'])
152         except Exception as ex:
153             WVPASSEQ(str(ex),
154                      "subprocess 'bash -c exit 42' failed with status 42")
155         args = [str(x) for x in range(6)]
156         # Force batchpipe to break the args into batches of 3.  This
157         # approach assumes all args are the same length.
158         arg_max = \
159             helpers._argmax_base([b'echo']) + helpers._argmax_args_size(args[:3])
160         batches = batchpipe(['echo'], args, arg_max=arg_max)
161         WVPASSEQ(next(batches), b'0 1 2\n')
162         WVPASSEQ(next(batches), b'3 4 5\n')
163         WVPASSEQ(next(batches, None), None)
164         batches = batchpipe([b'echo'], [str(x) for x in range(5)], arg_max=arg_max)
165         WVPASSEQ(next(batches), b'0 1 2\n')
166         WVPASSEQ(next(batches), b'3 4\n')
167         WVPASSEQ(next(batches, None), None)
168
169
170 @wvtest
171 def test_atomically_replaced_file():
172     with no_lingering_errors():
173         with test_tempdir(b'bup-thelper-') as tmpdir:
174             target_file = os.path.join(tmpdir, b'test-atomic-write')
175
176             with atomically_replaced_file(target_file, mode='w') as f:
177                 f.write('asdf')
178                 WVPASSEQ(f.mode, 'w')
179             f = open(target_file, 'r')
180             WVPASSEQ(f.read(), 'asdf')
181
182             try:
183                 with atomically_replaced_file(target_file, mode='w') as f:
184                     f.write('wxyz')
185                     raise Exception()
186             except:
187                 pass
188             with open(target_file) as f:
189                 WVPASSEQ(f.read(), 'asdf')
190
191             with atomically_replaced_file(target_file, mode='wb') as f:
192                 f.write(os.urandom(20))
193                 WVPASSEQ(f.mode, 'wb')
194
195
196 def set_tz(tz):
197     if not tz:
198         del environ[b'TZ']
199     else:
200         environ[b'TZ'] = tz
201     tzset()
202
203
204 @wvtest
205 def test_utc_offset_str():
206     with no_lingering_errors():
207         tz = environ.get(b'TZ')
208         tzset()
209         try:
210             set_tz(b'FOO+0:00')
211             WVPASSEQ(utc_offset_str(0), b'+0000')
212             set_tz(b'FOO+1:00')
213             WVPASSEQ(utc_offset_str(0), b'-0100')
214             set_tz(b'FOO-1:00')
215             WVPASSEQ(utc_offset_str(0), b'+0100')
216             set_tz(b'FOO+3:3')
217             WVPASSEQ(utc_offset_str(0), b'-0303')
218             set_tz(b'FOO-3:3')
219             WVPASSEQ(utc_offset_str(0), b'+0303')
220             # Offset is not an integer number of minutes
221             set_tz(b'FOO+3:3:3')
222             WVPASSEQ(utc_offset_str(1), b'-0303')
223             set_tz(b'FOO-3:3:3')
224             WVPASSEQ(utc_offset_str(1), b'+0303')
225             WVPASSEQ(utc_offset_str(314159), b'+0303')
226         finally:
227             if tz:
228                 set_tz(tz)
229             else:
230                 try:
231                     set_tz(None)
232                 except KeyError:
233                     pass
234
235 @wvtest
236 def test_valid_save_name():
237     with no_lingering_errors():
238         valid = helpers.valid_save_name
239         WVPASS(valid(b'x'))
240         WVPASS(valid(b'x@'))
241         WVFAIL(valid(b'@'))
242         WVFAIL(valid(b'/'))
243         WVFAIL(valid(b'/foo'))
244         WVFAIL(valid(b'foo/'))
245         WVFAIL(valid(b'/foo/'))
246         WVFAIL(valid(b'foo//bar'))
247         WVFAIL(valid(b'.'))
248         WVFAIL(valid(b'bar.'))
249         WVFAIL(valid(b'foo@{'))
250         for x in b' ~^:?*[\\':
251             WVFAIL(valid(b'foo' + bytes_from_byte(x)))
252         for i in range(20):
253             WVFAIL(valid(b'foo' + bytes_from_uint(i)))
254         WVFAIL(valid(b'foo' + bytes_from_uint(0x7f)))
255         WVFAIL(valid(b'foo..bar'))
256         WVFAIL(valid(b'bar.lock/baz'))
257         WVFAIL(valid(b'foo/bar.lock/baz'))
258         WVFAIL(valid(b'.bar/baz'))
259         WVFAIL(valid(b'foo/.bar/baz'))