Source code for xonsh.history.sqlite

# -*- coding: utf-8 -*-
"""Implements the xonsh history backend via sqlite3."""
import builtins
import collections
import json
import os
import sqlite3
import sys
import threading
import time

from xonsh.history.base import History
import xonsh.tools as xt

XH_SQLITE_CACHE = threading.local()
XH_SQLITE_TABLE_NAME = "xonsh_history"
XH_SQLITE_CREATED_SQL_TBL = "CREATED_SQL_TABLE"


def _xh_sqlite_get_file_name():
    envs = builtins.__xonsh__.env
    file_name = envs.get("XONSH_HISTORY_SQLITE_FILE")
    if not file_name:
        data_dir = envs.get("XONSH_DATA_DIR")
        file_name = os.path.join(data_dir, "xonsh-history.sqlite")
    return xt.expanduser_abs_path(file_name)


def _xh_sqlite_get_conn(filename=None):
    if filename is None:
        filename = _xh_sqlite_get_file_name()
    return sqlite3.connect(filename)


def _xh_sqlite_create_history_table(cursor):
    """Create Table for history items.

    Columns:
        info - JSON formatted, reserved for future extension.
        frequency - in case of HISTCONTROL=erasedups,
        it tracks the frequency of the inputs. helps in sorting autocompletion
    """
    if not getattr(XH_SQLITE_CACHE, XH_SQLITE_CREATED_SQL_TBL, False):
        cursor.execute(
            """
            CREATE TABLE IF NOT EXISTS {}
                 (inp TEXT,
                  rtn INTEGER,
                  tsb REAL,
                  tse REAL,
                  sessionid TEXT,
                  out TEXT,
                  info TEXT,
                  frequency INTEGER default 1
                 )
        """.format(
                XH_SQLITE_TABLE_NAME
            )
        )
        # add frequency column if not exists for backward compatibility
        try:
            cursor.execute(
                "ALTER TABLE "
                + XH_SQLITE_TABLE_NAME
                + " ADD COLUMN frequency INTEGER default 1"
            )
        except sqlite3.OperationalError:
            pass

        # mark that this function ran for this session
        setattr(XH_SQLITE_CACHE, XH_SQLITE_CREATED_SQL_TBL, True)


def _xh_sqlite_get_frequency(cursor, input):
    # type: (sqlite3.Cursor, str) -> int
    sql = "SELECT sum(frequency) FROM {} WHERE inp=?".format(XH_SQLITE_TABLE_NAME)
    cursor.execute(sql, (input,))
    return cursor.fetchone()[0] or 0


def _xh_sqlite_erase_dups(cursor, input):
    freq = _xh_sqlite_get_frequency(cursor, input)
    sql = "DELETE FROM {} WHERE inp=?".format(XH_SQLITE_TABLE_NAME)
    cursor.execute(sql, (input,))
    return freq


def _sql_insert(cursor, values):
    # type: (sqlite3.Cursor, dict) -> None
    """handy function to run insert query"""
    sql = "INSERT INTO {} ({}) VALUES ({});"
    fields = ", ".join(values)
    marks = ", ".join(["?"] * len(values))
    cursor.execute(
        sql.format(XH_SQLITE_TABLE_NAME, fields, marks), tuple(values.values())
    )


def _xh_sqlite_insert_command(cursor, cmd, sessionid, store_stdout, remove_duplicates):
    tss = cmd.get("ts", [None, None])
    values = collections.OrderedDict(
        [
            ("inp", cmd["inp"].rstrip()),
            ("rtn", cmd["rtn"]),
            ("tsb", tss[0]),
            ("tse", tss[1]),
            ("sessionid", sessionid),
        ]
    )
    if store_stdout and "out" in cmd:
        values["out"] = cmd["out"]
    if "info" in cmd:
        info = json.dumps(cmd["info"])
        values["info"] = info
    if remove_duplicates:
        values["frequency"] = _xh_sqlite_erase_dups(cursor, values["inp"]) + 1
    _sql_insert(cursor, values)


def _xh_sqlite_get_count(cursor, sessionid=None):
    sql = "SELECT count(*) FROM xonsh_history "
    params = []
    if sessionid is not None:
        sql += "WHERE sessionid = ? "
        params.append(str(sessionid))
    cursor.execute(sql, tuple(params))
    return cursor.fetchone()[0]


def _xh_sqlite_get_records(cursor, sessionid=None, limit=None, newest_first=False):
    sql = "SELECT inp, tsb, rtn, frequency FROM xonsh_history "
    params = []
    if sessionid is not None:
        sql += "WHERE sessionid = ? "
        params.append(sessionid)
    sql += "ORDER BY tsb "
    if newest_first:
        sql += "DESC "
    if limit is not None:
        sql += "LIMIT %d " % limit
    cursor.execute(sql, tuple(params))
    return cursor.fetchall()


