]> arthur.barton.de Git - bup.git/blob - lib/bup/t/thelpers.py
c707839fdb956ce7a6dfdb50ff27510479fbd53e
[bup.git] / lib / bup / t / thelpers.py
1
2 import helpers, math, os, os.path, stat, subprocess
3
4 from wvtest import *
5
6 from bup.helpers import (atomically_replaced_file, batchpipe, detect_fakeroot,
7                          grafted_path_components, mkdirp, parse_num,
8                          path_components, readpipe, stripped_path_components,
9                          utc_offset_str)
10 from buptest import no_lingering_errors, test_tempdir
11 import bup._helpers as _helpers
12
13
# Scratch directory for the test suite.  The relative path is resolved
# against the current working directory at import time, so the result
# depends on where the tests are launched from.
bup_tmp = os.path.realpath('../../../t/tmp')
# NOTE(review): mkdirp presumably creates the directory (and parents) if
# missing -- confirm against bup.helpers.
mkdirp(bup_tmp)
16
17
@wvtest
def test_parse_num():
    """Check parse_num on plain, suffixed, and exponent-style inputs."""
    gib = 1024 * 1024 * 1024
    # (input text, expected integer result)
    expectations = (
        ('1', 1),
        ('0', 0),
        ('1.5k', 1536),
        ('2 gb', 2 * gib),
        ('1e+9 k', 1000000000 * 1024),
        ('-3e-3mb', int(-0.003 * 1024 * 1024)),
    )
    with no_lingering_errors():
        for text, expected in expectations:
            WVPASSEQ(parse_num(text), expected)
28
@wvtest
def test_detect_fakeroot():
    """detect_fakeroot() must be truthy exactly when FAKEROOTKEY is non-empty."""
    with no_lingering_errors():
        under_fakeroot = bool(os.getenv('FAKEROOTKEY'))
        detected = bool(detect_fakeroot())
        WVPASS(detected == under_fakeroot)
36
@wvtest
def test_path_components():
    """Check path_components on absolute paths; a relative path must raise."""
    root = ('', '/')
    foo = ('foo', '/foo')
    # (path, expected list of (name, full_path) pairs)
    cases = (
        ('/', [root]),
        ('/foo', [root, foo]),
        ('/foo/', [root, foo]),
        ('/foo/bar', [root, foo, ('bar', '/foo/bar')]),
    )
    with no_lingering_errors():
        for path, expected in cases:
            WVPASSEQ(path_components(path), expected)
        WVEXCEPT(Exception, path_components, 'foo')
46
47
@wvtest
def test_stripped_path_components():
    """Check stripped_path_components for matching and non-matching prefixes."""
    root = ('', '/')
    # (path, strip prefixes, expected list of (name, full_path) pairs)
    cases = (
        ('/', [], [root]),
        ('/', [''], [root]),
        ('/', ['/'], [root]),
        ('/foo', ['/'], [root, ('foo', '/foo')]),
        ('/', ['/foo'], [root]),
        ('/foo', ['/bar'], [root, ('foo', '/foo')]),
        ('/foo', ['/foo'], [('', '/foo')]),
        ('/foo/bar', ['/foo'], [('', '/foo'), ('bar', '/foo/bar')]),
        ('/foo/bar', ['/bar', '/foo', '/baz'],
         [('', '/foo'), ('bar', '/foo/bar')]),
        ('/foo/bar/baz', ['/foo/bar/baz'], [('', '/foo/bar/baz')]),
    )
    with no_lingering_errors():
        for path, prefixes, expected in cases:
            WVPASSEQ(stripped_path_components(path, prefixes), expected)
        # A relative path is rejected.
        WVEXCEPT(Exception, stripped_path_components, 'foo', [])
