"""Tools for helping manage xontributions."""
import contextlib
import importlib
import importlib.util
import json
import os
import sys
import typing as tp
from enum import IntEnum
from pathlib import Path
from xonsh.built_ins import XSH
from xonsh.cli_utils import Annotated, Arg, ArgParserAlias
from xonsh.completers.tools import RichCompletion
from xonsh.tools import print_color, print_exception
if tp.TYPE_CHECKING:
from importlib.metadata import Distribution, EntryPoint
[docs]
class ExitCode(IntEnum):
    """Integer status codes used as return values by the xontrib commands."""

    # Everything succeeded.
    OK = 0
    # At least one requested xontrib could not be located.
    NOT_FOUND = 1
    # A xontrib was located but raised while being loaded.
    INIT_FAILED = 2
[docs]
class XontribNotInstalled(Exception):
    """Signals that a requested xontrib could not be found.

    Raised by ``update_context`` when no module spec can be resolved for
    the given xontrib name.
    """
[docs]
class Xontrib(tp.NamedTuple):
    """Meta class that is used to describe a xontrib"""

    module: str
    """path to the xontrib module"""
    distribution: "Distribution | None" = None
    """distribution that provides the xontrib, or ``None`` when unknown."""

    def get_description(self):
        """Return a human readable description of the xontrib.

        The distribution's ``Summary`` metadata is preferred when it is
        present and non-empty; otherwise the docstring of the module is
        returned (via ``get_module_docstring``).
        """
        if self.distribution and (
            summary := self.distribution.metadata.get("Summary", "")
        ):
            return summary
        return get_module_docstring(self.module)

    @property
    def url(self):
        """``Home-page`` metadata of the distribution, or ``""``."""
        if self.distribution:
            return self.distribution.metadata.get("Home-page", "")
        return ""

    @property
    def license(self):
        """``License`` metadata of the distribution, or ``""``."""
        if self.distribution:
            return self.distribution.metadata.get("License", "")
        return ""

    @property
    def is_loaded(self):
        """``True`` when the xontrib module is present in ``sys.modules``.

        Always a real ``bool``: callers compare the value by identity
        (``is True`` / ``is False``) and serialize it to JSON, so an empty
        module name must yield ``False`` rather than ``""``.
        """
        return bool(self.module) and self.module in sys.modules

    @property
    def is_auto_loaded(self):
        """``True`` when the module is listed in the autoload cache.

        The cache is ``XSH.builtins.autoloaded_xontribs``; a missing or
        empty cache counts as "nothing was auto-loaded".
        """
        loaded = getattr(XSH.builtins, "autoloaded_xontribs", None) or {}
        return self.module in set(loaded.values())
[docs]
def get_module_docstring(module: str) -> str:
    """Find the module and return its docstring without actual import.

    The module source is located with ``importlib.util.find_spec`` and
    parsed with ``ast``; the module itself is never executed (note that
    ``find_spec`` imports the *parent* packages of a dotted name).

    Parameters
    ----------
    module
        dotted module path, e.g. ``xontrib.foo``

    Returns
    -------
    str
        The module docstring, or ``""`` when the module cannot be located,
        read or parsed. This function is used to decorate listings and
        completions, so it never raises for a broken or exotic module.
    """
    import ast

    def _docstring_of(source_path: Path) -> str:
        # Hand raw bytes to the parser so that a BOM or a PEP 263 coding
        # cookie is honored instead of relying on the locale encoding.
        # ValueError covers undecodable / NUL-containing (binary) origins.
        try:
            return ast.get_docstring(ast.parse(source_path.read_bytes())) or ""
        except (SyntaxError, OSError, ValueError):
            return ""

    spec = None
    # ``find_spec`` raises ModuleNotFoundError when a parent package of a
    # dotted name is missing and ValueError when ``__spec__`` is unset.
    with contextlib.suppress(ImportError, ValueError):
        spec = importlib.util.find_spec(module)
    if spec and spec.has_location and spec.origin:
        return _docstring_of(Path(spec.origin))

    # Fall back to scanning the ``xontrib`` namespace package locations for
    # modules the standard finders did not resolve.
    # NOTE(review): only ``.py`` files are parsed here even though the
    # helper can also return ``.xsh`` files - confirm this is intended.
    path = _find_xontrib_file(module)
    if path is not None and path.suffix == ".py":
        return _docstring_of(path)
    return ""
