Source code for xonsh.xontribs

"""Tools for helping manage xontributions."""

import contextlib
import importlib
import importlib.util
import json
import os
import sys
import typing as tp
from enum import IntEnum
from pathlib import Path

from xonsh.built_ins import XSH
from xonsh.cli_utils import Annotated, Arg, ArgParserAlias
from xonsh.completers.tools import RichCompletion
from xonsh.tools import print_color, print_exception

if tp.TYPE_CHECKING:
    from importlib.metadata import Distribution, EntryPoint


[docs] class ExitCode(IntEnum): OK = 0 NOT_FOUND = 1 INIT_FAILED = 2
[docs] class XontribNotInstalled(Exception): """raised when the requested xontrib is not found"""
[docs] class Xontrib(tp.NamedTuple): """Meta class that is used to describe a xontrib""" module: str """path to the xontrib module""" distribution: "Distribution | None" = None """short description about the xontrib."""
[docs] def get_description(self): if self.distribution and ( summary := self.distribution.metadata.get("Summary", "") ): return summary return get_module_docstring(self.module)
@property def url(self): if self.distribution: return self.distribution.metadata.get("Home-page", "") return "" @property def license(self): if self.distribution: return self.distribution.metadata.get("License", "") return "" @property def is_loaded(self): return self.module and self.module in sys.modules @property def is_auto_loaded(self): loaded = getattr(XSH.builtins, "autoloaded_xontribs", None) or {} return self.module in set(loaded.values())
[docs] def get_module_docstring(module: str) -> str: """Find the module and return its docstring without actual import""" import ast spec = importlib.util.find_spec(module) if spec and spec.has_location and spec.origin: return ast.get_docstring(ast.parse(Path(spec.origin).read_text())) or "" # Fall back for ``xontrib.<name>`` modules that ship as ``.xsh`` files — # importlib's standard finders don't know about ``.xsh`` and return # ``None`` here. path = _find_xontrib_file(module) if path is not None and path.suffix == ".py": with contextlib.suppress(SyntaxError, OSError): return ast.get_docstring(ast.parse(path.read_text())) or "" return ""
def _find_xontrib_file(module: str) -> "Path | None": """Locate the source file for a ``xontrib.<name>`` module. Handles ``.py``, ``.xsh``, and package-style xontribs by scanning the physical locations of the ``xontrib`` namespace package directly. Used as a fallback for cases that ``importlib.util.find_spec`` cannot resolve — most notably ``.xsh`` xontribs (the standard import machinery doesn't recognize the extension). """ if not module.startswith("xontrib."): return None name = module.removeprefix("xontrib.") spec = importlib.util.find_spec("xontrib") if spec is None or spec.submodule_search_locations is None: return None for loc in spec.submodule_search_locations: base = Path(loc) for candidate in (base / f"{name}.py", base / f"{name}.xsh"): if candidate.exists(): return candidate pkg = base / name / "__init__.py" if pkg.exists(): return pkg return None
[docs] def get_xontribs() -> dict[str, Xontrib]: """Return xontrib definitions lazily.""" return dict(_get_installed_xontribs())
def _patch_in_userdir(): """ Patch in user site packages directory. If xonsh is installed in non-writeable location, then xontribs will end up there, so we make them accessible.""" if not os.access(os.path.dirname(sys.executable), os.W_OK): from site import getusersitepackages if (user_site_packages := getusersitepackages()) not in set(sys.path): sys.path.append(user_site_packages) def _get_installed_xontribs(pkg_name="xontrib"): """List all core packages + newly installed xontribs""" _patch_in_userdir() spec = importlib.util.find_spec(pkg_name) def iter_paths(): if spec is None or spec.submodule_search_locations is None: return for loc in spec.submodule_search_locations: path = Path(loc) if path.exists(): yield from path.iterdir() def iter_modules(): # pkgutil is not finding `*.xsh` files for path in iter_paths(): if path.suffix in {".py", ".xsh"}: yield path.stem elif path.is_dir(): if (path / "__init__.py").exists(): yield path.name for name in iter_modules(): module = f"xontrib.{name}" yield name, Xontrib(module) for entry in _get_xontrib_entrypoints(): yield entry.name, Xontrib(entry.value, distribution=entry.dist) def _find_xontrib_entrypoint(name): """Return the ``xonsh.xontribs`` entry point for ``name`` or ``None``. Reads the live setuptools entry-point registry rather than the ``XSH.builtins.autoloaded_xontribs`` cache, so xontribs whose only Python-visible name is an entry point (the wheel ships no ``xontrib/<name>.py``) are discoverable even when autoload did not run — e.g. ``xonsh --no-rc`` or ``$XONTRIBS_AUTOLOAD_DISABLED``. """ for entry in _get_xontrib_entrypoints(): if entry.name == name: return entry return None
[docs] def find_xontrib(name, full_module=False): """Finds a xontribution from its name.""" _patch_in_userdir() # Order matters. Try the cheap, exact paths first; fall through to # broader matches only when the previous lookup did not find anything. if name.startswith("."): return importlib.util.find_spec(name, package="xontrib") if full_module: return importlib.util.find_spec(name) # 1. Cache populated by ``auto_load_xontribs_from_entrypoints`` at # startup. This is the common interactive-shell path. autoloaded = getattr(XSH.builtins, "autoloaded_xontribs", None) or {} if name in autoloaded: return importlib.util.find_spec(autoloaded[name]) # 2. Live entry-point lookup. Required when autoload did not run # (``--no-rc``, ``$XONTRIBS_AUTOLOAD_DISABLED``, embedded use, # xontrib installed mid-session): without this step the only # Python-visible mapping for entry-point-only xontribs (the # ``coconut`` xontrib being the canonical example — its loader # lives in ``coconut.integrations`` and the wheel ships no # ``xontrib/coconut.py``) is unreachable from ``xontrib load``. entry = _find_xontrib_entrypoint(name) if entry is not None: return importlib.util.find_spec(entry.value) # 3. Legacy ``xontrib.<name>`` namespace-package layout. ``find_spec`` # only raises ``ValueError`` when ``xontrib`` is not a package at # all; otherwise it returns ``None`` for an absent submodule, in # which case we must continue to the top-level fallback rather # than returning that ``None``. (The pre-fix code returned ``None`` # here and never reached step 4.) spec = None with contextlib.suppress(ValueError): spec = importlib.util.find_spec("." + name, package="xontrib") if spec is not None: return spec # 4. Top-level module fallback (``import <name>``). return importlib.util.find_spec(name)
[docs] def xontrib_context(name, full_module=False): """Return a context dictionary for a xontrib of a given name.""" spec = find_xontrib(name, full_module) if spec is None: return None module = importlib.import_module(spec.name) ctx = {} def _get__all__(): pubnames = getattr(module, "__all__", None) if pubnames is None: for k in dir(module): if not k.startswith("_"): yield k, getattr(module, k) else: for attr in pubnames: yield attr, getattr(module, attr) entrypoint = getattr(module, "_load_xontrib_", None) if entrypoint is None: ctx.update(dict(_get__all__())) else: result = entrypoint(xsh=XSH) if result is not None: ctx.update(result) return ctx
[docs] def prompt_xontrib_install(names: list[str]): """Returns a formatted string with name of xontrib package to prompt user""" return ( "The following xontribs are enabled but not installed: \n" f" {names}\n" "Please make sure that they are installed correctly by checking https://xonsh.github.io/awesome-xontribs/\n" )
[docs] def update_context(name, ctx: dict, full_module=False): """Updates a context in place from a xontrib.""" modctx = xontrib_context(name, full_module) if modctx is None: raise XontribNotInstalled(f"Xontrib - {name} is not found.") else: ctx.update(modctx) return ctx
def _xontrib_name_completions(loaded=False): for name, xontrib in get_xontribs().items(): if xontrib.is_loaded is loaded: yield RichCompletion( name, append_space=True, description=xontrib.get_description() )
[docs] def xontrib_names_completer(**_): yield from _xontrib_name_completions(loaded=False)
[docs] def xontrib_unload_completer(**_): yield from _xontrib_name_completions(loaded=True)
[docs] def xontrib_any_completer(**_): for name, xontrib in get_xontribs().items(): yield RichCompletion( name, append_space=True, description=xontrib.get_description() )
[docs] def xontribs_load( names: Annotated[ tp.Sequence[str], Arg(nargs="+", completer=xontrib_names_completer), ] = (), verbose=False, full_module=False, suppress_warnings=False, ): """Load xontribs from a list of names Parameters ---------- names names of xontribs verbose : -v, --verbose verbose output full_module : -f, --full indicates that the names are fully qualified module paths and not inside ``xontrib`` package suppress_warnings : -s, --suppress-warnings no warnings about missing xontribs and return code 0 """ ctx = {} if XSH.ctx is None else XSH.ctx res = ExitCode.OK stdout = None stderr = None bad_imports = [] for name in names: if verbose: print(f"loading xontrib {name!r}") try: update_context(name, ctx=ctx, full_module=full_module) except XontribNotInstalled: if not suppress_warnings: bad_imports.append(name) except Exception: res = ExitCode.INIT_FAILED print_exception(f"Failed to load xontrib {name}.") if bad_imports: res = ExitCode.NOT_FOUND stderr = prompt_xontrib_install(bad_imports) return stdout, stderr, res
[docs] def xontribs_unload( names: Annotated[ tp.Sequence[str], Arg(nargs="+", completer=xontrib_unload_completer), ] = (), verbose=False, ): """Unload the given xontribs (requires ``_unload_xontrib_`` for full cleanup) Parameters ---------- names name of xontribs to unload verbose : -v, --verbose verbose output Notes ----- The xontrib must implement ``_unload_xontrib_()`` for proper cleanup. Without it, registered event handlers, env vars, aliases, and completers will remain active. The default is equivalent to ``del sys.modules[module]``. """ for name in names: if verbose: print(f"unloading xontrib {name!r}") spec = find_xontrib(name) try: if spec and spec.name in sys.modules: module = sys.modules[spec.name] unloader = getattr(module, "_unload_xontrib_", None) if unloader is not None: unloader(XSH) del sys.modules[spec.name] except Exception as ex: print_exception(f"Failed to unload xontrib {name} ({ex})")
[docs] def xontribs_reload( names: Annotated[ tp.Sequence[str], Arg(nargs="+", completer=xontrib_unload_completer), ] = (), verbose=False, ): """Reload the given xontribs (requires ``_unload_xontrib_`` for full cleanup) Parameters ---------- names name of xontribs to reload verbose : -v, --verbose verbose output """ for name in names: if verbose: print(f"reloading xontrib {name!r}") xontribs_unload([name]) xontribs_load([name])
[docs] def xontribs_info( name: Annotated[str, Arg(completer=xontrib_any_completer)], _stdout=None, ): """Show details about an installed xontrib. Parameters ---------- name name of the xontrib """ xontribs = get_xontribs() xontrib = xontribs.get(name) if xontrib is None: # Fall back to a direct module lookup so users can inspect xontribs # that aren't surfaced via the ``xontrib`` package or the # ``xonsh.xontribs`` entry-point group (e.g. ``xontrib info # some.module --full`` style lookups in the future). spec = find_xontrib(name) if spec is None: print_color( "{RED}Xontrib " + name + " is not installed.{RESET}", file=_stdout ) return ExitCode.NOT_FOUND xontrib = Xontrib(spec.name) spec = importlib.util.find_spec(xontrib.module) origin = spec.origin if spec is not None else None if not origin: # ``.xsh`` xontribs (and other non-importable modules) are not # surfaced by ``find_spec`` — fall back to scanning the namespace # package locations directly. path = _find_xontrib_file(xontrib.module) origin = str(path) if path is not None else "(builtin)" source = xontrib.module if xontrib.distribution is not None: dist_name = xontrib.distribution.metadata.get("Name", "") or "" dist_version = xontrib.distribution.metadata.get("Version", "") or "" if dist_name: source += f" ({dist_name} {dist_version})".rstrip() source += f" at {origin}" description = xontrib.get_description() or "" lines = [ "{PURPLE}Name{RESET}: " + name, "{PURPLE}Source{RESET}: " + source, "{PURPLE}Description{RESET}: " + description, ] if xontrib.url: lines.append("{PURPLE}URL{RESET}: " + xontrib.url) if xontrib.license: lines.append("{PURPLE}License{RESET}: " + xontrib.license) lines.append( "{PURPLE}Loaded{RESET}: " + ("{GREEN}yes{RESET}" if xontrib.is_loaded else "{RED}no{RESET}") + (" {GREEN}(auto){RESET}" if xontrib.is_auto_loaded else "") ) print_color("\n".join(lines), file=_stdout) return ExitCode.OK
[docs] def xontrib_data(): """Collects and returns the data about installed xontribs.""" data = {} for xo_name, xontrib in get_xontribs().items(): desc = xontrib.get_description() try: max_desc = os.get_terminal_size().columns - 40 except (OSError, ValueError): max_desc = 60 max_desc = max(max_desc, 20) short_desc = desc.split("\n")[0][:max_desc] if desc else "" data[xo_name] = { "name": xo_name, "loaded": xontrib.is_loaded, "auto": xontrib.is_auto_loaded, "module": xontrib.module, "description": short_desc, } return dict(sorted(data.items()))
[docs] def xontribs_loaded(): """Returns list of loaded xontribs.""" return [k for k, xontrib in get_xontribs().items() if xontrib.is_loaded]
[docs] def xontribs_list(to_json=False, _stdout=None): """List installed xontribs and show whether they are loaded or not Parameters ---------- to_json : -j, --json reports results as json """ data = xontrib_data() if to_json: s = json.dumps(data) return s else: nname = max([6] + [len(x) for x in data]) s = "" for name, d in data.items(): s += "{PURPLE}" + name + "{RESET} " + " " * (nname - len(name)) if d["loaded"]: s += "{GREEN}loaded{RESET}" + " " * 4 if d["auto"]: s += " {GREEN}auto{RESET}" elif d["loaded"]: s += " {CYAN}manual{RESET}" else: s += "{RED}not-loaded{RESET}" + " " * 8 if d.get("description"): s += " " + d["description"] s += "\n" print_color(s[:-1], file=_stdout)
def _get_xontrib_entrypoints() -> "tp.Iterable[EntryPoint]": from importlib import metadata name = "xonsh.xontribs" entries = metadata.entry_points() # for some reason, on CI (win py3.8) atleast, returns dict group = ( entries.select(group=name) if hasattr(entries, "select") else entries.get(name, []) # type: ignore ) yield from group
[docs] def auto_load_xontribs_from_entrypoints( blocked: "tp.Sequence[str]" = (), verbose=False ): """Load xontrib modules exposed via setuptools's entrypoints""" if not hasattr(XSH.builtins, "autoloaded_xontribs"): XSH.builtins.autoloaded_xontribs = {} def get_loadable(): for entry in _get_xontrib_entrypoints(): if entry.name not in blocked: XSH.builtins.autoloaded_xontribs[entry.name] = entry.value yield entry.value modules = list(get_loadable()) return xontribs_load(modules, verbose=verbose, full_module=True)
[docs] class XontribAlias(ArgParserAlias): """Manage xonsh extensions"""
[docs] def build(self): parser = self.create_parser(prog="xontrib") parser.add_command(xontribs_load, prog="load") parser.add_command(xontribs_unload, prog="unload") parser.add_command(xontribs_reload, prog="reload") parser.add_command(xontribs_list, prog="list", default=True) parser.add_command(xontribs_info, prog="info") return parser
xontribs_main = XontribAlias(threadable=False)