7 import bup._helpers as _helpers
8 from bup.helpers import *
# Scratch directory passed as dir= to tempfile.mkdtemp() by
# test_atomically_replaced_file.  The relative path is resolved by
# os.path.realpath() when this line runs, so the result depends on the
# working directory at that moment.
11 bup_tmp = os.path.realpath('../../../t/tmp')
16 # Test whatever you end up with for next() after import '*'.
# The two-argument form is expected to return the supplied default (None or
# 'x' below) once the iterator is exhausted; the one-argument form is handed
# to WVEXCEPT together with StopIteration.
# NOTE(review): `x` is never assigned in the visible lines and the embedded
# numbering jumps (17->19, 20->22, 24->27), so the assignments -- and the
# enclosing def -- are presumably on lines missing from this excerpt.  Confirm
# against the full file.
17 WVPASSEQ(next(iter([]), None), None)
19 WVPASSEQ(next(x, None), 1)
20 WVPASSEQ(next(x, None), None)
22 WVPASSEQ(next(x, 'x'), 1)
23 WVPASSEQ(next(x, 'x'), 'x')
24 WVEXCEPT(StopIteration, next, iter([]))
27 WVEXCEPT(StopIteration, next, x)
# Rebinds the name `next` to helpers._fallback_next.
# NOTE(review): only the def line and this rebinding are visible (embedded
# lines 32-33 and everything after 34 are missing).  Presumably the hidden
# lines repeat the next() checks against the fallback and then restore the
# original binding -- confirm.  Also, `helpers` is not bound by the imports
# visible at the top of this excerpt (`_helpers` and the star import); it is
# presumably imported on a line not shown.
31 def test_fallback_next():
34 next = helpers._fallback_next
# Size-string parsing checks.  The expected values show 1024-based units:
# 'k' -> 1024, 'mb' -> 1024**2, 'gb' -> 1024**3.
# NOTE(review): `pn` is not bound in the visible lines and the enclosing def
# is not shown; presumably `pn` is a short alias for the parsing helper under
# test, set up on a line missing from this excerpt -- confirm.
# Fractional value with a unit suffix: 1.5 * 1024.
46 WVPASSEQ(pn('1.5k'), 1536)
# Whitespace between the number and a two-letter unit is accepted.
47 WVPASSEQ(pn('2 gb'), 2*1024*1024*1024)
# Scientific notation is accepted for the numeric part.
48 WVPASSEQ(pn('1e+9 k'), 1000000000 * 1024)
# Negative fractional input; the expected value is built with int(), which
# truncates toward zero.
49 WVPASSEQ(pn('-3e-3mb'), int(-0.003 * 1024 * 1024))
# Checks detect_fakeroot() against the FAKEROOTKEY environment variable.
# NOTE(review): the two assertions below are mutually exclusive and the
# embedded numbering skips line 55, so an `else:` presumably separates them in
# the full file -- confirm.
52 def test_detect_fakeroot():
53 if os.getenv('FAKEROOTKEY'):
# FAKEROOTKEY is set and non-empty: detection is asserted to be truthy.
54 WVPASS(detect_fakeroot())
# Detection is asserted to be falsy (presumably the FAKEROOTKEY-unset branch).
56 WVPASS(not detect_fakeroot())
def test_path_components():
    """Check path_components() against absolute paths and one relative path.

    The expected values pair each component name with the full path up to
    that component, always starting with ('', '/').  A trailing slash
    ('/foo/') is expected to give the same result as '/foo'.  The relative
    path 'foo' is handed to WVEXCEPT together with Exception.
    """
    # Expected (name, full_path) pairs, named once and reused below.
    root = ('', '/')
    foo = ('foo', '/foo')
    bar = ('bar', '/foo/bar')

    WVPASSEQ(path_components('/'), [root])
    WVPASSEQ(path_components('/foo'), [root, foo])
    WVPASSEQ(path_components('/foo/'), [root, foo])
    WVPASSEQ(path_components('/foo/bar'), [root, foo, bar])

    WVEXCEPT(Exception, path_components, 'foo')