def _find_xontrib_file(module: str) -> "Path | None":
"""Locate the source file for a ``xontrib.<name>`` module.
Handles ``.py``, ``.xsh``, and package-style xontribs by scanning the
physical locations of the ``xontrib`` namespace package directly. Used
as a fallback for cases that ``importlib.util.find_spec`` cannot
resolve — most notably ``.xsh`` xontribs (the standard import
machinery doesn't recognize the extension).
"""
if not module.startswith("xontrib."):
return None
name = module.removeprefix("xontrib.")
spec = importlib.util.find_spec("xontrib")
if spec is None or spec.submodule_search_locations is None:
return None
for loc in spec.submodule_search_locations:
base = Path(loc)
for candidate in (base / f"{name}.py", base / f"{name}.xsh"):
if candidate.exists():
return candidate
pkg = base / name / "__init__.py"
if pkg.exists():
return pkg
return None
[docs]
def get_xontribs() -> dict[str, Xontrib]:
    """Return a mapping of xontrib name to its ``Xontrib`` description.

    The mapping is built from ``_get_installed_xontribs()``; when the same
    name is produced more than once, the last occurrence is kept.
    """
    return {name: xontrib for name, xontrib in _get_installed_xontribs()}
def _patch_in_userdir():
"""
Patch in user site packages directory.
If xonsh is installed in non-writeable location, then xontribs will end up
there, so we make them accessible."""
if not os.access(os.path.dirname(sys.executable), os.W_OK):
from site import getusersitepackages
if (user_site_packages := getusersitepackages()) not in set(sys.path):
sys.path.append(user_site_packages)
def _get_installed_xontribs(pkg_name="xontrib"):
    """List all core packages + newly installed xontribs.

    Parameters
    ----------
    pkg_name
        name of the package whose physical locations are scanned for
        xontrib modules

    Yields
    ------
    tuple
        ``(name, Xontrib)`` pairs: first one per module found inside
        ``pkg_name``, then one per ``xonsh.xontribs`` entry point.
    """
    _patch_in_userdir()
    spec = importlib.util.find_spec(pkg_name)

    def iter_paths():
        # Every entry of every existing directory that contributes to the package.
        if spec is None or spec.submodule_search_locations is None:
            return
        for loc in spec.submodule_search_locations:
            path = Path(loc)
            if path.exists():
                yield from path.iterdir()

    def iter_modules():
        # pkgutil is not finding `*.xsh` files
        for path in iter_paths():
            if path.suffix in {".py", ".xsh"}:
                yield path.stem
            elif path.is_dir():
                if (path / "__init__.py").exists():
                    yield path.name

    for name in iter_modules():
        # Qualify with the scanned package instead of a hard-coded
        # ``xontrib`` so a non-default ``pkg_name`` yields importable paths.
        yield name, Xontrib(f"{pkg_name}.{name}")

    for entry in _get_xontrib_entrypoints():
        yield entry.name, Xontrib(entry.value, distribution=entry.dist)
def _find_xontrib_entrypoint(name):
    """Look up the ``xonsh.xontribs`` entry point called ``name``.

    The entry points are enumerated through ``_get_xontrib_entrypoints()``
    on every call instead of consulting the
    ``XSH.builtins.autoloaded_xontribs`` cache, so a xontrib that is only
    reachable through its entry point can still be found when autoloading
    did not run (e.g. ``xonsh --no-rc`` or ``$XONTRIBS_AUTOLOAD_DISABLED``).

    Returns the first matching entry point, or ``None`` when there is none.
    """
    matches = (entry for entry in _get_xontrib_entrypoints() if entry.name == name)
    return next(matches, None)
