"""The readline based xonsh shell.
Portions of this code related to initializing the readline library
are included from the IPython project.  The IPython project is:
* Copyright (c) 2008-2014, IPython Development Team
* Copyright (c) 2001-2007, Fernando Perez <fernando.perez@colorado.edu>
* Copyright (c) 2001, Janko Hauser <jhauser@zscout.de>
* Copyright (c) 2001, Nathaniel Gray <n8gray@caltech.edu>
"""
import cmd
import collections
import importlib
import os
import select
import shutil
import sys
import threading
import typing as tp
import xonsh.completers.tools as xct
from xonsh.ansi_colors import (
    ansi_color_style,
    ansi_color_style_names,
    ansi_partial_color_format,
)
from xonsh.base_shell import BaseShell
from xonsh.built_ins import XSH
from xonsh.events import events
from xonsh.lazyasd import LazyObject, lazyobject
from xonsh.lazyimps import pyghooks, pygments, winutils
from xonsh.platform import (
    ON_CYGWIN,
    ON_DARWIN,
    ON_MSYS,
    ON_POSIX,
    ON_WINDOWS,
    os_environ,
)
from xonsh.prompt.base import multiline_prompt
from xonsh.tools import (
    carriage_return,
    columnize,
    ends_with_colon_token,
    print_exception,
    to_bool,
)
if tp.TYPE_CHECKING:
    from types import ModuleType
readline: "ModuleType|None" = None
RL_COMPLETION_SUPPRESS_APPEND = RL_LIB = RL_STATE = None  # type: tp.Any
RL_COMPLETION_QUERY_ITEMS: "tp.Any" = None
RL_CAN_RESIZE = False
RL_VARIABLE_VALUE: "tp.Callable[..., tp.Any]|None" = None
_RL_STATE_DONE = 0x1000000
_RL_STATE_ISEARCH = 0x0000080
_RL_PREV_CASE_SENSITIVE_COMPLETIONS = "to-be-set"
[docs]
def setup_readline():
    """Sets up the readline module and completion suppression, if available."""
    global \
        
RL_COMPLETION_SUPPRESS_APPEND, \
        
RL_LIB, \
        
RL_CAN_RESIZE, \
        
RL_STATE, \
        
readline, \
        
