# Source code for xontrib.distributed

"""Hooks for the distributed parallel computing library."""
from xonsh.contexts import Functor
from xonsh.built_ins import XSH

# Names exported by a star-import of this module. The dworker function is not
# listed here; it is registered in XSH.aliases instead.
__all__ = ["DSubmitter", "dsubmit"]

def dworker(args, stdin=None):
    """Programmatic access to the dworker utility, to allow launching
    workers that also have access to xonsh builtins.

    Parameters
    ----------
    args : sequence of str
        Arguments forwarded as ``args`` to the dworker command-line entry
        point.
    stdin : optional
        Accepted but not used by this function.
    """
    # Imported inside the function so that ``distributed`` is only needed
    # when the alias is actually invoked. The module is bound to a distinct
    # local name so it does not shadow this function's own name.
    from distributed.cli import dworker as dworker_cli

    # NOTE(review): standalone_mode=False presumably keeps the CLI entry point
    # from exiting the interpreter when it finishes -- confirm against the
    # entry point's implementation.
    dworker_cli.main.main(args=args, prog_name="dworker", standalone_mode=False)

# Register the dworker function under the "dworker" key of XSH.aliases.
XSH.aliases["dworker"] = dworker

class DSubmitter(Functor):
    """Context manager for submitting distributed jobs."""

    def __init__(self, executor, **kwargs):
        """
        Parameters
        ----------
        executor : distributed.Executor
            The executor to submit to.
        kwargs : optional
            All other kwargs are passed up to superclasses init.
        """
        super().__init__(**kwargs)
        self.executor = executor
        # Holds the result of executor.submit() once __exit__ has submitted
        # the function; None until then.
        self.future = None

    def __enter__(self):
        """Enter the parent context, reset ``future`` to None and return self."""
        super().__enter__()
        # Clear any future left over from a previous use of this instance.
        self.future = None
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Leave the parent context and, if it returned a truthy value,
        submit ``self.func`` to the executor.

        Returns
        -------
        res
            Whatever the parent ``__exit__`` returned, unchanged.
        """
        res = super().__exit__(exc_type, exc_value, traceback)
        # NOTE(review): submission only happens when the parent __exit__
        # returns a truthy value; a falsy result (including None) returns
        # early and leaves self.future as None. Confirm that the parent
        # class returns a truthy value on a successful exit.
        if not res:
            return res
        # self.func is not assigned in this class; presumably the parent
        # class sets it -- verify against Functor.
        self.future = self.executor.submit(self.func)
        return res
def dsubmit(*a, args=(), kwargs=None, rtn="", **kw):
    """Returns a distributed submission context manager, DSubmitter(),
    with a new executor instance.

    Parameters
    ----------
    args : Sequence of str, optional
        A tuple of argument names for DSubmitter.
    kwargs : Mapping of str to values or list of item tuples, optional
        Keyword argument names and values for DSubmitter.
    rtn : str, optional
        Name of object to return for DSubmitter.
    a, kw : Sequence and Mapping
        All other arguments and keyword arguments are used to construct
        the executor instance.

    Returns
    -------
    dsub : DSubmitter
        An instance of the DSubmitter context manager.
    """
    # Imported lazily so that ``distributed`` is only required when this
    # helper is actually called.
    try:
        from distributed import Executor
    except ImportError:
        # distributed renamed Executor to Client; fall back to the new name
        # when the old alias is not exported.
        from distributed import Client as Executor
    e = Executor(*a, **kw)
    dsub = DSubmitter(e, args=args, kwargs=kwargs, rtn=rtn)
    return dsub