"""Implements JSON version of xonsh history backend."""
import collections
import collections.abc as cabc
import os
import re
import sys
import threading
import time
from xonsh.built_ins import XSH
try:
    import ujson as json
    JSONDecodeError = json.JSONDecodeError  # type: ignore
except ImportError:
    import json  # type: ignore
    JSONDecodeError = json.decoder.JSONDecodeError  # type: ignore
import xonsh.lib.lazyjson as xlj
import xonsh.tools as xt
import xonsh.xoreutils.uptime as uptime
from xonsh.history.base import History
def _xhj_gc_commands_to_rmfiles(hsize, files):
    """Return number of units and list of history files to remove to get under the limit,
    Parameters:
    -----------
    hsize (int):  units of history, # of commands in this case.
    files ((mod_ts, num_commands, path)[], fsize): history files, sorted oldest first.
    Returns:
    --------
    hsize_removed (int):  units of history to be removed
    rm_files ((mod_ts, num_commands, path, fsize)[]): list of files to remove.
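
    Examples
    --------
    A minimal, illustrative sketch (the file tuples are made up). With a budget
    of 10 commands the two newest files fit, so only the oldest is slated for
    removal:

    >>> files = [(1.0, 5, "a.json", 100), (2.0, 5, "b.json", 120), (3.0, 5, "c.json", 90)]
    >>> _xhj_gc_commands_to_rmfiles(10, files)
    (5, [(1.0, 5, 'a.json', 100)])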
    """
    n = 0
    ncmds = 0
    for _, fcmds, _, _ in reversed(files):
        # `files` already includes empty files, so they need no special handling here.
        if ncmds + fcmds > hsize:
            break
        ncmds += fcmds
        n += 1
    cmds_removed = 0
    # files[:-0] would be an empty slice, so handle n == 0 explicitly: if even
    # the newest file exceeds the budget, every file is a removal candidate.
    files_removed = files[:-n] if n > 0 else files
    for _, fcmds, _, _ in files_removed:
        cmds_removed += fcmds
    return cmds_removed, files_removed
def _xhj_gc_files_to_rmfiles(hsize, files):
    """Return the number and list of history files to remove to get under the file limit."""
    rmfiles = files[:-hsize] if len(files) > hsize else []
    return len(rmfiles), rmfiles
def _xhj_gc_seconds_to_rmfiles(hsize, files):
    """Return excess duration and list of history files to remove to get under the age limit."""
    now = time.time()
    n = 0
    for ts, _, _, _ in files:
        if (now - ts) < hsize:
            break
        n += 1
    rmfiles = files[:n]
    size_over = (now - hsize - rmfiles[0][0]) if n > 0 else 0
    return size_over, rmfiles
def _xhj_gc_bytes_to_rmfiles(hsize, files):
    """Return the history files to remove to get under the byte limit."""
    n = 0
    nbytes = 0
    for _, _, _, fsize in reversed(files):
        if nbytes + fsize > hsize:
            break
        nbytes += fsize
        n += 1
    bytes_removed = 0
    # As above, guard the n == 0 case (files[:-0] would be an empty slice).
    files_removed = files[:-n] if n > 0 else files
    for _, _, _, fsize in files_removed:
        bytes_removed += fsize
    return bytes_removed, files_removed
def _xhj_get_data_dir():
    data_dir = xt.expanduser_abs_path(
        os.path.join(XSH.env.get("XONSH_DATA_DIR"), "history_json")
    )
    os.makedirs(data_dir, exist_ok=True)
    return data_dir
def _xhj_get_history_files(sort=True, newest_first=False):
    """Find and return the history files. Optionally sort files by
    modify time.
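
    Example (illustrative)::

        newest_to_oldest = _xhj_get_history_files(sort=True, newest_first=True)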
    """
    data_dirs = [
        _xhj_get_data_dir(),
        XSH.env.get("XONSH_DATA_DIR"),  # backwards compatibility, remove in the future
    ]
    files = []
    for data_dir in data_dirs:
        data_dir = xt.expanduser_abs_path(data_dir)
        try:
            files += [
                os.path.join(data_dir, f)
                for f in os.listdir(data_dir)
                if f.startswith("xonsh-") and f.endswith(".json")
            ]
        except OSError:
            if XSH.env.get("XONSH_DEBUG"):
                xt.print_exception(
                    f"Could not collect xonsh history json files from {data_dir}"
                )
    if sort:
        files.sort(key=lambda x: os.path.getmtime(x), reverse=newest_first)
    custom_history_file = XSH.env.get("XONSH_HISTORY_FILE", None)
    if custom_history_file:
        custom_history_file = xt.expanduser_abs_path(custom_history_file)
        if custom_history_file not in files:
            files.insert(0, custom_history_file)
    return files
class JsonHistoryGC(threading.Thread):
    """Shell history garbage collection."""
    def __init__(self, wait_for_shell=True, size=None, force=False, *args, **kwargs):
        """Thread responsible for garbage collecting old history.
        May wait for shell (and for xonshrc to have been loaded) to start work.
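
        Example (illustrative only; constructing the thread immediately starts
        a GC pass, which may delete old history files)::

            JsonHistoryGC(wait_for_shell=False, size=(100, "commands"))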
        """
        super().__init__(*args, **kwargs)
        self.daemon = True
        self.size = size
        self.wait_for_shell = wait_for_shell
        self.force_gc = force
        self.gc_units_to_rmfiles = {
            "commands": _xhj_gc_commands_to_rmfiles,
            "files": _xhj_gc_files_to_rmfiles,
            "s": _xhj_gc_seconds_to_rmfiles,
            "b": _xhj_gc_bytes_to_rmfiles,
        }
        self.start()
    def run(self):
        while self.wait_for_shell:
            time.sleep(0.01)
        env = XSH.env  # pylint: disable=no-member
        xonsh_debug = env.get("XONSH_DEBUG", 0)
        if self.size is None:
            hsize, units = env.get("XONSH_HISTORY_SIZE")
        else:
            hsize, units = xt.to_history_tuple(self.size)
        files = self.files(only_unlocked=True)
        rmfiles_fn = self.gc_units_to_rmfiles.get(units)
        if rmfiles_fn is None:
            raise ValueError(f"Units type {units!r} not understood")
        size_over, rm_files = rmfiles_fn(hsize, files)
        hist = getattr(XSH, "history", None)
        if hist is not None:  # remember last gc pass history size
            hist.hist_size = size_over + hsize
            hist.hist_units = units
        if self.force_gc or size_over < hsize:
            for i, (_, _, f, _) in enumerate(rm_files, start=1):
                try:
                    os.remove(f)
                    if xonsh_debug:
                        print(
                            f"... Deleted {i:7d} of {len(rm_files):7d} history files.\r",
                            end="",
                        )
                except OSError:
                    pass
        else:
            print(
                f"Warning: History garbage collection would discard more history ({size_over} {units}) than it would keep ({hsize}).\n"
                "Not removing any history for now. Either increase your limit ($XONSH_HISTORY_SIZE), or run `history gc --force`."
            ) 
    def files(self, only_unlocked=False):
        """Find and return the history files. Optionally locked files may be
        excluded.
        The result is sorted by the last closed time, oldest first. Returns a
        list of (timestamp, number of cmds, file name, file size) tuples.
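
        A returned entry looks like this (values and path are illustrative)::

            (1700000000.0, 42, '/path/to/xonsh-SESSIONID.json', 8192)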
        """
        env = XSH.env
        if env is None:
            return []
        xonsh_debug = env.get("XONSH_DEBUG", 0)
        boot = uptime.boottime()
        fs = _xhj_get_history_files(sort=False)
        files = []
        time_start = time.time()
        for f in fs:
            try:
                cur_file_size = os.path.getsize(f)
                if cur_file_size == 0:
                    # collect empty files (for gc)
                    files.append((os.path.getmtime(f), 0, f, cur_file_size))
                    continue
                lj = xlj.LazyJSON(f, reopen=False)
                if lj.get("locked", False) and lj["ts"][0] < boot:
                    # The computer was rebooted after this history was created,
                    # so its stale lock should be cleared.
                    hist = lj.load()
                    lj.close()
                    hist["locked"] = False
                    with open(f, "w", newline="\n") as fp:
                        xlj.ljdump(hist, fp, sort_keys=True)
                    lj = xlj.LazyJSON(f, reopen=False)
                if only_unlocked and lj.get("locked", False):
                    continue
                # info: closing timestamp, number of commands, filename, file size
                ts = lj.get("ts", (0.0, None))
                files.append(
                    (ts[1] or ts[0], len(lj.sizes["cmds"]) - 1, f, cur_file_size)
                )
                lj.close()
                if xonsh_debug:
                    time_lag = time.time() - time_start
                    print(
                        f"[history.{json.__name__}] Enumerated {len(files):,d} history files for {time_lag:0.4f}s.\r",
                        end="",
                        file=sys.stderr,
                    )
            except (OSError, ValueError):
                continue
        # Sorting by the tuple elements puts the closing timestamp (mtime for
        # empty files) first, so the result is ordered oldest first.
        files.sort()
        return files 
 
class JsonHistoryFlusher(threading.Thread):
    """Flush shell history to disk periodically."""
    def __init__(
        self, filename, buffer, queue, cond, at_exit=False, skip=None, *args, **kwargs
    ):
        """Thread for flushing history."""
        super().__init__(*args, **kwargs)
        self.filename = filename
        self.buffer = buffer
        self.queue = queue
        queue.append(self)
        self.cond = cond
        self.at_exit = at_exit
        self.skip = skip
        if at_exit:
            self.dump()
            queue.popleft()
        else:
            self.start()
    def run(self):
        with self.cond:
            self.cond.wait_for(self.i_am_at_the_front)
            self.dump()
            self.queue.popleft() 
    def i_am_at_the_front(self):
        """Tests if the flusher is at the front of the queue."""
        return self is self.queue[0] 
    def dump(self):
        """Write the cached history to external storage."""
        opts = XSH.env.get("HISTCONTROL", "")
        last_inp = None
        cmds = []
        for cmd in self.buffer:
            if "ignoredups" in opts and cmd["inp"] == last_inp:
                # Skipping dup cmd
                if self.skip is not None:
                    self.skip(1)
                continue
            if "ignoreerr" in opts and cmd["rtn"] != 0:
                # Skipping failed cmd
                if self.skip is not None:
                    self.skip(1)
                continue
            cmds.append(cmd)
            last_inp = cmd["inp"]
        with open(self.filename, newline="\n") as f:
            hist = xlj.LazyJSON(f).load()
        load_hist_len = len(hist["cmds"])
        hist["cmds"].extend(cmds)
        if self.at_exit:
            # todo: check why this is here.
            if "ts" in hist:
                hist["ts"][1] = time.time()  # apply end time
            hist["locked"] = False
        if not XSH.env.get("XONSH_STORE_STDOUT", False):
            [cmd.pop("out") for cmd in hist["cmds"][load_hist_len:] if "out" in cmd]
        with open(self.filename, "w", newline="\n") as f:
            xlj.ljdump(hist, f, sort_keys=True) 
 
class JsonCommandField(cabc.Sequence):
    """A field in the 'cmds' portion of history."""
    def __init__(self, field, hist, default=None):
        """Represents a field in the 'cmds' portion of history.
        Will query the buffer for the relevant data, if possible. Otherwise it
        will lazily acquire data from the file.
        Parameters
        ----------
        field : str
            The name of the field to query.
        hist : History object
            The history object to query.
        default : optional
            The default value to return if key is not present.
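
        Examples
        --------
        Command fields are exposed as attributes of the history object, e.g.
        (illustrative)::

            XSH.history.inps[-1]   # input of the most recent command
            XSH.history.rtns[:3]   # return codes of the first three commands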
        """
        self.field = field
        self.hist = hist
        self.default = default
    def __len__(self):
        return len(self.hist)
    def __getitem__(self, key):
        if not self.hist.remember_history:
            return ""
        size = len(self)
        if isinstance(key, slice):
            return [self[i] for i in range(*key.indices(size))]
        elif not isinstance(key, int):
            raise IndexError("JsonCommandField may only be indexed by int or slice.")
        elif size == 0:
            raise IndexError("JsonCommandField is empty.")
        # now we know we have an int
        key = size + key if key < 0 else key  # ensure key is non-negative
        bufsize = len(self.hist.buffer)
        if size - bufsize <= key:  # key is in buffer
            return self.hist.buffer[key + bufsize - size].get(self.field, self.default)
        # now we know we have to go into the file
        queue = self.hist._queue
        queue.append(self)
        with self.hist._cond:
            self.hist._cond.wait_for(self.i_am_at_the_front)
            with open(self.hist.filename, newline="\n") as f:
                lj = xlj.LazyJSON(f, reopen=False)
                rtn = lj["cmds"][key].get(self.field, self.default)
                if isinstance(rtn, xlj.LJNode):
                    rtn = rtn.load()
            queue.popleft()
        return rtn
    def i_am_at_the_front(self):
        """Tests if the command field is at the front of the queue."""
        return self is self.hist._queue[0] 
 
class JsonHistory(History):
    """Xonsh history backend implemented with JSON files.
    JsonHistory implements an extra action: ``diff``
    """
    def __init__(
        self,
        filename=None,
        sessionid=None,
        buffersize=100,
        gc=True,
        save_cwd=None,
        **meta,
    ):
        """Represents a xonsh session's history as an in-memory buffer that is
        periodically flushed to disk.
        Parameters
        ----------
        filename : str, optional
            Location of history file, defaults to
            ``$XONSH_DATA_DIR/history_json/xonsh-{sessionid}.json``.
        sessionid : int, uuid, str, optional
            Current session identifier, will generate a new sessionid if not
            set.
        buffersize : int, optional
            Maximum number of commands to keep in the in-memory buffer before
            flushing to disk.
        gc : bool, optional
            Whether to start the history garbage collector thread.
        save_cwd : bool, optional
            Whether to record the working directory of each command; defaults
            to ``$XONSH_HISTORY_SAVE_CWD``.
        meta : optional
            Top-level metadata to store along with the history. The kwargs
            'cmds' and 'sessionid' are not allowed and will be overwritten.
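
        Examples
        --------
        A minimal sketch (the filename is hypothetical)::

            hist = JsonHistory(
                filename="/tmp/xonsh-example.json",
                buffersize=50,
                gc=False,
            )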
        """
        super().__init__(sessionid=sessionid, **meta)
        if filename is None:
            # pylint: disable=no-member
            data_dir = _xhj_get_data_dir()
            self.filename = os.path.join(data_dir, f"xonsh-{self.sessionid}.json")
        else:
            self.filename = filename
        if self.filename and not os.path.exists(os.path.expanduser(self.filename)):
            meta["cmds"] = []
            meta["sessionid"] = str(self.sessionid)
            with open(self.filename, "w", newline="\n") as f:
                xlj.ljdump(meta, f, sort_keys=True)
            try:
                sudo_uid = os.environ.get("SUDO_UID")
                sudo_gid = os.environ.get("SUDO_GID")
                if None not in (sudo_uid, sudo_gid):
                    os.chown(self.filename, int(sudo_uid), int(sudo_gid))
                os.chmod(self.filename, 0o600)
            except Exception:  # pylint: disable=broad-except
                pass
        self.buffer = []
        self.buffersize = buffersize
        self._queue = collections.deque()
        self._cond = threading.Condition()
        self._len = 0
        self._skipped = 0
        self.last_cmd_out = None
        self.last_cmd_rtn = None
        self.gc = JsonHistoryGC() if gc else None
        # command fields that are known
        self.tss = JsonCommandField("ts", self)
        self.inps = JsonCommandField("inp", self)
        self.outs = JsonCommandField("out", self)
        self.rtns = JsonCommandField("rtn", self)
        self.cwds = JsonCommandField("cwd", self)
        self.save_cwd = (
            save_cwd
            if save_cwd is not None
            else XSH.env.get("XONSH_HISTORY_SAVE_CWD", True)
        )
    def __len__(self):
        return self._len - self._skipped
    def append(self, cmd):
        """Appends command to history. Will periodically flush the history to file.
        Parameters
        ----------
        cmd : dict
            This dict contains information about the command that is to be
            added to the history list. It should contain the keys ``inp``,
            ``rtn`` and ``ts``. These key names mirror the same names defined
            as instance variables in the ``HistoryEntry`` class.
            Additionally, an optional key ``spc`` may be present; if it is true
            and 'ignorespace' is set in $HISTCONTROL, the command is not stored.
        Returns
        -------
        hf : JsonHistoryFlusher or None
            The thread that was spawned to flush history
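
        Examples
        --------
        A sketch of the expected entry shape (``hist`` is an existing
        ``JsonHistory``; values are made up, and ``ts`` is typically a
        ``(start, end)`` pair of POSIX timestamps)::

            hist.append({
                "inp": "echo hello",
                "rtn": 0,
                "ts": (1700000000.0, 1700000000.2),
                "cwd": "/home/user",
            })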
        """
        if (not self.remember_history) or self.is_ignored(cmd):
            return
        opts = XSH.env.get("HISTCONTROL", "")
        skipped_by_ignore_space = "ignorespace" in opts and cmd.get("spc")
        if skipped_by_ignore_space:
            return None
        self.buffer.append(cmd)
        self._len += 1  # must come before flushing
        if not self.save_cwd and "cwd" in cmd:
            del cmd["cwd"]
        try:
            del cmd["spc"]
        except KeyError:
            pass
        if len(self.buffer) >= self.buffersize:
            hf = self.flush()
        else:
            hf = None
        return hf 
    def flush(self, at_exit=False):
        """Flushes the current command buffer to disk.
        Parameters
        ----------
        at_exit : bool, optional
            Whether the JsonHistoryFlusher should act as a thread in the
            background, or execute immediately and block.
        Returns
        -------
        hf : JsonHistoryFlusher or None
            The thread that was spawned to flush history
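
        Examples
        --------
        A sketch; ``hist`` is an existing ``JsonHistory`` instance::

            hf = hist.flush()          # background flush; returns the flusher thread or None
            hist.flush(at_exit=True)   # synchronous flush, e.g. when the shell exits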
        """
        # Implicitly covers case of self.remember_history being False.
        if len(self.buffer) == 0:
            return
        def skip(num):
            self._skipped += num
        hf = JsonHistoryFlusher(
            self.filename,
            tuple(self.buffer),
            self._queue,
            self._cond,
            at_exit=at_exit,
            skip=skip,
        )
        self.buffer = []
        return hf 
    def items(self, newest_first=False):
        """Display history items of current session."""
        if newest_first:
            items = zip(reversed(self.inps), reversed(self.tss))
        else:
            items = zip(self.inps, self.tss)
        for item, tss in items:
            yield {"inp": item.rstrip(), "ts": tss[0]} 
    def all_items(self, newest_first=False, **kwargs):
        """
        Returns all history as found in XONSH_DATA_DIR.

        Yield format: ``{'inp': cmd, 'ts': start_timestamp}``
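
        Example (illustrative)::

            for entry in XSH.history.all_items(newest_first=True):
                print(entry["inp"])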
        """
        while self.gc and self.gc.is_alive():
            time.sleep(0.011)  # gc sleeps for 0.01 secs, sleep a beat longer
        for f in _xhj_get_history_files(newest_first=newest_first):
            try:
                json_file = xlj.LazyJSON(f, reopen=False)
            except ValueError:
                # Invalid json file
                continue
            try:
                commands = json_file.load()["cmds"]
            except (JSONDecodeError, ValueError):
                # file is corrupted somehow
                if XSH.env.get("XONSH_DEBUG") > 0:
                    msg = "xonsh history file {0!r} is not valid JSON"
                    print(msg.format(f), file=sys.stderr)
                continue
            if newest_first:
                commands = reversed(commands)
            for c in commands:
                yield {"inp": c["inp"].rstrip(), "ts": c["ts"][0]}
        # all items should also include session items
        yield from self.items() 
    def info(self):
        data = collections.OrderedDict()
        data["backend"] = "json"
        data["sessionid"] = str(self.sessionid)
        data["filename"] = self.filename
        data["length"] = len(self)
        data["buffersize"] = self.buffersize
        data["bufferlength"] = len(self.buffer)
        envs = XSH.env
        data["gc options"] = envs.get("XONSH_HISTORY_SIZE")
        data["gc_last_size"] = f"{(self.hist_size, self.hist_units)}"
        return data 
    def run_gc(self, size=None, blocking=True, force=False, **_):
        self.gc = JsonHistoryGC(wait_for_shell=False, size=size, force=force)
        if blocking:
            while self.gc.is_alive():  # while waiting for gc.
                time.sleep(0.1)  # don't monopolize the thread (or Python GIL?) 
    def clear(self):
        """Clears the current session's history from both memory and disk."""
        # Wipe history from memory. Keep sessionid and other metadata.
        self.buffer = []
        self.tss = JsonCommandField("ts", self)
        self.inps = JsonCommandField("inp", self)
        self.outs = JsonCommandField("out", self)
        self.rtns = JsonCommandField("rtn", self)
        self.cwds = JsonCommandField("cwd", self)
        self._len = 0
        self._skipped = 0
        # Flush empty history object to disk, overwriting previous data.
        self.flush() 
    def delete(self, pattern):
        """Deletes all entries in history which matches a pattern."""
        pattern = re.compile(pattern)
        deleted = 0
        # First, delete any matching commands from the in-memory buffer.
        # Rebuild the list rather than deleting while iterating, which would
        # skip the entry after each deletion.
        kept = [cmd for cmd in self.buffer if not pattern.match(cmd["inp"])]
        deleted += len(self.buffer) - len(kept)
        self.buffer = kept
        # Then, delete any matching commands on disk.
        while self.gc and self.gc.is_alive():
            time.sleep(0.011)  # gc sleeps for 0.01 secs, sleep a beat longer
        for f in _xhj_get_history_files():
            try:
                json_file = xlj.LazyJSON(f, reopen=False)
            except ValueError:
                # Invalid json file
                continue
            try:
                file_content = json_file.load()
                commands = file_content["cmds"]
                # Filter rather than deleting in place while iterating, which
                # would skip the entry after each deletion.
                kept = [c for c in commands if not pattern.match(c["inp"])]
                deleted += len(commands) - len(kept)
                file_content["cmds"] = kept
                with open(f, "w") as fp:
                    xlj.ljdump(file_content, fp)
            except (JSONDecodeError, ValueError):
                # file is corrupted somehow
                if XSH.env.get("XONSH_DEBUG") > 0:
                    msg = "xonsh history file {0!r} is not valid JSON"
                    print(msg.format(f), file=sys.stderr)
                continue
        return deleted