Source code for xonsh.foreign_shells

"""Tools to help interface with foreign shells, such as Bash."""

import collections.abc as cabc
import functools
import os
import re
import shlex
import subprocess
import sys
import tempfile
import warnings

from xonsh.built_ins import XSH
from xonsh.lib.lazyasd import lazyobject
from xonsh.platform import ON_CYGWIN, ON_MSYS, ON_WINDOWS
from xonsh.tools import ensure_string, to_bool

COMMAND = """{seterrprevcmd}
{prevcmd}
echo __XONSH_ENV_BEG__
{envcmd}
echo __XONSH_ENV_END__
echo __XONSH_ALIAS_BEG__
{aliascmd}
echo __XONSH_ALIAS_END__
echo __XONSH_FUNCS_BEG__
{funcscmd}
echo __XONSH_FUNCS_END__
{postcmd}
{seterrpostcmd}"""

DEFAULT_BASH_FUNCSCMD = """# get function names from declare
declstr=$(echo $(declare -F))
read -r -a decls <<< $declstr
for((n=0;n<${#decls[@]};n++)); do
  if (( $(($n % 3 )) == 2 )); then
    echo -n "${decls[$n]} "
  fi
done
echo"""

DEFAULT_ZSH_FUNCSCMD = """# get function names
for name in ${(ok)functions}; do
  echo -n "$name "
done
echo"""


# mapping of shell name aliases to keys in other lookup dictionaries.
@lazyobject
def CANON_SHELL_NAMES():
    return {
        "bash": "bash",
        "/bin/bash": "bash",
        "zsh": "zsh",
        "/bin/zsh": "zsh",
        "/usr/bin/zsh": "zsh",
        "cmd": "cmd",
        "cmd.exe": "cmd",
    }


@lazyobject
def DEFAULT_ENVCMDS():
    return {"bash": "env", "zsh": "env", "cmd": "set"}


@lazyobject
def DEFAULT_ALIASCMDS():
    return {"bash": "alias", "zsh": "alias -L", "cmd": ""}


@lazyobject
def DEFAULT_FUNCSCMDS():
    return {"bash": DEFAULT_BASH_FUNCSCMD, "zsh": DEFAULT_ZSH_FUNCSCMD, "cmd": ""}


@lazyobject
def DEFAULT_SOURCERS():
    return {"bash": "source", "zsh": "source", "cmd": "call"}


@lazyobject
def DEFAULT_TMPFILE_EXT():
    return {"bash": ".sh", "zsh": ".zsh", "cmd": ".bat"}


@lazyobject
def DEFAULT_RUNCMD():
    return {"bash": "-c", "zsh": "-c", "cmd": "/C"}


@lazyobject
def DEFAULT_SETERRPREVCMD():
    return {"bash": "set -e", "zsh": "set -e", "cmd": "@echo off"}


@lazyobject
def DEFAULT_SETERRPOSTCMD():
    return {"bash": "", "zsh": "", "cmd": "if errorlevel 1 exit 1"}


