"""Context management tools for xonsh."""importsysimporttextwrapfromcollections.abcimportMappingfromxonsh.built_insimportXSH
class Block:
    """This is a context manager for obtaining a block of lines without actually
    executing the block. The lines are accessible as the 'lines' attribute.
    This must be used as a macro.

    The ``macro_block``, ``macro_globals`` and ``macro_locals`` attributes read
    by ``__enter__`` are never set by this class; they are expected to have
    been attached to the instance before the block is entered.
    """

    # NOTE(review): presumably tells the xonsh macro machinery to hand the
    # block over as a string (``__enter__`` calls ``splitlines()`` on it) --
    # confirm against the macro implementation.
    __xonsh_block__ = str

    def __init__(self):
        """
        Attributes
        ----------
        lines : list of str or None
            Block lines as if split by str.splitlines(), if available.
        glbs : Mapping or None
            Global execution context, ie globals().
        locs : Mapping or None
            Local execution context, ie locals(). Stays None when the local
            context is the very same object as the global one.
        """
        self.lines = None
        self.glbs = None
        self.locs = None

    def __enter__(self):
        """Capture the block's lines and execution context.

        Returns
        -------
        self

        Raises
        ------
        XonshError
            If the instance has no ``macro_block`` attribute, i.e. it was not
            entered as a macro.
        """
        if not hasattr(self, "macro_block"):
            raise XSH.builtins.XonshError(
                self.__class__.__name__ + " must be entered as a macro!"
            )
        self.lines = self.macro_block.splitlines()
        self.glbs = self.macro_globals
        # Leave locals as None when it is the same as globals. Assigning
        # unconditionally (rather than only in the "different" case) also
        # clears a value left behind by an earlier entry of this instance.
        if self.macro_locals is self.macro_globals:
            self.locs = None
        else:
            self.locs = self.macro_locals
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Do nothing; returning None lets any exception propagate."""
        pass
class Functor(Block):
    """This is a context manager that turns the block into a callable object,
    bound to the execution context it was created in.
    """

    def __init__(self, args=(), kwargs=None, rtn=""):
        """
        Parameters
        ----------
        args : Sequence of str, optional
            A tuple of argument names for the functor.
        kwargs : Mapping of str to values or list of item tuples, optional
            Keyword argument names and values, if available. A Mapping is
            stored as the sorted list of its items; any other value is
            stored as given.
        rtn : str, optional
            Name of object to return, if available.

        Attributes
        ----------
        func : function
            The underlying function object. This defaults to None and is
            set when the block is entered.
        """
        super().__init__()
        self.func = None
        self.args = args
        if kwargs is None:
            self.kwargs = []
        elif isinstance(kwargs, Mapping):
            self.kwargs = sorted(kwargs.items())
        else:
            self.kwargs = kwargs
        self.rtn = rtn

    def __enter__(self):
        """Compile the block into a function and store it as ``self.func``.

        The block text is wrapped in a ``def`` statement, executed through
        ``XSH.execer`` in the captured context, and the resulting function is
        then looked up by name in the locals (if any) and then the globals.

        Returns
        -------
        self

        Raises
        ------
        ValueError
            If the generated function cannot be found in either context after
            execution.
        """
        super().__enter__()
        # The body and the optional return line below use the same indent
        # prefix, so both land in the suite of the generated ``def``.
        body = textwrap.indent(self.macro_block, " ")
        # hash() can be as low as -sys.maxsize - 1; shifting by maxsize + 1
        # keeps the uid non-negative so the name is a valid identifier.
        uid = hash(body) + sys.maxsize + 1
        name = f"__xonsh_functor_{uid}__"
        # construct signature string: positional names first, then keywords.
        # Keyword defaults are placeholders here; the real values are
        # installed through __defaults__ after the function exists.
        sig = ", ".join(self.args)
        kwstr = ", ".join(k + "=None" for k, _ in self.kwargs)
        if kwstr:
            sig = sig + ", " + kwstr if sig else kwstr
        # construct return string
        rtn = str(self.rtn)
        if rtn:
            rtn = " return " + rtn + "\n"
        # construct function string
        fstr = f"def {name}({sig}):\n{body}\n{rtn}"
        glbs = self.glbs
        locs = self.locs
        XSH.execer.exec(fstr, glbs=glbs, locs=locs)
        if locs is not None and name in locs:
            func = locs[name]
        elif name in glbs:
            func = glbs[name]
        else:
            raise ValueError("Functor block could not be found in context.")
        if len(self.kwargs) > 0:
            func.__defaults__ = tuple(v for _, v in self.kwargs)
        self.func = func
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Do nothing; returning None lets any exception propagate."""
        pass

    def __call__(self, *args, **kwargs):
        """Dispatches to func.

        Raises
        ------
        AttributeError
            If ``func`` is still None.
        """
        if self.func is None:
            msg = "{} block with 'None' func not callable"
            raise AttributeError(msg.format(self.__class__.__name__))
        return self.func(*args, **kwargs)