2 from __future__ import absolute_import
3 import helpers, math, os, os.path, stat, subprocess
7 from bup.helpers import (atomically_replaced_file, batchpipe, detect_fakeroot,
8 grafted_path_components, mkdirp, parse_num,
9 path_components, readpipe, stripped_path_components,
11 from buptest import no_lingering_errors, test_tempdir
12 import bup._helpers as _helpers
15 bup_tmp = os.path.realpath('../../../t/tmp')
21 with no_lingering_errors():
25 WVPASSEQ(pn('1.5k'), 1536)
26 WVPASSEQ(pn('2 gb'), 2*1024*1024*1024)
27 WVPASSEQ(pn('1e+9 k'), 1000000000 * 1024)
28 WVPASSEQ(pn('-3e-3mb'), int(-0.003 * 1024 * 1024))
31 def test_detect_fakeroot():
32 with no_lingering_errors():
33 if os.getenv('FAKEROOTKEY'):
34 WVPASS(detect_fakeroot())
36 WVPASS(not detect_fakeroot())
39 def test_path_components():
40 with no_lingering_errors():
41 WVPASSEQ(path_components('/'), [('', '/')])
42 WVPASSEQ(path_components('/foo'), [('', '/'), ('foo', '/foo')])
43 WVPASSEQ(path_components('/foo/'), [('', '/'), ('foo', '/foo')])
44 WVPASSEQ(path_components('/foo/bar'),
45 [('', '/'), ('foo', '/foo'), ('bar', '/foo/bar')])
46 WVEXCEPT(Exception, path_components, 'foo')
50 def test_stripped_path_components():
51 with no_lingering_errors():
52 WVPASSEQ(stripped_path_components('/', []), [('', '/')])
53 WVPASSEQ(stripped_path_components('/', ['']), [('', '/')])
54 WVPASSEQ(stripped_path_components('/', ['/']), [('', '/')])
55 WVPASSEQ(stripped_path_components('/foo', ['/']),
56 [('', '/'), ('foo', '/foo')])
57 WVPASSEQ(stripped_path_components('/', ['/foo']), [('', '/')])
58 WVPASSEQ(stripped_path_components('/foo', ['/bar']),
59 [('', '/'), ('foo', '/foo')])
60 WVPASSEQ(stripped_path_components('/foo', ['/foo']), [('', '/foo')])
61 WVPASSEQ(stripped_path_components('/foo/bar', ['/foo']),
62 [('', '/foo'), ('bar', '/foo/bar')])
63 WVPASSEQ(stripped_path_components('/foo/bar', ['/bar', '/foo', '/baz']),
64 [('', '/foo'), ('bar', '/foo/bar')])
65 WVPASSEQ(stripped_path_components('/foo/bar/baz', ['/foo/bar/baz']),
66 [('', '/foo/bar/baz')])
67 WVEXCEPT(Exception, stripped_path_components, 'foo', [])
71 def test_grafted_path_components():
72 with no_lingering_errors():
73 WVPASSEQ(grafted_path_components([('/chroot', '/')], '/foo'),
74 [('', '/'), ('foo', '/foo')])
75 WVPASSEQ(grafted_path_components([('/foo/bar', '/')],
78 ('baz', '/foo/bar/baz'),
79 ('bax', '/foo/bar/baz/bax')])
80 WVPASSEQ(grafted_path_components([('/foo/bar/baz', '/bax')],
83 ('bax', '/foo/bar/baz'),
84 ('1', '/foo/bar/baz/1'),
85 ('2', '/foo/bar/baz/1/2')])
86 WVPASSEQ(grafted_path_components([('/foo', '/bar/baz/bax')],
93 WVPASSEQ(grafted_path_components([('/foo/bar/baz', '/a/b/c')],
95 [('', None), ('a', None), ('b', None), ('c', '/foo/bar/baz')])
96 WVPASSEQ(grafted_path_components([('/', '/a/b/c/')], '/foo/bar'),
97 [('', None), ('a', None), ('b', None), ('c', '/'),
98 ('foo', '/foo'), ('bar', '/foo/bar')])
99 WVEXCEPT(Exception, grafted_path_components, 'foo', [])
104 with no_lingering_errors():
105 x = readpipe(['echo', '42'])
108 readpipe(['bash', '-c', 'exit 42'])
109 except Exception as ex:
111 "subprocess 'bash -c exit 42' failed with status 42")
115 def test_batchpipe():
116 with no_lingering_errors():
117 for chunk in batchpipe(['echo'], []):
120 for chunk in batchpipe(['echo'], ['42']):
122 WVPASSEQ(out, '42\n')
124 batchpipe(['bash', '-c'], ['exit 42'])
125 except Exception as ex:
127 "subprocess 'bash -c exit 42' failed with status 42")
128 args = [str(x) for x in range(6)]
129 # Force batchpipe to break the args into batches of 3. This
130 # approach assumes all args are the same length.
132 helpers._argmax_base(['echo']) + helpers._argmax_args_size(args[:3])
133 batches = batchpipe(['echo'], args, arg_max=arg_max)
134 WVPASSEQ(next(batches), '0 1 2\n')
135 WVPASSEQ(next(batches), '3 4 5\n')
136 WVPASSEQ(next(batches, None), None)
137 batches = batchpipe(['echo'], [str(x) for x in range(5)], arg_max=arg_max)
138 WVPASSEQ(next(batches), '0 1 2\n')
139 WVPASSEQ(next(batches), '3 4\n')
140 WVPASSEQ(next(batches, None), None)
144 def test_atomically_replaced_file():
145 with no_lingering_errors():
146 with test_tempdir('bup-thelper-') as tmpdir:
147 target_file = os.path.join(tmpdir, 'test-atomic-write')
149 with atomically_replaced_file(target_file, mode='w') as f:
151 WVPASSEQ(f.mode, 'w')
152 f = open(target_file, 'r')
153 WVPASSEQ(f.read(), 'asdf')
156 with atomically_replaced_file(target_file, mode='w') as f:
161 with open(target_file) as f:
162 WVPASSEQ(f.read(), 'asdf')
164 with atomically_replaced_file(target_file, mode='wb') as f:
165 f.write(os.urandom(20))
166 WVPASSEQ(f.mode, 'wb')
170 def test_utc_offset_str():
171 with no_lingering_errors():
172 tz = os.environ.get('TZ')
174 os.environ['TZ'] = 'FOO+0:00'
175 WVPASSEQ(utc_offset_str(0), '+0000')
176 os.environ['TZ'] = 'FOO+1:00'
177 WVPASSEQ(utc_offset_str(0), '-0100')
178 os.environ['TZ'] = 'FOO-1:00'
179 WVPASSEQ(utc_offset_str(0), '+0100')
180 os.environ['TZ'] = 'FOO+3:3'
181 WVPASSEQ(utc_offset_str(0), '-0303')
182 os.environ['TZ'] = 'FOO-3:3'
183 WVPASSEQ(utc_offset_str(0), '+0303')
184 # Offset is not an integer number of minutes
185 os.environ['TZ'] = 'FOO+3:3:3'
186 WVPASSEQ(utc_offset_str(1), '-0303')
187 os.environ['TZ'] = 'FOO-3:3:3'
188 WVPASSEQ(utc_offset_str(1), '+0303')
189 WVPASSEQ(utc_offset_str(314159), '+0303')
192 os.environ['TZ'] = tz
200 def test_valid_save_name():
201 with no_lingering_errors():
202 valid = helpers.valid_save_name
207 WVFAIL(valid('/foo'))
208 WVFAIL(valid('foo/'))
209 WVFAIL(valid('/foo/'))
210 WVFAIL(valid('foo//bar'))
212 WVFAIL(valid('bar.'))
213 WVFAIL(valid('foo@{'))
214 for x in ' ~^:?*[\\':
215 WVFAIL(valid('foo' + x))
217 WVFAIL(valid('foo' + chr(i)))
218 WVFAIL(valid('foo' + chr(0x7f)))
219 WVFAIL(valid('foo..bar'))
220 WVFAIL(valid('bar.lock/baz'))
221 WVFAIL(valid('foo/bar.lock/baz'))
222 WVFAIL(valid('.bar/baz'))
223 WVFAIL(valid('foo/.bar/baz'))