def test_stripped_path_components():
    """Check stripped_path_components() for a range of strip-prefix lists.

    The second argument is a list of prefixes.  Per the expected values
    below, when one of them matches the start of the path, the first
    component becomes ('', <matched prefix>) and the remaining components
    follow with their full paths; when none matches (or the prefix is '/'
    or ''), the result starts at ('', '/').  The relative path 'foo' is
    handed to WVEXCEPT together with Exception.
    """
    # Expected (name, full_path) pairs, named once and reused below.
    root = ('', '/')
    foo = ('foo', '/foo')
    foo_as_root = ('', '/foo')
    bar = ('bar', '/foo/bar')

    # Stripping nothing, '' or '/' from '/' leaves just the root entry.
    WVPASSEQ(stripped_path_components('/', []), [root])
    WVPASSEQ(stripped_path_components('/', ['']), [root])
    WVPASSEQ(stripped_path_components('/', ['/']), [root])

    # Prefixes that are '/' or that do not match leave the path unstripped.
    WVPASSEQ(stripped_path_components('/foo', ['/']), [root, foo])
    WVPASSEQ(stripped_path_components('/', ['/foo']), [root])
    WVPASSEQ(stripped_path_components('/foo', ['/bar']), [root, foo])

    # A matching prefix becomes the new first component.
    WVPASSEQ(stripped_path_components('/foo', ['/foo']), [foo_as_root])
    WVPASSEQ(stripped_path_components('/foo/bar', ['/foo']),
             [foo_as_root, bar])
    WVPASSEQ(stripped_path_components('/foo/bar', ['/bar', '/foo', '/baz']),
             [foo_as_root, bar])
    WVPASSEQ(stripped_path_components('/foo/bar/baz', ['/foo/bar/baz']),
             [('', '/foo/bar/baz')])

    WVEXCEPT(Exception, stripped_path_components, 'foo', [])
