]> arthur.barton.de Git - bup.git/blob - lib/bup/t/thelpers.py
Add atomically_replaced_file for safer output
[bup.git] / lib / bup / t / thelpers.py
1 import helpers
2 import math
3 import os
4 import os.path
5 import tempfile
6 import stat
7 import bup._helpers as _helpers
8 from bup.helpers import *
9 from wvtest import *
10
11
12 @wvtest
13 def test_next():
14     # Test whatever you end up with for next() after import '*'.
15     WVPASSEQ(next(iter([]), None), None)
16     x = iter([1])
17     WVPASSEQ(next(x, None), 1)
18     WVPASSEQ(next(x, None), None)
19     x = iter([1])
20     WVPASSEQ(next(x, 'x'), 1)
21     WVPASSEQ(next(x, 'x'), 'x')
22     WVEXCEPT(StopIteration, next, iter([]))
23     x = iter([1])
24     WVPASSEQ(next(x), 1)
25     WVEXCEPT(StopIteration, next, x)
26
27
28 @wvtest
29 def test_fallback_next():
30     global next
31     orig = next
32     next = helpers._fallback_next
33     try:
34         test_next()
35     finally:
36         next = orig
37
38
39 @wvtest
40 def test_parse_num():
41     pn = parse_num
42     WVPASSEQ(pn('1'), 1)
43     WVPASSEQ(pn('0'), 0)
44     WVPASSEQ(pn('1.5k'), 1536)
45     WVPASSEQ(pn('2 gb'), 2*1024*1024*1024)
46     WVPASSEQ(pn('1e+9 k'), 1000000000 * 1024)
47     WVPASSEQ(pn('-3e-3mb'), int(-0.003 * 1024 * 1024))
48
49 @wvtest
50 def test_detect_fakeroot():
51     if os.getenv('FAKEROOTKEY'):
52         WVPASS(detect_fakeroot())
53     else:
54         WVPASS(not detect_fakeroot())
55
56 @wvtest
57 def test_path_components():
58     WVPASSEQ(path_components('/'), [('', '/')])
59     WVPASSEQ(path_components('/foo'), [('', '/'), ('foo', '/foo')])
60     WVPASSEQ(path_components('/foo/'), [('', '/'), ('foo', '/foo')])
61     WVPASSEQ(path_components('/foo/bar'),
62              [('', '/'), ('foo', '/foo'), ('bar', '/foo/bar')])
63     WVEXCEPT(Exception, path_components, 'foo')
64
65
66 @wvtest
67 def test_stripped_path_components():
68     WVPASSEQ(stripped_path_components('/', []), [('', '/')])
69     WVPASSEQ(stripped_path_components('/', ['']), [('', '/')])
70     WVPASSEQ(stripped_path_components('/', ['/']), [('', '/')])
71     WVPASSEQ(stripped_path_components('/', ['/foo']), [('', '/')])
72     WVPASSEQ(stripped_path_components('/foo', ['/bar']),
73              [('', '/'), ('foo', '/foo')])
74     WVPASSEQ(stripped_path_components('/foo', ['/foo']), [('', '/foo')])
75     WVPASSEQ(stripped_path_components('/foo/bar', ['/foo']),
76              [('', '/foo'), ('bar', '/foo/bar')])
77     WVPASSEQ(stripped_path_components('/foo/bar', ['/bar', '/foo', '/baz']),
78              [('', '/foo'), ('bar', '/foo/bar')])
79     WVPASSEQ(stripped_path_components('/foo/bar/baz', ['/foo/bar/baz']),
80              [('', '/foo/bar/baz')])
81     WVEXCEPT(Exception, stripped_path_components, 'foo', [])
82
83
84 @wvtest
85 def test_grafted_path_components():
86     WVPASSEQ(grafted_path_components([('/chroot', '/')], '/foo'),
87              [('', '/'), ('foo', '/foo')])
88     WVPASSEQ(grafted_path_components([('/foo/bar', '/')], '/foo/bar/baz/bax'),
89              [('', '/foo/bar'),
90               ('baz', '/foo/bar/baz'),
91               ('bax', '/foo/bar/baz/bax')])
92     WVPASSEQ(grafted_path_components([('/foo/bar/baz', '/bax')],
93                                      '/foo/bar/baz/1/2'),
94              [('', None),
95               ('bax', '/foo/bar/baz'),
96               ('1', '/foo/bar/baz/1'),
97               ('2', '/foo/bar/baz/1/2')])
98     WVPASSEQ(grafted_path_components([('/foo', '/bar/baz/bax')],
99                                      '/foo/bar'),
100              [('', None),
101               ('bar', None),
102               ('baz', None),
103               ('bax', '/foo'),
104               ('bar', '/foo/bar')])
105     WVPASSEQ(grafted_path_components([('/foo/bar/baz', '/a/b/c')],
106                                      '/foo/bar/baz'),
107              [('', None), ('a', None), ('b', None), ('c', '/foo/bar/baz')])
108     WVPASSEQ(grafted_path_components([('/', '/a/b/c/')], '/foo/bar'),
109              [('', None), ('a', None), ('b', None), ('c', '/'),
110               ('foo', '/foo'), ('bar', '/foo/bar')])
111     WVEXCEPT(Exception, grafted_path_components, 'foo', [])
112
113
114 @wvtest
115 def test_readpipe():
116     x = readpipe(['echo', '42'])
117     WVPASSEQ(x, '42\n')
118     try:
119         readpipe(['bash', '-c', 'exit 42'])
120     except Exception, ex:
121         WVPASSEQ(str(ex), "subprocess 'bash -c exit 42' failed with status 42")
122
123
124 @wvtest
125 def test_batchpipe():
126     for chunk in batchpipe(['echo'], []):
127         WVPASS(False)
128     out = ''
129     for chunk in batchpipe(['echo'], ['42']):
130         out += chunk
131     WVPASSEQ(out, '42\n')
132     try:
133         batchpipe(['bash', '-c'], ['exit 42'])
134     except Exception, ex:
135         WVPASSEQ(str(ex), "subprocess 'bash -c exit 42' failed with status 42")
136     args = [str(x) for x in range(6)]
137     # Force batchpipe to break the args into batches of 3.  This
138     # approach assumes all args are the same length.
139     arg_max = \
140         helpers._argmax_base(['echo']) + helpers._argmax_args_size(args[:3])
141     batches = batchpipe(['echo'], args, arg_max=arg_max)
142     WVPASSEQ(next(batches), '0 1 2\n')
143     WVPASSEQ(next(batches), '3 4 5\n')
144     WVPASSEQ(next(batches, None), None)
145     batches = batchpipe(['echo'], [str(x) for x in range(5)], arg_max=arg_max)
146     WVPASSEQ(next(batches), '0 1 2\n')
147     WVPASSEQ(next(batches), '3 4\n')
148     WVPASSEQ(next(batches, None), None)
149
150
151 @wvtest
152 def test_atomically_replaced_file():
153     target_file = os.path.join(tempfile.gettempdir(),
154                                'test_atomic_write')
155     try:
156         with atomically_replaced_file(target_file, mode='w') as f:
157             f.write('asdf')
158             WVPASSEQ(f.mode, 'w')
159         f = open(target_file, 'r')
160         WVPASSEQ(f.read(), 'asdf')
161
162         with atomically_replaced_file(target_file, mode='wb') as f:
163             f.write(os.urandom(20))
164             WVPASSEQ(f.mode, 'wb')
165     finally:
166         unlink(target_file)