Source code for xonsh.cli_utils

"""
helper functions and classes to create argparse CLI from functions.

Examples
 please see :py:class:`xonsh.completers.completer.CompleterAlias` class
"""

import argparse as ap
import functools
import inspect
import os
import sys
import typing as tp
from collections import defaultdict
from typing import Annotated

from xonsh.built_ins import XSH
from xonsh.completers.tools import RichCompletion


class ArgCompleter:
    """Gives a structure to the argparse completers"""

    def __call__(self, **kwargs):
        """return dynamic completers for the given action."""
        raise NotImplementedError


[docs] def Arg( *args: str, completer: ArgCompleter | tp.Callable[..., tp.Iterator[str]] | None = None, **kwargs, ): # converting to tuple because of limitation with hashing args in py3.6 # after dropping py36 support, the dict can be returned kwargs["completer"] = completer return args, tuple(kwargs.items())
[docs] class NumpyDoc: """Represent parsed function docstring""" def __init__(self, func, prefix_chars="-", follow_wraps=True): """Parse the function docstring and return its help content Parameters ---------- func a callable/object that holds docstring """ if follow_wraps and isinstance(func, functools.partial): func = func.func doc: str = inspect.getdoc(func) or "" self.description, rest = self.get_func_doc(doc) params, rest = self.get_param_doc(rest) self.params = {} self.flags = {} for head, lines in params.items(): parts = [st.strip() for st in head.split(":")] if len(parts) == 2: name, flag = parts if flag and any(map(flag.startswith, prefix_chars)): self.flags[name] = [st.strip() for st in flag.split(",")] else: name = parts[0] self.params[name] = self.join(lines) self.epilog = self.join(rest)
[docs] @staticmethod def join(lines): # remove any extra noise after parse return inspect.cleandoc(os.linesep.join(lines)).strip()
[docs] @staticmethod def get_func_doc(doc): lines = doc.splitlines() token = "Parameters" if token in lines: idx = lines.index(token) desc = lines[:idx] else: desc = lines idx = len(lines) return NumpyDoc.join(desc), lines[idx + 2 :]
[docs] @staticmethod def get_param_doc(lines: list[str]): docs: dict[str, list[str]] = defaultdict(list) name = None while lines: # check new section by checking next line if len(lines) > 1 and (set(lines[1].strip()) == {"-"}): break lin = lines.pop(0) if not lin: continue if lin.startswith(" ") and name: docs[name].append(lin) else: name = lin return docs, lines
_FUNC_NAME = "_func_" def _get_args_kwargs(annot: tp.Any) -> tuple[tp.Sequence[str], dict[str, tp.Any]]: args, kwargs = [], {} if isinstance(annot, tuple): args, kwargs = annot elif "Annotated[" in str(annot): if hasattr(annot, "__metadata__"): args, kwargs = annot.__metadata__[0] else: from typing import get_args _, (args, kwargs) = get_args(annot) if isinstance(kwargs, tuple): kwargs = dict(kwargs) return args, kwargs
[docs] def add_args( parser: ap.ArgumentParser, func: tp.Callable, allowed_params=None, doc=None, ) -> None: """Using the function's annotation add arguments to the parser basically converts ``def fn(param : Arg(*args, **kw), ...): ...`` -> into equivalent ``parser.add_argument(*args, *kw)`` call. """ # call this function when this sub-command is selected parser.set_defaults(**{_FUNC_NAME: func}) doc = doc or NumpyDoc(func, parser.prefix_chars) sign = inspect.signature(func) for name, param in sign.parameters.items(): if name.startswith("_") or ( allowed_params is not None and name not in allowed_params ): continue flags, kwargs = _get_args_kwargs(param.annotation) if (not flags) and (name in doc.flags): # load from docstring flags = doc.flags.get(name) if flags: # optional argument. eg. --option kwargs.setdefault("dest", name) else: # positional argument flags = [name] # checks for optional positional arg if ( (inspect.Parameter.empty != param.default) and (param.default is None) and ("nargs" not in kwargs) and ("action" not in kwargs) ): kwargs.setdefault("nargs", "?") if inspect.Parameter.empty != param.default: kwargs.setdefault("default", param.default) # for booleans set action automatically if ( flags and isinstance(param.default, bool) and ("action" not in kwargs) and ("type" not in kwargs) ): # opposite of default value act_name = "store_false" if param.default else "store_true" kwargs.setdefault("action", act_name) # help can be set by passing help argument otherwise inferred from docstring kwargs.setdefault("help", doc.params.get(name)) completer = kwargs.pop("completer", None) action = parser.add_argument(*flags, **kwargs) if completer: action.completer = completer # type: ignore action.help = action.help or "" # Don't show default when # 1. None : No value is given for the option # 2. bool : in case of flags the default is opposite of the flag's meaning if ( action.default and (not isinstance(action.default, bool)) and ("%(default)s" not in action.help) ): action.help += os.linesep + " (default: '%(default)s')" if action.type and "%(type)s" not in action.help: action.help += " (type: %(type)s)"
[docs] def make_parser( func: tp.Callable | str, empty_help=False, **kwargs, ) -> "ArgParser": """A bare-bones argparse builder from functions""" doc = NumpyDoc(func) if "description" not in kwargs: kwargs["description"] = doc.description if "epilog" not in kwargs: if doc.epilog: kwargs["epilog"] = doc.epilog parser = ArgParser(**kwargs) if empty_help: parser.default_command = "--help" return parser
class RstHelpFormatter(ap.RawTextHelpFormatter): """Highlight help string as rst""" def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) from pygments.formatters.terminal import TerminalFormatter self.formatter = TerminalFormatter() def start_section(self, heading) -> None: from pygments.token import Generic heading = self.colorize((Generic.Heading, heading)) return super().start_section(heading) def _get_help_string(self, action) -> str: return self.markup_rst(action.help) def colorize(self, *tokens: tuple) -> str: from pygments import format return format(tokens, self.formatter) def markup_rst(self, text): from pygments import highlight from pygments.lexers.markup import RstLexer return highlight(text, RstLexer(), self.formatter) def _format_text(self, text): text = super()._format_text(text) if text: text = self.markup_rst(text) return text def _format_usage(self, usage, actions, groups, prefix): from pygments.token import Generic, Name text = super()._format_usage(usage, actions, groups, prefix) parts = text.split(self._prog, maxsplit=1) if len(parts) == 2 and all(parts): text = self.colorize( (Generic.Heading, parts[0]), (Name.Function, self._prog), (Name.Attribute, parts[1]), # from _format_actions_usage ) return text def _format_action_invocation(self, action): from pygments.token import Name text = super()._format_action_invocation(action) return self.colorize((Name.Attribute, text)) def get_argparse_formatter_class(): from xonsh.platform import HAS_PYGMENTS if ( hasattr(sys, "stderr") and sys.stderr.isatty() and XSH.env.get("XONSH_INTERACTIVE") and HAS_PYGMENTS ): return RstHelpFormatter return ap.RawTextHelpFormatter
[docs] class ArgParser(ap.ArgumentParser): """Sub-class of ArgumentParser with special methods to nest commands""" def __init__(self, **kwargs): if "formatter_class" not in kwargs: kwargs["formatter_class"] = get_argparse_formatter_class() super().__init__(**kwargs) self.commands = None self.default_command = None
[docs] def add_command( self, func: tp.Callable, default=False, args: "tuple[str, ...] | None" = None, **kwargs, ): """ create a sub-parser and call this function during dispatch Parameters ---------- func a type-annotated function that will be used to create ArgumentParser instance. All parameters that start with ``_`` will not be added to parser arguments. Use _stdout, _stack ... to receive them from callable-alias/commands. Use _parser to get the generated parser instance. Use _args to get what is passed from sys.argv Use _parsed to get result of ``parser.parse_args`` default Marks this sub-command as the default command for this parser. args if given only add these arguments to the parser. Otherwise all parameters to the function without `_` prefixed in their name gets added to the parser. kwargs passed to ``subparser.add_parser`` call Returns ------- result from ``subparser.add_parser`` """ if not self.commands: self.commands = self.add_subparsers(title="commands", dest="command") doc = NumpyDoc(func) kwargs.setdefault("description", doc.description) kwargs.setdefault("help", doc.description) name = kwargs.pop("prog", None) if not name: name = func.__name__.lstrip("_").replace("_", "-") if default: self.default_command = name parser = self.commands.add_parser(name, **kwargs) add_args(parser, func, allowed_params=args, doc=doc) return parser
def _parse_known_args( self, arg_strings: list[str], namespace: ap.Namespace, *args, **kwargs ): arg_set = set(arg_strings) if ( self.commands and self.default_command and ({"-h", "--help"}.isdisjoint(arg_set)) and (set(self.commands.choices).isdisjoint(arg_set)) ): arg_strings = [self.default_command] + arg_strings return super()._parse_known_args(arg_strings, namespace, *args, **kwargs)
def run_with_partial_args(func: tp.Callable, ns: dict[str, tp.Any]): """Run function based on signature. Filling the arguments will be based on the values in ``ns``.""" sign = inspect.signature(func) kwargs = {} for name, param in sign.parameters.items(): default = None # sometimes the args are skipped in the parser. # like ones having _ prefix(private to the function), or some special cases like exclusive group. # it is better to fill the defaults from paramspec when available. if param.default != inspect.Parameter.empty: default = param.default kwargs[name] = ns.get(name, default) return func(**kwargs)
[docs] def dispatch(parser: ap.ArgumentParser, args=None, lenient=False, **ns): """Call the underlying function with arguments parsed from sys.argv Parameters ---------- parser root parser args sys.argv as parsed by Alias lenient if True, then use parser_know_args and pass the extra arguments as `_unparsed` ns a dict that will be passed to underlying function """ ns.setdefault("_parser", parser) ns.setdefault("_args", args) if lenient: parsed, unparsed = parser.parse_known_args(args) ns["_unparsed"] = unparsed else: parsed = parser.parse_args(args) ns["_parsed"] = parsed ns.update(vars(parsed)) if _FUNC_NAME in ns: func = ns[_FUNC_NAME] return run_with_partial_args(func, ns)
[docs] class ArgparseCompleter: """A completer function for ArgParserAlias commands""" def __init__(self, parser: ap.ArgumentParser, command, **kwargs): args = tuple(c.value for c in command.args[: command.arg_index]) self.parser, self.remaining_args = self.get_parser(parser, args[1:]) self.long_opts_only = XSH.env.get("ALIAS_COMPLETIONS_OPTIONS_LONGEST", False) self.command = command kwargs["command"] = command # will be sent to completer function self.kwargs = kwargs
[docs] @staticmethod def get_parser(parser, args) -> tuple[ap.ArgumentParser, tuple[str, ...]]: """Check for sub-parsers""" sub_parsers = {} for act in parser._get_positional_actions(): if act.nargs == ap.PARSER: sub_parsers = act.choices # there should be only one subparser if sub_parsers: for idx, pos in enumerate(args): if pos in sub_parsers: # get the correct parser return ArgparseCompleter.get_parser( sub_parsers[pos], args[idx + 1 :] ) # base scenario return parser, args
[docs] def filled(self, act: ap.Action) -> int: """Consume remaining_args for the given action""" args_len = 0 for arg in self.remaining_args: if arg and arg[0] in self.parser.prefix_chars: # stop when other --option explicitly given break args_len += 1 nargs = ( act.nargs if isinstance(act.nargs, int) else args_len + 1 if act.nargs in {ap.ONE_OR_MORE, ap.ZERO_OR_MORE} else 1 ) if len(self.remaining_args) >= nargs: # consume n-number of args self.remaining_args = self.remaining_args[nargs:] # complete for next action return True return False
def _complete(self, act: ap.Action, **kwargs): if hasattr(act, "completer") and callable(act.completer): # type: ignore # call the completer function kwargs.update(self.kwargs) yield from act.completer(xsh=XSH, action=act, completer=self, **kwargs) # type: ignore if ( hasattr(act, "choices") and act.choices and not isinstance(act.choices, dict) ): # any sequence or iterable yield from act.choices def _complete_pos(self, act): # even subparserAction can have completer attribute set yield from self._complete(act) if isinstance(act.choices, dict): # sub-parsers for choice, sub_parser in act.choices.items(): yield RichCompletion( choice, description=sub_parser.description or "", append_space=True, )
[docs] def complete(self): # options will come before/after positionals options = {act: None for act in self.parser._get_optional_actions()} # remove options that are already filled opt_completions = self._complete_options(options) if opt_completions: yield from opt_completions return for act in self.parser._get_positional_actions(): # number of arguments it consumes if self.filled(act): continue yield from self._complete_pos(act) # close after a valid positional arg completion break opt_completions = self._complete_options(options) if opt_completions: yield from opt_completions return # complete remaining options only if requested or enabled show_opts = XSH.env.get("ALIAS_COMPLETIONS_OPTIONS_BY_DEFAULT", False) if not show_opts: if not ( self.command.prefix and self.command.prefix[0] in self.parser.prefix_chars ): return # in the end after positionals show remaining unfilled options for act in options: for flag in sorted(act.option_strings, key=len, reverse=True): desc = "" if act.help: formatter = self.parser._get_formatter() try: desc = formatter._expand_help(act) except KeyError: desc = act.help yield RichCompletion(flag, description=desc) if self.long_opts_only: break
def _complete_options(self, options): while self.remaining_args: arg = self.remaining_args[0] act_res = self.parser._parse_optional(arg) if not act_res: # it is not a option string: pass break if isinstance(act_res, list): assert len(act_res) == 1 act_res = act_res[0] # it is a valid option and advance self.remaining_args = self.remaining_args[1:] act, *_, value = act_res # remove the found option # todo: not remove if append/extend options.pop(act, None) if self.filled(act): continue # stop suggestion until current option is complete return self._complete(act)
[docs] class ArgParserAlias: """Provides a structure to the Alias. The parser is lazily loaded. can help create ``argparse.ArgumentParser`` parser from function signature and dispatch the functions. Examples --------- For usage please check :py:mod:`xonsh.completers.completer` """
[docs] class Error(Exception): """Special case, when raised, the traceback will not be shown. Instead the process with exit with error code and message""" def __init__(self, message: str, errno=1): super().__init__(message) self.errno = errno
def __init__(self, threadable=True, **kwargs) -> None: if not threadable: from xonsh.tools import unthreadable unthreadable(self) self._parser = None self.kwargs = kwargs self.stdout = None self.stderr = None
[docs] def build(self) -> "ArgParser": """Sub-classes should return constructed ArgumentParser""" if self.kwargs: return self.create_parser(**self.kwargs) raise NotImplementedError
@property def parser(self): if self._parser is None: self._parser = self.build() return self._parser
[docs] def create_parser( self, func=None, has_args=False, allowed_params=None, **kwargs ) -> "ArgParser": """create root parser""" func = func or self has_args = has_args or bool(allowed_params) if has_args: kwargs.setdefault("empty_help", False) parser = make_parser(func, **kwargs) if has_args: add_args(parser, func, allowed_params=allowed_params) return parser
[docs] def xonsh_complete(self, command, **kwargs): completer = ArgparseCompleter(self.parser, command=command, **kwargs) yield from completer.complete()
[docs] def write_to(self, stream: str, *args, **kwargs): value = getattr(self, stream) out = getattr(sys, stream) if value is None else value kwargs.setdefault("file", out) print(*args, **kwargs)
[docs] def err(self, *args, **kwargs): """Write text to error stream""" self.write_to("stderr", *args, **kwargs)
[docs] def out(self, *args, **kwargs): """Write text to output stream""" self.write_to("stdout", *args, **kwargs)
def __call__( self, args, stdin=None, stdout=None, stderr=None, spec=None, stack=None, **kwargs, ): self.stdout = stdout self.stderr = stderr try: result = dispatch( self.parser, args, _stdin=stdin, _stdout=stdout, _stderr=stderr, _spec=spec, _stack=stack, **kwargs, ) except self.Error as ex: self.err(f"Error: {ex}") sys.exit(getattr(ex, "errno", 1)) finally: # free the reference to input/output. Otherwise it will result in errors self.stdout = None self.stderr = None return result
__all__ = ( "Arg", "ArgParserAlias", "ArgparseCompleter", "Annotated", "ArgParser", "make_parser", "add_args", "NumpyDoc", "dispatch", )