2 from __future__ import absolute_import
3 import helpers, math, os, os.path, stat, subprocess
7 from bup.compat import environ
8 from bup.helpers import (atomically_replaced_file, batchpipe, detect_fakeroot,
9 grafted_path_components, mkdirp, parse_num,
10 path_components, readpipe, stripped_path_components,
12 from buptest import no_lingering_errors, test_tempdir
13 import bup._helpers as _helpers
# Absolute form of '../../../t/tmp': os.path.realpath() resolves it against the
# current working directory and expands any symlinks.  Presumably the scratch
# directory for these tests -- it is not referenced in the visible lines.
16 bup_tmp = os.path.realpath('../../../t/tmp')
# Fragment of a number-parsing test.  NOTE(review): the enclosing `def` line and
# the binding of `pn` are not present in this extract; `pn` is presumably an
# alias for the imported parse_num -- confirm against the full file.
22 with no_lingering_errors():
# Suffixes are binary multiples in these expectations: 'k' is 1024 and 'gb'
# is 1024**3; a space between number and suffix is accepted.
26 WVPASSEQ(pn('1.5k'), 1536)
27 WVPASSEQ(pn('2 gb'), 2*1024*1024*1024)
# Exponent notation is accepted, and the negative fractional case is expected
# to equal the int()-truncated product.
28 WVPASSEQ(pn('1e+9 k'), 1000000000 * 1024)
29 WVPASSEQ(pn('-3e-3mb'), int(-0.003 * 1024 * 1024))
def test_detect_fakeroot():
    """Check that detect_fakeroot() agrees with the FAKEROOTKEY variable.

    When FAKEROOTKEY is set to a non-empty value in the environment,
    detect_fakeroot() is expected to be truthy; otherwise it is expected
    to be falsy.  The two expectations are mutually exclusive, so they
    must live in separate branches.
    """
    with no_lingering_errors():
        if os.getenv('FAKEROOTKEY'):
            WVPASS(detect_fakeroot())
        else:
            WVPASS(not detect_fakeroot())
def test_path_components():
    """Check how path_components() breaks up absolute paths.

    Each case pairs an absolute path with the list of
    (component name, cumulative path) tuples it is expected to produce.
    A relative path is expected to raise.
    """
    cases = (
        ('/', [('', '/')]),
        ('/foo', [('', '/'), ('foo', '/foo')]),
        # A trailing slash is expected to make no difference.
        ('/foo/', [('', '/'), ('foo', '/foo')]),
        ('/foo/bar', [('', '/'), ('foo', '/foo'), ('bar', '/foo/bar')]),
    )
    with no_lingering_errors():
        for path, expected in cases:
            WVPASSEQ(path_components(path), expected)
        # Relative input: an exception is the expected outcome.
        WVEXCEPT(Exception, path_components, 'foo')
def test_stripped_path_components():
    """Check stripped_path_components() against a table of expectations.

    Each case is (path, strip prefixes, expected components).  In the
    expected results a matching prefix collapses into the first
    component, whose name is '' and whose path is the prefix itself;
    prefixes that do not match leave the ordinary components in place.
    A relative path is expected to raise.
    """
    cases = (
        ('/', [], [('', '/')]),
        ('/', [''], [('', '/')]),
        ('/', ['/'], [('', '/')]),
        ('/foo', ['/'], [('', '/'), ('foo', '/foo')]),
        ('/', ['/foo'], [('', '/')]),
        ('/foo', ['/bar'], [('', '/'), ('foo', '/foo')]),
        ('/foo', ['/foo'], [('', '/foo')]),
        ('/foo/bar', ['/foo'], [('', '/foo'), ('bar', '/foo/bar')]),
        # Only the matching entry of several prefixes takes effect.
        ('/foo/bar', ['/bar', '/foo', '/baz'],
         [('', '/foo'), ('bar', '/foo/bar')]),
        ('/foo/bar/baz', ['/foo/bar/baz'], [('', '/foo/bar/baz')]),
    )
    with no_lingering_errors():
        for path, prefixes, expected in cases:
            WVPASSEQ(stripped_path_components(path, prefixes), expected)
        # Relative input: an exception is the expected outcome.
        WVEXCEPT(Exception, stripped_path_components, 'foo', [])
# Exercises grafted_path_components(), called here with a list of (path, path)
# graft pairs followed by an absolute path; the expected results are lists of
# (name, path-or-None) tuples.
# NOTE(review): several calls below are cut short in this extract (the path
# argument and/or the start of the expected list are missing) -- restore them
# from the full file before relying on this block.
72 def test_grafted_path_components():
73 with no_lingering_errors():
# '/chroot' is not a prefix of '/foo'; the expected result is the plain
# component list for '/foo'.
74 WVPASSEQ(grafted_path_components([('/chroot', '/')], '/foo'),
75 [('', '/'), ('foo', '/foo')])
# NOTE(review): truncated call -- path argument and first expected entries missing.
76 WVPASSEQ(grafted_path_components([('/foo/bar', '/')],
79 ('baz', '/foo/bar/baz'),
80 ('bax', '/foo/bar/baz/bax')])
# NOTE(review): truncated call -- path argument and first expected entries missing.
81 WVPASSEQ(grafted_path_components([('/foo/bar/baz', '/bax')],
84 ('bax', '/foo/bar/baz'),
85 ('1', '/foo/bar/baz/1'),
86 ('2', '/foo/bar/baz/1/2')])
# NOTE(review): truncated call -- everything after the graft list is missing.
87 WVPASSEQ(grafted_path_components([('/foo', '/bar/baz/bax')],
# In the expected lists below, the leading components taken from the second
# element of the graft pair ('', 'a', 'b') are paired with None.
# NOTE(review): the path argument of this call is missing from the extract.
94 WVPASSEQ(grafted_path_components([('/foo/bar/baz', '/a/b/c')],
96 [('', None), ('a', None), ('b', None), ('c', '/foo/bar/baz')])
97 WVPASSEQ(grafted_path_components([('/', '/a/b/c/')], '/foo/bar'),
98 [('', None), ('a', None), ('b', None), ('c', '/'),
99 ('foo', '/foo'), ('bar', '/foo/bar')])
# Calling with 'foo' and [] as the two arguments is expected to raise.
100 WVEXCEPT(Exception, grafted_path_components, 'foo', [])
# Fragment of a readpipe() test.  NOTE(review): the `def` line, the `try:`
# statement and the assertions on `x` and on the exception are not present in
# this extract.
105 with no_lingering_errors():
106 x = readpipe(['echo', '42'])
# A command exiting with status 42 is expected to raise; the string further
# down is the expected failure text (how it is compared is not visible here).
109 readpipe(['bash', '-c', 'exit 42'])
110 except Exception as ex:
112 "subprocess 'bash -c exit 42' failed with status 42")
# Exercises batchpipe(command, args, arg_max=...), which is iterated here and
# whose items are compared against the bytes output of `echo`.
# NOTE(review): the loop bodies, the `try:` statement, the exception assertion
# and the left-hand side of the arg_max expression are missing from this
# extract -- restore them from the full file.
116 def test_batchpipe():
117 with no_lingering_errors():
118 for chunk in batchpipe(['echo'], []):
121 for chunk in batchpipe(['echo'], ['42']):
123 WVPASSEQ(out, b'42\n')
# A failing command: the string below is the expected failure text.
125 batchpipe(['bash', '-c'], ['exit 42'])
126 except Exception as ex:
128 "subprocess 'bash -c exit 42' failed with status 42")
# Six single-character arguments, so every argument has the same length.
129 args = [str(x) for x in range(6)]
130 # Force batchpipe to break the args into batches of 3. This
131 # approach assumes all args are the same length.
# Size budget: the base cost of the bare command plus the cost of three args.
# NOTE(review): presumably assigned to `arg_max`, which is used below; the
# assignment line itself is not visible.
133 helpers._argmax_base(['echo']) + helpers._argmax_args_size(args[:3])
# Six args are expected to produce exactly two full batches.
134 batches = batchpipe(['echo'], args, arg_max=arg_max)
135 WVPASSEQ(next(batches), b'0 1 2\n')
136 WVPASSEQ(next(batches), b'3 4 5\n')
137 WVPASSEQ(next(batches, None), None)
# Five args are expected to produce a full batch followed by a short one.
138 batches = batchpipe(['echo'], [str(x) for x in range(5)], arg_max=arg_max)
139 WVPASSEQ(next(batches), b'0 1 2\n')
140 WVPASSEQ(next(batches), b'3 4\n')
141 WVPASSEQ(next(batches, None), None)
# Exercises atomically_replaced_file() as a context manager: it is given a
# target path and a mode, and the object it yields exposes `.mode` and
# `.write()`.
# NOTE(review): the bodies of the first two `with atomically_replaced_file`
# blocks are incomplete in this extract -- restore them from the full file.
145 def test_atomically_replaced_file():
146 with no_lingering_errors():
147 with test_tempdir(b'bup-thelper-') as tmpdir:
# The temp dir prefix is bytes, so the joined target path is bytes as well.
148 target_file = os.path.join(tmpdir, b'test-atomic-write')
# Text mode: the yielded file is expected to report mode 'w'.
150 with atomically_replaced_file(target_file, mode='w') as f:
152 WVPASSEQ(f.mode, 'w')
# NOTE(review): this handle is not closed in the visible lines; consider a
# `with open(...)` block as used further down.
153 f = open(target_file, 'r')
154 WVPASSEQ(f.read(), 'asdf')
# After this second replacement attempt the target is still expected to read
# 'asdf'; presumably the missing body aborts the replacement -- TODO confirm.
157 with atomically_replaced_file(target_file, mode='w') as f:
162 with open(target_file) as f:
163 WVPASSEQ(f.read(), 'asdf')
# Binary mode: 20 random bytes are written and the mode is expected to be 'wb'.
165 with atomically_replaced_file(target_file, mode='wb') as f:
166 f.write(os.urandom(20))
167 WVPASSEQ(f.mode, 'wb')
# Exercises utc_offset_str(), which is called with an integer and compared
# against a bytes '+HHMM' / '-HHMM' string, under several TZ settings.
# NOTE(review): the lines that restore TZ afterwards (and any try/finally
# around the assignments) are missing from this extract, and utc_offset_str
# is not among the visible imports -- restore them from the full file.
171 def test_utc_offset_str():
172 with no_lingering_errors():
# Remember the caller's TZ (bytes key; None when unset).
173 tz = environ.get(b'TZ')
175 environ[b'TZ'] = b'FOO+0:00'
176 WVPASSEQ(utc_offset_str(0), b'+0000')
# The sign in TZ is the inverse of the expected UTC offset: 'FOO+1:00' is
# expected to give b'-0100' and 'FOO-1:00' to give b'+0100'.
177 environ[b'TZ'] = b'FOO+1:00'
178 WVPASSEQ(utc_offset_str(0), b'-0100')
179 environ[b'TZ'] = b'FOO-1:00'
180 WVPASSEQ(utc_offset_str(0), b'+0100')
# Single-digit hour and minute fields are expected to be zero-padded.
181 environ[b'TZ'] = b'FOO+3:3'
182 WVPASSEQ(utc_offset_str(0), b'-0303')
183 environ[b'TZ'] = b'FOO-3:3'
184 WVPASSEQ(utc_offset_str(0), b'+0303')
185 # Offset is not an integer number of minutes
# The seconds field of the TZ offset does not appear in the expected output.
186 environ[b'TZ'] = b'FOO+3:3:3'
187 WVPASSEQ(utc_offset_str(1), b'-0303')
188 environ[b'TZ'] = b'FOO-3:3:3'
189 WVPASSEQ(utc_offset_str(1), b'+0303')
# Same TZ, different argument: the expected offset string is unchanged.
190 WVPASSEQ(utc_offset_str(314159), b'+0303')
# Exercises helpers.valid_save_name() with names that are all expected to be
# rejected (each call is wrapped in WVFAIL).
# NOTE(review): several lines are missing from this extract, including the
# header of the loop that binds `i` -- restore them from the full file.  The
# rejected patterns resemble git's ref-name restrictions; presumably that is
# the rule set being mirrored -- TODO confirm.
201 def test_valid_save_name():
202 with no_lingering_errors():
# Local alias to keep the assertions short.
203 valid = helpers.valid_save_name
# Leading, trailing and doubled slashes.
208 WVFAIL(valid('/foo'))
209 WVFAIL(valid('foo/'))
210 WVFAIL(valid('/foo/'))
211 WVFAIL(valid('foo//bar'))
# Trailing dot and the '@{' sequence.
213 WVFAIL(valid('bar.'))
214 WVFAIL(valid('foo@{'))
# Each of: space ~ ^ : ? * [ and backslash, appended to 'foo'.
215 for x in ' ~^:?*[\\':
216 WVFAIL(valid('foo' + x))
# NOTE(review): `i` comes from a loop header that is not visible here.
218 WVFAIL(valid('foo' + chr(i)))
# 0x7f is the DEL character.
219 WVFAIL(valid('foo' + chr(0x7f)))
# Consecutive dots, '.lock' component suffixes and dot-prefixed components.
220 WVFAIL(valid('foo..bar'))
221 WVFAIL(valid('bar.lock/baz'))
222 WVFAIL(valid('foo/bar.lock/baz'))
223 WVFAIL(valid('.bar/baz'))
224 WVFAIL(valid('foo/.bar/baz'))