columnate,
handle_ctrl_c,
log,
- merge_dict,
tty_width
)
from bup.git import close_catpipes
fix_stderr = not already_fixed and os.isatty(2)
if fix_stdout or fix_stderr:
- tty_env = merge_dict(environ,
- {b'BUP_FORCE_TTY': (b'%d'
- % ((fix_stdout and 1 or 0)
- + (fix_stderr and 2 or 0))),
- b'BUP_TTY_WIDTH': b'%d' % _tty_width(), })
-else:
- tty_env = environ
+ _ttymask = (fix_stdout and 1 or 0) + (fix_stderr and 2 or 0)
+ environ[b'BUP_FORCE_TTY'] = b'%d' % _ttymask
+ environ[b'BUP_TTY_WIDTH'] = b'%d' % _tty_width()
sep_rx = re.compile(br'([\r\n])')
assert not sep_rx.match(x)
content = b''.join(content)
if sep == b'\r' and len(content) > width:
- content = content[width:]
+ content = content[:width]
os.write(dest, content)
if len(content) < width:
os.write(dest, b' ' * (width - len(content)))
"""
global sep_rx
- assert all(type(x) in int_types for x in srcs)
- assert all(type(x) in int_types for x in srcs)
+ assert all(isinstance(x, int_types) for x in srcs)
assert len(srcs) == len(dests)
srcs = tuple(srcs)
dest_for = dict(zip(srcs, dests))
p = subprocess.Popen(c,
stdout=PIPE if fix_stdout else out,
stderr=PIPE if fix_stderr else err,
- env=tty_env, bufsize=4096, close_fds=True)
+ bufsize=4096, close_fds=True)
# Assume p will receive these signals and quit, which will
# then cause us to quit.
for sig in (signal.SIGINT, signal.SIGTERM, signal.SIGQUIT):