def _xh_sqlite_delete_records(cursor, size_to_keep):
    sql = "SELECT min(tsb) FROM ("
    sql += "SELECT tsb FROM xonsh_history ORDER BY tsb DESC "
    sql += "LIMIT %d)" % size_to_keep
    cursor.execute(sql)
    result = cursor.fetchone()
    if not result:
        return
    max_tsb = result[0]
    sql = "DELETE FROM xonsh_history WHERE tsb < ?"
    result = cursor.execute(sql, (max_tsb,))
    return result.rowcount


def xh_sqlite_append_history(
    cmd, sessionid, store_stdout, filename=None, remove_duplicates=False
):
    with _xh_sqlite_get_conn(filename=filename) as conn:
        c = conn.cursor()
        _xh_sqlite_create_history_table(c)
        _xh_sqlite_insert_command(c, cmd, sessionid, store_stdout, remove_duplicates)
        conn.commit()


def xh_sqlite_get_count(sessionid=None, filename=None):
    with _xh_sqlite_get_conn(filename=filename) as conn:
        c = conn.cursor()
        return _xh_sqlite_get_count(c, sessionid=sessionid)


def xh_sqlite_items(sessionid=None, filename=None, newest_first=False):
    with _xh_sqlite_get_conn(filename=filename) as conn:
        c = conn.cursor()
        _xh_sqlite_create_history_table(c)
        return _xh_sqlite_get_records(c, sessionid=sessionid, newest_first=newest_first)


def xh_sqlite_delete_items(size_to_keep, filename=None):
    with _xh_sqlite_get_conn(filename=filename) as conn:
        c = conn.cursor()
        _xh_sqlite_create_history_table(c)
        return _xh_sqlite_delete_records(c, size_to_keep)


[docs]class SqliteHistoryGC(threading.Thread): """Shell history garbage collection.""" def __init__(self, wait_for_shell=True, size=None, filename=None, *args, **kwargs): """Thread responsible for garbage collecting old history. May wait for shell (and for xonshrc to have been loaded) to start work. """ super().__init__(*args, **kwargs) self.daemon = True self.filename = filename self.size = size self.wait_for_shell = wait_for_shell self.start()
[docs] def run(self): while self.wait_for_shell: time.sleep(0.01) if self.size is not None: hsize, units = xt.to_history_tuple(self.size) else: envs = builtins.__xonsh__.env hsize, units = envs.get("XONSH_HISTORY_SIZE") if units != "commands": print( "sqlite backed history gc currently only supports " '"commands" as units', file=sys.stderr, ) return if hsize < 0: return xh_sqlite_delete_items(hsize, filename=self.filename)
[docs]class SqliteHistory(History): """Xonsh history backend implemented with sqlite3.""" def __init__(self, gc=True, filename=None, **kwargs): super().__init__(**kwargs) if filename is None: filename = _xh_sqlite_get_file_name() self.filename = filename self.gc = SqliteHistoryGC() if gc else None self._last_hist_inp = None self.inps = [] self.rtns = [] self.outs = [] self.tss = [] # during init rerun create command setattr(XH_SQLITE_CACHE, XH_SQLITE_CREATED_SQL_TBL, False)
[docs] def append(self, cmd): envs = builtins.__xonsh__.env inp = cmd["inp"].rstrip() self.inps.append(inp) store_stdout = envs.get("XONSH_STORE_STDOUT", False) if store_stdout: self.outs.append(cmd.get("out")) else: self.outs.append(None) self.rtns.append(cmd["rtn"]) self.tss.append(cmd.get("ts", (None, None))) opts = envs.get("HISTCONTROL") if "ignoredups" in opts and inp == self._last_hist_inp: # Skipping dup cmd return if "ignoreerr" in opts and cmd["rtn"] != 0: # Skipping failed cmd return self._last_hist_inp = inp xh_sqlite_append_history( cmd, str(self.sessionid), store_stdout, filename=self.filename, remove_duplicates=("erasedups" in opts), )
[docs] def all_items(self, newest_first=False, session_id=None): """Display all history items.""" for inp, ts, rtn, freq in xh_sqlite_items( filename=self.filename, newest_first=newest_first, sessionid=session_id ): yield {"inp": inp, "ts": ts, "rtn": rtn, "frequency": freq}
[docs] def items(self, newest_first=False): """Display history items of current session.""" yield from self.all_items(newest_first, session_id=str(self.sessionid))
[docs] def info(self): data = collections.OrderedDict() data["backend"] = "sqlite" data["sessionid"] = str(self.sessionid) data["filename"] = self.filename data["session items"] = xh_sqlite_get_count( sessionid=self.sessionid, filename=self.filename ) data["all items"] = xh_sqlite_get_count(filename=self.filename) envs = builtins.__xonsh__.env data["gc options"] = envs.get("XONSH_HISTORY_SIZE") return data
[docs] def run_gc(self, size=None, blocking=True): self.gc = SqliteHistoryGC(wait_for_shell=False, size=size) if blocking: while self.gc.is_alive(): continue