[docs] @functools.lru_cache def foreign_shell_data( shell, interactive=True, login=False, envcmd=None, aliascmd=None, extra_args=(), currenv=None, safe=True, prevcmd="", postcmd="", funcscmd=None, sourcer=None, use_tmpfile=False, tmpfile_ext=None, runcmd=None, seterrprevcmd=None, seterrpostcmd=None, show=False, dryrun=False, files=(), ): """Extracts data from a foreign (non-xonsh) shells. Currently this gets the environment, aliases, and functions but may be extended in the future. Parameters ---------- shell : str The name of the shell, such as 'bash' or '/bin/sh'. interactive : bool, optional Whether the shell should be run in interactive mode. login : bool, optional Whether the shell should be a login shell. envcmd : str or None, optional The command to generate environment output with. aliascmd : str or None, optional The command to generate alias output with. extra_args : tuple of str, optional Additional command line options to pass into the shell. currenv : tuple of items or None, optional Manual override for the current environment. safe : bool, optional Flag for whether or not to safely handle exceptions and other errors. prevcmd : str, optional A command to run in the shell before anything else, useful for sourcing and other commands that may require environment recovery. postcmd : str, optional A command to run after everything else, useful for cleaning up any damage that the prevcmd may have caused. funcscmd : str or None, optional This is a command or script that can be used to determine the names and locations of any functions that are native to the foreign shell. This command should print *only* a JSON object that maps function names to the filenames where the functions are defined. If this is None, then a default script will attempted to be looked up based on the shell name. Callable wrappers for these functions will be returned in the aliases dictionary. sourcer : str or None, optional How to source a foreign shell file for purposes of calling functions in that shell. If this is None, a default value will attempt to be looked up based on the shell name. use_tmpfile : bool, optional This specifies if the commands are written to a tmp file or just parsed directly to the shell tmpfile_ext : str or None, optional If tmpfile is True this sets specifies the extension used. runcmd : str or None, optional Command line switches to use when running the script, such as -c for Bash and /C for cmd.exe. seterrprevcmd : str or None, optional Command that enables exit-on-error for the shell that is run at the start of the script. For example, this is "set -e" in Bash. To disable exit-on-error behavior, simply pass in an empty string. seterrpostcmd : str or None, optional Command that enables exit-on-error for the shell that is run at the end of the script. For example, this is "if errorlevel 1 exit 1" in cmd.exe. To disable exit-on-error behavior, simply pass in an empty string. show : bool, optional Whether or not to display the script that will be run. dryrun : bool, optional Whether or not to actually run and process the command. files : tuple of str, optional Paths to source. Returns ------- env : dict Dictionary of shell's environment. (None if the subproc command fails) aliases : dict Dictionary of shell's aliases, this includes foreign function wrappers.(None if the subproc command fails) """ cmd = [shell] cmd.extend(extra_args) # needs to come here for GNU long options if interactive: cmd.append("-i") if login: cmd.append("-l") shkey = CANON_SHELL_NAMES[shell] envcmd = DEFAULT_ENVCMDS.get(shkey, "env") if envcmd is None else envcmd aliascmd = DEFAULT_ALIASCMDS.get(shkey, "alias") if aliascmd is None else aliascmd funcscmd = DEFAULT_FUNCSCMDS.get(shkey, "echo {}") if funcscmd is None else funcscmd tmpfile_ext = ( DEFAULT_TMPFILE_EXT.get(shkey, "sh") if tmpfile_ext is None else tmpfile_ext ) runcmd = DEFAULT_RUNCMD.get(shkey, "-c") if runcmd is None else runcmd seterrprevcmd = ( DEFAULT_SETERRPREVCMD.get(shkey, "") if seterrprevcmd is None else seterrprevcmd ) seterrpostcmd = ( DEFAULT_SETERRPOSTCMD.get(shkey, "") if seterrpostcmd is None else seterrpostcmd ) command = COMMAND.format( envcmd=envcmd, aliascmd=aliascmd, prevcmd=prevcmd, postcmd=postcmd, funcscmd=funcscmd, seterrprevcmd=seterrprevcmd, seterrpostcmd=seterrpostcmd, ).strip() if show: print(command) if dryrun: return None, None cmd.append(runcmd) if not use_tmpfile: cmd.append(command) else: tmpfile = tempfile.NamedTemporaryFile(suffix=tmpfile_ext, delete=False) tmpfile.write(command.encode("utf8")) tmpfile.close() cmd.append(tmpfile.name) if currenv is None and XSH.env: currenv = XSH.env.detype() elif currenv is not None: currenv = dict(currenv) try: s = subprocess.check_output( cmd, stderr=subprocess.PIPE, env=currenv, # start new session to avoid hangs # (doesn't work on Cygwin though) start_new_session=((not ON_CYGWIN) and (not ON_MSYS)), text=True, ) except (subprocess.CalledProcessError, FileNotFoundError): if not safe: raise return None, None finally: if use_tmpfile: os.remove(tmpfile.name) env = parse_env(s) aliases = parse_aliases( s, shell=shell, sourcer=sourcer, files=files, extra_args=extra_args, ) funcs = parse_funcs( s, shell=shell, sourcer=sourcer, files=files, extra_args=extra_args, ) aliases.update(funcs) return env, aliases
@lazyobject def ENV_RE(): return re.compile("__XONSH_ENV_BEG__\n(.*)" "__XONSH_ENV_END__", flags=re.DOTALL) @lazyobject def ENV_SPLIT_RE(): return re.compile("^([^=]+)=([^=]*|[^\n]*)$", flags=re.DOTALL | re.MULTILINE)
[docs] def parse_env(s): """Parses the environment portion of string into a dict.""" m = ENV_RE.search(s) if m is None: return {} g1 = m.group(1) g1 = g1[:-1] if g1.endswith("\n") else g1 env = dict(ENV_SPLIT_RE.findall(g1)) return env
@lazyobject def ALIAS_RE(): return re.compile( "__XONSH_ALIAS_BEG__\n(.*)" "__XONSH_ALIAS_END__", flags=re.DOTALL ) @lazyobject def FS_EXEC_ALIAS_RE(): return re.compile(r";|`|\$\(")
[docs] def parse_aliases(s, shell, sourcer=None, files=(), extra_args=()): """Parses the aliases portion of string into a dict.""" m = ALIAS_RE.search(s) if m is None: return {} g1 = m.group(1) g1 = g1.replace("\\\n", " ") items = [ line.split("=", 1) for line in g1.splitlines() if line.startswith("alias ") and "=" in line ] aliases = {} for key, value in items: try: key = key[6:] # lstrip 'alias ' # undo bash's weird quoting of single quotes (sh_single_quote) value = value.replace("'\\''", "'") # strip one single quote at the start and end of value if value[0] == "'" and value[-1] == "'": value = value[1:-1] # now compute actual alias if FS_EXEC_ALIAS_RE.search(value) is None: # simple list of args alias value = shlex.split(value) else: # alias is more complex, use ExecAlias, but via shell value = ForeignShellExecAlias( src=value, shell=shell, sourcer=sourcer, files=files, extra_args=extra_args, ) except ValueError as exc: warnings.warn( f'could not parse alias "{key}": {exc!r}', RuntimeWarning, stacklevel=2, ) continue aliases[key] = value return aliases
@lazyobject def FUNCS_RE(): return re.compile( "__XONSH_FUNCS_BEG__\n(.+)\n" "__XONSH_FUNCS_END__", flags=re.DOTALL )
[docs] def parse_funcs(s, shell, sourcer=None, files=(), extra_args=()): """Parses the funcs portion of a string into a dict of callable foreign function wrappers. """ m = FUNCS_RE.search(s) if m is None: return {} g1 = m.group(1) if ON_WINDOWS: g1 = g1.replace(os.sep, os.altsep) funcnames = g1.split() funcs = {} for funcname in funcnames: if funcname.startswith("_"): continue # skip private functions and invalid files wrapper = ForeignShellFunctionAlias( funcname=funcname, shell=shell, sourcer=sourcer, files=files, extra_args=extra_args, ) funcs[funcname] = wrapper return funcs
[docs] class ForeignShellBaseAlias: """This class is responsible for calling foreign shell functions as if they were aliases. This does not currently support taking stdin. """ INPUT = "echo ForeignShellBaseAlias {shell} {args}\n" def __init__(self, shell, sourcer=None, files=(), extra_args=()): """ Parameters ---------- shell : str Name or path to shell sourcer : str or None, optional Command to source foreign files with. files : tuple of str, optional Paths to source. extra_args : tuple of str, optional Additional command line options to pass into the shell. """ sourcer = DEFAULT_SOURCERS.get(shell, "source") if sourcer is None else sourcer self.shell = shell self.sourcer = sourcer self.files = files self.extra_args = extra_args def _input_kwargs(self): return { "shell": self.shell, "extra_args": self.extra_args, } def __eq__(self, other): if not hasattr(other, "_input_kwargs") or not callable(other._input_kwargs): return NotImplemented return self._input_kwargs() == other._input_kwargs() def __call__( self, args, stdin=None, stdout=None, stderr=None, spec=None, stack=None ): args, streaming = self._is_streaming(args) input = self.INPUT.format(args=" ".join(args), **self._input_kwargs()) if len(self.files) > 0: input = "".join([f'{self.sourcer} "{f}"\n' for f in self.files]) + input cmd = [self.shell] + list(self.extra_args) + ["-ic", input] env = XSH.env denv = env.detype() if streaming: subprocess.check_call(cmd, env=denv) out = None else: out = subprocess.check_output(cmd, env=denv, stderr=subprocess.STDOUT) out = out.decode( encoding=env.get("XONSH_ENCODING"), errors=env.get("XONSH_ENCODING_ERRORS"), ) out = out.replace("\r\n", "\n") return out def __repr__(self): return ( self.__class__.__name__ + "(" + ", ".join([f"{k}={v!r}" for k, v in sorted(self._input_kwargs().items())]) + ")" ) @staticmethod def _is_streaming(args): """Test and modify args if --xonsh-stream is present.""" if "--xonsh-nostream" not in args: return args, True args = list(args) args.remove("--xonsh-nostream") return args, False
[docs] class ForeignShellFunctionAlias(ForeignShellBaseAlias): """This class is responsible for calling foreign shell functions as if they were aliases. This does not currently support taking stdin. """ INPUT = "{funcname} {args}\n" def __init__(self, funcname, shell, sourcer=None, files=(), extra_args=()): """ Parameters ---------- funcname : str function name shell : str Name or path to shell sourcer : str or None, optional Command to source foreign files with. files : tuple of str, optional Paths to source. extra_args : tuple of str, optional Additional command line options to pass into the shell. """ super().__init__( shell=shell, sourcer=sourcer, files=files, extra_args=extra_args ) self.funcname = funcname def _input_kwargs(self): inp = super()._input_kwargs() inp["funcname"] = self.funcname return inp
[docs] class ForeignShellExecAlias(ForeignShellBaseAlias): """Provides a callable alias for source code in a foreign shell.""" INPUT = "{src} {args}\n" def __init__( self, src, shell, sourcer=None, files=(), extra_args=(), ): """ Parameters ---------- src : str Source code in the shell language shell : str Name or path to shell sourcer : str or None, optional Command to source foreign files with. files : tuple of str, optional Paths to source. extra_args : tuple of str, optional Additional command line options to pass into the shell. """ super().__init__( shell=shell, sourcer=sourcer, files=files, extra_args=extra_args ) self.src = src.strip() def _input_kwargs(self): inp = super()._input_kwargs() inp["src"] = self.src return inp
@lazyobject def VALID_SHELL_PARAMS(): return frozenset( [ "shell", "interactive", "login", "envcmd", "aliascmd", "extra_args", "currenv", "safe", "prevcmd", "postcmd", "funcscmd", "sourcer", ] )
[docs] def ensure_shell(shell): """Ensures that a mapping follows the shell specification.""" if not isinstance(shell, cabc.MutableMapping): shell = dict(shell) shell_keys = set(shell.keys()) if not (shell_keys <= VALID_SHELL_PARAMS): raise KeyError(f"unknown shell keys: {shell_keys - VALID_SHELL_PARAMS}") shell["shell"] = ensure_string(shell["shell"]).lower() if "interactive" in shell_keys: shell["interactive"] = to_bool(shell["interactive"]) if "login" in shell_keys: shell["login"] = to_bool(shell["login"]) if "envcmd" in shell_keys: shell["envcmd"] = ( None if shell["envcmd"] is None else ensure_string(shell["envcmd"]) ) if "aliascmd" in shell_keys: shell["aliascmd"] = ( None if shell["aliascmd"] is None else ensure_string(shell["aliascmd"]) ) if "extra_args" in shell_keys and not isinstance(shell["extra_args"], tuple): shell["extra_args"] = tuple(map(ensure_string, shell["extra_args"])) if "currenv" in shell_keys and not isinstance(shell["currenv"], tuple): ce = shell["currenv"] if isinstance(ce, cabc.Mapping): ce = tuple((ensure_string(k), v) for k, v in ce.items()) elif isinstance(ce, cabc.Sequence): ce = tuple((ensure_string(k), v) for k, v in ce) else: raise RuntimeError("unrecognized type for currenv") shell["currenv"] = ce if "safe" in shell_keys: shell["safe"] = to_bool(shell["safe"]) if "prevcmd" in shell_keys: shell["prevcmd"] = ensure_string(shell["prevcmd"]) if "postcmd" in shell_keys: shell["postcmd"] = ensure_string(shell["postcmd"]) if "funcscmd" in shell_keys: shell["funcscmd"] = ( None if shell["funcscmd"] is None else ensure_string(shell["funcscmd"]) ) if "sourcer" in shell_keys: shell["sourcer"] = ( None if shell["sourcer"] is None else ensure_string(shell["sourcer"]) ) if "seterrprevcmd" in shell_keys: shell["seterrprevcmd"] = ( None if shell["seterrprevcmd"] is None else ensure_string(shell["seterrprevcmd"]) ) if "seterrpostcmd" in shell_keys: shell["seterrpostcmd"] = ( None if shell["seterrpostcmd"] is None else ensure_string(shell["seterrpostcmd"]) ) return shell
[docs] def load_foreign_envs(shells): """Loads environments from foreign shells. Parameters ---------- shells : sequence of dicts An iterable of dicts that can be passed into foreign_shell_data() as keyword arguments. Returns ------- env : dict A dictionary of the merged environments. """ env = {} for shell in shells: shell = ensure_shell(shell) shenv, _ = foreign_shell_data(**shell) if shenv: env.update(shenv) return env
[docs] def load_foreign_aliases(shells): """Loads aliases from foreign shells. Parameters ---------- shells : sequence of dicts An iterable of dicts that can be passed into foreign_shell_data() as keyword arguments. Returns ------- aliases : dict A dictionary of the merged aliases. """ aliases = {} xonsh_aliases = XSH.aliases for shell in shells: shell = ensure_shell(shell) _, shaliases = foreign_shell_data(**shell) if not XSH.env.get("FOREIGN_ALIASES_OVERRIDE"): shaliases = {} if shaliases is None else shaliases for alias in set(shaliases) & set(xonsh_aliases): del shaliases[alias] if XSH.env.get("XONSH_DEBUG") >= 1: print( f"aliases: ignoring alias {alias!r} of shell {shell['shell']!r} " "which tries to override xonsh alias.", file=sys.stderr, ) aliases.update(shaliases) return aliases