RL_COMPLETION_QUERY_ITEMS
    if RL_COMPLETION_SUPPRESS_APPEND is not None:
        return
    for _rlmod_name in ("gnureadline", "readline"):
        try:
            readline = importlib.import_module(_rlmod_name)
            sys.modules["readline"] = readline
        except ImportError:
            pass
        else:
            break
    if readline is None:
        print(
            """Skipping setup. Because no `readline` implementation available.
            Please install a backend (`readline`, `prompt-toolkit`, etc) to use
            `xonsh` interactively.
            See https://github.com/xonsh/xonsh/issues/1170"""
        )
        return
    import ctypes
    import ctypes.util
    uses_libedit = readline.__doc__ and "libedit" in readline.__doc__
    readline.set_completer_delims(" \t\n")
    # Cygwin seems to hang indefinitely when querying the readline lib
    if (not ON_CYGWIN) and (not ON_MSYS) and (not readline.__file__.endswith(".py")):
        RL_LIB = lib = ctypes.cdll.LoadLibrary(readline.__file__)
        try:
            RL_COMPLETION_SUPPRESS_APPEND = ctypes.c_int.in_dll(
                lib, "rl_completion_suppress_append"
            )
        except ValueError:
            # not all versions of readline have this symbol, ie Macs sometimes
            RL_COMPLETION_SUPPRESS_APPEND = None
        try:
            RL_COMPLETION_QUERY_ITEMS = ctypes.c_int.in_dll(
                lib, "rl_completion_query_items"
            )
        except ValueError:
            # not all versions of readline have this symbol, ie Macs sometimes
            RL_COMPLETION_QUERY_ITEMS = None
        try:
            RL_STATE = ctypes.c_int.in_dll(lib, "rl_readline_state")
        except Exception:
            pass
        RL_CAN_RESIZE = hasattr(lib, "rl_reset_screen_size")
    env = XSH.env
    # reads in history
    readline.set_history_length(-1)
    ReadlineHistoryAdder()
    # sets up IPython-like history matching with up and down
    readline.parse_and_bind('"\\e[B": history-search-forward')
    readline.parse_and_bind('"\\e[A": history-search-backward')
    # Setup Shift-Tab to indent
    readline.parse_and_bind('"\\e[Z": "{}"'.format(env.get("INDENT")))
    # handle tab completion differences found in libedit readline compatibility
    # as discussed at http://stackoverflow.com/a/7116997
    if uses_libedit and ON_DARWIN:
        readline.parse_and_bind("bind ^I rl_complete")
        print(
            "\n".join(
                [
                    "",
                    "*" * 78,
                    "libedit detected - readline will not be well behaved, including but not limited to:",
                    "   * crashes on tab completion",
                    "   * incorrect history navigation",
                    "   * corrupting long-lines",
                    "   * failure to wrap or indent lines properly",
                    "",
                    "It is highly recommended that you install gnureadline, which is installable with:",
                    "     xpip install gnureadline",
                    "*" * 78,
                ]
            ),
            file=sys.stderr,
        )
    else:
        readline.parse_and_bind("tab: complete")
    # try to load custom user settings
    inputrc_name = os_environ.get("INPUTRC")
    if inputrc_name is None:
        if uses_libedit:
            inputrc_name = ".editrc"
        else:
            inputrc_name = ".inputrc"
        inputrc_name = os.path.join(os.path.expanduser("~"), inputrc_name)
    if (not ON_WINDOWS) and (not os.path.isfile(inputrc_name)):
        inputrc_name = "/etc/inputrc"
    if ON_WINDOWS:
        winutils.enable_virtual_terminal_processing()
    if os.path.isfile(inputrc_name):
        try:
            readline.read_init_file(inputrc_name)
        except Exception:
            # this seems to fail with libedit
            print_exception("xonsh: could not load readline default init file.")
    # Protection against paste jacking (issue #1154)
    # This must be set after the init file is loaded since read_init_file()
    # automatically disables bracketed paste
    # (https://github.com/python/cpython/pull/24108)
    readline.parse_and_bind("set enable-bracketed-paste on")
    # properly reset input typed before the first prompt
    readline.set_startup_hook(carriage_return) 
[docs]
def teardown_readline():
    """Tears down up the readline module, if available."""
    try:
        import readline
    except (ImportError, TypeError):
        return 
def _rebind_case_sensitive_completions():
    # handle case sensitive, see Github issue #1342 for details
    global _RL_PREV_CASE_SENSITIVE_COMPLETIONS
    env = XSH.env
    case_sensitive = env.get("CASE_SENSITIVE_COMPLETIONS")
    if case_sensitive is _RL_PREV_CASE_SENSITIVE_COMPLETIONS:
        return
    if case_sensitive:
        readline.parse_and_bind("set completion-ignore-case off")
    else:
        readline.parse_and_bind("set completion-ignore-case on")
    _RL_PREV_CASE_SENSITIVE_COMPLETIONS = case_sensitive
[docs]
def fix_readline_state_after_ctrl_c():
    """
    Fix to allow Ctrl-C to exit reverse-i-search.
    Based on code from:
        http://bugs.python.org/file39467/raw_input__workaround_demo.py
    """
    if ON_WINDOWS:
        # hack to make pyreadline mimic the desired behavior
        try:
            _q = readline.rl.mode.process_keyevent_queue
            if len(_q) > 1:
                _q.pop()
        except Exception:
            pass
    if RL_STATE is None:
        return
    if RL_STATE.value & _RL_STATE_ISEARCH:
        RL_STATE.value &= ~_RL_STATE_ISEARCH
    if not RL_STATE.value & _RL_STATE_DONE:
        RL_STATE.value |= _RL_STATE_DONE 
[docs]
def rl_completion_suppress_append(val=1):
    """Sets the rl_completion_suppress_append variable, if possible.
    A value of 1 (default) means to suppress, a value of 0 means to enable.
    """
    if RL_COMPLETION_SUPPRESS_APPEND is None:
        return
    RL_COMPLETION_SUPPRESS_APPEND.value = val 
