Source code for xonsh.completers.tools

"""Xonsh completer tools."""

import inspect
import os
import shlex
import subprocess
import textwrap
import typing as tp
from functools import wraps

import xonsh.tools as xt
from xonsh.built_ins import XSH
from xonsh.lib.lazyasd import lazyobject
from xonsh.parsers.completion_context import CommandContext, CompletionContext


def _filter_with_func(text, prefix, func):
    """Apply the predicate ``func(candidate, prefix)`` to a completion.

    For a ``RichCompletion`` with a non-empty ``display``, the display text is
    split on commas and the completion matches as soon as any of the
    whitespace-stripped pieces satisfies ``func``. Every other input is
    checked directly with ``func(text, prefix)``.
    """
    display = text.display if isinstance(text, RichCompletion) else None
    if not display:
        return func(text, prefix)
    return any(func(piece.strip(), prefix) for piece in display.split(","))


def _filter_normal(text, prefix):
    """Case-sensitive check that ``text`` starts with ``prefix``."""
    starts_with = str.startswith
    return _filter_with_func(text, prefix, starts_with)


def _filter_ignorecase(text, prefix):
    """Case-insensitive check that ``text`` starts with ``prefix``."""

    def _startswith_nocase(candidate, wanted):
        # both sides are lowercased before the comparison
        return candidate.lower().startswith(wanted.lower())

    return _filter_with_func(text, prefix, _startswith_nocase)


def get_filter_function():
    """Return the prefix-matching function used to filter completions.

    The choice depends on the value of ``$CASE_SENSITIVE_COMPLETIONS``:
    a truthy value selects the case-sensitive matcher, a falsy value the
    case-insensitive one.
    """
    case_sensitive = XSH.env.get("CASE_SENSITIVE_COMPLETIONS")
    return _filter_normal if case_sensitive else _filter_ignorecase
def justify(s: str, max_length: int, left_pad: int = 0) -> str:
    """Re-wrap ``s`` so that no line is longer than ``max_length`` characters.

    Parameters
    ----------
    s :
        The text to wrap.
    max_length :
        Maximum width of every output line (the ``width`` given to
        ``textwrap.wrap``).
    left_pad :
        Number of spaces used to indent every line except the first.

    Returns
    -------
    The wrapped lines joined with newline characters.
    """
    indent = " " * left_pad
    wrapped = textwrap.wrap(s, width=max_length, subsequent_indent=indent)
    return "\n".join(wrapped)
class RichCompletion(str):
    """A rich completion that completers can return instead of a string.

    The instance itself is the completion text (it subclasses ``str``); the
    additional attributes describe how the completion is displayed and applied.
    """

    def __new__(cls, value, *args, **kwargs):
        completion = super().__new__(cls, value)
        # Run ``__init__`` explicitly so the object returned from ``__new__``
        # already carries all of its attributes. A regular constructor call
        # runs ``__init__`` once more, which just re-assigns the same values.
        cls.__init__(completion, value, *args, **kwargs)
        return completion

    def __init__(
        self,
        value: str,
        prefix_len: tp.Optional[int] = None,
        display: tp.Optional[str] = None,
        description: str = "",
        style: str = "",
        append_closing_quote: bool = True,
        append_space: bool = False,
    ):
        """
        Parameters
        ----------
        value :
            The completion's actual value.
        prefix_len :
            Length of the prefix to be replaced in the completion.
            If None, the default prefix len will be used.
        display :
            Text to display in completion option list instead of ``value``.
            NOTE: If supplied, the common prefix with other completions won't be removed.
        description :
            Extra text to display when the completion is selected.
        style :
            Style to pass to prompt-toolkit's ``Completion`` object.
        append_closing_quote :
            Whether to append a closing quote to the completion if the cursor is after it.
            See ``Completer.complete`` in ``xonsh/completer.py``
        append_space :
            Whether to append a space after the completion.
            This is intended to work with ``appending_closing_quote``, so the space will be added correctly **after** the closing quote.
            This is used in ``Completer.complete``.
            An extra bonus is that the space won't show up in the ``display`` attribute.
        """
        super().__init__()
        self.prefix_len = prefix_len
        self.display = display
        self.description = description
        self.style = style
        self.append_closing_quote = append_closing_quote
        self.append_space = append_space

    @property
    def value(self) -> str:
        """The completion text as a plain ``str``."""
        return str(self)

    def __repr__(self):
        # don't print default values
        attrs = ", ".join(
            f"{name}={getattr(self, name)!r}"
            for name, default in RICH_COMPLETION_DEFAULTS
            if getattr(self, name) != default
        )
        return f"RichCompletion({self.value!r}, {attrs})"

    def replace(self, **kwargs):
        """Create a new RichCompletion with replaced attributes.

        Every keyword overrides the corresponding ``__init__`` argument
        (including ``value``); everything else is copied from this instance.

        NOTE: this overrides ``str.replace``, so the usual
        ``replace(old, new)`` string method is not available on instances.
        """
        # The instance ``__dict__`` holds exactly the attributes set in
        # ``__init__``, so it can be fed straight back to the constructor.
        init_kwargs = {"value": self.value, **self.__dict__, **kwargs}
        return RichCompletion(**init_kwargs)
