Source code for xonsh.main

"""The main xonsh script."""

import argparse
import builtins
import contextlib
import enum
import os
import signal
import sys
import traceback

import xonsh.procs.pipelines as xpp
from xonsh import __version__
from xonsh.built_ins import XSH
from xonsh.codecache import run_code_with_cache, run_script_with_cache
from xonsh.environ import make_args_env, xonshrc_context
from xonsh.events import events
from xonsh.execer import Execer
from xonsh.imphooks import install_import_hooks
from xonsh.jobs import ignore_sigtstp
from xonsh.lazyasd import lazyobject
from xonsh.lazyimps import pyghooks, pygments
from xonsh.platform import HAS_PYGMENTS, ON_WINDOWS
from xonsh.pretty import pretty
from xonsh.shell import Shell
from xonsh.timings import setup_timings
from xonsh.tools import (
    display_error_message,
    print_color,
    print_exception,
    to_bool_or_int,
)
from xonsh.xonfig import print_welcome_screen
from xonsh.xontribs import auto_load_xontribs_from_entrypoints, xontribs_load

events.transmogrify("on_post_init", "LoadEvent")
events.doc(
    "on_post_init",
    """
on_post_init() -> None

Fired after all initialization is finished and we're ready to do work.

NOTE: This is fired before the wizard is automatically started.
""",
)

events.transmogrify("on_exit", "LoadEvent")
events.doc(
    "on_exit",
    """
on_exit() -> None

Fired after all commands have been executed, before tear-down occurs.

NOTE: All the caveats of the ``atexit`` module also apply to this event.
""",
)


events.transmogrify("on_pre_cmdloop", "LoadEvent")
events.doc(
    "on_pre_cmdloop",
    """
on_pre_cmdloop() -> None

Fired just before the command loop is started, if it is.
""",
)

events.transmogrify("on_post_cmdloop", "LoadEvent")
events.doc(
    "on_post_cmdloop",
    """
on_post_cmdloop() -> None

Fired just after the command loop finishes, if it is.

NOTE: All the caveats of the ``atexit`` module also apply to this event.
""",
)

events.transmogrify("on_xontribs_loaded", "LoadEvent")
events.doc(
    "on_xontribs_loaded",
    """
on_xontribs_loaded() -> None

Fired after external xontribs with ``entrypoints defined`` are loaded.
""",
)

events.transmogrify("on_pre_rc", "LoadEvent")
events.doc(
    "on_pre_rc",
    """
on_pre_rc() -> None

Fired just before rc files are loaded, if they are.
""",
)

events.transmogrify("on_post_rc", "LoadEvent")
events.doc(
    "on_post_rc",
    """
on_post_rc() -> None

Fired just after rc files are loaded, if they are.
""",
)


