Source code for xonsh.history.json

"""Implements JSON version of xonsh history backend."""

import collections
import collections.abc as cabc
import os
import re
import sys
import threading
import time

from xonsh.built_ins import XSH

try:
    import ujson as json

    JSONDecodeError = json.JSONDecodeError  # type: ignore
except ImportError:
    import json  # type: ignore

    JSONDecodeError = json.decoder.JSONDecodeError  # type: ignore

import xonsh.lazyjson as xlj
import xonsh.tools as xt
import xonsh.xoreutils.uptime as uptime
from xonsh.history.base import History


def _xhj_gc_commands_to_rmfiles(hsize, files):
    """Return number of units and list of history files to remove to get under the limit,

    Parameters:
    -----------
    hsize (int):  units of history, # of commands in this case.
    files ((mod_ts, num_commands, path)[], fsize): history files, sorted oldest first.

    Returns:
    --------
    hsize_removed (int):  units of history to be removed
    rm_files ((mod_ts, num_commands, path, fsize)[]): list of files to remove.
    """
    n = 0
    ncmds = 0
    for _, fcmds, _, _ in reversed(files):
        # `files` comes in with empty files included (now), don't need special handling to gc them here.

        if ncmds + fcmds > hsize:
            break
        ncmds += fcmds
        n += 1

    cmds_removed = 0
    files_removed = files[:-n]
    for _, fcmds, _, _ in files_removed:
        cmds_removed += fcmds

    return cmds_removed, files_removed


def _xhj_gc_files_to_rmfiles(hsize, files):
    """Return the number and list of history files to remove to get under the file limit."""
    rmfiles = files[:-hsize] if len(files) > hsize else []
    return len(rmfiles), rmfiles


def _xhj_gc_seconds_to_rmfiles(hsize, files):
    """Return excess duration and list of history files to remove to get under the age limit."""
    now = time.time()
    n = 0

    for ts, _, _, _ in files:
        if (now - ts) < hsize:
            break
        n += 1

    rmfiles = files[:n]
    size_over = now - hsize - rmfiles[0][0] if n > 0 else 0
    return size_over, rmfiles


def _xhj_gc_bytes_to_rmfiles(hsize, files):
    """Return the history files to remove to get under the byte limit."""
    n = 0
    nbytes = 0
    for _, _, _, fsize in reversed(files):
        if nbytes + fsize > hsize:
            break
        nbytes += fsize
        n += 1
    bytes_removed = 0
    files_removed = files[:-n]
    for _, _, _, fsize in files_removed:
        bytes_removed += fsize

    return bytes_removed, files_removed


def _xhj_get_data_dir():
    dir = xt.expanduser_abs_path(
        os.path.join(XSH.env.get("XONSH_DATA_DIR"), "history_json")
    )
    if not os.path.exists(dir):
        os.makedirs(dir)
    return dir


def _xhj_get_history_files(sort=True, newest_first=False):
    """Find and return the history files. Optionally sort files by
    modify time.
    """
    data_dirs = [
        _xhj_get_data_dir(),
        XSH.env.get("XONSH_DATA_DIR"),  # backwards compatibility, remove in the future
    ]

    files = []
    for data_dir in data_dirs:
        data_dir = xt.expanduser_abs_path(data_dir)
        try:
            files += [
                os.path.join(data_dir, f)
                for f in os.listdir(data_dir)
                if f.startswith("xonsh-") and f.endswith(".json")
            ]
        except OSError:
            if XSH.env.get("XONSH_DEBUG"):
                xt.print_exception(
                    f"Could not collect xonsh history json files from {data_dir}"
                )
    if sort:
        files.sort(key=lambda x: os.path.getmtime(x), reverse=newest_first)

    custom_history_file = XSH.env.get("XONSH_HISTORY_FILE", None)
    if custom_history_file:
        custom_history_file = xt.expanduser_abs_path(custom_history_file)
        if custom_history_file not in files:
            files.insert(0, custom_history_file)
    return files