@lazyobject
def RICH_COMPLETION_DEFAULTS():
    """The ``__init__`` parameters' default values (excluding ``self`` and ``value``)."""
    init_params = inspect.signature(RichCompletion.__init__).parameters
    return [
        (name, param.default)
        for name, param in init_params.items()
        if name not in ("self", "value")
    ]


# A single completion: either a ``RichCompletion`` or a plain string.
Completion = tp.Union[RichCompletion, str]
# What a completer may return: a set of completions, a ``(set, int)`` tuple
# (the int is presumably a prefix length -- confirm against ``Completer``), or None.
CompleterResult = tp.Union[set[Completion], tuple[set[Completion], int], None]
# Signature of a completer that receives the parsed ``CompletionContext``.
ContextualCompleter = tp.Callable[[CompletionContext], CompleterResult]
def contextual_completer(func: ContextualCompleter):
    """Decorator for a contextual completer.

    This is used to mark completers that want to use the parsed completion context.
    See ``xonsh/parsers/completion_context.py``.

    ``func`` receives a single CompletionContext object.

    The function is marked by setting ``func.contextual = True`` and the very
    same function object is returned.
    """
    func.contextual = True  # type: ignore
    return func
def is_contextual_completer(func):
    """Return ``func.contextual`` if that attribute exists, otherwise ``False``."""
    return getattr(func, "contextual", False)
def contextual_command_completer(func: tp.Callable[[CommandContext], CompleterResult]):
    """Like ``contextual_completer``, but only runs when completing a command.

    ``func`` directly receives the ``CommandContext`` object
    (``context.command``). When the completion context has no command the
    wrapper returns ``None`` without calling ``func``.
    """

    @contextual_completer
    @wraps(func)
    def _completer(context: CompletionContext) -> CompleterResult:
        if context.command is None:
            # not completing a command -- nothing to offer
            return None
        return func(context.command)

    return _completer
def contextual_command_completer_for(cmd: str):
    """Like ``contextual_command_completer``, but only runs when completing the ``cmd`` command.

    Returns a decorator. The decorated function receives the
    ``CommandContext`` and is called only when
    ``context.command.completing_command(cmd)`` is true; in every other case
    the wrapper returns ``None``.
    """

    def decor(func: tp.Callable[[CommandContext], CompleterResult]):
        @contextual_completer
        @wraps(func)
        def _completer(context: CompletionContext) -> CompleterResult:
            if context.command is not None and context.command.completing_command(
                cmd
            ):
                return func(context.command)
            return None

        return _completer

    return decor
def non_exclusive_completer(func):
    """Decorator for a non-exclusive completer.

    This is used to mark completers that will be collected with other completer's results.

    The function is marked by setting ``func.non_exclusive = True`` and the
    very same function object is returned.
    """
    func.non_exclusive = True  # type: ignore
    return func
def is_exclusive_completer(func):
    """Return ``True`` unless ``func`` has a truthy ``non_exclusive`` attribute."""
    return not getattr(func, "non_exclusive", False)
def apply_lprefix(comps, lprefix):
    """Yield the completions in ``comps`` with ``lprefix`` as their default prefix length.

    Parameters
    ----------
    comps :
        Iterable of completions (plain strings and/or ``RichCompletion``).
    lprefix :
        Prefix length to apply. If None, the completions are yielded unchanged.

    Yields
    ------
    For a plain string, a ``RichCompletion`` with ``prefix_len=lprefix``;
    for a ``RichCompletion`` whose ``prefix_len`` is None, a copy with
    ``prefix_len=lprefix``; a ``RichCompletion`` that already has a
    ``prefix_len`` is yielded as is.
    """
    if lprefix is None:
        # This function is a generator, so ``return comps`` would end the
        # iteration without producing anything; the completions have to be
        # yielded explicitly to be passed through.
        yield from comps
        return

    for comp in comps:
        if not isinstance(comp, RichCompletion):
            yield RichCompletion(comp, prefix_len=lprefix)
        elif comp.prefix_len is None:
            yield comp.replace(prefix_len=lprefix)
        else:
            # this comp has a custom prefix len
            yield comp