[docs]
def find_xontrib(name, full_module=False):
    """Resolve a xontrib name to a module spec.

    Lookups are attempted from the most specific to the broadest; the first
    one that applies decides the result:

    * a ``name`` starting with ``.`` is resolved relative to ``xontrib``;
    * with ``full_module`` the name is used as an absolute module path;
    * the ``XSH.builtins.autoloaded_xontribs`` cache;
    * the ``xonsh.xontribs`` entry points;
    * the ``xontrib.<name>`` submodule;
    * finally the top-level module ``<name>``.

    Returns whatever ``importlib.util.find_spec`` returns for the selected
    module path (``None`` when that module is absent).
    """
    _patch_in_userdir()

    # Explicit forms: the caller already told us where to look.
    if name.startswith("."):
        return importlib.util.find_spec(name, package="xontrib")
    if full_module:
        return importlib.util.find_spec(name)

    # Cache filled by ``auto_load_xontribs_from_entrypoints``.
    cache = getattr(XSH.builtins, "autoloaded_xontribs", None) or {}
    if name in cache:
        return importlib.util.find_spec(cache[name])

    # Entry-point lookup: covers sessions where the cache above was never
    # populated and xontribs that only exist as an entry point.
    if (entry := _find_xontrib_entrypoint(name)) is not None:
        return importlib.util.find_spec(entry.value)

    # ``xontrib.<name>`` layout. A ``ValueError`` from ``find_spec`` is
    # treated exactly like "not found", so the top-level fallback below is
    # still reached in that case.
    try:
        namespaced = importlib.util.find_spec("." + name, package="xontrib")
    except ValueError:
        namespaced = None
    if namespaced is not None:
        return namespaced

    # Last resort: a plain top-level module called ``name``.
    return importlib.util.find_spec(name)
[docs]
def xontrib_context(name, full_module=False):
    """Import a xontrib and build the context dictionary it contributes.

    Returns ``None`` when ``find_xontrib`` cannot resolve the name.

    When the imported module defines ``_load_xontrib_``, that function is
    called with ``xsh=XSH`` and its return value (if not ``None``) becomes
    the context. Otherwise the context consists of the names listed in the
    module's ``__all__``, or of every attribute not starting with ``_``
    when ``__all__`` is not defined.
    """
    spec = find_xontrib(name, full_module)
    if spec is None:
        return None

    module = importlib.import_module(spec.name)
    context = {}

    loader = getattr(module, "_load_xontrib_", None)
    if loader is not None:
        provided = loader(xsh=XSH)
        if provided is not None:
            context.update(provided)
        return context

    exported = getattr(module, "__all__", None)
    if exported is None:
        exported = [attr for attr in dir(module) if not attr.startswith("_")]
    context.update({attr: getattr(module, attr) for attr in exported})
    return context
[docs]
def prompt_xontrib_install(names: list[str]):
    """Build the message shown when enabled xontribs are not installed.

    The message consists of three newline-terminated lines: a heading, the
    ``names`` list as rendered by ``str.format``, and a pointer to the
    awesome-xontribs page.
    """
    message_lines = (
        "The following xontribs are enabled but not installed: ",
        f" {names}",
        "Please make sure that they are installed correctly by checking https://xonsh.github.io/awesome-xontribs/",
    )
    return "\n".join(message_lines) + "\n"
[docs]
def update_context(name, ctx: dict, full_module=False):
    """Merge the context of the xontrib ``name`` into ``ctx`` in place.

    Returns the same ``ctx`` object after the update.

    Raises
    ------
    XontribNotInstalled
        when ``xontrib_context`` returns ``None`` for ``name``
    """
    modctx = xontrib_context(name, full_module)
    if modctx is None:
        raise XontribNotInstalled(f"Xontrib - {name} is not found.")
    ctx.update(modctx)
    return ctx
