9 from bup.helpers import (atomically_replaced_file, batchpipe, detect_fakeroot,
10 grafted_path_components, mkdirp, parse_num,
11 path_components, readpipe, stripped_path_components,
13 import bup._helpers as _helpers
# Scratch directory passed as `dir=` to tempfile.mkdtemp() by the
# atomically_replaced_file test.  The relative path is resolved against the
# current working directory at import time.
18 bup_tmp = os.path.realpath('../../../t/tmp')
# NOTE(review): the enclosing definition and the statements that bind `x`
# are not visible in this excerpt; presumably `x` is re-bound to a fresh
# one-element iterator before each group of checks -- confirm against the
# full file.
23 # Test whatever you end up with for next() after import '*'.
# With a default argument, an empty iterator yields the default.
24 WVPASSEQ(next(iter([]), None), None)
# The single item is returned first, then the default once exhausted.
26 WVPASSEQ(next(x, None), 1)
27 WVPASSEQ(next(x, None), None)
# Same sequence with a non-None default value.
29 WVPASSEQ(next(x, 'x'), 1)
30 WVPASSEQ(next(x, 'x'), 'x')
# Without a default, an exhausted iterator must raise StopIteration.
31 WVEXCEPT(StopIteration, next, iter([]))
34 WVEXCEPT(StopIteration, next, x)
# Exercises helpers._fallback_next by binding it to the name `next`.
# NOTE(review): the rest of the body is not visible in this excerpt;
# presumably it re-runs the next() checks and restores the original binding
# afterwards -- confirm against the full file.
38 def test_fallback_next():
41 next = helpers._fallback_next
# Size-string parsing checks; the expected values use binary multiples
# (k = 1024).
# NOTE(review): the enclosing definition and the binding of `pn` are not
# visible in this excerpt -- presumably `pn` is an alias for parse_num.
# Fractional value with a unit suffix: 1.5 * 1024.
53 WVPASSEQ(pn('1.5k'), 1536)
# A space between number and unit, and a two-letter unit, are accepted.
54 WVPASSEQ(pn('2 gb'), 2*1024*1024*1024)
# Exponent notation is accepted.
55 WVPASSEQ(pn('1e+9 k'), 1000000000 * 1024)
# Negative fractional result; the expected value is truncated with int().
56 WVPASSEQ(pn('-3e-3mb'), int(-0.003 * 1024 * 1024))
def test_detect_fakeroot():
    """Check that detect_fakeroot() agrees with the FAKEROOTKEY variable.

    When FAKEROOTKEY is set to a non-empty value, detect_fakeroot() must
    report true; otherwise it must report false.
    """
    if os.getenv('FAKEROOTKEY'):
        WVPASS(detect_fakeroot())
    else:
        # Only assert the negative when FAKEROOTKEY is absent or empty;
        # running both assertions unconditionally would contradict each
        # other under fakeroot.
        WVPASS(not detect_fakeroot())
def test_path_components():
    """Check path_components() on absolute paths and on a relative path.

    The expected result for an absolute path is a list of (name, full_path)
    pairs that starts with the root entry, whose name is the empty string.
    """
    root = ('', '/')
    foo = ('foo', '/foo')
    foo_bar = ('bar', '/foo/bar')

    WVPASSEQ(path_components('/'), [root])
    WVPASSEQ(path_components('/foo'), [root, foo])
    # A trailing slash is expected to give the same result as without it.
    WVPASSEQ(path_components('/foo/'), [root, foo])
    WVPASSEQ(path_components('/foo/bar'), [root, foo, foo_bar])
    # A relative path must be rejected with an exception.
    WVEXCEPT(Exception, path_components, 'foo')
def test_stripped_path_components():
    """Check stripped_path_components() against various strip-prefix lists.

    Each call passes an absolute path and a list of prefixes to strip; the
    expected result is a list of (name, full_path) pairs whose first entry
    has an empty name.
    """
    root = ('', '/')
    foo = ('foo', '/foo')
    foo_as_root = ('', '/foo')
    foo_bar = ('bar', '/foo/bar')

    # Prefix lists that are empty, hold only '' or '/', or do not match the
    # path are expected to leave the components unchanged.
    WVPASSEQ(stripped_path_components('/', []), [root])
    WVPASSEQ(stripped_path_components('/', ['']), [root])
    WVPASSEQ(stripped_path_components('/', ['/']), [root])
    WVPASSEQ(stripped_path_components('/foo', ['/']), [root, foo])
    WVPASSEQ(stripped_path_components('/', ['/foo']), [root])
    WVPASSEQ(stripped_path_components('/foo', ['/bar']), [root, foo])

    # A matching prefix is expected to collapse into one unnamed leading
    # entry that still carries the full prefix path.
    WVPASSEQ(stripped_path_components('/foo', ['/foo']), [foo_as_root])
    WVPASSEQ(stripped_path_components('/foo/bar', ['/foo']),
             [foo_as_root, foo_bar])
    WVPASSEQ(stripped_path_components('/foo/bar', ['/bar', '/foo', '/baz']),
             [foo_as_root, foo_bar])
    WVPASSEQ(stripped_path_components('/foo/bar/baz', ['/foo/bar/baz']),
             [('', '/foo/bar/baz')])

    # A relative path must be rejected with an exception.
    WVEXCEPT(Exception, stripped_path_components, 'foo', [])
# Checks grafted_path_components(graft_points, path), where each graft point
# is an (old_prefix, new_prefix) pair and the expected result is a list of
# (name, source_path) pairs.
# NOTE(review): several continuation lines of the expected values and of the
# call arguments are missing from this excerpt, so some brackets below appear
# unbalanced -- consult the full file before editing.
96 def test_grafted_path_components():
# The graft's old prefix ('/chroot') does not occur in '/foo'.
97 WVPASSEQ(grafted_path_components([('/chroot', '/')], '/foo'),
98 [('', '/'), ('foo', '/foo')])
# Old prefix '/foo/bar' grafted onto '/'.
99 WVPASSEQ(grafted_path_components([('/foo/bar', '/')], '/foo/bar/baz/bax'),
101 ('baz', '/foo/bar/baz'),
102 ('bax', '/foo/bar/baz/bax')])
# Old prefix '/foo/bar/baz' renamed to '/bax'; the 'bax' entry keeps the
# original source path.
103 WVPASSEQ(grafted_path_components([('/foo/bar/baz', '/bax')],
106 ('bax', '/foo/bar/baz'),
107 ('1', '/foo/bar/baz/1'),
108 ('2', '/foo/bar/baz/1/2')])
109 WVPASSEQ(grafted_path_components([('/foo', '/bar/baz/bax')],
115 ('bar', '/foo/bar')])
# Components that exist only in the new prefix ('', 'a', 'b') are expected
# to carry None as their source path.
116 WVPASSEQ(grafted_path_components([('/foo/bar/baz', '/a/b/c')],
118 [('', None), ('a', None), ('b', None), ('c', '/foo/bar/baz')])
# Grafting the root itself; the new prefix here has a trailing slash.
119 WVPASSEQ(grafted_path_components([('/', '/a/b/c/')], '/foo/bar'),
120 [('', None), ('a', None), ('b', None), ('c', '/'),
121 ('foo', '/foo'), ('bar', '/foo/bar')])
# NOTE(review): the arguments here are ('foo', []), i.e. a string where the
# other calls pass the graft list and a list where they pass the path --
# confirm this ordering is intentional and not a swapped-argument slip.
122 WVEXCEPT(Exception, grafted_path_components, 'foo', [])
# NOTE(review): the enclosing definition, the check on `x`, and the `try:`
# line that pairs with the `except` below are not visible in this excerpt.
127 x = readpipe(['echo', '42'])
# A command exiting with a non-zero status is expected to raise, with the
# command line and exit status in the message.
# NOTE(review): as far as visible here, nothing fails if no exception is
# raised -- confirm whether the full file has an else/fail branch.
130 readpipe(['bash', '-c', 'exit 42'])
131 except Exception as ex:
132 WVPASSEQ(str(ex), "subprocess 'bash -c exit 42' failed with status 42")
# Checks batchpipe(command, args, arg_max=...), which is iterated here to
# obtain the output of each batch.
# NOTE(review): the loop bodies, the `try:` line, the binding of `out`, and
# the left-hand side of the `arg_max` assignment are missing from this
# excerpt.
136 def test_batchpipe():
# Empty argument list.
137 for chunk in batchpipe(['echo'], []):
# Single argument: the output is the echoed argument.
140 for chunk in batchpipe(['echo'], ['42']):
142 WVPASSEQ(out, '42\n')
# NOTE(review): batchpipe is used as an iterator elsewhere in this test;
# this call is not iterated in the visible lines, so the subprocess may
# never run and the `except` branch may never be reached -- confirm.
144 batchpipe(['bash', '-c'], ['exit 42'])
145 except Exception as ex:
146 WVPASSEQ(str(ex), "subprocess 'bash -c exit 42' failed with status 42")
147 args = [str(x) for x in range(6)]
148 # Force batchpipe to break the args into batches of 3. This
149 # approach assumes all args are the same length.
# Presumably the right-hand side of `arg_max = ...`: the base size of the
# command plus the size of exactly three arguments.
151 helpers._argmax_base(['echo']) + helpers._argmax_args_size(args[:3])
# Six arguments: two full batches of three, then exhaustion.
152 batches = batchpipe(['echo'], args, arg_max=arg_max)
153 WVPASSEQ(next(batches), '0 1 2\n')
154 WVPASSEQ(next(batches), '3 4 5\n')
155 WVPASSEQ(next(batches, None), None)
# Five arguments: one full batch, one partial batch, then exhaustion.
156 batches = batchpipe(['echo'], [str(x) for x in range(5)], arg_max=arg_max)
157 WVPASSEQ(next(batches), '0 1 2\n')
158 WVPASSEQ(next(batches), '3 4\n')
159 WVPASSEQ(next(batches, None), None)
# Checks the atomically_replaced_file(path, mode=...) context manager using a
# target file inside a fresh temporary directory under bup_tmp.
# NOTE(review): the bodies of the first two `with` blocks and any
# try/finally lines are missing from this excerpt.
163 def test_atomically_replaced_file():
164 tmpdir = tempfile.mkdtemp(dir=bup_tmp, prefix='bup-thelper-')
165 target_file = os.path.join(tmpdir, 'test-atomic-write')
# Remember the failure count so cleanup can tell whether this test failed.
166 initial_failures = wvfailure_count()
# Text mode: the file object is expected to report mode 'w'.
168 with atomically_replaced_file(target_file, mode='w') as f:
170 WVPASSEQ(f.mode, 'w')
# NOTE(review): no close() for this handle is visible in the excerpt.
171 f = open(target_file, 'r')
172 WVPASSEQ(f.read(), 'asdf')
# After the second block the target is expected to still contain 'asdf';
# presumably that block is aborted before completing -- confirm.
175 with atomically_replaced_file(target_file, mode='w') as f:
180 with open(target_file) as f:
181 WVPASSEQ(f.read(), 'asdf')
# Binary mode: the file object is expected to report mode 'wb'.
183 with atomically_replaced_file(target_file, mode='wb') as f:
184 f.write(os.urandom(20))
185 WVPASSEQ(f.mode, 'wb')
# Remove the temporary directory only when no new failures were recorded,
# so its contents stay available for inspection after a failure.
187 if wvfailure_count() == initial_failures:
188 subprocess.call(['rm', '-rf', tmpdir])
# Checks utc_offset_str(t) under several TZ environment settings.
# The TZ values use the POSIX form, where a positive offset means west of
# UTC; hence 'FOO+1:00' is expected to yield '-0100'.
# NOTE(review): the try/finally lines and any guard around the final restore
# are missing from this excerpt.
192 def test_utc_offset_str():
# Save the caller's TZ so it can be restored at the end.
193 tz = os.environ.get('TZ')
195 os.environ['TZ'] = 'FOO+0:00'
196 WVPASSEQ(utc_offset_str(0), '+0000')
197 os.environ['TZ'] = 'FOO+1:00'
198 WVPASSEQ(utc_offset_str(0), '-0100')
199 os.environ['TZ'] = 'FOO-1:00'
200 WVPASSEQ(utc_offset_str(0), '+0100')
# Single-digit minutes are expected to be zero-padded in the output.
201 os.environ['TZ'] = 'FOO+3:3'
202 WVPASSEQ(utc_offset_str(0), '-0303')
203 os.environ['TZ'] = 'FOO-3:3'
204 WVPASSEQ(utc_offset_str(0), '+0303')
205 # Offset is not an integer number of minutes
# The seconds part of the offset is expected to be dropped from the output.
206 os.environ['TZ'] = 'FOO+3:3:3'
207 WVPASSEQ(utc_offset_str(1), '-0303')
208 os.environ['TZ'] = 'FOO-3:3:3'
209 WVPASSEQ(utc_offset_str(1), '+0303')
210 WVPASSEQ(utc_offset_str(314159), '+0303')
# NOTE(review): if TZ was originally unset, `tz` is None and this assignment
# would raise TypeError; presumably guarded by lines not visible here.
213 os.environ['TZ'] = tz