sys.path[:0] = [os.path.dirname(os.path.realpath(__file__)) + '/..']
from importlib import import_module
+from pkgutil import iter_modules
from subprocess import PIPE
+from threading import Thread
import errno, getopt, os, re, select, signal, subprocess, sys
from bup import compat, path, helpers
argv_bytes,
environ,
fsdecode,
+ int_types,
wrap_main
)
from bup.compat import add_ex_tb, add_ex_ctx, argv_bytes, wrap_main
merge_dict,
tty_width
)
+from bup.git import close_catpipes
from bup.io import byte_stream, path_msg
from bup.options import _tty_width
+import bup.cmd
def maybe_import_early(argv):
"""Scan argv and import any modules specified by --import-py-module."""
log('\n')
log('Other available commands:\n')
- cmds = []
+ cmds = set()
for c in sorted(os.listdir(cmdpath)):
if c.startswith(b'bup-') and c.find(b'.') < 0:
cname = fsdecode(c[4:])
if cname not in common:
- cmds.append(c[4:].decode(errors='backslashreplace'))
- log(columnate(cmds, ' '))
+ cmds.add(c[4:].decode(errors='backslashreplace'))
+ # built-in commands take precedence
+ for _, name, _ in iter_modules(path=bup.cmd.__path__):
+ name = name.replace('_','-')
+ if name not in common:
+ cmds.add(name)
+
+ log(columnate(sorted(cmds), ' '))
log('\n')
log("See 'bup help COMMAND' for more information on " +
usage()
try:
- if subcmd_name not in []:
+ if subcmd_name not in (b'cat-file',
+ b'ls'):
raise ModuleNotFoundError()
cmd_module = import_module('bup.cmd.'
+ subcmd_name.decode('ascii').replace('-', '_'))
if sep:
os.write(dest, sep)
-def filter_output(src_out, src_err, dest_out, dest_err):
- """Transfer data from src_out to dest_out and src_err to dest_err via
- print_clean_line until src_out and src_err close."""
+def filter_output(srcs, dests):
+ """Transfer data from file descriptors in srcs to the corresponding
+ file descriptors in dests print_clean_line until all of the srcs
+ have closed.
+
+ """
global sep_rx
- assert not isinstance(src_out, bool)
- assert not isinstance(src_err, bool)
- assert not isinstance(dest_out, bool)
- assert not isinstance(dest_err, bool)
- assert src_out is not None or src_err is not None
- assert (src_out is None) == (dest_out is None)
- assert (src_err is None) == (dest_err is None)
+ assert all(type(x) in int_types for x in srcs)
+ assert all(type(x) in int_types for x in srcs)
+ assert len(srcs) == len(dests)
+ srcs = tuple(srcs)
+ dest_for = dict(zip(srcs, dests))
pending = {}
pending_ex = None
try:
- fds = tuple([x for x in (src_out, src_err) if x is not None])
- while fds:
- ready_fds, _, _ = select.select(fds, [], [])
+ while srcs:
+ ready_fds, _, _ = select.select(srcs, [], [])
width = tty_width()
for fd in ready_fds:
buf = os.read(fd, 4096)
- dest = dest_out if fd == src_out else dest_err
+ dest = dest_for[fd]
if not buf:
- fds = tuple([x for x in fds if x is not fd])
+ srcs = tuple([x for x in srcs if x is not fd])
print_clean_line(dest, pending.pop(fd, []), width)
else:
split = sep_rx.split(buf)
pending.pop(fd, []) + [content],
width,
sep)
- assert(len(split) == 1)
+ assert len(split) == 1
if split[0]:
pending.setdefault(fd, []).extend(split)
except BaseException as ex:
try:
# Try to finish each of the streams
for fd, pending_items in compat.items(pending):
- dest = dest_out if fd == src_out else dest_err
+ dest = dest_for[fd]
+ width = tty_width()
try:
print_clean_line(dest, pending_items, width)
except (EnvironmentError, EOFError) as ex:
raise pending_ex
-def run_subcmd(module, args):
+def import_and_run_main(module, args):
+ if do_profile:
+ import cProfile
+ f = compile('module.main(args)', __file__, 'exec')
+ cProfile.runctx(f, globals(), locals())
+ else:
+ module.main(args)
- if module:
- if do_profile:
- import cProfile
- f = compile('module.main(args)', __file__, 'exec')
- cProfile.runctx(f, globals(), locals())
- else:
- module.main(args)
+
+def run_module_cmd(module, args):
+ if not (fix_stdout or fix_stderr):
+ import_and_run_main(module, args)
return
+ # Interpose filter_output between all attempts to write to the
+ # stdout/stderr and the real stdout/stderr (e.g. the fds that
+ # connect directly to the terminal) via a thread that runs
+ # filter_output in a pipeline.
+ srcs = []
+ dests = []
+ real_out_fd = real_err_fd = stdout_pipe = stderr_pipe = None
+ filter_thread = filter_thread_started = None
+ pending_ex = None
+ try:
+ if fix_stdout:
+ sys.stdout.flush()
+ stdout_pipe = os.pipe() # monitored_by_filter, stdout_everyone_uses
+ real_out_fd = os.dup(sys.stdout.fileno())
+ os.dup2(stdout_pipe[1], sys.stdout.fileno())
+ srcs.append(stdout_pipe[0])
+ dests.append(real_out_fd)
+ if fix_stderr:
+ sys.stderr.flush()
+ stderr_pipe = os.pipe() # monitored_by_filter, stderr_everyone_uses
+ real_err_fd = os.dup(sys.stderr.fileno())
+ os.dup2(stderr_pipe[1], sys.stderr.fileno())
+ srcs.append(stderr_pipe[0])
+ dests.append(real_err_fd)
+
+ filter_thread = Thread(name='output filter',
+ target=lambda : filter_output(srcs, dests))
+ filter_thread.start()
+ filter_thread_started = True
+ import_and_run_main(module, args)
+ except Exception as ex:
+ add_ex_tb(ex)
+ pending_ex = ex
+ raise
+ finally:
+ # Try to make sure that whatever else happens, we restore
+ # stdout and stderr here, if that's possible, so that we don't
+ # risk just losing some output.
+ try:
+ real_out_fd is not None and os.dup2(real_out_fd, sys.stdout.fileno())
+ except Exception as ex:
+ add_ex_tb(ex)
+ add_ex_ctx(ex, pending_ex)
+ try:
+ real_err_fd is not None and os.dup2(real_err_fd, sys.stderr.fileno())
+ except Exception as ex:
+ add_ex_tb(ex)
+ add_ex_ctx(ex, pending_ex)
+ # Kick filter loose
+ try:
+ stdout_pipe is not None and os.close(stdout_pipe[1])
+ except Exception as ex:
+ add_ex_tb(ex)
+ add_ex_ctx(ex, pending_ex)
+ try:
+ stderr_pipe is not None and os.close(stderr_pipe[1])
+ except Exception as ex:
+ add_ex_tb(ex)
+ add_ex_ctx(ex, pending_ex)
+ try:
+ close_catpipes()
+ except Exception as ex:
+ add_ex_tb(ex)
+ add_ex_ctx(ex, pending_ex)
+ if pending_ex:
+ raise pending_ex
+ # There's no point in trying to join unless we finished the finally block.
+ if filter_thread_started:
+ filter_thread.join()
+
+
+def run_subproc_cmd(args):
c = (do_profile and [sys.executable, b'-m', b'cProfile'] or []) + args
if not (fix_stdout or fix_stderr):
for sig in (signal.SIGINT, signal.SIGTERM, signal.SIGQUIT):
signal.signal(sig, signal.SIG_IGN)
- filter_output(fix_stdout and p.stdout.fileno() or None,
- fix_stderr and p.stderr.fileno() or None,
- fix_stdout and out.fileno() or None,
- fix_stderr and err.fileno() or None)
+ srcs = []
+ dests = []
+ if fix_stdout:
+ srcs.append(p.stdout.fileno())
+ dests.append(out.fileno())
+ if fix_stderr:
+ srcs.append(p.stderr.fileno())
+ dests.append(err.fileno())
+ filter_output(srcs, dests)
return p.wait()
except BaseException as ex:
add_ex_tb(ex)
raise add_ex_ctx(add_ex_tb(kill_ex), ex)
raise ex
+
+def run_subcmd(module, args):
+ if module:
+ run_module_cmd(module, args)
+ else:
+ run_subproc_cmd(args)
+
+
wrap_main(lambda : run_subcmd(cmd_module, subcmd))