"""Main entry points of the xonsh history."""
import argparse as ap
import datetime
import json
import os
import sys
import threading
import typing as tp
import xonsh.cli_utils as xcli
import xonsh.history.diff_history as xdh
import xonsh.tools as xt
from xonsh.built_ins import XSH
from xonsh.history.base import History
from xonsh.history.dummy import DummyHistory
from xonsh.history.json import JsonHistory
# Registry mapping a backend name to the history class that implements it.
HISTORY_BACKENDS = {"dummy": DummyHistory, "json": JsonHistory}

try:
    from xonsh.history.sqlite import SqliteHistory
except Exception:
    # On some linux systems (e.g. alt linux) sqlite3 is not installed
    # and it's hard to install it and maybe user can't install it.
    # We need to just go forward: the "sqlite" backend is simply not offered.
    pass
else:
    HISTORY_BACKENDS["sqlite"] = SqliteHistory
[docs]
def construct_history(backend=None, **kwargs) -> "History":
    """Construct the history backend object.

    Parameters
    ----------
    backend : str, class or History, optional
        A key of ``HISTORY_BACKENDS``, a class to instantiate, or an existing
        ``History`` instance. When falsy, ``$XONSH_HISTORY_BACKEND`` is read
        from the environment, with ``"json"`` as the fallback.
    **kwargs
        Passed unchanged to the constructor of the selected class.

    Returns
    -------
    History
        The instance that was passed in, a new instance of the selected
        class, or a ``DummyHistory`` when constructing that class raised.
    """
    if not backend:
        backend = XSH.env.get("XONSH_HISTORY_BACKEND", "json")

    if isinstance(backend, str) and backend in HISTORY_BACKENDS:
        history_cls = HISTORY_BACKENDS[backend]
    elif xt.is_class(backend):
        history_cls = backend
    elif isinstance(backend, History):
        # Already constructed: hand it back untouched (kwargs are not used).
        return backend
    else:
        message = f"Unknown history backend: {backend}. Using JSON version"
        print(message, file=sys.stderr)
        history_cls = JsonHistory

    try:
        history = history_cls(**kwargs)
    except Exception as err:
        # A broken backend must not prevent the shell from starting, so the
        # failure is reported and a DummyHistory is returned instead.
        xt.print_exception(
            f"Error during load {history_cls}: {err}\n"
            "Set $XONSH_HISTORY_BACKEND='dummy' to disable history.\n"
            "History disabled."
        )
        return DummyHistory()
    return history
def _xh_session_parser(hist=None, newest_first=False, **kwargs):
    """Return the history items of the current session.

    ``hist`` defaults to ``XSH.history``. ``newest_first`` and any extra
    keyword arguments are accepted but not used here.
    """
    history = XSH.history if hist is None else hist
    return history.items()
def _xh_all_parser(hist=None, newest_first=False, **kwargs):
    """Return all history items of the backend.

    ``hist`` defaults to ``XSH.history``; ``newest_first`` is forwarded to
    its ``all_items`` method. Extra keyword arguments are ignored.
    """
    history = XSH.history if hist is None else hist
    return history.all_items(newest_first=newest_first)
def _xh_find_histfile_var(file_list, default=None):
    """Return the path of the history file
    from the value of the envvar HISTFILE.

    Each existing file of ``file_list`` is scanned for lines starting with
    ``HISTFILE=``; the first assigned path that is an existing file is
    returned. Otherwise ``default`` is returned when it is an existing file.
    Returns ``None`` when nothing is found.
    """
    marker = "HISTFILE="
    for rc_path in map(xt.expanduser_abs_path, file_list):
        if not os.path.isfile(rc_path):
            continue
        with open(rc_path) as rc_file:
            for line in rc_file:
                if not line.startswith(marker):
                    continue
                # Drop surrounding quotes and the trailing newline.
                raw_value = line.split("=", 1)[1].strip("'\"\n")
                candidate = xt.expanduser_abs_path(raw_value)
                if os.path.isfile(candidate):
                    return candidate

    # No usable HISTFILE assignment was found: try the fallback location.
    if default:
        fallback = xt.expanduser_abs_path(default)
        if os.path.isfile(fallback):
            return fallback
    return None
