- with no_lingering_errors():
- for chunk in batchpipe([b'echo'], []):
- WVPASS(False)
- out = b''
- for chunk in batchpipe([b'echo'], [b'42']):
- out += chunk
- WVPASSEQ(out, b'42\n')
- try:
- batchpipe([b'bash', b'-c'], [b'exit 42'])
- except Exception as ex:
- WVPASSEQ(str(ex),
- "subprocess 'bash -c exit 42' failed with status 42")
- args = [str(x) for x in range(6)]
- # Force batchpipe to break the args into batches of 3. This
- # approach assumes all args are the same length.
- arg_max = \
- helpers._argmax_base([b'echo']) + helpers._argmax_args_size(args[:3])
- batches = batchpipe(['echo'], args, arg_max=arg_max)
- WVPASSEQ(next(batches), b'0 1 2\n')
- WVPASSEQ(next(batches), b'3 4 5\n')
- WVPASSEQ(next(batches, None), None)
- batches = batchpipe([b'echo'], [str(x) for x in range(5)], arg_max=arg_max)
- WVPASSEQ(next(batches), b'0 1 2\n')
- WVPASSEQ(next(batches), b'3 4\n')
- WVPASSEQ(next(batches, None), None)
-
-
-@wvtest
-def test_atomically_replaced_file():
- with no_lingering_errors():
- with test_tempdir(b'bup-thelper-') as tmpdir:
- target_file = os.path.join(tmpdir, b'test-atomic-write')
-
- with atomically_replaced_file(target_file, mode='w') as f:
- f.write('asdf')
- WVPASSEQ(f.mode, 'w')
- f = open(target_file, 'r')
- WVPASSEQ(f.read(), 'asdf')
-
- try:
- with atomically_replaced_file(target_file, mode='w') as f:
- f.write('wxyz')
- raise Exception()
- except:
- pass
- with open(target_file) as f:
- WVPASSEQ(f.read(), 'asdf')
-
- with atomically_replaced_file(target_file, mode='wb') as f:
- f.write(os.urandom(20))
- WVPASSEQ(f.mode, 'wb')
+ for chunk in batchpipe([b'echo'], []):
+ WVPASS(False)
+ out = b''
+ for chunk in batchpipe([b'echo'], [b'42']):
+ out += chunk
+ WVPASSEQ(out, b'42\n')
+ try:
+ batchpipe([b'bash', b'-c'], [b'exit 42'])
+ except Exception as ex:
+ WVPASSEQ(str(ex),
+ "subprocess 'bash -c exit 42' failed with status 42")
+ args = [str(x) for x in range(6)]
+ # Force batchpipe to break the args into batches of 3. This
+ # approach assumes all args are the same length.
+ arg_max = \
+ helpers._argmax_base([b'echo']) + helpers._argmax_args_size(args[:3])
+ batches = batchpipe(['echo'], args, arg_max=arg_max)
+ WVPASSEQ(next(batches), b'0 1 2\n')
+ WVPASSEQ(next(batches), b'3 4 5\n')
+ WVPASSEQ(next(batches, None), None)
+ batches = batchpipe([b'echo'], [str(x) for x in range(5)], arg_max=arg_max)
+ WVPASSEQ(next(batches), b'0 1 2\n')
+ WVPASSEQ(next(batches), b'3 4\n')
+ WVPASSEQ(next(batches, None), None)
+
+
+def test_atomically_replaced_file(tmpdir):
+ target_file = os.path.join(tmpdir, b'test-atomic-write')
+
+ with atomically_replaced_file(target_file, mode='w') as f:
+ f.write('asdf')
+ WVPASSEQ(f.mode, 'w')
+ f = open(target_file, 'r')
+ WVPASSEQ(f.read(), 'asdf')
+
+ try:
+ with atomically_replaced_file(target_file, mode='w') as f:
+ f.write('wxyz')
+ raise Exception()
+ except:
+ pass
+ with open(target_file) as f:
+ WVPASSEQ(f.read(), 'asdf')
+
+ with atomically_replaced_file(target_file, mode='wb') as f:
+ f.write(os.urandom(20))
+ WVPASSEQ(f.mode, 'wb')