"""Main entry points of the xonsh history."""
import argparse as ap
import datetime
import json
import os
import sys
import threading
import typing as tp
import xonsh.cli_utils as xcli
import xonsh.history.diff_history as xdh
import xonsh.tools as xt
from xonsh.built_ins import XSH
from xonsh.history.base import History
from xonsh.history.dummy import DummyHistory
from xonsh.history.json import JsonHistory
HISTORY_BACKENDS = {"dummy": DummyHistory, "json": JsonHistory}
try:
    from xonsh.history.sqlite import SqliteHistory
    HISTORY_BACKENDS |= {"sqlite": SqliteHistory}
except Exception:
    """
    On some linux systems (e.g. alt linux) sqlite3 is not installed
    and it's hard to install it and maybe user can't install it.
    We need to just go forward.
    """
    pass
[docs]
def construct_history(backend=None, **kwargs) -> "History":
    """Construct the history backend object."""
    env = XSH.env
    backend = backend or env.get("XONSH_HISTORY_BACKEND", "json")
    if isinstance(backend, str) and backend in HISTORY_BACKENDS:
        kls_history = HISTORY_BACKENDS[backend]
    elif xt.is_class(backend):
        kls_history = backend
    elif isinstance(backend, History):
        return backend
    else:
        print(
            f"Unknown history backend: {backend}. Using JSON version",
            file=sys.stderr,
        )
        kls_history = JsonHistory
    try:
        return kls_history(**kwargs)
    except Exception as e:
        xt.print_exception(
            f"Error during load {kls_history}: {e}\n"
            f"Set $XONSH_HISTORY_BACKEND='dummy' to disable history.\n"
            f"History disabled."
        )
        return DummyHistory() 
def _xh_session_parser(hist=None, newest_first=False, **kwargs):
    """Returns history items of current session."""
    if hist is None:
        hist = XSH.history
    return hist.items()
def _xh_all_parser(hist=None, newest_first=False, **kwargs):
    """Returns all history items."""
    if hist is None:
        hist = XSH.history
    return hist.all_items(newest_first=newest_first)
def _xh_find_histfile_var(file_list, default=None):
    """Return the path of the history file
    from the value of the envvar HISTFILE.
    """
    for f in file_list:
        f = xt.expanduser_abs_path(f)
        if not os.path.isfile(f):
            continue
        with open(f) as rc_file:
            for line in rc_file:
                if line.startswith("HISTFILE="):
                    hist_file = line.split("=", 1)[1].strip("'\"\n")
                    hist_file = xt.expanduser_abs_path(hist_file)
                    if os.path.isfile(hist_file):
                        return hist_file
    else:
        if default:
            default = xt.expanduser_abs_path(default)
            if os.path.isfile(default):
                return default
def _xh_bash_hist_parser(location=None, **kwargs):
    """Yield commands from bash history file"""
    if location is None:
        location = _xh_find_histfile_var(
            [os.path.join("~", ".bashrc"), os.path.join("~", ".bash_profile")],
            os.path.join("~", ".bash_history"),
        )
    if location:
        try:
            with open(location, errors="backslashreplace") as bash_hist:
                for ind, line in enumerate(bash_hist):
                    yield {"inp": line.rstrip(), "ts": 0.0, "ind": ind}
        except PermissionError:
            print(f"Bash history permission error in {location!r}", file=sys.stderr)
            yield {
                "inp": f"# Bash history permission error in {location!r}",
                "ts": 0.0,
                "ind": 0,
            }
    else:
        print("No bash history file", file=sys.stderr)
def _xh_zsh_hist_parser(location=None, **kwargs):
    """Yield commands from zsh history file"""
    if location is None:
        location = _xh_find_histfile_var(
            [os.path.join("~", ".zshrc"), os.path.join("~", ".zprofile")],
            os.path.join("~", ".zsh_history"),
        )
    if location:
        with open(location, errors="backslashreplace") as zsh_hist:
            for ind, line in enumerate(zsh_hist):
                if line.startswith(":"):
                    try:
                        start_time, command = line.split(";", 1)
                    except ValueError:
                        # Invalid history entry
                        continue
                    try:
                        start_time = float(start_time.split(":")[1])
                    except ValueError:
                        start_time = 0.0
                    yield {"inp": command.rstrip(), "ts": start_time, "ind": ind}
                else:
                    yield {"inp": line.rstrip(), "ts": 0.0, "ind": ind}
    else:
        print("No zsh history file found", file=sys.stderr)
def _xh_filter_ts(commands, start_time, end_time):
    """Yield only the commands between start and end time."""
    for cmd in commands:
        if start_time <= cmd["ts"] < end_time:
            yield cmd
