Source code for xonsh.codecache

"""Tools for caching xonsh code."""

import hashlib
import marshal
import os
import sys

from xonsh import __version__ as XONSH_VERSION
from xonsh.built_ins import XSH
from xonsh.lib.lazyasd import lazyobject
from xonsh.platform import PYTHON_VERSION_INFO_BYTES
from xonsh.tools import is_writable_file, print_warning


def _splitpath(path, sofar=()):
    folder, path = os.path.split(path)
    if path == "":
        return sofar[::-1]
    elif folder == "":
        return (sofar + (path,))[::-1]
    else:
        return _splitpath(folder, sofar + (path,))


@lazyobject
def _CHARACTER_MAP():
    """Build the character escape table used for cache file names.

    Every uppercase ASCII letter maps to ``"_"`` followed by its lowercase
    form, ``"."`` maps to ``"_."`` and ``"_"`` maps to ``"__"``.
    """
    escapes = {}
    for codepoint in range(ord("A"), ord("Z") + 1):
        upper = chr(codepoint)
        escapes[upper] = "_" + chr(codepoint + 32)
    escapes["."] = "_."
    escapes["_"] = "__"
    return escapes


def _cache_renamer(path, code=False):
    """Return the escaped path components of the cache entry for ``path``.

    Unless ``code`` is true, ``path`` is first resolved with
    ``os.path.realpath``.  Each component is escaped character by character
    through ``_CHARACTER_MAP`` (characters without an entry are kept), and
    the last component gets ``.<sys.implementation.cache_tag>`` appended.
    """
    if not code:
        path = os.path.realpath(path)
    escaped = []
    for component in _splitpath(path):
        escaped.append("".join(_CHARACTER_MAP.get(ch, ch) for ch in component))
    escaped[-1] = f"{escaped[-1]}.{sys.implementation.cache_tag}"
    return escaped


def should_use_cache(execer, mode):
    """
    Tell whether caching is enabled for ``mode``.

    For ``"exec"`` mode the execer must have ``scriptcache`` or ``cacheall``
    set *and* one of ``$XONSH_CACHE_SCRIPTS`` / ``$XONSH_CACHE_EVERYTHING``
    must be truthy.  For any other mode either ``execer.cacheall`` or
    ``$XONSH_CACHE_EVERYTHING`` is enough.  The value returned is the
    (truthy or falsy) result of the boolean expression, not necessarily a
    ``bool``.
    """
    if mode != "exec":
        return execer.cacheall or XSH.env["XONSH_CACHE_EVERYTHING"]
    execer_allows = execer.scriptcache or execer.cacheall
    # The environment is only consulted when the execer allows caching.
    return execer_allows and (
        XSH.env["XONSH_CACHE_SCRIPTS"] or XSH.env["XONSH_CACHE_EVERYTHING"]
    )
def run_compiled_code(code, glb, loc, mode):
    """
    Helper to run code in a given mode and context.

    Parameters
    ----------
    code
        Compiled code object to run, or ``None`` when there is nothing to run.
    glb, loc
        Globals and locals passed to ``exec``/``eval``.
    mode
        ``"exec"`` and ``"single"`` run the code with ``exec``; any other
        value runs it with ``eval`` (the evaluated value is discarded).

    Returns
    -------
    tuple
        A ``sys.exc_info()`` triplet if the code raised, with this helper's
        own frame stripped from the traceback, or ``(None, None, None)``
        otherwise -- including when ``code`` is ``None``.
    """
    if code is None:
        # Nothing to run: still return the documented triplet so callers
        # can always unpack the result.
        return (None, None, None)
    runner = exec if mode in {"exec", "single"} else eval
    try:
        runner(code, glb, loc)
    except BaseException:
        exc_type, exc_value, exc_tb = sys.exc_info()
        # strip off the current frame as the traceback should only show user code
        return exc_type, exc_value, exc_tb.tb_next
    return (None, None, None)