def _xontrib_name_completions(loaded=False):
    """Yield completions for xontribs whose loaded state matches ``loaded``.

    Parameters
    ----------
    loaded
        ``True`` to complete only loaded xontribs, ``False`` to complete
        only xontribs that are not loaded

    Yields
    ------
    RichCompletion
        one completion per matching xontrib, described by
        ``Xontrib.get_description()``
    """
    # Normalise both sides to real booleans: comparing a merely truthy or
    # falsy value by identity would silently drop the xontrib from both
    # the "loaded" and the "not loaded" listings.
    want_loaded = bool(loaded)
    for name, xontrib in get_xontribs().items():
        if bool(xontrib.is_loaded) is want_loaded:
            yield RichCompletion(
                name, append_space=True, description=xontrib.get_description()
            )
[docs]
def xontrib_names_completer(**_):
    """Yield completions for the xontribs that are not loaded.

    Keyword arguments are accepted and ignored.
    """
    for completion in _xontrib_name_completions(loaded=False):
        yield completion
[docs]
def xontrib_unload_completer(**_):
    """Yield completions for the xontribs that are currently loaded.

    Keyword arguments are accepted and ignored.
    """
    for completion in _xontrib_name_completions(loaded=True):
        yield completion
[docs]
def xontrib_any_completer(**_):
    """Yield completions for every known xontrib, loaded or not.

    Keyword arguments are accepted and ignored.
    """
    yield from (
        RichCompletion(key, append_space=True, description=info.get_description())
        for key, info in get_xontribs().items()
    )
[docs]
def xontribs_load(
    names: Annotated[
        tp.Sequence[str],
        Arg(nargs="+", completer=xontrib_names_completer),
    ] = (),
    verbose=False,
    full_module=False,
    suppress_warnings=False,
):
    """Load xontribs from a list of names

    Parameters
    ----------
    names
        names of xontribs
    verbose : -v, --verbose
        verbose output
    full_module : -f, --full
        indicates that the names are fully qualified module paths and not inside ``xontrib`` package
    suppress_warnings : -s, --suppress-warnings
        no warnings about missing xontribs and return code 0
    """
    # The result is a ``(stdout, stderr, exit_code)`` triple; stdout is
    # always ``None`` and stderr only carries the "not installed" prompt.
    target_ctx = XSH.ctx if XSH.ctx is not None else {}
    exit_code = ExitCode.OK
    missing = []

    for name in names:
        if verbose:
            print(f"loading xontrib {name!r}")
        try:
            update_context(name, ctx=target_ctx, full_module=full_module)
        except XontribNotInstalled:
            # Missing xontribs are reported together after the loop.
            if not suppress_warnings:
                missing.append(name)
        except Exception:
            # One failing xontrib must not prevent the others from loading.
            exit_code = ExitCode.INIT_FAILED
            print_exception(f"Failed to load xontrib {name}.")

    if not missing:
        return None, None, exit_code
    # A missing xontrib takes precedence over an earlier INIT_FAILED.
    return None, prompt_xontrib_install(missing), ExitCode.NOT_FOUND
[docs]
def xontribs_unload(
    names: Annotated[
        tp.Sequence[str],
        Arg(nargs="+", completer=xontrib_unload_completer),
    ] = (),
    verbose=False,
):
    """Unload the given xontribs (requires ``_unload_xontrib_`` for full cleanup)

    Parameters
    ----------
    names
        name of xontribs to unload
    verbose : -v, --verbose
        verbose output

    Notes
    -----
    The xontrib must implement ``_unload_xontrib_()`` for proper cleanup.
    Without it, registered event handlers, env vars, aliases, and completers
    will remain active. The default is equivalent to ``del sys.modules[module]``.
    """
    for name in names:
        if verbose:
            print(f"unloading xontrib {name!r}")
        spec = find_xontrib(name)
        try:
            # Nothing to do for unknown xontribs or ones that are not loaded.
            if not spec or spec.name not in sys.modules:
                continue
            loaded_module = sys.modules[spec.name]
            cleanup = getattr(loaded_module, "_unload_xontrib_", None)
            if cleanup is not None:
                cleanup(XSH)
            # Dropping the module entry happens whether or not a cleanup
            # hook exists.
            del sys.modules[spec.name]
        except Exception as ex:
            # Report and carry on with the remaining names.
            print_exception(f"Failed to unload xontrib {name} ({ex})")