def completion_from_cmd_output(line: str, append_space=False):
    """Build a ``RichCompletion`` from one line of a completion command's output.

    Parameters
    ----------
    line :
        A single output line. If it contains a tab, the text before the first
        tab is the completion and the text after it is the description.
    append_space :
        Whether a space should be appended after the completion. This is
        forced to False for completions that end with a path separator.
    """
    line = line.strip()
    cmd, _tab, desc = line.partition("\t")
    cmd, desc = cmd.strip(), desc.strip()

    # special treatment for path completions.
    # not appending space even if it is a single candidate.
    # ``os.sep``/``os.altsep`` are the directory separators; ``os.pathsep``
    # (the PATH list separator) is kept because earlier versions checked it.
    separators = tuple(s for s in (os.sep, os.altsep, os.pathsep) if s)
    if cmd.endswith(separators):
        append_space = False

    return RichCompletion(
        cmd,
        description=desc,
        append_space=append_space,
    )
def sub_proc_get_output(*args, **env_vars: str) -> "tuple[bytes, bool]":
    """Run ``args`` as a sub-process and capture its standard output.

    Parameters
    ----------
    *args :
        The command and its arguments, passed to ``subprocess.run`` as is.
    **env_vars :
        Extra environment variables; they override the detyped xonsh
        environment.

    Returns
    -------
    ``(stdout, not_found)`` -- the captured bytes (``b""`` on any failure)
    and whether running the command raised ``FileNotFoundError``.
    Any other exception is reported through ``xt.print_exception`` and
    not propagated.
    """
    # Work on a copy so the mapping returned by ``detype()`` is never modified.
    env = dict(XSH.env.detype())
    env.update(env_vars)  # prefer passed env variables

    try:
        proc = subprocess.run(
            args,
            env=env,
            stderr=subprocess.DEVNULL,
            stdout=subprocess.PIPE,
        )
    except FileNotFoundError:
        return b"", True
    except Exception as ex:
        xt.print_exception(f"Failed to get completions from sub-proc: {args} ({ex!r})")
        return b"", False
    return proc.stdout, False
def complete_from_sub_proc(*args: str, sep=None, filter_prefix=None, **env_vars: str):
    """Yield completions parsed from the output of a sub-process.

    Parameters
    ----------
    *args :
        The command to run and its arguments.
    sep :
        How to split the decoded output into candidates: a callable taking
        the output string, or a separator string given to ``str.split``.
        Defaults to ``str.splitlines``.
    filter_prefix :
        If truthy, only candidates accepted by the function returned from
        ``get_filter_function()`` for this prefix are yielded.
    **env_vars :
        Extra environment variables for the sub-process.

    Yields
    ------
    One ``RichCompletion`` per accepted candidate. Nothing is yielded when
    the command produced no output.
    """
    if sep is None:
        sep = str.splitlines
    filter_func = get_filter_function()
    stdout, _ = sub_proc_get_output(*args, **env_vars)
    if not stdout:
        return

    output = stdout.decode().strip()
    lines = sep(output) if callable(sep) else output.split(sep)

    # if there is a single completion candidate then maybe it is over
    # (decided on the unfiltered candidate count)
    append_space = len(lines) == 1
    for line in lines:
        if filter_prefix and not filter_func(line, filter_prefix):
            continue
        yield completion_from_cmd_output(line, append_space)
def comp_based_completer(ctx: CommandContext, start_index=0, **env: str):
    """Helper function to complete commands such as ``pip``,``django-admin``,... that use bash's ``complete``

    Parameters
    ----------
    ctx :
        The command context being completed.
    start_index :
        Index of the last argument that belongs to the command to execute;
        ``args[: start_index + 1]`` is run, and ``COMP_WORDS`` / ``COMP_CWORD``
        are computed relative to ``start_index``.
    **env :
        Extra environment variables for the sub-process.

    Yields
    ------
    The completions produced by ``complete_from_sub_proc``, with the command
    output split by ``shlex.split``.
    """
    words = [arg.value for arg in ctx.args]
    if ctx.prefix:
        words.append(ctx.prefix)

    yield from complete_from_sub_proc(
        *words[: start_index + 1],
        sep=shlex.split,
        COMP_WORDS=os.linesep.join(words[start_index:]) + os.linesep,
        COMP_CWORD=str(ctx.arg_index - start_index),
        **env,
    )