Source code for xonsh.__amalgam__

"""Amalgamation of xonsh package, made up of the following modules, in order:

* cli_utils
* contexts
* lazyasd
* lazyjson
* platform
* pretty
* xontribs_meta
* codecache
* lazyimps
* parser
* tokenize
* tools
* ast
* color_tools
* commands_cache
* completer
* events
* foreign_shells
* jobs
* jsonutils
* lexer
* openpy
* xontribs
* ansi_colors
* diff_history
* dirstack
* execer
* shell
* style_tools
* timings
* xonfig
* base_shell
* environ
* imphooks
* inspectors
* aliases
* main
* readline_shell
* tracer
* dumb_shell

"""

from sys import modules as _modules
from types import ModuleType as _ModuleType
from importlib import import_module as _import_module


class _LazyModule(_ModuleType):

    def __init__(self, pkg, mod, asname=None):
        '''Lazy module 'pkg.mod' in package 'pkg'.'''
        self.__dct__ = {
            'loaded': False,
            'pkg': pkg,  # pkg
            'mod': mod,  # pkg.mod
            'asname': asname,  # alias
            }

    @classmethod
    def load(cls, pkg, mod, asname=None):
        if mod in _modules:
            key = pkg if asname is None else mod
            return _modules[key]
        else:
            return cls(pkg, mod, asname)

    def __getattribute__(self, name):
        if name == '__dct__':
            return super(_LazyModule, self).__getattribute__(name)
        dct = self.__dct__
        mod = dct['mod']
        if dct['loaded']:
            m = _modules[mod]
        else:
            m = _import_module(mod)
            glbs = globals()
            pkg = dct['pkg']
            asname = dct['asname']
            if asname is None:
                glbs[pkg] = m = _modules[pkg]
            else:
                glbs[asname] = m
            dct['loaded'] = True
        return getattr(m, name)
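
# --- Illustrative sketch (not part of the original amalgamation) ---
# How _LazyModule is used below: load() returns a proxy, the real import
# only happens on first attribute access, and the proxy then rebinds the
# loaded module in this file's globals. "_example_lazy_module" is a
# hypothetical name for demonstration.
def _example_lazy_module():
    js = _LazyModule.load('json', 'json', 'js')  # no import performed yet
    return js.dumps({'lazy': True})  # first access triggers the real import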

#
# cli_utils
#
"""
Small functions to create an argparse CLI from functions.
"""

ap = _LazyModule.load('argparse', 'argparse', 'ap')
os = _LazyModule.load('os', 'os')
tp = _LazyModule.load('typing', 'typing', 'tp')
def _get_func_doc(doc: str) -> str:
    lines = doc.splitlines()
    if "Parameters" in lines:
        idx = lines.index("Parameters")
        lines = lines[:idx]
    return os.linesep.join(lines)


def _from_index_of(container: tp.Sequence[str], key: str):
    if key in container:
        idx = container.index(key)
        if idx + 1 < len(container):
            return container[idx + 1 :]
    return []


def _get_param_doc(doc: str, param: str) -> str:
    lines = tuple(doc.splitlines())
    if "Parameters" not in lines:
        return ""

    par_doc = []
    for lin in _from_index_of(lines, param):
        if lin and not lin.startswith(" "):
            break
        par_doc.append(lin)
    return os.linesep.join(par_doc).strip()


def get_doc(func: tp.Callable, parameter: str = None):
    """Parse the function docstring and return its help content

    Parameters
    ----------
    func
        a callable object that holds docstring
    parameter
        name of the function parameter to parse doc for

    Returns
    -------
    str
        doc of the parameter/function
    """
    import inspect

    doc = inspect.getdoc(func) or ""
    if parameter:
        return _get_param_doc(doc, parameter)
    else:
        return _get_func_doc(doc)
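
# --- Illustrative sketch (not part of the original module) ---
# get_doc() returns either the summary above the numpydoc "Parameters"
# header or a single parameter's description. "_example_get_doc" and
# "greet" are hypothetical names.
def _example_get_doc():
    def greet(name):
        """Print a greeting.

        Parameters
        ----------
        name
            who to greet
        """

    assert get_doc(greet).startswith("Print a greeting.")
    assert get_doc(greet, "name") == "who to greet"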


_FUNC_NAME = "_func_"


def make_parser(
    func: tp.Callable,
    subparser: ap._SubParsersAction = None,
    params: tp.Dict[str, tp.Dict[str, tp.Any]] = None,
    **kwargs
) -> "ap.ArgumentParser":
    """A bare-bones argparse builder from functions"""

    doc = get_doc(func)
    kwargs.setdefault("formatter_class", ap.RawTextHelpFormatter)
    if subparser is None:
        kwargs.setdefault("description", doc)
        parser = ap.ArgumentParser(**kwargs)
        parser.set_defaults(
            **{_FUNC_NAME: lambda stdout: parser.print_help(file=stdout)}
        )
        return parser
    else:
        parser = subparser.add_parser(
            kwargs.pop("prog", func.__name__),
            help=doc,
            **kwargs,
        )
        parser.set_defaults(**{_FUNC_NAME: func})

        if params:
            for par, args in params.items():
                args.setdefault("help", get_doc(func, par))
                parser.add_argument(par, **args)

        return parser


def dispatch(**ns):
    """call the sub-command selected by user"""
    import inspect

    func = ns[_FUNC_NAME]
    sign = inspect.signature(func)
    kwargs = {}
    for name, _ in sign.parameters.items():
        kwargs[name] = ns[name]
    return func(**kwargs)
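
# --- Illustrative sketch (not part of the original module) ---
# Typical wiring of make_parser() and dispatch(): build one subcommand per
# function, parse, then feed the resulting namespace back into the selected
# function. All names in this demo are hypothetical.
def _example_cli(argv=("add", "1", "2")):
    def add(first, second):
        """add two numbers

        Parameters
        ----------
        first
            first operand
        second
            second operand
        """
        return int(first) + int(second)

    parser = make_parser(add, prog="demo")  # top-level parser
    commands = parser.add_subparsers()
    make_parser(add, commands, params={"first": {}, "second": {}})
    ns = parser.parse_args(list(argv))
    return dispatch(**vars(ns))  # -> 3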

#
# contexts
#
"""Context management tools for xonsh."""
sys = _LazyModule.load('sys', 'sys')
textwrap = _LazyModule.load('textwrap', 'textwrap')
from collections.abc import Mapping

from xonsh.built_ins import XSH


class Block(object):
    """This is a context manager for obtaining a block of lines without actually
    executing the block. The lines are accessible as the 'lines' attribute.
    This must be used as a macro.
    """

    __xonsh_block__ = str

    def __init__(self):
        """
        Attributes
        ----------
        lines : list of str or None
            Block lines as if split by str.splitlines(), if available.
        glbs : Mapping or None
            Global execution context, i.e. globals().
        locs : Mapping or None
            Local execution context, i.e. locals().
        """
        self.lines = self.glbs = self.locs = None

    def __enter__(self):
        if not hasattr(self, "macro_block"):
            raise XSH.builtins.XonshError(
                self.__class__.__name__ + " must be entered as a macro!"
            )
        self.lines = self.macro_block.splitlines()
        self.glbs = self.macro_globals
        if self.macro_locals is not self.macro_globals:
            # leave locals as None when it is the same as globals
            self.locs = self.macro_locals
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        pass
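
# Illustrative usage (xonsh macro syntax, shown here as comments because
# this file is plain Python): entering a Block with xonsh's "with!" syntax
# captures the body as text instead of executing it.
#
#     with! Block() as b:
#         qux = 42
#
#     b.lines  # -> ['qux = 42']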


class Functor(Block):
    """This is a context manager that turns the block into a callable
    object, bound to the execution context it was created in.
    """

    def __init__(self, args=(), kwargs=None, rtn=""):
        """
        Parameters
        ----------
        args : Sequence of str, optional
            A tuple of argument names for the functor.
        kwargs : Mapping of str to values or list of item tuples, optional
            Keyword argument names and values, if available.
        rtn : str, optional
            Name of object to return, if available.

        Attributes
        ----------
        func : function
            The underlying function object. This defaults to None and is set
            after the block is exited.
        """
        super().__init__()
        self.func = None
        self.args = args
        if kwargs is None:
            self.kwargs = []
        elif isinstance(kwargs, Mapping):
            self.kwargs = sorted(kwargs.items())
        else:
            self.kwargs = kwargs
        self.rtn = rtn

    def __enter__(self):
        super().__enter__()
        body = textwrap.indent(self.macro_block, "    ")
        uid = hash(body) + sys.maxsize  # should always be a positive int
        name = "__xonsh_functor_{uid}__".format(uid=uid)
        # construct signature string
        sig = rtn = ""
        sig = ", ".join(self.args)
        kwstr = ", ".join([k + "=None" for k, _ in self.kwargs])
        if len(kwstr) > 0:
            sig = kwstr if len(sig) == 0 else sig + ", " + kwstr
        # construct return string
        rtn = str(self.rtn)
        if len(rtn) > 0:
            rtn = "    return " + rtn + "\n"
        # construct function string
        fstr = "def {name}({sig}):\n{body}\n{rtn}"
        fstr = fstr.format(name=name, sig=sig, body=body, rtn=rtn)
        glbs = self.glbs
        locs = self.locs
        execer = XSH.execer
        execer.exec(fstr, glbs=glbs, locs=locs)
        if locs is not None and name in locs:
            func = locs[name]
        elif name in glbs:
            func = glbs[name]
        else:
            raise ValueError("Functor block could not be found in context.")
        if len(self.kwargs) > 0:
            func.__defaults__ = tuple(v for _, v in self.kwargs)
        self.func = func
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        pass

    def __call__(self, *args, **kwargs):
        """Dispatches to func."""
        if self.func is None:
            msg = "{} block with 'None' func not callable"
            raise AttributeError(msg.format(self.__class__.__name__))
        return self.func(*args, **kwargs)
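
# Illustrative usage (xonsh macro syntax, shown here as comments because
# this file is plain Python): a Functor compiles its block into a function
# bound to the enclosing context.
#
#     with! Functor(args=('x',), rtn='y') as f:
#         y = x + 1
#
#     f(41)  # -> 42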

#
# lazyasd
#
"""Lazy and self destructive containers for speeding up module import."""
# Copyright 2015-2016, the xonsh developers. All rights reserved.
# amalgamated os
# amalgamated sys
time = _LazyModule.load('time', 'time')
types = _LazyModule.load('types', 'types')
builtins = _LazyModule.load('builtins', 'builtins')
threading = _LazyModule.load('threading', 'threading')
importlib = _LazyModule.load('importlib', 'importlib')
importlib = _LazyModule.load('importlib', 'importlib.util')
cabc = _LazyModule.load('collections', 'collections.abc', 'cabc')
# amalgamated typing
__version__ = "0.1.3"


class LazyObject(object):
    def __init__(self, load, ctx, name):
        """Lazily loads an object via the load function the first time an
        attribute is accessed. Once loaded it will replace itself in the
        provided context (typically the globals of the call site) with the
        given name.

        For example, you can prevent the compilation of a regular expression
        until it is actually used::

            DOT = LazyObject((lambda: re.compile('.')), globals(), 'DOT')

        Parameters
        ----------
        load : function with no arguments
            A loader function that performs the actual object construction.
        ctx : Mapping
            Context to replace the LazyObject instance in
            with the object returned by load().
        name : str
            Name in the context to give the loaded object. This *should*
            be the name on the LHS of the assignment.
        """
        self._lasdo = {"loaded": False, "load": load, "ctx": ctx, "name": name}

    def _lazy_obj(self):
        d = self._lasdo
        if d["loaded"]:
            obj = d["obj"]
        else:
            obj = d["load"]()
            d["ctx"][d["name"]] = d["obj"] = obj
            d["loaded"] = True
        return obj

    def __getattribute__(self, name):
        if name == "_lasdo" or name == "_lazy_obj":
            return super().__getattribute__(name)
        obj = self._lazy_obj()
        return getattr(obj, name)

    def __bool__(self):
        obj = self._lazy_obj()
        return bool(obj)

    def __iter__(self):
        obj = self._lazy_obj()
        yield from obj

    def __getitem__(self, item):
        obj = self._lazy_obj()
        return obj[item]

    def __setitem__(self, key, value):
        obj = self._lazy_obj()
        obj[key] = value

    def __delitem__(self, item):
        obj = self._lazy_obj()
        del obj[item]

    def __call__(self, *args, **kwargs):
        obj = self._lazy_obj()
        return obj(*args, **kwargs)

    def __lt__(self, other):
        obj = self._lazy_obj()
        return obj < other

    def __le__(self, other):
        obj = self._lazy_obj()
        return obj <= other

    def __eq__(self, other):
        obj = self._lazy_obj()
        return obj == other

    def __ne__(self, other):
        obj = self._lazy_obj()
        return obj != other

    def __gt__(self, other):
        obj = self._lazy_obj()
        return obj > other

    def __ge__(self, other):
        obj = self._lazy_obj()
        return obj >= other

    def __hash__(self):
        obj = self._lazy_obj()
        return hash(obj)

    def __or__(self, other):
        obj = self._lazy_obj()
        return obj | other

    def __str__(self):
        return str(self._lazy_obj())

    def __repr__(self):
        return repr(self._lazy_obj())


RT = tp.TypeVar("RT")


def lazyobject(f: tp.Callable[..., RT]) -> RT:
    """Decorator for constructing lazy objects from a function."""
    return LazyObject(f, f.__globals__, f.__name__)  # type: ignore
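
# --- Illustrative sketch (not part of the original module) ---
# The decorator flavor: construction is deferred until first use, after
# which the name is rebound to the real object in this module's globals.
# "_EXAMPLE_RE" is a hypothetical name for demonstration.
@lazyobject
def _EXAMPLE_RE():
    import re  # imported only when the object is first touched

    return re.compile(r"[0-9]+")
# e.g. _EXAMPLE_RE.findall("a1b22") compiles the regex, rebinds the name,
# and returns ['1', '22'].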


class LazyDict(cabc.MutableMapping):
    def __init__(self, loaders, ctx, name):
        """Dictionary like object that lazily loads its values from an initial
        dict of key-loader function pairs.  Each key is loaded when its value
        is first accessed. Once fully loaded, this object will replace itself
        in the provided context (typically the globals of the call site) with
        the given name.

        For example, you can prevent the compilation of a bunch of regular
        expressions until they are actually used::

            RES = LazyDict({
                    'dot': lambda: re.compile('.'),
                    'all': lambda: re.compile('.*'),
                    'two': lambda: re.compile('..'),
                    }, globals(), 'RES')

        Parameters
        ----------
        loaders : Mapping of keys to functions with no arguments
            A mapping of loader functions that perform the actual value
            construction upon access.
        ctx : Mapping
            Context to replace the LazyDict instance in
            with the fully loaded mapping.
        name : str
            Name in the context to give the loaded mapping. This *should*
            be the name on the LHS of the assignment.
        """
        self._loaders = loaders
        self._ctx = ctx
        self._name = name
        self._d = type(loaders)()  # make sure to return the same type

    def _destruct(self):
        if len(self._loaders) == 0:
            self._ctx[self._name] = self._d

    def __getitem__(self, key):
        d = self._d
        if key in d:
            val = d[key]
        else:
            # pop will raise a key error for us
            loader = self._loaders.pop(key)
            d[key] = val = loader()
            self._destruct()
        return val

    def __setitem__(self, key, value):
        self._d[key] = value
        if key in self._loaders:
            del self._loaders[key]
            self._destruct()

    def __delitem__(self, key):
        if key in self._d:
            del self._d[key]
        else:
            del self._loaders[key]
            self._destruct()

    def __iter__(self):
        yield from (set(self._d.keys()) | set(self._loaders.keys()))

    def __len__(self):
        return len(self._d) + len(self._loaders)


def lazydict(f):
    """Decorator for constructing lazy dicts from a function."""
    return LazyDict(f, f.__globals__, f.__name__)


class LazyBool(object):
    def __init__(self, load, ctx, name):
        """Boolean like object that lazily computes it boolean value when it is
        first asked. Once loaded, this result will replace itself
        in the provided context (typically the globals of the call site) with
        the given name.

        For example, you can defer evaluation of a complex boolean expression
        until it is actually used::

            ALIVE = LazyBool(lambda: not DEAD, globals(), 'ALIVE')

        Parameters
        ----------
        load : function with no arguments
            A loader function that performs the actual boolean evaluation.
        ctx : Mapping
            Context to replace the LazyBool instance in
            with the computed boolean value.
        name : str
            Name in the context to give the computed value. This *should*
            be the name on the LHS of the assignment.
        """
        self._load = load
        self._ctx = ctx
        self._name = name
        self._result = None

    def __bool__(self):
        if self._result is None:
            res = self._ctx[self._name] = self._result = self._load()
        else:
            res = self._result
        return res


def lazybool(f):
    """Decorator for constructing lazy booleans from a function."""
    return LazyBool(f, f.__globals__, f.__name__)
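
# --- Illustrative sketch (not part of the original module) ---
# The decorator flavor of LazyBool: the predicate runs only on the first
# bool() check and the result is then cached in this module's globals.
# "_EXAMPLE_IS_POSIX" is a hypothetical name for demonstration.
@lazybool
def _EXAMPLE_IS_POSIX():
    return os.name == "posix"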


#
# Background module loaders
#


class BackgroundModuleProxy(types.ModuleType):
    """Proxy object for modules loaded in the background that block attribute
    access until the module is loaded..
    """

    def __init__(self, modname):
        self.__dct__ = {"loaded": False, "modname": modname}

    def __getattribute__(self, name):
        passthrough = frozenset({"__dct__", "__class__", "__spec__"})
        if name in passthrough:
            return super().__getattribute__(name)
        dct = self.__dct__
        modname = dct["modname"]
        if dct["loaded"]:
            mod = sys.modules[modname]
        else:
            delay_types = (BackgroundModuleProxy, type(None))
            while isinstance(sys.modules.get(modname, None), delay_types):
                time.sleep(0.001)
            mod = sys.modules[modname]
            dct["loaded"] = True
        # some modules may do construction after import, give them a second
        stall = 0
        while not hasattr(mod, name) and stall < 1000:
            stall += 1
            time.sleep(0.001)
        return getattr(mod, name)


class BackgroundModuleLoader(threading.Thread):
    """Thread to load modules in the background."""

    def __init__(self, name, package, replacements, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.daemon = True
        self.name = name
        self.package = package
        self.replacements = replacements
        self.start()

    def run(self):
        # wait for other modules to stop being imported
        # We assume that module loading is finished when sys.modules doesn't
        # get longer in 5 consecutive 1ms waiting steps
        counter = 0
        last = -1
        while counter < 5:
            new = len(sys.modules)
            if new == last:
                counter += 1
            else:
                last = new
                counter = 0
            time.sleep(0.001)
        # now import module properly
        modname = importlib.util.resolve_name(self.name, self.package)
        if isinstance(sys.modules[modname], BackgroundModuleProxy):
            del sys.modules[modname]
        mod = importlib.import_module(self.name, package=self.package)
        for targname, varname in self.replacements.items():
            if targname in sys.modules:
                targmod = sys.modules[targname]
                setattr(targmod, varname, mod)


def load_module_in_background(
    name, package=None, debug="DEBUG", env=None, replacements=None
):
    """Entry point for loading modules in background thread.

    Parameters
    ----------
    name : str
        Module name to load in background thread.
    package : str or None, optional
        Package name, has the same meaning as in importlib.import_module().
    debug : str, optional
        Debugging symbol name to look up in the environment.
    env : Mapping or None, optional
        Environment to check for the debug variable. This defaults to
        __xonsh__.env, if available, and os.environ otherwise.
    replacements : Mapping or None, optional
        Dictionary mapping fully qualified module names (e.g. foo.bar.baz)
        that import the lazily loaded module to the variable name it is bound
        to in that module. For example, if foo.bar imports module a as b,
        this dict is {'foo.bar': 'b'}.

    Returns
    -------
    module : ModuleType
        This is either the original module, as found in sys.modules, or
        a proxy module that will block attribute access until the module
        is fully loaded.
    """
    modname = importlib.util.resolve_name(name, package)
    if modname in sys.modules:
        return sys.modules[modname]
    if env is None:
        xonsh_obj = getattr(builtins, "__xonsh__", None)
        env = os.environ if xonsh_obj is None else getattr(xonsh_obj, "env", os.environ)
    if env.get(debug, None):
        mod = importlib.import_module(name, package=package)
        return mod
    proxy = sys.modules[modname] = BackgroundModuleProxy(modname)
    BackgroundModuleLoader(name, package, replacements or {})
    return proxy
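
# --- Illustrative sketch (not part of the original module) ---
# Defer a slow import to a background thread; attribute access on the
# returned proxy blocks until the loader thread publishes the real module.
# The module choice here is hypothetical.
def _example_background_import():
    mod = load_module_in_background("getpass")
    return mod.getuser()  # blocks until "getpass" is fully imported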

#
# lazyjson
#
# -*- coding: utf-8 -*-
"""Implements a lazy JSON file class that wraps around json data."""
io = _LazyModule.load('io', 'io')
weakref = _LazyModule.load('weakref', 'weakref')
contextlib = _LazyModule.load('contextlib', 'contextlib')
# amalgamated collections.abc
try:
    import ujson as json
except ImportError:
    import json  # type: ignore


def _to_json_with_size(obj, offset=0, sort_keys=False):
    if isinstance(obj, str):
        s = json.dumps(obj)
        o = offset
        n = size = len(s.encode())  # size in bytes
    elif isinstance(obj, cabc.Mapping):
        s = "{"
        j = offset + 1
        o = {}
        size = {}
        items = sorted(obj.items()) if sort_keys else obj.items()
        for key, val in items:
            s_k, o_k, n_k, size_k = _to_json_with_size(
                key, offset=j, sort_keys=sort_keys
            )
            s += s_k + ": "
            j += n_k + 2
            s_v, o_v, n_v, size_v = _to_json_with_size(
                val, offset=j, sort_keys=sort_keys
            )
            o[key] = o_v
            size[key] = size_v
            s += s_v + ", "
            j += n_v + 2
        if s.endswith(", "):
            s = s[:-2]
        s += "}\n"
        n = len(s)
        o["__total__"] = offset
        size["__total__"] = n
    elif isinstance(obj, cabc.Sequence):
        s = "["
        j = offset + 1
        o = []
        size = []
        for x in obj:
            s_x, o_x, n_x, size_x = _to_json_with_size(x, offset=j, sort_keys=sort_keys)
            o.append(o_x)
            size.append(size_x)
            s += s_x + ", "
            j += n_x + 2
        if s.endswith(", "):
            s = s[:-2]
        s += "]\n"
        n = len(s)
        o.append(offset)
        size.append(n)
    else:
        s = json.dumps(obj, sort_keys=sort_keys)
        o = offset
        n = size = len(s)
    return s, o, n, size


def index(obj, sort_keys=False):
    """Creates an index for a JSON file."""
    idx = {}
    json_obj = _to_json_with_size(obj, sort_keys=sort_keys)
    s, idx["offsets"], _, idx["sizes"] = json_obj
    return s, idx


JSON_FORMAT = """{{"locs": [{iloc:>10}, {ilen:>10}, {dloc:>10}, {dlen:>10}],
 "index": {index},
 "data": {data}
}}
"""


def dumps(obj, sort_keys=False):
    """Dumps an object to JSON with an index."""
    data, idx = index(obj, sort_keys=sort_keys)
    jdx = json.dumps(idx, sort_keys=sort_keys)
    iloc = 69  # byte offset of the index; the length of the rendered header
    ilen = len(jdx)
    dloc = iloc + ilen + 11  # 11 == len(',\n "data": ')
    dlen = len(data)
    s = JSON_FORMAT.format(
        index=jdx, data=data, iloc=iloc, ilen=ilen, dloc=dloc, dlen=dlen
    )
    return s


def ljdump(obj, fp, sort_keys=False):
    """Dumps an object to JSON file."""
    s = dumps(obj, sort_keys=sort_keys)
    fp.write(s)


class LJNode(cabc.Mapping, cabc.Sequence):
    """A proxy node for JSON nodes. Acts as both sequence and mapping."""

    def __init__(self, offsets, sizes, root):
        """Parameters
        ----------
        offsets : dict, list, or int
            offsets of corresponding data structure, in bytes
        sizes : dict, list, or int
            sizes of corresponding data structure, in bytes
        root : weakref.proxy of LazyJSON
            weakref back to root node, which should be a LazyJSON object.
        """
        self.offsets = offsets
        self.sizes = sizes
        self.root = root
        self.is_mapping = isinstance(self.offsets, cabc.Mapping)
        self.is_sequence = isinstance(self.offsets, cabc.Sequence)

    def __len__(self):
        # recall that for maps, the '__total__' key is added and for
        # sequences the last element represents the total size/offset.
        return len(self.sizes) - 1

    def load(self):
        """Returns the Python data structure represented by the node."""
        if self.is_mapping:
            offset = self.offsets["__total__"]
            size = self.sizes["__total__"]
        elif self.is_sequence:
            offset = self.offsets[-1]
            size = self.sizes[-1]
        elif isinstance(self.offsets, int):
            offset = self.offsets
            size = self.sizes
        return self._load_or_node(offset, size)

    def _load_or_node(self, offset, size):
        if isinstance(offset, int):
            with self.root._open(newline="\n") as f:
                f.seek(self.root.dloc + offset)
                s = f.read(size)
            val = json.loads(s)
        elif isinstance(offset, (cabc.Mapping, cabc.Sequence)):
            val = LJNode(offset, size, self.root)
        else:
            raise TypeError("incorrect types for offset node")
        return val

    def _getitem_mapping(self, key):
        if key == "__total__":
            raise KeyError('"__total__" is a special LazyJSON key!')
        offset = self.offsets[key]
        size = self.sizes[key]
        return self._load_or_node(offset, size)

    def _getitem_sequence(self, key):
        if isinstance(key, int):
            rtn = self._load_or_node(self.offsets[key], self.sizes[key])
        elif isinstance(key, slice):
            key = slice(*key.indices(len(self)))
            rtn = list(map(self._load_or_node, self.offsets[key], self.sizes[key]))
        else:
            raise TypeError("only integer indexing available")
        return rtn

    def __getitem__(self, key):
        if self.is_mapping:
            rtn = self._getitem_mapping(key)
        elif self.is_sequence:
            rtn = self._getitem_sequence(key)
        else:
            raise NotImplementedError
        return rtn

    def __iter__(self):
        if self.is_mapping:
            keys = set(self.offsets.keys())
            keys.discard("__total__")
            yield from iter(keys)
        elif self.is_sequence:
            i = 0
            n = len(self)
            while i < n:
                yield self._load_or_node(self.offsets[i], self.sizes[i])
                i += 1
        else:
            raise NotImplementedError


class LazyJSON(LJNode):
    """Represents a lazy json file. Can be used like a normal Python
    dict or list.
    """

    def __init__(self, f, reopen=True):
        """Parameters
        ----------
        f : file handle or str
            JSON file to open.
        reopen : bool, optional
            Whether new file handle should be opened for each load.
        """
        self._f = f
        self.reopen = reopen
        if not reopen and isinstance(f, str):
            self._f = open(f, "r", newline="\n")
        self._load_index()
        self.root = weakref.proxy(self)
        self.is_mapping = isinstance(self.offsets, cabc.Mapping)
        self.is_sequence = isinstance(self.offsets, cabc.Sequence)

    def __del__(self):
        self.close()

    def close(self):
        """Close the file handle, if appropriate."""
        if not self.reopen and isinstance(self._f, io.IOBase):
            try:
                self._f.close()
            except OSError:
                pass

    @contextlib.contextmanager
    def _open(self, *args, **kwargs):
        if self.reopen and isinstance(self._f, str):
            f = open(self._f, *args, **kwargs)
            yield f
            f.close()
        else:
            yield self._f

    def _load_index(self):
        """Loads the index from the start of the file."""
        with self._open(newline="\n") as f:
            # read in the location data
            f.seek(9)
            locs = f.read(48)
            locs = json.loads(locs)
            self.iloc, self.ilen, self.dloc, self.dlen = locs
            # read in the index
            f.seek(self.iloc)
            idx = f.read(self.ilen)
            idx = json.loads(idx)
        self.offsets = idx["offsets"]
        self.sizes = idx["sizes"]

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
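
# --- Illustrative sketch (not part of the original module) ---
# Round trip: ljdump() writes the data together with a byte-offset index,
# and LazyJSON then reads individual values on demand without parsing the
# whole file. The file name is hypothetical.
def _example_lazyjson_roundtrip(path="example.json"):
    with open(path, "w", newline="\n") as f:
        ljdump({"answer": 42, "names": ["me", "you"]}, f, sort_keys=True)
    with LazyJSON(path) as lj:
        return lj["answer"]  # seeks to and parses only this value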

#
# platform
#
"""Module for platform-specific constants and implementations, as well as
compatibility layers to make use of the 'best' implementation available
on a platform.
"""
# amalgamated os
# amalgamated sys
ctypes = _LazyModule.load('ctypes', 'ctypes')
signal = _LazyModule.load('signal', 'signal')
pathlib = _LazyModule.load('pathlib', 'pathlib')
platform = _LazyModule.load('platform', 'platform')
functools = _LazyModule.load('functools', 'functools')
subprocess = _LazyModule.load('subprocess', 'subprocess')
# amalgamated collections.abc
# amalgamated importlib.util
# amalgamated xonsh.lazyasd
FD_STDIN = 0
FD_STDOUT = 1
FD_STDERR = 2


@lazyobject
def distro():
    try:
        import distro as d
    except ImportError:
        d = None
    except Exception:
        raise
    return d


#
# OS
#
ON_DARWIN = LazyBool(lambda: platform.system() == "Darwin", globals(), "ON_DARWIN")
"""``True`` if executed on a Darwin platform, else ``False``. """
ON_LINUX = LazyBool(lambda: platform.system() == "Linux", globals(), "ON_LINUX")
"""``True`` if executed on a Linux platform, else ``False``. """
ON_WINDOWS = LazyBool(lambda: platform.system() == "Windows", globals(), "ON_WINDOWS")
"""``True`` if executed on a native Windows platform, else ``False``. """
ON_CYGWIN = LazyBool(lambda: sys.platform == "cygwin", globals(), "ON_CYGWIN")
"""``True`` if executed on a Cygwin Windows platform, else ``False``. """
ON_MSYS = LazyBool(lambda: sys.platform == "msys", globals(), "ON_MSYS")
"""``True`` if executed on a MSYS Windows platform, else ``False``. """
ON_POSIX = LazyBool(lambda: (os.name == "posix"), globals(), "ON_POSIX")
"""``True`` if executed on a POSIX-compliant platform, else ``False``. """
ON_FREEBSD = LazyBool(
    lambda: (sys.platform.startswith("freebsd")), globals(), "ON_FREEBSD"
)
"""``True`` if on a FreeBSD operating system, else ``False``."""
ON_DRAGONFLY = LazyBool(
    lambda: (sys.platform.startswith("dragonfly")), globals(), "ON_DRAGONFLY"
)
"""``True`` if on a DragonFly BSD operating system, else ``False``."""
ON_NETBSD = LazyBool(
    lambda: (sys.platform.startswith("netbsd")), globals(), "ON_NETBSD"
)
"""``True`` if on a NetBSD operating system, else ``False``."""
ON_OPENBSD = LazyBool(
    lambda: (sys.platform.startswith("openbsd")), globals(), "ON_OPENBSD"
)
"""``True`` if on a OpenBSD operating system, else ``False``."""
IN_APPIMAGE = LazyBool(
    lambda: ("APPIMAGE" in os.environ and "APPDIR" in os.environ),
    globals(),
    "IN_APPIMAGE",
)
"""``True`` if in AppImage, else ``False``."""


@lazybool
def ON_BSD():
    """``True`` if on a BSD operating system, else ``False``."""
    return bool(ON_FREEBSD) or bool(ON_NETBSD) or bool(ON_OPENBSD) or bool(ON_DRAGONFLY)


@lazybool
def ON_BEOS():
    """True if we are on BeOS or Haiku."""
    return sys.platform == "beos5" or sys.platform == "haiku1"


@lazybool
def ON_WSL():
    """True if we are on Windows Subsystem for Linux (WSL)"""
    return "microsoft" in platform.release()


#
# Python & packages
#

PYTHON_VERSION_INFO = sys.version_info[:3]
""" Version of Python interpreter as three-value tuple. """


@lazyobject
def PYTHON_VERSION_INFO_BYTES():
    """The python version info tuple in a canonical bytes form."""
    return ".".join(map(str, sys.version_info)).encode()


ON_ANACONDA = LazyBool(
    lambda: pathlib.Path(sys.prefix).joinpath("conda-meta").exists(),
    globals(),
    "ON_ANACONDA",
)
""" ``True`` if executed in an Anaconda instance, else ``False``. """
CAN_RESIZE_WINDOW = LazyBool(
    lambda: hasattr(signal, "SIGWINCH"), globals(), "CAN_RESIZE_WINDOW"
)
"""``True`` if we can resize terminal window, as provided by the presense of
signal.SIGWINCH, else ``False``.
"""


@lazybool
def HAS_PYGMENTS():
    """``True`` if `pygments` is available, else ``False``."""
    spec = importlib.util.find_spec("pygments")
    return spec is not None


@functools.lru_cache(1)
def pygments_version():
    """pygments.__version__ version if available, else None."""
    if HAS_PYGMENTS:
        import pygments

        v = pygments.__version__
    else:
        v = None
    return v


@functools.lru_cache(1)
def pygments_version_info():
    """Returns `pygments`'s version as tuple of integers."""
    if HAS_PYGMENTS:
        return tuple(int(x) for x in pygments_version().strip("<>+-=.").split("."))
    else:
        return None


@functools.lru_cache(1)
def has_prompt_toolkit():
    """Tests if the `prompt_toolkit` is available."""
    spec = importlib.util.find_spec("prompt_toolkit")
    return spec is not None


@functools.lru_cache(1)
def ptk_version():
    """Returns `prompt_toolkit.__version__` if available, else ``None``."""
    if has_prompt_toolkit():
        import prompt_toolkit

        return getattr(prompt_toolkit, "__version__", "<0.57")
    else:
        return None


@functools.lru_cache(1)
def ptk_version_info():
    """Returns `prompt_toolkit`'s version as tuple of integers."""
    if has_prompt_toolkit():
        return tuple(int(x) for x in ptk_version().strip("<>+-=.").split("."))
    else:
        return None


minimum_required_ptk_version = (2, 0, 0)
"""Minimum version of prompt-toolkit supported by Xonsh"""


@functools.lru_cache(1)
def ptk_above_min_supported():
    return ptk_version_info() and ptk_version_info() >= minimum_required_ptk_version


@functools.lru_cache(1)
def win_ansi_support():
    if ON_WINDOWS:
        try:
            from prompt_toolkit.utils import is_windows_vt100_supported, is_conemu_ansi
        except ImportError:
            return False
        return is_conemu_ansi() or is_windows_vt100_supported()
    else:
        return False


@functools.lru_cache(1)
def ptk_below_max_supported():
    ptk_max_version_cutoff = (99999, 0)  # currently, no limit.
    return ptk_version_info()[:2] < ptk_max_version_cutoff


@functools.lru_cache(1)
def best_shell_type():
    from xonsh.built_ins import XSH

    if XSH.env.get("TERM", "") == "dumb":
        return "dumb"
    if has_prompt_toolkit():
        return "prompt_toolkit"
    return "readline"


@functools.lru_cache(1)
def is_readline_available():
    """Checks if readline is available to import."""
    spec = importlib.util.find_spec("readline")
    return spec is not None


@lazyobject
def seps():
    """String of all path separators."""
    s = os.path.sep
    if os.path.altsep is not None:
        s += os.path.altsep
    return s


def pathsplit(p):
    """This is a safe version of os.path.split(), which does not work on input
    without a drive.
    """
    n = len(p)
    if n == 0:
        # lazy object seps does not get initialized when n is zero
        return "", ""
    while n and p[n - 1] not in seps:
        n -= 1
    pre = p[:n]
    pre = pre.rstrip(seps) or pre
    post = p[n:]
    return pre, post


def pathbasename(p):
    """This is a safe version of os.path.basename(), which does not work on
    input without a drive.  This version does.
    """
    return pathsplit(p)[-1]
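
# --- Illustrative sketch (not part of the original module) ---
# pathsplit()/pathbasename() accept any platform separator and never
# require a drive prefix:
def _example_pathsplit():
    assert pathsplit("foo/bar/baz") == ("foo/bar", "baz")
    assert pathbasename("foo/bar/baz") == "baz"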


@lazyobject
def expanduser():
    """Dispatches to the correct platform-dependent expanduser() function."""
    if ON_WINDOWS:
        return windows_expanduser
    else:
        return os.path.expanduser


def windows_expanduser(path):
    """A Windows-specific expanduser() function for xonsh. This is needed
    since os.path.expanduser() does not check on Windows if the user actually
    exists. This restricts expanding the '~' if it is not followed by a
    separator. That is only '~/' and '~\' are expanded.
    """
    path = str(path)
    if not path.startswith("~"):
        return path
    elif len(path) < 2 or path[1] in seps:
        return os.path.expanduser(path)
    else:
        return path


# termios tc(get|set)attr indexes.
IFLAG = 0
OFLAG = 1
CFLAG = 2
LFLAG = 3
ISPEED = 4
OSPEED = 5
CC = 6


#
# Dev release info
#


@functools.lru_cache(1)
def githash():
    """Returns a tuple contains two strings: the hash and the date."""
    install_base = os.path.dirname(__file__)
    githash_file = "{}/dev.githash".format(install_base)
    if not os.path.exists(githash_file):
        return None, None
    sha = None
    date_ = None
    try:
        with open(githash_file) as f:
            sha, date_ = f.read().strip().split("|")
    except ValueError:
        pass
    return sha, date_


#
# Encoding
#

DEFAULT_ENCODING = sys.getdefaultencoding()
""" Default string encoding. """


#
# Linux distro
#


@functools.lru_cache(1)
def linux_distro():
    """The id of the Linux distribution running on, possibly 'unknown'.
    None on non-Linux platforms.
    """
    if ON_LINUX:
        if distro:
            ld = distro.id()
        elif PYTHON_VERSION_INFO < (3, 6, 6):
            ld = platform.linux_distribution()[0] or "unknown"
        elif "-ARCH-" in platform.platform():
            ld = "arch"  # that's the only one we need to know for now
        else:
            ld = "unknown"
    else:
        ld = None
    return ld


#
# Windows
#


@functools.lru_cache(1)
def git_for_windows_path():
    """Returns the path to git for windows, if available and None otherwise."""
    import winreg

    try:
        key = winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, "SOFTWARE\\GitForWindows")
        gfwp, _ = winreg.QueryValueEx(key, "InstallPath")
    except FileNotFoundError:
        gfwp = None
    return gfwp


@functools.lru_cache(1)
def windows_bash_command():
    """Determines the command for Bash on windows."""
    # Check that bash is on path otherwise try the default directory
    # used by Git for windows
    from xonsh.built_ins import XSH

    wbc = "bash"
    cmd_cache = XSH.commands_cache
    bash_on_path = cmd_cache.lazy_locate_binary("bash", ignore_alias=True)
    if bash_on_path:
        try:
            out = subprocess.check_output(
                [bash_on_path, "--version"],
                stderr=subprocess.PIPE,
                universal_newlines=True,
            )
        except subprocess.CalledProcessError:
            bash_works = False
        else:
            # Check if Bash is from the "Windows Subsystem for Linux" (WSL)
            # which can't be used by xonsh foreign-shell/completer
            bash_works = out and "pc-linux-gnu" not in out.splitlines()[0]

        if bash_works:
            wbc = bash_on_path
        else:
            gfwp = git_for_windows_path()
            if gfwp:
                bashcmd = os.path.join(gfwp, "bin\\bash.exe")
                if os.path.isfile(bashcmd):
                    wbc = bashcmd
    return wbc


#
# Environment variables defaults
#

if ON_WINDOWS:

    class OSEnvironCasePreserving(cabc.MutableMapping):
        """Case-preserving wrapper for os.environ on Windows.
        It uses nt.environ to get the correct cased keys on
        initialization. It also preserves the case of any variables
        added after initialization.
        """

        def __init__(self):
            import nt

            self._upperkeys = dict((k.upper(), k) for k in nt.environ)

        def _sync(self):
            """Ensure that the case sensitive map of the keys are
            in sync with os.environ
            """
            envkeys = set(os.environ.keys())
            for key in envkeys.difference(self._upperkeys):
                self._upperkeys[key] = key.upper()
            for key in set(self._upperkeys).difference(envkeys):
                del self._upperkeys[key]

        def __contains__(self, k):
            self._sync()
            return k.upper() in self._upperkeys

        def __len__(self):
            self._sync()
            return len(self._upperkeys)

        def __iter__(self):
            self._sync()
            return iter(self._upperkeys.values())

        def __getitem__(self, k):
            self._sync()
            return os.environ[k]

        def __setitem__(self, k, v):
            self._sync()
            self._upperkeys[k.upper()] = k
            os.environ[k] = v

        def __delitem__(self, k):
            self._sync()
            if k.upper() in self._upperkeys:
                del self._upperkeys[k.upper()]
                del os.environ[k]

        def getkey_actual_case(self, k):
            self._sync()
            return self._upperkeys.get(k.upper())


@lazyobject
def os_environ():
    """This dispatches to the correct, case-sensitive version of os.environ.
    This is mainly a problem for Windows. See #2024 for more details.
    This can probably go away once support for Python v3.5 or v3.6 is
    dropped.
    """
    if ON_WINDOWS:
        return OSEnvironCasePreserving()
    else:
        return os.environ


@functools.lru_cache(1)
def bash_command():
    """Determines the command for Bash on the current platform."""
    if ON_WINDOWS:
        bc = windows_bash_command()
    else:
        bc = "bash"
    return bc


@lazyobject
def BASH_COMPLETIONS_DEFAULT():
    """A possibly empty tuple with default paths to Bash completions known for
    the current platform.
    """
    if ON_LINUX or ON_CYGWIN or ON_MSYS:
        bcd = ("/usr/share/bash-completion/bash_completion",)
    elif ON_DARWIN:
        bcd = (
            "/usr/local/share/bash-completion/bash_completion",  # v2.x
            "/usr/local/etc/bash_completion",  # v1.x
        )
    elif ON_WINDOWS and git_for_windows_path():
        bcd = (
            os.path.join(
                git_for_windows_path(), "usr\\share\\bash-completion\\bash_completion"
            ),
            os.path.join(
                git_for_windows_path(),
                "mingw64\\share\\git\\completion\\" "git-completion.bash",
            ),
        )
    else:
        bcd = ()
    return bcd


@lazyobject
def PATH_DEFAULT():
    if ON_LINUX or ON_CYGWIN or ON_MSYS:
        if linux_distro() == "arch":
            pd = (
                "/usr/local/sbin",
                "/usr/local/bin",
                "/usr/bin",
                "/usr/bin/site_perl",
                "/usr/bin/vendor_perl",
                "/usr/bin/core_perl",
            )
        else:
            pd = (
                os.path.expanduser("~/bin"),
                "/usr/local/sbin",
                "/usr/local/bin",
                "/usr/sbin",
                "/usr/bin",
                "/sbin",
                "/bin",
                "/usr/games",
                "/usr/local/games",
            )
    elif ON_DARWIN:
        pd = ("/usr/local/bin", "/usr/bin", "/bin", "/usr/sbin", "/sbin")
    elif ON_WINDOWS:
        import winreg

        key = winreg.OpenKey(
            winreg.HKEY_LOCAL_MACHINE,
            r"SYSTEM\CurrentControlSet\Control\Session Manager\Environment",
        )
        pd = tuple(winreg.QueryValueEx(key, "Path")[0].split(os.pathsep))
    else:
        pd = ()
    return pd


#
# libc
#
@lazyobject
def LIBC():
    """The platform dependent libc implementation."""
    global ctypes
    if ON_DARWIN:
        import ctypes.util

        libc = ctypes.CDLL(ctypes.util.find_library("c"))
    elif ON_CYGWIN:
        libc = ctypes.CDLL("cygwin1.dll")
    elif ON_MSYS:
        libc = ctypes.CDLL("msys-2.0.dll")
    elif ON_FREEBSD:
        try:
            libc = ctypes.CDLL("libc.so.7")
        except OSError:
            libc = None
    elif ON_BSD:
        try:
            libc = ctypes.CDLL("libc.so")
        except AttributeError:
            libc = None
        except OSError:
            # OS X; can't use ctypes.util.find_library because that creates
            # a new process on Linux, which is undesirable.
            try:
                libc = ctypes.CDLL("libc.dylib")
            except OSError:
                libc = None
    elif ON_POSIX:
        try:
            libc = ctypes.CDLL("libc.so")
        except AttributeError:
            libc = None
        except OSError:
            # Debian and derivatives do the wrong thing because /usr/lib/libc.so
            # is a GNU ld script rather than an ELF object. To get around this, we
            # have to be more specific.
            # We don't want to use ctypes.util.find_library because that creates a
            # new process on Linux. We also don't want to try too hard because at
            # this point we're already pretty sure this isn't Linux.
            try:
                libc = ctypes.CDLL("libc.so.6")
            except OSError:
                libc = None
        if not hasattr(libc, "sysinfo"):
            # Not Linux.
            libc = None
    elif ON_WINDOWS:
        if hasattr(ctypes, "windll") and hasattr(ctypes.windll, "kernel32"):
            libc = ctypes.windll.kernel32
        else:
            try:
                # Windows CE uses the cdecl calling convention.
                libc = ctypes.CDLL("coredll.lib")
            except (AttributeError, OSError):
                libc = None
    elif ON_BEOS:
        libc = ctypes.CDLL("libroot.so")
    else:
        libc = None
    return libc

#
# pretty
#
# -*- coding: utf-8 -*-
"""
Python advanced pretty printer.  This pretty printer is intended to
replace the old `pprint` python module which does not allow developers
to provide their own pretty print callbacks.

This module is based on ruby's `prettyprint.rb` library by `Tanaka Akira`.

The following implementations were forked from the IPython project:
* Copyright (c) 2008-2014, IPython Development Team
* Copyright (C) 2001-2007 Fernando Perez <fperez@colorado.edu>
* Copyright (c) 2001, Janko Hauser <jhauser@zscout.de>
* Copyright (c) 2001, Nathaniel Gray <n8gray@caltech.edu>

Example Usage
-------------

To directly print the representation of an object use `pretty_print`::

    from pretty import pretty_print
    pretty_print(complex_object)

To get a string of the output use `pretty`::

    from pretty import pretty
    string = pretty(complex_object)


Extending
---------

The pretty library allows developers to add pretty printing rules for their
own objects.  This process is straightforward.  All you have to do is to
add a `_repr_pretty_` method to your object and call the methods on the
pretty printer passed::

    class MyObject(object):

        def _repr_pretty_(self, p, cycle):
            ...

Here is an example implementation of a `_repr_pretty_` method for a list
subclass::

    class MyList(list):

        def _repr_pretty_(self, p, cycle):
            if cycle:
                p.text('MyList(...)')
            else:
                with p.group(8, 'MyList([', '])'):
                    for idx, item in enumerate(self):
                        if idx:
                            p.text(',')
                            p.breakable()
                        p.pretty(item)

The `cycle` parameter is `True` if pretty detected a cycle.  You *have* to
react to that or the result is an infinite loop.  `p.text()` just adds
non-breaking text to the output, `p.breakable()` either adds a whitespace
or breaks here.  If you pass it an argument it's used instead of the
default space.  `p.pretty` prettyprints another object using the pretty print
method.

The first parameter to the `group` function specifies the extra indentation
of the next line.  In this example the next item will either be on the same
line (if the items are short enough) or aligned with the right edge of the
opening bracket of `MyList`.

If you just want to indent something you can use the group function
without open / close parameters.  You can also use this code::

    with p.indent(2):
        ...


:copyright: 2007 by Armin Ronacher.
            Portions (c) 2009 by Robert Kern.
:license: BSD License.
"""
# amalgamated io
re = _LazyModule.load('re', 're')
# amalgamated sys
# amalgamated types
datetime = _LazyModule.load('datetime', 'datetime')
# amalgamated contextlib
collections = _LazyModule.load('collections', 'collections')
# amalgamated xonsh.lazyasd
__all__ = [
    "pretty",
    "pretty_print",
    "PrettyPrinter",
    "RepresentationPrinter",
    "for_type",
    "for_type_by_name",
]


MAX_SEQ_LENGTH = 1000


def _safe_getattr(obj, attr, default=None):
    """Safe version of getattr.

    Same as getattr, but will return ``default`` on any Exception,
    rather than raising.
    """
    try:
        return getattr(obj, attr, default)
    except Exception:
        return default


CUnicodeIO = io.StringIO


def pretty(
    obj, verbose=False, max_width=79, newline="\n", max_seq_length=MAX_SEQ_LENGTH
):
    """
    Pretty print the object's representation.
    """
    if hasattr(obj, "xonsh_display"):
        return obj.xonsh_display()

    stream = CUnicodeIO()
    printer = RepresentationPrinter(
        stream, verbose, max_width, newline, max_seq_length=max_seq_length
    )
    printer.pretty(obj)
    printer.flush()
    return stream.getvalue()


def pretty_print(
    obj, verbose=False, max_width=79, newline="\n", max_seq_length=MAX_SEQ_LENGTH
):
    """
    Like pretty() but print to stdout.
    """
    printer = RepresentationPrinter(
        sys.stdout, verbose, max_width, newline, max_seq_length=max_seq_length
    )
    printer.pretty(obj)
    printer.flush()
    sys.stdout.write(newline)
    sys.stdout.flush()
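
# --- Illustrative sketch (not part of the original module) ---
# pretty() honors the _repr_pretty_ hook described in the module docstring;
# the hypothetical class below renders as "Point(1, 2)" instead of the
# default angle-bracket repr.
def _example_pretty():
    class Point(object):
        def __init__(self, x, y):
            self.x, self.y = x, y

        def _repr_pretty_(self, p, cycle):
            if cycle:
                p.text("Point(...)")
            else:
                p.text("Point(%r, %r)" % (self.x, self.y))

    return pretty(Point(1, 2))  # -> 'Point(1, 2)'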


class _PrettyPrinterBase(object):
    @contextlib.contextmanager
    def indent(self, indent):
        """with statement support for indenting/dedenting."""
        self.indentation += indent
        try:
            yield
        finally:
            self.indentation -= indent

    @contextlib.contextmanager
    def group(self, indent=0, open="", close=""):
        """like begin_group / end_group but for the with statement."""
        self.begin_group(indent, open)
        try:
            yield
        finally:
            self.end_group(indent, close)


class PrettyPrinter(_PrettyPrinterBase):
    """
    Baseclass for the `RepresentationPrinter` prettyprinter that is used to
    generate pretty reprs of objects.  Contrary to the `RepresentationPrinter`
    this printer knows nothing about the default pprinters or the `_repr_pretty_`
    callback method.
    """

    def __init__(
        self, output, max_width=79, newline="\n", max_seq_length=MAX_SEQ_LENGTH
    ):
        self.output = output
        self.max_width = max_width
        self.newline = newline
        self.max_seq_length = max_seq_length
        self.output_width = 0
        self.buffer_width = 0
        self.buffer = collections.deque()

        root_group = Group(0)
        self.group_stack = [root_group]
        self.group_queue = GroupQueue(root_group)
        self.indentation = 0

    def _break_outer_groups(self):
        while self.max_width < self.output_width + self.buffer_width:
            group = self.group_queue.deq()
            if not group:
                return
            while group.breakables:
                x = self.buffer.popleft()
                self.output_width = x.output(self.output, self.output_width)
                self.buffer_width -= x.width
            while self.buffer and isinstance(self.buffer[0], Text):
                x = self.buffer.popleft()
                self.output_width = x.output(self.output, self.output_width)
                self.buffer_width -= x.width

    def text(self, obj):
        """Add literal text to the output."""
        width = len(obj)
        if self.buffer:
            text = self.buffer[-1]
            if not isinstance(text, Text):
                text = Text()
                self.buffer.append(text)
            text.add(obj, width)
            self.buffer_width += width
            self._break_outer_groups()
        else:
            self.output.write(obj)
            self.output_width += width

    def breakable(self, sep=" "):
        """
        Add a breakable separator to the output.  This does not mean that it
        will automatically break here.  If no breaking on this position takes
        place, the `sep` is inserted, which defaults to a single space.
        """
        width = len(sep)
        group = self.group_stack[-1]
        if group.want_break:
            self.flush()
            self.output.write(self.newline)
            self.output.write(" " * self.indentation)
            self.output_width = self.indentation
            self.buffer_width = 0
        else:
            self.buffer.append(Breakable(sep, width, self))
            self.buffer_width += width
            self._break_outer_groups()

    def break_(self):
        """
        Explicitly insert a newline into the output, maintaining correct indentation.
        """
        self.flush()
        self.output.write(self.newline)
        self.output.write(" " * self.indentation)
        self.output_width = self.indentation
        self.buffer_width = 0

    def begin_group(self, indent=0, open=""):
        """
        Begin a group.  If you want support for python < 2.5 which doesn't have
        the with statement, this is the preferred way:

            p.begin_group(1, '{')
            ...
            p.end_group(1, '}')

        The python 2.5 expression would be this:

            with p.group(1, '{', '}'):
                ...

        The first parameter specifies the indentation for the next line (usually
        the width of the opening text), the second the opening text.  All
        parameters are optional.
        """
        if open:
            self.text(open)
        group = Group(self.group_stack[-1].depth + 1)
        self.group_stack.append(group)
        self.group_queue.enq(group)
        self.indentation += indent

    def _enumerate(self, seq):
        """like enumerate, but with an upper limit on the number of items"""
        for idx, x in enumerate(seq):
            if self.max_seq_length and idx >= self.max_seq_length:
                self.text(",")
                self.breakable()
                self.text("...")
                return
            yield idx, x

    def end_group(self, dedent=0, close=""):
        """End a group. See `begin_group` for more details."""
        self.indentation -= dedent
        group = self.group_stack.pop()
        if not group.breakables:
            self.group_queue.remove(group)
        if close:
            self.text(close)

    def flush(self):
        """Flush data that is left in the buffer."""
        for data in self.buffer:
            self.output_width += data.output(self.output, self.output_width)
        self.buffer.clear()
        self.buffer_width = 0


def _get_mro(obj_class):
    """Get a reasonable method resolution order of a class and its superclasses
    for both old-style and new-style classes.
    """
    if not hasattr(obj_class, "__mro__"):
        # Old-style class. Mix in object to make a fake new-style class.
        try:
            obj_class = type(obj_class.__name__, (obj_class, object), {})
        except TypeError:
            # Old-style extension type that does not descend from object.
            # FIXME: try to construct a more thorough MRO.
            mro = [obj_class]
        else:
            mro = obj_class.__mro__[1:-1]
    else:
        mro = obj_class.__mro__
    return mro


class RepresentationPrinter(PrettyPrinter):
    """
    Special pretty printer that has a `pretty` method that calls the pretty
    printer for a python object.

    This class stores processing data on `self` so you must *never* use
    this class in a threaded environment.  Always lock it or reinstantiate
    it.

    Instances also have a verbose flag that callbacks can access to control
    their output.  For example, the default instance repr prints all
    attributes and methods that are not prefixed by an underscore if the
    printer is in verbose mode.
    """

    def __init__(
        self,
        output,
        verbose=False,
        max_width=79,
        newline="\n",
        singleton_pprinters=None,
        type_pprinters=None,
        deferred_pprinters=None,
        max_seq_length=MAX_SEQ_LENGTH,
    ):

        PrettyPrinter.__init__(
            self, output, max_width, newline, max_seq_length=max_seq_length
        )
        self.verbose = verbose
        self.stack = []
        if singleton_pprinters is None:
            singleton_pprinters = _singleton_pprinters.copy()
        self.singleton_pprinters = singleton_pprinters
        if type_pprinters is None:
            type_pprinters = _type_pprinters.copy()
        self.type_pprinters = type_pprinters
        if deferred_pprinters is None:
            deferred_pprinters = _deferred_type_pprinters.copy()
        self.deferred_pprinters = deferred_pprinters

    def pretty(self, obj):
        """Pretty print the given object."""
        obj_id = id(obj)
        cycle = obj_id in self.stack
        self.stack.append(obj_id)
        self.begin_group()
        try:
            obj_class = _safe_getattr(obj, "__class__", None) or type(obj)
            # First try to find registered singleton printers for the type.
            try:
                printer = self.singleton_pprinters[obj_id]
            except (TypeError, KeyError):
                pass
            else:
                return printer(obj, self, cycle)
            # Next walk the mro and check for either:
            #   1) a registered printer
            #   2) a _repr_pretty_ method
            for cls in _get_mro(obj_class):
                if cls in self.type_pprinters:
                    # printer registered in self.type_pprinters
                    return self.type_pprinters[cls](obj, self, cycle)
                else:
                    # deferred printer
                    printer = self._in_deferred_types(cls)
                    if printer is not None:
                        return printer(obj, self, cycle)
                    else:
                        # Finally look for special method names.
                        # Some objects automatically create any requested
                        # attribute. Try to ignore most of them by checking for
                        # callability.
                        if "_repr_pretty_" in cls.__dict__:
                            meth = cls._repr_pretty_
                            if callable(meth):
                                return meth(obj, self, cycle)
            return _default_pprint(obj, self, cycle)
        finally:
            self.end_group()
            self.stack.pop()

    def _in_deferred_types(self, cls):
        """
        Check if the given class is specified in the deferred type registry.

        Returns the printer from the registry if it exists, and None if the
        class is not in the registry. Successful matches will be moved to the
        regular type registry for future use.
        """
        mod = _safe_getattr(cls, "__module__", None)
        name = _safe_getattr(cls, "__name__", None)
        key = (mod, name)
        printer = None
        if key in self.deferred_pprinters:
            # Move the printer over to the regular registry.
            printer = self.deferred_pprinters.pop(key)
            self.type_pprinters[cls] = printer
        return printer
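
# A minimal usage sketch (not part of the original module): drive a
# RepresentationPrinter by hand against an io.StringIO buffer.
#
#     >>> import io
#     >>> sio = io.StringIO()
#     >>> rp = RepresentationPrinter(sio)
#     >>> rp.pretty({"alpha": [0, 1, 2]})
#     >>> rp.flush()
#     >>> sio.getvalue()
#     "{'alpha': [0, 1, 2]}"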


class Printable(object):
    def output(self, stream, output_width):
        return output_width


class Text(Printable):
    def __init__(self):
        self.objs = []
        self.width = 0

    def output(self, stream, output_width):
        for obj in self.objs:
            stream.write(obj)
        return output_width + self.width

    def add(self, obj, width):
        self.objs.append(obj)
        self.width += width


class Breakable(Printable):
    def __init__(self, seq, width, pretty):
        self.obj = seq
        self.width = width
        self.pretty = pretty
        self.indentation = pretty.indentation
        self.group = pretty.group_stack[-1]
        self.group.breakables.append(self)

    def output(self, stream, output_width):
        self.group.breakables.popleft()
        if self.group.want_break:
            stream.write(self.pretty.newline)
            stream.write(" " * self.indentation)
            return self.indentation
        if not self.group.breakables:
            self.pretty.group_queue.remove(self.group)
        stream.write(self.obj)
        return output_width + self.width


class Group(Printable):
    def __init__(self, depth):
        self.depth = depth
        self.breakables = collections.deque()
        self.want_break = False


class GroupQueue(object):
    def __init__(self, *groups):
        self.queue = []
        for group in groups:
            self.enq(group)

    def enq(self, group):
        depth = group.depth
        while depth > len(self.queue) - 1:
            self.queue.append([])
        self.queue[depth].append(group)

    def deq(self):
        for stack in self.queue:
            for idx, group in enumerate(reversed(stack)):
                if group.breakables:
                    del stack[idx]
                    group.want_break = True
                    return group
            for group in stack:
                group.want_break = True
            del stack[:]

    def remove(self, group):
        try:
            self.queue[group.depth].remove(group)
        except ValueError:
            pass


@lazyobject
def _baseclass_reprs():
    try:
        br = (object.__repr__, types.InstanceType.__repr__)
    except AttributeError:  # Python 3
        br = (object.__repr__,)
    return br


def _default_pprint(obj, p, cycle):
    """
    The default print function.  Used if an object does not provide one and
    it is not one of the builtin objects.
    """
    klass = _safe_getattr(obj, "__class__", None) or type(obj)
    if _safe_getattr(klass, "__repr__", None) not in _baseclass_reprs:
        # A user-provided repr. Find newlines and replace them with p.break_()
        _repr_pprint(obj, p, cycle)
        return
    p.begin_group(1, "<")
    p.pretty(klass)
    p.text(" at 0x%x" % id(obj))
    if cycle:
        p.text(" ...")
    elif p.verbose:
        first = True
        for key in dir(obj):
            if not key.startswith("_"):
                try:
                    value = getattr(obj, key)
                except AttributeError:
                    continue
                if isinstance(value, types.MethodType):
                    continue
                if not first:
                    p.text(",")
                p.breakable()
                p.text(key)
                p.text("=")
                step = len(key) + 1
                p.indentation += step
                p.pretty(value)
                p.indentation -= step
                first = False
    p.end_group(1, ">")


def _seq_pprinter_factory(start, end, basetype):
    """
    Factory that returns a pprint function useful for sequences.  Used by
    the default pprint for tuples and lists.
    """

    def inner(obj, p, cycle):
        typ = type(obj)
        if (
            basetype is not None
            and typ is not basetype
            and typ.__repr__ != basetype.__repr__
        ):
            # If the subclass provides its own repr, use it instead.
            return p.text(typ.__repr__(obj))

        if cycle:
            return p.text(start + "..." + end)
        step = len(start)
        p.begin_group(step, start)
        for idx, x in p._enumerate(obj):
            if idx:
                p.text(",")
                p.breakable()
            p.pretty(x)
        if len(obj) == 1 and type(obj) is tuple:
            # Special case for 1-item tuples.
            p.text(",")
        p.end_group(step, end)

    return inner
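
# Illustrative sketch (not from the original source): the list printer
# registered below in _type_pprinters is exactly
# _seq_pprinter_factory("[", "]", list); driven by hand it renders:
#
#     >>> import io
#     >>> pprinter = _seq_pprinter_factory("[", "]", list)
#     >>> sio = io.StringIO()
#     >>> rp = RepresentationPrinter(sio)
#     >>> pprinter([1, 2, 3], rp, cycle=False)
#     >>> rp.flush()
#     >>> sio.getvalue()
#     '[1, 2, 3]'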


def _set_pprinter_factory(start, end, basetype):
    """
    Factory that returns a pprint function useful for sets and frozensets.
    """

    def inner(obj, p, cycle):
        typ = type(obj)
        if (
            basetype is not None
            and typ is not basetype
            and typ.__repr__ != basetype.__repr__
        ):
            # If the subclass provides its own repr, use it instead.
            return p.text(typ.__repr__(obj))

        if cycle:
            return p.text(start + "..." + end)
        if len(obj) == 0:
            # Special case.
            p.text(basetype.__name__ + "()")
        else:
            step = len(start)
            p.begin_group(step, start)
            # Like dictionary keys, we will try to sort the items if there aren't too many
            items = obj
            if not (p.max_seq_length and len(obj) >= p.max_seq_length):
                try:
                    items = sorted(obj)
                except Exception:
                    # Sometimes the items don't sort.
                    pass
            for idx, x in p._enumerate(items):
                if idx:
                    p.text(",")
                    p.breakable()
                p.pretty(x)
            p.end_group(step, end)

    return inner


def _dict_pprinter_factory(start, end, basetype=None):
    """
    Factory that returns a pprint function used by the default pprint of
    dicts and dict proxies.
    """

    def inner(obj, p, cycle):
        typ = type(obj)
        if (
            basetype is not None
            and typ is not basetype
            and typ.__repr__ != basetype.__repr__
        ):
            # If the subclass provides its own repr, use it instead.
            return p.text(typ.__repr__(obj))

        if cycle:
            return p.text("{...}")
        p.begin_group(1, start)
        keys = obj.keys()
        # if dict isn't large enough to be truncated, sort keys before displaying
        if not (p.max_seq_length and len(obj) >= p.max_seq_length):
            try:
                keys = sorted(keys)
            except Exception:
                # Sometimes the keys don't sort.
                pass
        for idx, key in p._enumerate(keys):
            if idx:
                p.text(",")
                p.breakable()
            p.pretty(key)
            p.text(": ")
            p.pretty(obj[key])
        p.end_group(1, end)

    return inner


def _super_pprint(obj, p, cycle):
    """The pprint for the super type."""
    p.begin_group(8, "<super: ")
    p.pretty(obj.__thisclass__)
    p.text(",")
    p.breakable()
    p.pretty(obj.__self__)
    p.end_group(8, ">")


def _re_pattern_pprint(obj, p, cycle):
    """The pprint function for regular expression patterns."""
    p.text("re.compile(")
    pattern = repr(obj.pattern)
    if pattern[:1] in "uU":
        pattern = pattern[1:]
        prefix = "ur"
    else:
        prefix = "r"
    pattern = prefix + pattern.replace("\\\\", "\\")
    p.text(pattern)
    if obj.flags:
        p.text(",")
        p.breakable()
        done_one = False
        for flag in (
            "TEMPLATE",
            "IGNORECASE",
            "LOCALE",
            "MULTILINE",
            "DOTALL",
            "UNICODE",
            "VERBOSE",
            "DEBUG",
        ):
            if obj.flags & getattr(re, flag):
                if done_one:
                    p.text("|")
                p.text("re." + flag)
                done_one = True
    p.text(")")


def _type_pprint(obj, p, cycle):
    """The pprint for classes and types."""
    # Heap allocated types might not have the module attribute,
    # and others may set it to None.

    # Checks for a __repr__ override in the metaclass
    if type(obj).__repr__ is not type.__repr__:
        _repr_pprint(obj, p, cycle)
        return

    mod = _safe_getattr(obj, "__module__", None)
    try:
        name = obj.__qualname__
        if not isinstance(name, str):
            # This can happen if the type implements __qualname__ as a property
            # or other descriptor in Python 2.
            raise Exception("Try __name__")
    except Exception:
        name = obj.__name__
        if not isinstance(name, str):
            name = "<unknown type>"

    if mod in (None, "__builtin__", "builtins", "exceptions"):
        p.text(name)
    else:
        p.text(mod + "." + name)


def _repr_pprint(obj, p, cycle):
    """A pprint that just redirects to the normal repr function."""
    # Find newlines and replace them with p.break_()
    output = repr(obj)
    for idx, output_line in enumerate(output.splitlines()):
        if idx:
            p.break_()
        p.text(output_line)


def _function_pprint(obj, p, cycle):
    """Base pprint for all functions and builtin functions."""
    name = _safe_getattr(obj, "__qualname__", obj.__name__)
    mod = obj.__module__
    if mod and mod not in ("__builtin__", "builtins", "exceptions"):
        name = mod + "." + name
    p.text("<function %s>" % name)


def _exception_pprint(obj, p, cycle):
    """Base pprint for all exceptions."""
    name = getattr(obj.__class__, "__qualname__", obj.__class__.__name__)
    if obj.__class__.__module__ not in ("exceptions", "builtins"):
        name = "%s.%s" % (obj.__class__.__module__, name)
    step = len(name) + 1
    p.begin_group(step, name + "(")
    for idx, arg in enumerate(getattr(obj, "args", ())):
        if idx:
            p.text(",")
            p.breakable()
        p.pretty(arg)
    p.end_group(step, ")")


@lazyobject
def _type_pprinters():
    #: printers for builtin types
    tp = {
        int: _repr_pprint,
        float: _repr_pprint,
        str: _repr_pprint,
        tuple: _seq_pprinter_factory("(", ")", tuple),
        list: _seq_pprinter_factory("[", "]", list),
        dict: _dict_pprinter_factory("{", "}", dict),
        set: _set_pprinter_factory("{", "}", set),
        frozenset: _set_pprinter_factory("frozenset({", "})", frozenset),
        super: _super_pprint,
        type(re.compile("")): _re_pattern_pprint,
        type: _type_pprint,
        types.FunctionType: _function_pprint,
        types.BuiltinFunctionType: _function_pprint,
        types.MethodType: _repr_pprint,
        datetime.datetime: _repr_pprint,
        datetime.timedelta: _repr_pprint,
    }
    #: the exception base
    try:
        _exception_base = BaseException
    except NameError:
        _exception_base = Exception
    tp[_exception_base] = _exception_pprint
    try:
        tp[types.DictProxyType] = _dict_pprinter_factory("<dictproxy {", "}>")
        tp[types.ClassType] = _type_pprint
        tp[types.SliceType] = _repr_pprint
    except AttributeError:  # Python 3
        tp[slice] = _repr_pprint
    try:
        tp[xrange] = _repr_pprint
        tp[long] = _repr_pprint
        tp[unicode] = _repr_pprint
    except NameError:
        tp[range] = _repr_pprint
        tp[bytes] = _repr_pprint
    return tp


#: printers for types specified by name
@lazyobject
def _deferred_type_pprinters():
    dtp = {}
    for_type_by_name("collections", "defaultdict", _defaultdict_pprint, dtp=dtp)
    for_type_by_name("collections", "OrderedDict", _ordereddict_pprint, dtp=dtp)
    for_type_by_name("collections", "deque", _deque_pprint, dtp=dtp)
    for_type_by_name("collections", "Counter", _counter_pprint, dtp=dtp)
    return dtp


def for_type(typ, func):
    """
    Add a pretty printer for a given type.
    """
    oldfunc = _type_pprinters.get(typ, None)
    if func is not None:
        # To support easy restoration of old pprinters, we need to ignore Nones.
        _type_pprinters[typ] = func
    return oldfunc


def for_type_by_name(type_module, type_name, func, dtp=None):
    """
    Add a pretty printer for a type specified by the module and name of a type
    rather than the type object itself.
    """
    if dtp is None:
        dtp = _deferred_type_pprinters
    key = (type_module, type_name)
    oldfunc = dtp.get(key, None)
    if func is not None:
        # To support easy restoration of old pprinters, we need to ignore Nones.
        dtp[key] = func
    return oldfunc
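
# Illustrative sketch (not from the original source): a class can opt in
# to pretty printing either through the ``_repr_pretty_`` protocol that
# RepresentationPrinter.pretty() looks up on the MRO, or by registering a
# printer explicitly with for_type().
#
#     >>> class Point(object):
#     ...     def __init__(self, x, y):
#     ...         self.x, self.y = x, y
#     ...     def _repr_pretty_(self, p, cycle):
#     ...         p.text("Point(...)" if cycle else "Point(%r, %r)" % (self.x, self.y))
#
# An equivalent explicit registration would be
# for_type(Point, lambda obj, p, cycle: p.text(repr(obj))).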


#: printers for the default singletons
_singleton_pprinters = LazyObject(
    lambda: dict.fromkeys(
        map(id, [None, True, False, Ellipsis, NotImplemented]), _repr_pprint
    ),
    globals(),
    "_singleton_pprinters",
)


def _defaultdict_pprint(obj, p, cycle):
    name = obj.__class__.__name__
    with p.group(len(name) + 1, name + "(", ")"):
        if cycle:
            p.text("...")
        else:
            p.pretty(obj.default_factory)
            p.text(",")
            p.breakable()
            p.pretty(dict(obj))


def _ordereddict_pprint(obj, p, cycle):
    name = obj.__class__.__name__
    with p.group(len(name) + 1, name + "(", ")"):
        if cycle:
            p.text("...")
        elif len(obj):
            p.pretty(list(obj.items()))


def _deque_pprint(obj, p, cycle):
    name = obj.__class__.__name__
    with p.group(len(name) + 1, name + "(", ")"):
        if cycle:
            p.text("...")
        else:
            p.pretty(list(obj))


def _counter_pprint(obj, p, cycle):
    name = obj.__class__.__name__
    with p.group(len(name) + 1, name + "(", ")"):
        if cycle:
            p.text("...")
        elif len(obj):
            p.pretty(dict(obj))

#
# xontribs_meta
#
"""
This module is the place where one would define the xontribs.
"""

ast = _LazyModule.load('ast', 'ast')
# amalgamated functools
# amalgamated importlib.util
from pathlib import Path
# amalgamated typing
# amalgamated xonsh.lazyasd
class _XontribPkg(tp.NamedTuple):
    """Class to define package information of a xontrib.

    Attributes
    ----------
    install
        a mapping of tools with respective install commands. e.g. {"pip": "pip install xontrib"}
    license
        license type of the xontrib package
    name
        full name of the package. e.g. "xontrib-argcomplete"
    url
        URL to the homepage of the xontrib package.
    """

    install: tp.Dict[str, str]
    license: str = ""
    name: str = ""
    url: tp.Optional[str] = None


class Xontrib(tp.NamedTuple):
    """Meta class that is used to describe xontribs.

    Attributes
    ----------
    url
        url to the home page of the xontrib.
    description
        short description about the xontrib.
    package
        pkg information for installing the xontrib
    tags
        category tags for the xontrib.
    """

    url: str = ""
    description: tp.Union[str, LazyObject] = ""
    package: tp.Optional[_XontribPkg] = None
    tags: tp.Tuple[str, ...] = ()


def get_module_docstring(module: str) -> str:
    """Find the module and return its docstring without actual import"""

    spec = importlib.util.find_spec(module)
    if spec and spec.has_location and spec.origin:
        return ast.get_docstring(ast.parse(Path(spec.origin).read_text())) or ""
    return ""


@functools.lru_cache()
def get_xontribs() -> tp.Dict[str, Xontrib]:
    """Return xontrib definitions lazily."""
    return define_xontribs()


def define_xontribs():
    """Xontrib registry."""
    core_pkg = _XontribPkg(
        name="xonsh",
        license="BSD 3-clause",
        install={
            "conda": "conda install -c conda-forge xonsh",
            "pip": "xpip install xonsh",
            "aura": "sudo aura -A xonsh",
            "yaourt": "yaourt -Sa xonsh",
        },
        url="http://xon.sh",
    )
    return {
        "abbrevs": Xontrib(
            url="http://xon.sh",
            description=lazyobject(lambda: get_module_docstring("xontrib.abbrevs")),
            package=core_pkg,
        ),
        "apt_tabcomplete": Xontrib(
            url="https://github.com/DangerOnTheRanger/xonsh-apt-tabcomplete",
            description="Adds tabcomplete functionality to "
            "apt-get/apt-cache inside of xonsh.",
            package=_XontribPkg(
                name="xonsh-apt-tabcomplete",
                license="BSD 2-clause",
                install={"pip": "xpip install xonsh-apt-tabcomplete"},
                url="https://github.com/DangerOnTheRanger/xonsh-apt-tabcomplete",
            ),
        ),
        "argcomplete": Xontrib(
            url="https://github.com/anki-code/xontrib-argcomplete",
            description="Argcomplete support to tab completion of "
            "python and xonsh scripts in xonsh.",
            package=_XontribPkg(
                name="xontrib-argcomplete",
                license="BSD",
                install={"pip": "xpip install xontrib-argcomplete"},
                url="https://github.com/anki-code/xontrib-argcomplete",
            ),
        ),
        "autojump": Xontrib(
            url="https://github.com/wshanks/xontrib-autojump",
            description="autojump support for xonsh",
        ),
        "autovox": Xontrib(
            url="http://xon.sh",
            description="Manages automatic activation of virtual " "environments.",
            package=core_pkg,
        ),
        "autoxsh": Xontrib(
            url="https://github.com/Granitas/xonsh-autoxsh",
            description="Adds automatic execution of xonsh script files "
            "called ``.autoxsh`` when enterting a directory "
            "with ``cd`` function",
            package=_XontribPkg(
                name="xonsh-autoxsh",
                license="GPLv3",
                install={"pip": "xpip install xonsh-autoxsh"},
                url="https://github.com/Granitas/xonsh-autoxsh",
            ),
        ),
        "avox": Xontrib(
            url="https://github.com/AstraLuma/xontrib-avox",
            description="Policy for autovox based on project directories",
            package=_XontribPkg(
                name="xontrib-avox",
                license="GPLv3",
                install={"pip": "xpip install xontrib-avox"},
                url="https://github.com/AstraLuma/xontrib-avox",
            ),
        ),
        "avox_poetry": Xontrib(
            url="github.com/jnoortheen/xontrib-avox-poetry",
            description="auto-activate venv as one cd into a poetry project folder. "
            "Activate ``.venv`` inside the project folder is also supported.",
            package=_XontribPkg(
                name="xontrib-avox-poetry",
                license="MIT",
                install={"pip": "xpip install xontrib-avox-poetry"},
                url="https://github.com/jnoortheen/xontrib-avox-poetry",
            ),
        ),
        "back2dir": Xontrib(
            url="https://github.com/anki-code/xontrib-back2dir",
            description="Return to the most recently used directory when "
            "starting the xonsh shell. For example, if you "
            "were in the '/work' directory when you last "
            "exited xonsh, then your next xonsh session will "
            "start in the '/work' directory, instead of your "
            "home directory.",
            package=_XontribPkg(
                name="xontrib-back2dir",
                license="BSD",
                install={"pip": "xpip install xontrib-back2dir"},
                url="https://github.com/anki-code/xontrib-back2dir",
            ),
        ),
        "base16_shell": Xontrib(
            url="https://github.com/ErickTucto/xontrib-base16-shell",
            description="Change base16 shell themes",
        ),
        "bashisms": Xontrib(
            url="http://xon.sh",
            description="Enables additional Bash-like syntax while at the "
            "command prompt. For example, the ``!!`` syntax "
            "for running the previous command is now usable. "
            "Note that these features are implemented as "
            "precommand events and these additions do not "
            "affect the xonsh language when run as script. "
            "That said, you might find them useful if you "
            "have strong muscle memory.\n"
            "\n"
            "**Warning:** This xontrib may modify user "
            "command line input to implement its behavior. To "
            "see the modifications as they are applied (in "
            "unified diffformat), please set ``$XONSH_DEBUG`` "
            "to ``2`` or higher.\n"
            "\n"
            "The xontrib also adds commands: ``alias``, "
            "``export``, ``unset``, ``set``, ``shopt``, "
            "``complete``.",
            package=core_pkg,
        ),
        "broot": Xontrib(
            url="github.com/jnoortheen/xontrib-broot",
            description="supports broot with br alias",
            package=_XontribPkg(
                name="xontrib-broot",
                license="MIT",
                install={"pip": "xpip install xontrib-broot"},
                url="https://github.com/jnoortheen/xontrib-broot",
            ),
        ),
        "powerline3": Xontrib(
            url="github.com/jnoortheen/xontrib-powerline3",
            description="Powerline theme with native $PROMPT_FIELDS support.",
            package=_XontribPkg(
                name="xontrib-powerline3",
                license="MIT",
                install={"pip": "xpip install xontrib-powerline3"},
                url="https://github.com/jnoortheen/xontrib-broot",
            ),
        ),
        "cd": Xontrib(
            url="https://github.com/eugenesvk/xontrib-cd",
            description="'cd' to any path without escaping in xonsh shell "
            "('cd '→'cd! ')",
            package=_XontribPkg(
                name="xontrib-cd",
                license="MIT",
                install={"pip": "xpip install xontrib-cd"},
                url="https://github.com/eugenesvk/xontrib-cd",
            ),
        ),
        "cmd_done": Xontrib(
            url="https://github.com/jnoortheen/xontrib-cmd-durations",
            description="send notification once long-running command is "
            "finished. Adds `long_cmd_duration` field to "
            "$PROMPT_FIELDS. Note: It needs `xdotool` "
            "installed to detect current window.",
            package=_XontribPkg(
                name="xontrib-cmd-durations",
                license="MIT",
                install={"pip": "xpip install xontrib-cmd-durations"},
                url="https://github.com/jnoortheen/xontrib-cmd-durations",
            ),
        ),
        "commands": Xontrib(
            url="https://github.com/jnoortheen/xontrib-commands",
            description="Some useful commands/aliases to use with Xonsh shell",
            package=_XontribPkg(
                name="xontrib-commands",
                license="MIT",
                install={"pip": "xpip install xontrib-commands"},
                url="https://github.com/jnoortheen/xontrib-commands",
            ),
        ),
        "coreutils": Xontrib(
            url="http://xon.sh",
            description="Additional core utilities that are implemented "
            "in xonsh. The current list includes:\n"
            "\n"
            "* cat\n"
            "* echo\n"
            "* pwd\n"
            "* tee\n"
            "* tty\n"
            "* yes\n"
            "\n"
            "In many cases, these may have a lower "
            "performance overhead than the posix command "
            "line utility with the same name. This is "
            "because these tools avoid the need for a full "
            "subprocess call. Additionally, these tools are "
            "cross-platform.",
            package=core_pkg,
        ),
        "direnv": Xontrib(
            url="https://github.com/74th/xonsh-direnv",
            description="Supports direnv.",
            package=_XontribPkg(
                name="xonsh-direnv",
                license="MIT",
                install={"pip": "xpip install xonsh-direnv"},
                url="https://github.com/74th/xonsh-direnv",
            ),
        ),
        "distributed": Xontrib(
            url="http://xon.sh",
            description="The distributed parallel computing library "
            "hooks for xonsh. Importantly this provides a "
            "substitute 'dworker' command which enables "
            "distributed workers to have access to xonsh "
            "builtins.\n"
            "\n"
            "Furthermore, this xontrib adds a 'DSubmitter' "
            "context manager for executing a block "
            "remotely. Moreover, this also adds a "
            "convenience function 'dsubmit()' for creating "
            "DSubmitter and Executor instances at the same "
            "time. Thus users may submit distributed jobs "
            "with::\n"
            "\n"
            "    with dsubmit('127.0.0.1:8786', rtn='x') "
            "as dsub:\n"
            "        x = $(echo I am elsewhere)\n"
            "\n"
            "    res = dsub.future.result()\n"
            "    print(res)\n"
            "\n"
            "This is useful for long running or "
            "non-blocking jobs.",
            package=core_pkg,
        ),
        "docker_tabcomplete": Xontrib(
            url="https://github.com/xsteadfastx/xonsh-docker-tabcomplete",
            description="Adds tabcomplete functionality to " "docker inside of xonsh.",
            package=_XontribPkg(
                name="xonsh-docker-tabcomplete",
                license="MIT",
                install={"pip": "xpip install xonsh-docker-tabcomplete"},
                url="https://github.com/xsteadfastx/xonsh-docker-tabcomplete",
            ),
        ),
        "free_cwd": Xontrib(
            url="http://xon.sh",
            description="Windows only xontrib, to release the lock on the "
            "current directory whenever the prompt is shown. "
            "Enabling this will allow the other programs or "
            "Windows Explorer to delete or rename the current "
            "or parent directories. Internally, it is "
            "accomplished by temporarily resetting CWD to the "
            "root drive folder while waiting at the prompt. "
            "This only works with the prompt_toolkit backend "
            "and can cause cause issues if any extensions are "
            "enabled that hook the prompt and relies on "
            "``os.getcwd()``",
            package=core_pkg,
        ),
        "fzf-widgets": Xontrib(
            url="https://github.com/laloch/xontrib-fzf-widgets",
            description="Adds some fzf widgets to your xonsh shell.",
            package=_XontribPkg(
                name="xontrib-fzf-widgets",
                license="GPLv3",
                install={"pip": "xpip install xontrib-fzf-widgets"},
                url="https://github.com/laloch/xontrib-fzf-widgets",
            ),
        ),
        "gitinfo": Xontrib(
            url="https://github.com/dyuri/xontrib-gitinfo",
            description="Displays git information on entering a repository "
            "folder. Uses ``onefetch`` if available.",
            package=_XontribPkg(
                name="xontrib-gitinfo",
                license="MIT",
                install={"pip": "xpip install xontrib-gitinfo"},
                url="https://github.com/dyuri/xontrib-gitinfo",
            ),
        ),
        "history_encrypt": Xontrib(
            url="https://github.com/anki-code/xontrib-history-encrypt",
            description="History backend that encrypt the xonsh shell commands history "
            "to prevent leaking sensitive data.",
            package=_XontribPkg(
                name="xontrib-history-encrypt",
                license="MIT",
                install={"pip": "xpip install xontrib-history-encrypt"},
                url="https://github.com/anki-code/xontrib-history-encrypt",
            ),
        ),
        "hist_navigator": Xontrib(
            url="https://github.com/jnoortheen/xontrib-hist-navigator",
            description="Move through directory history with nextd "
            "and prevd also with keybindings.",
            package=_XontribPkg(
                name="xontrib-hist-navigator",
                license="MIT",
                install={"pip": "xpip install xontrib-hist-navigator"},
                url="https://github.com/jnoortheen/xontrib-hist-navigator",
            ),
        ),
        "histcpy": Xontrib(
            url="https://github.com/con-f-use/xontrib-histcpy",
            description="Useful aliases and shortcuts for extracting links "
            "and textfrom command output history and putting "
            "them into the clipboard.",
            package=_XontribPkg(
                name="xontrib-histcpy",
                license="GPLv3",
                install={"pip": "xpip install xontrib-histcpy"},
                url="https://github.com/con-f-use/xontrib-histcpy",
            ),
        ),
        "jedi": Xontrib(
            url="http://xon.sh",
            description="Use Jedi as xonsh's python completer.",
            package=core_pkg,
        ),
        "kitty": Xontrib(
            url="https://github.com/scopatz/xontrib-kitty",
            description="Xonsh hooks for the Kitty terminal emulator.",
            package=_XontribPkg(
                name="xontrib-kitty",
                license="BSD-3-Clause",
                install={
                    "conda": "conda install -c conda-forge " "xontrib-kitty",
                    "pip": "xpip install xontrib-kitty",
                },
                url="https://github.com/scopatz/xontrib-kitty",
            ),
        ),
        "linuxbrew": Xontrib(
            url="https://github.com/eugenesvk/xontrib-linuxbrew",
            description="Add Homebrew's shell environment to xonsh shell on Linux",
            package=_XontribPkg(
                name="xontrib-linuxbrew",
                license="MIT",
                install={"pip": "xpip install xontrib-linuxbrew"},
                url="https://github.com/eugenesvk/xontrib-linuxbrew",
            ),
        ),
        "macro_lib": Xontrib(
            url="https://github.com/anki-code/xontrib-macro-lib",
            description="Library of the useful macros for the xonsh shell.",
            package=_XontribPkg(
                name="xontrib-macro-lib",
                license="BSD",
                install={"pip": "xpip install xontrib-macro-lib"},
                url="https://github.com/anki-code/xontrib-macro-lib",
            ),
        ),
        "mpl": Xontrib(
            url="http://xon.sh",
            description="Matplotlib hooks for xonsh, including the new 'mpl' "
            "alias that displays the current figure on the screen.",
            package=core_pkg,
        ),
        "onepath": Xontrib(
            url="https://github.com/anki-code/xontrib-onepath",
            description="When you click to a file or folder in graphical "
            "OS they will be opened in associated app.The "
            "xontrib-onepath brings the same logic for the "
            "xonsh shell. Type the filename or pathwithout "
            "preceding command and an associated action will "
            "be executed. The actions are customizable.",
            package=_XontribPkg(
                name="xontrib-onepath",
                license="BSD",
                install={"pip": "xpip install xontrib-onepath"},
                url="https://github.com/anki-code/xontrib-onepath",
            ),
        ),
        "output_search": Xontrib(
            url="https://github.com/anki-code/xontrib-output-search",
            description="Get identifiers, names, paths, URLs and "
            "words from the previous command output and "
            "use them for the next command.",
            package=_XontribPkg(
                name="xontrib-output-search",
                license="BSD",
                install={"pip": "xpip install xontrib-output-search"},
                url="https://github.com/tokenizer/xontrib-output-search",
            ),
        ),
        "pdb": Xontrib(
            url="http://xon.sh",
            description="Simple built-in debugger. Runs pdb on reception of "
            "SIGUSR1 signal.",
            package=core_pkg,
        ),
        "pipeliner": Xontrib(
            url="https://github.com/anki-code/xontrib-pipeliner",
            description="Let your pipe lines flow thru the Python code " "in xonsh.",
            package=_XontribPkg(
                name="xontrib-pipeliner",
                license="MIT",
                install={"pip": "xpip install xontrib-pipeliner"},
                url="https://github.com/anki-code/xontrib-pipeliner",
            ),
        ),
        "powerline": Xontrib(
            url="https://github.com/santagada/xontrib-powerline",
            description="Powerline for Xonsh shell",
            package=_XontribPkg(
                name="xontrib-powerline",
                license="MIT",
                install={"pip": "xpip install xontrib-powerline"},
                url="https://github.com/santagada/xontrib-powerline",
            ),
        ),
        "powerline2": Xontrib(
            url="https://github.com/vaaaaanquish/xontrib-powerline2",
            description="Powerline for Xonsh shell forked from "
            "santagada/xontrib-powerline",
            package=_XontribPkg(
                name="xontrib-powerline2",
                license="MIT",
                install={"pip": "xpip install xontrib-powerline2"},
                url="https://github.com/vaaaaanquish/xontrib-powerline2",
            ),
        ),
        "powerline_binding": Xontrib(
            url="https://github.com/dyuri/xontrib-powerline-binding",
            description="Uses powerline to render the xonsh " "prompt",
            package=_XontribPkg(
                name="xontrib-powerline-binding",
                license="MIT",
                install={"pip": "xpip install xontrib-powerline-binding"},
                url="https://github.com/dyuri/xontrib-powerline-binding",
            ),
        ),
        "prompt_bar": Xontrib(
            url="https://github.com/anki-code/xontrib-prompt-bar",
            description="An elegance bar style for prompt.",
            package=_XontribPkg(
                name="xontrib-prompt-bar",
                license="MIT",
                install={"pip": "xpip install xontrib-prompt-bar"},
                url="https://github.com/anki-code/xontrib-prompt-bar",
            ),
        ),
        "prompt_ret_code": Xontrib(
            url="http://xon.sh",
            description="Adds return code info to the prompt",
            package=core_pkg,
        ),
        "prompt_starship": Xontrib(
            url="https://github.com/anki-code/xontrib-prompt-starship",
            description="Starship prompt in xonsh shell.",
            package=_XontribPkg(
                name="xontrib-prompt-starship",
                license="MIT",
                install={"pip": "xpip install xontrib-prompt-starship"},
                url="https://github.com/anki-code/xontrib-prompt-starship",
            ),
        ),
        "prompt_vi_mode": Xontrib(
            url="https://github.com/t184256/xontrib-prompt-vi-mode",
            description="vi-mode status formatter for xonsh prompt",
            package=_XontribPkg(
                name="xontrib-prompt-vi-mode",
                license="MIT",
                install={"pip": "xpip install xontrib-prompt-vi-mode"},
                url="https://github.com/t184256/xontrib-prompt-vi-mode",
            ),
        ),
        "pyenv": Xontrib(
            url="https://github.com/dyuri/xontrib-pyenv",
            description="pyenv integration for xonsh.",
            package=_XontribPkg(
                name="xontrib-pyenv",
                license="MIT",
                install={"pip": "xpip install xontrib-pyenv"},
                url="https://github.com/dyuri/xontrib-pyenv",
            ),
        ),
        "readable-traceback": Xontrib(
            url="https://github.com/6syun9/xontrib-readable-traceback",
            description="Make traceback easier to see for " "xonsh.",
            package=_XontribPkg(
                name="xontrib-readable-traceback",
                license="MIT",
                install={"pip": "xpip install xontrib-readable-traceback"},
                url="https://github.com/6syun9/xontrib-readable-traceback",
            ),
        ),
        "schedule": Xontrib(
            url="https://github.com/AstraLuma/xontrib-schedule",
            description="Xonsh Task Scheduling",
            package=_XontribPkg(
                name="xontrib-schedule",
                license="MIT",
                install={"pip": "xpip install xontrib-schedule"},
                url="https://github.com/AstraLuma/xontrib-schedule",
            ),
        ),
        "scrapy_tabcomplete": Xontrib(
            url="https://github.com/Granitas/xonsh-scrapy-tabcomplete",
            description="Adds tabcomplete functionality to " "scrapy inside of xonsh.",
            package=_XontribPkg(
                name="xonsh-scrapy-tabcomplete",
                license="GPLv3",
                install={"pip": "xpip install xonsh-scrapy-tabcomplete"},
                url="https://github.com/Granitas/xonsh-scrapy-tabcomplete",
            ),
        ),
        "sh": Xontrib(
            url="https://github.com/anki-code/xontrib-sh",
            description="Paste and run commands from bash, zsh, fish in xonsh "
            "shell.",
            package=_XontribPkg(
                name="xontrib-sh",
                license="MIT",
                install={"pip": "xpip install xontrib-sh"},
                url="https://github.com/anki-code/xontrib-sh",
            ),
        ),
        "ssh_agent": Xontrib(
            url="https://github.com/dyuri/xontrib-ssh-agent",
            description="ssh-agent integration",
            package=_XontribPkg(
                name="xontrib-ssh-agent",
                license="MIT",
                install={"pip": "xpip install xontrib-ssh-agent"},
                url="https://github.com/dyuri/xontrib-ssh-agent",
            ),
        ),
        "tcg": Xontrib(
            url="https://github.com/zasdfgbnm/tcg/tree/master/shells/xonsh",
            description="tcg integration.",
            package=_XontribPkg(
                name="xonsh-tcg",
                license="MIT",
                install={"pip": "xpip install xonsh-tcg"},
                url="https://github.com/zasdfgbnm/tcg/tree/master/shells/xonsh",
            ),
        ),
        "vox": Xontrib(
            url="http://xon.sh",
            description="Python virtual environment manager for xonsh.",
            package=core_pkg,
        ),
        "vox_tabcomplete": Xontrib(
            url="https://github.com/Granitosaurus/xonsh-vox-tabcomplete",
            description="Adds tabcomplete functionality to vox " "inside of xonsh.",
            package=_XontribPkg(
                name="xonsh-vox-tabcomplete",
                license="GPLv3",
                install={"pip": "xpip install xonsh-vox-tabcomplete"},
                url="https://github.com/Granitosaurus/xonsh-vox-tabcomplete",
            ),
        ),
        "whole_word_jumping": Xontrib(
            url="http://xon.sh",
            description="Jumping across whole words "
            "(non-whitespace) with Ctrl+Left/Right. "
            "Alt+Left/Right remains unmodified to "
            "jump over smaller word segments. "
            "Shift+Delete removes the whole word.",
            package=core_pkg,
        ),
        "xo": Xontrib(
            url="https://github.com/scopatz/xo",
            description="Adds an 'xo' alias to run the exofrills text editor in "
            "the current Python interpreter session. This shaves "
            "off a bit of the startup time when running your "
            "favorite, minimal text editor.",
            package=_XontribPkg(
                name="exofrills",
                license="WTFPL",
                install={
                    "conda": "conda install -c conda-forge xo",
                    "pip": "xpip install exofrills",
                },
                url="http://exofrills.org",
            ),
        ),
        "xog": Xontrib(
            url="http://xon.sh",
            description="Adds a simple command to establish and print "
            "temporary traceback log file.",
            package=core_pkg,
        ),
        "xpg": Xontrib(
            url="https://github.com/fengttt/xsh/tree/master/py",
            description="Run/plot/explain sql query for PostgreSQL.",
            package=_XontribPkg(
                name="xontrib-xpg",
                license="Apache",
                install={"pip": "xpip install xontrib-xpg"},
                url="https://github.com/fengttt/xsh/py",
            ),
        ),
        "z": Xontrib(
            url="https://github.com/AstraLuma/xontrib-z",
            description="Tracks your most used directories, based on 'frecency'.",
            package=_XontribPkg(
                name="xontrib-z",
                license="GPLv3",
                install={"pip": "xpip install xontrib-z"},
                url="https://github.com/AstraLuma/xontrib-z",
            ),
        ),
        "zoxide": Xontrib(
            url="https://github.com/dyuri/xontrib-zoxide",
            description="Zoxide integration for xonsh.",
            package=_XontribPkg(
                name="xontrib-zoxide",
                license="MIT",
                install={"pip": "xpip install xontrib-zoxide"},
                url="https://github.com/dyuri/xontrib-zoxide",
            ),
        ),
    }

#
# codecache
#
"""Tools for caching xonsh code."""
hashlib = _LazyModule.load('hashlib', 'hashlib')
marshal = _LazyModule.load('marshal', 'marshal')
# amalgamated os
# amalgamated sys
from xonsh import __version__ as XONSH_VERSION
# amalgamated from xonsh.built_ins import XSH
# amalgamated xonsh.lazyasd
# amalgamated xonsh.platform
def _splitpath(path, sofar=()):
    folder, path = os.path.split(path)
    if path == "":
        return sofar[::-1]
    elif folder == "":
        return (sofar + (path,))[::-1]
    else:
        return _splitpath(folder, sofar + (path,))
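
# Illustrative behavior note (not from the original source), on a POSIX
# path: the leading separator is consumed by the final os.path.split, so
# only the intermediate components come back.
#
#     >>> _splitpath("/home/user/file.py")
#     ('home', 'user', 'file.py')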


@lazyobject
def _CHARACTER_MAP():
    cmap = {chr(o): "_%s" % chr(o + 32) for o in range(65, 91)}
    cmap.update({".": "_.", "_": "__"})
    return cmap


def _cache_renamer(path, code=False):
    if not code:
        path = os.path.realpath(path)
    o = ["".join(_CHARACTER_MAP.get(i, i) for i in w) for w in _splitpath(path)]
    o[-1] = "{}.{}".format(o[-1], sys.implementation.cache_tag)
    return o
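
# Illustrative sketch (not from the original source): uppercase letters,
# dots, and underscores are escaped via _CHARACTER_MAP so cache names stay
# unambiguous on case-insensitive filesystems, and the interpreter's cache
# tag is appended to the final component. Assuming CPython 3.9
# (cache_tag "cpython-39"):
#
#     >>> _cache_renamer("/tmp/MyScript.xsh", code=True)
#     ['tmp', '_my_script_.xsh.cpython-39']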


def should_use_cache(execer, mode):
    """
    Return ``True`` if caching has been enabled for this mode (through command
    line flags or environment variables)
    """
    if mode == "exec":
        return (execer.scriptcache or execer.cacheall) and (
            XSH.env["XONSH_CACHE_SCRIPTS"] or XSH.env["XONSH_CACHE_EVERYTHING"]
        )
    else:
        return execer.cacheall or XSH.env["XONSH_CACHE_EVERYTHING"]


def run_compiled_code(code, glb, loc, mode):
    """
    Helper to run code in a given mode and context
    """
    if code is None:
        return
    if mode in {"exec", "single"}:
        func = exec
    else:
        func = eval
    func(code, glb, loc)


def get_cache_filename(fname, code=True):
    """
    Return the filename of the cache for the given filename.

    Cache filenames are similar to those used by the Mercurial DVCS for its
    internal store.

    The ``code`` switch should be true if we should use the code store rather
    than the script store.
    """
    datadir = XSH.env["XONSH_DATA_DIR"]
    cachedir = os.path.join(
        datadir, "xonsh_code_cache" if code else "xonsh_script_cache"
    )
    cachefname = os.path.join(cachedir, *_cache_renamer(fname, code=code))
    return cachefname


def update_cache(ccode, cache_file_name):
    """
    Update the cache at ``cache_file_name`` to contain the compiled code
    represented by ``ccode``.
    """
    if cache_file_name is not None:
        os.makedirs(os.path.dirname(cache_file_name), exist_ok=True)
        with open(cache_file_name, "wb") as cfile:
            cfile.write(XONSH_VERSION.encode() + b"\n")
            cfile.write(bytes(PYTHON_VERSION_INFO_BYTES) + b"\n")
            marshal.dump(ccode, cfile)


def _check_cache_versions(cfile):
    # version data should be < 1 kb
    ver = cfile.readline(1024).strip()
    if ver != XONSH_VERSION.encode():
        return False
    ver = cfile.readline(1024).strip()
    return ver == PYTHON_VERSION_INFO_BYTES
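
# Layout sketch of a cache file, inferred from update_cache() and
# _check_cache_versions() above (not a separate on-disk spec):
#
#     line 1: XONSH_VERSION, utf-8 encoded
#     line 2: PYTHON_VERSION_INFO_BYTES
#     rest:   marshal.dump() of the compiled code object
#
# Both header lines must match the running interpreter exactly, otherwise
# the cached entry is discarded and the source is recompiled.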


def compile_code(filename, code, execer, glb, loc, mode):
    """
    Wrapper for ``execer.compile`` to compile the given code
    """
    if not code.endswith("\n"):
        code += "\n"
    old_filename = execer.filename
    try:
        execer.filename = filename
        ccode = execer.compile(code, glbs=glb, locs=loc, mode=mode, filename=filename)
    finally:
        execer.filename = old_filename
    return ccode


def script_cache_check(filename, cachefname):
    """
    Check whether the script cache for a particular file is valid.

    Returns a tuple containing: a boolean representing whether the cached code
    should be used, and the cached code (or ``None`` if the cache should not be
    used).
    """
    ccode = None
    run_cached = False
    if os.path.isfile(cachefname):
        if os.stat(cachefname).st_mtime >= os.stat(filename).st_mtime:
            with open(cachefname, "rb") as cfile:
                if not _check_cache_versions(cfile):
                    return False, None
                ccode = marshal.load(cfile)
                run_cached = True
    return run_cached, ccode


def run_script_with_cache(filename, execer, glb=None, loc=None, mode="exec"):
    """
    Run a script, using a cached version if it exists (and the source has not
    changed), and updating the cache as necessary.
    """
    run_cached = False
    use_cache = should_use_cache(execer, mode)
    cachefname = get_cache_filename(filename, code=False)
    if use_cache:
        run_cached, ccode = script_cache_check(filename, cachefname)
    if not run_cached:
        with open(filename, "r", encoding="utf-8") as f:
            code = f.read()
        ccode = compile_code(filename, code, execer, glb, loc, mode)
        update_cache(ccode, cachefname)
    run_compiled_code(ccode, glb, loc, mode)


def code_cache_name(code):
    """
    Return an appropriate spoofed filename for the given code.
    """
    if isinstance(code, str):
        code = code.encode()
    return hashlib.md5(code).hexdigest()


def code_cache_check(cachefname):
    """
    Check whether the code cache for a particular piece of code is valid.

    Returns a tuple containing: a boolean representing whether the cached code
    should be used, and the cached code (or ``None`` if the cache should not be
    used).
    """
    ccode = None
    run_cached = False
    if os.path.isfile(cachefname):
        with open(cachefname, "rb") as cfile:
            if not _check_cache_versions(cfile):
                return False, None
            ccode = marshal.load(cfile)
            run_cached = True
    return run_cached, ccode


def run_code_with_cache(code, execer, glb=None, loc=None, mode="exec"):
    """
    Run a piece of code, using a cached version if it exists, and updating the
    cache as necessary.
    """
    use_cache = should_use_cache(execer, mode)
    filename = code_cache_name(code)
    cachefname = get_cache_filename(filename, code=True)
    run_cached = False
    if use_cache:
        run_cached, ccode = code_cache_check(cachefname)
    if not run_cached:
        ccode = compile_code(filename, code, execer, glb, loc, mode)
        update_cache(ccode, cachefname)
    run_compiled_code(ccode, glb, loc, mode)
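
# Usage sketch (illustrative, not from the original source). Both cache
# helpers consult XSH.env, so a fully initialized xonsh session is
# assumed, as is the Execer class from xonsh.execer:
#
#     >>> from xonsh.execer import Execer
#     >>> execer = Execer()
#     >>> run_code_with_cache("print(40 + 2)\n", execer, glb=globals())
#     42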

#
# lazyimps
#
"""Lazy imports that may apply across the xonsh package."""
# amalgamated os
# amalgamated importlib
# amalgamated xonsh.platform
# amalgamated xonsh.lazyasd
pygments = LazyObject(
    lambda: importlib.import_module("pygments"), globals(), "pygments"
)
pyghooks = LazyObject(
    lambda: importlib.import_module("xonsh.pyghooks"), globals(), "pyghooks"
)


@lazyobject
def pty():
    if ON_WINDOWS:
        return
    else:
        return importlib.import_module("pty")


@lazyobject
def termios():
    if ON_WINDOWS:
        return
    else:
        return importlib.import_module("termios")


@lazyobject
def fcntl():
    if ON_WINDOWS:
        return
    else:
        return importlib.import_module("fcntl")


@lazyobject
def tty():
    if ON_WINDOWS:
        return
    else:
        return importlib.import_module("tty")


@lazyobject
def _winapi():
    if ON_WINDOWS:
        import _winapi as m
    else:
        m = None
    return m


@lazyobject
def msvcrt():
    if ON_WINDOWS:
        import msvcrt as m
    else:
        m = None
    return m


@lazyobject
def winutils():
    if ON_WINDOWS:
        import xonsh.winutils as m
    else:
        m = None
    return m


@lazyobject
def macutils():
    if ON_DARWIN:
        import xonsh.macutils as m
    else:
        m = None
    return m


@lazyobject
def terminal256():
    return importlib.import_module("pygments.formatters.terminal256")


@lazyobject
def html():
    return importlib.import_module("pygments.formatters.html")


@lazyobject
def os_listxattr():
    def dummy_listxattr(*args, **kwargs):
        return []

    return getattr(os, "listxattr", dummy_listxattr)

#
# parser
#
# -*- coding: utf-8 -*-
"""Implements the xonsh parser."""
# amalgamated xonsh.lazyasd
# amalgamated xonsh.platform
@lazyobject
def Parser():
    if PYTHON_VERSION_INFO > (3, 10):
        from xonsh.parsers.v310 import Parser as p
    elif PYTHON_VERSION_INFO > (3, 9):
        from xonsh.parsers.v39 import Parser as p
    elif PYTHON_VERSION_INFO > (3, 8):
        from xonsh.parsers.v38 import Parser as p
    else:
        from xonsh.parsers.v36 import Parser as p
    return p

#
# tokenize
#
"""Tokenization help for xonsh programs.

This file is a modified version of tokenize.py from the Python 3.4 and 3.5
standard libraries (licensed under the Python Software Foundation License,
version 2), which provides tokenization help for Python programs.

It is modified to properly tokenize xonsh code, including backtick regex
paths and several xonsh-specific operators.

Original file credits:
   __author__ = 'Ka-Ping Yee <ping@lfw.org>'
   __credits__ = ('GvR, ESR, Tim Peters, Thomas Wouters, Fred Drake, '
                  'Skip Montanaro, Raymond Hettinger, Trent Nelson, '
                  'Michael Foord')
"""

# amalgamated re
# amalgamated io
# amalgamated sys
codecs = _LazyModule.load('codecs', 'codecs')
# amalgamated builtins
itertools = _LazyModule.load('itertools', 'itertools')
# amalgamated collections
token = _LazyModule.load('token', 'token')
from token import (
    AMPER,
    AMPEREQUAL,
    AT,
    CIRCUMFLEX,
    CIRCUMFLEXEQUAL,
    COLON,
    COMMA,
    DEDENT,
    DOT,
    DOUBLESLASH,
    DOUBLESLASHEQUAL,
    DOUBLESTAR,
    DOUBLESTAREQUAL,
    ENDMARKER,
    EQEQUAL,
    EQUAL,
    ERRORTOKEN,
    GREATER,
    GREATEREQUAL,
    INDENT,
    LBRACE,
    LEFTSHIFT,
    LEFTSHIFTEQUAL,
    LESS,
    LESSEQUAL,
    LPAR,
    LSQB,
    MINEQUAL,
    MINUS,
    NAME,
    NEWLINE,
    NOTEQUAL,
    NUMBER,
    N_TOKENS,
    OP,
    PERCENT,
    PERCENTEQUAL,
    PLUS,
    PLUSEQUAL,
    RBRACE,
    RIGHTSHIFT,
    RIGHTSHIFTEQUAL,
    RPAR,
    RSQB,
    SEMI,
    SLASH,
    SLASHEQUAL,
    STAR,
    STAREQUAL,
    STRING,
    TILDE,
    VBAR,
    VBAREQUAL,
    tok_name,
)
# amalgamated typing
# amalgamated xonsh.lazyasd
# amalgamated xonsh.platform
HAS_WALRUS = PYTHON_VERSION_INFO > (3, 8)
if HAS_WALRUS:
    from token import COLONEQUAL  # type:ignore

cookie_re = LazyObject(
    lambda: re.compile(r"^[ \t\f]*#.*coding[:=][ \t]*([-\w.]+)", re.ASCII),
    globals(),
    "cookie_re",
)
blank_re = LazyObject(
    lambda: re.compile(br"^[ \t\f]*(?:[#\r\n]|$)", re.ASCII), globals(), "blank_re"
)

#
# token modifications
#
tok_name = tok_name.copy()
__all__ = token.__all__ + [  # type:ignore
    "COMMENT",
    "tokenize",
    "detect_encoding",
    "NL",
    "untokenize",
    "ENCODING",
    "TokenInfo",
    "TokenError",
    "SEARCHPATH",
    "ATDOLLAR",
    "ATEQUAL",
    "DOLLARNAME",
    "IOREDIRECT",
]
HAS_ASYNC = PYTHON_VERSION_INFO < (3, 7, 0)
if HAS_ASYNC:
    ASYNC = token.ASYNC  # type:ignore
    AWAIT = token.AWAIT  # type:ignore
    ADDSPACE_TOKS = (NAME, NUMBER, ASYNC, AWAIT)
else:
    ADDSPACE_TOKS = (NAME, NUMBER)  # type:ignore
del token  # must clean up token

if HAS_WALRUS:
    AUGASSIGN_OPS = r"[+\-*/%&@|^=<>:]=?"
else:
    AUGASSIGN_OPS = r"[+\-*/%&@|^=<>]=?"

COMMENT = N_TOKENS
tok_name[COMMENT] = "COMMENT"
NL = N_TOKENS + 1
tok_name[NL] = "NL"
ENCODING = N_TOKENS + 2
tok_name[ENCODING] = "ENCODING"
N_TOKENS += 3
SEARCHPATH = N_TOKENS
tok_name[N_TOKENS] = "SEARCHPATH"
N_TOKENS += 1
IOREDIRECT = N_TOKENS
tok_name[N_TOKENS] = "IOREDIRECT"
N_TOKENS += 1
DOLLARNAME = N_TOKENS
tok_name[N_TOKENS] = "DOLLARNAME"
N_TOKENS += 1
ATDOLLAR = N_TOKENS
tok_name[N_TOKENS] = "ATDOLLAR"
N_TOKENS += 1
ATEQUAL = N_TOKENS
tok_name[N_TOKENS] = "ATEQUAL"
N_TOKENS += 1
_xonsh_tokens = {
    "?": "QUESTION",
    "@=": "ATEQUAL",
    "@$": "ATDOLLAR",
    "||": "DOUBLEPIPE",
    "&&": "DOUBLEAMPER",
    "@(": "ATLPAREN",
    "!(": "BANGLPAREN",
    "![": "BANGLBRACKET",
    "$(": "DOLLARLPAREN",
    "$[": "DOLLARLBRACKET",
    "${": "DOLLARLBRACE",
    "??": "DOUBLEQUESTION",
    "@$(": "ATDOLLARLPAREN",
}

additional_parenlevs = frozenset({"@(", "!(", "![", "$(", "$[", "${", "@$("})

_glbs = globals()
for v in _xonsh_tokens.values():
    _glbs[v] = N_TOKENS
    tok_name[N_TOKENS] = v
    N_TOKENS += 1
    __all__.append(v)
del _glbs, v
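
# Illustrative note (not from the original source): the loop above injects
# one module-level integer constant per xonsh token name, so afterwards:
#
#     >>> tok_name[QUESTION]
#     'QUESTION'
#     >>> "QUESTION" in __all__
#     True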

EXACT_TOKEN_TYPES: tp.Dict[str, tp.Union[str, int]] = {
    "(": LPAR,
    ")": RPAR,
    "[": LSQB,
    "]": RSQB,
    ":": COLON,
    ",": COMMA,
    ";": SEMI,
    "+": PLUS,
    "-": MINUS,
    "*": STAR,
    "/": SLASH,
    "|": VBAR,
    "&": AMPER,
    "<": LESS,
    ">": GREATER,
    "=": EQUAL,
    ".": DOT,
    "%": PERCENT,
    "{": LBRACE,
    "}": RBRACE,
    "==": EQEQUAL,
    "!=": NOTEQUAL,
    "<=": LESSEQUAL,
    ">=": GREATEREQUAL,
    "~": TILDE,
    "^": CIRCUMFLEX,
    "<<": LEFTSHIFT,
    ">>": RIGHTSHIFT,
    "**": DOUBLESTAR,
    "+=": PLUSEQUAL,
    "-=": MINEQUAL,
    "*=": STAREQUAL,
    "/=": SLASHEQUAL,
    "%=": PERCENTEQUAL,
    "&=": AMPEREQUAL,
    "|=": VBAREQUAL,
    "^=": CIRCUMFLEXEQUAL,
    "<<=": LEFTSHIFTEQUAL,
    ">>=": RIGHTSHIFTEQUAL,
    "**=": DOUBLESTAREQUAL,
    "//": DOUBLESLASH,
    "//=": DOUBLESLASHEQUAL,
    "@": AT,
}
if HAS_WALRUS:
    EXACT_TOKEN_TYPES[":="] = COLONEQUAL

EXACT_TOKEN_TYPES.update(_xonsh_tokens)
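
# Note the asymmetry (illustrative, not part of the module): the standard
# operators above map to token ints, while the xonsh additions merged in here
# map to token *names*, hence the tp.Union[str, int] value annotation.
#
#     >>> EXACT_TOKEN_TYPES["=="] == EQEQUAL
#     True
#     >>> EXACT_TOKEN_TYPES["??"]
#     'DOUBLEQUESTION'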


class TokenInfo(collections.namedtuple("TokenInfo", "type string start end line")):
    def __repr__(self):
        annotated_type = "%d (%s)" % (self.type, tok_name[self.type])
        return (
            "TokenInfo(type=%s, string=%r, start=%r, end=%r, line=%r)"
            % self._replace(type=annotated_type)
        )

    @property
    def exact_type(self):
        if self.type == OP and self.string in EXACT_TOKEN_TYPES:
            return EXACT_TOKEN_TYPES[self.string]
        else:
            return self.type
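
# Usage sketch (illustrative): exact_type refines generic OP tokens through
# the EXACT_TOKEN_TYPES table, while non-OP tokens pass through unchanged.
#
#     >>> TokenInfo(OP, "+", (1, 0), (1, 1), "+").exact_type == PLUS
#     True
#     >>> TokenInfo(NAME, "ls", (1, 0), (1, 2), "ls").exact_type == NAME
#     True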


def group(*choices):
    return "(" + "|".join(choices) + ")"


def tokany(*choices):
    return group(*choices) + "*"


def maybe(*choices):
    return group(*choices) + "?"


# Note: we use unicode matching for names ("\w") but ascii matching for
# number literals.
Whitespace = r"[ \f\t]*"
Comment = r"#[^\r\n]*"
Ignore = Whitespace + tokany(r"\\\r?\n" + Whitespace) + maybe(Comment)
Name_RE = r"\$?\w+"

Hexnumber = r"0[xX](?:_?[0-9a-fA-F])+"
Binnumber = r"0[bB](?:_?[01])+"
Octnumber = r"0[oO](?:_?[0-7])+"
Decnumber = r"(?:0(?:_?0)*|[1-9](?:_?[0-9])*)"
Intnumber = group(Hexnumber, Binnumber, Octnumber, Decnumber)
Exponent = r"[eE][-+]?[0-9](?:_?[0-9])*"
Pointfloat = group(
    r"[0-9](?:_?[0-9])*\.(?:[0-9](?:_?[0-9])*)?", r"\.[0-9](?:_?[0-9])*"
) + maybe(Exponent)
Expfloat = r"[0-9](?:_?[0-9])*" + Exponent
Floatnumber = group(Pointfloat, Expfloat)
Imagnumber = group(r"[0-9](?:_?[0-9])*[jJ]", Floatnumber + r"[jJ]")
Number = group(Imagnumber, Floatnumber, Intnumber)

StringPrefix = r"(?:[bB][rR]?|[p][fFrR]?|[rR][bBpfF]?|[uU]|[fF][rR]?[p]?)?"

# Tail end of ' string.
Single = r"[^'\\]*(?:\\.[^'\\]*)*'"
# Tail end of " string.
Double = r'[^"\\]*(?:\\.[^"\\]*)*"'
# Tail end of ''' string.
Single3 = r"[^'\\]*(?:(?:\\.|'(?!''))[^'\\]*)*'''"
# Tail end of """ string.
Double3 = r'[^"\\]*(?:(?:\\.|"(?!""))[^"\\]*)*"""'
Triple = group(StringPrefix + "'''", StringPrefix + '"""')
# Single-line ' or " string.
String = group(
    StringPrefix + r"'[^\n'\\]*(?:\\.[^\n'\\]*)*'",
    StringPrefix + r'"[^\n"\\]*(?:\\.[^\n"\\]*)*"',
)

# Xonsh-specific Syntax
SearchPath = r"((?:[rgp]+|@\w*)?)`([^\n`\\]*(?:\\.[^\n`\\]*)*)`"

# Because of leftmost-then-longest match semantics, be sure to put the
# longest operators first (e.g., if = came before ==, == would get
# recognized as two instances of =).
_redir_names = ("out", "all", "err", "e", "2", "a", "&", "1", "o")
_redir_map = (
    # stderr to stdout
    "err>out", "err>&1", "2>out", "err>o", "err>1", "e>out", "e>&1",
    "2>&1", "e>o", "2>o", "e>1", "2>1",
    # stdout to stderr
    "out>err", "out>&2", "1>err", "out>e", "out>2", "o>err", "o>&2",
    "1>&2", "o>e", "1>e", "o>2", "1>2",
)
IORedirect = group(group(*_redir_map), "{}>>?".format(group(*_redir_names)))

_redir_check_0 = set(_redir_map)
_redir_check_1 = {"{}>".format(i) for i in _redir_names}.union(_redir_check_0)
_redir_check_2 = {"{}>>".format(i) for i in _redir_names}.union(_redir_check_1)
_redir_check = frozenset(_redir_check_2)

Operator = group(
    r"\*\*=?", r">>=?", r"<<=?", r"!=", r"//=?", r"->",
    r"@\$\(?", r"\|\|", "&&", r"@\(", r"!\(", r"!\[", r"\$\(",
    r"\$\[", r"\${", r"\?\?", r"\?", AUGASSIGN_OPS, r"~",
)

Bracket = "[][(){}]"
Special = group(r"\r?\n", r"\.\.\.", r"[:;.,@]")
Funny = group(Operator, Bracket, Special)

PlainToken = group(IORedirect, Number, Funny, String, Name_RE, SearchPath)
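
# A sketch of how these building blocks compose (illustrative only):
#
#     >>> bool(re.fullmatch(Number, "1_000"))
#     True
#     >>> bool(re.fullmatch(Number, "3.14j"))
#     True
#     >>> bool(re.fullmatch(SearchPath, "g`*.py`"))
#     True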

# First (or only) line of ' or " string.
ContStr = group(
    StringPrefix + r"'[^\n'\\]*(?:\\.[^\n'\\]*)*" + group("'", r"\\\r?\n"),
    StringPrefix + r'"[^\n"\\]*(?:\\.[^\n"\\]*)*' + group('"', r"\\\r?\n"),
)
PseudoExtras = group(r"\\\r?\n|\Z", Comment, Triple, SearchPath)
PseudoToken = Whitespace + group(
    PseudoExtras, IORedirect, Number, Funny, ContStr, Name_RE
)


def _compile(expr):
    return re.compile(expr, re.UNICODE)


endpats = {
    "'": Single, '"': Double, "'''": Single3, '"""': Double3,
    "r'''": Single3, 'r"""': Double3, "b'''": Single3, 'b"""': Double3,
    "f'''": Single3, 'f"""': Double3, "R'''": Single3, 'R"""': Double3,
    "B'''": Single3, 'B"""': Double3, "F'''": Single3, 'F"""': Double3,
    "br'''": Single3, 'br"""': Double3, "fr'''": Single3, 'fr"""': Double3,
    "fp'''": Single3, 'fp"""': Double3, "bR'''": Single3, 'bR"""': Double3,
    "Br'''": Single3, 'Br"""': Double3, "BR'''": Single3, 'BR"""': Double3,
    "rb'''": Single3, 'rb"""': Double3, "rf'''": Single3, 'rf"""': Double3,
    "Rb'''": Single3, 'Rb"""': Double3, "Fr'''": Single3, 'Fr"""': Double3,
    "Fp'''": Single3, 'Fp"""': Double3, "rB'''": Single3, 'rB"""': Double3,
    "rF'''": Single3, 'rF"""': Double3, "RB'''": Single3, 'RB"""': Double3,
    "RF'''": Single3, 'RF"""': Double3, "u'''": Single3, 'u"""': Double3,
    "U'''": Single3, 'U"""': Double3, "p'''": Single3, 'p"""': Double3,
    "pr'''": Single3, 'pr"""': Double3, "pf'''": Single3, 'pf"""': Double3,
    "pF'''": Single3, 'pF"""': Double3, "pR'''": Single3, 'pR"""': Double3,
    "rp'''": Single3, 'rp"""': Double3, "Rp'''": Single3, 'Rp"""': Double3,
    "r": None, "R": None, "b": None, "B": None, "u": None, "U": None,
    "p": None, "f": None, "F": None,
}

triple_quoted = {}
for t in (
    "'''", '"""',
    "r'''", 'r"""', "R'''", 'R"""',
    "b'''", 'b"""', "B'''", 'B"""',
    "f'''", 'f"""', "F'''", 'F"""',
    "br'''", 'br"""', "Br'''", 'Br"""', "bR'''", 'bR"""', "BR'''", 'BR"""',
    "rb'''", 'rb"""', "rB'''", 'rB"""', "Rb'''", 'Rb"""', "RB'''", 'RB"""',
    "fr'''", 'fr"""', "Fr'''", 'Fr"""', "fR'''", 'fR"""', "FR'''", 'FR"""',
    "rf'''", 'rf"""', "rF'''", 'rF"""', "Rf'''", 'Rf"""', "RF'''", 'RF"""',
    "u'''", 'u"""', "U'''", 'U"""',
    "p'''", 'p"""', "pr'''", 'pr"""', "pR'''", 'pR"""',
    "rp'''", 'rp"""', "Rp'''", 'Rp"""',
    "pf'''", 'pf"""', "pF'''", 'pF"""', "fp'''", 'fp"""', "Fp'''", 'Fp"""',
):
    triple_quoted[t] = t

single_quoted = {}
for t in (
    "'", '"',
    "r'", 'r"', "R'", 'R"',
    "b'", 'b"', "B'", 'B"',
    "f'", 'f"', "F'", 'F"',
    "br'", 'br"', "Br'", 'Br"', "bR'", 'bR"', "BR'", 'BR"',
    "rb'", 'rb"', "rB'", 'rB"', "Rb'", 'Rb"', "RB'", 'RB"',
    "fr'", 'fr"', "Fr'", 'Fr"', "fR'", 'fR"', "FR'", 'FR"',
    "rf'", 'rf"', "rF'", 'rF"', "Rf'", 'Rf"', "RF'", 'RF"',
    "u'", 'u"', "U'", 'U"',
    "p'", 'p"', "pr'", 'pr"', "pR'", 'pR"', "rp'", 'rp"', "Rp'", 'Rp"',
    "pf'", 'pf"', "pF'", 'pF"', "fp'", 'fp"', "Fp'", 'Fp"',
):
    single_quoted[t] = t

tabsize = 8
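
# Illustrative lookups into the tables above (not part of the module):
#
#     >>> endpats["'''"] is Single3
#     True
#     >>> "pf'''" in triple_quoted and "rb'" in single_quoted
#     True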


class TokenError(Exception):
    pass


class StopTokenizing(Exception):
    pass


class Untokenizer:
    def __init__(self):
        self.tokens = []
        self.prev_row = 1
        self.prev_col = 0
        self.encoding = None

    def add_whitespace(self, start):
        row, col = start
        if row < self.prev_row or row == self.prev_row and col < self.prev_col:
            raise ValueError(
                "start ({},{}) precedes previous end ({},{})".format(
                    row, col, self.prev_row, self.prev_col
                )
            )
        row_offset = row - self.prev_row
        if row_offset:
            self.tokens.append("\\\n" * row_offset)
            self.prev_col = 0
        col_offset = col - self.prev_col
        if col_offset:
            self.tokens.append(" " * col_offset)

    def untokenize(self, iterable):
        it = iter(iterable)
        indents = []
        startline = False
        for t in it:
            if len(t) == 2:
                self.compat(t, it)
                break
            tok_type, token, start, end, line = t
            if tok_type == ENCODING:
                self.encoding = token
                continue
            if tok_type == ENDMARKER:
                break
            if tok_type == INDENT:
                indents.append(token)
                continue
            elif tok_type == DEDENT:
                indents.pop()
                self.prev_row, self.prev_col = end
                continue
            elif tok_type in (NEWLINE, NL):
                startline = True
            elif startline and indents:
                indent = indents[-1]
                if start[1] >= len(indent):
                    self.tokens.append(indent)
                    self.prev_col = len(indent)
                startline = False
            self.add_whitespace(start)
            self.tokens.append(token)
            self.prev_row, self.prev_col = end
            if tok_type in (NEWLINE, NL):
                self.prev_row += 1
                self.prev_col = 0
        return "".join(self.tokens)

    def compat(self, token, iterable):
        indents = []
        toks_append = self.tokens.append
        startline = token[0] in (NEWLINE, NL)
        prevstring = False
        for tok in itertools.chain([token], iterable):
            toknum, tokval = tok[:2]
            if toknum == ENCODING:
                self.encoding = tokval
                continue
            if toknum in ADDSPACE_TOKS:
                tokval += " "
            # Insert a space between two consecutive strings
            if toknum == STRING:
                if prevstring:
                    tokval = " " + tokval
                prevstring = True
            else:
                prevstring = False
            if toknum == INDENT:
                indents.append(tokval)
                continue
            elif toknum == DEDENT:
                indents.pop()
                continue
            elif toknum in (NEWLINE, NL):
                startline = True
            elif startline and indents:
                toks_append(indents[-1])
                startline = False
            toks_append(tokval)


def untokenize(iterable):
    """Transform tokens back into Python source code. It returns a bytes
    object, encoded using the ENCODING token, which is the first token
    sequence output by tokenize.

    Each element returned by the iterable must be a token sequence with at
    least two elements, a token number and token value. If only two tokens
    are passed, the resulting output is poor.

    Round-trip invariant for full input:
        Untokenized source will match input source exactly

    Round-trip invariant for limited input:
        # Output bytes will tokenize back to the input
        t1 = [tok[:2] for tok in tokenize(f.readline)]
        newcode = untokenize(t1)
        readline = BytesIO(newcode).readline
        t2 = [tok[:2] for tok in tokenize(readline)]
        assert t1 == t2
    """
    ut = Untokenizer()
    out = ut.untokenize(iterable)
    if ut.encoding is not None:
        out = out.encode(ut.encoding)
    return out
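
# Round-trip sketch over an in-memory stream (illustrative; uses only the
# stdlib io module):
#
#     >>> import io
#     >>> src = b"x = 1\n"
#     >>> untokenize(tokenize(io.BytesIO(src).readline)) == src
#     True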


def _get_normal_name(orig_enc):
    """Imitates get_normal_name in tokenizer.c."""
    # Only care about the first 12 characters.
    enc = orig_enc[:12].lower().replace("_", "-")
    if enc == "utf-8" or enc.startswith("utf-8-"):
        return "utf-8"
    if enc in ("latin-1", "iso-8859-1", "iso-latin-1") or enc.startswith(
        ("latin-1-", "iso-8859-1-", "iso-latin-1-")
    ):
        return "iso-8859-1"
    return orig_enc
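
# Illustrative behavior (not part of the module):
#
#     >>> _get_normal_name("UTF_8")
#     'utf-8'
#     >>> _get_normal_name("Latin-1")
#     'iso-8859-1'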


def detect_encoding(readline):
    """
    The detect_encoding() function is used to detect the encoding that should
    be used to decode a Python source file. It requires one argument,
    readline, in the same way as the tokenize() generator.

    It will call readline a maximum of twice, and return the encoding used
    (as a string) and a list of any lines (left as bytes) it has read in.

    It detects the encoding from the presence of a utf-8 bom or an encoding
    cookie as specified in pep-0263. If both a bom and a cookie are present,
    but disagree, a SyntaxError will be raised. If the encoding cookie is an
    invalid charset, raise a SyntaxError. Note that if a utf-8 bom is found,
    'utf-8-sig' is returned.

    If no encoding is specified, then the default of 'utf-8' will be returned.
    """
    try:
        filename = readline.__self__.name
    except AttributeError:
        filename = None
    bom_found = False
    encoding = None
    default = "utf-8"

    def read_or_stop():
        try:
            return readline()
        except StopIteration:
            return b""

    def find_cookie(line):
        try:
            # Decode as UTF-8. Either the line is an encoding declaration,
            # in which case it should be pure ASCII, or it must be UTF-8
            # per default encoding.
            line_string = line.decode("utf-8")
        except UnicodeDecodeError:
            msg = "invalid or missing encoding declaration"
            if filename is not None:
                msg = "{} for {!r}".format(msg, filename)
            raise SyntaxError(msg)

        match = cookie_re.match(line_string)
        if not match:
            return None
        encoding = _get_normal_name(match.group(1))
        try:
            codecs.lookup(encoding)
        except LookupError:
            # This behaviour mimics the Python interpreter
            if filename is None:
                msg = "unknown encoding: " + encoding
            else:
                msg = "unknown encoding for {!r}: {}".format(filename, encoding)
            raise SyntaxError(msg)

        if bom_found:
            if encoding != "utf-8":
                # This behaviour mimics the Python interpreter
                if filename is None:
                    msg = "encoding problem: utf-8"
                else:
                    msg = "encoding problem for {!r}: utf-8".format(filename)
                raise SyntaxError(msg)
            encoding += "-sig"
        return encoding

    first = read_or_stop()
    if first.startswith(codecs.BOM_UTF8):
        bom_found = True
        first = first[3:]
        default = "utf-8-sig"
    if not first:
        return default, []

    encoding = find_cookie(first)
    if encoding:
        return encoding, [first]
    if not blank_re.match(first):
        return default, [first]

    second = read_or_stop()
    if not second:
        return default, [first]

    encoding = find_cookie(second)
    if encoding:
        return encoding, [first, second]

    return default, [first, second]
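
# Detection sketch over an in-memory stream (illustrative):
#
#     >>> import io
#     >>> buf = io.BytesIO(b"# -*- coding: latin-1 -*-\nx = 1\n")
#     >>> detect_encoding(buf.readline)
#     ('iso-8859-1', [b'# -*- coding: latin-1 -*-\n'])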


def tokopen(filename):
    """Open a file in read only mode using the encoding detected by
    detect_encoding().
    """
    buffer = builtins.open(filename, "rb")
    try:
        encoding, lines = detect_encoding(buffer.readline)
        buffer.seek(0)
        text = io.TextIOWrapper(buffer, encoding, line_buffering=True)
        text.mode = "r"
        return text
    except Exception:
        buffer.close()
        raise


def _tokenize(readline, encoding, tolerant=False):
    lnum = parenlev = continued = 0
    numchars = "0123456789"
    contstr, needcont = "", 0
    contline = None
    indents = [0]

    # 'stashed' and 'async_*' are used for async/await parsing
    stashed = None
    async_def = False
    async_def_indent = 0
    async_def_nl = False

    if encoding is not None:
        if encoding == "utf-8-sig":
            # BOM will already have been stripped.
            encoding = "utf-8"
        yield TokenInfo(ENCODING, encoding, (0, 0), (0, 0), "")
    while True:  # loop over lines in stream
        try:
            line = readline()
        except StopIteration:
            line = b""

        if encoding is not None:
            line = line.decode(encoding)
        lnum += 1
        pos, max = 0, len(line)

        if contstr:  # continued string
            if not line:
                if tolerant:
                    # return the partial string
                    yield TokenInfo(
                        ERRORTOKEN, contstr, strstart, (lnum, end), contline + line
                    )
                    break
                else:
                    raise TokenError("EOF in multi-line string", strstart)
            endmatch = endprog.match(line)
            if endmatch:
                pos = end = endmatch.end(0)
                yield TokenInfo(
                    STRING, contstr + line[:end], strstart, (lnum, end), contline + line
                )
                contstr, needcont = "", 0
                contline = None
            elif needcont and line[-2:] != "\\\n" and line[-3:] != "\\\r\n":
                yield TokenInfo(
                    ERRORTOKEN, contstr + line, strstart, (lnum, len(line)), contline
                )
                contstr = ""
                contline = None
                continue
            else:
                contstr = contstr + line
                contline = contline + line
                continue

        elif parenlev == 0 and not continued:  # new statement
            if not line:
                break
            column = 0
            while pos < max:  # measure leading whitespace
                if line[pos] == " ":
                    column += 1
                elif line[pos] == "\t":
                    column = (column // tabsize + 1) * tabsize
                elif line[pos] == "\f":
                    column = 0
                else:
                    break
                pos += 1
            if pos == max:
                break

            if line[pos] in "#\r\n":  # skip comments or blank lines
                if line[pos] == "#":
                    comment_token = line[pos:].rstrip("\r\n")
                    nl_pos = pos + len(comment_token)
                    yield TokenInfo(
                        COMMENT,
                        comment_token,
                        (lnum, pos),
                        (lnum, pos + len(comment_token)),
                        line,
                    )
                    yield TokenInfo(
                        NL, line[nl_pos:], (lnum, nl_pos), (lnum, len(line)), line
                    )
                else:
                    yield TokenInfo(
                        (NL, COMMENT)[line[pos] == "#"],
                        line[pos:],
                        (lnum, pos),
                        (lnum, len(line)),
                        line,
                    )
                continue

            if column > indents[-1]:  # count indents or dedents
                indents.append(column)
                yield TokenInfo(INDENT, line[:pos], (lnum, 0), (lnum, pos), line)
            while column < indents[-1]:
                if (
                    column not in indents and not tolerant
                ):  # if tolerant, just ignore the error
                    raise IndentationError(
                        "unindent does not match any outer indentation level",
                        ("<tokenize>", lnum, pos, line),
                    )
                indents = indents[:-1]

                if async_def and async_def_indent >= indents[-1]:
                    async_def = False
                    async_def_nl = False
                    async_def_indent = 0

                yield TokenInfo(DEDENT, "", (lnum, pos), (lnum, pos), line)

            if async_def and async_def_nl and async_def_indent >= indents[-1]:
                async_def = False
                async_def_nl = False
                async_def_indent = 0

        else:  # continued statement
            if not line:
                if tolerant:
                    # no need to raise an error, we're done
                    break
                raise TokenError("EOF in multi-line statement", (lnum, 0))
            continued = 0

        while pos < max:
            pseudomatch = _compile(PseudoToken).match(line, pos)
            if pseudomatch:  # scan for tokens
                start, end = pseudomatch.span(1)
                spos, epos, pos = (lnum, start), (lnum, end), end
                if start == end:
                    continue
                token, initial = line[start:end], line[start]

                if token in _redir_check:
                    yield TokenInfo(IOREDIRECT, token, spos, epos, line)
                elif initial in numchars or (  # ordinary number
                    initial == "." and token != "." and token != "..."
                ):
                    yield TokenInfo(NUMBER, token, spos, epos, line)
                elif initial in "\r\n":
                    if stashed:
                        yield stashed
                        stashed = None
                    if parenlev > 0:
                        yield TokenInfo(NL, token, spos, epos, line)
                    else:
                        yield TokenInfo(NEWLINE, token, spos, epos, line)
                        if async_def:
                            async_def_nl = True
                elif initial == "#":
                    assert not token.endswith("\n")
                    if stashed:
                        yield stashed
                        stashed = None
                    yield TokenInfo(COMMENT, token, spos, epos, line)
                # Xonsh-specific Regex Globbing
                elif re.match(SearchPath, token):
                    yield TokenInfo(SEARCHPATH, token, spos, epos, line)
                elif token in triple_quoted:
                    endprog = _compile(endpats[token])
                    endmatch = endprog.match(line, pos)
                    if endmatch:  # all on one line
                        pos = endmatch.end(0)
                        token = line[start:pos]
                        yield TokenInfo(STRING, token, spos, (lnum, pos), line)
                    else:
                        strstart = (lnum, start)  # multiple lines
                        contstr = line[start:]
                        contline = line
                        break
                elif (
                    initial in single_quoted
                    or token[:2] in single_quoted
                    or token[:3] in single_quoted
                ):
                    if token[-1] == "\n":  # continued string
                        strstart = (lnum, start)
                        endprog = _compile(
                            endpats[initial] or endpats[token[1]] or endpats[token[2]]
                        )
                        contstr, needcont = line[start:], 1
                        contline = line
                        break
                    else:  # ordinary string
                        yield TokenInfo(STRING, token, spos, epos, line)
                elif token.startswith("$") and token[1:].isidentifier():
                    yield TokenInfo(DOLLARNAME, token, spos, epos, line)
                elif initial.isidentifier():  # ordinary name
                    if token in ("async", "await"):
                        if async_def:
                            yield TokenInfo(
                                ASYNC if token == "async" else AWAIT,
                                token,
                                spos,
                                epos,
                                line,
                            )
                            continue

                    tok = TokenInfo(NAME, token, spos, epos, line)
                    if token == "async" and not stashed:
                        stashed = tok
                        continue

                    if (
                        HAS_ASYNC
                        and token == "def"
                        and (
                            stashed
                            and stashed.type == NAME
                            and stashed.string == "async"
                        )
                    ):
                        async_def = True
                        async_def_indent = indents[-1]

                        yield TokenInfo(
                            ASYNC,
                            stashed.string,
                            stashed.start,
                            stashed.end,
                            stashed.line,
                        )
                        stashed = None

                    if stashed:
                        yield stashed
                        stashed = None

                    yield tok
                elif token == "\\\n" or token == "\\\r\n":  # continued stmt
                    continued = 1
                    yield TokenInfo(ERRORTOKEN, token, spos, epos, line)
                elif initial == "\\":  # continued stmt
                    # for cases like C:\\path\\to\\file
                    continued = 1
                else:
                    if initial in "([{":
                        parenlev += 1
                    elif initial in ")]}":
                        parenlev -= 1
                    elif token in additional_parenlevs:
                        parenlev += 1
                    if stashed:
                        yield stashed
                        stashed = None
                    yield TokenInfo(OP, token, spos, epos, line)
            else:
                yield TokenInfo(
                    ERRORTOKEN, line[pos], (lnum, pos), (lnum, pos + 1), line
                )
                pos += 1

    if stashed:
        yield stashed
        stashed = None

    for _ in indents[1:]:  # pop remaining indent levels
        yield TokenInfo(DEDENT, "", (lnum, 0), (lnum, 0), "")
    yield TokenInfo(ENDMARKER, "", (lnum, 0), (lnum, 0), "")


def tokenize(readline, tolerant=False):
    """
    The tokenize() generator requires one argument, readline, which
    must be a callable object which provides the same interface as the
    readline() method of built-in file objects. Each call to the function
    should return one line of input as bytes. Alternatively, readline
    can be a callable function terminating with StopIteration:

        readline = open(myfile, 'rb').__next__  # Example of alternate readline

    The generator produces 5-tuples with these members: the token type; the
    token string; a 2-tuple (srow, scol) of ints specifying the row and
    column where the token begins in the source; a 2-tuple (erow, ecol) of
    ints specifying the row and column where the token ends in the source;
    and the line on which the token was found. The line passed is the
    logical line; continuation lines are included.

    The first token sequence will always be an ENCODING token
    which tells you which encoding was used to decode the bytes stream.

    If ``tolerant`` is True, yield ERRORTOKEN with the erroneous string
    instead of throwing an exception when encountering an error.
    """
    encoding, consumed = detect_encoding(readline)
    rl_gen = iter(readline, b"")
    empty = itertools.repeat(b"")
    return _tokenize(
        itertools.chain(consumed, rl_gen, empty).__next__, encoding, tolerant
    )
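
# Tokenizing a xonsh-specific construct (illustrative; the trailing ENDMARKER
# token is omitted from the slice):
#
#     >>> import io
#     >>> toks = list(tokenize(io.BytesIO(b"echo $HOME\n").readline))
#     >>> [tok_name[t.type] for t in toks][:4]
#     ['ENCODING', 'NAME', 'DOLLARNAME', 'NEWLINE']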
# An undocumented, backwards compatible, API for all the places in the standard # library that expect to be able to use tokenize with strings def generate_tokens(readline): return _tokenize(readline, None) def tokenize_main(): import argparse # Helper error handling routines def perror(message): print(message, file=sys.stderr) def error(message, filename=None, location=None): if location: args = (filename,) + location + (message,) perror("%s:%d:%d: error: %s" % args) elif filename: perror("%s: error: %s" % (filename, message)) else: perror("error: %s" % message) sys.exit(1) # Parse the arguments and options parser = argparse.ArgumentParser(prog="python -m tokenize") parser.add_argument( dest="filename", nargs="?", metavar="filename.py", help="the file to tokenize; defaults to stdin", ) parser.add_argument( "-e", "--exact", dest="exact", action="store_true", help="display token names using the exact type", ) args = parser.parse_args() try: # Tokenize the input if args.filename: filename = args.filename with builtins.open(filename, "rb") as f: tokens = list(tokenize(f.readline)) else: filename = "<stdin>" tokens = _tokenize(sys.stdin.readline, None) # Output the tokenization for token in tokens: token_type = token.type if args.exact: token_type = token.exact_type token_range = "%d,%d-%d,%d:" % (token.start + token.end) print("%-20s%-15s%-15r" % (token_range, tok_name[token_type], token.string)) except IndentationError as err: line, column = err.args[1][1:3] error(err.args[0], filename, (line, column)) except TokenError as err: line, column = err.args[1] error(err.args[0], filename, (line, column)) except SyntaxError as err: error(err, filename) except OSError as err: error(err) except KeyboardInterrupt: print("interrupted\n") except Exception as err: perror("unexpected error: %s" % err) raise # # tools # # -*- coding: utf-8 -*- """Misc. xonsh tools. The following implementations were forked from the IPython project: * Copyright (c) 2008-2014, IPython Development Team * Copyright (C) 2001-2007 Fernando Perez <fperez@colorado.edu> * Copyright (c) 2001, Janko Hauser <jhauser@zscout.de> * Copyright (c) 2001, Nathaniel Gray <n8gray@caltech.edu> Implementations: * decode() * encode() * cast_unicode() * safe_hasattr() * indent() """ # amalgamated collections # amalgamated collections.abc # amalgamated contextlib # amalgamated ctypes # amalgamated datetime # amalgamated functools glob = _LazyModule.load('glob', 'glob') # amalgamated itertools # amalgamated os # amalgamated pathlib # amalgamated re # amalgamated subprocess # amalgamated sys # amalgamated threading traceback = _LazyModule.load('traceback', 'traceback') warnings = _LazyModule.load('warnings', 'warnings') operator = _LazyModule.load('operator', 'operator') # amalgamated ast string = _LazyModule.load('string', 'string') # amalgamated typing shlex = _LazyModule.load('shlex', 'shlex') from xonsh import __version__ # amalgamated xonsh.lazyasd # amalgamated xonsh.platform @functools.lru_cache(1) def is_superuser(): if ON_WINDOWS: rtn = ctypes.windll.shell32.IsUserAnAdmin() != 0 else: rtn = os.getuid() == 0 return rtn @lazyobject def xsh(): from xonsh.built_ins import XSH return XSH class XonshError(Exception): pass class XonshCalledProcessError(XonshError, subprocess.CalledProcessError): """Raised when there's an error with a called process Inherits from XonshError and subprocess.CalledProcessError, catching either will also catch this error. 
    Raised *after* iterating over stdout of a captured command, if the
    returncode of the command is nonzero.

    Example:
    -------
    try:
        for line in !(ls):
            print(line)
    except subprocess.CalledProcessError as error:
        print("Error in process: {}".format(error.completed_command.pid))

    This also handles differences between Python3.4 and 3.5 where
    CalledProcessError is concerned.
    """

    def __init__(
        self, returncode, command, output=None, stderr=None, completed_command=None
    ):
        super().__init__(returncode, command, output)
        self.stderr = stderr
        self.completed_command = completed_command


def expand_path(s, expand_user=True):
    """Takes a string path and expands ~ to home if expand_user is set
    and environment vars if EXPAND_ENV_VARS is set."""
    env = xsh.env or os_environ
    if env.get("EXPAND_ENV_VARS", False):
        s = expandvars(s)
    if expand_user:
        # expand ~ according to Bash unquoted rules "Each variable assignment is
        # checked for unquoted tilde-prefixes immediately following a ':' or the
        # first '='". See the following for more details.
        # https://www.gnu.org/software/bash/manual/html_node/Tilde-Expansion.html
        pre, char, post = s.partition("=")
        if char:
            s = expanduser(pre) + char
            s += os.pathsep.join(map(expanduser, post.split(os.pathsep)))
        else:
            s = expanduser(s)
    return s


def _expandpath(path):
    """Performs environment variable / user expansion on a given path
    if EXPAND_ENV_VARS is set.
    """
    env = xsh.env or os_environ
    expand_user = env.get("EXPAND_ENV_VARS", False)
    return expand_path(path, expand_user=expand_user)


def simple_random_choice(lst):
    """Returns random element from the list with length less than 1 million elements."""
    size = len(lst)
    if size > 1000000:  # microsecond maximum
        raise ValueError("The list is too long.")
    return lst[datetime.datetime.now().microsecond % size]


def decode_bytes(b):
    """Tries to decode the bytes using XONSH_ENCODING if available,
    otherwise using sys.getdefaultencoding().
    """
    env = xsh.env or os_environ
    enc = env.get("XONSH_ENCODING") or DEFAULT_ENCODING
    err = env.get("XONSH_ENCODING_ERRORS") or "strict"
    return b.decode(encoding=enc, errors=err)


def findfirst(s, substrs):
    """Finds whichever of the given substrings occurs first in the given
    string and returns that substring, or returns None if no such strings
    occur.
    """
    i = len(s)
    result = None
    for substr in substrs:
        pos = s.find(substr)
        if -1 < pos < i:
            i = pos
            result = substr
    return i, result


class EnvPath(cabc.MutableSequence):
    """A class that implements an environment path, which is a list of
    strings. Provides a custom method that expands all paths if the
    relevant env variable has been set.
""" def __init__(self, args=None): if not args: self._l = [] else: if isinstance(args, str): self._l = args.split(os.pathsep) elif isinstance(args, pathlib.Path): self._l = [args] elif isinstance(args, bytes): # decode bytes to a string and then split based on # the default path separator self._l = decode_bytes(args).split(os.pathsep) elif isinstance(args, cabc.Iterable): # put everything in a list -before- performing the type check # in order to be able to retrieve it later, for cases such as # when a generator expression was passed as an argument args = list(args) if not all(isinstance(i, (str, bytes, pathlib.Path)) for i in args): # make TypeError's message as informative as possible # when given an invalid initialization sequence raise TypeError( "EnvPath's initialization sequence should only " "contain str, bytes and pathlib.Path entries" ) self._l = args else: raise TypeError( "EnvPath cannot be initialized with items " "of type %s" % type(args) ) def __getitem__(self, item): # handle slices separately if isinstance(item, slice): return [_expandpath(i) for i in self._l[item]] else: return _expandpath(self._l[item]) def __setitem__(self, index, item): self._l.__setitem__(index, item) def __len__(self): return len(self._l) def __delitem__(self, key): self._l.__delitem__(key) def insert(self, index, value): self._l.insert(index, value) @property def paths(self): """ Returns the list of directories that this EnvPath contains. """ return list(self) def __repr__(self): return repr(self._l) def __eq__(self, other): if len(self) != len(other): return False return all(map(operator.eq, self, other)) def _repr_pretty_(self, p, cycle): """Pretty print path list""" if cycle: p.text("EnvPath(...)") else: with p.group(1, "EnvPath(\n[", "]\n)"): for idx, item in enumerate(self): if idx: p.text(",") p.breakable() p.pretty(item) def __add__(self, other): if isinstance(other, EnvPath): other = other._l return EnvPath(self._l + other) def __radd__(self, other): if isinstance(other, EnvPath): other = other._l return EnvPath(other + self._l) def add(self, data, front=False, replace=False): """Add a value to this EnvPath, path.add(data, front=bool, replace=bool) -> ensures that path contains data, with position determined by kwargs Parameters ---------- data : string or bytes or pathlib.Path value to be added front : bool whether the value should be added to the front, will be ignored if the data already exists in this EnvPath and replace is False Default : False replace : bool If True, the value will be removed and added to the start or end(depending on the value of front) Default : False Returns ------- None """ if data not in self._l: self._l.insert(0 if front else len(self._l), data) elif replace: self._l.remove(data) self._l.insert(0 if front else len(self._l), data) @lazyobject def FORMATTER(): return string.Formatter() class DefaultNotGivenType(object): """Singleton for representing when no default value is given.""" __inst: tp.Optional["DefaultNotGivenType"] = None def __new__(cls): if DefaultNotGivenType.__inst is None: DefaultNotGivenType.__inst = object.__new__(cls) return DefaultNotGivenType.__inst DefaultNotGiven = DefaultNotGivenType() BEG_TOK_SKIPS = LazyObject( lambda: frozenset(["WS", "INDENT", "NOT", "LPAREN"]), globals(), "BEG_TOK_SKIPS" ) END_TOK_TYPES = LazyObject( lambda: frozenset(["SEMI", "AND", "OR", "RPAREN"]), globals(), "END_TOK_TYPES" ) RE_END_TOKS = LazyObject( lambda: re.compile(r"(;|and|\&\&|or|\|\||\))"), globals(), "RE_END_TOKS" ) LPARENS = LazyObject( lambda: frozenset( 
["LPAREN", "AT_LPAREN", "BANG_LPAREN", "DOLLAR_LPAREN", "ATDOLLAR_LPAREN"] ), globals(), "LPARENS", ) def _is_not_lparen_and_rparen(lparens, rtok): """Tests if an RPAREN token is matched with something other than a plain old LPAREN type. """ # note that any([]) is False, so this covers len(lparens) == 0 return rtok.type == "RPAREN" and any(x != "LPAREN" for x in lparens) def balanced_parens(line, mincol=0, maxcol=None, lexer=None): """Determines if parentheses are balanced in an expression.""" line = line[mincol:maxcol] if lexer is None: lexer = xsh.execer.parser.lexer if "(" not in line and ")" not in line: return True cnt = 0 lexer.input(line) for tok in lexer: if tok.type in LPARENS: cnt += 1 elif tok.type == "RPAREN": cnt -= 1 elif tok.type == "ERRORTOKEN" and ")" in tok.value: cnt -= 1 return cnt == 0 def ends_with_colon_token(line, lexer=None): """Determines whether a line ends with a colon token, ignoring comments.""" if lexer is None: lexer = xsh.execer.parser.lexer lexer.input(line) toks = list(lexer) return len(toks) > 0 and toks[-1].type == "COLON" def find_next_break(line, mincol=0, lexer=None): """Returns the column number of the next logical break in subproc mode. This function may be useful in finding the maxcol argument of subproc_toks(). """ if mincol >= 1: line = line[mincol:] if lexer is None: lexer = xsh.execer.parser.lexer if RE_END_TOKS.search(line) is None: return None maxcol = None lparens = [] lexer.input(line) for tok in lexer: if tok.type in LPARENS: lparens.append(tok.type) elif tok.type in END_TOK_TYPES: if _is_not_lparen_and_rparen(lparens, tok): lparens.pop() else: maxcol = tok.lexpos + mincol + 1 break elif tok.type == "ERRORTOKEN" and ")" in tok.value: maxcol = tok.lexpos + mincol + 1 break elif tok.type == "BANG": maxcol = mincol + len(line) + 1 break return maxcol def _offset_from_prev_lines(line, last): lines = line.splitlines(keepends=True)[:last] return sum(map(len, lines)) def subproc_toks( line, mincol=-1, maxcol=None, lexer=None, returnline=False, greedy=False ): """Encapsulates tokens in a source code line in a uncaptured subprocess ![] starting at a minimum column. If there are no tokens (ie in a comment line) this returns None. If greedy is True, it will encapsulate normal parentheses. Greedy is False by default. 
""" if lexer is None: lexer = xsh.execer.parser.lexer if maxcol is None: maxcol = len(line) + 1 lexer.reset() lexer.input(line) toks = [] lparens = [] saw_macro = False end_offset = 0 for tok in lexer: pos = tok.lexpos if tok.type not in END_TOK_TYPES and pos >= maxcol: break if tok.type == "BANG": saw_macro = True if saw_macro and tok.type not in ("NEWLINE", "DEDENT"): toks.append(tok) continue if tok.type in LPARENS: lparens.append(tok.type) if greedy and len(lparens) > 0 and "LPAREN" in lparens: toks.append(tok) if tok.type == "RPAREN": lparens.pop() continue if len(toks) == 0 and tok.type in BEG_TOK_SKIPS: continue # handle indentation elif len(toks) > 0 and toks[-1].type in END_TOK_TYPES: if _is_not_lparen_and_rparen(lparens, toks[-1]): lparens.pop() # don't continue or break elif pos < maxcol and tok.type not in ("NEWLINE", "DEDENT", "WS"): if not greedy: toks.clear() if tok.type in BEG_TOK_SKIPS: continue else: break if pos < mincol: continue toks.append(tok) if tok.type == "WS" and tok.value == "\\": pass # line continuation elif tok.type == "NEWLINE": break elif tok.type == "DEDENT": # fake a newline when dedenting without a newline tok.type = "NEWLINE" tok.value = "\n" tok.lineno -= 1 if len(toks) >= 2: prev_tok_end = toks[-2].lexpos + len(toks[-2].value) else: prev_tok_end = len(line) if "#" in line[prev_tok_end:]: tok.lexpos = prev_tok_end # prevents wrapping comments else: tok.lexpos = len(line) break elif check_bad_str_token(tok): return else: if len(toks) > 0 and toks[-1].type in END_TOK_TYPES: if _is_not_lparen_and_rparen(lparens, toks[-1]): pass elif greedy and toks[-1].type == "RPAREN": pass else: toks.pop() if len(toks) == 0: return # handle comment lines tok = toks[-1] pos = tok.lexpos if isinstance(tok.value, str): end_offset = len(tok.value.rstrip()) else: el = line[pos:].split("#")[0].rstrip() end_offset = len(el) if len(toks) == 0: return # handle comment lines elif saw_macro or greedy: end_offset = len(toks[-1].value.rstrip()) + 1 if toks[0].lineno != toks[-1].lineno: # handle multiline cases end_offset += _offset_from_prev_lines(line, toks[-1].lineno) beg, end = toks[0].lexpos, (toks[-1].lexpos + end_offset) end = len(line[:end].rstrip()) rtn = "![" + line[beg:end] + "]" if returnline: rtn = line[:beg] + rtn + line[end:] return rtn def check_bad_str_token(tok): """Checks if a token is a bad string.""" if tok.type == "ERRORTOKEN" and tok.value == "EOF in multi-line string": return True elif isinstance(tok.value, str) and not check_quotes(tok.value): return True else: return False def check_quotes(s): """Checks a string to make sure that if it starts with quotes, it also ends with quotes. """ starts_as_str = RE_BEGIN_STRING.match(s) is not None ends_as_str = s.endswith('"') or s.endswith("'") if not starts_as_str and not ends_as_str: ok = True elif starts_as_str and not ends_as_str: ok = False elif not starts_as_str and ends_as_str: ok = False else: m = RE_COMPLETE_STRING.match(s) ok = m is not None return ok def _have_open_triple_quotes(s): if s.count('"""') % 2 == 1: open_triple = '"""' elif s.count("'''") % 2 == 1: open_triple = "'''" else: open_triple = False return open_triple def get_line_continuation(): """The line continuation characters used in subproc mode. In interactive mode on Windows the backslash must be preceded by a space. This is because paths on Windows may end in a backslash. 
""" if ON_WINDOWS and hasattr(xsh, "env") and xsh.env.get("XONSH_INTERACTIVE", False): return " \\" else: return "\\" def get_logical_line(lines, idx): """Returns a single logical line (i.e. one without line continuations) from a list of lines. This line should begin at index idx. This also returns the number of physical lines the logical line spans. The lines should not contain newlines """ n = 1 nlines = len(lines) linecont = get_line_continuation() while idx > 0 and lines[idx - 1].endswith(linecont): idx -= 1 start = idx line = lines[idx] open_triple = _have_open_triple_quotes(line) while (line.endswith(linecont) or open_triple) and idx < nlines - 1: n += 1 idx += 1 if line.endswith(linecont): line = line[:-1] + lines[idx] else: line = line + "\n" + lines[idx] open_triple = _have_open_triple_quotes(line) return line, n, start def replace_logical_line(lines, logical, idx, n): """Replaces lines at idx that may end in line continuation with a logical line that spans n lines. """ linecont = get_line_continuation() if n == 1: lines[idx] = logical return space = " " for i in range(idx, idx + n - 1): a = len(lines[i]) b = logical.find(space, a - 1) if b < 0: # no space found lines[i] = logical logical = "" else: # found space to split on lines[i] = logical[:b] + linecont logical = logical[b:] lines[idx + n - 1] = logical def is_balanced(expr, ltok, rtok): """Determines whether an expression has unbalanced opening and closing tokens.""" lcnt = expr.count(ltok) if lcnt == 0: return True rcnt = expr.count(rtok) if lcnt == rcnt: return True else: return False def subexpr_from_unbalanced(expr, ltok, rtok): """Attempts to pull out a valid subexpression for unbalanced grouping, based on opening tokens, eg. '(', and closing tokens, eg. ')'. This does not do full tokenization, but should be good enough for tab completion. """ if is_balanced(expr, ltok, rtok): return expr subexpr = expr.rsplit(ltok, 1)[-1] subexpr = subexpr.rsplit(",", 1)[-1] subexpr = subexpr.rsplit(":", 1)[-1] return subexpr def subexpr_before_unbalanced(expr, ltok, rtok): """Obtains the expression prior to last unbalanced left token.""" subexpr, _, post = expr.rpartition(ltok) nrtoks_in_post = post.count(rtok) while nrtoks_in_post != 0: for _ in range(nrtoks_in_post): subexpr, _, post = subexpr.rpartition(ltok) nrtoks_in_post = post.count(rtok) _, _, subexpr = subexpr.rpartition(rtok) _, _, subexpr = subexpr.rpartition(ltok) return subexpr @lazyobject def STARTING_WHITESPACE_RE(): return re.compile(r"^(\s*)") def starting_whitespace(s): """Returns the whitespace at the start of a string""" return STARTING_WHITESPACE_RE.match(s).group(1) def decode(s, encoding=None): encoding = encoding or DEFAULT_ENCODING return s.decode(encoding, "replace") def encode(u, encoding=None): encoding = encoding or DEFAULT_ENCODING return u.encode(encoding, "replace") def cast_unicode(s, encoding=None): if isinstance(s, bytes): return decode(s, encoding) return s def safe_hasattr(obj, attr): """In recent versions of Python, hasattr() only catches AttributeError. This catches all errors. """ try: getattr(obj, attr) return True except Exception: return False def indent(instr, nspaces=4, ntabs=0, flatten=False): """Indent a string a given number of spaces or tabstops. indent(str,nspaces=4,ntabs=0) -> indent str by ntabs+nspaces. Parameters ---------- instr : basestring The string to be indented. nspaces : int (default: 4) The number of spaces to be indented. ntabs : int (default: 0) The number of tabs to be indented. 
flatten : bool (default: False) Whether to scrub existing indentation. If True, all lines will be aligned to the same indentation. If False, existing indentation will be strictly increased. Returns ------- outstr : string indented by ntabs and nspaces. """ if instr is None: return ind = "\t" * ntabs + " " * nspaces if flatten: pat = re.compile(r"^\s*", re.MULTILINE) else: pat = re.compile(r"^", re.MULTILINE) outstr = re.sub(pat, ind, instr) if outstr.endswith(os.linesep + ind): return outstr[: -len(ind)] else: return outstr def get_sep(): """Returns the appropriate filepath separator char depending on OS and xonsh options set """ if ON_WINDOWS and xsh.env.get("FORCE_POSIX_PATHS"): return os.altsep else: return os.sep def fallback(cond, backup): """Decorator for returning the object if cond is true and a backup if cond is false. """ def dec(obj): return obj if cond else backup return dec # The following redirect classes were taken directly from Python 3.5's source # code (from the contextlib module). This can be removed when 3.5 is released, # although redirect_stdout exists in 3.4, redirect_stderr does not. # See the Python software license: https://docs.python.org/3/license.html # Copyright (c) Python Software Foundation. All rights reserved. class _RedirectStream: _stream: tp.Optional[str] = None def __init__(self, new_target): self._new_target = new_target # We use a list of old targets to make this CM re-entrant self._old_targets = [] def __enter__(self): self._old_targets.append(getattr(sys, self._stream)) setattr(sys, self._stream, self._new_target) return self._new_target def __exit__(self, exctype, excinst, exctb): setattr(sys, self._stream, self._old_targets.pop()) class redirect_stdout(_RedirectStream): """Context manager for temporarily redirecting stdout to another file:: # How to send help() to stderr with redirect_stdout(sys.stderr): help(dir) # How to write help() to a file with open('help.txt', 'w') as f: with redirect_stdout(f): help(pow) Mostly for backwards compatibility. """ _stream = "stdout" class redirect_stderr(_RedirectStream): """Context manager for temporarily redirecting stderr to another file.""" _stream = "stderr" def _yield_accessible_unix_file_names(path): """yield file names of executable files in path.""" if not os.path.exists(path): return for file_ in os.scandir(path): try: if file_.is_file() and os.access(file_.path, os.X_OK): yield file_.name except OSError: # broken Symlink are neither dir not files pass def _executables_in_posix(path): if not os.path.exists(path): return else: yield from _yield_accessible_unix_file_names(path) def _executables_in_windows(path): if not os.path.isdir(path): return extensions = xsh.env["PATHEXT"] try: for x in os.scandir(path): try: is_file = x.is_file() except OSError: continue if is_file: fname = x.name else: continue base_name, ext = os.path.splitext(fname) if ext.upper() in extensions: yield fname except FileNotFoundError: # On Windows, there's no guarantee for the directory to really # exist even if isdir returns True. This may happen for instance # if the path contains trailing spaces. return def executables_in(path): """Returns a generator of files in path that the user could execute.""" if ON_WINDOWS: func = _executables_in_windows else: func = _executables_in_posix try: yield from func(path) except PermissionError: return def debian_command_not_found(cmd): """Uses the debian/ubuntu command-not-found utility to suggest packages for a command that cannot currently be found. 
""" if not ON_LINUX: return "" cnf = xsh.commands_cache.lazyget( "command-not-found", ("/usr/lib/command-not-found",) )[0] if not os.path.isfile(cnf): return "" c = "{0} {1}; exit 0" s = subprocess.check_output( c.format(cnf, shlex.quote(cmd)), universal_newlines=True, stderr=subprocess.STDOUT, shell=True, ) s = "\n".join(s.rstrip().splitlines()).strip() return s def conda_suggest_command_not_found(cmd, env): """Uses conda-suggest to suggest packages for a command that cannot currently be found. """ try: from conda_suggest import find except ImportError: return "" return find.message_string( cmd, conda_suggest_path=env.get("CONDA_SUGGEST_PATH", None) ) def command_not_found(cmd, env): """Uses various mechanism to suggest packages for a command that cannot currently be found. """ if ON_LINUX: rtn = debian_command_not_found(cmd) else: rtn = "" conda = conda_suggest_command_not_found(cmd, env) if conda: rtn = rtn + "\n\n" + conda if rtn else conda return rtn def suggest_commands(cmd, env, aliases): """Suggests alternative commands given an environment and aliases.""" if not env.get("SUGGEST_COMMANDS"): return "" thresh = env.get("SUGGEST_THRESHOLD") max_sugg = env.get("SUGGEST_MAX_NUM") if max_sugg < 0: max_sugg = float("inf") cmd = cmd.lower() suggested = {} for alias in xsh.aliases: if alias not in suggested: if levenshtein(alias.lower(), cmd, thresh) < thresh: suggested[alias] = "Alias" for _cmd in xsh.commands_cache.all_commands: if _cmd not in suggested: if levenshtein(_cmd.lower(), cmd, thresh) < thresh: suggested[_cmd] = "Command ({0})".format(_cmd) suggested = collections.OrderedDict( sorted( suggested.items(), key=lambda x: suggestion_sort_helper(x[0].lower(), cmd) ) ) num = min(len(suggested), max_sugg) if num == 0: rtn = command_not_found(cmd, env) else: oneof = "" if num == 1 else "one of " tips = "Did you mean {}the following?".format(oneof) items = list(suggested.popitem(False) for _ in range(num)) length = max(len(key) for key, _ in items) + 2 alternatives = "\n".join( " {: <{}} {}".format(key + ":", length, val) for key, val in items ) rtn = "{}\n{}".format(tips, alternatives) c = command_not_found(cmd, env) rtn += ("\n\n" + c) if len(c) > 0 else "" return rtn def _get_manual_env_var(name, default=None): """Returns if the given variable is manually set as well as it's value.""" env = getattr(xsh, "env", None) if env is None: env = os_environ manually_set = name in env else: manually_set = env.is_manually_set(name) value = env.get(name, default) return (manually_set, value) def print_warning(msg): """Print warnings with/without traceback.""" manually_set_trace, show_trace = _get_manual_env_var("XONSH_SHOW_TRACEBACK", False) manually_set_logfile, log_file = _get_manual_env_var("XONSH_TRACEBACK_LOGFILE") if (not manually_set_trace) and (not manually_set_logfile): # Notify about the traceback output possibility if neither of # the two options have been manually set sys.stderr.write( "xonsh: For full traceback set: " "$XONSH_SHOW_TRACEBACK = True\n" ) # convert show_trace to bool if necessary if not is_bool(show_trace): show_trace = to_bool(show_trace) # if the trace option has been set, print all traceback info to stderr if show_trace: # notify user about XONSH_TRACEBACK_LOGFILE if it has # not been set manually if not manually_set_logfile: sys.stderr.write( "xonsh: To log full traceback to a file set: " "$XONSH_TRACEBACK_LOGFILE = <filename>\n" ) traceback.print_stack() # additionally, check if a file for traceback logging has been # specified and convert to a proper option if 
needed log_file = to_logfile_opt(log_file) if log_file: # if log_file <> '' or log_file <> None, append # traceback log there as well with open(os.path.abspath(log_file), "a") as f: traceback.print_stack(file=f) msg = msg if msg.endswith("\n") else msg + "\n" sys.stderr.write(msg) def print_exception(msg=None): """Print exceptions with/without traceback.""" manually_set_trace, show_trace = _get_manual_env_var("XONSH_SHOW_TRACEBACK", False) manually_set_logfile, log_file = _get_manual_env_var("XONSH_TRACEBACK_LOGFILE") if (not manually_set_trace) and (not manually_set_logfile): # Notify about the traceback output possibility if neither of # the two options have been manually set sys.stderr.write( "xonsh: For full traceback set: " "$XONSH_SHOW_TRACEBACK = True\n" ) # convert show_trace to bool if necessary if not is_bool(show_trace): show_trace = to_bool(show_trace) # if the trace option has been set, print all traceback info to stderr if show_trace: # notify user about XONSH_TRACEBACK_LOGFILE if it has # not been set manually if not manually_set_logfile: sys.stderr.write( "xonsh: To log full traceback to a file set: " "$XONSH_TRACEBACK_LOGFILE = <filename>\n" ) traceback.print_exc() # additionally, check if a file for traceback logging has been # specified and convert to a proper option if needed log_file = to_logfile_opt(log_file) if log_file: # if log_file <> '' or log_file <> None, append # traceback log there as well with open(os.path.abspath(log_file), "a") as f: traceback.print_exc(file=f) if not show_trace: # if traceback output is disabled, print the exception's # error message on stderr. display_error_message() if msg: msg = msg if msg.endswith("\n") else msg + "\n" sys.stderr.write(msg) def display_error_message(strip_xonsh_error_types=True): """ Prints the error message of the current exception on stderr. """ exc_type, exc_value, exc_traceback = sys.exc_info() exception_only = traceback.format_exception_only(exc_type, exc_value) if exc_type is XonshError and strip_xonsh_error_types: exception_only[0] = exception_only[0].partition(": ")[-1] sys.stderr.write("".join(exception_only)) def is_writable_file(filepath): """ Checks if a filepath is valid for writing. """ filepath = expand_path(filepath) # convert to absolute path if needed if not os.path.isabs(filepath): filepath = os.path.abspath(filepath) # cannot write to directories if os.path.isdir(filepath): return False # if the file exists and is writable, we're fine if os.path.exists(filepath): return True if os.access(filepath, os.W_OK) else False # if the path doesn't exist, isolate its directory component # and ensure that directory is writable instead return os.access(os.path.dirname(filepath), os.W_OK) # Modified from Public Domain code, by Magnus Lie Hetland # from http://hetland.org/coding/python/levenshtein.py def levenshtein(a, b, max_dist=float("inf")): """Calculates the Levenshtein distance between a and b.""" n, m = len(a), len(b) if abs(n - m) > max_dist: return float("inf") if n > m: # Make sure n <= m, to use O(min(n,m)) space a, b = b, a n, m = m, n current = range(n + 1) for i in range(1, m + 1): previous, current = current, [i] + [0] * n for j in range(1, n + 1): add, delete = previous[j] + 1, current[j - 1] + 1 change = previous[j - 1] if a[j - 1] != b[i - 1]: change = change + 1 current[j] = min(add, delete, change) return current[n] def suggestion_sort_helper(x, y): """Returns a score (lower is better) for x based on how similar it is to y. 
    Used to rank suggestions."""
    x = x.lower()
    y = y.lower()
    lendiff = len(x) + len(y)
    inx = len([i for i in x if i not in y])
    iny = len([i for i in y if i not in x])
    return lendiff + inx + iny


def escape_windows_cmd_string(s):
    """Returns a string that is usable by the Windows cmd.exe.
    The escaping is based on details here and empirical testing:
    http://www.robvanderwoude.com/escapechars.php
    """
    for c in '^()%!<>&|"':
        s = s.replace(c, "^" + c)
    return s


def argvquote(arg, force=False):
    """Returns an argument quoted in such a way that CommandLineToArgvW
    on Windows will return the argument string unchanged.
    This is the same thing Popen does when supplied with a list of arguments.
    Arguments in a command line should be separated by spaces; this
    function does not add these spaces. This implementation follows the
    suggestions outlined here:
    https://blogs.msdn.microsoft.com/twistylittlepassagesallalike/2011/04/23/everyone-quotes-command-line-arguments-the-wrong-way/
    """
    if not force and len(arg) != 0 and not any([c in arg for c in ' \t\n\v"']):
        return arg
    else:
        n_backslashes = 0
        cmdline = '"'
        for c in arg:
            if c == "\\":
                # first count the number of current backslashes
                n_backslashes += 1
                continue
            if c == '"':
                # Escape all backslashes and the following double quotation mark
                cmdline += (n_backslashes * 2 + 1) * "\\"
            else:
                # backslashes are not special here
                cmdline += n_backslashes * "\\"
            n_backslashes = 0
            cmdline += c
        # Escape all backslashes, but let the terminating
        # double quotation mark we add below be interpreted
        # as a metacharacter
        cmdline += n_backslashes * 2 * "\\" + '"'
        return cmdline


def on_main_thread():
    """Checks if we are on the main thread or not."""
    return threading.current_thread() is threading.main_thread()


_DEFAULT_SENTINEL = object()


@contextlib.contextmanager
def swap(namespace, name, value, default=_DEFAULT_SENTINEL):
    """Swaps a current variable name in a namespace for another value, and then
    replaces it when the context is exited.
    """
    old = getattr(namespace, name, default)
    setattr(namespace, name, value)
    yield value
    if old is default:
        delattr(namespace, name)
    else:
        setattr(namespace, name, old)


@contextlib.contextmanager
def swap_values(d, updates, default=_DEFAULT_SENTINEL):
    """Updates a dictionary (or other mapping) with values from another mapping,
    and then restores the original mapping when the context is exited.
""" old = {k: d.get(k, default) for k in updates} d.update(updates) yield for k, v in old.items(): if v is default and k in d: del d[k] else: d[k] = v # # Validators and converters # def detype(x): """This assumes that the object has a detype method, and calls that.""" return x.detype() def is_int(x): """Tests if something is an integer""" return isinstance(x, int) def is_float(x): """Tests if something is a float""" return isinstance(x, float) def is_string(x): """Tests if something is a string""" return isinstance(x, str) def is_slice(x): """Tests if something is a slice""" return isinstance(x, slice) def is_callable(x): """Tests if something is callable""" return callable(x) def is_string_or_callable(x): """Tests if something is a string or callable""" return is_string(x) or is_callable(x) def is_class(x): """Tests if something is a class""" return isinstance(x, type) def always_true(x): """Returns True""" return True def always_false(x): """Returns False""" return False def always_none(x): """Returns None""" return None def ensure_string(x): """Returns a string if x is not a string, and x if it already is.""" return str(x) def is_path(x): """This tests if something is a path.""" return isinstance(x, pathlib.Path) def is_env_path(x): """This tests if something is an environment path, ie a list of strings.""" return isinstance(x, EnvPath) def str_to_path(x): """Converts a string to a path.""" if x is None: return None elif isinstance(x, str): # checking x is needed to avoid uncontrolled converting empty string to Path('.') return pathlib.Path(x) if x else None elif isinstance(x, pathlib.Path): return x elif isinstance(x, EnvPath) and len(x) == 1: return pathlib.Path(x[0]) if x[0] else None else: raise TypeError( f"Variable should be a pathlib.Path, str or single EnvPath type. {type(x)} given." ) def str_to_env_path(x): """Converts a string to an environment path, ie a list of strings, splitting on the OS separator. """ # splitting will be done implicitly in EnvPath's __init__ return EnvPath(x) def path_to_str(x): """Converts a path to a string.""" return str(x) def env_path_to_str(x): """Converts an environment path to a string by joining on the OS separator. """ return os.pathsep.join(x) def is_bool(x): """Tests if something is a boolean.""" return isinstance(x, bool) def is_bool_or_none(x): """Tests if something is a boolean or None.""" return (x is None) or isinstance(x, bool) def is_logfile_opt(x): """ Checks if x is a valid $XONSH_TRACEBACK_LOGFILE option. Returns False if x is not a writable/creatable file or an empty string or None. """ if x is None: return True if not isinstance(x, str): return False else: return is_writable_file(x) or x == "" def to_logfile_opt(x): """Converts a $XONSH_TRACEBACK_LOGFILE option to either a str containing the filepath if it is a writable file or None if the filepath is not valid, informing the user on stderr about the invalid choice. """ if isinstance(x, os.PathLike): # type: ignore x = str(x) if is_logfile_opt(x): return x else: # if option is not valid, return a proper # option and inform the user on stderr sys.stderr.write( "xonsh: $XONSH_TRACEBACK_LOGFILE must be a " "filepath pointing to a file that either exists " "and is writable or that can be created.\n" ) return None def logfile_opt_to_str(x): """ Detypes a $XONSH_TRACEBACK_LOGFILE option. """ if x is None: # None should not be detyped to 'None', as 'None' constitutes # a perfectly valid filename and retyping it would introduce # ambiguity. Detype to the empty string instead. 
return "" return str(x) _FALSES = LazyObject( lambda: frozenset(["", "0", "n", "f", "no", "none", "false", "off"]), globals(), "_FALSES", ) def to_bool(x): """Converts to a boolean in a semantically meaningful way.""" if isinstance(x, bool): return x elif isinstance(x, str): return False if x.lower() in _FALSES else True else: return bool(x) def to_bool_or_none(x): """Converts to a boolean or none in a semantically meaningful way.""" if x is None or isinstance(x, bool): return x elif isinstance(x, str): low_x = x.lower() if low_x == "none": return None else: return False if x.lower() in _FALSES else True else: return bool(x) def to_itself(x): """No conversion, returns itself.""" return x def to_int_or_none(x) -> tp.Optional[int]: """Convert the given value to integer if possible. Otherwise return None""" if isinstance(x, str) and x.lower() == "none": return None else: return int(x) def bool_to_str(x): """Converts a bool to an empty string if False and the string '1' if True. """ return "1" if x else "" def bool_or_none_to_str(x): """Converts a bool or None value to a string.""" if x is None: return "None" else: return "1" if x else "" _BREAKS = LazyObject( lambda: frozenset(["b", "break", "s", "skip", "q", "quit"]), globals(), "_BREAKS" ) def to_bool_or_break(x): if isinstance(x, str) and x.lower() in _BREAKS: return "break" else: return to_bool(x) def is_bool_or_int(x): """Returns whether a value is a boolean or integer.""" return is_bool(x) or is_int(x) def to_bool_or_int(x): """Converts a value to a boolean or an integer.""" if isinstance(x, str): return int(x) if x.isdigit() else to_bool(x) elif is_int(x): # bools are ints too! return x else: return bool(x) def bool_or_int_to_str(x): """Converts a boolean or integer to a string.""" return bool_to_str(x) if is_bool(x) else str(x) @lazyobject def SLICE_REG(): return re.compile( r"(?P<start>(?:-\d)?\d*):(?P<end>(?:-\d)?\d*):?(?P<step>(?:-\d)?\d*)" ) def ensure_slice(x): """Try to convert an object into a slice, complain on failure""" if not x and x != 0: return slice(None) elif is_slice(x): return x try: x = int(x) if x != -1: s = slice(x, x + 1) else: s = slice(-1, None, None) except ValueError: x = x.strip("[]()") m = SLICE_REG.fullmatch(x) if m: groups = (int(i) if i else None for i in m.groups()) s = slice(*groups) else: raise ValueError("cannot convert {!r} to slice".format(x)) except TypeError: try: s = slice(*(int(i) for i in x)) except (TypeError, ValueError): raise ValueError("cannot convert {!r} to slice".format(x)) return s def get_portions(it, slices): """Yield from portions of an iterable. Parameters ---------- it : iterable slices : a slice or a list of slice objects """ if is_slice(slices): slices = [slices] if len(slices) == 1: s = slices[0] try: yield from itertools.islice(it, s.start, s.stop, s.step) return except ValueError: # islice failed pass it = list(it) for s in slices: yield from it[s] def is_slice_as_str(x): """ Test if string x is a slice. If not a string return False. """ try: x = x.strip("[]()") m = SLICE_REG.fullmatch(x) if m: return True except AttributeError: pass return False def is_int_as_str(x): """ Test if string x is an integer. If not a string return False. 
""" try: return x.isdecimal() except AttributeError: return False def is_string_set(x): """Tests if something is a set of strings""" return isinstance(x, cabc.Set) and all(isinstance(a, str) for a in x) def csv_to_set(x): """Convert a comma-separated list of strings to a set of strings.""" if not x: return set() else: return set(x.split(",")) def set_to_csv(x): """Convert a set of strings to a comma-separated list of strings.""" return ",".join(x) def pathsep_to_set(x): """Converts a os.pathsep separated string to a set of strings.""" if not x: return set() else: return set(x.split(os.pathsep)) def set_to_pathsep(x, sort=False): """Converts a set to an os.pathsep separated string. The sort kwarg specifies whether to sort the set prior to str conversion. """ if sort: x = sorted(x) return os.pathsep.join(x) def is_string_seq(x): """Tests if something is a sequence of strings""" return isinstance(x, cabc.Sequence) and all(isinstance(a, str) for a in x) def is_nonstring_seq_of_strings(x): """Tests if something is a sequence of strings, where the top-level sequence is not a string itself. """ return ( isinstance(x, cabc.Sequence) and not isinstance(x, str) and all(isinstance(a, str) for a in x) ) def pathsep_to_seq(x): """Converts a os.pathsep separated string to a sequence of strings.""" if not x: return [] else: return x.split(os.pathsep) def seq_to_pathsep(x): """Converts a sequence to an os.pathsep separated string.""" return os.pathsep.join(x) def pathsep_to_upper_seq(x): """Converts a os.pathsep separated string to a sequence of uppercase strings. """ if not x: return [] else: return x.upper().split(os.pathsep) def seq_to_upper_pathsep(x): """Converts a sequence to an uppercase os.pathsep separated string.""" return os.pathsep.join(x).upper() def is_bool_seq(x): """Tests if an object is a sequence of bools.""" return isinstance(x, cabc.Sequence) and all(isinstance(y, bool) for y in x) def csv_to_bool_seq(x): """Takes a comma-separated string and converts it into a list of bools.""" return [to_bool(y) for y in csv_to_set(x)] def bool_seq_to_csv(x): """Converts a sequence of bools to a comma-separated string.""" return ",".join(map(str, x)) def ptk2_color_depth_setter(x): """Setter function for $PROMPT_TOOLKIT_COLOR_DEPTH. Also updates os.environ so prompt toolkit can pickup the value. """ x = str(x) if x in { "DEPTH_1_BIT", "MONOCHROME", "DEPTH_4_BIT", "ANSI_COLORS_ONLY", "DEPTH_8_BIT", "DEFAULT", "DEPTH_24_BIT", "TRUE_COLOR", }: pass elif x in {"", None}: x = "" else: msg = '"{}" is not a valid value for $PROMPT_TOOLKIT_COLOR_DEPTH. '.format(x) warnings.warn(msg, RuntimeWarning) x = "" if x == "" and "PROMPT_TOOLKIT_COLOR_DEPTH" in os_environ: del os_environ["PROMPT_TOOLKIT_COLOR_DEPTH"] else: os_environ["PROMPT_TOOLKIT_COLOR_DEPTH"] = x return x def is_completions_display_value(x): """Enumerated values of ``$COMPLETIONS_DISPLAY``""" return x in {"none", "single", "multi"} def to_completions_display_value(x): """Convert user input to value of ``$COMPLETIONS_DISPLAY``""" x = str(x).lower() if x in {"none", "false"}: x = "none" elif x in {"multi", "true"}: x = "multi" elif x in {"single", "readline"}: pass else: msg = '"{}" is not a valid value for $COMPLETIONS_DISPLAY. '.format(x) msg += 'Using "multi".' 
def get_portions(it, slices):
    """Yield from portions of an iterable.

    Parameters
    ----------
    it : iterable
    slices : a slice or a list of slice objects
    """
    if is_slice(slices):
        slices = [slices]
    if len(slices) == 1:
        s = slices[0]
        try:
            yield from itertools.islice(it, s.start, s.stop, s.step)
            return
        except ValueError:  # islice failed
            pass
    it = list(it)
    for s in slices:
        yield from it[s]


def is_slice_as_str(x):
    """
    Test if string x is a slice. If not a string return False.
    """
    try:
        x = x.strip("[]()")
        m = SLICE_REG.fullmatch(x)
        if m:
            return True
    except AttributeError:
        pass
    return False


def is_int_as_str(x):
    """
    Test if string x is an integer. If not a string return False.
    """
    try:
        return x.isdecimal()
    except AttributeError:
        return False


def is_string_set(x):
    """Tests if something is a set of strings"""
    return isinstance(x, cabc.Set) and all(isinstance(a, str) for a in x)


def csv_to_set(x):
    """Convert a comma-separated list of strings to a set of strings."""
    if not x:
        return set()
    else:
        return set(x.split(","))


def set_to_csv(x):
    """Convert a set of strings to a comma-separated list of strings."""
    return ",".join(x)


def pathsep_to_set(x):
    """Converts an os.pathsep separated string to a set of strings."""
    if not x:
        return set()
    else:
        return set(x.split(os.pathsep))


def set_to_pathsep(x, sort=False):
    """Converts a set to an os.pathsep separated string. The sort kwarg
    specifies whether to sort the set prior to str conversion.
    """
    if sort:
        x = sorted(x)
    return os.pathsep.join(x)


def is_string_seq(x):
    """Tests if something is a sequence of strings"""
    return isinstance(x, cabc.Sequence) and all(isinstance(a, str) for a in x)


def is_nonstring_seq_of_strings(x):
    """Tests if something is a sequence of strings, where the top-level
    sequence is not a string itself.
    """
    return (
        isinstance(x, cabc.Sequence)
        and not isinstance(x, str)
        and all(isinstance(a, str) for a in x)
    )


def pathsep_to_seq(x):
    """Converts an os.pathsep separated string to a sequence of strings."""
    if not x:
        return []
    else:
        return x.split(os.pathsep)


def seq_to_pathsep(x):
    """Converts a sequence to an os.pathsep separated string."""
    return os.pathsep.join(x)


def pathsep_to_upper_seq(x):
    """Converts an os.pathsep separated string to a sequence of
    uppercase strings.
    """
    if not x:
        return []
    else:
        return x.upper().split(os.pathsep)


def seq_to_upper_pathsep(x):
    """Converts a sequence to an uppercase os.pathsep separated string."""
    return os.pathsep.join(x).upper()


def is_bool_seq(x):
    """Tests if an object is a sequence of bools."""
    return isinstance(x, cabc.Sequence) and all(isinstance(y, bool) for y in x)


def csv_to_bool_seq(x):
    """Takes a comma-separated string and converts it into a list of bools."""
    return [to_bool(y) for y in csv_to_set(x)]


def bool_seq_to_csv(x):
    """Converts a sequence of bools to a comma-separated string."""
    return ",".join(map(str, x))


def ptk2_color_depth_setter(x):
    """Setter function for $PROMPT_TOOLKIT_COLOR_DEPTH. Also updates
    os.environ so prompt toolkit can pick up the value.
    """
    x = str(x)
    if x in {
        "DEPTH_1_BIT",
        "MONOCHROME",
        "DEPTH_4_BIT",
        "ANSI_COLORS_ONLY",
        "DEPTH_8_BIT",
        "DEFAULT",
        "DEPTH_24_BIT",
        "TRUE_COLOR",
    }:
        pass
    elif x in {"", None}:
        x = ""
    else:
        msg = '"{}" is not a valid value for $PROMPT_TOOLKIT_COLOR_DEPTH. '.format(x)
        warnings.warn(msg, RuntimeWarning)
        x = ""
    if x == "" and "PROMPT_TOOLKIT_COLOR_DEPTH" in os_environ:
        del os_environ["PROMPT_TOOLKIT_COLOR_DEPTH"]
    else:
        os_environ["PROMPT_TOOLKIT_COLOR_DEPTH"] = x
    return x


def is_completions_display_value(x):
    """Enumerated values of ``$COMPLETIONS_DISPLAY``"""
    return x in {"none", "single", "multi"}


def to_completions_display_value(x):
    """Convert user input to value of ``$COMPLETIONS_DISPLAY``"""
    x = str(x).lower()
    if x in {"none", "false"}:
        x = "none"
    elif x in {"multi", "true"}:
        x = "multi"
    elif x in {"single", "readline"}:
        pass
    else:
        msg = '"{}" is not a valid value for $COMPLETIONS_DISPLAY. '.format(x)
        msg += 'Using "multi".'
        warnings.warn(msg, RuntimeWarning)
        x = "multi"
    return x


CANONIC_COMPLETION_MODES = frozenset({"default", "menu-complete"})


def is_completion_mode(x):
    """Enumerated values of $COMPLETION_MODE"""
    return x in CANONIC_COMPLETION_MODES


def to_completion_mode(x):
    """Convert user input to value of $COMPLETION_MODE"""
    y = str(x).casefold().replace("_", "-")
    y = (
        "default"
        if y in ("", "d", "xonsh", "none", "def")
        else "menu-complete"
        if y in ("m", "menu", "menu-completion")
        else y
    )
    if y not in CANONIC_COMPLETION_MODES:
        warnings.warn(
            f"'{x}' is not valid for $COMPLETION_MODE, must be one of {CANONIC_COMPLETION_MODES}. Using 'default'.",
            RuntimeWarning,
        )
        y = "default"
    return y


def is_str_str_dict(x):
    """Tests if something is a str:str dictionary"""
    return isinstance(x, dict) and all(
        isinstance(k, str) and isinstance(v, str) for k, v in x.items()
    )


def to_dict(x):
    """Converts a string to a dictionary"""
    if isinstance(x, dict):
        return x
    try:
        x = ast.literal_eval(x)
    except (ValueError, SyntaxError):
        msg = '"{}" can not be converted to Python dictionary.'.format(x)
        warnings.warn(msg, RuntimeWarning)
        x = dict()
    return x


def to_str_str_dict(x):
    """Converts a string to str:str dictionary"""
    if is_str_str_dict(x):
        return x
    x = to_dict(x)
    if not is_str_str_dict(x):
        msg = '"{}" can not be converted to str:str dictionary.'.format(x)
        warnings.warn(msg, RuntimeWarning)
        x = dict()
    return x


def dict_to_str(x):
    """Converts a dictionary to a string"""
    if not x or len(x) == 0:
        return ""
    return str(x)


# history validation
_min_to_sec = lambda x: 60.0 * float(x)
_hour_to_sec = lambda x: 60.0 * _min_to_sec(x)
_day_to_sec = lambda x: 24.0 * _hour_to_sec(x)
_month_to_sec = lambda x: 30.4375 * _day_to_sec(x)
_year_to_sec = lambda x: 365.25 * _day_to_sec(x)
_kb_to_b = lambda x: 1024 * int(x)
_mb_to_b = lambda x: 1024 * _kb_to_b(x)
_gb_to_b = lambda x: 1024 * _mb_to_b(x)
_tb_to_b = lambda x: 1024 * _gb_to_b(x)

CANON_HISTORY_UNITS = LazyObject(
    lambda: frozenset(["commands", "files", "s", "b"]), globals(), "CANON_HISTORY_UNITS"
)

HISTORY_UNITS = LazyObject(
    lambda: {
        "": ("commands", int),
        "c": ("commands", int),
        "cmd": ("commands", int),
        "cmds": ("commands", int),
        "command": ("commands", int),
        "commands": ("commands", int),
        "f": ("files", int),
        "files": ("files", int),
        "s": ("s", float),
        "sec": ("s", float),
        "second": ("s", float),
        "seconds": ("s", float),
        "m": ("s", _min_to_sec),
        "min": ("s", _min_to_sec),
        "mins": ("s", _min_to_sec),
        "h": ("s", _hour_to_sec),
        "hr": ("s", _hour_to_sec),
        "hour": ("s", _hour_to_sec),
        "hours": ("s", _hour_to_sec),
        "d": ("s", _day_to_sec),
        "day": ("s", _day_to_sec),
        "days": ("s", _day_to_sec),
        "mon": ("s", _month_to_sec),
        "month": ("s", _month_to_sec),
        "months": ("s", _month_to_sec),
        "y": ("s", _year_to_sec),
        "yr": ("s", _year_to_sec),
        "yrs": ("s", _year_to_sec),
        "year": ("s", _year_to_sec),
        "years": ("s", _year_to_sec),
        "b": ("b", int),
        "byte": ("b", int),
        "bytes": ("b", int),
        "kb": ("b", _kb_to_b),
        "kilobyte": ("b", _kb_to_b),
        "kilobytes": ("b", _kb_to_b),
        "mb": ("b", _mb_to_b),
        "meg": ("b", _mb_to_b),
        "megs": ("b", _mb_to_b),
        "megabyte": ("b", _mb_to_b),
        "megabytes": ("b", _mb_to_b),
        "gb": ("b", _gb_to_b),
        "gig": ("b", _gb_to_b),
        "gigs": ("b", _gb_to_b),
        "gigabyte": ("b", _gb_to_b),
        "gigabytes": ("b", _gb_to_b),
        "tb": ("b", _tb_to_b),
        "terabyte": ("b", _tb_to_b),
        "terabytes": ("b", _tb_to_b),
    },
    globals(),
    "HISTORY_UNITS",
)
"""Maps lowercase unit names to canonical name and conversion utilities."""
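
# Hedged sketch of how a HISTORY_UNITS entry is used, and of the terabyte
# chain above (1 TB = 1024**4 bytes):
#
#     >>> canon, conv = HISTORY_UNITS["gb"]
#     >>> canon, conv(2)
#     ('b', 2147483648)
#     >>> HISTORY_UNITS["tb"][1](1) == 1024 ** 4
#     True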
units tuple.""" if ( isinstance(x, cabc.Sequence) and len(x) == 2 and isinstance(x[0], (int, float)) and x[1].lower() in CANON_HISTORY_UNITS ): return True return False def is_history_backend(x): """Tests if something is a valid history backend.""" return is_string(x) or is_class(x) or isinstance(x, object) def is_dynamic_cwd_width(x): """Determine if the input is a valid input for the DYNAMIC_CWD_WIDTH environment variable. """ return ( isinstance(x, tuple) and len(x) == 2 and isinstance(x[0], float) and x[1] in set("c%") ) def to_dynamic_cwd_tuple(x): """Convert to a canonical cwd_width tuple.""" unit = "c" if isinstance(x, str): if x[-1] == "%": x = x[:-1] unit = "%" else: unit = "c" return (float(x), unit) else: return (float(x[0]), x[1]) def dynamic_cwd_tuple_to_str(x): """Convert a canonical cwd_width tuple to a string.""" if x[1] == "%": return str(x[0]) + "%" else: return str(x[0]) RE_HISTORY_TUPLE = LazyObject( lambda: re.compile(r"([-+]?[0-9]*\.?[0-9]+([eE][-+]?[0-9]+)?)\s*([A-Za-z]*)"), globals(), "RE_HISTORY_TUPLE", ) def to_history_tuple(x): """Converts to a canonical history tuple.""" if not isinstance(x, (cabc.Sequence, float, int)): raise ValueError("history size must be given as a sequence or number") if isinstance(x, str): m = RE_HISTORY_TUPLE.match(x.strip().lower()) return to_history_tuple((m.group(1), m.group(3))) elif isinstance(x, (float, int)): return to_history_tuple((x, "commands")) units, converter = HISTORY_UNITS[x[1]] value = converter(x[0]) return (value, units) def history_tuple_to_str(x): """Converts a valid history tuple to a canonical string.""" return "{0} {1}".format(*x) def all_permutations(iterable): """Yeilds all permutations, not just those of a specified length""" for r in range(1, len(iterable) + 1): yield from itertools.permutations(iterable, r=r) def format_color(string, **kwargs): """Formats strings that may contain colors. This simply dispatches to the shell instances method of the same name. The results of this function should be directly usable by print_color(). """ if hasattr(xsh.shell, "shell"): return xsh.shell.shell.format_color(string, **kwargs) else: # fallback for ANSI if shell is not yet initialized from xonsh.ansi_colors import ansi_partial_color_format style = xsh.env.get("XONSH_COLOR_STYLE") return ansi_partial_color_format(string, style=style) def print_color(string, **kwargs): """Prints a string that may contain colors. This dispatched to the shell method of the same name. Colors will be formatted if they have not already been. """ if hasattr(xsh.shell, "shell"): xsh.shell.shell.print_color(string, **kwargs) else: # fallback for ANSI if shell is not yet initialized print(format_color(string, **kwargs)) def color_style_names(): """Returns an iterable of all available style names.""" return xsh.shell.shell.color_style_names() def color_style(): """Returns the current color map.""" return xsh.shell.shell.color_style() def register_custom_style( name, styles, highlight_color=None, background_color=None, base="default" ): """Register custom style. Parameters ---------- name : str Style name. styles : dict Token -> style mapping. highlight_color : str Hightlight color. background_color : str Background color. base : str, optional Base style to use as default. 
def all_permutations(iterable):
    """Yields all permutations, not just those of a specified length"""
    for r in range(1, len(iterable) + 1):
        yield from itertools.permutations(iterable, r=r)


def format_color(string, **kwargs):
    """Formats strings that may contain colors. This simply dispatches
    to the shell instance's method of the same name. The results of this
    function should be directly usable by print_color().
    """
    if hasattr(xsh.shell, "shell"):
        return xsh.shell.shell.format_color(string, **kwargs)
    else:
        # fallback for ANSI if shell is not yet initialized
        from xonsh.ansi_colors import ansi_partial_color_format

        style = xsh.env.get("XONSH_COLOR_STYLE")
        return ansi_partial_color_format(string, style=style)


def print_color(string, **kwargs):
    """Prints a string that may contain colors. This dispatches to the
    shell method of the same name. Colors will be formatted if they have
    not already been.
    """
    if hasattr(xsh.shell, "shell"):
        xsh.shell.shell.print_color(string, **kwargs)
    else:
        # fallback for ANSI if shell is not yet initialized
        print(format_color(string, **kwargs))


def color_style_names():
    """Returns an iterable of all available style names."""
    return xsh.shell.shell.color_style_names()


def color_style():
    """Returns the current color map."""
    return xsh.shell.shell.color_style()


def register_custom_style(
    name, styles, highlight_color=None, background_color=None, base="default"
):
    """Register custom style.

    Parameters
    ----------
    name : str
        Style name.
    styles : dict
        Token -> style mapping.
    highlight_color : str
        Highlight color.
    background_color : str
        Background color.
    base : str, optional
        Base style to use as default.

    Returns
    -------
    style : The style object created, or None if registration did not succeed
    """
    style = None
    if pygments_version_info():
        from xonsh.pyghooks import register_custom_pygments_style

        style = register_custom_pygments_style(
            name, styles, highlight_color, background_color, base
        )

    # register ANSI colors
    from xonsh.ansi_colors import register_custom_ansi_style

    register_custom_ansi_style(name, styles, base)

    return style


def _token_attr_from_stylemap(stylemap):
    """yields tokens attr, and index from a stylemap"""
    import prompt_toolkit as ptk

    if xsh.shell.shell_type == "prompt_toolkit1":
        style = ptk.styles.style_from_dict(stylemap)
        for token in stylemap:
            yield token, style.token_to_attrs[token]
    else:
        style = ptk.styles.style_from_pygments_dict(stylemap)
        for token in stylemap:
            style_str = "class:{}".format(
                ptk.styles.pygments.pygments_token_to_classname(token)
            )
            yield (token, style.get_attrs_for_style_str(style_str))


def _get_color_lookup_table():
    """Returns the prompt_toolkit win32 ColorLookupTable"""
    if xsh.shell.shell_type == "prompt_toolkit1":
        from prompt_toolkit.terminal.win32_output import ColorLookupTable
    else:
        from prompt_toolkit.output.win32 import ColorLookupTable
    return ColorLookupTable()


def _get_color_indexes(style_map):
    """Generates the color and windows color index for a style"""
    table = _get_color_lookup_table()
    for token, attr in _token_attr_from_stylemap(style_map):
        if attr.color:
            index = table.lookup_fg_color(attr.color)
            try:
                rgb = (
                    int(attr.color[0:2], 16),
                    int(attr.color[2:4], 16),
                    int(attr.color[4:6], 16),
                )
            except Exception:
                rgb = None
            yield token, index, rgb


# Map of new PTK2 color names to PTK1 variants
PTK_NEW_OLD_COLOR_MAP = LazyObject(
    lambda: {
        "black": "black",
        "red": "darkred",
        "green": "darkgreen",
        "yellow": "brown",
        "blue": "darkblue",
        "magenta": "purple",
        "cyan": "teal",
        "gray": "lightgray",
        "brightblack": "darkgray",
        "brightred": "red",
        "brightgreen": "green",
        "brightyellow": "yellow",
        "brightblue": "blue",
        "brightmagenta": "fuchsia",
        "brightcyan": "turquoise",
        "white": "white",
    },
    globals(),
    "PTK_NEW_OLD_COLOR_MAP",
)

# Map of new ansicolor names to old PTK1 names
ANSICOLOR_NAMES_MAP = LazyObject(
    lambda: {"ansi" + k: "#ansi" + v for k, v in PTK_NEW_OLD_COLOR_MAP.items()},
    globals(),
    "ANSICOLOR_NAMES_MAP",
)


def _win10_color_map():
    cmap = {
        "ansiblack": (12, 12, 12),
        "ansiblue": (0, 55, 218),
        "ansigreen": (19, 161, 14),
        "ansicyan": (58, 150, 221),
        "ansired": (197, 15, 31),
        "ansimagenta": (136, 23, 152),
        "ansiyellow": (193, 156, 0),
        "ansigray": (204, 204, 204),
        "ansibrightblack": (118, 118, 118),
        "ansibrightblue": (59, 120, 255),
        "ansibrightgreen": (22, 198, 12),
        "ansibrightcyan": (97, 214, 214),
        "ansibrightred": (231, 72, 86),
        "ansibrightmagenta": (180, 0, 158),
        "ansibrightyellow": (249, 241, 165),
        "ansiwhite": (242, 242, 242),
    }
    return {
        k: "#{0:02x}{1:02x}{2:02x}".format(r, g, b) for k, (r, g, b) in cmap.items()
    }


WIN10_COLOR_MAP = LazyObject(_win10_color_map, globals(), "WIN10_COLOR_MAP")


def _win_bold_color_map():
    """Map dark ansi colors to lighter version."""
    return {
        "ansiblack": "ansibrightblack",
        "ansiblue": "ansibrightblue",
        "ansigreen": "ansibrightgreen",
        "ansicyan": "ansibrightcyan",
        "ansired": "ansibrightred",
        "ansimagenta": "ansibrightmagenta",
        "ansiyellow": "ansibrightyellow",
        "ansigray": "ansiwhite",
    }


WIN_BOLD_COLOR_MAP = LazyObject(_win_bold_color_map, globals(), "WIN_BOLD_COLOR_MAP")
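
# Small doctest-style sketch of the name -> hex mapping that
# _win10_color_map builds:
#
#     >>> WIN10_COLOR_MAP["ansired"]     # from (197, 15, 31)
#     '#c50f1f'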
xsh.env["PROMPT_TOOLKIT_COLOR_DEPTH"]: xsh.env["PROMPT_TOOLKIT_COLOR_DEPTH"] = "DEPTH_24_BIT" # Replace all ansi colors with hardcoded colors to avoid unreadable defaults # in conhost.exe for token, style_str in style_map.items(): for ansicolor in WIN10_COLOR_MAP: if ansicolor in style_str: if "bold" in style_str and "nobold" not in style_str: # Win10 doesn't yet handle bold colors. Instead dark # colors are mapped to their lighter version. We simulate # the same here. style_str.replace("bold", "") hexcolor = WIN10_COLOR_MAP[ WIN_BOLD_COLOR_MAP.get(ansicolor, ansicolor) ] else: hexcolor = WIN10_COLOR_MAP[ansicolor] style_str = style_str.replace(ansicolor, hexcolor) modified_style[token] = style_str return modified_style def ansicolors_to_ptk1_names(stylemap): """Converts ansicolor names in a stylemap to old PTK1 color names""" if pygments_version_info() and pygments_version_info() >= (2, 4, 0): return stylemap modified_stylemap = {} for token, style_str in stylemap.items(): for color, ptk1_color in ANSICOLOR_NAMES_MAP.items(): if "#" + color not in style_str: style_str = style_str.replace(color, ptk1_color) modified_stylemap[token] = style_str return modified_stylemap def intensify_colors_for_cmd_exe(style_map): """Returns a modified style to where colors that maps to dark colors are replaced with brighter versions. """ modified_style = {} replace_colors = { 1: "ansibrightcyan", # subst blue with bright cyan 2: "ansibrightgreen", # subst green with bright green 4: "ansibrightred", # subst red with bright red 5: "ansibrightmagenta", # subst magenta with bright magenta 6: "ansibrightyellow", # subst yellow with bright yellow 9: "ansicyan", # subst intense blue with dark cyan (more readable) } if xsh.shell.shell_type == "prompt_toolkit1": replace_colors = ansicolors_to_ptk1_names(replace_colors) for token, idx, _ in _get_color_indexes(style_map): if idx in replace_colors: modified_style[token] = replace_colors[idx] return modified_style def intensify_colors_on_win_setter(enable): """Resets the style when setting the INTENSIFY_COLORS_ON_WIN environment variable. """ enable = to_bool(enable) if ( hasattr(xsh, "shell") and xsh.shell is not None and hasattr(xsh.shell.shell.styler, "style_name") ): delattr(xsh.shell.shell.styler, "style_name") return enable def format_std_prepost(template, env=None): """Formats a template prefix/postfix string for a standard buffer. Returns a string suitable for prepending or appending. """ if not template: return "" env = xsh.env if env is None else env invis = "\001\002" if xsh.shell is None: # shell hasn't fully started up (probably still in xonshrc) from xonsh.prompt.base import PromptFormatter from xonsh.ansi_colors import ansi_partial_color_format pf = PromptFormatter() s = pf(template) style = env.get("XONSH_COLOR_STYLE") s = ansi_partial_color_format(invis + s + invis, hide=False, style=style) else: # shell has fully started. do the normal thing shell = xsh.shell.shell try: s = shell.prompt_formatter(template) except Exception: print_exception() # \001\002 is there to fool pygments into not returning an empty string # for potentially empty input. This happens when the template is just a # color code with no visible text. 
def format_std_prepost(template, env=None):
    """Formats a template prefix/postfix string for a standard buffer.
    Returns a string suitable for prepending or appending.
    """
    if not template:
        return ""
    env = xsh.env if env is None else env
    invis = "\001\002"
    if xsh.shell is None:
        # shell hasn't fully started up (probably still in xonshrc)
        from xonsh.prompt.base import PromptFormatter
        from xonsh.ansi_colors import ansi_partial_color_format

        pf = PromptFormatter()
        s = pf(template)
        style = env.get("XONSH_COLOR_STYLE")
        s = ansi_partial_color_format(invis + s + invis, hide=False, style=style)
    else:
        # shell has fully started. do the normal thing
        shell = xsh.shell.shell
        try:
            s = shell.prompt_formatter(template)
        except Exception:
            print_exception()
        # \001\002 is there to fool pygments into not returning an empty string
        # for potentially empty input. This happens when the template is just a
        # color code with no visible text.
        s = shell.format_color(invis + s + invis, force_string=True)
    s = s.replace(invis, "")
    return s


_RE_STRING_START = "[bBprRuUf]*"
_RE_STRING_TRIPLE_DOUBLE = '"""'
_RE_STRING_TRIPLE_SINGLE = "'''"
_RE_STRING_DOUBLE = '"'
_RE_STRING_SINGLE = "'"
_STRINGS = (
    _RE_STRING_TRIPLE_DOUBLE,
    _RE_STRING_TRIPLE_SINGLE,
    _RE_STRING_DOUBLE,
    _RE_STRING_SINGLE,
)

RE_BEGIN_STRING = LazyObject(
    lambda: re.compile("(" + _RE_STRING_START + "(" + "|".join(_STRINGS) + "))"),
    globals(),
    "RE_BEGIN_STRING",
)
"""Regular expression matching the start of a string, including quotes and
leading characters (r, b, or u)"""

RE_STRING_START = LazyObject(
    lambda: re.compile(_RE_STRING_START), globals(), "RE_STRING_START"
)
"""Regular expression matching the characters before the quotes when
starting a string (r, b, or u, case insensitive)"""

RE_STRING_CONT = LazyDict(
    {
        '"': lambda: re.compile(r'((\\(.|\n))|([^"\\]))*'),
        "'": lambda: re.compile(r"((\\(.|\n))|([^'\\]))*"),
        '"""': lambda: re.compile(r'((\\(.|\n))|([^"\\])|("(?!""))|\n)*'),
        "'''": lambda: re.compile(r"((\\(.|\n))|([^'\\])|('(?!''))|\n)*"),
    },
    globals(),
    "RE_STRING_CONT",
)
"""Dictionary mapping starting quote sequences to regular expressions that
match the contents of a string beginning with those quotes (not including the
terminating quotes)"""


@lazyobject
def RE_COMPLETE_STRING():
    ptrn = (
        "^"
        + _RE_STRING_START
        + "(?P<quote>"
        + "|".join(_STRINGS)
        + ")"
        + ".*?(?P=quote)$"
    )
    return re.compile(ptrn, re.DOTALL)


def strip_simple_quotes(s):
    """Gets rid of single quotes, double quotes, triple single quotes, and
    triple double quotes from a string, if present at the front and back of
    the string. Otherwise, does nothing.
    """
    starts_single = s.startswith("'")
    starts_double = s.startswith('"')
    if not starts_single and not starts_double:
        return s
    elif starts_single:
        ends_single = s.endswith("'")
        if not ends_single:
            return s
        elif s.startswith("'''") and s.endswith("'''") and len(s) >= 6:
            return s[3:-3]
        elif len(s) >= 2:
            return s[1:-1]
        else:
            return s
    else:  # starts double
        ends_double = s.endswith('"')
        if not ends_double:
            return s
        elif s.startswith('"""') and s.endswith('"""') and len(s) >= 6:
            return s[3:-3]
        elif len(s) >= 2:
            return s[1:-1]
        else:
            return s
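
# Doctest-style sketch of strip_simple_quotes; unmatched quotes are left
# alone:
#
#     >>> strip_simple_quotes("'hi'")
#     'hi'
#     >>> strip_simple_quotes('"""hi"""')
#     'hi'
#     >>> strip_simple_quotes("'hi")
#     "'hi"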
""" string_indices = [] starting_quote = [] current_index = 0 match = re.search(RE_BEGIN_STRING, x) while match is not None: # add the start in start = match.start() quote = match.group(0) lenquote = len(quote) current_index += start # store the starting index of the string, as well as the # characters in the starting quotes (e.g., ", ', """, r", etc) string_indices.append(current_index) starting_quote.append(quote) # determine the string that should terminate this string ender = re.sub(RE_STRING_START, "", quote) x = x[start + lenquote :] current_index += lenquote # figure out what is inside the string continuer = RE_STRING_CONT[ender] contents = re.match(continuer, x) inside = contents.group(0) leninside = len(inside) current_index += contents.start() + leninside + len(ender) # if we are not at the end of the input string, add the ending index of # the string to string_indices if contents.end() < len(x): string_indices.append(current_index) x = x[leninside + len(ender) :] # find the next match match = re.search(RE_BEGIN_STRING, x) numquotes = len(string_indices) if numquotes == 0: return (None, None, None) elif numquotes % 2: return (string_indices[-1], None, starting_quote[-1]) else: return (string_indices[-2], string_indices[-1], starting_quote[-1]) # regular expressions for matching environment variables # i.e $FOO, ${'FOO'} @lazyobject def POSIX_ENVVAR_REGEX(): pat = r"""\$({(?P<quote>['"])|)(?P<envvar>\w+)((?P=quote)}|(?:\1\b))""" return re.compile(pat) def expandvars(path): """Expand shell variables of the forms $var, ${var} and %var%. Unknown variables are left unchanged.""" env = xsh.env if isinstance(path, bytes): path = path.decode( encoding=env.get("XONSH_ENCODING"), errors=env.get("XONSH_ENCODING_ERRORS") ) elif isinstance(path, pathlib.Path): # get the path's string representation path = str(path) if "$" in path: shift = 0 for match in POSIX_ENVVAR_REGEX.finditer(path): name = match.group("envvar") if name in env: detyper = env.get_detyper(name) val = env[name] value = str(val) if detyper is None else detyper(val) value = str(val) if value is None else value start_pos, end_pos = match.span() path_len_before_replace = len(path) path = path[: start_pos + shift] + value + path[end_pos + shift :] shift = shift + len(path) - path_len_before_replace return path # # File handling tools # def backup_file(fname): """Moves an existing file to a new name that has the current time right before the extension. 
""" # lazy imports import shutil from datetime import datetime base, ext = os.path.splitext(fname) timestamp = datetime.now().strftime("%Y-%m-%d-%H-%M-%S-%f") newfname = "%s.%s%s" % (base, timestamp, ext) shutil.move(fname, newfname) def normabspath(p): """Returns as normalized absolute path, namely, normcase(abspath(p))""" return os.path.normcase(os.path.abspath(p)) def expanduser_abs_path(inp): """Provides user expanded absolute path""" return os.path.abspath(expanduser(inp)) WINDOWS_DRIVE_MATCHER = LazyObject( lambda: re.compile(r"^\w:"), globals(), "WINDOWS_DRIVE_MATCHER" ) def expand_case_matching(s): """Expands a string to a case insensitive globable string.""" t = [] openers = {"[", "{"} closers = {"]", "}"} nesting = 0 drive_part = WINDOWS_DRIVE_MATCHER.match(s) if ON_WINDOWS else None if drive_part: drive_part = drive_part.group(0) t.append(drive_part) s = s[len(drive_part) :] for c in s: if c in openers: nesting += 1 elif c in closers: nesting -= 1 elif nesting > 0: pass elif c.isalpha(): folded = c.casefold() if len(folded) == 1: c = "[{0}{1}]".format(c.upper(), c.lower()) else: newc = ["[{0}{1}]?".format(f.upper(), f.lower()) for f in folded[:-1]] newc = "".join(newc) newc += "[{0}{1}{2}]".format(folded[-1].upper(), folded[-1].lower(), c) c = newc t.append(c) return "".join(t) def globpath( s, ignore_case=False, return_empty=False, sort_result=None, include_dotfiles=None ): """Simple wrapper around glob that also expands home and env vars.""" o, s = _iglobpath( s, ignore_case=ignore_case, sort_result=sort_result, include_dotfiles=include_dotfiles, ) o = list(o) no_match = [] if return_empty else [s] return o if len(o) != 0 else no_match def _dotglobstr(s): modified = False dotted_s = s if "/*" in dotted_s: dotted_s = dotted_s.replace("/*", "/.*") dotted_s = dotted_s.replace("/.**/.*", "/**/.*") modified = True if dotted_s.startswith("*") and not dotted_s.startswith("**"): dotted_s = "." 
def globpath(
    s, ignore_case=False, return_empty=False, sort_result=None, include_dotfiles=None
):
    """Simple wrapper around glob that also expands home and env vars."""
    o, s = _iglobpath(
        s,
        ignore_case=ignore_case,
        sort_result=sort_result,
        include_dotfiles=include_dotfiles,
    )
    o = list(o)
    no_match = [] if return_empty else [s]
    return o if len(o) != 0 else no_match


def _dotglobstr(s):
    modified = False
    dotted_s = s
    if "/*" in dotted_s:
        dotted_s = dotted_s.replace("/*", "/.*")
        dotted_s = dotted_s.replace("/.**/.*", "/**/.*")
        modified = True
    if dotted_s.startswith("*") and not dotted_s.startswith("**"):
        dotted_s = "." + dotted_s
        modified = True
    return dotted_s, modified


def _iglobpath(s, ignore_case=False, sort_result=None, include_dotfiles=None):
    s = xsh.expand_path(s)
    if sort_result is None:
        sort_result = xsh.env.get("GLOB_SORTED")
    if include_dotfiles is None:
        include_dotfiles = xsh.env.get("DOTGLOB")
    if ignore_case:
        s = expand_case_matching(s)
    if "**" in s and "**/*" not in s:
        s = s.replace("**", "**/*")
    if include_dotfiles:
        dotted_s, dotmodified = _dotglobstr(s)
    if sort_result:
        paths = glob.glob(s, recursive=True)
        if include_dotfiles and dotmodified:
            paths.extend(glob.iglob(dotted_s, recursive=True))
        paths.sort()
        paths = iter(paths)
    else:
        paths = glob.iglob(s, recursive=True)
        if include_dotfiles and dotmodified:
            paths = itertools.chain(glob.iglob(dotted_s, recursive=True), paths)
    return paths, s


def iglobpath(s, ignore_case=False, sort_result=None, include_dotfiles=None):
    """Simple wrapper around iglob that also expands home and env vars."""
    try:
        return _iglobpath(
            s,
            ignore_case=ignore_case,
            sort_result=sort_result,
            include_dotfiles=include_dotfiles,
        )[0]
    except IndexError:
        # something went wrong in the actual iglob() call
        return iter(())


def ensure_timestamp(t, datetime_format=None):
    if isinstance(t, (int, float)):
        return t
    try:
        return float(t)
    except (ValueError, TypeError):
        pass
    if datetime_format is None:
        datetime_format = xsh.env["XONSH_DATETIME_FORMAT"]
    if isinstance(t, datetime.datetime):
        t = t.timestamp()
    else:
        t = datetime.datetime.strptime(t, datetime_format).timestamp()
    return t


def format_datetime(dt):
    """Format a datetime object to a string based on the
    $XONSH_DATETIME_FORMAT environment variable.
    """
    format_ = xsh.env["XONSH_DATETIME_FORMAT"]
    return dt.strftime(format_)


def columnize(elems, width=80, newline="\n"):
    """Takes an iterable of strings and returns a list of lines with the
    elements placed in columns. Each line will be at most *width* columns.
    The newline character will be appended to the end of each line.
    """
    sizes = [len(e) + 1 for e in elems]
    total = sum(sizes)
    nelem = len(elems)
    if total - 1 <= width:
        ncols = len(sizes)
        nrows = 1
        columns = [sizes]
        last_longest_row = total
        enter_loop = False
    else:
        ncols = 1
        nrows = len(sizes)
        columns = [sizes]
        last_longest_row = max(sizes)
        enter_loop = True
    while enter_loop:
        longest_row = sum(map(max, columns))
        if longest_row - 1 <= width:
            # we might be able to fit another column.
            ncols += 1
            nrows = nelem // ncols
            columns = [sizes[i * nrows : (i + 1) * nrows] for i in range(ncols)]
            last_longest_row = longest_row
        else:
            # we can't fit another column
            ncols -= 1
            nrows = nelem // ncols
            break
    pad = (width - last_longest_row + ncols) // ncols
    pad = pad if pad > 1 else 1
    data = [elems[i * nrows : (i + 1) * nrows] for i in range(ncols)]
    colwidths = [max(map(len, d)) + pad for d in data]
    colwidths[-1] -= pad
    row_t = "".join(["{{row[{i}]: <{{w[{i}]}}}}".format(i=i) for i in range(ncols)])
    row_t += newline
    lines = [
        row_t.format(row=row, w=colwidths)
        for row in itertools.zip_longest(*data, fillvalue="")
    ]
    return lines
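
# Hedged sketch of columnize fitting three entries on one line of width 10:
#
#     >>> columnize(["a", "bb", "ccc"], width=10)
#     ['a bb ccc\n']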
""" f.__xonsh_capturable__ = False return f def carriage_return(): """Writes a carriage return to stdout, and nothing else.""" print("\r", flush=True, end="") def deprecated(deprecated_in=None, removed_in=None): """Parametrized decorator that deprecates a function in a graceful manner. Updates the decorated function's docstring to mention the version that deprecation occurred in and the version it will be removed in if both of these values are passed. When removed_in is not a release equal to or less than the current release, call ``warnings.warn`` with details, while raising ``DeprecationWarning``. When removed_in is a release equal to or less than the current release, raise an ``AssertionError``. Parameters ---------- deprecated_in : str The version number that deprecated this function. removed_in : str The version number that this function will be removed in. """ message_suffix = _deprecated_message_suffix(deprecated_in, removed_in) if not message_suffix: message_suffix = "" def decorated(func): warning_message = "{} has been deprecated".format(func.__name__) warning_message += message_suffix @functools.wraps(func) def wrapped(*args, **kwargs): _deprecated_error_on_expiration(func.__name__, removed_in) func(*args, **kwargs) warnings.warn(warning_message, DeprecationWarning) wrapped.__doc__ = ( "{}\n\n{}".format(wrapped.__doc__, warning_message) if wrapped.__doc__ else warning_message ) return wrapped return decorated def _deprecated_message_suffix(deprecated_in, removed_in): if deprecated_in and removed_in: message_suffix = " in version {} and will be removed in version {}".format( deprecated_in, removed_in ) elif deprecated_in and not removed_in: message_suffix = " in version {}".format(deprecated_in) elif not deprecated_in and removed_in: message_suffix = " and will be removed in version {}".format(removed_in) else: message_suffix = None return message_suffix def _deprecated_error_on_expiration(name, removed_in): from distutils.version import LooseVersion if not removed_in: return elif LooseVersion(__version__) >= LooseVersion(removed_in): raise AssertionError( "{} has passed its version {} expiry date!".format(name, removed_in) ) def to_repr_pretty_(inst, p, cycle): name = "{0}.{1}".format(inst.__class__.__module__, inst.__class__.__name__) with p.group(0, name + "(", ")"): if cycle: p.text("...") elif len(inst): p.break_() p.pretty(dict(inst)) class XAttr: """hold attribute and value""" __slots__ = ("name", "value") def __init__(self, val) -> None: self.value = val def __set_name__(self, owner, name) -> None: self.name: str = name def __get__(self, instance, owner) -> "XAttr": return self def __str__(self) -> str: return f"<{self.name}={self.value}>" class NamedConstantMeta(type): """utility class to hold list of values as class-attributes""" def __iter__(cls) -> tp.Iterator[XAttr]: for attr in vars(cls): if not attr.startswith("__"): yield getattr(cls, attr) # # ast # # -*- coding: utf-8 -*- """The xonsh abstract syntax tree node.""" # These are imported into our module namespace for the benefit of parser.py. 
#
# ast
#
# -*- coding: utf-8 -*-
"""The xonsh abstract syntax tree node."""
# These are imported into our module namespace for the benefit of parser.py.
# pylint: disable=unused-import
# amalgamated sys
from ast import (
    Module, Num, Expr, Str, Bytes, UnaryOp, UAdd, USub, Invert,
    BinOp, Add, Sub, Mult, Div, FloorDiv, Mod, Pow, Compare,
    Lt, Gt, LtE, GtE, Eq, NotEq, In, NotIn, Is, IsNot, Not,
    BoolOp, Or, And, Subscript, Load, Slice, ExtSlice, List,
    Tuple, Set, Dict, AST, NameConstant, Name, GeneratorExp,
    Store, comprehension, ListComp, SetComp, DictComp, Assign,
    AugAssign, BitXor, BitAnd, BitOr, LShift, RShift, Assert,
    Delete, Del, Pass, Raise, Import, alias, ImportFrom,
    Continue, Break, Yield, YieldFrom, Return, IfExp, Lambda,
    arguments, arg, Call, keyword, Attribute, Global, Nonlocal,
    If, While, For, withitem, With, Try, ExceptHandler,
    FunctionDef, ClassDef, Starred, NodeTransformer, Interactive,
    Expression, Index, literal_eval, dump, walk, increment_lineno,
    Constant,
)
from ast import Ellipsis as EllipsisNode

# pylint: enable=unused-import
# amalgamated textwrap
# amalgamated itertools
# amalgamated from xonsh.built_ins import XSH
# amalgamated xonsh.tools
from ast import (
    MatMult, AsyncFunctionDef, AsyncWith, AsyncFor, Await,
    JoinedStr, FormattedValue, AnnAssign,
)

# amalgamated xonsh.platform
if PYTHON_VERSION_INFO > (3, 8):
    from ast import NamedExpr  # type:ignore

STATEMENTS = (
    FunctionDef, ClassDef, Return, Delete, Assign, AugAssign,
    For, While, If, With, Raise, Try, Assert, Import, ImportFrom,
    Global, Nonlocal, Expr, Pass, Break, Continue, AnnAssign,
)


def leftmostname(node):
    """Attempts to find the first name in the tree."""
    if isinstance(node, Name):
        rtn = node.id
    elif isinstance(node, (BinOp, Compare)):
        rtn = leftmostname(node.left)
    elif isinstance(node, (Attribute, Subscript, Starred, Expr)):
        rtn = leftmostname(node.value)
    elif isinstance(node, Call):
        rtn = leftmostname(node.func)
    elif isinstance(node, UnaryOp):
        rtn = leftmostname(node.operand)
    elif isinstance(node, BoolOp):
        rtn = leftmostname(node.values[0])
    elif isinstance(node, Assign):
        rtn = leftmostname(node.targets[0])
    elif isinstance(node, AnnAssign):
        rtn = leftmostname(node.target)
    elif isinstance(node, (Str, Bytes, JoinedStr)):
        # handles case of "./my executable"
        rtn = leftmostname(node.s)
    elif isinstance(node, Tuple) and len(node.elts) > 0:
        # handles case of echo ,1,2,3
        rtn = leftmostname(node.elts[0])
    else:
        rtn = None
    return rtn
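
# Doctest-style sketch of leftmostname on a freshly parsed statement (the
# stdlib parser produces the same node classes imported above):
#
#     >>> import ast as pyast
#     >>> leftmostname(pyast.parse("a.b.c(1)").body[0])
#     'a'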
returns a default.""" return getattr(node, "id", default) def gather_names(node): """Returns the set of all names present in the node's tree.""" rtn = set(map(get_id, walk(node))) rtn.discard(None) return rtn def get_id_ctx(node): """Gets the id and attribute of a node, or returns a default.""" nid = getattr(node, "id", None) if nid is None: return (None, None) return (nid, node.ctx) def gather_load_store_names(node): """Returns the names present in the node's tree in a set of load nodes and a set of store nodes. """ load = set() store = set() for nid, ctx in map(get_id_ctx, walk(node)): if nid is None: continue elif isinstance(ctx, Load): load.add(nid) else: store.add(nid) return (load, store) def has_elts(x): """Tests if x is an AST node with elements.""" return isinstance(x, AST) and hasattr(x, "elts") def load_attribute_chain(name, lineno=None, col=None): """Creates an AST that loads variable name that may (or may not) have attribute chains. For example, "a.b.c" """ names = name.split(".") node = Name(id=names.pop(0), ctx=Load(), lineno=lineno, col_offset=col) for attr in names: node = Attribute( value=node, attr=attr, ctx=Load(), lineno=lineno, col_offset=col ) return node def xonsh_call(name, args, lineno=None, col=None): """Creates the AST node for calling a function of a given name. Functions names may contain attribute access, e.g. __xonsh__.env. """ return Call( func=load_attribute_chain(name, lineno=lineno, col=col), args=args, keywords=[], starargs=None, kwargs=None, lineno=lineno, col_offset=col, ) def isdescendable(node): """Determines whether or not a node is worth visiting. Currently only UnaryOp and BoolOp nodes are visited. """ return isinstance(node, (UnaryOp, BoolOp)) def isexpression(node, ctx=None, *args, **kwargs): """Determines whether a node (or code string) is an expression, and does not contain any statements. The execution context (ctx) and other args and kwargs are passed down to the parser, as needed. """ # parse string to AST if isinstance(node, str): node = node if node.endswith("\n") else node + "\n" ctx = XSH.ctx if ctx is None else ctx node = XSH.execer.parse(node, ctx, *args, **kwargs) # determine if expression-like enough if isinstance(node, (Expr, Expression)): isexpr = True elif isinstance(node, Module) and len(node.body) == 1: isexpr = isinstance(node.body[0], (Expr, Expression)) else: isexpr = False return isexpr class CtxAwareTransformer(NodeTransformer): """Transforms a xonsh AST based to use subprocess calls when the first name in an expression statement is not known in the context. This assumes that the expression statement is instead parseable as a subprocess. """ def __init__(self, parser): """Parameters ---------- parser : xonsh.Parser A parse instance to try to parse subprocess statements with. """ super(CtxAwareTransformer, self).__init__() self.parser = parser self.input = None self.contexts = [] self.lines = None self.mode = None self._nwith = 0 self.filename = "<xonsh-code>" self.debug_level = 0 def ctxvisit(self, node, inp, ctx, mode="exec", filename=None, debug_level=0): """Transforms the node in a context-dependent way. Parameters ---------- node : ast.AST A syntax tree to transform. inp : str The input code in string format. ctx : dict The root context to use. filename : str, optional File we are to transform. debug_level : int, optional Debugging level to use in lexing and parsing. Returns ------- node : ast.AST The transformed node. 
""" self.filename = self.filename if filename is None else filename self.debug_level = debug_level self.lines = inp.splitlines() self.contexts = [ctx, set()] self.mode = mode self._nwith = 0 node = self.visit(node) del self.lines, self.contexts, self.mode self._nwith = 0 return node def ctxupdate(self, iterable): """Updated the most recent context.""" self.contexts[-1].update(iterable) def ctxadd(self, value): """Adds a value the most recent context.""" self.contexts[-1].add(value) def ctxremove(self, value): """Removes a value the most recent context.""" for ctx in reversed(self.contexts): if value in ctx: ctx.remove(value) break def try_subproc_toks(self, node, strip_expr=False): """Tries to parse the line of the node as a subprocess.""" line, nlogical, idx = get_logical_line(self.lines, node.lineno - 1) if self.mode == "eval": mincol = len(line) - len(line.lstrip()) maxcol = None else: mincol = max(min_col(node) - 1, 0) maxcol = max_col(node) if mincol == maxcol: maxcol = find_next_break(line, mincol=mincol, lexer=self.parser.lexer) elif nlogical > 1: maxcol = None elif maxcol < len(line) and line[maxcol] == ";": pass else: maxcol += 1 spline = subproc_toks( line, mincol=mincol, maxcol=maxcol, returnline=False, lexer=self.parser.lexer, ) if spline is None or spline != "![{}]".format(line[mincol:maxcol].strip()): # failed to get something consistent, try greedy wrap spline = subproc_toks( line, mincol=mincol, maxcol=maxcol, returnline=False, lexer=self.parser.lexer, greedy=True, ) if spline is None: return node try: newnode = self.parser.parse( spline, mode=self.mode, filename=self.filename, debug_level=(self.debug_level >= 2), ) newnode = newnode.body if not isinstance(newnode, AST): # take the first (and only) Expr newnode = newnode[0] increment_lineno(newnode, n=node.lineno - 1) newnode.col_offset = node.col_offset if self.debug_level >= 1: msg = "{0}:{1}:{2}{3} - {4}\n" "{0}:{1}:{2}{3} + {5}" mstr = "" if maxcol is None else ":" + str(maxcol) msg = msg.format(self.filename, node.lineno, mincol, mstr, line, spline) print(msg, file=sys.stderr) except SyntaxError: newnode = node if strip_expr and isinstance(newnode, Expr): newnode = newnode.value return newnode def is_in_scope(self, node): """Determines whether or not the current node is in scope.""" names, store = gather_load_store_names(node) names -= store if not names: return True inscope = False for ctx in reversed(self.contexts): names -= ctx if not names: inscope = True break return inscope # # Replacement visitors # def visit_Expression(self, node): """Handle visiting an expression body.""" if isdescendable(node.body): node.body = self.visit(node.body) body = node.body inscope = self.is_in_scope(body) if not inscope: node.body = self.try_subproc_toks(body) return node def visit_Expr(self, node): """Handle visiting an expression.""" if isdescendable(node.value): node.value = self.visit(node.value) # this allows diving into BoolOps if self.is_in_scope(node) or isinstance(node.value, Lambda): return node else: newnode = self.try_subproc_toks(node) if not isinstance(newnode, Expr): newnode = Expr( value=newnode, lineno=node.lineno, col_offset=node.col_offset ) if hasattr(node, "max_lineno"): newnode.max_lineno = node.max_lineno newnode.max_col = node.max_col return newnode def visit_UnaryOp(self, node): """Handle visiting an unary operands, like not.""" if isdescendable(node.operand): node.operand = self.visit(node.operand) operand = node.operand inscope = self.is_in_scope(operand) if not inscope: node.operand = 
    def visit_BoolOp(self, node):
        """Handle visiting boolean operands, like and/or."""
        for i in range(len(node.values)):
            val = node.values[i]
            if isdescendable(val):
                val = node.values[i] = self.visit(val)
            inscope = self.is_in_scope(val)
            if not inscope:
                node.values[i] = self.try_subproc_toks(val, strip_expr=True)
        return node

    #
    # Context aggregator visitors
    #

    def visit_Assign(self, node):
        """Handle visiting an assignment statement."""
        ups = set()
        for targ in node.targets:
            if isinstance(targ, (Tuple, List)):
                ups.update(leftmostname(elt) for elt in targ.elts)
            elif isinstance(targ, BinOp):
                newnode = self.try_subproc_toks(node)
                if newnode is node:
                    ups.add(leftmostname(targ))
                else:
                    return newnode
            else:
                ups.add(leftmostname(targ))
        self.ctxupdate(ups)
        return node

    def visit_AnnAssign(self, node):
        """Handle visiting an annotated assignment statement."""
        self.ctxadd(leftmostname(node.target))
        return node

    def visit_Import(self, node):
        """Handle visiting an import statement."""
        for name in node.names:
            if name.asname is None:
                self.ctxadd(name.name)
            else:
                self.ctxadd(name.asname)
        return node

    def visit_ImportFrom(self, node):
        """Handle visiting a "from ... import ..." statement."""
        for name in node.names:
            if name.asname is None:
                self.ctxadd(name.name)
            else:
                self.ctxadd(name.asname)
        return node

    def visit_With(self, node):
        """Handle visiting a with statement."""
        for item in node.items:
            if item.optional_vars is not None:
                self.ctxupdate(gather_names(item.optional_vars))
        self._nwith += 1
        self.generic_visit(node)
        self._nwith -= 1
        return node

    def visit_For(self, node):
        """Handle visiting a for statement."""
        targ = node.target
        self.ctxupdate(gather_names(targ))
        self.generic_visit(node)
        return node

    def visit_FunctionDef(self, node):
        """Handle visiting a function definition."""
        self.ctxadd(node.name)
        self.contexts.append(set())
        args = node.args
        argchain = [args.args, args.kwonlyargs]
        if args.vararg is not None:
            argchain.append((args.vararg,))
        if args.kwarg is not None:
            argchain.append((args.kwarg,))
        self.ctxupdate(a.arg for a in itertools.chain.from_iterable(argchain))
        self.generic_visit(node)
        self.contexts.pop()
        return node

    def visit_ClassDef(self, node):
        """Handle visiting a class definition."""
        self.ctxadd(node.name)
        self.contexts.append(set())
        self.generic_visit(node)
        self.contexts.pop()
        return node

    def visit_Delete(self, node):
        """Handle visiting a del statement."""
        for targ in node.targets:
            if isinstance(targ, Name):
                self.ctxremove(targ.id)
        self.generic_visit(node)
        return node

    def visit_Try(self, node):
        """Handle visiting a try statement."""
        for handler in node.handlers:
            if handler.name is not None:
                self.ctxadd(handler.name)
        self.generic_visit(node)
        return node

    def visit_Global(self, node):
        """Handle visiting a global statement."""
        self.contexts[1].update(node.names)  # contexts[1] is the global ctx
        self.generic_visit(node)
        return node


def pdump(s, **kwargs):
    """performs a pretty dump of an AST node."""
    if isinstance(s, AST):
        s = dump(s, **kwargs).replace(",", ",\n")
    openers = "([{"
    closers = ")]}"
    lens = len(s) + 1
    if lens == 1:
        return s
    i = min([s.find(o) % lens for o in openers])
    if i == lens - 1:
        return s
    closer = closers[openers.find(s[i])]
    j = s.rfind(closer)
    if j == -1 or j <= i:
        return s[: i + 1] + "\n" + textwrap.indent(pdump(s[i + 1 :]), " ")
    pre = s[: i + 1] + "\n"
    mid = s[i + 1 : j]
    post = "\n" + s[j:]
    mid = textwrap.indent(pdump(mid), " ")
    if "(" in post or "[" in post or "{" in post:
        post = pdump(post)
    return pre + mid + post
def pprint_ast(s, *, sep=None, end=None, file=None, flush=False, **kwargs):
    """Performs a pretty print of the AST nodes."""
    print(pdump(s, **kwargs), sep=sep, end=end, file=file, flush=flush)


#
# Private helpers
#


def _getblockattr(name, lineno, col):
    """calls getattr(name, '__xonsh_block__', False)."""
    return xonsh_call(
        "getattr",
        args=[
            Name(id=name, ctx=Load(), lineno=lineno, col_offset=col),
            Str(s="__xonsh_block__", lineno=lineno, col_offset=col),
            NameConstant(value=False, lineno=lineno, col_offset=col),
        ],
        lineno=lineno,
        col=col,
    )


#
# color_tools
#
"""Tools for color handling in xonsh.

This includes converting values between RGB hex codes and xterm-256
color codes.

Parts of this file were originally forked from Micah Elliott
http://MicahElliott.com
Copyright (C) 2011 Micah Elliott.  All rights reserved.
WTFPL http://sam.zoy.org/wtfpl/
"""
# amalgamated re
math = _LazyModule.load('math', 'math')
# amalgamated xonsh.lazyasd
# amalgamated xonsh.tools
_NO_COLOR_WARNING_SHOWN = False

RE_BACKGROUND = LazyObject(
    lambda: re.compile("(BG#|BGHEX|BACKGROUND)"), globals(), "RE_BACKGROUND"
)


class COLORS:
    """constants"""

    RESET = "{RESET}"
    RED = "{RED}"
    GREEN = "{GREEN}"
    BOLD_RED = "{BOLD_RED}"
    BOLD_GREEN = "{BOLD_GREEN}"


@lazyobject
def KNOWN_XONSH_COLORS():
    """These are the minimum number of colors that need to be implemented
    by any style.
    """
    return frozenset(
        [
            "DEFAULT",
            "BLACK",
            "RED",
            "GREEN",
            "YELLOW",
            "BLUE",
            "PURPLE",
            "CYAN",
            "WHITE",
            "INTENSE_BLACK",
            "INTENSE_RED",
            "INTENSE_GREEN",
            "INTENSE_YELLOW",
            "INTENSE_BLUE",
            "INTENSE_PURPLE",
            "INTENSE_CYAN",
            "INTENSE_WHITE",
        ]
    )


@lazyobject
def BASE_XONSH_COLORS():
    return {
        "BLACK": (0, 0, 0),
        "RED": (170, 0, 0),
        "GREEN": (0, 170, 0),
        "YELLOW": (170, 85, 0),
        "BLUE": (0, 0, 170),
        "PURPLE": (170, 0, 170),
        "CYAN": (0, 170, 170),
        "WHITE": (170, 170, 170),
        "INTENSE_BLACK": (85, 85, 85),
        "INTENSE_RED": (255, 85, 85),
        "INTENSE_GREEN": (85, 255, 85),
        "INTENSE_YELLOW": (255, 255, 85),
        "INTENSE_BLUE": (85, 85, 255),
        "INTENSE_PURPLE": (255, 85, 255),
        "INTENSE_CYAN": (85, 255, 255),
        "INTENSE_WHITE": (255, 255