def _xh_get_history(
    session="session",
    *,
    slices=None,
    datetime_format=None,
    start_time=None,
    end_time=None,
    location=None,
):
    """Get the requested portion of shell history.
    Parameters
    ----------
    session: {'session', 'all', 'xonsh', 'bash', 'zsh'}
        The history session to get.
    slices : list of slice-like objects, optional
        Get only portions of history.
    start_time, end_time: float, optional
        Filter commands by timestamp.
    location: string, optional
        The history file location (bash or zsh)
    Returns
    -------
    generator
       A filtered list of commands
    """
    cmds = []
    for i, item in enumerate(_XH_HISTORY_SESSIONS[session](location=location)):
        item["ind"] = i
        cmds.append(item)
    if slices:
        # transform/check all slices
        slices = [xt.ensure_slice(s) for s in slices]
        cmds = xt.get_portions(cmds, slices)
    if start_time or end_time:
        if start_time is None:
            start_time = 0.0
        else:
            start_time = xt.ensure_timestamp(start_time, datetime_format)
        if end_time is None:
            end_time = float("inf")
        else:
            end_time = xt.ensure_timestamp(end_time, datetime_format)
        cmds = _xh_filter_ts(cmds, start_time, end_time)
    return cmds
_XH_HISTORY_SESSIONS = {
    "session": _xh_session_parser,
    "xonsh": _xh_all_parser,
    "all": _xh_all_parser,
    "zsh": _xh_zsh_hist_parser,
    "bash": _xh_bash_hist_parser,
}
[docs]
class SessionAction(ap.Action):
    """Set the choices lazily"""
    def __init__(self, *args, **kwargs):
        kwargs.setdefault("choices", tuple(_XH_HISTORY_SESSIONS))
        super().__init__(*args, **kwargs)
    def __call__(self, parser, namespace, values, option_string: "str | None" = None):
        setattr(namespace, self.dest, values) 
[docs]
class HistoryAlias(xcli.ArgParserAlias):
    """Try 'history <command> --help' for more info"""
[docs]
    def show(
        self,
        session: xcli.Annotated[
            str, xcli.Arg(nargs="?", action=SessionAction)
        ] = "session",
        slices: xcli.Annotated[list[int], xcli.Arg(nargs="*")] = None,
        datetime_format: tp.Optional[str] = None,
        start_time: tp.Optional[str] = None,
        end_time: tp.Optional[str] = None,
        location: tp.Optional[str] = None,
        reverse=False,
        numerate=False,
        timestamp=False,
        null_byte=False,
        _stdout=None,
        _stderr=None,
        _unparsed=None,
    ):
        """Display history of a session, default command
        Parameters
        ----------
        session:
            The history session to get. (all is an alias for xonsh)
        slices:
            integer or slice notation to get only portions of history.
        datetime_format : -f
            the datetime format to be used for filtering and printing
        start_time: --start-time, +T
            show only commands after timestamp
        end_time: -T, --end-time
            show only commands before timestamp
        location: -l, --location
            The history file location (bash or zsh)
        reverse: -r, --reverse
            Reverses the direction
        numerate: -n, --numerate
            Numerate each command
        timestamp: -t, --ts, --time-stamp
            show command timestamps
        null_byte: -0, --nb, --null-byte
            separate commands by the null character for piping history to external filters
        _unparsed
            remaining args from ``parser.parse_known_args``
        """
        slices = list(slices or ())
        if _unparsed:
            slices.extend(_unparsed)
        try:
            commands = _xh_get_history(
                session,
                slices=slices,
                start_time=start_time,
                end_time=end_time,
                datetime_format=datetime_format,
                location=location,
            )
        except Exception as err:
            self.parser.error(err)
            return
        if reverse:
            commands = reversed(list(commands))
        end = "\0" if null_byte else "\n"
        if numerate and timestamp:
            for c in commands:
                dt = datetime.datetime.fromtimestamp(c["ts"])
                print(
                    "{}:({}) {}".format(c["ind"], xt.format_datetime(dt), c["inp"]),
                    file=_stdout,
                    end=end,
                )
        elif numerate:
            for c in commands:
                print("{}: {}".format(c["ind"], c["inp"]), file=_stdout, end=end)
        elif timestamp:
            for c in commands:
                dt = datetime.datetime.fromtimestamp(c["ts"])
                print(
                    "({}) {}".format(xt.format_datetime(dt), c["inp"]),
                    file=_stdout,
                    end=end,
                )
        else:
            for c in commands:
                print(c["inp"], file=_stdout, end=end) 
[docs]
    @staticmethod
    def id_cmd(_stdout):
        """Display the current session id"""
        hist = XSH.history
        if not hist.sessionid:
            return
        print(str(hist.sessionid), file=_stdout) 
[docs]
    @staticmethod
    def pull(show_commands=False, _stdout=None):
        """Pull history from other parallel sessions.
        Parameters
        ----------
        show_commands: -c, --show-commands
            show pulled commands
        """
        hist = XSH.history
        if hist.pull.__module__ == "xonsh.history.base":
            backend = XSH.env.get("XONSH_HISTORY_BACKEND", "unknown")
            print(
                f"Pull method is not supported in {backend} history backend.",
                file=_stdout,
            )
        lines_added = hist.pull(show_commands)
        if lines_added:
            print(f"Added {lines_added} records!", file=_stdout)
        else:
            print("No records found!", file=_stdout) 
[docs]
    @staticmethod
    def flush(_stdout):
        """Flush the current history to disk"""
        hist = XSH.history
        hf = hist.flush()
        if isinstance(hf, threading.Thread):
            hf.join() 
