# Source code for xonsh.procs.pipelines

"""Command pipeline tools."""

import errno
import io
import os
import re
import signal
import subprocess
import sys
import threading
import time

import xonsh.lib.lazyasd as xl
import xonsh.platform as xp
import xonsh.procs.jobs as xj
import xonsh.tools as xt
from xonsh.built_ins import XSH
from xonsh.procs.readers import ConsoleParallelReader, NonBlockingFDReader, safe_fdclose


@xl.lazyobject
def STDOUT_CAPTURE_KINDS():
    """Capture modes in which stdout is collected rather than streamed."""
    return frozenset({"stdout", "object"})


@xl.lazyobject
def RE_HIDDEN_BYTES():
    """Matches spans bracketed by the \\x01..\\x02 invisible-marker bytes."""
    # "\x01" / "\x02" are the same characters as the octal "\001" / "\002".
    return re.compile("(\x01.*?\x02)")


@xl.lazyobject
def RE_VT100_ESCAPE():
    """Matches VT100/ANSI CSI escape sequences."""
    pattern = "(\u009b|\u001b\\[)[0-?]*[ -\\/]*[@-~]"
    return re.compile(pattern)


@xl.lazyobject
def RE_HIDE_ESCAPE():
    """Union of RE_HIDDEN_BYTES and RE_VT100_ESCAPE, used to strip
    invisible bytes and escape sequences from captured output."""
    combined = "({}|{})".format(RE_HIDDEN_BYTES.pattern, RE_VT100_ESCAPE.pattern)
    return re.compile(combined)


@xl.lazyobject
def SIGNAL_MESSAGES():
    """Maps fatal signals to the message printed when a job dies from one."""
    messages = {
        signal.SIGABRT: "Aborted",
        signal.SIGFPE: "Floating point exception",
        signal.SIGILL: "Illegal instructions",
        signal.SIGTERM: "Terminated",
        signal.SIGSEGV: "Segmentation fault",
    }
    if xp.ON_POSIX:
        # These signals only exist on POSIX platforms.
        messages[signal.SIGQUIT] = "Quit"
        messages[signal.SIGHUP] = "Hangup"
        messages[signal.SIGKILL] = "Killed"
        messages[signal.SIGTSTP] = "Stopped"
    return messages


def safe_readlines(handle, hint=-1):
    """Attempts to read lines without throwing an error.

    Returns ``[]`` when *handle* is None or the read raises OSError;
    otherwise returns whatever ``handle.readlines(hint)`` yields.
    """
    if handle is None:
        return []
    try:
        return handle.readlines(hint)
    except OSError:
        return []
def safe_readable(handle):
    """Attempts to find if the handle is readable without throwing an error."""
    try:
        return handle.readable()
    except (OSError, ValueError):
        # Closed or invalid handles raise here; report them as unreadable.
        return False
def update_process_group(pipeline_group, background):
    """Decide terminal handling for a freshly started pipeline group.

    Returns False on non-POSIX platforms or non-interactive sessions,
    True for background pipelines, and otherwise the result of handing
    the terminal to *pipeline_group*.
    """
    if not xp.ON_POSIX:
        return False
    if not XSH.env.get("XONSH_INTERACTIVE"):
        return False
    if background:
        return True
    return xj.give_terminal_to(pipeline_group)
def _read_all(stdout):
    """Read all remaining bytes from *stdout*.

    Queue-backed readers expose ``iterqueue``; their chunks are joined.
    Plain file-like objects are drained with ``read()``.
    """
    if hasattr(stdout, "iterqueue"):
        chunks = stdout.iterqueue()
        return b"".join(chunks)
    return stdout.read()


def _drain_stdout(stdout):
    """Read all remaining bytes and return them split into keepends lines."""
    remaining = _read_all(stdout)
    return remaining.splitlines(keepends=True)
class blocking_property(property):
    """Property that may block waiting for process completion.

    Marker subclass of ``property``: accessors decorated with it
    (``out``, ``err``, ``returncode``, ...) call ``end()`` and therefore
    wait for the pipeline to finish before returning a value.
    """
