Source code for xonsh.xontribs

"""Tools for helping manage xontributions."""

import contextlib
import importlib
import importlib.util
import json
import os
import sys
import typing as tp
from enum import IntEnum
from pathlib import Path

from xonsh.built_ins import XSH
from xonsh.cli_utils import Annotated, Arg, ArgParserAlias
from xonsh.completers.tools import RichCompletion
from xonsh.tools import print_color, print_exception

if tp.TYPE_CHECKING:
    from importlib.metadata import Distribution, EntryPoint


[docs] class ExitCode(IntEnum): OK = 0 NOT_FOUND = 1 INIT_FAILED = 2
[docs] class XontribNotInstalled(Exception): """raised when the requested xontrib is not found"""
[docs] class Xontrib(tp.NamedTuple): """Meta class that is used to describe a xontrib""" module: str """path to the xontrib module""" distribution: "tp.Optional[Distribution]" = None """short description about the xontrib."""
[docs] def get_description(self): if self.distribution: print(self, file=sys.stderr) if self.distribution and ( summary := self.distribution.metadata.get("Summary", "") ): return summary return get_module_docstring(self.module)
@property def url(self): if self.distribution: return self.distribution.metadata.get("Home-page", "") return "" @property def license(self): if self.distribution: return self.distribution.metadata.get("License", "") return "" @property def is_loaded(self): return self.module and self.module in sys.modules @property def is_auto_loaded(self): loaded = getattr(XSH.builtins, "autoloaded_xontribs", None) or {} return self.module in set(loaded.values())
[docs] def get_module_docstring(module: str) -> str: """Find the module and return its docstring without actual import""" import ast spec = importlib.util.find_spec(module) if spec and spec.has_location and spec.origin: return ast.get_docstring(ast.parse(Path(spec.origin).read_text())) or "" return ""
[docs] def get_xontribs() -> dict[str, Xontrib]: """Return xontrib definitions lazily.""" return dict(_get_installed_xontribs())
def _patch_in_userdir(): """ Patch in user site packages directory. If xonsh is installed in non-writeable location, then xontribs will end up there, so we make them accessible.""" if not os.access(os.path.dirname(sys.executable), os.W_OK): from site import getusersitepackages if (user_site_packages := getusersitepackages()) not in set(sys.path): sys.path.append(user_site_packages) def _get_installed_xontribs(pkg_name="xontrib"): """List all core packages + newly installed xontribs""" _patch_in_userdir() spec = importlib.util.find_spec(pkg_name) def iter_paths(): for loc in spec.submodule_search_locations: path = Path(loc) if path.exists(): yield from path.iterdir() def iter_modules(): # pkgutil is not finding `*.xsh` files for path in iter_paths(): if path.suffix in {".py", ".xsh"}: yield path.stem elif path.is_dir(): if (path / "__init__.py").exists(): yield path.name for name in iter_modules(): module = f"xontrib.{name}" yield name, Xontrib(module) for entry in _get_xontrib_entrypoints(): yield entry.name, Xontrib(entry.value, distribution=entry.dist)
[docs] def find_xontrib(name, full_module=False): """Finds a xontribution from its name.""" _patch_in_userdir() # here the order is important. We try to run the correct cases first and then later trial cases # that will likely fail if name.startswith("."): return importlib.util.find_spec(name, package="xontrib") if full_module: return importlib.util.find_spec(name) autoloaded = getattr(XSH.builtins, "autoloaded_xontribs", None) or {} if name in autoloaded: return importlib.util.find_spec(autoloaded[name]) with contextlib.suppress(ValueError): return importlib.util.find_spec("." + name, package="xontrib") return importlib.util.find_spec(name)
[docs] def xontrib_context(name, full_module=False): """Return a context dictionary for a xontrib of a given name.""" spec = find_xontrib(name, full_module) if spec is None: return None module = importlib.import_module(spec.name) ctx = {} def _get__all__(): pubnames = getattr(module, "__all__", None) if pubnames is None: for k in dir(module): if not k.startswith("_"): yield k, getattr(module, k) else: for attr in pubnames: yield attr, getattr(module, attr) entrypoint = getattr(module, "_load_xontrib_", None) if entrypoint is None: ctx.update(dict(_get__all__())) else: result = entrypoint(xsh=XSH) if result is not None: ctx.update(result) return ctx
[docs] def prompt_xontrib_install(names: list[str]): """Returns a formatted string with name of xontrib package to prompt user""" return ( "The following xontribs are enabled but not installed: \n" f" {names}\n" "Please make sure that they are installed correctly by checking https://xonsh.github.io/awesome-xontribs/\n" )
[docs] def update_context(name, ctx: dict, full_module=False): """Updates a context in place from a xontrib.""" modctx = xontrib_context(name, full_module) if modctx is None: raise XontribNotInstalled(f"Xontrib - {name} is not found.") else: ctx.update(modctx) return ctx
def _xontrib_name_completions(loaded=False): for name, xontrib in get_xontribs().items(): if xontrib.is_loaded is loaded: yield RichCompletion( name, append_space=True, description=xontrib.get_description() )
[docs] def xontrib_names_completer(**_): yield from _xontrib_name_completions(loaded=False)
[docs] def xontrib_unload_completer(**_): yield from _xontrib_name_completions(loaded=True)
[docs] def xontribs_load( names: Annotated[ tp.Sequence[str], Arg(nargs="+", completer=xontrib_names_completer), ] = (), verbose=False, full_module=False, suppress_warnings=False, ): """Load xontribs from a list of names Parameters ---------- names names of xontribs verbose : -v, --verbose verbose output full_module : -f, --full indicates that the names are fully qualified module paths and not inside ``xontrib`` package suppress_warnings : -s, --suppress-warnings no warnings about missing xontribs and return code 0 """ ctx = {} if XSH.ctx is None else XSH.ctx res = ExitCode.OK stdout = None stderr = None bad_imports = [] for name in names: if verbose: print(f"loading xontrib {name!r}") try: update_context(name, ctx=ctx, full_module=full_module) except XontribNotInstalled: if not suppress_warnings: bad_imports.append(name) except Exception: res = ExitCode.INIT_FAILED print_exception(f"Failed to load xontrib {name}.") if bad_imports: res = ExitCode.NOT_FOUND stderr = prompt_xontrib_install(bad_imports) return stdout, stderr, res
[docs] def xontribs_unload( names: Annotated[ tp.Sequence[str], Arg(nargs="+", completer=xontrib_unload_completer), ] = (), verbose=False, ): """Unload the given xontribs Parameters ---------- names name of xontribs to unload Notes ----- Proper cleanup can be implemented by the xontrib. The default is equivalent to ``del sys.modules[module]``. """ for name in names: if verbose: print(f"unloading xontrib {name!r}") spec = find_xontrib(name) try: if spec and spec.name in sys.modules: module = sys.modules[spec.name] unloader = getattr(module, "_unload_xontrib_", None) if unloader is not None: unloader(XSH) del sys.modules[spec.name] except Exception as ex: print_exception(f"Failed to unload xontrib {name} ({ex})")
[docs] def xontribs_reload( names: Annotated[ tp.Sequence[str], Arg(nargs="+", completer=xontrib_unload_completer), ] = (), verbose=False, ): """Reload the given xontribs Parameters ---------- names name of xontribs to reload """ for name in names: if verbose: print(f"reloading xontrib {name!r}") xontribs_unload([name]) xontribs_load([name])
[docs] def xontrib_data(): """Collects and returns the data about installed xontribs.""" data = {} for xo_name, xontrib in get_xontribs().items(): data[xo_name] = { "name": xo_name, "loaded": xontrib.is_loaded, "auto": xontrib.is_auto_loaded, "module": xontrib.module, } return dict(sorted(data.items()))
[docs] def xontribs_loaded(): """Returns list of loaded xontribs.""" return [k for k, xontrib in get_xontribs().items() if xontrib.is_loaded]
[docs] def xontribs_list(to_json=False): """List installed xontribs and show whether they are loaded or not Parameters ---------- to_json : -j, --json reports results as json """ data = xontrib_data() if to_json: s = json.dumps(data) print(s) else: nname = max([6] + [len(x) for x in data]) s = "" for name, d in data.items(): s += "{PURPLE}" + name + "{RESET} " + " " * (nname - len(name)) if d["loaded"]: s += "{GREEN}loaded{RESET}" + " " * 4 if d["auto"]: s += " {GREEN}auto{RESET}" elif d["loaded"]: s += " {CYAN}manual{RESET}" else: s += "{RED}not-loaded{RESET}" s += "\n" print_color(s[:-1])
def _get_xontrib_entrypoints() -> "tp.Iterable[EntryPoint]": from importlib import metadata name = "xonsh.xontribs" entries = metadata.entry_points() # for some reason, on CI (win py3.8) atleast, returns dict group = entries.select(group=name) if hasattr(entries, "select") else entries.get(name, []) # type: ignore yield from group
[docs] def auto_load_xontribs_from_entrypoints( blocked: "tp.Sequence[str]" = (), verbose=False ): """Load xontrib modules exposed via setuptools's entrypoints""" if not hasattr(XSH.builtins, "autoloaded_xontribs"): XSH.builtins.autoloaded_xontribs = {} def get_loadable(): for entry in _get_xontrib_entrypoints(): if entry.name not in blocked: XSH.builtins.autoloaded_xontribs[entry.name] = entry.value yield entry.value modules = list(get_loadable()) return xontribs_load(modules, verbose=verbose, full_module=True)
[docs] class XontribAlias(ArgParserAlias): """Manage xonsh extensions"""
[docs] def build(self): parser = self.create_parser(prog="xontrib") parser.add_command(xontribs_load, prog="load") parser.add_command(xontribs_unload, prog="unload") parser.add_command(xontribs_reload, prog="reload") parser.add_command(xontribs_list, prog="list") return parser
xontribs_main = XontribAlias(threadable=False)