def get_cache_filename(fname, code=True):
    """
    Return the filename of the cache for the given filename.

    The cache lives under ``$XONSH_DATA_DIR`` in ``xonsh_code_cache`` when
    ``code`` is true and in ``xonsh_script_cache`` otherwise.  The path
    below that directory is made of the escaped components produced by
    ``_cache_renamer`` (a naming scheme similar to the one the Mercurial
    DVCS uses for its internal store).
    """
    store_name = "xonsh_code_cache" if code else "xonsh_script_cache"
    cache_root = os.path.join(XSH.env["XONSH_DATA_DIR"], store_name)
    components = _cache_renamer(fname, code=code)
    return os.path.join(cache_root, *components)
def update_cache(ccode, cache_file_name):
    """
    Update the cache at ``cache_file_name`` to contain the compiled code
    represented by ``ccode``.

    Nothing happens when ``cache_file_name`` is ``None``.  When
    ``is_writable_file`` rejects the target, the update is skipped and a
    warning is printed only if ``$XONSH_DEBUG`` is truthy.  Otherwise the
    parent directory is created if needed and the file is written as: the
    xonsh version line, the Python version line, then the marshalled code.
    """
    if cache_file_name is None:
        return
    if not is_writable_file(cache_file_name):
        # The fallback must be a real falsy value: the string "False" is
        # truthy and would make the warning unconditional when the
        # variable is missing.
        if XSH.env.get("XONSH_DEBUG", False):
            print_warning(
                f"update_cache: Cache file is not writable: {cache_file_name}\n"
                f"Set $XONSH_CACHE_SCRIPTS=0, $XONSH_CACHE_EVERYTHING=0 to disable cache."
            )
        return
    os.makedirs(os.path.dirname(cache_file_name), exist_ok=True)
    with open(cache_file_name, "wb") as cfile:
        # Header lines are checked by ``_check_cache_versions`` on load.
        cfile.write(XONSH_VERSION.encode() + b"\n")
        cfile.write(bytes(PYTHON_VERSION_INFO_BYTES) + b"\n")
        marshal.dump(ccode, cfile)
def _check_cache_versions(cfile):
    """Check the two version header lines at the start of a cache file.

    Reads the first line from ``cfile`` and compares it with the encoded
    ``XONSH_VERSION``; only if that matches is the second line read and
    compared with ``PYTHON_VERSION_INFO_BYTES``.  Returns ``False`` on the
    first mismatch.
    """
    # version data should be < 1 kb
    xonsh_tag = cfile.readline(1024).strip()
    if xonsh_tag == XONSH_VERSION.encode():
        python_tag = cfile.readline(1024).strip()
        return python_tag == PYTHON_VERSION_INFO_BYTES
    return False
def compile_code(filename, code, execer, glb, loc, mode):
    """
    Wrapper for ``execer.compile`` to compile the given code.

    A ``.py`` file in ``"exec"`` mode is compiled with the builtin
    ``compile`` instead.  In every other case a trailing newline is added
    to ``code`` if missing, and ``execer.filename`` is set to ``filename``
    for the duration of the ``execer.compile`` call and restored
    afterwards, whether or not compilation raises.
    """
    is_plain_python = filename.endswith(".py") and mode == "exec"
    if is_plain_python:
        return compile(code, filename, mode)
    source = code if code.endswith("\n") else code + "\n"
    saved_filename = execer.filename
    try:
        execer.filename = filename
        return execer.compile(
            source, glbs=glb, locs=loc, mode=mode, filename=filename
        )
    finally:
        execer.filename = saved_filename