class CommandPipeline:
    """Represents a subprocess-mode command pipeline."""

    # Attributes always shown in __repr__.
    attrnames = (
        "returncode",
        "suspended",
        "pid",
        "args",
        "alias",
        "executed_cmd",
        "timestamps",
        "input",
        "output",
        "errors",
    )

    # Extra attributes shown in __repr__ only when $XONSH_DEBUG is set.
    attrnames_ext = (
        "stdin",
        "stdout",
        "stderr",
        "stdin_redirect",
        "stdout_redirect",
        "stderr_redirect",
    )

    # Reader types that never block; anything else gets wrapped in a
    # NonBlockingFDReader before being polled in iterraw().
    nonblocking = (io.BytesIO, NonBlockingFDReader, ConsoleParallelReader)

    def __init__(self, specs):
        """
        Parameters
        ----------
        specs : list of SubprocSpec
            Process specifications

        Attributes
        ----------
        spec : SubprocSpec
            The last specification in specs
        proc : Popen-like
            The process in procs
        ended : bool
            Boolean for if the command has stopped executing.
        input : str
            A string of the standard input.
        output : str
            A string of the standard output.
        errors : str
            A string of the standard error.
        lines : list of str
            The output lines
        starttime : floats or None
            Pipeline start timestamp.
        pipestatus : list of int or None
            Current return codes of all commands in the pipeline.
        pipecode : int
            Current pipeline status: 1 if any command returned non-zero or is
            still running, 0 if all succeeded.
        """
        self.starttime = None
        self.ended = False
        self.procs = []
        self.specs = specs
        self.spec = specs[-1]
        self.captured = specs[-1].captured
        self.input = self._output = self.errors = self.endtime = None
        self._closed_handle_cache = {}
        self.lines = []
        self._raw_output = self._raw_error = b""
        self._stderr_prefix = self._stderr_postfix = None
        self.term_pgid = None
        self._term_state = None  # saved terminal attrs for restoration
        self.suspended = None
        self.output_format = self.spec.output_format

        background = self.spec.background
        pipeline_group = None
        if xp.ON_POSIX and not xt.on_main_thread():
            # If we are inside a ProcProxyThread, then run commands in the same
            # process group as xonsh. This fixes case 2 of issue #4277, where
            # the terminal is given to a command inside the ProcProxyThread,
            # taking the terminal away from the `less` command, causing `less`
            # to stop.
            pipeline_group = os.getpgid(0)
        for i, spec in enumerate(specs):
            # Spec decorators may tweak the spec just before launch.
            for mod in spec.decorators:
                mod.decorate_spec_pre_run(self, spec, i)
            if self.starttime is None:
                self.starttime = time.time()
            try:
                proc = spec.run(pipeline_group=pipeline_group)
            except Exception:
                xt.print_exception()
                self._return_terminal()
                # Release any pipe wrappers held by specs that won't be
                # routed through _close_proc(): the failing spec, plus any
                # later specs that never got to run().
                for s in specs[i:]:
                    s.close()
                self.proc = None
                return
            if proc.pid and pipeline_group is None and not spec.is_proxy:
                # All non-proxy pipeline members must share a single
                # process group so that one os.killpg() can reach them
                # all on Ctrl+C. The first subprocess becomes the group
                # leader (via os.setpgrp() in its preexec_fn); subsequent
                # ones join it (via os.setpgid(0, pipeline_group)).
                # Proxy specs (callable aliases) are Python threads inside
                # xonsh, not child processes, so they cannot join the
                # group — they are skipped here.
                pipeline_group = proc.pid
                # Terminal ownership is a separate concern: for
                # captured="object" the pipeline is returned as a live
                # Python object, so the terminal must stay with xonsh.
                if self.captured != "object" and update_process_group(
                    pipeline_group, background
                ):
                    self.term_pgid = pipeline_group
                    self._save_term_state()
            self.procs.append(proc)
        self.proc = self.procs[-1]
        self._pgid = pipeline_group  # process group for interrupt handling

    def __repr__(self):
        debug = XSH.env.get("XONSH_DEBUG", False)
        attrs = self.attrnames + (self.attrnames_ext if debug else ())
        s = self.__class__.__name__ + "(\n  "
        s += ",\n  ".join(
            a + "=" + repr(getattr(self, a))
            for a in attrs
            if debug or getattr(self, a) is not None
        )
        s += "\n)"
        return s

    def __bool__(self):
        # Truthy iff the pipeline succeeded; blocks until completion.
        return self.returncode == 0

    def __int__(self):
        return self.returncode

    def __hash__(self):
        return hash(self.returncode)

    def __str__(self):
        self.end()
        return self.output

    def __len__(self):
        return len(self.procs)

    def __eq__(self, other):
        # Compare against ints (return code) or strings (output) only.
        if isinstance(other, int):
            return self.returncode == other
        elif isinstance(other, str):
            return str(self) == other
        raise Exception(
            f"CommandPipeline doesn't support comparing with {type(other)}."
        )

    def __iter__(self):
        """Iterates through stdout and returns the lines, converting to
        strings and universal newlines if needed.
        """
        if self.ended:
            yield from iter(self.lines)
        else:
            yield from self.tee_stdout()
    def iterraw(self):
        """Iterates through the last stdout, and returns the lines
        exactly as found.
        """
        # get appropriate handles
        spec = self.spec
        proc = self.proc
        if proc is None:
            return
        timeout = XSH.env.get("XONSH_PROC_FREQUENCY")
        # get the correct stdout
        stdout = proc.stdout
        if (
            stdout is None or spec.stdout is None or not safe_readable(stdout)
        ) and spec.captured_stdout is not None:
            stdout = spec.captured_stdout
        if hasattr(stdout, "buffer"):
            stdout = stdout.buffer
        if stdout is not None and not isinstance(stdout, self.nonblocking):
            stdout = NonBlockingFDReader(stdout.fileno(), timeout=timeout)
        if (
            not stdout
            or self.captured == "stdout"
            or not safe_readable(stdout)
            or not spec.threadable
        ):
            # we get here if the process is not threadable or the
            # class is the real Popen
            PrevProcCloser(pipeline=self)
            task = None
            if not isinstance(sys.exc_info()[1], SystemExit):
                task = xj.wait_for_active_job()
            if task is None or task["status"] != "stopped":
                proc.wait()
                self._endtime()
                # Close captured pipe write ends to signal EOF to readers.
                # For non-threadable procs (plain Popen), the write end stays
                # open in the parent after the child exits. PopenThread handles
                # this itself, but plain Popen does not.
                if not spec.threadable:
                    for ch in spec.pipe_channels:
                        ch.close_writer()
                if self.captured in ("object", "hiddenobject") and stdout:
                    yield from _drain_stdout(stdout)
                    self.end(tee_output=False)
                elif self.captured == "object":
                    self.end(tee_output=False)
                elif self.captured == "stdout" and stdout is not None:
                    b = _read_all(stdout)
                    s = self._decode_uninew(b, universal_newlines=True)
                    self.lines = s.splitlines(keepends=True)
            return
        # get the correct stderr
        stderr = proc.stderr
        if (
            stderr is None or spec.stderr is None or not safe_readable(stderr)
        ) and spec.captured_stderr is not None:
            stderr = spec.captured_stderr
        if hasattr(stderr, "buffer"):
            stderr = stderr.buffer
        if stderr is not None and not isinstance(stderr, self.nonblocking):
            stderr = NonBlockingFDReader(stderr.fileno(), timeout=timeout)
        # read from process while it is running
        check_prev_done = len(self.procs) == 1
        prev_end_time = None
        i = j = cnt = 1
        # In the case of pipelines with more than one command
        # we should give the commands a little time
        # to start up fully. This is particularly true for
        # GNU Parallel, which has a long startup time.
        first_read = True
        prev_procs_closed = False
        while proc.poll() is None or first_read or self._any_proc_running():
            first_read = False
            if getattr(proc, "suspended", False) or self._procs_suspended() is not None:
                self.suspended = True
                xj.update_job_attr(proc.pid, "status", "suspended")
                return
            elif getattr(proc, "in_alt_mode", False):
                time.sleep(0.1)  # probably not leaving any time soon
                continue
            # Drain stdout/stderr BEFORE closing previous procs.
            # _close_prev_procs() may block waiting for upstream processes
            # (e.g. sleep) and get interrupted by Ctrl+C. Reading first
            # ensures that output already produced by the last process
            # (e.g. echo) is captured in self.lines regardless.
            stdout_lines = safe_readlines(stdout, 1024)
            i = len(stdout_lines)
            if i != 0:
                yield from stdout_lines
            stderr_lines = safe_readlines(stderr, 1024)
            j = len(stderr_lines)
            if j != 0:
                self.stream_stderr(stderr_lines)
            # When the last process (e.g. head) has exited but upstream
            # processes are still alive, close the inter-process pipe read
            # ends so that upstream writers get SIGPIPE instead of blocking
            # on a full pipe buffer.
            if not prev_procs_closed and proc.poll() is not None:
                self._close_prev_procs()
                prev_procs_closed = True
            if not check_prev_done:
                # if we are piping...
                if stdout_lines or stderr_lines:
                    # see if we have some output.
                    check_prev_done = True
                elif prev_end_time is None:
                    # or see if we already know that the next-to-last
                    # proc in the pipeline has ended.
                    if self._prev_procs_done():
                        # if it has, record the time
                        prev_end_time = time.time()
                elif time.time() - prev_end_time >= 0.1:
                    # if we still don't have any output, even though the
                    # next-to-last proc has finished, wait a bit to make
                    # sure we have fully started up, etc.
                    check_prev_done = True
            # this is for CPU usage: back off the poll interval while idle.
            if i + j == 0:
                cnt = min(cnt + 1, 1000)
            else:
                cnt = 1
            time.sleep(timeout * cnt)
        # Check if SIGINT was caught but not raised as KeyboardInterrupt.
        # ProcProxyThread's _signal_int sets _interrupted without raising,
        # so the while loop keeps spinning. Detect it and kill everything.
        if getattr(proc, "_interrupted", False):
            self._signal_pipeline()
            return
        if not prev_procs_closed:
            self._close_prev_procs()
        proc.prevs_are_closed = True
        # read from process now that it is over
        yield from safe_readlines(stdout)
        self.stream_stderr(safe_readlines(stderr))
        proc.wait()
        self._endtime()
        # drain anything written between the last poll and process exit
        yield from safe_readlines(stdout)
        self.stream_stderr(safe_readlines(stderr))
        if self.captured == "object":
            self.end(tee_output=False)
    def itercheck(self):
        """Iterates through the command lines and throws an error if the
        returncode is non-zero.
        """
        yield from self
        if self.returncode:
            # I included self, as providing access to stderr and other details
            # useful when instance isn't assigned to a variable in the shell.
            raise xt.XonshCalledProcessError(
                self.returncode, self.executed_cmd, self.stdout, self.stderr, self
            )
    def tee_stdout(self):
        """Writes the process stdout to the output variable, line-by-line, and
        yields each line. This may optionally accept lines (in bytes) to iterate
        over, in which case it does not call iterraw().
        """
        env = XSH.env
        enc = env.get("XONSH_ENCODING")
        err = env.get("XONSH_ENCODING_ERRORS")
        lines = self.lines
        raw_out_lines = []
        # Stream to the terminal only when output is NOT being captured
        # and the spec has a live stdout to stream to.
        stream = self.captured not in STDOUT_CAPTURE_KINDS
        if stream and not self.spec.stdout:
            stream = False
        # Use STDOUT_DISPATCHER.handle directly to get the per-thread stdout.
        # sys.stdout is set globally by redirect_stdout(STDOUT_DISPATCHER) in
        # ProcProxyThread.run(), but another thread may restore sys.stdout
        # before this thread finishes, causing output to leak to the terminal.
        from xonsh.procs.proxies import STDOUT_DISPATCHER

        if STDOUT_DISPATCHER.available:
            out_target = STDOUT_DISPATCHER.handle
        else:
            out_target = sys.stdout
        stdout_has_buffer = hasattr(out_target, "buffer")
        nl = b"\n"
        cr = b"\r"
        crnl = b"\r\n"
        for line in self.iterraw():
            # write to stdout line ASAP, if needed
            if stream:
                try:
                    if stdout_has_buffer:
                        out_target.buffer.write(line)
                    else:
                        out_target.write(line.decode(encoding=enc, errors=err))
                    out_target.flush()
                except OSError as e:
                    if e.errno in (errno.EPIPE, errno.EINVAL):
                        # Downstream process closed the pipe. Stop streaming
                        # but keep collecting raw output for captured result.
                        # Linux: errno.EPIPE (32, BrokenPipeError)
                        # Windows: errno.EINVAL (22, "Invalid argument")
                        stream = False
                    else:
                        raise
            # save the raw bytes
            raw_out_lines.append(line)
            # do some munging of the line before we return it:
            # normalize line endings to "\n", decode, strip escapes.
            if line.endswith(crnl):
                line = line[:-2] + nl
            elif line.endswith(cr):
                line = line[:-1] + nl
            line = line.decode(encoding=enc, errors=err)
            line = RE_HIDE_ESCAPE.sub("", line)
            # tee it up!
            lines.append(line)
            yield line
        # using join is more efficient than concatenating in a loop
        self._raw_output = b"".join(raw_out_lines)
    def stream_stderr(self, lines):
        """Streams lines to sys.stderr and the errors attribute.

        Parameters
        ----------
        lines : list of bytes
            Raw stderr lines; no-op when empty.
        """
        if not lines:
            return
        env = XSH.env
        enc = env.get("XONSH_ENCODING")
        err = env.get("XONSH_ENCODING_ERRORS")
        b = b"".join(lines)
        if self.stderr_prefix:
            b = self.stderr_prefix + b
        if self.stderr_postfix:
            b += self.stderr_postfix
        # Use STDERR_DISPATCHER.handle for per-thread stderr (same race fix
        # as tee_stdout — see comment there).
        from xonsh.procs.proxies import STDERR_DISPATCHER

        if STDERR_DISPATCHER.available:
            err_target = STDERR_DISPATCHER.handle
        else:
            err_target = sys.stderr
        stderr_has_buffer = hasattr(err_target, "buffer")
        # Captured-object pipelines may suppress on-screen stderr via env.
        show_stderr = self.captured != "object" or env.get(
            "XONSH_SUBPROC_CAPTURED_PRINT_STDERR", True
        )
        if show_stderr:
            # write bytes to std stream
            try:
                if stderr_has_buffer:
                    err_target.buffer.write(b)
                else:
                    err_target.write(b.decode(encoding=enc, errors=err))
                err_target.flush()
            except OSError as e:
                if e.errno not in (errno.EPIPE, errno.EINVAL):
                    raise
                # Downstream process closed the pipe.
                # Linux: errno.EPIPE (32, BrokenPipeError)
                # Windows: errno.EINVAL (22, "Invalid argument")
        # accumulate the raw bytes
        self._raw_error += b
        # do some munging of the line before we save it to the attr
        b = b.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
        env = XSH.env
        s = b.decode(
            encoding=env.get("XONSH_ENCODING"), errors=env.get("XONSH_ENCODING_ERRORS")
        )
        s = RE_HIDE_ESCAPE.sub("", s)
        # set the errors
        if self.errors is None:
            self.errors = s
        else:
            self.errors += s