[docs] class JsonHistoryGC(threading.Thread): """Shell history garbage collection.""" def __init__(self, wait_for_shell=True, size=None, force=False, *args, **kwargs): """Thread responsible for garbage collecting old history. May wait for shell (and for xonshrc to have been loaded) to start work. """ super().__init__(*args, **kwargs) self.daemon = True self.size = size self.wait_for_shell = wait_for_shell self.force_gc = force self.gc_units_to_rmfiles = { "commands": _xhj_gc_commands_to_rmfiles, "files": _xhj_gc_files_to_rmfiles, "s": _xhj_gc_seconds_to_rmfiles, "b": _xhj_gc_bytes_to_rmfiles, } self.start()
[docs] def run(self): while self.wait_for_shell: time.sleep(0.01) env = XSH.env # pylint: disable=no-member xonsh_debug = env.get("XONSH_DEBUG", 0) if self.size is None: hsize, units = env.get("XONSH_HISTORY_SIZE") else: hsize, units = xt.to_history_tuple(self.size) files = self.files(only_unlocked=True) rmfiles_fn = self.gc_units_to_rmfiles.get(units) if rmfiles_fn is None: raise ValueError(f"Units type {units!r} not understood") size_over, rm_files = rmfiles_fn(hsize, files) hist = getattr(XSH, "history", None) if hist is not None: # remember last gc pass history size hist.hist_size = size_over + hsize hist.hist_units = units if self.force_gc or size_over < hsize: i = 0 for _, _, f, _ in rm_files: try: os.remove(f) if xonsh_debug: print( f"... Deleted {i:7d} of {len(rm_files):7d} history files.\r", end="", ) pass except OSError: pass i += 1 else: print( f"Warning: History garbage collection would discard more history ({size_over} {units}) than it would keep ({hsize}).\n" "Not removing any history for now. Either increase your limit ($XONSH_HISTORY_SIZE), or run `history gc --force`." )
[docs] def files(self, only_unlocked=False): """Find and return the history files. Optionally locked files may be excluded. This is sorted by the last closed time. Returns a list of (file_size, timestamp, number of cmds, file name) tuples. """ env = XSH.env if env is None: return [] xonsh_debug = env.get("XONSH_DEBUG", 0) boot = uptime.boottime() fs = _xhj_get_history_files(sort=False) files = [] time_start = time.time() for f in fs: try: cur_file_size = os.path.getsize(f) if cur_file_size == 0: # collect empty files (for gc) files.append((os.path.getmtime(f), 0, f, cur_file_size)) continue lj = xlj.LazyJSON(f, reopen=False) if lj.get("locked", False) and lj["ts"][0] < boot: # computer was rebooted between when this history was created # and now and so this history should be unlocked. hist = lj.load() lj.close() hist["locked"] = False with open(f, "w", newline="\n") as fp: xlj.ljdump(hist, fp, sort_keys=True) lj = xlj.LazyJSON(f, reopen=False) if only_unlocked and lj.get("locked", False): continue # info: file size, closing timestamp, number of commands, filename ts = lj.get("ts", (0.0, None)) files.append( (ts[1] or ts[0], len(lj.sizes["cmds"]) - 1, f, cur_file_size) ) lj.close() if xonsh_debug: time_lag = time.time() - time_start print( f"[history.{json.__name__}] Enumerated {len(files):,d} history files for {time_lag:0.4f}s.\r", end="", file=sys.stderr, ) except (OSError, ValueError): continue files.sort() # this sorts by elements of the tuple, # the first of which just happens to be file mod time. # so sort by oldest first. return files
[docs] class JsonHistoryFlusher(threading.Thread): """Flush shell history to disk periodically.""" def __init__( self, filename, buffer, queue, cond, at_exit=False, skip=None, *args, **kwargs ): """Thread for flushing history.""" super().__init__(*args, **kwargs) self.filename = filename self.buffer = buffer self.queue = queue queue.append(self) self.cond = cond self.at_exit = at_exit self.skip = skip if at_exit: self.dump() queue.popleft() else: self.start()
[docs] def run(self): with self.cond: self.cond.wait_for(self.i_am_at_the_front) self.dump() self.queue.popleft()
[docs] def i_am_at_the_front(self): """Tests if the flusher is at the front of the queue.""" return self is self.queue[0]
[docs] def dump(self): """Write the cached history to external storage.""" opts = XSH.env.get("HISTCONTROL", "") last_inp = None cmds = [] for cmd in self.buffer: if "ignoredups" in opts and cmd["inp"] == last_inp: # Skipping dup cmd if self.skip is not None: self.skip(1) continue if "ignoreerr" in opts and cmd["rtn"] != 0: # Skipping failed cmd if self.skip is not None: self.skip(1) continue cmds.append(cmd) last_inp = cmd["inp"] with open(self.filename, newline="\n") as f: hist = xlj.LazyJSON(f).load() load_hist_len = len(hist["cmds"]) hist["cmds"].extend(cmds) if self.at_exit: # todo: check why this is here. if "ts" in hist: hist["ts"][1] = time.time() # apply end time hist["locked"] = False if not XSH.env.get("XONSH_STORE_STDOUT", False): [cmd.pop("out") for cmd in hist["cmds"][load_hist_len:] if "out" in cmd] with open(self.filename, "w", newline="\n") as f: xlj.ljdump(hist, f, sort_keys=True)
[docs] class JsonCommandField(cabc.Sequence): """A field in the 'cmds' portion of history.""" def __init__(self, field, hist, default=None): """Represents a field in the 'cmds' portion of history. Will query the buffer for the relevant data, if possible. Otherwise it will lazily acquire data from the file. Parameters ---------- field : str The name of the field to query. hist : History object The history object to query. default : optional The default value to return if key is not present. """ self.field = field self.hist = hist self.default = default def __len__(self): return len(self.hist) def __getitem__(self, key): if not self.hist.remember_history: return "" size = len(self) if isinstance(key, slice): return [self[i] for i in range(*key.indices(size))] elif not isinstance(key, int): raise IndexError("JsonCommandField may only be indexed by int or slice.") elif size == 0: raise IndexError("JsonCommandField is empty.") # now we know we have an int key = size + key if key < 0 else key # ensure key is non-negative bufsize = len(self.hist.buffer) if size - bufsize <= key: # key is in buffer return self.hist.buffer[key + bufsize - size].get(self.field, self.default) # now we know we have to go into the file queue = self.hist._queue queue.append(self) with self.hist._cond: self.hist._cond.wait_for(self.i_am_at_the_front) with open(self.hist.filename, newline="\n") as f: lj = xlj.LazyJSON(f, reopen=False) rtn = lj["cmds"][key].get(self.field, self.default) if isinstance(rtn, xlj.LJNode): rtn = rtn.load() queue.popleft() return rtn
[docs] def i_am_at_the_front(self): """Tests if the command field is at the front of the queue.""" return self is self.hist._queue[0]
[docs] class JsonHistory(History): """Xonsh history backend implemented with JSON files. JsonHistory implements an extra action: ``diff`` """ def __init__( self, filename=None, sessionid=None, buffersize=100, gc=True, save_cwd=None, **meta, ): """Represents a xonsh session's history as an in-memory buffer that is periodically flushed to disk. Parameters ---------- filename : str, optional Location of history file, defaults to ``$XONSH_DATA_DIR/history_json/xonsh-{sessionid}.json``. sessionid : int, uuid, str, optional Current session identifier, will generate a new sessionid if not set. buffersize : int, optional Maximum buffersize in memory. meta : optional Top-level metadata to store along with the history. The kwargs 'cmds' and 'sessionid' are not allowed and will be overwritten. gc : bool, optional Run garbage collector flag. """ super().__init__(sessionid=sessionid, **meta) if filename is None: # pylint: disable=no-member data_dir = _xhj_get_data_dir() self.filename = os.path.join(data_dir, f"xonsh-{self.sessionid}.json") else: self.filename = filename if self.filename and not os.path.exists(os.path.expanduser(self.filename)): meta["cmds"] = [] meta["sessionid"] = str(self.sessionid) with open(self.filename, "w", newline="\n") as f: xlj.ljdump(meta, f, sort_keys=True) try: sudo_uid = os.environ.get("SUDO_UID") sudo_gid = os.environ.get("SUDO_GID") if None not in (sudo_uid, sudo_gid): os.chown(self.filename, int(sudo_uid), int(sudo_gid)) os.chmod(self.filename, 0o600) except Exception: # pylint: disable=broad-except pass self.buffer = [] self.buffersize = buffersize self._queue = collections.deque() self._cond = threading.Condition() self._len = 0 self._skipped = 0 self.last_cmd_out = None self.last_cmd_rtn = None self.gc = JsonHistoryGC() if gc else None # command fields that are known self.tss = JsonCommandField("ts", self) self.inps = JsonCommandField("inp", self) self.outs = JsonCommandField("out", self) self.rtns = JsonCommandField("rtn", self) self.cwds = JsonCommandField("cwd", self) self.save_cwd = ( save_cwd if save_cwd is not None else XSH.env.get("XONSH_HISTORY_SAVE_CWD", True) ) def __len__(self): return self._len - self._skipped
[docs] def append(self, cmd): """Appends command to history. Will periodically flush the history to file. Parameters ---------- cmd : dict This dict contains information about the command that is to be added to the history list. It should contain the keys ``inp``, ``rtn`` and ``ts``. These key names mirror the same names defined as instance variables in the ``HistoryEntry`` class. Additionally, an optional key ``spc`` may be present which will affect commands from being stored if ignorespace is in $HISTCONTROL. Returns ------- hf : JsonHistoryFlusher or None The thread that was spawned to flush history """ if (not self.remember_history) or self.is_ignored(cmd): return opts = XSH.env.get("HISTCONTROL", "") skipped_by_ignore_space = "ignorespace" in opts and cmd.get("spc") if skipped_by_ignore_space: return None self.buffer.append(cmd) self._len += 1 # must come before flushing if not self.save_cwd and "cwd" in cmd: del cmd["cwd"] try: del cmd["spc"] except KeyError: pass if len(self.buffer) >= self.buffersize: hf = self.flush() else: hf = None return hf
[docs] def flush(self, at_exit=False): """Flushes the current command buffer to disk. Parameters ---------- at_exit : bool, optional Whether the JsonHistoryFlusher should act as a thread in the background, or execute immediately and block. Returns ------- hf : JsonHistoryFlusher or None The thread that was spawned to flush history """ # Implicitly covers case of self.remember_history being False. if len(self.buffer) == 0: return def skip(num): self._skipped += num hf = JsonHistoryFlusher( self.filename, tuple(self.buffer), self._queue, self._cond, at_exit=at_exit, skip=skip, ) self.buffer = [] return hf
[docs] def items(self, newest_first=False): """Display history items of current session.""" if newest_first: items = zip(reversed(self.inps), reversed(self.tss)) else: items = zip(self.inps, self.tss) for item, tss in items: yield {"inp": item.rstrip(), "ts": tss[0]}
[docs] def all_items(self, newest_first=False, **kwargs): """ Returns all history as found in XONSH_DATA_DIR. yield format: {'inp': cmd, 'rtn': 0, ...} """ while self.gc and self.gc.is_alive(): time.sleep(0.011) # gc sleeps for 0.01 secs, sleep a beat longer for f in _xhj_get_history_files(newest_first=newest_first): try: json_file = xlj.LazyJSON(f, reopen=False) except ValueError: # Invalid json file continue try: commands = json_file.load()["cmds"] except (JSONDecodeError, ValueError): # file is corrupted somehow if XSH.env.get("XONSH_DEBUG") > 0: msg = "xonsh history file {0!r} is not valid JSON" print(msg.format(f), file=sys.stderr) continue if newest_first: commands = reversed(commands) for c in commands: yield {"inp": c["inp"].rstrip(), "ts": c["ts"][0]} # all items should also include session items yield from self.items()
[docs] def info(self): data = collections.OrderedDict() data["backend"] = "json" data["sessionid"] = str(self.sessionid) data["filename"] = self.filename data["length"] = len(self) data["buffersize"] = self.buffersize data["bufferlength"] = len(self.buffer) envs = XSH.env data["gc options"] = envs.get("XONSH_HISTORY_SIZE") data["gc_last_size"] = f"{(self.hist_size, self.hist_units)}" return data
[docs] def run_gc(self, size=None, blocking=True, force=False): self.gc = JsonHistoryGC(wait_for_shell=False, size=size, force=force) if blocking: while self.gc.is_alive(): # while waiting for gc. time.sleep(0.1) # don't monopolize the thread (or Python GIL?)
[docs] def clear(self): """Clears the current session's history from both memory and disk.""" # Wipe history from memory. Keep sessionid and other metadata. self.buffer = [] self.tss = JsonCommandField("ts", self) self.inps = JsonCommandField("inp", self) self.outs = JsonCommandField("out", self) self.rtns = JsonCommandField("rtn", self) self.cwds = JsonCommandField("cwd", self) self._len = 0 self._skipped = 0 # Flush empty history object to disk, overwriting previous data. self.flush()
[docs] def delete(self, pattern): """Deletes all entries in history which matches a pattern.""" pattern = re.compile(pattern) deleted = 0 # First, delete any matching commands in the in-memory buffer. for i, cmd in enumerate(self.buffer): if pattern.match(cmd["inp"]): del self.buffer[i] deleted += 1 # Then, delete any matching commands on disk. while self.gc and self.gc.is_alive(): time.sleep(0.011) # gc sleeps for 0.01 secs, sleep a beat longer for f in _xhj_get_history_files(): try: json_file = xlj.LazyJSON(f, reopen=False) except ValueError: # Invalid json file continue try: file_content = json_file.load() commands = file_content["cmds"] for i, c in enumerate(commands): if pattern.match(c["inp"]): del commands[i] deleted += 1 file_content["cmds"] = commands with open(f, "w") as fp: xlj.ljdump(file_content, fp) except (JSONDecodeError, ValueError): # file is corrupted somehow if XSH.env.get("XONSH_DEBUG") > 0: msg = "xonsh history file {0!r} is not valid JSON" print(msg.format(f), file=sys.stderr) continue return deleted