2 from __future__ import absolute_import
5 from bup import helpers
9 from bup.compat import bytes_from_byte, bytes_from_uint, environ
10 from bup.helpers import (atomically_replaced_file, batchpipe, detect_fakeroot,
11 grafted_path_components, parse_num,
12 path_components, readpipe, stripped_path_components,
22 WVPASSEQ(pn('1.5k'), 1536)
23 WVPASSEQ(pn('2 gb'), 2*1024*1024*1024)
24 WVPASSEQ(pn('1e+9 k'), 1000000000 * 1024)
25 WVPASSEQ(pn('-3e-3mb'), int(-0.003 * 1024 * 1024))
27 def test_detect_fakeroot():
28 if b'FAKEROOTKEY' in environ:
29 WVPASS(detect_fakeroot())
31 WVPASS(not detect_fakeroot())
33 def test_path_components():
34 WVPASSEQ(path_components(b'/'), [(b'', b'/')])
35 WVPASSEQ(path_components(b'/foo'), [(b'', b'/'), (b'foo', b'/foo')])
36 WVPASSEQ(path_components(b'/foo/'), [(b'', b'/'), (b'foo', b'/foo')])
37 WVPASSEQ(path_components(b'/foo/bar'),
38 [(b'', b'/'), (b'foo', b'/foo'), (b'bar', b'/foo/bar')])
39 WVEXCEPT(Exception, path_components, b'foo')
42 def test_stripped_path_components():
43 WVPASSEQ(stripped_path_components(b'/', []), [(b'', b'/')])
44 WVPASSEQ(stripped_path_components(b'/', [b'']), [(b'', b'/')])
45 WVPASSEQ(stripped_path_components(b'/', [b'/']), [(b'', b'/')])
46 WVPASSEQ(stripped_path_components(b'/foo', [b'/']),
47 [(b'', b'/'), (b'foo', b'/foo')])
48 WVPASSEQ(stripped_path_components(b'/', [b'/foo']), [(b'', b'/')])
49 WVPASSEQ(stripped_path_components(b'/foo', [b'/bar']),
50 [(b'', b'/'), (b'foo', b'/foo')])
51 WVPASSEQ(stripped_path_components(b'/foo', [b'/foo']), [(b'', b'/foo')])
52 WVPASSEQ(stripped_path_components(b'/foo/bar', [b'/foo']),
53 [(b'', b'/foo'), (b'bar', b'/foo/bar')])
54 WVPASSEQ(stripped_path_components(b'/foo/bar', [b'/bar', b'/foo', b'/baz']),
55 [(b'', b'/foo'), (b'bar', b'/foo/bar')])
56 WVPASSEQ(stripped_path_components(b'/foo/bar/baz', [b'/foo/bar/baz']),
57 [(b'', b'/foo/bar/baz')])
58 WVEXCEPT(Exception, stripped_path_components, b'foo', [])
61 def test_grafted_path_components():
62 WVPASSEQ(grafted_path_components([(b'/chroot', b'/')], b'/foo'),
63 [(b'', b'/'), (b'foo', b'/foo')])
64 WVPASSEQ(grafted_path_components([(b'/foo/bar', b'/')],
67 (b'baz', b'/foo/bar/baz'),
68 (b'bax', b'/foo/bar/baz/bax')])
69 WVPASSEQ(grafted_path_components([(b'/foo/bar/baz', b'/bax')],
72 (b'bax', b'/foo/bar/baz'),
73 (b'1', b'/foo/bar/baz/1'),
74 (b'2', b'/foo/bar/baz/1/2')])
75 WVPASSEQ(grafted_path_components([(b'/foo', b'/bar/baz/bax')],
81 (b'bar', b'/foo/bar')])
82 WVPASSEQ(grafted_path_components([(b'/foo/bar/baz', b'/a/b/c')],
84 [(b'', None), (b'a', None), (b'b', None), (b'c', b'/foo/bar/baz')])
85 WVPASSEQ(grafted_path_components([(b'/', b'/a/b/c/')], b'/foo/bar'),
86 [(b'', None), (b'a', None), (b'b', None), (b'c', b'/'),
87 (b'foo', b'/foo'), (b'bar', b'/foo/bar')])
88 WVEXCEPT(Exception, grafted_path_components, b'foo', [])
92 # Do nothing for strings and bytes
93 WVPASSEQ(shstr(b''), b'')
94 WVPASSEQ(shstr(b'1'), b'1')
95 WVPASSEQ(shstr(b'1 2'), b'1 2')
96 WVPASSEQ(shstr(b"1'2"), b"1'2")
97 WVPASSEQ(shstr(''), '')
98 WVPASSEQ(shstr('1'), '1')
99 WVPASSEQ(shstr('1 2'), '1 2')
100 WVPASSEQ(shstr("1'2"), "1'2")
102 # Escape parts of sequences
103 WVPASSEQ(shstr((b'1 2', b'3')), b"'1 2' 3")
104 WVPASSEQ(shstr((b"1'2", b'3')), b"'1'\"'\"'2' 3")
105 WVPASSEQ(shstr((b"'1", b'3')), b"''\"'\"'1' 3")
106 WVPASSEQ(shstr(('1 2', '3')), "'1 2' 3")
107 WVPASSEQ(shstr(("1'2", '3')), "'1'\"'\"'2' 3")
108 WVPASSEQ(shstr(("'1", '3')), "''\"'\"'1' 3")
112 x = readpipe([b'echo', b'42'])
115 readpipe([b'bash', b'-c', b'exit 42'])
116 except Exception as ex:
117 rx = '^subprocess b?"bash -c \'exit 42\'" failed with status 42$'
118 if not re.match(rx, str(ex)):
119 WVPASSEQ(str(ex), rx)
122 def test_batchpipe():
123 for chunk in batchpipe([b'echo'], []):
126 for chunk in batchpipe([b'echo'], [b'42']):
128 WVPASSEQ(out, b'42\n')
130 batchpipe([b'bash', b'-c'], [b'exit 42'])
131 except Exception as ex:
133 "subprocess 'bash -c exit 42' failed with status 42")
134 args = [str(x) for x in range(6)]
135 # Force batchpipe to break the args into batches of 3. This
136 # approach assumes all args are the same length.
138 helpers._argmax_base([b'echo']) + helpers._argmax_args_size(args[:3])
139 batches = batchpipe(['echo'], args, arg_max=arg_max)
140 WVPASSEQ(next(batches), b'0 1 2\n')
141 WVPASSEQ(next(batches), b'3 4 5\n')
142 WVPASSEQ(next(batches, None), None)
143 batches = batchpipe([b'echo'], [str(x) for x in range(5)], arg_max=arg_max)
144 WVPASSEQ(next(batches), b'0 1 2\n')
145 WVPASSEQ(next(batches), b'3 4\n')
146 WVPASSEQ(next(batches, None), None)
149 def test_atomically_replaced_file(tmpdir):
150 target_file = os.path.join(tmpdir, b'test-atomic-write')
152 with atomically_replaced_file(target_file, mode='w') as f:
154 WVPASSEQ(f.mode, 'w')
155 f = open(target_file, 'r')
156 WVPASSEQ(f.read(), 'asdf')
159 with atomically_replaced_file(target_file, mode='w') as f:
164 with open(target_file) as f:
165 WVPASSEQ(f.read(), 'asdf')
167 with atomically_replaced_file(target_file, mode='wb') as f:
168 f.write(os.urandom(20))
169 WVPASSEQ(f.mode, 'wb')
180 def test_utc_offset_str():
181 tz = environ.get(b'TZ')
185 WVPASSEQ(utc_offset_str(0), b'+0000')
187 WVPASSEQ(utc_offset_str(0), b'-0100')
189 WVPASSEQ(utc_offset_str(0), b'+0100')
191 WVPASSEQ(utc_offset_str(0), b'-0303')
193 WVPASSEQ(utc_offset_str(0), b'+0303')
194 # Offset is not an integer number of minutes
196 WVPASSEQ(utc_offset_str(1), b'-0303')
198 WVPASSEQ(utc_offset_str(1), b'+0303')
199 WVPASSEQ(utc_offset_str(314159), b'+0303')
209 def test_valid_save_name():
210 valid = helpers.valid_save_name
215 WVFAIL(valid(b'/foo'))
216 WVFAIL(valid(b'foo/'))
217 WVFAIL(valid(b'/foo/'))
218 WVFAIL(valid(b'foo//bar'))
220 WVFAIL(valid(b'bar.'))
221 WVFAIL(valid(b'foo@{'))
222 for x in b' ~^:?*[\\':
223 WVFAIL(valid(b'foo' + bytes_from_byte(x)))
225 WVFAIL(valid(b'foo' + bytes_from_uint(i)))
226 WVFAIL(valid(b'foo' + bytes_from_uint(0x7f)))
227 WVFAIL(valid(b'foo..bar'))
228 WVFAIL(valid(b'bar.lock/baz'))
229 WVFAIL(valid(b'foo/bar.lock/baz'))
230 WVFAIL(valid(b'.bar/baz'))
231 WVFAIL(valid(b'foo/.bar/baz'))