def _decode_uninew(self, b, universal_newlines=None): """Decode bytes into a str and apply universal newlines as needed.""" if not b: return "" if isinstance(b, bytes | bytearray): env = XSH.env s = b.decode( encoding=env.get("XONSH_ENCODING"), errors=env.get("XONSH_ENCODING_ERRORS"), ) else: s = b if universal_newlines or self.spec.universal_newlines: s = s.replace("\r\n", "\n").replace("\r", "\n") return s # # Ending methods #
[docs] def end(self, tee_output=True): """ End the pipeline, return the controlling terminal if needed. Main things done in self._end(). """ if self.ended: return self._end(tee_output=tee_output) self._return_terminal()
    def _end(self, tee_output):
        """Waits for the command to complete and then runs any closing and
        cleanup procedures that need to be run.
        """
        try:
            if tee_output:
                for _ in self.tee_stdout():
                    pass
            self._endtime()
            # since we are driven by getting output, input may not be available
            # until the command has completed.
            self._set_input()
        finally:
            if (
                not hasattr(self.proc, "prevs_are_closed")
                or not self.proc.prevs_are_closed
            ):
                self._close_prev_procs()
            self._close_proc()
            # Mark as ended even if an exception occurred (e.g. KeyboardInterrupt).
            # Without this, subsequent access to the pipeline would try to
            # re-read from already-closed pipes → ValueError.
            self.ended = True
        self._check_signal()
        self._apply_to_history()
        self._apply_to_thread_local()
        self._raise_subproc_error()

    def _save_term_state(self):
        """Save terminal attributes so we can restore them exactly later."""
        try:
            import termios

            self._term_state = termios.tcgetattr(sys.stdin.fileno())
        except (termios.error, OSError, ValueError):
            # No usable tty (redirected stdin, etc.) — nothing to restore.
            self._term_state = None

    def _return_terminal(self):
        """Hand the terminal back to xonsh's process group and restore
        the saved tty state, if any."""
        if xp.ON_WINDOWS or not xp.ON_POSIX:
            return
        pgid = os.getpgid(0)
        if self.term_pgid is None or pgid == self.term_pgid:
            return
        if xj.give_terminal_to(pgid):  # if gave term succeed
            self.term_pgid = pgid
            if self._term_state is not None:
                # Restore exact terminal state saved before the subprocess ran.
                # The old approach (stty sane) reset to canonical mode which
                # broke prompt-toolkit's raw mode after keybinding handlers.
                try:
                    import termios

                    termios.tcsetattr(
                        sys.stdin.fileno(), termios.TCSANOW, self._term_state
                    )
                except (termios.error, OSError, ValueError):
                    pass
            elif XSH.shell is not None:
                # Fallback when no saved state is available.
                XSH.shell.shell.restore_tty_sanity()
    def resume(self, job, tee_output=True):
        """Resume a stopped job: give it the terminal, continue it, and
        then drive the pipeline to completion via end().
        """
        self.ended = False
        if xj.give_terminal_to(job["pgrp"]):
            self.term_pgid = job["pgrp"]
        xj._continue(job)
        self.end(tee_output=tee_output)
    def _endtime(self):
        """Sets the closing timestamp if it hasn't been already."""
        if self.endtime is None:
            self.endtime = time.time()

    def _safe_close(self, handle):
        """Close *handle* once, via the closed-handle cache."""
        # Skip integer fds — they are owned by PipeChannel and closed there.
        if isinstance(handle, int):
            return
        safe_fdclose(handle, cache=self._closed_handle_cache)

    def _procs_suspended(self):
        """Check procs and return suspended proc (or None)."""
        for proc in self.procs:
            info = xj.proc_untraced_waitpid(proc, hang=False)
            if getattr(proc, "suspended", False):
                proc = getattr(proc, "proc", proc)
                procname = f"{getattr(proc, 'args', '')} with pid {proc.pid}".strip()
                print(
                    f"Process {procname} was suspended with signal {info['signal_name']} and placed in `jobs`.\n"
                    f"This happens when a process starts waiting for input but there is no terminal attached in captured mode.",
                    file=sys.stderr,
                )
                return proc

    def _any_proc_running(self):
        """Return True if any process in the pipeline has not yet exited."""
        for p in self.procs:
            if p.poll() is None:
                return True
        return False

    def _signal_pipeline(self):
        """Send SIGINT to all alive pipeline processes.

        Two process groups are involved when a pipeline contains a callable
        alias (ProcProxyThread):

        * The *pipeline group* (self._pgid) — holds the non-proxy
          subprocesses of the outer pipeline (e.g. ``sleep 100`` and
          ``echo 1`` in ``sleep 100 | echo 1 | alias``). Reached by
          os.killpg() below.
        * *xonsh's own group* — holds subprocesses spawned *inside* the
          callable alias (they run on a non-main thread, so
          CommandPipeline.__init__ sets their pipeline_group to
          os.getpgid(0)). These are killed directly by the kernel when
          Ctrl+C sends SIGINT to the foreground (xonsh) group; no action
          is needed here.
        """
        if not xp.ON_POSIX:
            return
        # Kill the pipeline process group (covers grouped subprocesses)
        if self._pgid is not None:
            try:
                os.killpg(self._pgid, signal.SIGINT)
            except (ProcessLookupError, OSError):
                pass
        # Also signal individual procs that may be in other groups
        my_pid = os.getpid()
        for p in self.procs:
            if p is not None and p.poll() is None:
                pid = getattr(p, "pid", None)
                # Skip ProcProxyThread whose pid == xonsh's own pid
                if pid and pid != my_pid:
                    try:
                        os.kill(pid, signal.SIGINT)
                    except (ProcessLookupError, OSError):
                        pass

    def _prev_procs_done(self):
        """Boolean for if all previous processes have completed. If there
        is only a single process in the pipeline, this returns False.
        """
        any_running = False
        for s, p in zip(self.specs[:-1], self.procs[:-1], strict=False):
            if p is None or p.poll() is None:
                any_running = True
                continue
            # Ensure thread is fully done - poll() returns non-None
            # before run() finishes closing pipe channels.
            if hasattr(p, "join"):
                p.join(timeout=0.5)
            self._safe_close(s.stdin)
            self._safe_close(s.stdout)
            self._safe_close(s.stderr)
            # Close ONLY the writer end of any connecting pipe. The reader
            # end is in active use by the next proc in the pipeline (which
            # may still be draining buffered data); closing it here would
            # invalidate the fd mid-read and surface as
            # `OSError: [Errno 9] Bad file descriptor` in the consumer
            # (e.g. a callable alias iterating over `stdin`).
            # The reader is closed later in `_close_prev_procs` /
            # `_close_proc`, after the next proc has finished.
            for ch in s.pipe_channels:
                ch.close_writer()
            self._safe_close(p.stdin)
            self._safe_close(p.stdout)
            self._safe_close(p.stderr)
            # Close ONLY the writer. Described above.
            for ch in getattr(p, "pipe_channels", ()):
                ch.close_writer()
        return False if any_running else (len(self) > 1)

    def _close_prev_procs(self):
        """Closes all but the last proc's stdout."""
        for s, p in zip(self.specs[:-1], self.procs[:-1], strict=False):
            self._safe_close(s.stdin)
            self._safe_close(s.stderr)
            # Close read ends of connection pipes to unblock any blocked writes,
            # then wait for the proc thread to finish to prevent fd-reuse races.
            for ch in s.pipe_channels:
                ch.close_reader()
            if p is not None:
                try:
                    # Use join for threads (ProcProxyThread.wait ignores timeout)
                    if hasattr(p, "join"):
                        p.join(timeout=3)
                    else:
                        p.wait(timeout=3)
                except BaseException:
                    # BaseException (not Exception) — KeyboardInterrupt during
                    # this wait must not prevent closing FDs below. The
                    # _interrupted flag on the proc is already set by
                    # _signal_int and will be handled by the caller.
                    pass
            self._safe_close(s.stdout)
            for ch in s.pipe_channels:
                ch.close()
            if p is None:
                continue
            self._safe_close(p.stdin)
            self._safe_close(p.stdout)
            self._safe_close(p.stderr)
            for ch in getattr(p, "pipe_channels", ()):
                ch.close()

    def _close_proc(self):
        """Closes last proc's stdout."""
        s = self.spec
        p = self.proc
        # Wait for the last proc thread to finish before closing handles
        # it may still be flushing. Without this, tee.close() in the
        # caller can destroy the mem buffer while the thread still uses it.
        # Only join threads — ProcProxy.wait() is not idempotent (it re-runs
        # parse_proxy_return, duplicating output).
        if p is not None and hasattr(p, "join"):
            try:
                p.join(timeout=3)
            except Exception:
                pass
        self._safe_close(s.stdin)
        self._safe_close(s.stdout)
        self._safe_close(s.stderr)
        self._safe_close(s.captured_stdout)
        self._safe_close(s.captured_stderr)
        for ch in s.pipe_channels:
            ch.close()
        if p is None:
            return
        self._safe_close(p.stdin)
        self._safe_close(p.stdout)
        self._safe_close(p.stderr)
        for ch in getattr(p, "pipe_channels", ()):
            ch.close()

    def _set_input(self):
        """Sets the input variable."""
        if self.proc is None:
            return
        stdin = self.proc.stdin
        if (
            stdin is None
            or isinstance(stdin, int)
            or stdin.closed
            or not stdin.seekable()
            or not safe_readable(stdin)
        ):
            input = b""
        else:
            stdin.seek(0)
            input = stdin.read()
        self.input = self._decode_uninew(input)

    def _check_signal(self):
        """Checks if a signal was received and issues a message."""
        proc_signal = getattr(self.proc, "signal", None)
        if proc_signal is None:
            return
        sig, core = proc_signal
        sig_str = SIGNAL_MESSAGES.get(sig)
        if sig_str:
            if core:
                sig_str += " (core dumped)"
            print(sig_str, file=sys.stderr)
            if self.errors is not None:
                self.errors += sig_str + "\n"

    def _apply_to_history(self):
        """Applies the results to the current history object."""
        hist = XSH.history
        if hist is not None:
            hist.last_cmd_rtn = 1 if self.proc is None else self.proc.returncode

    def _apply_to_thread_local(self):
        """Store the return code in the thread-local dict if present."""
        tl = XSH.env.get("__THREAD_LOCAL__")
        if tl is not None:
            tl["returncode"] = 1 if self.proc is None else self.proc.returncode

    def _raise_subproc_error(self):
        """Raises a subprocess error, if we are supposed to."""
        spec = self.spec
        rtn = self.returncode
        if rtn is None or rtn == 0:
            return
        raise_subproc_error = spec.raise_subproc_error
        if callable(raise_subproc_error):
            raise_subproc_error = raise_subproc_error(spec, self)
        # @error_ignore — never raise.
        if raise_subproc_error is False:
            return
        # @error_raise — always raise, even mid-chain.
        if raise_subproc_error is True:
            try:
                raise subprocess.CalledProcessError(rtn, spec.args, output=self.output)
            finally:
                # needed to get a working terminal in interactive mode
                self._return_terminal()
            return
        # Default: defer chain operands to the BoolOp wrapper.
        if getattr(spec, "in_boolop", False):
            return
        # Standalone — only raise here if the user explicitly opted in
        # to per-command raising. Otherwise let the AST wrapper around
        # the statement do it via $XONSH_SUBPROC_RAISE_ERROR.
        if XSH.env.get("XONSH_SUBPROC_CMD_RAISE_ERROR"):
            try:
                raise subprocess.CalledProcessError(rtn, spec.args, output=self.output)
            finally:
                # needed to get a working terminal in interactive mode
                self._return_terminal()

    #
    # Properties
    #

    @property
    def stdin(self):
        """Process stdin."""
        return self.proc.stdin

    @property
    def stdout(self):
        """Process stdout."""
        return self.proc.stdout

    @property
    def stderr(self):
        """Process stderr."""
        return self.proc.stderr

    @property
    def inp(self):
        """Creates normalized input string from args."""
        return " ".join(self.args)
