Source code for xonsh.shells.ptk_shell.updator

"""Has classes that help updating Prompt sections using Threads."""

import concurrent.futures
import threading
import typing as tp

from prompt_toolkit import PromptSession
from prompt_toolkit.formatted_text import PygmentsTokens

from xonsh.built_ins import XSH
from xonsh.prompt.base import ParsedTokens
from xonsh.style_tools import partial_color_tokenize, style_as_faded


[docs] class Executor: """Caches thread results across prompts.""" def __init__(self): self.thread_pool = concurrent.futures.ThreadPoolExecutor( max_workers=XSH.env["ASYNC_PROMPT_THREAD_WORKERS"] ) # the attribute, .cache is cleared between calls. # This caches results from callback alone by field name. self.thread_results = {}
[docs] def submit(self, func: tp.Callable, field: str): future = self.thread_pool.submit(self._run_func, func, field) place_holder = "{" + field + "}" return ( future, ( self.thread_results[field] if field in self.thread_results else place_holder ), place_holder, )
def _run_func(self, func, field): """Run the callback and store the result.""" result = func() self.thread_results[field] = ( result if result is None else style_as_faded(result) ) return result
[docs] class AsyncPrompt: """Represent an asynchronous prompt.""" def __init__(self, name: str, session: PromptSession, executor: Executor): """ Parameters ---------- name: str what prompt to update. One of ['message', 'rprompt', 'bottom_toolbar'] session: PromptSession current ptk session """ self.name = name # list of tokens in that prompt. It could either be resolved or not resolved. self.tokens: tp.Optional[ParsedTokens] = None self.timer = None self.session = session self.executor = executor # (Key: the future object) that is created for the (value: index/field_name) in the tokens list self.futures: dict[ concurrent.futures.Future, tuple[str, tp.Optional[int], tp.Optional[str], tp.Optional[str]], ] = {}
[docs] def start_update(self, on_complete): """Listen on futures and update the prompt as each one completed. Timer is used to avoid clogging multiple calls at the same time. Parameters ----------- on_complete: callback to notify after all the futures are completed """ if not self.tokens: print(f"Warn: AsyncPrompt is created without tokens - {self.name}") return for fut in concurrent.futures.as_completed(self.futures): try: val = fut.result() except concurrent.futures.CancelledError: continue if fut not in self.futures: # rare case where the future is completed but the container is already cleared # because new prompt is called continue placeholder, idx, spec, conv = self.futures[fut] # example: placeholder="{field}", idx=10, spec="env: {}" if isinstance(idx, int): self.tokens.update(idx, val, spec, conv) else: # when the function is called outside shell. for idx, ptok in enumerate(self.tokens.tokens): if placeholder in ptok.value: val = ptok.value.replace(placeholder, val) self.tokens.update(idx, val, spec, conv) # calling invalidate in less period is inefficient self.invalidate() on_complete(self.name)
[docs] def invalidate(self): """Create a timer to update the prompt. The timing can be configured through env variables. threading.Timer is used to stop calling invalidate frequently. """ from xonsh.shells.ptk_shell import tokenize_ansi if self.timer: self.timer.cancel() def _invalidate(): new_prompt = self.tokens.process() formatted_tokens = tokenize_ansi( PygmentsTokens(partial_color_tokenize(new_prompt)) ) setattr(self.session, self.name, formatted_tokens) self.session.app.invalidate() self.timer = threading.Timer(XSH.env["ASYNC_INVALIDATE_INTERVAL"], _invalidate) self.timer.start()
[docs] def stop(self): """Stop any running threads""" for fut in self.futures: fut.cancel() self.futures.clear()
[docs] def submit_section( self, func: tp.Callable, field: str, idx: tp.Optional[int] = None, spec: tp.Optional[str] = None, conv=None, ): future, intermediate_value, placeholder = self.executor.submit(func, field) self.futures[future] = (placeholder, idx, spec, conv) return intermediate_value
[docs] class PromptUpdator: """Handle updating multiple AsyncPrompt instances prompt/rprompt/bottom_toolbar""" def __init__(self, shell): from xonsh.shells.ptk_shell import PromptToolkitShell self.prompts: dict[str, AsyncPrompt] = {} self.shell: PromptToolkitShell = shell self.executor = Executor() self.futures = {} self.attrs_loaded = None
[docs] def add(self, prompt_name: tp.Optional[str]) -> tp.Optional[AsyncPrompt]: # clear out old futures from the same prompt if prompt_name is None: return None self.stop(prompt_name) self.prompts[prompt_name] = AsyncPrompt( prompt_name, self.shell.prompter, self.executor ) return self.prompts[prompt_name]
[docs] def add_attrs(self): for attr, val in self.shell.get_lazy_ptk_kwargs(): setattr(self.shell.prompter, attr, val) self.shell.prompter.app.invalidate()
[docs] def start(self): """after ptk prompt is created, update it in background.""" if not self.attrs_loaded: self.attrs_loaded = self.executor.thread_pool.submit(self.add_attrs) prompts = list(self.prompts) # removal safe for pt_name in prompts: if pt_name not in self.prompts: continue prompt = self.prompts[pt_name] future = self.executor.thread_pool.submit( prompt.start_update, self.on_complete ) self.futures[pt_name] = future
[docs] def stop(self, prompt_name: str): if prompt_name in self.prompts: self.prompts[prompt_name].stop() if prompt_name in self.futures: self.futures[prompt_name].cancel()
[docs] def on_complete(self, prompt_name): self.prompts.pop(prompt_name, None) self.futures.pop(prompt_name, None)