def _xh_bash_hist_parser(location=None, **kwargs):
    """Yield commands from bash history file

    Parameters
    ----------
    location : str, optional
        Path of the history file. When ``None`` it is looked up through
        ``HISTFILE=`` in ``~/.bashrc`` / ``~/.bash_profile``, falling back to
        ``~/.bash_history``.

    Yields
    ------
    dict
        ``{"inp": command, "ts": timestamp, "ind": line index in the file}``.

    Notes
    -----
    When ``HISTTIMEFORMAT`` is set, bash writes a ``#<epoch seconds>`` line
    before each entry. Such lines are not yielded as commands; their value
    becomes the ``ts`` of the following lines until the next timestamp line.
    Lines before any timestamp keep ``ts == 0.0``. Only lines consisting of
    ``#`` and ASCII digits are treated as timestamps, so ordinary comments
    are still yielded as commands.
    """
    if location is None:
        location = _xh_find_histfile_var(
            [os.path.join("~", ".bashrc"), os.path.join("~", ".bash_profile")],
            os.path.join("~", ".bash_history"),
        )
    if not location:
        print("No bash history file", file=sys.stderr)
        return
    try:
        with open(location, errors="backslashreplace") as bash_hist:
            ts = 0.0
            for ind, line in enumerate(bash_hist):
                inp = line.rstrip()
                stamp = inp[1:]
                if inp.startswith("#") and stamp.isascii() and stamp.isdigit():
                    # Timestamp marker line, not a command.
                    ts = float(stamp)
                    continue
                yield {"inp": inp, "ts": ts, "ind": ind}
    except PermissionError:
        print(f"Bash history permission error in {location!r}", file=sys.stderr)
        # Surface the problem as a comment entry too, so that consumers of
        # the generator see why nothing was read.
        yield {
            "inp": f"# Bash history permission error in {location!r}",
            "ts": 0.0,
            "ind": 0,
        }
def _xh_zsh_hist_parser(location=None, **kwargs):
    """Yield commands from zsh history file

    Parameters
    ----------
    location : str, optional
        Path of the history file. When ``None`` it is looked up through
        ``HISTFILE=`` in ``~/.zshrc`` / ``~/.zprofile``, falling back to
        ``~/.zsh_history``.

    Yields
    ------
    dict
        ``{"inp": command, "ts": timestamp, "ind": line index in the file}``.
        Lines starting with ``:`` are parsed as ``:<start>:<...>;<command>``;
        every other line is yielded as a command with ``ts == 0.0``.

    Notes
    -----
    A ``PermissionError`` while reading the file is reported on stderr and
    as a single comment entry, the same way the bash parser reports it.
    """
    if location is None:
        location = _xh_find_histfile_var(
            [os.path.join("~", ".zshrc"), os.path.join("~", ".zprofile")],
            os.path.join("~", ".zsh_history"),
        )
    if not location:
        print("No zsh history file found", file=sys.stderr)
        return
    try:
        with open(location, errors="backslashreplace") as zsh_hist:
            for ind, line in enumerate(zsh_hist):
                if not line.startswith(":"):
                    yield {"inp": line.rstrip(), "ts": 0.0, "ind": ind}
                    continue
                try:
                    header, command = line.split(";", 1)
                except ValueError:
                    # Invalid history entry (no ';' separator)
                    continue
                try:
                    # header looks like ": <start>:<...>"; the field after
                    # the first ':' is the start time.
                    start_time = float(header.split(":")[1])
                except ValueError:
                    start_time = 0.0
                yield {"inp": command.rstrip(), "ts": start_time, "ind": ind}
    except PermissionError:
        print(f"Zsh history permission error in {location!r}", file=sys.stderr)
        yield {
            "inp": f"# Zsh history permission error in {location!r}",
            "ts": 0.0,
            "ind": 0,
        }
def _xh_filter_ts(commands, start_time, end_time):
    """Yield the commands whose ``"ts"`` lies in ``[start_time, end_time)``.

    The start bound is inclusive and the end bound exclusive; the input
    order is kept and the commands are consumed lazily.
    """
    yield from (
        entry for entry in commands if start_time <= entry["ts"] < end_time
    )