[docs] def get_formatted_lines(self, lines): """Format output lines.""" fmt = self.output_format if fmt == "stream_lines": if len(lines) == 1: return lines[0].rstrip("\n") else: return "".join(lines) elif fmt == "list_lines": if not lines: return lines elif len(lines) == 1: return [lines[0].rstrip("\n")] else: return [line.rstrip("\n") for line in lines] elif callable(fmt): return fmt(lines)
@property def output(self): """Non-blocking, lazy access to output""" if self.ended: if self._output is None: self._output = self.get_formatted_lines(self.lines) return self._output else: return self.get_formatted_lines(self.lines)
    @blocking_property
    def out(self):
        """Output value as a str. Blocks until the pipeline has ended."""
        self.end()
        return self.output
    @blocking_property
    def err(self):
        """Error messages as a string. Blocks until the pipeline has ended."""
        self.end()
        return self.errors
    @blocking_property
    def raw_out(self):
        """Output as raw bytes. Blocks until the pipeline has ended."""
        self.end()
        return self._raw_output
    @blocking_property
    def raw_err(self):
        """Errors as raw bytes. Blocks until the pipeline has ended."""
        self.end()
        return self._raw_error
@property def pid(self): """Process identifier.""" return self.proc.pid if self.proc else None @property def pipestatus(self): """Current status. Return codes of all commands in the pipeline.""" return [None if p is None else p.returncode for p in self.procs] @property def pipecode(self): """Current status. 1 if any command in the pipeline returned non-zero or is still running, 0 if all succeeded.""" return 0 if all(r == 0 for r in self.pipestatus) else 1
    @blocking_property
    def returncode(self):
        """Process return code, waits until command is completed."""
        self.end()
        # A pipeline that never launched (proc is None) counts as failed.
        if self.proc is None:
            return 1
        return self.proc.returncode
    @property
    def args(self):
        """Arguments to the process."""
        return self.spec.args
    @blocking_property
    def rtn(self):
        """Alias to return code."""
        return self.returncode
