]> arthur.barton.de Git - bup.git/blob - lib/bup/t/thelpers.py
thelpers: call tzset() after changing TZ
[bup.git] / lib / bup / t / thelpers.py
1
2 from __future__ import absolute_import
3 from time import tzset
4 import helpers, math, os, os.path, stat, subprocess
5
6 from wvtest import *
7
8 from bup.compat import environ
9 from bup.helpers import (atomically_replaced_file, batchpipe, detect_fakeroot,
10                          grafted_path_components, mkdirp, parse_num,
11                          path_components, readpipe, stripped_path_components,
12                          utc_offset_str)
13 from buptest import no_lingering_errors, test_tempdir
14 import bup._helpers as _helpers
15
16
17 bup_tmp = os.path.realpath('../../../t/tmp')
18 mkdirp(bup_tmp)
19
20
21 @wvtest
22 def test_parse_num():
23     with no_lingering_errors():
24         pn = parse_num
25         WVPASSEQ(pn('1'), 1)
26         WVPASSEQ(pn('0'), 0)
27         WVPASSEQ(pn('1.5k'), 1536)
28         WVPASSEQ(pn('2 gb'), 2*1024*1024*1024)
29         WVPASSEQ(pn('1e+9 k'), 1000000000 * 1024)
30         WVPASSEQ(pn('-3e-3mb'), int(-0.003 * 1024 * 1024))
31
32 @wvtest
33 def test_detect_fakeroot():
34     with no_lingering_errors():
35         if os.getenv('FAKEROOTKEY'):
36             WVPASS(detect_fakeroot())
37         else:
38             WVPASS(not detect_fakeroot())
39
40 @wvtest
41 def test_path_components():
42     with no_lingering_errors():
43         WVPASSEQ(path_components('/'), [('', '/')])
44         WVPASSEQ(path_components('/foo'), [('', '/'), ('foo', '/foo')])
45         WVPASSEQ(path_components('/foo/'), [('', '/'), ('foo', '/foo')])
46         WVPASSEQ(path_components('/foo/bar'),
47                  [('', '/'), ('foo', '/foo'), ('bar', '/foo/bar')])
48         WVEXCEPT(Exception, path_components, 'foo')
49
50
51 @wvtest
52 def test_stripped_path_components():
53     with no_lingering_errors():
54         WVPASSEQ(stripped_path_components('/', []), [('', '/')])
55         WVPASSEQ(stripped_path_components('/', ['']), [('', '/')])
56         WVPASSEQ(stripped_path_components('/', ['/']), [('', '/')])
57         WVPASSEQ(stripped_path_components('/foo', ['/']),
58                  [('', '/'), ('foo', '/foo')])
59         WVPASSEQ(stripped_path_components('/', ['/foo']), [('', '/')])
60         WVPASSEQ(stripped_path_components('/foo', ['/bar']),
61                  [('', '/'), ('foo', '/foo')])
62         WVPASSEQ(stripped_path_components('/foo', ['/foo']), [('', '/foo')])
63         WVPASSEQ(stripped_path_components('/foo/bar', ['/foo']),
64                  [('', '/foo'), ('bar', '/foo/bar')])
65         WVPASSEQ(stripped_path_components('/foo/bar', ['/bar', '/foo', '/baz']),
66                  [('', '/foo'), ('bar', '/foo/bar')])
67         WVPASSEQ(stripped_path_components('/foo/bar/baz', ['/foo/bar/baz']),
68                  [('', '/foo/bar/baz')])
69         WVEXCEPT(Exception, stripped_path_components, 'foo', [])
70
71
72 @wvtest
73 def test_grafted_path_components():
74     with no_lingering_errors():
75         WVPASSEQ(grafted_path_components([('/chroot', '/')], '/foo'),
76                  [('', '/'), ('foo', '/foo')])
77         WVPASSEQ(grafted_path_components([('/foo/bar', '/')],
78                                          '/foo/bar/baz/bax'),
79                  [('', '/foo/bar'),
80                   ('baz', '/foo/bar/baz'),
81                   ('bax', '/foo/bar/baz/bax')])
82         WVPASSEQ(grafted_path_components([('/foo/bar/baz', '/bax')],
83                                          '/foo/bar/baz/1/2'),
84                  [('', None),
85                   ('bax', '/foo/bar/baz'),
86                   ('1', '/foo/bar/baz/1'),
87                   ('2', '/foo/bar/baz/1/2')])
88         WVPASSEQ(grafted_path_components([('/foo', '/bar/baz/bax')],
89                                          '/foo/bar'),
90                  [('', None),
91                   ('bar', None),
92                   ('baz', None),
93                   ('bax', '/foo'),
94                   ('bar', '/foo/bar')])
95         WVPASSEQ(grafted_path_components([('/foo/bar/baz', '/a/b/c')],
96                                          '/foo/bar/baz'),
97                  [('', None), ('a', None), ('b', None), ('c', '/foo/bar/baz')])
98         WVPASSEQ(grafted_path_components([('/', '/a/b/c/')], '/foo/bar'),
99                  [('', None), ('a', None), ('b', None), ('c', '/'),
100                   ('foo', '/foo'), ('bar', '/foo/bar')])
101         WVEXCEPT(Exception, grafted_path_components, 'foo', [])
102
103
104 @wvtest
105 def test_readpipe():
106     with no_lingering_errors():
107         x = readpipe(['echo', '42'])
108         WVPASSEQ(x, b'42\n')
109         try:
110             readpipe(['bash', '-c', 'exit 42'])
111         except Exception as ex:
112             WVPASSEQ(str(ex),
113                      "subprocess 'bash -c exit 42' failed with status 42")
114
115
116 @wvtest
117 def test_batchpipe():
118     with no_lingering_errors():
119         for chunk in batchpipe(['echo'], []):
120             WVPASS(False)
121         out = b''
122         for chunk in batchpipe(['echo'], ['42']):
123             out += chunk
124         WVPASSEQ(out, b'42\n')
125         try:
126             batchpipe(['bash', '-c'], ['exit 42'])
127         except Exception as ex:
128             WVPASSEQ(str(ex),
129                      "subprocess 'bash -c exit 42' failed with status 42")
130         args = [str(x) for x in range(6)]
131         # Force batchpipe to break the args into batches of 3.  This
132         # approach assumes all args are the same length.
133         arg_max = \
134             helpers._argmax_base(['echo']) + helpers._argmax_args_size(args[:3])
135         batches = batchpipe(['echo'], args, arg_max=arg_max)
136         WVPASSEQ(next(batches), b'0 1 2\n')
137         WVPASSEQ(next(batches), b'3 4 5\n')
138         WVPASSEQ(next(batches, None), None)
139         batches = batchpipe(['echo'], [str(x) for x in range(5)], arg_max=arg_max)
140         WVPASSEQ(next(batches), b'0 1 2\n')
141         WVPASSEQ(next(batches), b'3 4\n')
142         WVPASSEQ(next(batches, None), None)
143
144
145 @wvtest
146 def test_atomically_replaced_file():
147     with no_lingering_errors():
148         with test_tempdir(b'bup-thelper-') as tmpdir:
149             target_file = os.path.join(tmpdir, b'test-atomic-write')
150
151             with atomically_replaced_file(target_file, mode='w') as f:
152                 f.write('asdf')
153                 WVPASSEQ(f.mode, 'w')
154             f = open(target_file, 'r')
155             WVPASSEQ(f.read(), 'asdf')
156
157             try:
158                 with atomically_replaced_file(target_file, mode='w') as f:
159                     f.write('wxyz')
160                     raise Exception()
161             except:
162                 pass
163             with open(target_file) as f:
164                 WVPASSEQ(f.read(), 'asdf')
165
166             with atomically_replaced_file(target_file, mode='wb') as f:
167                 f.write(os.urandom(20))
168                 WVPASSEQ(f.mode, 'wb')
169
170
171 def set_tz(tz):
172     if not tz:
173         del environ[b'TZ']
174     else:
175         environ[b'TZ'] = tz
176     tzset()
177
178
179 @wvtest
180 def test_utc_offset_str():
181     with no_lingering_errors():
182         tz = environ.get(b'TZ')
183         tzset()
184         try:
185             set_tz(b'FOO+0:00')
186             WVPASSEQ(utc_offset_str(0), b'+0000')
187             set_tz(b'FOO+1:00')
188             WVPASSEQ(utc_offset_str(0), b'-0100')
189             set_tz(b'FOO-1:00')
190             WVPASSEQ(utc_offset_str(0), b'+0100')
191             set_tz(b'FOO+3:3')
192             WVPASSEQ(utc_offset_str(0), b'-0303')
193             set_tz(b'FOO-3:3')
194             WVPASSEQ(utc_offset_str(0), b'+0303')
195             # Offset is not an integer number of minutes
196             set_tz(b'FOO+3:3:3')
197             WVPASSEQ(utc_offset_str(1), b'-0303')
198             set_tz(b'FOO-3:3:3')
199             WVPASSEQ(utc_offset_str(1), b'+0303')
200             WVPASSEQ(utc_offset_str(314159), b'+0303')
201         finally:
202             if tz:
203                 set_tz(tz)
204             else:
205                 try:
206                     set_tz(None)
207                 except KeyError:
208                     pass
209
210 @wvtest
211 def test_valid_save_name():
212     with no_lingering_errors():
213         valid = helpers.valid_save_name
214         WVPASS(valid('x'))
215         WVPASS(valid('x@'))
216         WVFAIL(valid('@'))
217         WVFAIL(valid('/'))
218         WVFAIL(valid('/foo'))
219         WVFAIL(valid('foo/'))
220         WVFAIL(valid('/foo/'))
221         WVFAIL(valid('foo//bar'))
222         WVFAIL(valid('.'))
223         WVFAIL(valid('bar.'))
224         WVFAIL(valid('foo@{'))
225         for x in ' ~^:?*[\\':
226             WVFAIL(valid('foo' + x))
227         for i in range(20):
228             WVFAIL(valid('foo' + chr(i)))
229         WVFAIL(valid('foo' + chr(0x7f)))
230         WVFAIL(valid('foo..bar'))
231         WVFAIL(valid('bar.lock/baz'))
232         WVFAIL(valid('foo/bar.lock/baz'))
233         WVFAIL(valid('.bar/baz'))
234         WVFAIL(valid('foo/.bar/baz'))