]> arthur.barton.de Git - bup.git/blob - lib/bup/t/thelpers.py
test_utc_offset_str: str -> bytes for py3
[bup.git] / lib / bup / t / thelpers.py
1
2 from __future__ import absolute_import
3 import helpers, math, os, os.path, stat, subprocess
4
5 from wvtest import *
6
7 from bup.compat import environ
8 from bup.helpers import (atomically_replaced_file, batchpipe, detect_fakeroot,
9                          grafted_path_components, mkdirp, parse_num,
10                          path_components, readpipe, stripped_path_components,
11                          utc_offset_str)
12 from buptest import no_lingering_errors, test_tempdir
13 import bup._helpers as _helpers
14
15
16 bup_tmp = os.path.realpath('../../../t/tmp')
17 mkdirp(bup_tmp)
18
19
20 @wvtest
21 def test_parse_num():
22     with no_lingering_errors():
23         pn = parse_num
24         WVPASSEQ(pn('1'), 1)
25         WVPASSEQ(pn('0'), 0)
26         WVPASSEQ(pn('1.5k'), 1536)
27         WVPASSEQ(pn('2 gb'), 2*1024*1024*1024)
28         WVPASSEQ(pn('1e+9 k'), 1000000000 * 1024)
29         WVPASSEQ(pn('-3e-3mb'), int(-0.003 * 1024 * 1024))
30
31 @wvtest
32 def test_detect_fakeroot():
33     with no_lingering_errors():
34         if os.getenv('FAKEROOTKEY'):
35             WVPASS(detect_fakeroot())
36         else:
37             WVPASS(not detect_fakeroot())
38
39 @wvtest
40 def test_path_components():
41     with no_lingering_errors():
42         WVPASSEQ(path_components('/'), [('', '/')])
43         WVPASSEQ(path_components('/foo'), [('', '/'), ('foo', '/foo')])
44         WVPASSEQ(path_components('/foo/'), [('', '/'), ('foo', '/foo')])
45         WVPASSEQ(path_components('/foo/bar'),
46                  [('', '/'), ('foo', '/foo'), ('bar', '/foo/bar')])
47         WVEXCEPT(Exception, path_components, 'foo')
48
49
50 @wvtest
51 def test_stripped_path_components():
52     with no_lingering_errors():
53         WVPASSEQ(stripped_path_components('/', []), [('', '/')])
54         WVPASSEQ(stripped_path_components('/', ['']), [('', '/')])
55         WVPASSEQ(stripped_path_components('/', ['/']), [('', '/')])
56         WVPASSEQ(stripped_path_components('/foo', ['/']),
57                  [('', '/'), ('foo', '/foo')])
58         WVPASSEQ(stripped_path_components('/', ['/foo']), [('', '/')])
59         WVPASSEQ(stripped_path_components('/foo', ['/bar']),
60                  [('', '/'), ('foo', '/foo')])
61         WVPASSEQ(stripped_path_components('/foo', ['/foo']), [('', '/foo')])
62         WVPASSEQ(stripped_path_components('/foo/bar', ['/foo']),
63                  [('', '/foo'), ('bar', '/foo/bar')])
64         WVPASSEQ(stripped_path_components('/foo/bar', ['/bar', '/foo', '/baz']),
65                  [('', '/foo'), ('bar', '/foo/bar')])
66         WVPASSEQ(stripped_path_components('/foo/bar/baz', ['/foo/bar/baz']),
67                  [('', '/foo/bar/baz')])
68         WVEXCEPT(Exception, stripped_path_components, 'foo', [])
69
70
71 @wvtest
72 def test_grafted_path_components():
73     with no_lingering_errors():
74         WVPASSEQ(grafted_path_components([('/chroot', '/')], '/foo'),
75                  [('', '/'), ('foo', '/foo')])
76         WVPASSEQ(grafted_path_components([('/foo/bar', '/')],
77                                          '/foo/bar/baz/bax'),
78                  [('', '/foo/bar'),
79                   ('baz', '/foo/bar/baz'),
80                   ('bax', '/foo/bar/baz/bax')])
81         WVPASSEQ(grafted_path_components([('/foo/bar/baz', '/bax')],
82                                          '/foo/bar/baz/1/2'),
83                  [('', None),
84                   ('bax', '/foo/bar/baz'),
85                   ('1', '/foo/bar/baz/1'),
86                   ('2', '/foo/bar/baz/1/2')])
87         WVPASSEQ(grafted_path_components([('/foo', '/bar/baz/bax')],
88                                          '/foo/bar'),
89                  [('', None),
90                   ('bar', None),
91                   ('baz', None),
92                   ('bax', '/foo'),
93                   ('bar', '/foo/bar')])
94         WVPASSEQ(grafted_path_components([('/foo/bar/baz', '/a/b/c')],
95                                          '/foo/bar/baz'),
96                  [('', None), ('a', None), ('b', None), ('c', '/foo/bar/baz')])
97         WVPASSEQ(grafted_path_components([('/', '/a/b/c/')], '/foo/bar'),
98                  [('', None), ('a', None), ('b', None), ('c', '/'),
99                   ('foo', '/foo'), ('bar', '/foo/bar')])
100         WVEXCEPT(Exception, grafted_path_components, 'foo', [])
101
102
103 @wvtest
104 def test_readpipe():
105     with no_lingering_errors():
106         x = readpipe(['echo', '42'])
107         WVPASSEQ(x, b'42\n')
108         try:
109             readpipe(['bash', '-c', 'exit 42'])
110         except Exception as ex:
111             WVPASSEQ(str(ex),
112                      "subprocess 'bash -c exit 42' failed with status 42")
113
114
115 @wvtest
116 def test_batchpipe():
117     with no_lingering_errors():
118         for chunk in batchpipe(['echo'], []):
119             WVPASS(False)
120         out = b''
121         for chunk in batchpipe(['echo'], ['42']):
122             out += chunk
123         WVPASSEQ(out, b'42\n')
124         try:
125             batchpipe(['bash', '-c'], ['exit 42'])
126         except Exception as ex:
127             WVPASSEQ(str(ex),
128                      "subprocess 'bash -c exit 42' failed with status 42")
129         args = [str(x) for x in range(6)]
130         # Force batchpipe to break the args into batches of 3.  This
131         # approach assumes all args are the same length.
132         arg_max = \
133             helpers._argmax_base(['echo']) + helpers._argmax_args_size(args[:3])
134         batches = batchpipe(['echo'], args, arg_max=arg_max)
135         WVPASSEQ(next(batches), b'0 1 2\n')
136         WVPASSEQ(next(batches), b'3 4 5\n')
137         WVPASSEQ(next(batches, None), None)
138         batches = batchpipe(['echo'], [str(x) for x in range(5)], arg_max=arg_max)
139         WVPASSEQ(next(batches), b'0 1 2\n')
140         WVPASSEQ(next(batches), b'3 4\n')
141         WVPASSEQ(next(batches, None), None)
142
143
144 @wvtest
145 def test_atomically_replaced_file():
146     with no_lingering_errors():
147         with test_tempdir(b'bup-thelper-') as tmpdir:
148             target_file = os.path.join(tmpdir, b'test-atomic-write')
149
150             with atomically_replaced_file(target_file, mode='w') as f:
151                 f.write('asdf')
152                 WVPASSEQ(f.mode, 'w')
153             f = open(target_file, 'r')
154             WVPASSEQ(f.read(), 'asdf')
155
156             try:
157                 with atomically_replaced_file(target_file, mode='w') as f:
158                     f.write('wxyz')
159                     raise Exception()
160             except:
161                 pass
162             with open(target_file) as f:
163                 WVPASSEQ(f.read(), 'asdf')
164
165             with atomically_replaced_file(target_file, mode='wb') as f:
166                 f.write(os.urandom(20))
167                 WVPASSEQ(f.mode, 'wb')
168
169
170 @wvtest
171 def test_utc_offset_str():
172     with no_lingering_errors():
173         tz = environ.get(b'TZ')
174         try:
175             environ[b'TZ'] = b'FOO+0:00'
176             WVPASSEQ(utc_offset_str(0), b'+0000')
177             environ[b'TZ'] = b'FOO+1:00'
178             WVPASSEQ(utc_offset_str(0), b'-0100')
179             environ[b'TZ'] = b'FOO-1:00'
180             WVPASSEQ(utc_offset_str(0), b'+0100')
181             environ[b'TZ'] = b'FOO+3:3'
182             WVPASSEQ(utc_offset_str(0), b'-0303')
183             environ[b'TZ'] = b'FOO-3:3'
184             WVPASSEQ(utc_offset_str(0), b'+0303')
185             # Offset is not an integer number of minutes
186             environ[b'TZ'] = b'FOO+3:3:3'
187             WVPASSEQ(utc_offset_str(1), b'-0303')
188             environ[b'TZ'] = b'FOO-3:3:3'
189             WVPASSEQ(utc_offset_str(1), b'+0303')
190             WVPASSEQ(utc_offset_str(314159), b'+0303')
191         finally:
192             if tz:
193                 environ[b'TZ'] = tz
194             else:
195                 try:
196                     del environ[b'TZ']
197                 except KeyError:
198                     pass
199
200 @wvtest
201 def test_valid_save_name():
202     with no_lingering_errors():
203         valid = helpers.valid_save_name
204         WVPASS(valid('x'))
205         WVPASS(valid('x@'))
206         WVFAIL(valid('@'))
207         WVFAIL(valid('/'))
208         WVFAIL(valid('/foo'))
209         WVFAIL(valid('foo/'))
210         WVFAIL(valid('/foo/'))
211         WVFAIL(valid('foo//bar'))
212         WVFAIL(valid('.'))
213         WVFAIL(valid('bar.'))
214         WVFAIL(valid('foo@{'))
215         for x in ' ~^:?*[\\':
216             WVFAIL(valid('foo' + x))
217         for i in range(20):
218             WVFAIL(valid('foo' + chr(i)))
219         WVFAIL(valid('foo' + chr(0x7f)))
220         WVFAIL(valid('foo..bar'))
221         WVFAIL(valid('bar.lock/baz'))
222         WVFAIL(valid('foo/bar.lock/baz'))
223         WVFAIL(valid('.bar/baz'))
224         WVFAIL(valid('foo/.bar/baz'))