]> arthur.barton.de Git - bup.git/commitdiff
bup: filter stdout/stderr via thread/pipe for internal subcommands
authorRob Browning <rlb@defaultvalue.org>
Sat, 12 Dec 2020 19:14:41 +0000 (13:14 -0600)
committerRob Browning <rlb@defaultvalue.org>
Sat, 6 Mar 2021 18:29:38 +0000 (12:29 -0600)
Signed-off-by: Rob Browning <rlb@defaultvalue.org>
Tested-by: Rob Browning <rlb@defaultvalue.org>
lib/cmd/bup

index 13490d8c896367fdd0f1687352a3d9af29f3be11..e7dd854a9aab7080566094327fd016f0bd82e6b9 100755 (executable)
@@ -32,6 +32,7 @@ sys.path[:0] = [os.path.dirname(os.path.realpath(__file__)) + '/..']
 
 from importlib import import_module
 from subprocess import PIPE
+from threading import Thread
 import errno, getopt, os, re, select, signal, subprocess, sys
 
 from bup import compat, path, helpers
@@ -54,6 +55,7 @@ from bup.helpers import (
     merge_dict,
     tty_width
 )
+from bup.git import close_catpipes
 from bup.io import byte_stream, path_msg
 from bup.options import _tty_width
 
@@ -283,16 +285,91 @@ def filter_output(srcs, dests):
         raise pending_ex
 
 
-def run_subcmd(module, args):
+def import_and_run_main(module, args):
+    if do_profile:
+        import cProfile
+        f = compile('module.main(args)', __file__, 'exec')
+        cProfile.runctx(f, globals(), locals())
+    else:
+        module.main(args)
 
-    if module:
-        if do_profile:
-            import cProfile
-            f = compile('module.main(args)', __file__, 'exec')
-            cProfile.runctx(f, globals(), locals())
-        else:
-            module.main(args)
+
+def run_module_cmd(module, args):
+    if not (fix_stdout or fix_stderr):
+        import_and_run_main(module, args)
         return
+    # Interpose filter_output between all attempts to write to the
+    # stdout/stderr and the real stdout/stderr (e.g. the fds that
+    # connect directly to the terminal) via a thread that runs
+    # filter_output in a pipeline.
+    srcs = []
+    dests = []
+    real_out_fd = real_err_fd = stdout_pipe = stderr_pipe = None
+    filter_thread = filter_thread_started = None
+    pending_ex = None
+    try:
+        if fix_stdout:
+            sys.stdout.flush()
+            stdout_pipe = os.pipe()  # monitored_by_filter, stdout_everyone_uses
+            real_out_fd = os.dup(sys.stdout.fileno())
+            os.dup2(stdout_pipe[1], sys.stdout.fileno())
+            srcs.append(stdout_pipe[0])
+            dests.append(real_out_fd)
+        if fix_stderr:
+            sys.stderr.flush()
+            stderr_pipe = os.pipe()  # monitored_by_filter, stderr_everyone_uses
+            real_err_fd = os.dup(sys.stderr.fileno())
+            os.dup2(stderr_pipe[1], sys.stderr.fileno())
+            srcs.append(stderr_pipe[0])
+            dests.append(real_err_fd)
+
+        filter_thread = Thread(name='output filter',
+                               target=lambda : filter_output(srcs, dests))
+        filter_thread.start()
+        filter_thread_started = True
+        import_and_run_main(module, args)
+    except Exception as ex:
+        add_ex_tb(ex)
+        pending_ex = ex
+        raise
+    finally:
+        # Try to make sure that whatever else happens, we restore
+        # stdout and stderr here, if that's possible, so that we don't
+        # risk just losing some output.
+        try:
+            real_out_fd is not None and os.dup2(real_out_fd, sys.stdout.fileno())
+        except Exception as ex:
+            add_ex_tb(ex)
+            add_ex_ctx(ex, pending_ex)
+        try:
+            real_err_fd is not None and os.dup2(real_err_fd, sys.stderr.fileno())
+        except Exception as ex:
+            add_ex_tb(ex)
+            add_ex_ctx(ex, pending_ex)
+        # Kick filter loose
+        try:
+            stdout_pipe is not None and os.close(stdout_pipe[1])
+        except Exception as ex:
+            add_ex_tb(ex)
+            add_ex_ctx(ex, pending_ex)
+        try:
+            stderr_pipe is not None and os.close(stderr_pipe[1])
+        except Exception as ex:
+            add_ex_tb(ex)
+            add_ex_ctx(ex, pending_ex)
+        try:
+            close_catpipes()
+        except Exception as ex:
+            add_ex_tb(ex)
+            add_ex_ctx(ex, pending_ex)
+    if pending_ex:
+        raise pending_ex
+    # There's no point in trying to join unless we finished the finally block.
+    if filter_thread_started:
+        filter_thread.join()
+
+
+def run_subproc_cmd(args):
 
     c = (do_profile and [sys.executable, b'-m', b'cProfile'] or []) + args
     if not (fix_stdout or fix_stderr):
@@ -333,4 +410,12 @@ def run_subcmd(module, args):
             raise add_ex_ctx(add_ex_tb(kill_ex), ex)
         raise ex
 
+
+def run_subcmd(module, args):
+    if module:
+        run_module_cmd(module, args)
+    else:
+        run_subproc_cmd(args)
+
+
 wrap_main(lambda : run_subcmd(cmd_module, subcmd))