67
68
@wvtest
def test_grafted_path_components():
    """Check grafted_path_components for a range of graft point layouts."""
    # (graft points, path, expected list of (name, full_path) pairs)
    cases = (
        ([('/chroot', '/')],
         '/foo',
         [('', '/'), ('foo', '/foo')]),
        ([('/foo/bar', '/')],
         '/foo/bar/baz/bax',
         [('', '/foo/bar'),
          ('baz', '/foo/bar/baz'),
          ('bax', '/foo/bar/baz/bax')]),
        ([('/foo/bar/baz', '/bax')],
         '/foo/bar/baz/1/2',
         [('', None),
          ('bax', '/foo/bar/baz'),
          ('1', '/foo/bar/baz/1'),
          ('2', '/foo/bar/baz/1/2')]),
        ([('/foo', '/bar/baz/bax')],
         '/foo/bar',
         [('', None),
          ('bar', None),
          ('baz', None),
          ('bax', '/foo'),
          ('bar', '/foo/bar')]),
        ([('/foo/bar/baz', '/a/b/c')],
         '/foo/bar/baz',
         [('', None), ('a', None), ('b', None), ('c', '/foo/bar/baz')]),
        ([('/', '/a/b/c/')],
         '/foo/bar',
         [('', None), ('a', None), ('b', None), ('c', '/'),
          ('foo', '/foo'), ('bar', '/foo/bar')]),
    )
    with no_lingering_errors():
        for graft_points, path, expected in cases:
            WVPASSEQ(grafted_path_components(graft_points, path), expected)
        # NOTE(review): here 'foo' lands in the graft-points slot and [] in
        # the path slot, unlike every call above -- confirm this is intended.
        WVEXCEPT(Exception, grafted_path_components, 'foo', [])
99
100
@wvtest
def test_readpipe():
    """Check readpipe output capture and its error on a failing command."""
    with no_lingering_errors():
        WVPASSEQ(readpipe(['echo', '42']), '42\n')

        # Record the message and assert on it outside the handler, so the
        # test fails (None != message) if readpipe does not raise at all
        # instead of passing vacuously.
        failure = None
        try:
            readpipe(['bash', '-c', 'exit 42'])
        except Exception as ex:
            failure = str(ex)
        WVPASSEQ(failure,
                 "subprocess 'bash -c exit 42' failed with status 42")
111
112
@wvtest
def test_batchpipe():
    """Check batchpipe output, failure reporting, and argument batching."""
    with no_lingering_errors():
        # No args: nothing should be yielded at all.
        for chunk in batchpipe(['echo'], []):
            WVPASS(False)

        out = ''.join(batchpipe(['echo'], ['42']))
        WVPASSEQ(out, '42\n')

        # batchpipe is consumed by iteration everywhere else in this test;
        # if it is lazy, merely calling it runs nothing, so drain it to make
        # sure the failing command is actually executed.  The message is
        # asserted outside the handler so a missing exception fails the
        # test instead of passing vacuously.
        failure = None
        try:
            for _ in batchpipe(['bash', '-c'], ['exit 42']):
                pass
        except Exception as ex:
            failure = str(ex)
        WVPASSEQ(failure,
                 "subprocess 'bash -c exit 42' failed with status 42")

        args = [str(x) for x in range(6)]
        # Force batchpipe to break the args into batches of 3.  This
        # approach assumes all args are the same length.
        arg_max = \
            helpers._argmax_base(['echo']) + helpers._argmax_args_size(args[:3])
        batches = batchpipe(['echo'], args, arg_max=arg_max)
        WVPASSEQ(next(batches), '0 1 2\n')
        WVPASSEQ(next(batches), '3 4 5\n')
        WVPASSEQ(next(batches, None), None)
        # Five args: the final batch is short.
        batches = batchpipe(['echo'], args[:5], arg_max=arg_max)
        WVPASSEQ(next(batches), '0 1 2\n')
        WVPASSEQ(next(batches), '3 4\n')
        WVPASSEQ(next(batches, None), None)