def _xh_get_history(
    session="session",
    *,
    slices=None,
    datetime_format=None,
    start_time=None,
    end_time=None,
    location=None,
):
    """Get the requested portion of shell history.

    Parameters
    ----------
    session: {'session', 'all', 'xonsh', 'bash', 'zsh'}
        The history session to get.
    slices : list of slice-like objects, optional
        Get only portions of history.
    datetime_format : str, optional
        Format handed to ``xt.ensure_timestamp`` when converting
        ``start_time`` / ``end_time``.
    start_time, end_time: float, optional
        Filter commands by timestamp.
    location: string, optional
        The history file location (bash or zsh)

    Returns
    -------
    iterable
        The selected commands. Each item's ``"ind"`` is overwritten with its
        position in the full, unfiltered sequence.
    """
    parser = _XH_HISTORY_SESSIONS[session]

    # Number the entries before any filtering so indices stay stable.
    cmds = []
    for position, entry in enumerate(parser(location=location)):
        entry["ind"] = position
        cmds.append(entry)

    if slices:
        # transform/check all slices
        checked = [xt.ensure_slice(piece) for piece in slices]
        cmds = xt.get_portions(cmds, checked)

    if start_time or end_time:
        # A missing bound is treated as open-ended.
        lower = (
            0.0
            if start_time is None
            else xt.ensure_timestamp(start_time, datetime_format)
        )
        upper = (
            float("inf")
            if end_time is None
            else xt.ensure_timestamp(end_time, datetime_format)
        )
        cmds = _xh_filter_ts(cmds, lower, upper)
    return cmds
# Maps each session name to the callable that produces its history entries.
# The keys are also offered as argparse ``choices`` by ``SessionAction``.
_XH_HISTORY_SESSIONS = {
    "session": _xh_session_parser,
    # "xonsh" and "all" share the same parser.
    "xonsh": _xh_all_parser,
    "all": _xh_all_parser,
    "zsh": _xh_zsh_hist_parser,
    "bash": _xh_bash_hist_parser,
}
[docs]
class SessionAction(ap.Action):
    """Argparse action whose ``choices`` default to the known session names.

    The names are read from ``_XH_HISTORY_SESSIONS`` when the action is
    created, i.e. lazily with respect to the declaration of the argument.
    """

    def __init__(self, *args, **kwargs):
        # An explicit ``choices`` from the caller wins over the default.
        if "choices" not in kwargs:
            kwargs["choices"] = tuple(_XH_HISTORY_SESSIONS.keys())
        super().__init__(*args, **kwargs)

    def __call__(self, parser, namespace, values, option_string: "str | None" = None):
        # Plain "store" behaviour: keep the parsed value under ``dest``.
        setattr(namespace, self.dest, values)
