2 import helpers, math, os, os.path, stat, subprocess
6 from bup.helpers import (atomically_replaced_file, batchpipe, detect_fakeroot,
7 grafted_path_components, mkdirp, parse_num,
8 path_components, readpipe, stripped_path_components,
10 from buptest import no_lingering_errors, test_tempdir
11 import bup._helpers as _helpers
14 bup_tmp = os.path.realpath('../../../t/tmp')
19 with no_lingering_errors():
20 # Test whatever you end up with for next() after import '*'.
21 WVPASSEQ(next(iter([]), None), None)
23 WVPASSEQ(next(x, None), 1)
24 WVPASSEQ(next(x, None), None)
26 WVPASSEQ(next(x, 'x'), 1)
27 WVPASSEQ(next(x, 'x'), 'x')
28 WVEXCEPT(StopIteration, next, iter([]))
31 WVEXCEPT(StopIteration, next, x)
35 def test_fallback_next():
36 with no_lingering_errors():
39 next = helpers._fallback_next
48 with no_lingering_errors():
52 WVPASSEQ(pn('1.5k'), 1536)
53 WVPASSEQ(pn('2 gb'), 2*1024*1024*1024)
54 WVPASSEQ(pn('1e+9 k'), 1000000000 * 1024)
55 WVPASSEQ(pn('-3e-3mb'), int(-0.003 * 1024 * 1024))
58 def test_detect_fakeroot():
59 with no_lingering_errors():
60 if os.getenv('FAKEROOTKEY'):
61 WVPASS(detect_fakeroot())
63 WVPASS(not detect_fakeroot())
66 def test_path_components():
67 with no_lingering_errors():
68 WVPASSEQ(path_components('/'), [('', '/')])
69 WVPASSEQ(path_components('/foo'), [('', '/'), ('foo', '/foo')])
70 WVPASSEQ(path_components('/foo/'), [('', '/'), ('foo', '/foo')])
71 WVPASSEQ(path_components('/foo/bar'),
72 [('', '/'), ('foo', '/foo'), ('bar', '/foo/bar')])
73 WVEXCEPT(Exception, path_components, 'foo')
77 def test_stripped_path_components():
78 with no_lingering_errors():
79 WVPASSEQ(stripped_path_components('/', []), [('', '/')])
80 WVPASSEQ(stripped_path_components('/', ['']), [('', '/')])
81 WVPASSEQ(stripped_path_components('/', ['/']), [('', '/')])
82 WVPASSEQ(stripped_path_components('/foo', ['/']),
83 [('', '/'), ('foo', '/foo')])
84 WVPASSEQ(stripped_path_components('/', ['/foo']), [('', '/')])
85 WVPASSEQ(stripped_path_components('/foo', ['/bar']),
86 [('', '/'), ('foo', '/foo')])
87 WVPASSEQ(stripped_path_components('/foo', ['/foo']), [('', '/foo')])
88 WVPASSEQ(stripped_path_components('/foo/bar', ['/foo']),
89 [('', '/foo'), ('bar', '/foo/bar')])
90 WVPASSEQ(stripped_path_components('/foo/bar', ['/bar', '/foo', '/baz']),
91 [('', '/foo'), ('bar', '/foo/bar')])
92 WVPASSEQ(stripped_path_components('/foo/bar/baz', ['/foo/bar/baz']),
93 [('', '/foo/bar/baz')])
94 WVEXCEPT(Exception, stripped_path_components, 'foo', [])
98 def test_grafted_path_components():
99 with no_lingering_errors():
100 WVPASSEQ(grafted_path_components([('/chroot', '/')], '/foo'),
101 [('', '/'), ('foo', '/foo')])
102 WVPASSEQ(grafted_path_components([('/foo/bar', '/')],
105 ('baz', '/foo/bar/baz'),
106 ('bax', '/foo/bar/baz/bax')])
107 WVPASSEQ(grafted_path_components([('/foo/bar/baz', '/bax')],
110 ('bax', '/foo/bar/baz'),
111 ('1', '/foo/bar/baz/1'),
112 ('2', '/foo/bar/baz/1/2')])
113 WVPASSEQ(grafted_path_components([('/foo', '/bar/baz/bax')],
119 ('bar', '/foo/bar')])
120 WVPASSEQ(grafted_path_components([('/foo/bar/baz', '/a/b/c')],
122 [('', None), ('a', None), ('b', None), ('c', '/foo/bar/baz')])
123 WVPASSEQ(grafted_path_components([('/', '/a/b/c/')], '/foo/bar'),
124 [('', None), ('a', None), ('b', None), ('c', '/'),
125 ('foo', '/foo'), ('bar', '/foo/bar')])
126 WVEXCEPT(Exception, grafted_path_components, 'foo', [])
131 with no_lingering_errors():
132 x = readpipe(['echo', '42'])
135 readpipe(['bash', '-c', 'exit 42'])
136 except Exception as ex:
138 "subprocess 'bash -c exit 42' failed with status 42")
142 def test_batchpipe():
143 with no_lingering_errors():
144 for chunk in batchpipe(['echo'], []):
147 for chunk in batchpipe(['echo'], ['42']):
149 WVPASSEQ(out, '42\n')
151 batchpipe(['bash', '-c'], ['exit 42'])
152 except Exception as ex:
154 "subprocess 'bash -c exit 42' failed with status 42")
155 args = [str(x) for x in range(6)]
156 # Force batchpipe to break the args into batches of 3. This
157 # approach assumes all args are the same length.
159 helpers._argmax_base(['echo']) + helpers._argmax_args_size(args[:3])
160 batches = batchpipe(['echo'], args, arg_max=arg_max)
161 WVPASSEQ(next(batches), '0 1 2\n')
162 WVPASSEQ(next(batches), '3 4 5\n')
163 WVPASSEQ(next(batches, None), None)
164 batches = batchpipe(['echo'], [str(x) for x in range(5)], arg_max=arg_max)
165 WVPASSEQ(next(batches), '0 1 2\n')
166 WVPASSEQ(next(batches), '3 4\n')
167 WVPASSEQ(next(batches, None), None)
171 def test_atomically_replaced_file():
172 with no_lingering_errors():
173 with test_tempdir('bup-thelper-') as tmpdir:
174 target_file = os.path.join(tmpdir, 'test-atomic-write')
176 with atomically_replaced_file(target_file, mode='w') as f:
178 WVPASSEQ(f.mode, 'w')
179 f = open(target_file, 'r')
180 WVPASSEQ(f.read(), 'asdf')
183 with atomically_replaced_file(target_file, mode='w') as f:
188 with open(target_file) as f:
189 WVPASSEQ(f.read(), 'asdf')
191 with atomically_replaced_file(target_file, mode='wb') as f:
192 f.write(os.urandom(20))
193 WVPASSEQ(f.mode, 'wb')
197 def test_utc_offset_str():
198 with no_lingering_errors():
199 tz = os.environ.get('TZ')
201 os.environ['TZ'] = 'FOO+0:00'
202 WVPASSEQ(utc_offset_str(0), '+0000')
203 os.environ['TZ'] = 'FOO+1:00'
204 WVPASSEQ(utc_offset_str(0), '-0100')
205 os.environ['TZ'] = 'FOO-1:00'
206 WVPASSEQ(utc_offset_str(0), '+0100')
207 os.environ['TZ'] = 'FOO+3:3'
208 WVPASSEQ(utc_offset_str(0), '-0303')
209 os.environ['TZ'] = 'FOO-3:3'
210 WVPASSEQ(utc_offset_str(0), '+0303')
211 # Offset is not an integer number of minutes
212 os.environ['TZ'] = 'FOO+3:3:3'
213 WVPASSEQ(utc_offset_str(1), '-0303')
214 os.environ['TZ'] = 'FOO-3:3:3'
215 WVPASSEQ(utc_offset_str(1), '+0303')
216 WVPASSEQ(utc_offset_str(314159), '+0303')
219 os.environ['TZ'] = tz
227 def test_valid_save_name():
228 with no_lingering_errors():
229 valid = helpers.valid_save_name
234 WVFAIL(valid('/foo'))
235 WVFAIL(valid('foo/'))
236 WVFAIL(valid('/foo/'))
237 WVFAIL(valid('foo//bar'))
239 WVFAIL(valid('bar.'))
240 WVFAIL(valid('foo@{'))
241 for x in ' ~^:?*[\\':
242 WVFAIL(valid('foo' + x))
244 WVFAIL(valid('foo' + chr(i)))
245 WVFAIL(valid('foo' + chr(0x7f)))
246 WVFAIL(valid('foo..bar'))
247 WVFAIL(valid('bar.lock/baz'))
248 WVFAIL(valid('foo/bar.lock/baz'))
249 WVFAIL(valid('.bar/baz'))
250 WVFAIL(valid('foo/.bar/baz'))