140
141
@wvtest
def test_atomically_replaced_file():
    """Check atomically_replaced_file on success, on failure, and in 'wb' mode.

    A completed block must leave the new content in the target; a block
    that raises must leave the previous content untouched.
    """
    with no_lingering_errors():
        with test_tempdir('bup-thelper-') as tmpdir:
            target_file = os.path.join(tmpdir, 'test-atomic-write')

            with atomically_replaced_file(target_file, mode='w') as f:
                f.write('asdf')
                WVPASSEQ(f.mode, 'w')
            # Use a context manager so the read handle is closed rather
            # than leaked for the rest of the test.
            with open(target_file, 'r') as f:
                WVPASSEQ(f.read(), 'asdf')

            try:
                with atomically_replaced_file(target_file, mode='w') as f:
                    f.write('wxyz')
                    raise Exception()
            except Exception:
                # The failure above is deliberate; catching Exception (not
                # a bare except) lets KeyboardInterrupt/SystemExit through.
                pass
            # The aborted write must not have replaced the old content.
            with open(target_file) as f:
                WVPASSEQ(f.read(), 'asdf')

            with atomically_replaced_file(target_file, mode='wb') as f:
                f.write(os.urandom(20))
                WVPASSEQ(f.mode, 'wb')
166
167
@wvtest
def test_utc_offset_str():
    """Check utc_offset_str under several TZ settings, then restore TZ.

    The expected offsets carry the opposite sign to the one written in
    the TZ string (e.g. 'FOO+1:00' is expected to give '-0100').
    """
    with no_lingering_errors():
        saved_tz = os.environ.get('TZ')
        try:
            os.environ['TZ'] = 'FOO+0:00'
            WVPASSEQ(utc_offset_str(0), '+0000')
            os.environ['TZ'] = 'FOO+1:00'
            WVPASSEQ(utc_offset_str(0), '-0100')
            os.environ['TZ'] = 'FOO-1:00'
            WVPASSEQ(utc_offset_str(0), '+0100')
            os.environ['TZ'] = 'FOO+3:3'
            WVPASSEQ(utc_offset_str(0), '-0303')
            os.environ['TZ'] = 'FOO-3:3'
            WVPASSEQ(utc_offset_str(0), '+0303')
            # Offset is not an integer number of minutes
            os.environ['TZ'] = 'FOO+3:3:3'
            WVPASSEQ(utc_offset_str(1), '-0303')
            os.environ['TZ'] = 'FOO-3:3:3'
            WVPASSEQ(utc_offset_str(1), '+0303')
            WVPASSEQ(utc_offset_str(314159), '+0303')
        finally:
            # Compare against None, not truthiness: an originally empty
            # TZ ('') must be put back, not removed.
            if saved_tz is None:
                os.environ.pop('TZ', None)
            else:
                os.environ['TZ'] = saved_tz
197
@wvtest
def test_valid_save_name():
    """Check that helpers.valid_save_name accepts and rejects the right names."""
    with no_lingering_errors():
        valid = helpers.valid_save_name
        WVPASS(valid('x'))
        WVPASS(valid('x@'))
        WVFAIL(valid('@'))
        WVFAIL(valid('/'))
        WVFAIL(valid('/foo'))
        WVFAIL(valid('foo/'))
        WVFAIL(valid('/foo/'))
        WVFAIL(valid('foo//bar'))
        WVFAIL(valid('.'))
        WVFAIL(valid('bar.'))
        WVFAIL(valid('foo@{'))
        for x in ' ~^:?*[\\':
            WVFAIL(valid('foo' + x))
        # git-check-ref-format forbids every ASCII control character,
        # i.e. bytes below octal 040 (0x20) plus DEL; cover the whole
        # range rather than only the first 20 values.
        for i in range(0x20):
            WVFAIL(valid('foo' + chr(i)))
        WVFAIL(valid('foo' + chr(0x7f)))
        WVFAIL(valid('foo..bar'))
        WVFAIL(valid('bar.lock/baz'))
        WVFAIL(valid('foo/bar.lock/baz'))
        WVFAIL(valid('.bar/baz'))
        WVFAIL(valid('foo/.bar/baz'))