def script_cache_check(filename, cachefname):
    """
    Check whether the script cache for a particular file is valid.

    The cache is used only when ``cachefname`` is an existing file whose
    modification time is not older than that of ``filename``, its version
    header matches, and its payload can be unmarshalled.

    Returns a tuple containing: a boolean representing whether the cached
    code should be used, and the cached code (or ``None`` if the cache should
    not be used).
    """
    if not os.path.isfile(cachefname):
        return False, None
    if os.stat(cachefname).st_mtime < os.stat(filename).st_mtime:
        # The script changed after the cache was written.
        return False, None
    with open(cachefname, "rb") as cfile:
        if not _check_cache_versions(cfile):
            return False, None
        try:
            # NOTE: marshal is not safe against malicious data; the cache
            # file is read on the assumption that it was written by xonsh.
            ccode = marshal.load(cfile)
        except (EOFError, ValueError, TypeError):
            # Truncated or corrupt cache (e.g. an interrupted write):
            # report a miss so the caller recompiles instead of crashing.
            return False, None
    return True, ccode
def run_script_with_cache(filename, execer, glb=None, loc=None, mode="exec"):
    """
    Run a script, using a cached version if it exists (and the source has not
    changed), and updating the cache as necessary.

    When caching is enabled for ``mode`` and ``script_cache_check`` reports
    a valid cache, the cached code is run.  Otherwise the file is read as
    UTF-8 and compiled, and the cache is refreshed if caching is enabled.

    See run_compiled_code for the return value.
    """
    use_cache = should_use_cache(execer, mode)
    cachefname = get_cache_filename(filename, code=False)
    cache_hit, ccode = False, None
    if use_cache:
        cache_hit, ccode = script_cache_check(filename, cachefname)
    if cache_hit:
        return run_compiled_code(ccode, glb, loc, mode)
    with open(filename, encoding="utf-8") as script:
        source = script.read()
    ccode = compile_code(filename, source, execer, glb, loc, mode)
    if use_cache:
        update_cache(ccode, cachefname)
    return run_compiled_code(ccode, glb, loc, mode)
def code_cache_name(code):
    """
    Return an appropriate spoofed filename for the given code.

    The name is the hexadecimal MD5 digest of ``code``; a ``str`` is
    encoded with the default encoding first, anything else is hashed as is.
    """
    payload = code.encode() if isinstance(code, str) else code
    digest = hashlib.md5(payload)
    return digest.hexdigest()
def code_cache_check(cachefname):
    """
    Check whether the code cache for a particular piece of code is valid.

    The cache is used only when ``cachefname`` is an existing file, its
    version header matches, and its payload can be unmarshalled.

    Returns a tuple containing: a boolean representing whether the cached
    code should be used, and the cached code (or ``None`` if the cache should
    not be used).
    """
    if not os.path.isfile(cachefname):
        return False, None
    with open(cachefname, "rb") as cfile:
        if not _check_cache_versions(cfile):
            return False, None
        try:
            # NOTE: marshal is not safe against malicious data; the cache
            # file is read on the assumption that it was written by xonsh.
            ccode = marshal.load(cfile)
        except (EOFError, ValueError, TypeError):
            # Truncated or corrupt cache (e.g. an interrupted write):
            # report a miss so the caller recompiles instead of crashing.
            return False, None
    return True, ccode
def run_code_with_cache(
    code, display_filename, execer, glb=None, loc=None, mode="exec"
):
    """
    Run a piece of code, using a cached version if it exists, and updating
    the cache as necessary.

    The cache entry is keyed by ``code_cache_name(code)``.
    ``display_filename`` is the filename handed to the compiler.  The cache
    is neither read nor written unless ``should_use_cache`` enables caching
    for ``mode``.

    See run_compiled_code for the return value.
    """
    use_cache = should_use_cache(execer, mode)
    filename = code_cache_name(code)
    cachefname = get_cache_filename(filename, code=True)
    run_cached = False
    ccode = None
    if use_cache:
        run_cached, ccode = code_cache_check(cachefname)
    if not run_cached:
        ccode = compile_code(display_filename, code, execer, glb, loc, mode)
        # Only write the cache when caching is enabled, in line with
        # run_script_with_cache.
        if use_cache:
            update_cache(ccode, cachefname)
    return run_compiled_code(ccode, glb, loc, mode)