[docs]
def rl_completion_query_items(val=None):
    """Sets the rl_completion_query_items variable, if possible.
    A None value will set this to $COMPLETION_QUERY_LIMIT, otherwise any integer
    is accepted.
    """
    if RL_COMPLETION_QUERY_ITEMS is None:
        return
    if val is None:
        val = XSH.env.get("COMPLETION_QUERY_LIMIT")
    RL_COMPLETION_QUERY_ITEMS.value = val 
[docs]
def rl_variable_dumper(readable=True):
    """Dumps the currently set readline variables. If readable is True, then this
    output may be used in an inputrc file.
    """
    RL_LIB.rl_variable_dumper(int(readable)) 
[docs]
def rl_variable_value(variable):
    """Returns the currently set value for a readline configuration variable."""
    global RL_VARIABLE_VALUE
    if RL_VARIABLE_VALUE is None:
        import ctypes
        RL_VARIABLE_VALUE = RL_LIB.rl_variable_value
        RL_VARIABLE_VALUE.restype = ctypes.c_char_p
    env = XSH.env
    enc, errors = env.get("XONSH_ENCODING"), env.get("XONSH_ENCODING_ERRORS")
    if isinstance(variable, str):
        variable = variable.encode(encoding=enc, errors=errors)
    rtn = RL_VARIABLE_VALUE(variable)
    return rtn.decode(encoding=enc, errors=errors) 
[docs]
@lazyobject
def rl_on_new_line():
    """Grabs one of a few possible redisplay functions in readline."""
    names = ["rl_on_new_line", "rl_forced_update_display", "rl_redisplay"]
    for name in names:
        func = getattr(RL_LIB, name, None)
        if func is not None:
            break
    else:
        def print_for_newline():
            print()
        func = print_for_newline
    return func 
def _insert_text_func(s, readline):
    """Creates a function to insert text via readline."""
    def inserter():
        readline.insert_text(s)
        readline.redisplay()
    return inserter
def _render_completions(completions, prefix, prefix_len):
    """Render the completions according to the required prefix_len.
    Readline will replace the current prefix with the chosen rendered completion.
    """
    chopped = prefix[:-prefix_len] if prefix_len else prefix
    rendered_completions = []
    for comp in completions:
        if isinstance(comp, xct.RichCompletion) and comp.prefix_len is not None:
            if comp.prefix_len:
                comp = prefix[: -comp.prefix_len] + comp
            else:
                comp = prefix + comp
        elif chopped:
            comp = chopped + comp
        rendered_completions.append(comp)
    return rendered_completions
DEDENT_TOKENS = LazyObject(
    lambda: frozenset(["raise", "return", "pass", "break", "continue"]),
    globals(),
    "DEDENT_TOKENS",
)
[docs]
class ReadlineShell(BaseShell, cmd.Cmd):
    """The readline based xonsh shell."""
    def __init__(self, completekey="tab", stdin=None, stdout=None, **kwargs):
        BaseShell.__init__(self, **kwargs)
        # super() doesn't pass the stdin/stdout to Cmd's init method correctly.
        # so calling it explicitly
        cmd.Cmd.__init__(self, completekey=completekey, stdin=stdin, stdout=stdout)
        setup_readline()
        self._current_indent = ""
        self._current_prompt = ""
        self._force_hide = None
        self._complete_only_last_table = {
            # Truth table for completions, keys are:
            # (prefix_begs_quote, prefix_ends_quote, i_ends_quote,
            #  last_starts_with_prefix, i_has_space)
            (True, True, True, True, True): True,
            (True, True, True, True, False): True,
            (True, True, True, False, True): False,
            (True, True, True, False, False): True,
            (True, True, False, True, True): False,
            (True, True, False, True, False): False,
            (True, True, False, False, True): False,
            (True, True, False, False, False): False,
            (True, False, True, True, True): True,
            (True, False, True, True, False): False,
            (True, False, True, False, True): False,
            (True, False, True, False, False): True,
            (True, False, False, True, True): False,
            (True, False, False, True, False): False,
            (True, False, False, False, True): False,
            (True, False, False, False, False): False,
            (False, True, True, True, True): True,
            (False, True, True, True, False): True,
            (False, True, True, False, True): True,
            (False, True, True, False, False): True,
            (False, True, False, True, True): False,
            (False, True, False, True, False): False,
            (False, True, False, False, True): False,
            (False, True, False, False, False): False,
            (False, False, True, True, True): False,
            (False, False, True, True, False): False,
            (False, False, True, False, True): False,
            (False, False, True, False, False): True,
            (False, False, False, True, True): True,
            (False, False, False, True, False): False,
            (False, False, False, False, True): False,
            (False, False, False, False, False): False,
        }
        self.cmdqueue = collections.deque()
    def __del__(self):
        teardown_readline()