@property def alias(self): """Alias the process used.""" return self.spec.alias @property def stdin_redirect(self): """Redirection used for stdin.""" stdin = self.spec.stdin name = getattr(stdin, "name", "<stdin>") mode = getattr(stdin, "mode", "r") return [name, mode] @property def stdout_redirect(self): """Redirection used for stdout.""" stdout = self.spec.stdout name = getattr(stdout, "name", "<stdout>") mode = getattr(stdout, "mode", "a") return [name, mode] @property def stderr_redirect(self): """Redirection used for stderr.""" stderr = self.spec.stderr name = getattr(stderr, "name", "<stderr>") mode = getattr(stderr, "mode", "a") return [name, mode] @property def timestamps(self): """The start and end time stamps.""" return [self.starttime, self.endtime] @property def executed_cmd(self): """The resolve and executed command.""" return self.spec.cmd @property def stderr_prefix(self): """Prefix to print in front of stderr, as bytes.""" p = self._stderr_prefix if p is None: env = XSH.env t = env.get("XONSH_STDERR_PREFIX") s = xt.format_std_prepost(t, env=env) p = s.encode( encoding=env.get("XONSH_ENCODING"), errors=env.get("XONSH_ENCODING_ERRORS"), ) self._stderr_prefix = p return p @property def stderr_postfix(self): """Postfix to print after stderr, as bytes.""" p = self._stderr_postfix if p is None: env = XSH.env t = env.get("XONSH_STDERR_POSTFIX") s = xt.format_std_prepost(t, env=env) p = s.encode( encoding=env.get("XONSH_ENCODING"), errors=env.get("XONSH_ENCODING_ERRORS"), ) self._stderr_postfix = p return p
class HiddenCommandPipeline(CommandPipeline):
    """CommandPipeline whose repr is empty, so the interactive shell
    prints nothing when the pipeline object reaches the display hook."""

    def __repr__(self):
        return ""