# Checks grafted_path_components(), called with a list of (old, new) prefix
# pairs followed by a path.
# NOTE(review): the embedded numbering skips lines 93, 97-98, 103-107 and 110,
# so several of the multi-line expressions below are missing their path
# argument and/or the start of their expected list in this excerpt.  Read the
# expectations against the full file.
89 def test_grafted_path_components():
# The graft point does not prefix the path: the expected result is the plain
# component list.
90 WVPASSEQ(grafted_path_components([('/chroot', '/')], '/foo'),
91 [('', '/'), ('foo', '/foo')])
92 WVPASSEQ(grafted_path_components([('/foo/bar', '/')], '/foo/bar/baz/bax'),
94 ('baz', '/foo/bar/baz'),
95 ('bax', '/foo/bar/baz/bax')])
96 WVPASSEQ(grafted_path_components([('/foo/bar/baz', '/bax')],
99 ('bax', '/foo/bar/baz'),
100 ('1', '/foo/bar/baz/1'),
101 ('2', '/foo/bar/baz/1/2')])
102 WVPASSEQ(grafted_path_components([('/foo', '/bar/baz/bax')],
108 ('bar', '/foo/bar')])
# In the two cases below, the components introduced by the new prefix carry
# None as their path, except the last one, which carries the old prefix.
109 WVPASSEQ(grafted_path_components([('/foo/bar/baz', '/a/b/c')],
111 [('', None), ('a', None), ('b', None), ('c', '/foo/bar/baz')])
112 WVPASSEQ(grafted_path_components([('/', '/a/b/c/')], '/foo/bar'),
113 [('', None), ('a', None), ('b', None), ('c', '/'),
114 ('foo', '/foo'), ('bar', '/foo/bar')])
# NOTE(review): every call above passes (graft list, path string); this one
# passes ('foo', []), i.e. a string first and a list second.  Any exception
# raised here may come from the swapped argument types rather than from a
# relative path -- confirm the intended call is ([], 'foo').
115 WVEXCEPT(Exception, grafted_path_components, 'foo', [])
# readpipe() checks: capture the output of a command, then inspect the error
# message produced for a command that exits with status 42.
# NOTE(review): the enclosing def, the assertion on `x` and the `try:` that
# pairs with the `except` below are not visible (embedded lines 121-122 are
# missing) -- confirm against the full file.
120 x = readpipe(['echo', '42'])
123 readpipe(['bash', '-c', 'exit 42'])
# NOTE(review): `except Exception, ex` is Python 2-only syntax;
# `except Exception as ex` is accepted by Python 2.6+ and Python 3.  As far as
# the visible lines show, nothing fails the test if readpipe() does not raise.
124 except Exception, ex:
125 WVPASSEQ(str(ex), "subprocess 'bash -c exit 42' failed with status 42")
# Checks batchpipe(): iteration over its results for empty and single-argument
# input, the error message for a failing command, and splitting of the
# argument list into batches when arg_max is constrained.
# NOTE(review): the embedded numbering skips lines 131-132, 134, 136 and 143,
# so the loop bodies, the initialisation of `out`, the `try:` and the
# left-hand side of the arg_max assignment are not visible in this excerpt.
129 def test_batchpipe():
# Empty argument list (loop body not visible).
130 for chunk in batchpipe(['echo'], []):
# Single argument; `out` is presumably accumulated from the chunks -- confirm.
133 for chunk in batchpipe(['echo'], ['42']):
135 WVPASSEQ(out, '42\n')
# NOTE(review): unlike every other call in this function, the result of this
# call is neither iterated nor passed to next().  If batchpipe() is lazy
# (e.g. a generator), the command never runs and the except branch below is
# never exercised -- confirm.
137 batchpipe(['bash', '-c'], ['exit 42'])
# NOTE(review): `except Exception, ex` is Python 2-only syntax;
# `except Exception as ex` is accepted by Python 2.6+ and Python 3.
138 except Exception, ex:
139 WVPASSEQ(str(ex), "subprocess 'bash -c exit 42' failed with status 42")
140 args = [str(x) for x in range(6)]
141 # Force batchpipe to break the args into batches of 3. This
142 # approach assumes all args are the same length.
# Right-hand side of the arg_max computation (the assignment line itself is
# not visible): base size for ['echo'] plus the size of three arguments.
144 helpers._argmax_base(['echo']) + helpers._argmax_args_size(args[:3])
# Six arguments: two full batches of three, then the iterator is exhausted.
145 batches = batchpipe(['echo'], args, arg_max=arg_max)
146 WVPASSEQ(next(batches), '0 1 2\n')
147 WVPASSEQ(next(batches), '3 4 5\n')
148 WVPASSEQ(next(batches, None), None)
# Five arguments: one full batch, then a short final batch of two.
149 batches = batchpipe(['echo'], [str(x) for x in range(5)], arg_max=arg_max)
150 WVPASSEQ(next(batches), '0 1 2\n')
151 WVPASSEQ(next(batches), '3 4\n')
152 WVPASSEQ(next(batches, None), None)
# Checks atomically_replaced_file() as a context manager: the handle's mode,
# the target file's contents afterwards, and a binary-mode write.
# NOTE(review): the embedded numbering skips lines 160, 162, 166-167, 169-172,
# 175 and 179, so the writes inside the first two `with` blocks and whatever
# surrounds the second one are not visible in this excerpt.
156 def test_atomically_replaced_file():
# Work in a fresh temporary directory created under bup_tmp.
157 tmpdir = tempfile.mkdtemp(dir=bup_tmp, prefix='bup-thelper-')
158 target_file = os.path.join(tmpdir, 'test-atomic-write')
# Remember the failure count so cleanup can be skipped if this test fails.
159 initial_failures = wvfailure_count()
161 with atomically_replaced_file(target_file, mode='w') as f:
163 WVPASSEQ(f.mode, 'w')
# NOTE(review): this handle is not closed in the visible lines; the later
# read-back uses `with open(...)` -- consider the same here.
164 f = open(target_file, 'r')
165 WVPASSEQ(f.read(), 'asdf')
# Second replacement attempt; the body is not visible.  The read-back below
# still expects 'asdf', so this attempt presumably aborts before the target
# is replaced -- confirm.
168 with atomically_replaced_file(target_file, mode='w') as f:
173 with open(target_file) as f:
174 WVPASSEQ(f.read(), 'asdf')
# Binary mode: write 20 random bytes and check the reported mode.
176 with atomically_replaced_file(target_file, mode='wb') as f:
177 f.write(os.urandom(20))
178 WVPASSEQ(f.mode, 'wb')
# Remove the temporary directory only if no new failures were recorded, so
# the files remain available for inspection after a failure.
180 if wvfailure_count() == initial_failures:
181 subprocess.call(['rm', '-rf', tmpdir])