[docs]
    def singleline(self, store_in_history=True, **kwargs):
        """Reads a single line of input. The store_in_history kwarg
        flags whether the input should be stored in readline's in-memory
        history.
        """
        if not store_in_history:  # store current position to remove it later
            try:
                import readline
            except ImportError:
                store_in_history = True
            pos = readline.get_current_history_length() - 1
        events.on_pre_prompt_format.fire()
        prompt = self.prompt
        events.on_pre_prompt.fire()
        rtn = input(prompt)
        events.on_post_prompt.fire()
        if not store_in_history and pos >= 0:
            readline.remove_history_item(pos)
        return rtn 
[docs]
    def parseline(self, line):
        """Overridden to no-op."""
        return "", line, line 
    def _querycompletions(self, completions, loc):
        """Returns whether or not we should show completions. 0 means that prefixes
        should not be shown, 1 means that there is a common prefix among all completions
        and they should be shown, while 2 means that there is no common prefix but
        we are under the query limit and they should be shown.
        """
        if os.path.commonprefix([c[loc:] for c in completions]):
            return 1
        elif len(completions) <= XSH.env.get("COMPLETION_QUERY_LIMIT"):
            return 2
        msg = f"\nDisplay all {len(completions)} possibilities? "
        msg += "({GREEN}y{RESET} or {RED}n{RESET})"
        self.print_color(msg, end="", flush=True, file=sys.stderr)
        yn = "x"
        while yn not in "yn":
            yn = sys.stdin.read(1)
        show_completions = to_bool(yn)
        print()
        if not show_completions:
            rl_on_new_line()
            return 0
        w, h = shutil.get_terminal_size()
        lines = columnize(completions, width=w)
        more_msg = self.format_color(
            "{YELLOW}==={RESET} more or "
            "{PURPLE}({RESET}q{PURPLE}){RESET}uit "
            "{YELLOW}==={RESET}"
        )
        while len(lines) > h - 1:
            print("".join(lines[: h - 1]), end="", flush=True, file=sys.stderr)
            lines = lines[h - 1 :]
            print(more_msg, end="", flush=True, file=sys.stderr)
            q = sys.stdin.read(1).lower()
            print(flush=True, file=sys.stderr)
            if q == "q":
                rl_on_new_line()
                return 0
        print("".join(lines), end="", flush=True, file=sys.stderr)
        rl_on_new_line()
        return 0
[docs]
    def completedefault(self, prefix, line, begidx, endidx):
        """Implements tab-completion for text."""
        if self.completer is None:
            return []
        rl_completion_suppress_append()  # this needs to be called each time
        _rebind_case_sensitive_completions()
        rl_completion_query_items(val=999999999)
        prev_text = "".join(self.buffer)
        completions, plen = self.completer.complete(
            prefix,
            line,
            begidx,
            endidx,
            ctx=self.ctx,
            multiline_text=prev_text + line,
            cursor_index=len(prev_text) + endidx,
        )
        rtn_completions = _render_completions(completions, prefix, plen)
        rtn = []
        prefix_begs_quote = prefix.startswith("'") or prefix.startswith('"')
        prefix_ends_quote = prefix.endswith("'") or prefix.endswith('"')
        for i in rtn_completions:
            i_ends_quote = i.endswith("'") or i.endswith('"')
            last = i.rsplit(" ", 1)[-1]
            last_starts_prefix = last.startswith(prefix)
            i_has_space = " " in i
            key = (
                prefix_begs_quote,
                prefix_ends_quote,
                i_ends_quote,
                last_starts_prefix,
                i_has_space,
            )
            rtn.append(last if self._complete_only_last_table[key] else i)
        # return based on show completions
        show_completions = self._querycompletions(completions, endidx - begidx)
        if show_completions == 0:
            return []
        elif show_completions == 1:
            return rtn
        elif show_completions == 2:
            return completions
        else:
            raise ValueError("query completions flag not understood.") 
    # tab complete on first index too
    completenames = completedefault  # type:ignore
    def _load_remaining_input_into_queue(self):
        buf = b""
        while True:
            r, w, x = select.select([self.stdin], [], [], 1e-6)
            if len(r) == 0:
                break
            buf += os.read(self.stdin.fileno(), 1024)
        if len(buf) > 0:
            buf = buf.decode().replace("\r\n", "\n").replace("\r", "\n")
            self.cmdqueue.extend(buf.splitlines(keepends=True))