[docs] def get_setproctitle(): """Proxy function for loading process title""" try: from setproctitle import setproctitle as spt except ImportError: return return spt
[docs] def path_argument(s): """Return a path only if the path is actually legal (file or directory) This is very similar to argparse.FileType, except that it doesn't return an open file handle, but rather simply validates the path.""" s = os.path.abspath(os.path.expanduser(s)) if not os.path.exists(s): msg = f"{s!r} must be a valid path to a file or directory" raise argparse.ArgumentTypeError(msg) return s
@lazyobject def parser(): p = argparse.ArgumentParser(description="xonsh", add_help=False) p.add_argument( "-h", "--help", dest="help", action="store_true", default=False, help="Show help and exit.", ) p.add_argument( "-V", "--version", action="version", help="Show version information and exit.", version=f"xonsh/{__version__}", ) p.add_argument( "-c", help="Run a single command and exit.", dest="command", required=False, default=None, ) p.add_argument( "-i", "--interactive", help="Force running in interactive mode.", dest="force_interactive", action="store_true", default=False, ) p.add_argument( "-l", "--login", help="Run as a login shell.", dest="login", action="store_true", default=False, ) p.add_argument( "--rc", help="The xonshrc files to load, these may be either xonsh " "files or directories containing xonsh files", dest="rc", nargs="+", type=path_argument, default=None, ) p.add_argument( "--no-rc", help="Do not load any xonsh RC files. Argument --rc will " "be ignored if --no-rc is set.", dest="norc", action="store_true", default=False, ) p.add_argument( "--no-script-cache", help="Do not cache scripts as they are run.", dest="scriptcache", action="store_false", default=True, ) p.add_argument( "--cache-everything", help="Use a cache, even for interactive commands.", dest="cacheall", action="store_true", default=False, ) p.add_argument( "-D", dest="defines", help="Define an environment variable, in the form of " "-DNAME=VAL. May be used many times.", metavar="ITEM", action="append", default=None, ) p.add_argument( "--shell-type", help="What kind of shell should be used. " "Possible options: " + ", ".join(Shell.shell_type_aliases.keys()) + ". Warning! If set this overrides $SHELL_TYPE variable.", metavar="SHELL_TYPE", dest="shell_type", choices=tuple(Shell.shell_type_aliases.keys()), default=None, ) p.add_argument( "--timings", help="Prints timing information before the prompt is shown. " "This is useful while tracking down performance issues " "and investigating startup times.", dest="timings", action="store_true", default=None, ) p.add_argument( "file", metavar="script-file", help="If present, execute the script in script-file and exit.", nargs="?", default=None, ) p.add_argument( "args", metavar="args", help="Additional arguments to the script specified by script-file.", nargs=argparse.REMAINDER, default=[], ) return p def _pprint_displayhook(value): if value is None: return builtins._ = None # Set '_' to None to avoid recursion if isinstance(value, xpp.HiddenCommandPipeline): builtins._ = value return env = XSH.env printed_val = None if env.get("PRETTY_PRINT_RESULTS"): printed_val = pretty(value) if not isinstance(printed_val, str): # pretty may fail (i.e for unittest.mock.Mock) printed_val = repr(value) if HAS_PYGMENTS and env.get("COLOR_RESULTS"): tokens = list(pygments.lex(printed_val, lexer=pyghooks.XonshLexer())) end = "" if env.get("SHELL_TYPE") == "prompt_toolkit" else "\n" print_color(tokens, end=end) else: print(printed_val) # black & white case builtins._ = value
[docs] class XonshMode(enum.Enum): single_command = 0 script_from_file = 1 script_from_stdin = 2 interactive = 3
def _get_rc_files(shell_kwargs: dict, args, env): if shell_kwargs.get("norc"): # if --no-rc was passed, then disable loading RC files and dirs return (), () # determine which RC files to load, including whether any RC directories # should be scanned for such files rc_cli = shell_kwargs.get("rc") if rc_cli: # if an explicit --rc was passed, then we should load only that RC # file, and nothing else (ignore both XONSHRC and XONSHRC_DIR) rc = tuple(r for r in rc_cli if os.path.isfile(r)) rcd = tuple(r for r in rc_cli if os.path.isdir(r)) return rc, rcd # otherwise, get the RC files from XONSHRC, and RC dirs from XONSHRC_DIR rc = env.get("XONSHRC") rcd = env.get("XONSHRC_DIR") return rc, rcd def _load_rc_files(shell_kwargs: dict, args, env, execer, ctx): events.on_pre_rc.fire() # load rc files login = shell_kwargs.get("login", True) rc, rcd = _get_rc_files(shell_kwargs, args, env) XSH.rc_files = xonshrc_context( rcfiles=rc, rcdirs=rcd, execer=execer, ctx=ctx, env=env, login=login ) events.on_post_rc.fire() def _autoload_xontribs(env): events.on_timingprobe.fire(name="pre_xontribs_autoload") disabled = env.get("XONTRIBS_AUTOLOAD_DISABLED", False) if disabled is True: return blocked_xontribs = disabled or () auto_load_xontribs_from_entrypoints( blocked_xontribs, verbose=bool(env.get("XONSH_DEBUG", False)) ) events.on_xontribs_loaded.fire() events.on_timingprobe.fire(name="post_xontribs_autoload")
[docs] def start_services(shell_kwargs, args, pre_env=None): """Starts up the essential services in the proper order. This returns the environment instance as a convenience. """ if pre_env is None: pre_env = {} # create execer, which loads builtins ctx = shell_kwargs.get("ctx", {}) debug = to_bool_or_int(os.getenv("XONSH_DEBUG", "0")) events.on_timingprobe.fire(name="pre_execer_init") execer = Execer( filename="<stdin>", debug_level=debug, scriptcache=shell_kwargs.get("scriptcache", True), cacheall=shell_kwargs.get("cacheall", False), ) XSH.load(ctx=ctx, execer=execer) events.on_timingprobe.fire(name="post_execer_init") install_import_hooks(execer) env = XSH.env for k, v in pre_env.items(): env[k] = v _autoload_xontribs(env) _load_rc_files(shell_kwargs, args, env, execer, ctx) # create shell XSH.shell = Shell(execer=execer, **shell_kwargs) ctx["__name__"] = "__main__" return env
[docs] def premain(argv=None): """Setup for main xonsh entry point. Returns parsed arguments.""" if argv is None: argv = sys.argv[1:] setup_timings(argv) setproctitle = get_setproctitle() if setproctitle is not None: setproctitle(" ".join(["xonsh"] + argv)) args = parser.parse_args(argv) if args.help: parser.print_help() parser.exit() shell_kwargs = { "shell_type": args.shell_type, "completer": False, "login": False, "scriptcache": args.scriptcache, "cacheall": args.cacheall, "ctx": XSH.ctx, } if args.login or sys.argv[0].startswith("-"): args.login = True shell_kwargs["login"] = True if args.norc: shell_kwargs["norc"] = True elif args.rc: shell_kwargs["rc"] = args.rc sys.displayhook = _pprint_displayhook if args.command is not None: args.mode = XonshMode.single_command shell_kwargs["shell_type"] = "none" elif args.file is not None: args.mode = XonshMode.script_from_file shell_kwargs["shell_type"] = "none" elif not sys.stdin.isatty() and not args.force_interactive: args.mode = XonshMode.script_from_stdin shell_kwargs["shell_type"] = "none" else: args.mode = XonshMode.interactive shell_kwargs["completer"] = True shell_kwargs["login"] = True pre_env = { "XONSH_LOGIN": shell_kwargs["login"], "XONSH_INTERACTIVE": args.force_interactive or (args.mode == XonshMode.interactive), } env = start_services(shell_kwargs, args, pre_env=pre_env) if args.defines is not None: env.update([x.split("=", 1) for x in args.defines]) return args
def _failback_to_other_shells(args, err): # only failback for interactive shell; if we cannot tell, treat it # as an interactive one for safe. if hasattr(args, "mode") and args.mode != XonshMode.interactive: raise err foreign_shell = None # look first in users login shell $SHELL. # use real os.environ, in case Xonsh hasn't initialized yet # but don't fail back to same shell that just failed. try: env_shell = os.getenv("SHELL") if env_shell and os.path.exists(env_shell) and env_shell != sys.argv[0]: foreign_shell = env_shell except Exception: pass # otherwise, find acceptable shell from (unix) list of installed shells. if not foreign_shell: excluded_list = ["xonsh", "screen"] shells_file = "/etc/shells" if not os.path.exists(shells_file): # right now, it will always break here on Windows raise err with open(shells_file) as f: for line in f: line = line.strip() if not line or line.startswith("#"): continue if "/" not in line: continue _, shell = line.rsplit("/", 1) if shell in excluded_list: continue if not os.path.exists(line): continue foreign_shell = line break if foreign_shell: traceback.print_exc() print("Xonsh encountered an issue during launch", file=sys.stderr) print(f"Failback to {foreign_shell}", file=sys.stderr) os.execlp(foreign_shell, foreign_shell) else: raise err
[docs] def main(argv=None): args = None try: args = premain(argv) sys.exit(main_xonsh(args)) except Exception as err: _failback_to_other_shells(args, err)
[docs] def main_xonsh(args): """Main entry point for xonsh cli.""" if not ON_WINDOWS: def func_sig_ttin_ttou(n, f): pass signal.signal(signal.SIGTTIN, func_sig_ttin_ttou) signal.signal(signal.SIGTTOU, func_sig_ttin_ttou) events.on_post_init.fire() env = XSH.env shell = XSH.shell history = XSH.history exit_code = 0 if shell and not env["XONSH_INTERACTIVE"]: shell.ctx.update({"exit": sys.exit}) # store a sys.exc_info() tuple to record any exception that might occur in the user code that we are about to execute # if this does not change, no exceptions were thrown. Otherwise, print a traceback that does not expose xonsh internals exc_info = None, None, None try: if args.mode == XonshMode.interactive: # enter the shell # Setted again here because it is possible to call main_xonsh() without calling premain(), namely in the tests. env["XONSH_INTERACTIVE"] = True ignore_sigtstp() if ( env["XONSH_INTERACTIVE"] and not any(os.path.isfile(i) for i in env["XONSHRC"]) and not any(os.path.isdir(i) for i in env["XONSHRC_DIR"]) ): print_welcome_screen() events.on_pre_cmdloop.fire() try: shell.shell.cmdloop() finally: events.on_post_cmdloop.fire() elif args.mode == XonshMode.single_command: # run a single command and exit exc_info = run_code_with_cache( args.command.lstrip(), "<string>", shell.execer, glb=shell.ctx, mode="single", ) if history is not None and history.last_cmd_rtn is not None: exit_code = history.last_cmd_rtn elif args.mode == XonshMode.script_from_file: # run a script contained in a file path = os.path.abspath(os.path.expanduser(args.file)) if os.path.isfile(path): sys.argv = [args.file] + args.args env.update(make_args_env()) # $ARGS is not sys.argv env["XONSH_SOURCE"] = path shell.ctx.update({"__file__": args.file, "__name__": "__main__"}) exc_info = run_script_with_cache( args.file, shell.execer, glb=shell.ctx, loc=None, mode="exec" ) else: print(f"xonsh: {args.file}: No such file or directory.") exit_code = 1 elif args.mode == XonshMode.script_from_stdin: # run a script given on stdin code = sys.stdin.read() exc_info = run_code_with_cache( code, "<stdin>", shell.execer, glb=shell.ctx, loc=None, mode="exec" ) except SyntaxError: exit_code = 1 debug_level = env.get("XONSH_DEBUG", 0) if debug_level == 0: # print error without tracktrace display_error_message(sys.exc_info()) else: # pass error to finally clause exc_info = sys.exc_info() finally: if exc_info != (None, None, None): err_type, err, _ = exc_info if err_type is SystemExit: raise err print_exception(None, exc_info) exit_code = 1 events.on_exit.fire() postmain(args) return exit_code
[docs] def postmain(args=None): """Teardown for main xonsh entry point, accepts parsed arguments.""" XSH.unload() XSH.shell = None
[docs] @contextlib.contextmanager def main_context(argv=None): """Generator that runs pre- and post-main() functions. This has two iterations. The first yields the shell. The second returns None but cleans up the shell. """ args = premain(argv) yield XSH.shell postmain(args)
[docs] def setup( ctx=None, shell_type="none", env=(("RAISE_SUBPROC_ERROR", True),), aliases=(), xontribs=(), threadable_predictors=(), ): """Starts up a new xonsh shell. Calling this in function in another packages ``__init__.py`` will allow xonsh to be fully used in the package in headless or headed mode. This function is primarily indended to make starting up xonsh for 3rd party packages easier. Here is example of using this at the top of an ``__init__.py``:: from xonsh.main import setup setup() del setup Parameters ---------- ctx : dict-like or None, optional The xonsh context to start with. If None, an empty dictionary is provided. shell_type : str, optional The type of shell to start. By default this is 'none', indicating we should start in headless mode. env : dict-like, optional Environment to update the current environment with after the shell has been initialized. aliases : dict-like, optional Aliases to add after the shell has been initialized. xontribs : iterable of str, optional Xontrib names to load. threadable_predictors : dict-like, optional Threadable predictors to start up with. These overide the defaults. """ ctx = {} if ctx is None else ctx # setup xonsh ctx and execer if not hasattr(builtins, "__xonsh__"): execer = Execer(filename="<stdin>") XSH.load(ctx=ctx, execer=execer) XSH.shell = Shell(execer, ctx=ctx, shell_type=shell_type) XSH.env.update(env) install_import_hooks(XSH.execer) XSH.aliases.update(aliases) if xontribs: xontribs_load(xontribs) if threadable_predictors: XSH.commands_cache.threadable_predictors.update(threadable_predictors)