def resume_process(p):
    """Sends SIGCONT to a process if possible."""
    # SIGCONT is only deliverable on real POSIX (not MSYS/Cygwin) and
    # only to objects that expose send_signal().
    if not hasattr(p, "send_signal"):
        return
    if not xp.ON_POSIX or xp.ON_MSYS or xp.ON_CYGWIN:
        return
    try:
        p.send_signal(signal.SIGCONT)
    except PermissionError:
        pass
class PrevProcCloser(threading.Thread):
    """Previous process closer thread for pipelines whose last command
    is itself unthreadable. This makes sure that the pipeline is
    driven forward and does not deadlock.
    """

    def __init__(self, pipeline):
        """
        Parameters
        ----------
        pipeline : CommandPipeline
            The pipeline whose prev procs we should close.
        """
        self.pipeline = pipeline
        super().__init__()
        # Daemon thread that starts immediately on construction; it must
        # never keep the interpreter alive on its own.
        self.daemon = True
        self.start()

    def run(self):
        """Runs the closing algorithm."""
        pipeline = self.pipeline
        check_prev_done = len(pipeline.procs) == 1
        if check_prev_done:
            # Single-command pipeline: there is nothing upstream to close.
            return
        proc = pipeline.proc
        prev_end_time = None
        timeout = XSH.env.get("XONSH_PROC_FREQUENCY")
        sleeptime = min(timeout * 1000, 0.1)
        while proc.poll() is None:
            if not check_prev_done:
                # In the case of pipelines with more than one command
                # we should give the commands a little time
                # to start up fully. This is particularly true for
                # GNU Parallel, which has a long startup time.
                pass
            elif pipeline._prev_procs_done():
                pipeline._close_prev_procs()
                proc.prevs_are_closed = True
                break
            if not check_prev_done:
                # if we are piping...
                if prev_end_time is None:
                    # or see if we already know that the next-to-last
                    # proc in the pipeline has ended.
                    if pipeline._prev_procs_done():
                        # if it has, record the time
                        prev_end_time = time.time()
                elif time.time() - prev_end_time >= 0.1:
                    # if we still don't have any output, even though the
                    # next-to-last proc has finished, wait a bit to make
                    # sure we have fully started up, etc.
                    check_prev_done = True
            # this is for CPU usage
            time.sleep(sleeptime)