[docs]
    def postcmd(self, stop, line):
        """Called just before execution of line. For readline, this handles the
        automatic indentation of code blocks.
        """
        try:
            import readline
        except ImportError:
            return stop
        if self.need_more_lines:
            if len(line.strip()) == 0:
                readline.set_pre_input_hook(None)
                self._current_indent = ""
            elif ends_with_colon_token(line):
                ind = line[: len(line) - len(line.lstrip())]
                ind += XSH.env.get("INDENT")
                readline.set_pre_input_hook(_insert_text_func(ind, readline))
                self._current_indent = ind
            elif line.split(maxsplit=1)[0] in DEDENT_TOKENS:
                env = XSH.env
                ind = self._current_indent[: -len(env.get("INDENT"))]
                readline.set_pre_input_hook(_insert_text_func(ind, readline))
                self._current_indent = ind
            else:
                ind = line[: len(line) - len(line.lstrip())]
                if ind != self._current_indent:
                    insert_func = _insert_text_func(ind, readline)
                    readline.set_pre_input_hook(insert_func)
                    self._current_indent = ind
        else:
            readline.set_pre_input_hook(None)
        return stop 
    def _cmdloop(self, intro=None):
        """Repeatedly issue a prompt, accept input, parse an initial prefix
        off the received input, and dispatch to action methods, passing them
        the remainder of the line as argument.
        This was forked from Lib/cmd.py from the Python standard library v3.4.3,
        (C) Python Software Foundation, 2015.
        """
        self.preloop()
        try:
            import readline
            if self.use_rawinput and self.completekey:
                self.old_completer = readline.get_completer()
                readline.set_completer(self.complete)
                readline.parse_and_bind(self.completekey + ": complete")
            have_readline = True
        except ImportError:
            have_readline = False
        try:
            if intro is not None:
                self.intro = intro
            if self.intro:
                self.stdout.write(str(self.intro) + "\n")
            stop = None
            while not stop:
                line = None
                exec_now = False
                if len(self.cmdqueue) > 0:
                    line = self.cmdqueue.popleft()
                    exec_now = line.endswith("\n")
                if self.use_rawinput and not exec_now:
                    inserter = (
                        None if line is None else _insert_text_func(line, readline)
                    )
                    if inserter is not None:
                        readline.set_pre_input_hook(inserter)
                    try:
                        line = self.singleline()
                    except EOFError:
                        if XSH.env.get("IGNOREEOF"):
                            self.stdout.write('Use "exit" to leave the shell.' "\n")
                            line = ""
                        else:
                            line = "EOF"
                    if inserter is not None:
                        readline.set_pre_input_hook(None)
                else:
                    self.print_color(self.prompt, file=self.stdout)
                    if line is not None:
                        os.write(self.stdin.fileno(), line.encode())
                    if not exec_now:
                        line = self.stdin.readline()
                    if len(line) == 0:
                        line = "EOF"
                    else:
                        line = line.rstrip("\r\n")
                    if have_readline and line != "EOF":
                        readline.add_history(line)
                if not ON_WINDOWS:
                    # select() is not fully functional on windows
                    self._load_remaining_input_into_queue()
                line = self.precmd(line)
                stop = self.onecmd(line)
                stop = self.postcmd(stop, line)
                if ON_WINDOWS:
                    winutils.enable_virtual_terminal_processing()
            self.postloop()
        finally:
            if self.use_rawinput and self.completekey:
                try:
                    import readline
                    readline.set_completer(self.old_completer)
                except ImportError:
                    pass
