2 from __future__ import absolute_import
4 import helpers, math, os, os.path, re, subprocess
8 from bup.compat import bytes_from_byte, bytes_from_uint, environ
9 from bup.helpers import (atomically_replaced_file, batchpipe, detect_fakeroot,
10 grafted_path_components, mkdirp, parse_num,
11 path_components, readpipe, stripped_path_components,
14 from buptest import no_lingering_errors, test_tempdir
15 import bup._helpers as _helpers
18 bup_tmp = os.path.realpath(b'../../../t/tmp')
24 with no_lingering_errors():
29 WVPASSEQ(pn('1.5k'), 1536)
30 WVPASSEQ(pn('2 gb'), 2*1024*1024*1024)
31 WVPASSEQ(pn('1e+9 k'), 1000000000 * 1024)
32 WVPASSEQ(pn('-3e-3mb'), int(-0.003 * 1024 * 1024))
35 def test_detect_fakeroot():
36 with no_lingering_errors():
37 if b'FAKEROOTKEY' in environ:
38 WVPASS(detect_fakeroot())
40 WVPASS(not detect_fakeroot())
43 def test_path_components():
44 with no_lingering_errors():
45 WVPASSEQ(path_components(b'/'), [(b'', b'/')])
46 WVPASSEQ(path_components(b'/foo'), [(b'', b'/'), (b'foo', b'/foo')])
47 WVPASSEQ(path_components(b'/foo/'), [(b'', b'/'), (b'foo', b'/foo')])
48 WVPASSEQ(path_components(b'/foo/bar'),
49 [(b'', b'/'), (b'foo', b'/foo'), (b'bar', b'/foo/bar')])
50 WVEXCEPT(Exception, path_components, b'foo')
54 def test_stripped_path_components():
55 with no_lingering_errors():
56 WVPASSEQ(stripped_path_components(b'/', []), [(b'', b'/')])
57 WVPASSEQ(stripped_path_components(b'/', [b'']), [(b'', b'/')])
58 WVPASSEQ(stripped_path_components(b'/', [b'/']), [(b'', b'/')])
59 WVPASSEQ(stripped_path_components(b'/foo', [b'/']),
60 [(b'', b'/'), (b'foo', b'/foo')])
61 WVPASSEQ(stripped_path_components(b'/', [b'/foo']), [(b'', b'/')])
62 WVPASSEQ(stripped_path_components(b'/foo', [b'/bar']),
63 [(b'', b'/'), (b'foo', b'/foo')])
64 WVPASSEQ(stripped_path_components(b'/foo', [b'/foo']), [(b'', b'/foo')])
65 WVPASSEQ(stripped_path_components(b'/foo/bar', [b'/foo']),
66 [(b'', b'/foo'), (b'bar', b'/foo/bar')])
67 WVPASSEQ(stripped_path_components(b'/foo/bar', [b'/bar', b'/foo', b'/baz']),
68 [(b'', b'/foo'), (b'bar', b'/foo/bar')])
69 WVPASSEQ(stripped_path_components(b'/foo/bar/baz', [b'/foo/bar/baz']),
70 [(b'', b'/foo/bar/baz')])
71 WVEXCEPT(Exception, stripped_path_components, b'foo', [])
75 def test_grafted_path_components():
76 with no_lingering_errors():
77 WVPASSEQ(grafted_path_components([(b'/chroot', b'/')], b'/foo'),
78 [(b'', b'/'), (b'foo', b'/foo')])
79 WVPASSEQ(grafted_path_components([(b'/foo/bar', b'/')],
82 (b'baz', b'/foo/bar/baz'),
83 (b'bax', b'/foo/bar/baz/bax')])
84 WVPASSEQ(grafted_path_components([(b'/foo/bar/baz', b'/bax')],
87 (b'bax', b'/foo/bar/baz'),
88 (b'1', b'/foo/bar/baz/1'),
89 (b'2', b'/foo/bar/baz/1/2')])
90 WVPASSEQ(grafted_path_components([(b'/foo', b'/bar/baz/bax')],
96 (b'bar', b'/foo/bar')])
97 WVPASSEQ(grafted_path_components([(b'/foo/bar/baz', b'/a/b/c')],
99 [(b'', None), (b'a', None), (b'b', None), (b'c', b'/foo/bar/baz')])
100 WVPASSEQ(grafted_path_components([(b'/', b'/a/b/c/')], b'/foo/bar'),
101 [(b'', None), (b'a', None), (b'b', None), (b'c', b'/'),
102 (b'foo', b'/foo'), (b'bar', b'/foo/bar')])
103 WVEXCEPT(Exception, grafted_path_components, b'foo', [])
108 with no_lingering_errors():
109 # Do nothing for strings and bytes
110 WVPASSEQ(shstr(b''), b'')
111 WVPASSEQ(shstr(b'1'), b'1')
112 WVPASSEQ(shstr(b'1 2'), b'1 2')
113 WVPASSEQ(shstr(b"1'2"), b"1'2")
114 WVPASSEQ(shstr(''), '')
115 WVPASSEQ(shstr('1'), '1')
116 WVPASSEQ(shstr('1 2'), '1 2')
117 WVPASSEQ(shstr("1'2"), "1'2")
119 # Escape parts of sequences
120 WVPASSEQ(shstr((b'1 2', b'3')), b"'1 2' 3")
121 WVPASSEQ(shstr((b"1'2", b'3')), b"'1'\"'\"'2' 3")
122 WVPASSEQ(shstr((b"'1", b'3')), b"''\"'\"'1' 3")
123 WVPASSEQ(shstr(('1 2', '3')), "'1 2' 3")
124 WVPASSEQ(shstr(("1'2", '3')), "'1'\"'\"'2' 3")
125 WVPASSEQ(shstr(("'1", '3')), "''\"'\"'1' 3")
130 with no_lingering_errors():
131 x = readpipe([b'echo', b'42'])
134 readpipe([b'bash', b'-c', b'exit 42'])
135 except Exception as ex:
136 if not re.match("^subprocess b?'bash -c exit 42' failed with status 42$",
139 "^subprocess b?'bash -c exit 42' failed with status 42$")
144 def test_batchpipe():
145 with no_lingering_errors():
146 for chunk in batchpipe([b'echo'], []):
149 for chunk in batchpipe([b'echo'], [b'42']):
151 WVPASSEQ(out, b'42\n')
153 batchpipe([b'bash', b'-c'], [b'exit 42'])
154 except Exception as ex:
156 "subprocess 'bash -c exit 42' failed with status 42")
157 args = [str(x) for x in range(6)]
158 # Force batchpipe to break the args into batches of 3. This
159 # approach assumes all args are the same length.
161 helpers._argmax_base([b'echo']) + helpers._argmax_args_size(args[:3])
162 batches = batchpipe(['echo'], args, arg_max=arg_max)
163 WVPASSEQ(next(batches), b'0 1 2\n')
164 WVPASSEQ(next(batches), b'3 4 5\n')
165 WVPASSEQ(next(batches, None), None)
166 batches = batchpipe([b'echo'], [str(x) for x in range(5)], arg_max=arg_max)
167 WVPASSEQ(next(batches), b'0 1 2\n')
168 WVPASSEQ(next(batches), b'3 4\n')
169 WVPASSEQ(next(batches, None), None)
173 def test_atomically_replaced_file():
174 with no_lingering_errors():
175 with test_tempdir(b'bup-thelper-') as tmpdir:
176 target_file = os.path.join(tmpdir, b'test-atomic-write')
178 with atomically_replaced_file(target_file, mode='w') as f:
180 WVPASSEQ(f.mode, 'w')
181 f = open(target_file, 'r')
182 WVPASSEQ(f.read(), 'asdf')
185 with atomically_replaced_file(target_file, mode='w') as f:
190 with open(target_file) as f:
191 WVPASSEQ(f.read(), 'asdf')
193 with atomically_replaced_file(target_file, mode='wb') as f:
194 f.write(os.urandom(20))
195 WVPASSEQ(f.mode, 'wb')
207 def test_utc_offset_str():
208 with no_lingering_errors():
209 tz = environ.get(b'TZ')
213 WVPASSEQ(utc_offset_str(0), b'+0000')
215 WVPASSEQ(utc_offset_str(0), b'-0100')
217 WVPASSEQ(utc_offset_str(0), b'+0100')
219 WVPASSEQ(utc_offset_str(0), b'-0303')
221 WVPASSEQ(utc_offset_str(0), b'+0303')
222 # Offset is not an integer number of minutes
224 WVPASSEQ(utc_offset_str(1), b'-0303')
226 WVPASSEQ(utc_offset_str(1), b'+0303')
227 WVPASSEQ(utc_offset_str(314159), b'+0303')
238 def test_valid_save_name():
239 with no_lingering_errors():
240 valid = helpers.valid_save_name
245 WVFAIL(valid(b'/foo'))
246 WVFAIL(valid(b'foo/'))
247 WVFAIL(valid(b'/foo/'))
248 WVFAIL(valid(b'foo//bar'))
250 WVFAIL(valid(b'bar.'))
251 WVFAIL(valid(b'foo@{'))
252 for x in b' ~^:?*[\\':
253 WVFAIL(valid(b'foo' + bytes_from_byte(x)))
255 WVFAIL(valid(b'foo' + bytes_from_uint(i)))
256 WVFAIL(valid(b'foo' + bytes_from_uint(0x7f)))
257 WVFAIL(valid(b'foo..bar'))
258 WVFAIL(valid(b'bar.lock/baz'))
259 WVFAIL(valid(b'foo/bar.lock/baz'))
260 WVFAIL(valid(b'.bar/baz'))
261 WVFAIL(valid(b'foo/.bar/baz'))