2 from __future__ import absolute_import
4 import helpers, math, os, os.path, re, subprocess
8 from bup.compat import bytes_from_byte, bytes_from_uint, environ
9 from bup.helpers import (atomically_replaced_file, batchpipe, detect_fakeroot,
10 grafted_path_components, mkdirp, parse_num,
11 path_components, readpipe, stripped_path_components,
14 from buptest import no_lingering_errors, test_tempdir
15 import bup._helpers as _helpers
18 bup_tmp = os.path.realpath(b'../../../t/tmp')
24 with no_lingering_errors():
29 WVPASSEQ(pn('1.5k'), 1536)
30 WVPASSEQ(pn('2 gb'), 2*1024*1024*1024)
31 WVPASSEQ(pn('1e+9 k'), 1000000000 * 1024)
32 WVPASSEQ(pn('-3e-3mb'), int(-0.003 * 1024 * 1024))
def test_detect_fakeroot():
    """Check that detect_fakeroot() agrees with the FAKEROOTKEY variable.

    The helper is expected to report true when FAKEROOTKEY is present in
    environ and false when it is absent.
    """
    with no_lingering_errors():
        if b'FAKEROOTKEY' in environ:
            WVPASS(detect_fakeroot())
        else:
            # The two expectations are mutually exclusive: without this
            # branch the negative check would also run when FAKEROOTKEY is
            # set, contradicting the assertion above.
            WVPASS(not detect_fakeroot())
def test_path_components():
    """Check path_components() on absolute paths and on a relative one.

    Each absolute path is expected to yield a list of (name, full_path)
    pairs starting with the root entry; a relative path is expected to
    raise.
    """
    root = (b'', b'/')
    foo = (b'foo', b'/foo')
    foo_bar = (b'bar', b'/foo/bar')
    with no_lingering_errors():
        split = path_components
        WVPASSEQ(split(b'/'), [root])
        WVPASSEQ(split(b'/foo'), [root, foo])
        # A trailing slash is expected to make no difference.
        WVPASSEQ(split(b'/foo/'), [root, foo])
        WVPASSEQ(split(b'/foo/bar'), [root, foo, foo_bar])
        # Relative paths are rejected.
        WVEXCEPT(Exception, split, b'foo')
def test_stripped_path_components():
    """Check stripped_path_components() against a list of strip prefixes.

    The cases cover prefixes that do not apply (path returned unstripped),
    prefixes that match (the prefix becomes the first component's full
    path), and a relative path, which is expected to raise.
    """
    with no_lingering_errors():
        strip = stripped_path_components
        root_only = [(b'', b'/')]
        root_foo = [(b'', b'/'), (b'foo', b'/foo')]
        foo_bar = [(b'', b'/foo'), (b'bar', b'/foo/bar')]

        # No prefix, an empty prefix, the root, or a non-matching prefix:
        # the components are expected to come back unstripped.
        WVPASSEQ(strip(b'/', []), root_only)
        WVPASSEQ(strip(b'/', [b'']), root_only)
        WVPASSEQ(strip(b'/', [b'/']), root_only)
        WVPASSEQ(strip(b'/foo', [b'/']), root_foo)
        WVPASSEQ(strip(b'/', [b'/foo']), root_only)
        WVPASSEQ(strip(b'/foo', [b'/bar']), root_foo)

        # A matching prefix collapses into the leading component.
        WVPASSEQ(strip(b'/foo', [b'/foo']), [(b'', b'/foo')])
        WVPASSEQ(strip(b'/foo/bar', [b'/foo']), foo_bar)
        WVPASSEQ(strip(b'/foo/bar', [b'/bar', b'/foo', b'/baz']), foo_bar)
        WVPASSEQ(strip(b'/foo/bar/baz', [b'/foo/bar/baz']),
                 [(b'', b'/foo/bar/baz')])

        # Relative paths are rejected.
        WVEXCEPT(Exception, strip, b'foo', [])
75 def test_grafted_path_components():
76 with no_lingering_errors():
77 WVPASSEQ(grafted_path_components([(b'/chroot', b'/')], b'/foo'),
78 [(b'', b'/'), (b'foo', b'/foo')])
79 WVPASSEQ(grafted_path_components([(b'/foo/bar', b'/')],
82 (b'baz', b'/foo/bar/baz'),
83 (b'bax', b'/foo/bar/baz/bax')])
84 WVPASSEQ(grafted_path_components([(b'/foo/bar/baz', b'/bax')],
87 (b'bax', b'/foo/bar/baz'),
88 (b'1', b'/foo/bar/baz/1'),
89 (b'2', b'/foo/bar/baz/1/2')])
90 WVPASSEQ(grafted_path_components([(b'/foo', b'/bar/baz/bax')],
96 (b'bar', b'/foo/bar')])
97 WVPASSEQ(grafted_path_components([(b'/foo/bar/baz', b'/a/b/c')],
99 [(b'', None), (b'a', None), (b'b', None), (b'c', b'/foo/bar/baz')])
100 WVPASSEQ(grafted_path_components([(b'/', b'/a/b/c/')], b'/foo/bar'),
101 [(b'', None), (b'a', None), (b'b', None), (b'c', b'/'),
102 (b'foo', b'/foo'), (b'bar', b'/foo/bar')])
103 WVEXCEPT(Exception, grafted_path_components, b'foo', [])
108 with no_lingering_errors():
109 # Do nothing for strings and bytes
110 WVPASSEQ(shstr(b''), b'')
111 WVPASSEQ(shstr(b'1'), b'1')
112 WVPASSEQ(shstr(b'1 2'), b'1 2')
113 WVPASSEQ(shstr(b"1'2"), b"1'2")
114 WVPASSEQ(shstr(''), '')
115 WVPASSEQ(shstr('1'), '1')
116 WVPASSEQ(shstr('1 2'), '1 2')
117 WVPASSEQ(shstr("1'2"), "1'2")
119 # Escape parts of sequences
120 WVPASSEQ(shstr((b'1 2', b'3')), b"'1 2' 3")
121 WVPASSEQ(shstr((b"1'2", b'3')), b"'1'\"'\"'2' 3")
122 WVPASSEQ(shstr((b"'1", b'3')), b"''\"'\"'1' 3")
123 WVPASSEQ(shstr(('1 2', '3')), "'1 2' 3")
124 WVPASSEQ(shstr(("1'2", '3')), "'1'\"'\"'2' 3")
125 WVPASSEQ(shstr(("'1", '3')), "''\"'\"'1' 3")
130 with no_lingering_errors():
131 x = readpipe([b'echo', b'42'])
134 readpipe([b'bash', b'-c', b'exit 42'])
135 except Exception as ex:
136 rx = '^subprocess b?"bash -c \'exit 42\'" failed with status 42$'
137 if not re.match(rx, str(ex)):
138 WVPASSEQ(str(ex), rx)
142 def test_batchpipe():
143 with no_lingering_errors():
144 for chunk in batchpipe([b'echo'], []):
147 for chunk in batchpipe([b'echo'], [b'42']):
149 WVPASSEQ(out, b'42\n')
151 batchpipe([b'bash', b'-c'], [b'exit 42'])
152 except Exception as ex:
154 "subprocess 'bash -c exit 42' failed with status 42")
155 args = [str(x) for x in range(6)]
156 # Force batchpipe to break the args into batches of 3. This
157 # approach assumes all args are the same length.
159 helpers._argmax_base([b'echo']) + helpers._argmax_args_size(args[:3])
160 batches = batchpipe(['echo'], args, arg_max=arg_max)
161 WVPASSEQ(next(batches), b'0 1 2\n')
162 WVPASSEQ(next(batches), b'3 4 5\n')
163 WVPASSEQ(next(batches, None), None)
164 batches = batchpipe([b'echo'], [str(x) for x in range(5)], arg_max=arg_max)
165 WVPASSEQ(next(batches), b'0 1 2\n')
166 WVPASSEQ(next(batches), b'3 4\n')
167 WVPASSEQ(next(batches, None), None)
171 def test_atomically_replaced_file():
172 with no_lingering_errors():
173 with test_tempdir(b'bup-thelper-') as tmpdir:
174 target_file = os.path.join(tmpdir, b'test-atomic-write')
176 with atomically_replaced_file(target_file, mode='w') as f:
178 WVPASSEQ(f.mode, 'w')
179 f = open(target_file, 'r')
180 WVPASSEQ(f.read(), 'asdf')
183 with atomically_replaced_file(target_file, mode='w') as f:
188 with open(target_file) as f:
189 WVPASSEQ(f.read(), 'asdf')
191 with atomically_replaced_file(target_file, mode='wb') as f:
192 f.write(os.urandom(20))
193 WVPASSEQ(f.mode, 'wb')
205 def test_utc_offset_str():
206 with no_lingering_errors():
207 tz = environ.get(b'TZ')
211 WVPASSEQ(utc_offset_str(0), b'+0000')
213 WVPASSEQ(utc_offset_str(0), b'-0100')
215 WVPASSEQ(utc_offset_str(0), b'+0100')
217 WVPASSEQ(utc_offset_str(0), b'-0303')
219 WVPASSEQ(utc_offset_str(0), b'+0303')
220 # Offset is not an integer number of minutes
222 WVPASSEQ(utc_offset_str(1), b'-0303')
224 WVPASSEQ(utc_offset_str(1), b'+0303')
225 WVPASSEQ(utc_offset_str(314159), b'+0303')
236 def test_valid_save_name():
237 with no_lingering_errors():
238 valid = helpers.valid_save_name
243 WVFAIL(valid(b'/foo'))
244 WVFAIL(valid(b'foo/'))
245 WVFAIL(valid(b'/foo/'))
246 WVFAIL(valid(b'foo//bar'))
248 WVFAIL(valid(b'bar.'))
249 WVFAIL(valid(b'foo@{'))
250 for x in b' ~^:?*[\\':
251 WVFAIL(valid(b'foo' + bytes_from_byte(x)))
253 WVFAIL(valid(b'foo' + bytes_from_uint(i)))
254 WVFAIL(valid(b'foo' + bytes_from_uint(0x7f)))
255 WVFAIL(valid(b'foo..bar'))
256 WVFAIL(valid(b'bar.lock/baz'))
257 WVFAIL(valid(b'foo/bar.lock/baz'))
258 WVFAIL(valid(b'.bar/baz'))
259 WVFAIL(valid(b'foo/.bar/baz'))