[docs]
    def cmdloop(self, intro=None):
        while not XSH.exit:
            try:
                self._cmdloop(intro=intro)
            except (KeyboardInterrupt, SystemExit) as e:
                print(file=self.stdout)  # Gives a newline
                fix_readline_state_after_ctrl_c()
                self.reset_buffer()
                intro = None
                if isinstance(e, SystemExit):
                    raise 
    @property
    def prompt(self):
        """Obtains the current prompt string."""
        global RL_LIB, RL_CAN_RESIZE
        if RL_CAN_RESIZE:
            # This is needed to support some system where line-wrapping doesn't
            # work. This is a bug in upstream Python, or possibly readline.
            RL_LIB.rl_reset_screen_size()
        if self.need_more_lines:
            if self.mlprompt is None:
                try:
                    self.mlprompt = multiline_prompt(curr=self._current_prompt)
                except Exception:  # pylint: disable=broad-except
                    print_exception()
                    self.mlprompt = "<multiline prompt error> "
            return self.mlprompt
        env = XSH.env  # pylint: disable=no-member
        p = env.get("PROMPT")
        # clear prompt level cache
        env["PROMPT_FIELDS"].reset()
        try:
            p = self.prompt_formatter(p)
        except Exception:  # pylint: disable=broad-except
            print_exception()
        hide = True if self._force_hide is None else self._force_hide
        p = ansi_partial_color_format(p, style=env.get("XONSH_COLOR_STYLE"), hide=hide)
        self._current_prompt = p
        self.settitle()
        return p
[docs]
    def print_color(self, string, hide=False, **kwargs):
        if isinstance(string, str):
            s = self.format_color(string, hide=hide)
        else:
            # assume this is a list of (Token, str) tuples and format it
            env = XSH.env
            style_overrides_env = env.get("XONSH_STYLE_OVERRIDES", {})
            self.styler.style_name = env.get("XONSH_COLOR_STYLE")
            self.styler.override(style_overrides_env)
            style_proxy = pyghooks.xonsh_style_proxy(self.styler)
            formatter = pyghooks.XonshTerminal256Formatter(style=style_proxy)
            s = pygments.format(string, formatter).rstrip()
        print(s, **kwargs) 
[docs]
    def color_style_names(self):
        """Returns an iterable of all available style names."""
        return ansi_color_style_names() 
[docs]
    def color_style(self):
        """Returns the current color map."""
        style = XSH.env.get("XONSH_COLOR_STYLE")
        return ansi_color_style(style=style) 
[docs]
    def restore_tty_sanity(self):
        """An interface for resetting the TTY stdin mode. This is highly
        dependent on the shell backend. Also it is mostly optional since
        it only affects ^Z backgrounding behaviour.
        """
        if not ON_POSIX:
            return
        stty, _ = XSH.commands_cache.lazyget("stty", (None, None))
        if stty is None:
            return
        # If available, we should just call the stty utility. This call should
        # not throw even if stty fails. It should also be noted that subprocess
        # calls, like the following, seem to be ineffective:
        #       subprocess.call([stty, 'sane'], shell=True)
        # My guess is that this is because Popen does some crazy redirecting
        # under the covers. This effectively hides the true TTY stdin handle
        # from stty. To get around this we have to use the lower level
        # os.system() function.
        os.system(stty + " sane") 
 
[docs]
class ReadlineHistoryAdder(threading.Thread):
    def __init__(self, wait_for_gc=True, *args, **kwargs):
        """Thread responsible for adding inputs from history to the
        current readline instance. May wait for the history garbage
        collector to finish.
        """
        super().__init__(*args, **kwargs)
        self.daemon = True
        self.wait_for_gc = wait_for_gc
        self.start()
[docs]
    def run(self):
        try:
            import readline
        except ImportError:
            return
        hist = XSH.history
        if hist is None:
            return
        i = 1
        for h in hist.all_items():
            line = h["inp"].rstrip()
            if i == 1:
                pass
            elif line == readline.get_history_item(i - 1):
                continue
            readline.add_history(line)
            if RL_LIB is not None:
                RL_LIB.history_set_pos(i)
            i += 1