[docs]
class HistoryAlias(xcli.ArgParserAlias):
    """Try 'history <command> --help' for more info"""

    # NOTE(review): the docstrings of the command methods keep the file's
    # numpydoc layout on purpose. Entries such as ``reverse: -r, --reverse``
    # list command-line flags next to a parameter name; presumably ``xcli``
    # reads them when building the parser -- confirm against xonsh.cli_utils
    # before rewording or renaming anything in a "Parameters" section.

    def show(
        self,
        session: xcli.Annotated[
            str, xcli.Arg(nargs="?", action=SessionAction)
        ] = "session",
        slices: xcli.Annotated[list[int], xcli.Arg(nargs="*")] = None,
        datetime_format: tp.Optional[str] = None,
        start_time: tp.Optional[str] = None,
        end_time: tp.Optional[str] = None,
        location: tp.Optional[str] = None,
        reverse=False,
        numerate=False,
        timestamp=False,
        null_byte=False,
        _stdout=None,
        _stderr=None,
        _unparsed=None,
    ):
        """Display history of a session, default command

        Parameters
        ----------
        session:
            The history session to get. (all is an alias for xonsh)
        slices:
            integer or slice notation to get only portions of history.
        datetime_format : -f
            the datetime format to be used for filtering and printing
        start_time: --start-time, +T
            show only commands after timestamp
        end_time: -T, --end-time
            show only commands before timestamp
        location: -l, --location
            The history file location (bash or zsh)
        reverse: -r, --reverse
            Reverses the direction
        numerate: -n, --numerate
            Numerate each command
        timestamp: -t, --ts, --time-stamp
            show command timestamps
        null_byte: -0, --nb, --null-byte
            separate commands by the null character for piping history to external filters
        _unparsed
            remaining args from ``parser.parse_known_args``
        """
        # Arguments the parser did not recognise are treated as extra slices.
        slices = list(slices or ())
        if _unparsed:
            slices.extend(_unparsed)
        try:
            commands = _xh_get_history(
                session,
                slices=slices,
                start_time=start_time,
                end_time=end_time,
                datetime_format=datetime_format,
                location=location,
            )
        except Exception as err:
            self.parser.error(err)
            return

        if reverse:
            commands = reversed(list(commands))

        end = "\0" if null_byte else "\n"
        for c in commands:
            text = c["inp"]
            if timestamp:
                dt = datetime.datetime.fromtimestamp(c["ts"])
                text = f"({xt.format_datetime(dt)}) {text}"
            if numerate:
                # "<ind>:(<time>) <cmd>" with a timestamp, "<ind>: <cmd>" without.
                separator = ":" if timestamp else ": "
                text = f"{c['ind']}{separator}{text}"
            print(text, file=_stdout, end=end)

    @staticmethod
    def id_cmd(_stdout):
        """Display the current session id"""
        hist = XSH.history
        if not hist.sessionid:
            return
        print(str(hist.sessionid), file=_stdout)

    @staticmethod
    def pull(show_commands=False, _stdout=None):
        """Pull history from other parallel sessions.

        Parameters
        ----------
        show_commands: -c, --show-commands
            show pulled commands
        """
        hist = XSH.history
        if hist.pull.__module__ == "xonsh.history.base":
            # The active backend's ``pull`` is the one defined in the base
            # module, which this command reports as unsupported. Stop here
            # instead of calling it and printing a misleading result.
            backend = XSH.env.get("XONSH_HISTORY_BACKEND", "unknown")
            print(
                f"Pull method is not supported in {backend} history backend.",
                file=_stdout,
            )
            return

        lines_added = hist.pull(show_commands)
        if lines_added:
            print(f"Added {lines_added} records!", file=_stdout)
        else:
            print("No records found!", file=_stdout)

    @staticmethod
    def flush(_stdout):
        """Flush the current history to disk"""
        hist = XSH.history
        hf = hist.flush()
        # When flush returns a thread, wait for it to finish.
        if isinstance(hf, threading.Thread):
            hf.join()

    @staticmethod
    def off():
        """History will not be saved for this session"""
        hist = XSH.history
        if hist.remember_history:
            hist.clear()
            hist.remember_history = False
            print("History off", file=sys.stderr)

    @staticmethod
    def on():
        """History will be saved for the rest of the session (default)"""
        hist = XSH.history
        if not hist.remember_history:
            hist.remember_history = True
            print("History on", file=sys.stderr)

    @staticmethod
    def clear():
        """One-time wipe of session history"""
        hist = XSH.history
        hist.clear()
        print("History cleared", file=sys.stderr)

    @staticmethod
    def delete(pattern):
        """Delete all commands matching a pattern

        Parameters
        ----------
        pattern:
            regex pattern to match against command history
        """
        hist = XSH.history
        deleted = hist.delete(pattern)
        print(f"Deleted {deleted} entries from history")

    @staticmethod
    def file(_stdout):
        """Display the current history filename"""
        hist = XSH.history
        if not hist.filename:
            return
        print(str(hist.filename), file=_stdout)

    @staticmethod
    def info(
        to_json=False,
        _stdout=None,
    ):
        """Display information about the current history

        Parameters
        ----------
        to_json: -j, --json
            print in JSON format
        """
        hist = XSH.history
        data = hist.info()
        if to_json:
            s = json.dumps(data)
            print(s, file=_stdout)
        else:
            lines = [f"{k}: {v}" for k, v in data.items()]
            print("\n".join(lines), file=_stdout)

    @staticmethod
    def gc(
        size: xcli.Annotated[tuple[int, str], xcli.Arg(nargs=2)] = None,
        force=False,
        blocking=True,
    ):
        """Launches a new history garbage collector

        Parameters
        ----------
        size : -s, --size
            Next two arguments represent the history size and units; e.g. "--size 8128 commands"
        force : -f, --force
            perform garbage collection even if history much bigger than configured limit
        blocking : -n, --non-blocking
            makes the gc non-blocking, and thus return sooner. By default it runs on main thread blocking input.
        """
        hist = XSH.history
        hist.run_gc(size=size, blocking=blocking, force=force)

    @staticmethod
    def diff(
        a,
        b,
        reopen=False,
        verbose=False,
        _stdout=None,
    ):
        """Diff two xonsh history files

        Parameters
        ----------
        a:
            The first file to diff
        b:
            The second file to diff
        reopen: -r, --reopen
            make lazy file loading reopen files each time
        verbose: -v, --verbose
            whether to print even more information
        """
        hist = XSH.history
        # Nothing is printed unless the active history is a JsonHistory.
        if isinstance(hist, JsonHistory):
            hd = xdh.HistoryDiffer(a, b, reopen=reopen, verbose=verbose)
            xt.print_color(hd.format(), file=_stdout)

    def transfer(
        self,
        source: tp.Annotated[str, xcli.Arg(action=SessionAction)],
        source_file: "str|None" = None,
        target: tp.Annotated["str | None", xcli.Arg(action=SessionAction)] = None,
        target_file: "str|None" = None,
    ):
        """Transfer entries between history backends.

        Parameters
        ----------
        source
            Name of the source history backend
        source_file : --source-file, --sf
            Override the default location of the history file of the backend.
        target : --target, -t
            Name of the target history backend. (default: $XONSH_HISTORY_BACKEND)
        target_file : --target-file, --tf
            Path to the location of the history file.

        Notes
        -----
        It will not remove duplicate entries, use $HISTCONTROL for managing such entries.
        """
        if source == target:
            raise self.Error("source and target backend can't be the same")

        src = construct_history(backend=source, filename=source_file, gc=False)
        dest = construct_history(backend=target, filename=target_file, gc=False)
        for entry in src.all_items():
            dest.append(entry)
        dest.flush()

        self.out("Done")

    def build(self):
        """Create the ``history`` parser and register every sub-command."""
        parser = self.create_parser(prog="history")
        parser.add_command(self.show, prefix_chars="-+")
        parser.add_command(self.id_cmd, prog="id")
        parser.add_command(self.file)
        parser.add_command(self.info)
        parser.add_command(self.pull)
        parser.add_command(self.flush)
        parser.add_command(self.off)
        parser.add_command(self.on)
        parser.add_command(self.clear)
        parser.add_command(self.delete)
        parser.add_command(self.gc)
        parser.add_command(self.transfer)

        if isinstance(XSH.history, JsonHistory):
            # add actions belong only to JsonHistory
            parser.add_command(self.diff)
        return parser

    def __call__(self, args, *rest, **kwargs):
        """Run the alias; ``show session`` is the default command.

        No arguments, or a first argument that is neither a registered
        command nor a help flag, is rewritten to ``show session ...``.
        """
        if not args:
            args = ["show", "session"]
        else:
            # Copy first: the default session may be inserted below and the
            # caller's list must not be modified.
            args = list(args)
            actions = self.parser.commands.choices
            cmd = args[0]
            if cmd not in actions and cmd not in {"-h", "--help"}:
                args = ["show", "session", *args]

        if args[0] == "show":
            if not any(a in _XH_HISTORY_SESSIONS for a in args):
                args.insert(1, "session")
            kwargs.setdefault("lenient", True)
        return super().__call__(args, *rest, **kwargs)
# Module-level instance of the ``history`` alias, created with threadable=True.
history_main = HistoryAlias(threadable=True)