[docs]
    @staticmethod
    def off():
        """History will not be saved for this session"""
        hist = XSH.history
        if hist.remember_history:
            hist.clear()
            hist.remember_history = False
            print("History off", file=sys.stderr) 
[docs]
    @staticmethod
    def on():
        """History will be saved for the rest of the session (default)"""
        hist = XSH.history
        if not hist.remember_history:
            hist.remember_history = True
            print("History on", file=sys.stderr) 
[docs]
    @staticmethod
    def clear():
        """One-time wipe of session history"""
        hist = XSH.history
        hist.clear()
        print("History cleared", file=sys.stderr) 
[docs]
    @staticmethod
    def delete(pattern):
        """Delete all commands matching a pattern
        Parameters
        ----------
        pattern:
            regex pattern to match against command history
        """
        hist = XSH.history
        deleted = hist.delete(pattern)
        print(f"Deleted {deleted} entries from history") 
[docs]
    @staticmethod
    def file(_stdout):
        """Display the current history filename"""
        hist = XSH.history
        if not hist.filename:
            return
        print(str(hist.filename), file=_stdout) 
[docs]
    @staticmethod
    def info(
        to_json=False,
        _stdout=None,
    ):
        """Display information about the current history
        Parameters
        ----------
        to_json: -j, --json
            print in JSON format
        """
        hist = XSH.history
        data = hist.info()
        if to_json:
            s = json.dumps(data)
            print(s, file=_stdout)
        else:
            lines = [f"{k}: {v}" for k, v in data.items()]
            print("\n".join(lines), file=_stdout) 
[docs]
    @staticmethod
    def gc(
        size: xcli.Annotated[tuple[int, str], xcli.Arg(nargs=2)] = None,
        force=False,
        blocking=True,
    ):
        """Launches a new history garbage collector
        Parameters
        ----------
        size : -s, --size
            Next two arguments represent the history size and units; e.g. "--size 8128 commands"
        force : -f, --force
            perform garbage collection even if history much bigger than configured limit
        blocking : -n, --non-blocking
            makes the gc non-blocking, and thus return sooner. By default it runs on main thread blocking input.
        """
        hist = XSH.history
        hist.run_gc(size=size, blocking=blocking, force=force) 
[docs]
    @staticmethod
    def diff(
        a,
        b,
        reopen=False,
        verbose=False,
        _stdout=None,
    ):
        """Diff two xonsh history files
        Parameters
        ----------
        left:
            The first file to diff
        right:
            The second file to diff
        reopen: -r, --reopen
            make lazy file loading reopen files each time
        verbose: -v, --verbose
            whether to print even more information
        """
        hist = XSH.history
        if isinstance(hist, JsonHistory):
            hd = xdh.HistoryDiffer(a, b, reopen=reopen, verbose=verbose)
            xt.print_color(hd.format(), file=_stdout) 
[docs]
    def transfer(
        self,
        source: tp.Annotated[str, xcli.Arg(action=SessionAction)],
        source_file: "str|None" = None,
        target: tp.Annotated["str | None", xcli.Arg(action=SessionAction)] = None,
        target_file: "str|None" = None,
    ):
        """Transfer entries between history backends.
        Parameters
        ----------
        source
            Name of the source history backend
        source_file : --source-file, --sf
            Override the default location of the history file of the backend.
        target : --target, -t
            Name of the target history backend. (default: $XONSH_HISTORY_BACKEND)
        target_file : --target-file, --tf
            Path to the location of the history file.
        Notes
        -----
        It will not remove duplicate entries, use $HISTCONTROL for managing such entries.
        """
        if source == target:
            raise self.Error("source and target backend can't be the same")
        src = construct_history(backend=source, filename=source_file, gc=False)
        dest = construct_history(backend=target, filename=target_file, gc=False)
        for entry in src.all_items():
            dest.append(entry)
        dest.flush()
        self.out("Done") 
[docs]
    def build(self):
        parser = self.create_parser(prog="history")
        parser.add_command(self.show, prefix_chars="-+")
        parser.add_command(self.id_cmd, prog="id")
        parser.add_command(self.file)
        parser.add_command(self.info)
        parser.add_command(self.pull)
        parser.add_command(self.flush)
        parser.add_command(self.off)
        parser.add_command(self.on)
        parser.add_command(self.clear)
        parser.add_command(self.delete)
        parser.add_command(self.gc)
        parser.add_command(self.transfer)
        if isinstance(XSH.history, JsonHistory):
            # add actions belong only to JsonHistory
            parser.add_command(self.diff)
        return parser 
    def __call__(self, args, *rest, **kwargs):
        if not args:
            args = ["show", "session"]
        else:
            actions = self.parser.commands.choices
            cmd = args[0]
            if cmd not in actions and cmd not in {"-h", "--help"}:
                args = ["show", "session"] + args
        if args[0] == "show":
            if not any(a in _XH_HISTORY_SESSIONS for a in args):
                args.insert(1, "session")
            kwargs.setdefault("lenient", True)
        return super().__call__(args, *rest, **kwargs) 
history_main = HistoryAlias(threadable=True)