"""Xonsh completer tools."""
import inspect
import os
import shlex
import subprocess
import textwrap
import typing as tp
from functools import wraps

import as xt
from xonsh.built_ins import XSH
from xonsh.lazyasd import lazyobject
from xonsh.parsers.completion_context import CommandContext, CompletionContext

def _filter_with_func(text, prefix, func):
    if isinstance(text, RichCompletion) and text.display:
        parts = [p.strip() for p in text.display.split(",")]
        return any(map(lambda part: func(part.strip(), prefix), parts))
    return func(text, prefix)

def _filter_normal(text, prefix):
    return _filter_with_func(text, prefix, str.startswith)

def _filter_ignorecase(text, prefix):
    func = lambda txt, pre: txt.lower().startswith(pre.lower())
    return _filter_with_func(text, prefix, func)

[docs]def get_filter_function(): """ Return an appropriate filtering function for completions, given the valid of $CASE_SENSITIVE_COMPLETIONS """ csc = XSH.env.get("CASE_SENSITIVE_COMPLETIONS") if csc: return _filter_normal else: return _filter_ignorecase
[docs]def justify(s, max_length, left_pad=0): """ Re-wrap the string s so that each line is no more than max_length characters long, padding all lines but the first on the left with the string left_pad. """ txt = textwrap.wrap(s, width=max_length, subsequent_indent=" " * left_pad) return "\n".join(txt)
[docs]class RichCompletion(str): """A rich completion that completers can return instead of a string""" def __new__(cls, value, *args, **kwargs): completion = super().__new__(cls, value) # ``str``'s ``__new__`` doesn't call ``__init__``, so we'll call it ourselves cls.__init__(completion, value, *args, **kwargs) return completion def __init__( self, value: str, prefix_len: tp.Optional[int] = None, display: tp.Optional[str] = None, description: str = "", style: str = "", append_closing_quote: bool = True, append_space: bool = False, ): """ Parameters ---------- value : The completion's actual value. prefix_len : Length of the prefix to be replaced in the completion. If None, the default prefix len will be used. display : Text to display in completion option list instead of ``value``. NOTE: If supplied, the common prefix with other completions won't be removed. description : Extra text to display when the completion is selected. style : Style to pass to prompt-toolkit's ``Completion`` object. append_closing_quote : Whether to append a closing quote to the completion if the cursor is after it. See ``Completer.complete`` in ``xonsh/`` append_space : Whether to append a space after the completion. This is intended to work with ``appending_closing_quote``, so the space will be added correctly **after** the closing quote. This is used in ``Completer.complete``. An extra bonus is that the space won't show up in the ``display`` attribute. """ super().__init__() self.prefix_len = prefix_len self.display = display self.description = description = style self.append_closing_quote = append_closing_quote self.append_space = append_space @property def value(self): return str(self) def __repr__(self): # don't print default values attrs = ", ".join( f"{name}={getattr(self, name)!r}" for name, default in RICH_COMPLETION_DEFAULTS if getattr(self, name) != default ) return f"RichCompletion({self.value!r}, {attrs})"
[docs] def replace(self, **kwargs): """Create a new RichCompletion with replaced attributes""" default_kwargs = dict( value=self.value, **self.__dict__, ) default_kwargs.update(kwargs) return RichCompletion(**default_kwargs)
@lazyobject def RICH_COMPLETION_DEFAULTS(): """The ``__init__`` parameters' default values (excluding ``self`` and ``value``).""" return [ (name, param.default) for name, param in inspect.signature(RichCompletion.__init__).parameters.items() if name not in ("self", "value") ] Completion = tp.Union[RichCompletion, str] CompleterResult = tp.Union[tp.Set[Completion], tp.Tuple[tp.Set[Completion], int], None] ContextualCompleter = tp.Callable[[CompletionContext], CompleterResult]
[docs]def contextual_completer(func: ContextualCompleter): """Decorator for a contextual completer This is used to mark completers that want to use the parsed completion context. See ``xonsh/parsers/``. ``func`` receives a single CompletionContext object. """ func.contextual = True # type: ignore return func
[docs]def is_contextual_completer(func): return getattr(func, "contextual", False)
[docs]def contextual_command_completer(func: tp.Callable[[CommandContext], CompleterResult]): """like ``contextual_completer``, but will only run when completing a command and will directly receive the ``CommandContext`` object""" @contextual_completer @wraps(func) def _completer(context: CompletionContext) -> CompleterResult: if context.command is not None: return func(context.command) return None return _completer
[docs]def contextual_command_completer_for(cmd: str): """like ``contextual_command_completer``, but will only run when completing the ``cmd`` command""" def decor(func: tp.Callable[[CommandContext], CompleterResult]): @contextual_completer @wraps(func) def _completer(context: CompletionContext) -> CompleterResult: if context.command is not None and context.command.completing_command(cmd): return func(context.command) return None return _completer return decor
[docs]def non_exclusive_completer(func): """Decorator for a non-exclusive completer This is used to mark completers that will be collected with other completer's results. """ func.non_exclusive = True # type: ignore return func
[docs]def is_exclusive_completer(func): return not getattr(func, "non_exclusive", False)
[docs]def apply_lprefix(comps, lprefix): if lprefix is None: return comps for comp in comps: if isinstance(comp, RichCompletion): if comp.prefix_len is None: yield comp.replace(prefix_len=lprefix) else: # this comp has a custom prefix len yield comp else: yield RichCompletion(comp, prefix_len=lprefix)
[docs]def completion_from_cmd_output(line: str, append_space=False): line = line.strip() if "\t" in line: cmd, desc = map(str.strip, line.split("\t", maxsplit=1)) else: cmd, desc = line, "" # special treatment for path completions. # not appending space even if it is a single candidate. if cmd.endswith(os.pathsep) or (os.altsep and cmd.endswith(os.altsep)): append_space = False return RichCompletion( cmd, description=desc, append_space=append_space, )
[docs]def sub_proc_get_output(*args, **env_vars: str) -> "tuple[bytes, bool]": env = {} # env.detype is mutable, so update the newly created variable env.update(XSH.env.detype()) env.update(env_vars) # prefer passed env variables out = b"" not_found = False try: out = args, env=env, stderr=subprocess.DEVNULL, stdout=subprocess.PIPE, ).stdout except FileNotFoundError: not_found = True except Exception as ex: xt.print_exception(f"Failed to get completions from sub-proc: {args} ({ex!r})") return out, not_found
[docs]def complete_from_sub_proc(*args: str, sep=None, filter_prefix=None, **env_vars: str): if sep is None: sep = str.splitlines filter_func = get_filter_function() stdout, _ = sub_proc_get_output(*args, **env_vars) if stdout: output = stdout.decode().strip() if callable(sep): lines = sep(output) else: lines = output.split(sep) # if there is a single completion candidate then maybe it is over append_space = len(lines) == 1 for line in lines: if filter_prefix and (not filter_func(line, filter_prefix)): continue comp = completion_from_cmd_output(line, append_space) yield comp
[docs]def comp_based_completer(ctx: CommandContext, start_index=0, **env: str): """Helper function to complete commands such as ``pip``,``django-admin``,... that use bash's ``complete``""" prefix = ctx.prefix args = [arg.value for arg in ctx.args] if prefix: args.append(prefix) yield from complete_from_sub_proc( *args[: start_index + 1], sep=shlex.split, COMP_WORDS=os.linesep.join(args[start_index:]) + os.linesep, COMP_CWORD=str(ctx.arg_index - start_index), **env, )