"""Subprocess specification and related utilities."""
import contextlib
import inspect
import io
import os
import pathlib
import re
import shlex
import signal
import stat
import subprocess
import sys
import xonsh.lib.lazyasd as xl
import xonsh.lib.lazyimps as xli
import xonsh.platform as xp
import xonsh.procs.jobs as xj
import xonsh.tools as xt
from xonsh.built_ins import XSH
from xonsh.procs.executables import locate_executable
from xonsh.procs.pipelines import (
STDOUT_CAPTURE_KINDS,
CommandPipeline,
HiddenCommandPipeline,
resume_process,
)
from xonsh.procs.posix import PopenThread
from xonsh.procs.proxies import ProcProxy, ProcProxyThread
from xonsh.procs.readers import ConsoleParallelReader
@xl.lazyobject
def RE_SHEBANG():
return re.compile(r"#![ \t]*(.+?)$")
[docs]
def is_app_execution_alias(fname):
"""App execution aliases behave strangly on Windows and Python.
Here we try to detect if a file is an app execution alias.
"""
fname = pathlib.Path(fname)
try:
return fname.stat().st_reparse_tag == stat.IO_REPARSE_TAG_APPEXECLINK
# os.stat().st_reparse_tag is python 3.8+, and os.stat(app_exec_alias) throws OSError for <= 3.7
# so use old method as fallback
except (AttributeError, OSError):
return not os.path.exists(fname) and fname.name in os.listdir(fname.parent)
def _is_binary(fname, limit=80):
try:
with open(fname, "rb") as f:
for _ in range(limit):
char = f.read(1)
if char == b"\0":
return True
if char == b"\n":
return False
if char == b"":
return
except OSError as e:
if xp.ON_WINDOWS and is_app_execution_alias(fname):
return True
raise e
return False
def _un_shebang(x):
if x == "/usr/bin/env":
return []
elif any(x.startswith(i) for i in ["/usr/bin", "/usr/local/bin", "/bin"]):
x = os.path.basename(x)
elif x.endswith("python") or x.endswith("python.exe"):
x = "python"
if x == "xonsh":
return ["python", "-m", "xonsh.main"]
return [x]
[docs]
def get_script_subproc_command(fname, args):
"""Given the name of a script outside the path, returns a list representing
an appropriate subprocess command to execute the script or None if
the argument is not readable or not a script. Raises PermissionError
if the script is not executable.
"""
# make sure file is executable
if not os.access(fname, os.X_OK):
if not xp.ON_CYGWIN:
raise PermissionError
# explicitly look at all PATH entries for cmd
w_path = os.getenv("PATH").split(":")
w_fpath = list(map(lambda p: p + os.sep + fname, w_path))
if not any(list(map(lambda c: os.access(c, os.X_OK), w_fpath))):
raise PermissionError
if xp.ON_POSIX and not os.access(fname, os.R_OK):
# on some systems, some important programs (e.g. sudo) will have
# execute permissions but not read/write permissions. This enables
# things with the SUID set to be run. Needs to come before _is_binary()
# is called, because that function tries to read the file.
return None
elif _is_binary(fname):
# if the file is a binary, we should call it directly
return None
if xp.ON_WINDOWS:
# Windows can execute various filetypes directly
# as given in PATHEXT
_, ext = os.path.splitext(fname)
if ext.upper() in XSH.env.get("PATHEXT"):
return [fname] + args
# find interpreter
with open(fname, "rb") as f:
first_line = f.readline().decode().strip()
m = RE_SHEBANG.match(first_line)
# xonsh is the default interpreter
if m is None:
interp = ["xonsh"]
else:
interp = m.group(1).strip()
if len(interp) > 0:
interp = shlex.split(interp)
else:
interp = ["xonsh"]
if xp.ON_WINDOWS:
o = []
for i in interp:
o.extend(_un_shebang(i))
interp = o
return interp + [fname] + args
@xl.lazyobject
def _REDIR_REGEX():
name = r"(o(?:ut)?|e(?:rr)?|a(?:ll)?|&?\d?)"
return re.compile(f"{name}(>?>|<){name}$")
@xl.lazyobject
def _MODES():
return {">>": "a", ">": "w", "<": "r"}
@xl.lazyobject
def _WRITE_MODES():
return frozenset({"w", "a"})
@xl.lazyobject
def _REDIR_ALL():
return frozenset({"&", "a", "all"})
@xl.lazyobject
def _REDIR_ERR():
return frozenset({"2", "e", "err"})
@xl.lazyobject
def _REDIR_OUT():
return frozenset({"", "1", "o", "out"})
@xl.lazyobject
def _E2O_MAP():
return frozenset({f"{e}>{o}" for e in _REDIR_ERR for o in _REDIR_OUT if o != ""})
@xl.lazyobject
def _O2E_MAP():
return frozenset({f"{o}>{e}" for e in _REDIR_ERR for o in _REDIR_OUT if o != ""})
[docs]
def safe_open(fname, mode, buffering=-1):
"""Safely attempts to open a file in for xonsh subprocs."""
# file descriptors
try:
return open(fname, mode, buffering=buffering)
except PermissionError as ex:
raise xt.XonshError(f"xonsh: {fname}: permission denied") from ex
except FileNotFoundError as ex:
raise xt.XonshError(f"xonsh: {fname}: no such file or directory") from ex
except Exception as ex:
raise xt.XonshError(f"xonsh: {fname}: unable to open file") from ex
[docs]
def safe_close(x):
"""Safely attempts to close an object."""
if not isinstance(x, io.IOBase):
return
if x.closed:
return
try:
x.close()
except Exception:
pass
def _parse_redirects(r, loc=None):
"""returns origin, mode, destination tuple"""
orig, mode, dest = _REDIR_REGEX.match(r).groups()
# redirect to fd
if dest.startswith("&"):
try:
dest = int(dest[1:])
if loc is None:
loc, dest = dest, "" # NOQA
else:
e = f"Unrecognized redirection command: {r}"
raise xt.XonshError(e)
except (ValueError, xt.XonshError):
raise
except Exception:
pass
mode = _MODES.get(mode, None)
if mode == "r" and (len(orig) > 0 or len(dest) > 0):
raise xt.XonshError(f"Unrecognized redirection command: {r}")
elif mode in _WRITE_MODES and len(dest) > 0:
raise xt.XonshError(f"Unrecognized redirection command: {r}")
return orig, mode, dest
def _redirect_streams(r, loc=None):
"""Returns stdin, stdout, stderr tuple of redirections."""
if isinstance(loc, list):
raise Exception(f"Unsupported redirect: {r!r} {loc!r}")
stdin = stdout = stderr = None
no_ampersand = r.replace("&", "")
# special case of redirecting stderr to stdout
if no_ampersand in _E2O_MAP:
stderr = subprocess.STDOUT
return stdin, stdout, stderr
elif no_ampersand in _O2E_MAP:
stdout = 2 # using 2 as a flag, rather than using a file object
return stdin, stdout, stderr
# get streams
orig, mode, dest = _parse_redirects(r)
if mode == "r":
stdin = safe_open(loc, mode)
elif mode in _WRITE_MODES:
if orig in _REDIR_ALL:
stdout = stderr = safe_open(loc, mode)
elif orig in _REDIR_OUT:
stdout = safe_open(loc, mode)
elif orig in _REDIR_ERR:
stderr = safe_open(loc, mode)
else:
raise xt.XonshError(f"Unrecognized redirection command: {r}")
else:
raise xt.XonshError(f"Unrecognized redirection command: {r}")
return stdin, stdout, stderr
def _flatten_cmd_redirects(cmd):
"""Transforms a command like ['ls', ('>', '/dev/null')] into ['ls', '>', '/dev/null']."""
new_cmd = []
for c in cmd:
if isinstance(c, tuple):
new_cmd.extend(c)
else:
new_cmd.append(c)
return new_cmd
[docs]
def default_signal_pauser(n, f):
"""Pauses a signal, as needed."""
signal.pause()
[docs]
def no_pg_xonsh_preexec_fn():
"""Default subprocess preexec function for when there is no existing
pipeline group.
"""
os.setpgrp()
signal.signal(signal.SIGTSTP, default_signal_pauser)
[docs]
class DecoratorAlias:
"""Decorator alias base class."""
descr = "DecoratorAlias base."
def __call__(
self,
args,
stdin=None,
stdout=None,
stderr=None,
spec=None,
stack=None,
**kwargs,
):
print(self.descr, file=stdout)
[docs]
def decorate_spec(self, spec):
"""Modify spec immediately after modifier added."""
pass
[docs]
def decorate_spec_pre_run(self, pipeline, spec, spec_num):
"""Modify spec before run."""
pass
[docs]
class SpecAttrDecoratorAlias(DecoratorAlias):
"""Decorator Alias for spec attributes."""
def __init__(self, set_attributes: dict, descr=""):
self.set_attributes = set_attributes
self.descr = descr
super().__init__()
[docs]
def decorate_spec(self, spec):
for a, v in self.set_attributes.items():
setattr(spec, a, v)
[docs]
class SubprocSpec:
"""A container for specifying how a subprocess command should be
executed.
"""
kwnames = ("stdin", "stdout", "stderr", "universal_newlines", "close_fds")
def __init__(
self,
cmd,
cls=subprocess.Popen,
stdin=None,
stdout=None,
stderr=None,
universal_newlines=False,
close_fds=False,
captured=False,
env=None,
):
"""
Parameters
----------
cmd : list of str
Command to be run.
cls : Popen-like
Class to run the subprocess with.
stdin : file-like
Popen file descriptor or flag for stdin.
stdout : file-like
Popen file descriptor or flag for stdout.
stderr : file-like
Popen file descriptor or flag for stderr.
universal_newlines : bool
Whether or not to use universal newlines.
close_fds : bool
Whether or not to close the file descriptors when the
process exits.
captured : bool or str, optional
The flag for if the subprocess is captured, may be one of:
False for $[], 'stdout' for $(), 'hiddenobject' for ![], or
'object' for !().
env : dict
Replacement environment to run the subporcess in.
Attributes
----------
args : list of str
Arguments as originally supplied.
alias : list of str, callable, or None
The alias that was resolved for this command, if any.
binary_loc : str or None
Path to binary to execute.
is_proxy : bool
Whether or not the subprocess is or should be run as a proxy.
background : bool
Whether or not the subprocess should be started in the background.
threadable : bool
Whether or not the subprocess is able to be run in a background
thread, rather than the main thread.
pipeline_index : int or None
The index number of this sepc into the pipeline that is being setup.
last_in_pipeline : bool
Whether the subprocess is the last in the execution pipeline.
captured_stdout : file-like
Handle to captured stdin
captured_stderr : file-like
Handle to captured stderr
stack : list of FrameInfo namedtuples or None
The stack of the call-site of alias, if the alias requires it.
None otherwise.
"""
self._stdin = self._stdout = self._stderr = None
# args
self.cmd = list(cmd)
self.cls = cls
self.stdin = stdin
self.stdout = stdout
self.stderr = stderr
self.universal_newlines = universal_newlines
self.close_fds = close_fds
self.captured = captured
if env is not None:
self.env = {
k: v if not (isinstance(v, list)) or len(v) > 1 else v[0]
for (k, v) in env.items()
}
else:
self.env = None
# pure attrs
self.args = _flatten_cmd_redirects(cmd)
self.alias = None
self.alias_name = None
self.alias_stack = XSH.env.get("__ALIAS_STACK", "").split(":")
self.binary_loc = None
self.is_proxy = False
self.background = False
self.threadable = True
self.force_threadable = None # Set this value to ignore threadable prediction.
self.pipeline_index = None
self.last_in_pipeline = False
self.captured_stdout = None
self.captured_stderr = None
self.stack = None
self.decorators = [] # List of DecoratorAlias objects that applied to spec.
self.output_format = XSH.env.get("XONSH_SUBPROC_OUTPUT_FORMAT", "stream_lines")
self.raise_subproc_error = None # Spec-based $RAISE_SUBPROC_ERROR.
def __str__(self):
s = self.__class__.__name__ + "(" + str(self.cmd) + ", "
s += self.cls.__name__ + ", "
kws = [n + "=" + str(getattr(self, n)) for n in self.kwnames]
s += ", ".join(kws) + ")"
return s
def __repr__(self):
s = self.__class__.__name__ + "(" + repr(self.cmd) + ", "
s += self.cls.__name__ + ", "
kws = [n + "=" + repr(getattr(self, n)) for n in self.kwnames]
s += ", ".join(kws) + ")"
return s
#
# Properties
#
@property
def stdin(self):
return self._stdin
@stdin.setter
def stdin(self, value):
if self._stdin is None:
self._stdin = value
elif value is None:
pass
else:
safe_close(value)
msg = "Multiple inputs for stdin for {0!r}"
msg = msg.format(self.get_command_str())
raise xt.XonshError(msg)
@property
def stdout(self):
return self._stdout
@stdout.setter
def stdout(self, value):
if self._stdout is None:
self._stdout = value
elif value is None:
pass
else:
safe_close(value)
msg = "Multiple redirections for stdout for {0!r}"
msg = msg.format(self.get_command_str())
raise xt.XonshError(msg)
@property
def stderr(self):
return self._stderr
@stderr.setter
def stderr(self, value):
if self._stderr is None:
self._stderr = value
elif value is None:
pass
else:
safe_close(value)
msg = "Multiple redirections for stderr for {0!r}"
msg = msg.format(self.get_command_str())
raise xt.XonshError(msg)
[docs]
def get_command_str(self):
return " ".join(arg for arg in self.args)
#
# Execution methods
#
[docs]
def run(self, *, pipeline_group=None):
"""Launches the subprocess and returns the object."""
event_name = self._cmd_event_name()
self._pre_run_event_fire(event_name)
kwargs = {n: getattr(self, n) for n in self.kwnames}
if callable(self.alias):
kwargs["env"] = self.env or {}
kwargs["env"]["__ALIAS_NAME"] = self.alias_name or ""
p = self.cls(self.alias, self.cmd, **kwargs)
else:
self.prep_env_subproc(kwargs)
self.prep_preexec_fn(kwargs, pipeline_group=pipeline_group)
self._fix_null_cmd_bytes()
p = self._run_binary(kwargs)
p.spec = self
p.last_in_pipeline = self.last_in_pipeline
p.captured_stdout = self.captured_stdout
p.captured_stderr = self.captured_stderr
self._post_run_event_fire(event_name, p)
return p
def _run_binary(self, kwargs):
if not self.cmd[0]:
raise xt.XonshError("xonsh: subprocess mode: command is empty")
bufsize = 1
try:
if xp.ON_WINDOWS and self.binary_loc is not None:
# launch process using full paths (https://bugs.python.org/issue8557)
cmd = [self.binary_loc] + self.cmd[1:]
else:
cmd = self.cmd
p = self.cls(cmd, bufsize=bufsize, **kwargs)
except PermissionError as ex:
e = "xonsh: subprocess mode: permission denied: {0}"
raise xt.XonshError(e.format(self.cmd[0])) from ex
except FileNotFoundError as ex:
cmd0 = self.cmd[0]
if len(self.cmd) == 1 and cmd0.endswith("?"):
cmdq = cmd0.rstrip("?")
if cmdq in XSH.aliases:
alias = XSH.aliases[cmdq]
descr = (
repr(alias) + (":\n" + doc)
if (doc := getattr(alias, "__doc__", ""))
else ""
)
return self.cls(["echo", descr], bufsize=bufsize, **kwargs)
else:
with contextlib.suppress(OSError):
return self.cls(["man", cmdq], bufsize=bufsize, **kwargs)
e = f"xonsh: subprocess mode: command not found: {repr(cmd0)}"
env = XSH.env
sug = xt.suggest_commands(cmd0, env)
if len(sug.strip()) > 0:
e += "\n" + xt.suggest_commands(cmd0, env)
if XSH.env.get("XONSH_INTERACTIVE"):
events = XSH.builtins.events
events.on_command_not_found.fire(cmd=self.cmd)
raise xt.XonshError(e) from ex
return p
[docs]
def prep_env_subproc(self, kwargs):
"""Prepares the environment to use in the subprocess."""
with XSH.env.swap(self.env) as env:
denv = env.detype()
if xp.ON_WINDOWS:
# Over write prompt variable as xonsh's $PROMPT does
# not make much sense for other subprocs
denv["PROMPT"] = "$P$G"
kwargs["env"] = denv
[docs]
def prep_preexec_fn(self, kwargs, pipeline_group=None):
"""Prepares the 'preexec_fn' keyword argument"""
if not xp.ON_POSIX:
return
if not XSH.env.get("XONSH_INTERACTIVE"):
return
if pipeline_group is None or xp.ON_WSL1:
# If there is no pipeline group
# or the platform is windows subsystem for linux (WSL)
xonsh_preexec_fn = no_pg_xonsh_preexec_fn
else:
def xonsh_preexec_fn():
"""Preexec function bound to a pipeline group."""
os.setpgid(0, pipeline_group)
signal.signal(
signal.SIGTERM if xp.ON_WINDOWS else signal.SIGTSTP,
default_signal_pauser,
)
kwargs["preexec_fn"] = xonsh_preexec_fn
def _fix_null_cmd_bytes(self):
# Popen does not accept null bytes in its input commands.
# That doesn't stop some subprocesses from using them. Here we
# escape them just in case.
cmd = self.cmd
for i in range(len(cmd)):
if callable(cmd[i]):
raise Exception(f"The command contains callable argument: {cmd[i]}")
cmd[i] = cmd[i].replace("\0", "\\0")
def _cmd_event_name(self):
if callable(self.alias):
return getattr(self.alias, "__name__", repr(self.alias))
elif self.binary_loc is None:
return "<not-found>"
else:
return os.path.basename(self.binary_loc)
def _pre_run_event_fire(self, name):
events = XSH.builtins.events
event_name = "on_pre_spec_run_" + name
if events.exists(event_name):
event = getattr(events, event_name)
event.fire(spec=self)
if events.exists("on_pre_spec_run"):
event = events.on_pre_spec_run
event.fire(spec=self)
def _post_run_event_fire(self, name, proc):
events = XSH.builtins.events
event_name = "on_post_spec_run_" + name
if events.exists(event_name):
event = getattr(events, event_name)
event.fire(spec=self, proc=proc)
if events.exists("on_post_spec_run"):
event = events.on_post_spec_run
event.fire(spec=self)
#
# Building methods
#
[docs]
@classmethod
def build(kls, cmd, *, cls=subprocess.Popen, **kwargs):
"""Creates an instance of the subprocess command, with any
modifications and adjustments based on the actual cmd that
was received.
"""
if not cmd:
raise xt.XonshError("xonsh: subprocess mode: command is empty")
# modifications that do not alter cmds may come before creating instance
spec = kls(cmd, cls=cls, **kwargs)
# modifications that alter cmds must come after creating instance
spec.resolve_decorators() # keep this first
spec.resolve_args_list()
spec.resolve_redirects()
spec.resolve_alias()
spec.resolve_binary_loc()
spec.resolve_auto_cd()
spec.resolve_executable_commands()
spec.resolve_alias_cls()
spec.resolve_stack()
return spec
[docs]
def add_decorator(self, mod: DecoratorAlias):
"""Add spec modifier to the specification."""
mod.decorate_spec(self)
self.decorators.append(mod)
[docs]
def resolve_decorators(self):
"""Apply decorators."""
if (ln := len(self.cmd)) == 1:
return
for i in range(ln):
c = self.cmd[i]
if c in XSH.aliases and isinstance(mod := XSH.aliases[c], DecoratorAlias):
self.add_decorator(mod)
else:
break
self.cmd = self.cmd[i:]
[docs]
def resolve_args_list(self):
"""Weave a list of arguments into a command."""
resolved_cmd = []
for c in self.cmd:
if (
isinstance(c, tuple)
and len(c) == 2
and isinstance(c[1], list)
and len(c[1]) == 1
):
# Redirect case e.g. `> file`
resolved_cmd.append(
(
c[0],
c[1][0],
)
)
else:
resolved_cmd += c if isinstance(c, list) else [c]
self.cmd = resolved_cmd
[docs]
def resolve_redirects(self):
"""Manages redirects."""
new_cmd = []
for c in self.cmd:
if isinstance(c, tuple):
streams = _redirect_streams(*c)
self.stdin, self.stdout, self.stderr = streams
else:
new_cmd.append(c)
self.cmd = new_cmd
[docs]
def resolve_alias(self):
"""Resolving alias and setting up command."""
cmd0 = self.cmd[0]
if cmd0 in self.alias_stack:
# Disabling the alias resolving to prevent infinite loop in call stack
# and further using binary_loc to resolve the alias name.
self.alias = None
return
if callable(cmd0):
self.alias = cmd0
else:
decorators = []
if isinstance(XSH.aliases, dict):
# Windows tests
alias = XSH.aliases.get(cmd0, None)
if alias is not None:
alias = alias + self.cmd[1:]
else:
alias = XSH.aliases.get(
self.cmd,
None,
decorators=decorators,
)
if alias is not None:
self.alias_name = cmd0
if callable(alias[0]):
# E.g. `alias == [FuncAlias({'name': 'cd'}), '/tmp']`
self.alias = alias[0]
self.cmd = [cmd0] + alias[1:]
else:
# E.g. `alias == ['ls', '-la']`
self.alias = alias
for mod in decorators:
self.add_decorator(mod)
[docs]
def resolve_binary_loc(self):
"""Sets the binary location"""
alias = self.alias
if alias is None:
cmd0 = self.cmd[0]
binary_loc = locate_executable(cmd0)
if binary_loc is None and cmd0 and cmd0 in self.alias_stack:
raise Exception(f'Recursive calls to "{cmd0}" alias.')
elif callable(alias):
binary_loc = None
else:
binary_loc = locate_executable(alias[0])
self.binary_loc = binary_loc
[docs]
def resolve_auto_cd(self):
"""Implements AUTO_CD functionality."""
if not (
self.alias is None
and self.binary_loc is None
and len(self.cmd) == 1
and XSH.env.get("AUTO_CD")
and os.path.isdir(self.cmd[0])
):
return
self.cmd.insert(0, "cd")
self.alias = XSH.aliases.get("cd", None)[0]
[docs]
def resolve_executable_commands(self):
"""Resolve command executables, if applicable."""
alias = self.alias
if alias is None:
pass
elif callable(alias):
self.cmd.pop(0)
return
else:
self.cmd = alias
self.resolve_redirects()
if self.binary_loc is None:
return
try:
scriptcmd = get_script_subproc_command(self.binary_loc, self.cmd[1:])
if scriptcmd is not None:
self.cmd = scriptcmd
except PermissionError as ex:
e = "xonsh: subprocess mode: permission denied: {0}"
raise xt.XonshError(e.format(self.cmd[0])) from ex
[docs]
def resolve_alias_cls(self):
"""Determine which proxy class to run an alias with."""
if callable(self.alias):
self.is_proxy = True
_update_proc_alias_threadable(self)
_update_proc_alias_captured(self)
[docs]
def resolve_stack(self):
"""Computes the stack for a callable alias's call-site, if needed."""
if not callable(self.alias):
return
# check that we actual need the stack
sig = inspect.signature(getattr(self.alias, "func", self.alias))
if len(sig.parameters) <= 5 and "stack" not in sig.parameters:
return
# compute the stack, and filter out these build methods
# run_subproc() is the 4th command in the stack
# we want to filter out one up, e.g. subproc_captured_hiddenobject()
# after that the stack from the call site starts.
stack = inspect.stack(context=0)
if not stack[3][3].startswith("test_"):
assert stack[3][3] == "run_subproc", "xonsh stack has changed!"
del stack[:5]
self.stack = stack
def _safe_pipe_properties(fd, use_tty=False):
"""Makes sure that a pipe file descriptor properties are reasonable."""
if not use_tty:
return
# due to some weird, long standing issue in Python, PTYs come out
# replacing newline \n with \r\n. This causes issues for raw unix
# protocols, like git and ssh, which expect unix line endings.
# see https://mail.python.org/pipermail/python-list/2013-June/650460.html
# for more details and the following solution.
props = xli.termios.tcgetattr(fd)
props[1] = props[1] & (~xli.termios.ONLCR) | xli.termios.ONLRET
xli.termios.tcsetattr(fd, xli.termios.TCSANOW, props)
# newly created PTYs have a stardard size (24x80), set size to the same size
# than the current terminal
winsize = None
if sys.stdin.isatty():
winsize = xli.fcntl.ioctl(sys.stdin.fileno(), xli.termios.TIOCGWINSZ, b"0000")
elif sys.stdout.isatty():
winsize = xli.fcntl.ioctl(sys.stdout.fileno(), xli.termios.TIOCGWINSZ, b"0000")
elif sys.stderr.isatty():
winsize = xli.fcntl.ioctl(sys.stderr.fileno(), xli.termios.TIOCGWINSZ, b"0000")
if winsize is not None:
xli.fcntl.ioctl(fd, xli.termios.TIOCSWINSZ, winsize)
def _update_last_spec(last):
last.last_in_pipeline = True
if not last.captured:
return
_last_spec_update_threading(last)
_last_spec_update_captured(last)
def _last_spec_update_threading(last: SubprocSpec):
if callable(last.alias):
return
captured, env, cmds_cache = last.captured, XSH.env, XSH.commands_cache
threadable = (
captured
and env.get("THREAD_SUBPROCS")
and (captured != "hiddenobject" or env.get("XONSH_CAPTURE_ALWAYS"))
and cmds_cache.predict_threadable(last.args)
and cmds_cache.predict_threadable(last.cmd)
)
if last.force_threadable is not None:
threadable = last.force_threadable
if threadable:
if last.captured:
last.cls = PopenThread
else:
last.threadable = False
def _last_spec_update_captured(last: SubprocSpec):
captured = (
(captured := last.captured)
and not (captured in ["object", "hiddenobject"] and not last.threadable)
# a ProcProxy run using ![] should not be captured
and not (
callable(last.alias)
and last.cls is ProcProxy
and captured == "hiddenobject"
)
)
if captured:
_make_last_spec_captured(last)
def _make_last_spec_captured(last: SubprocSpec):
captured = last.captured
callable_alias = callable(last.alias)
# cannot used PTY pipes for aliases, for some dark reason,
# and must use normal pipes instead.
use_tty = xp.ON_POSIX and not callable_alias
# Do not set standard in! Popen is not a fan of redirections here
# set standard out
if last.stdout is not None:
last.universal_newlines = True
elif captured in STDOUT_CAPTURE_KINDS:
last.universal_newlines = False
r, w = os.pipe()
last.stdout = safe_open(w, "wb")
last.captured_stdout = safe_open(r, "rb")
elif XSH.stdout_uncaptured is not None:
last.universal_newlines = True
last.stdout = XSH.stdout_uncaptured
last.captured_stdout = last.stdout
elif xp.ON_WINDOWS and not callable_alias:
last.universal_newlines = True
last.stdout = None # must truly stream on windows
last.captured_stdout = ConsoleParallelReader(1)
else:
last.universal_newlines = True
r, w = xli.pty.openpty() if use_tty else os.pipe()
_safe_pipe_properties(w, use_tty=use_tty)
last.stdout = safe_open(w, "w")
_safe_pipe_properties(r, use_tty=use_tty)
last.captured_stdout = safe_open(r, "r")
# set standard error
if last.stderr is not None:
pass
elif captured == "stdout":
pass
elif captured == "object":
r, w = os.pipe()
last.stderr = safe_open(w, "w")
last.captured_stderr = safe_open(r, "r")
elif XSH.stderr_uncaptured is not None:
last.stderr = XSH.stderr_uncaptured
last.captured_stderr = last.stderr
elif xp.ON_WINDOWS and not callable_alias:
last.universal_newlines = True
last.stderr = None # must truly stream on windows
else:
r, w = xli.pty.openpty() if use_tty else os.pipe()
_safe_pipe_properties(w, use_tty=use_tty)
last.stderr = safe_open(w, "w")
_safe_pipe_properties(r, use_tty=use_tty)
last.captured_stderr = safe_open(r, "r")
# redirect stdout to stderr, if we should
if isinstance(last.stdout, int) and last.stdout == 2:
# need to use private interface to avoid duplication.
last._stdout = last.stderr
# redirect stderr to stdout, if we should
if callable_alias and last.stderr == subprocess.STDOUT:
last._stderr = last.stdout
last.captured_stderr = last.captured_stdout
def _update_proc_alias_threadable(proc):
threadable = XSH.env.get("THREAD_SUBPROCS") and getattr(
proc.alias, "__xonsh_threadable__", True
)
if proc.force_threadable is not None:
threadable = proc.force_threadable
proc.threadable = threadable
proc.cls = ProcProxyThread if proc.threadable else ProcProxy
def _update_proc_alias_captured(proc):
proc.captured = getattr(proc.alias, "__xonsh_capturable__", proc.captured)
def _trace_specs(trace_mode, specs, cmds, captured):
"""Show information about specs."""
tracer = XSH.env.get("XONSH_TRACE_SUBPROC_FUNC", None)
if callable(tracer):
tracer(cmds, captured=captured)
else:
r = {"cmds": cmds, "captured": captured}
print(f"Trace run_subproc({repr(r)})", file=sys.stderr)
if trace_mode >= 2:
for i, s in enumerate(specs):
pcls = s.cls.__module__ + "." + s.cls.__name__
pcmd = (
[s.args[0].__name__] + s.args[1:] if callable(s.args[0]) else s.args
)
p = {
"cmd": pcmd,
"cls": pcls,
}
p |= {
a: getattr(s, a, None)
for a in [
"alias_name",
"alias",
"binary_loc",
"threadable",
"background",
]
}
if trace_mode == 3:
p |= {
a: getattr(s, a, None)
for a in [
"stdin",
"stdout",
"stderr",
"captured",
"captured_stdout",
"captured_stderr",
]
}
p = {k: v for k, v in p.items() if v is not None}
print(f"{i}: {repr(p)}", file=sys.stderr)
[docs]
def cmds_to_specs(cmds, captured=False, envs=None):
"""Converts a list of cmds to a list of SubprocSpec objects that are
ready to be executed.
"""
# first build the subprocs independently and separate from the redirects
i = 0
specs = []
redirects = []
for i, cmd in enumerate(cmds):
if isinstance(cmd, str):
redirects.append(cmd)
else:
env = envs[i] if envs is not None else None
spec = SubprocSpec.build(cmd, captured=captured, env=env)
spec.pipeline_index = i
specs.append(spec)
i += 1
# now modify the subprocs based on the redirects.
for i, redirect in enumerate(redirects):
if redirect == "|":
# these should remain integer file descriptors, and not Python
# file objects since they connect processes.
r, w = os.pipe()
specs[i].stdout = w
specs[i + 1].stdin = r
elif redirect == "&" and i == len(redirects) - 1:
specs[i].background = True
else:
raise xt.XonshError(f"unrecognized redirect {redirect!r}")
# Apply boundary conditions
if not XSH.env.get("XONSH_CAPTURE_ALWAYS"):
# Make sure sub-specs are always captured in case:
# `![some_alias | grep x]`, `$(some_alias)`, `some_alias > file`.
last = spec
is_redirected_stdout = bool(last.stdout)
specs_to_capture = (
specs
if captured in STDOUT_CAPTURE_KINDS or is_redirected_stdout
else specs[:-1]
)
_set_specs_capture_always(specs_to_capture)
_update_last_spec(specs[-1])
return specs
def _set_specs_capture_always(specs_to_capture):
"""Set XONSH_CAPTURE_ALWAYS for all specs."""
for spec in specs_to_capture:
if spec.env is None:
spec.env = {"XONSH_CAPTURE_ALWAYS": True}
else:
spec.env.setdefault("XONSH_CAPTURE_ALWAYS", True)
def _shell_set_title(cmds):
if XSH.env.get("XONSH_INTERACTIVE") and XSH.shell is not None:
# context manager updates the command information that gets
# accessed by CurrentJobField when setting the terminal's title
with XSH.env["PROMPT_FIELDS"]["current_job"].update_current_cmds(cmds):
# remove current_job from prompt level cache
XSH.env["PROMPT_FIELDS"].reset_key("current_job")
# The terminal's title needs to be set before starting the
# subprocess to avoid accidentally answering interactive questions
# from commands such as `rm -i` (see #1436)
XSH.shell.settitle()
[docs]
def run_subproc(cmds, captured=False, envs=None):
"""Runs a subprocess, in its many forms. This takes a list of 'commands,'
which may be a list of command line arguments or a string, representing
a special connecting character. For example::
$ ls | grep wakka
is represented by the following cmds::
[['ls'], '|', ['grep', 'wakka']]
Lastly, the captured argument affects only the last real command.
"""
specs = cmds_to_specs(cmds, captured=captured, envs=envs)
if trace_mode := XSH.env.get("XONSH_TRACE_SUBPROC", False):
_trace_specs(trace_mode, specs, cmds, captured)
cmds = [
_flatten_cmd_redirects(cmd) if isinstance(cmd, list) else cmd for cmd in cmds
]
_shell_set_title(cmds)
return _run_specs(specs, cmds)
def _run_command_pipeline(specs, cmds):
captured = specs[-1].captured
if captured == "hiddenobject":
cp = HiddenCommandPipeline(specs)
else:
cp = CommandPipeline(specs)
proc = cp.proc
background = cp.spec.background
if not all(x.is_proxy for x in specs):
xj.add_job(
{
"cmds": cmds,
"pids": [i.pid for i in cp.procs],
"status": "suspended" if cp.suspended else "running",
"obj": proc,
"bg": background,
"pipeline": cp,
"pgrp": cp.term_pgid,
}
)
return cp
def _run_specs(specs, cmds):
cp = _run_command_pipeline(specs, cmds)
XSH.last, proc, captured, background = (
cp,
cp.proc,
specs[-1].captured,
cp.spec.background,
)
"""
For some reason, some programs are in a stopped state when the flow
reaches this point, hence a SIGCONT should be sent to `proc` to make
sure that the shell doesn't hang. See issue #2999 and the fix in PR #3000
"""
resume_process(proc)
if captured == "object":
return cp
elif captured == "hiddenobject":
if not background:
cp.end()
return cp
elif background:
return
elif captured == "stdout":
cp.end()
return cp.output
else:
cp.end()
return