[docs]
def xontribs_reload(
    names: Annotated[
        tp.Sequence[str],
        Arg(nargs="+", completer=xontrib_unload_completer),
    ] = (),
    verbose=False,
):
    """Reload the given xontribs (requires ``_unload_xontrib_`` for full cleanup)

    Parameters
    ----------
    names
        name of xontribs to reload
    verbose : -v, --verbose
        verbose output
    """
    # Each xontrib is unloaded and loaded again on its own, in the order
    # given; the results of the individual load calls are not propagated.
    for name in names:
        if verbose:
            print(f"reloading xontrib {name!r}")
        single = [name]
        xontribs_unload(single)
        xontribs_load(single)
[docs]
def xontribs_info(
    name: Annotated[str, Arg(completer=xontrib_any_completer)],
    _stdout=None,
):
    """Show details about an installed xontrib.

    Parameters
    ----------
    name
        name of the xontrib
    """
    xontrib = get_xontribs().get(name)
    if xontrib is None:
        # Not listed among the installed xontribs: try a direct module
        # lookup before giving up.
        fallback_spec = find_xontrib(name)
        if fallback_spec is None:
            print_color(
                "{RED}Xontrib " + name + " is not installed.{RESET}", file=_stdout
            )
            return ExitCode.NOT_FOUND
        xontrib = Xontrib(fallback_spec.name)

    # Where does the module live on disk?
    module_spec = importlib.util.find_spec(xontrib.module)
    origin = None if module_spec is None else module_spec.origin
    if not origin:
        # No usable origin from ``find_spec``: scan the ``xontrib`` package
        # locations directly (this is how ``.xsh`` files are found).
        located = _find_xontrib_file(xontrib.module)
        origin = "(builtin)" if located is None else str(located)

    # "<module> (<dist name> <dist version>) at <origin>"
    source = xontrib.module
    distribution = xontrib.distribution
    if distribution is not None:
        dist_name = distribution.metadata.get("Name", "") or ""
        dist_version = distribution.metadata.get("Version", "") or ""
        if dist_name:
            source += f" ({dist_name} {dist_version})".rstrip()
    source += f" at {origin}"

    lines = [
        "{PURPLE}Name{RESET}: " + name,
        "{PURPLE}Source{RESET}: " + source,
        "{PURPLE}Description{RESET}: " + (xontrib.get_description() or ""),
    ]
    # URL and license are only shown when the distribution provides them.
    homepage = xontrib.url
    if homepage:
        lines.append("{PURPLE}URL{RESET}: " + homepage)
    license_text = xontrib.license
    if license_text:
        lines.append("{PURPLE}License{RESET}: " + license_text)

    loaded_tag = "{GREEN}yes{RESET}" if xontrib.is_loaded else "{RED}no{RESET}"
    auto_tag = " {GREEN}(auto){RESET}" if xontrib.is_auto_loaded else ""
    lines.append("{PURPLE}Loaded{RESET}: " + loaded_tag + auto_tag)

    print_color("\n".join(lines), file=_stdout)
    return ExitCode.OK
