]> arthur.barton.de Git - bup.git/blob - lib/bup/t/thelpers.py
Use absolute_import from the __future__ everywhere
[bup.git] / lib / bup / t / thelpers.py
1
2 from __future__ import absolute_import
3 import helpers, math, os, os.path, stat, subprocess
4
5 from wvtest import *
6
7 from bup.helpers import (atomically_replaced_file, batchpipe, detect_fakeroot,
8                          grafted_path_components, mkdirp, parse_num,
9                          path_components, readpipe, stripped_path_components,
10                          utc_offset_str)
11 from buptest import no_lingering_errors, test_tempdir
12 import bup._helpers as _helpers
13
14
15 bup_tmp = os.path.realpath('../../../t/tmp')
16 mkdirp(bup_tmp)
17
18
19 @wvtest
20 def test_parse_num():
21     with no_lingering_errors():
22         pn = parse_num
23         WVPASSEQ(pn('1'), 1)
24         WVPASSEQ(pn('0'), 0)
25         WVPASSEQ(pn('1.5k'), 1536)
26         WVPASSEQ(pn('2 gb'), 2*1024*1024*1024)
27         WVPASSEQ(pn('1e+9 k'), 1000000000 * 1024)
28         WVPASSEQ(pn('-3e-3mb'), int(-0.003 * 1024 * 1024))
29
30 @wvtest
31 def test_detect_fakeroot():
32     with no_lingering_errors():
33         if os.getenv('FAKEROOTKEY'):
34             WVPASS(detect_fakeroot())
35         else:
36             WVPASS(not detect_fakeroot())
37
38 @wvtest
39 def test_path_components():
40     with no_lingering_errors():
41         WVPASSEQ(path_components('/'), [('', '/')])
42         WVPASSEQ(path_components('/foo'), [('', '/'), ('foo', '/foo')])
43         WVPASSEQ(path_components('/foo/'), [('', '/'), ('foo', '/foo')])
44         WVPASSEQ(path_components('/foo/bar'),
45                  [('', '/'), ('foo', '/foo'), ('bar', '/foo/bar')])
46         WVEXCEPT(Exception, path_components, 'foo')
47
48
49 @wvtest
50 def test_stripped_path_components():
51     with no_lingering_errors():
52         WVPASSEQ(stripped_path_components('/', []), [('', '/')])
53         WVPASSEQ(stripped_path_components('/', ['']), [('', '/')])
54         WVPASSEQ(stripped_path_components('/', ['/']), [('', '/')])
55         WVPASSEQ(stripped_path_components('/foo', ['/']),
56                  [('', '/'), ('foo', '/foo')])
57         WVPASSEQ(stripped_path_components('/', ['/foo']), [('', '/')])
58         WVPASSEQ(stripped_path_components('/foo', ['/bar']),
59                  [('', '/'), ('foo', '/foo')])
60         WVPASSEQ(stripped_path_components('/foo', ['/foo']), [('', '/foo')])
61         WVPASSEQ(stripped_path_components('/foo/bar', ['/foo']),
62                  [('', '/foo'), ('bar', '/foo/bar')])
63         WVPASSEQ(stripped_path_components('/foo/bar', ['/bar', '/foo', '/baz']),
64                  [('', '/foo'), ('bar', '/foo/bar')])
65         WVPASSEQ(stripped_path_components('/foo/bar/baz', ['/foo/bar/baz']),
66                  [('', '/foo/bar/baz')])
67         WVEXCEPT(Exception, stripped_path_components, 'foo', [])
68
69
70 @wvtest
71 def test_grafted_path_components():
72     with no_lingering_errors():
73         WVPASSEQ(grafted_path_components([('/chroot', '/')], '/foo'),
74                  [('', '/'), ('foo', '/foo')])
75         WVPASSEQ(grafted_path_components([('/foo/bar', '/')],
76                                          '/foo/bar/baz/bax'),
77                  [('', '/foo/bar'),
78                   ('baz', '/foo/bar/baz'),
79                   ('bax', '/foo/bar/baz/bax')])
80         WVPASSEQ(grafted_path_components([('/foo/bar/baz', '/bax')],
81                                          '/foo/bar/baz/1/2'),
82                  [('', None),
83                   ('bax', '/foo/bar/baz'),
84                   ('1', '/foo/bar/baz/1'),
85                   ('2', '/foo/bar/baz/1/2')])
86         WVPASSEQ(grafted_path_components([('/foo', '/bar/baz/bax')],
87                                          '/foo/bar'),
88                  [('', None),
89                   ('bar', None),
90                   ('baz', None),
91                   ('bax', '/foo'),
92                   ('bar', '/foo/bar')])
93         WVPASSEQ(grafted_path_components([('/foo/bar/baz', '/a/b/c')],
94                                          '/foo/bar/baz'),
95                  [('', None), ('a', None), ('b', None), ('c', '/foo/bar/baz')])
96         WVPASSEQ(grafted_path_components([('/', '/a/b/c/')], '/foo/bar'),
97                  [('', None), ('a', None), ('b', None), ('c', '/'),
98                   ('foo', '/foo'), ('bar', '/foo/bar')])
99         WVEXCEPT(Exception, grafted_path_components, 'foo', [])
100
101
102 @wvtest
103 def test_readpipe():
104     with no_lingering_errors():
105         x = readpipe(['echo', '42'])
106         WVPASSEQ(x, '42\n')
107         try:
108             readpipe(['bash', '-c', 'exit 42'])
109         except Exception as ex:
110             WVPASSEQ(str(ex),
111                      "subprocess 'bash -c exit 42' failed with status 42")
112
113
114 @wvtest
115 def test_batchpipe():
116     with no_lingering_errors():
117         for chunk in batchpipe(['echo'], []):
118             WVPASS(False)
119         out = ''
120         for chunk in batchpipe(['echo'], ['42']):
121             out += chunk
122         WVPASSEQ(out, '42\n')
123         try:
124             batchpipe(['bash', '-c'], ['exit 42'])
125         except Exception as ex:
126             WVPASSEQ(str(ex),
127                      "subprocess 'bash -c exit 42' failed with status 42")
128         args = [str(x) for x in range(6)]
129         # Force batchpipe to break the args into batches of 3.  This
130         # approach assumes all args are the same length.
131         arg_max = \
132             helpers._argmax_base(['echo']) + helpers._argmax_args_size(args[:3])
133         batches = batchpipe(['echo'], args, arg_max=arg_max)
134         WVPASSEQ(next(batches), '0 1 2\n')
135         WVPASSEQ(next(batches), '3 4 5\n')
136         WVPASSEQ(next(batches, None), None)
137         batches = batchpipe(['echo'], [str(x) for x in range(5)], arg_max=arg_max)
138         WVPASSEQ(next(batches), '0 1 2\n')
139         WVPASSEQ(next(batches), '3 4\n')
140         WVPASSEQ(next(batches, None), None)
141
142
143 @wvtest
144 def test_atomically_replaced_file():
145     with no_lingering_errors():
146         with test_tempdir('bup-thelper-') as tmpdir:
147             target_file = os.path.join(tmpdir, 'test-atomic-write')
148
149             with atomically_replaced_file(target_file, mode='w') as f:
150                 f.write('asdf')
151                 WVPASSEQ(f.mode, 'w')
152             f = open(target_file, 'r')
153             WVPASSEQ(f.read(), 'asdf')
154
155             try:
156                 with atomically_replaced_file(target_file, mode='w') as f:
157                     f.write('wxyz')
158                     raise Exception()
159             except:
160                 pass
161             with open(target_file) as f:
162                 WVPASSEQ(f.read(), 'asdf')
163
164             with atomically_replaced_file(target_file, mode='wb') as f:
165                 f.write(os.urandom(20))
166                 WVPASSEQ(f.mode, 'wb')
167
168
169 @wvtest
170 def test_utc_offset_str():
171     with no_lingering_errors():
172         tz = os.environ.get('TZ')
173         try:
174             os.environ['TZ'] = 'FOO+0:00'
175             WVPASSEQ(utc_offset_str(0), '+0000')
176             os.environ['TZ'] = 'FOO+1:00'
177             WVPASSEQ(utc_offset_str(0), '-0100')
178             os.environ['TZ'] = 'FOO-1:00'
179             WVPASSEQ(utc_offset_str(0), '+0100')
180             os.environ['TZ'] = 'FOO+3:3'
181             WVPASSEQ(utc_offset_str(0), '-0303')
182             os.environ['TZ'] = 'FOO-3:3'
183             WVPASSEQ(utc_offset_str(0), '+0303')
184             # Offset is not an integer number of minutes
185             os.environ['TZ'] = 'FOO+3:3:3'
186             WVPASSEQ(utc_offset_str(1), '-0303')
187             os.environ['TZ'] = 'FOO-3:3:3'
188             WVPASSEQ(utc_offset_str(1), '+0303')
189             WVPASSEQ(utc_offset_str(314159), '+0303')
190         finally:
191             if tz:
192                 os.environ['TZ'] = tz
193             else:
194                 try:
195                     del os.environ['TZ']
196                 except KeyError:
197                     pass
198
199 @wvtest
200 def test_valid_save_name():
201     with no_lingering_errors():
202         valid = helpers.valid_save_name
203         WVPASS(valid('x'))
204         WVPASS(valid('x@'))
205         WVFAIL(valid('@'))
206         WVFAIL(valid('/'))
207         WVFAIL(valid('/foo'))
208         WVFAIL(valid('foo/'))
209         WVFAIL(valid('/foo/'))
210         WVFAIL(valid('foo//bar'))
211         WVFAIL(valid('.'))
212         WVFAIL(valid('bar.'))
213         WVFAIL(valid('foo@{'))
214         for x in ' ~^:?*[\\':
215             WVFAIL(valid('foo' + x))
216         for i in range(20):
217             WVFAIL(valid('foo' + chr(i)))
218         WVFAIL(valid('foo' + chr(0x7f)))
219         WVFAIL(valid('foo..bar'))
220         WVFAIL(valid('bar.lock/baz'))
221         WVFAIL(valid('foo/bar.lock/baz'))
222         WVFAIL(valid('.bar/baz'))
223         WVFAIL(valid('foo/.bar/baz'))