[docs]
def xontrib_data():
    """Collects and returns the data about installed xontribs.

    Returns
    -------
    dict
        Mapping of xontrib name to a dict with the keys ``name``,
        ``loaded``, ``auto``, ``module`` and ``description``, sorted by
        xontrib name. ``description`` is the first line of the xontrib
        description, truncated to fit the terminal.
    """
    # The width budget does not depend on the xontrib, so compute it once
    # instead of querying the terminal for every entry.
    try:
        max_desc = os.get_terminal_size().columns - 40
    except (OSError, ValueError):
        # No usable terminal (e.g. output is piped): use a fixed width.
        max_desc = 60
    max_desc = max(max_desc, 20)

    data = {}
    for xo_name, xontrib in get_xontribs().items():
        desc = xontrib.get_description()
        short_desc = desc.split("\n")[0][:max_desc] if desc else ""
        data[xo_name] = {
            "name": xo_name,
            # Coerce so that the JSON listing always carries a boolean.
            "loaded": bool(xontrib.is_loaded),
            "auto": xontrib.is_auto_loaded,
            "module": xontrib.module,
            "description": short_desc,
        }
    return dict(sorted(data.items()))
[docs]
def xontribs_loaded():
    """Return the names of the known xontribs that are currently loaded."""
    installed = get_xontribs()
    return [key for key in installed if installed[key].is_loaded]
[docs]
def xontribs_list(to_json=False, _stdout=None):
    """List installed xontribs and show whether they are loaded or not

    Parameters
    ----------
    to_json : -j, --json
        reports results as json
    """
    data = xontrib_data()
    if to_json:
        # JSON mode returns the serialized data instead of printing it.
        return json.dumps(data)

    # Width of the name column: the longest name, but never below 6.
    width = max(6, max((len(key) for key in data), default=6))
    rows = []
    for name, info in data.items():
        row = "{PURPLE}" + name + "{RESET} " + " " * (width - len(name))
        if info["loaded"]:
            row += "{GREEN}loaded{RESET}" + " " * 4
            # A loaded xontrib is either auto-loaded or manually loaded.
            row += " {GREEN}auto{RESET}" if info["auto"] else " {CYAN}manual{RESET}"
        else:
            row += "{RED}not-loaded{RESET}" + " " * 8
        if info.get("description"):
            row += " " + info["description"]
        rows.append(row)
    print_color("\n".join(rows), file=_stdout)
def _get_xontrib_entrypoints() -> "tp.Iterable[EntryPoint]":
from importlib import metadata
name = "xonsh.xontribs"
entries = metadata.entry_points()
# for some reason, on CI (win py3.8) atleast, returns dict
group = (
entries.select(group=name)
if hasattr(entries, "select")
else entries.get(name, []) # type: ignore
)
yield from group
[docs]
def auto_load_xontribs_from_entrypoints(
    blocked: "tp.Sequence[str]" = (), verbose=False
):
    """Load every xontrib advertised through the ``xonsh.xontribs`` entry points.

    Entry points whose name is listed in ``blocked`` are skipped. Each
    remaining entry point is recorded in ``XSH.builtins.autoloaded_xontribs``
    (name -> module path) and then loaded via ``xontribs_load`` with
    ``full_module=True``, whose result is returned.
    """
    if not hasattr(XSH.builtins, "autoloaded_xontribs"):
        XSH.builtins.autoloaded_xontribs = {}

    modules = []
    for entry in _get_xontrib_entrypoints():
        if entry.name in blocked:
            continue
        # Remember which module backs the entry point before loading it.
        XSH.builtins.autoloaded_xontribs[entry.name] = entry.value
        modules.append(entry.value)
    return xontribs_load(modules, verbose=verbose, full_module=True)
[docs]
class XontribAlias(ArgParserAlias):
    """Manage xonsh extensions"""

    def build(self):
        # Create the ``xontrib`` parser and register one sub-command per
        # handler, in this order; ``list`` is registered with default=True.
        parser = self.create_parser(prog="xontrib")
        commands = (
            (xontribs_load, {"prog": "load"}),
            (xontribs_unload, {"prog": "unload"}),
            (xontribs_reload, {"prog": "reload"}),
            (xontribs_list, {"prog": "list", "default": True}),
            (xontribs_info, {"prog": "info"}),
        )
        for handler, options in commands:
            parser.add_command(handler, **options)
        return parser
# Module-level ``XontribAlias`` instance, created with ``threadable=False``.
xontribs_main = XontribAlias(threadable=False)