Source code for xonsh.procs.posix

"""Interface for running subprocess-mode commands on posix systems."""

import array
import io
import os
import signal
import subprocess
import sys
import threading
import time

import xonsh.lazyasd as xl
import xonsh.lazyimps as xli
import xonsh.platform as xp
import xonsh.tools as xt
from xonsh.built_ins import XSH
from xonsh.procs.readers import (
    BufferedFDParallelReader,
    NonBlockingFDReader,
    safe_fdclose,
)

# The following escape codes are xterm codes.
# See http://rtfm.etla.org/xterm/ctlseq.html for more.
# Each entry is a mode number, kept as a string so that it can be
# interpolated directly into the "ESC [ ? <n> h" sequences collected in
# START_ALTERNATE_MODE and the "ESC [ ? <n> l" sequences collected in
# END_ALTERNATE_MODE.
MODE_NUMS = ("1049", "47", "1047")


@xl.lazyobject
def START_ALTERNATE_MODE():
    """Build the set of byte sequences that turn alternate mode on.

    One ``ESC [ ? <n> h`` sequence is produced for every entry of
    ``MODE_NUMS``.
    """
    sequences = set()
    for num in MODE_NUMS:
        sequences.add(f"\x1b[?{num}h".encode())
    return frozenset(sequences)


@xl.lazyobject
def END_ALTERNATE_MODE():
    """Build the set of byte sequences that turn alternate mode off.

    One ``ESC [ ? <n> l`` sequence is produced for every entry of
    ``MODE_NUMS``.
    """
    sequences = set()
    for num in MODE_NUMS:
        sequences.add(f"\x1b[?{num}l".encode())
    return frozenset(sequences)


@xl.lazyobject
def ALTERNATE_MODE_FLAGS():
    """Return every alternate-mode sequence as a single tuple.

    The start sequences come first, followed by the end sequences.
    """
    return (*START_ALTERNATE_MODE, *END_ALTERNATE_MODE)


class PopenThread(threading.Thread):
    """A thread for running and managing subprocess. This allows reading
    from the stdin, stdout, and stderr streams in a non-blocking fashion.

    This takes the same arguments and keyword arguments as regular Popen.
    This requires that the captured_stdout and captured_stderr attributes
    to be set following instantiation (``run()`` waits for a ``spec``
    attribute carrying them to appear on the instance).
    """

    def __init__(self, *args, stdin=None, stdout=None, stderr=None, **kwargs):
        """Install signal handlers, start the subprocess, start the thread.

        Parameters
        ----------
        *args, **kwargs
            Forwarded unchanged to ``subprocess.Popen``.
        stdin, stdout : None, int, or object with ``fileno()``
            Forwarded to ``subprocess.Popen``. ``None`` means file
            descriptor 0 (stdin) or 1 (stdout) is used for the terminal
            handling done by this class.
        stderr : None, int, or file-like
            Forwarded to ``subprocess.Popen``.

        Raises
        ------
        Exception
            Whatever ``subprocess.Popen`` raises is re-raised after the
            signal handlers installed here have been put back.
        """
        super().__init__()
        self.daemon = True

        self.lock = threading.RLock()
        env = XSH.env
        # stdin setup
        self.orig_stdin = stdin
        if stdin is None:
            self.stdin_fd = 0
        elif isinstance(stdin, int):
            self.stdin_fd = stdin
        else:
            self.stdin_fd = stdin.fileno()
        self.store_stdin = env.get("XONSH_STORE_STDIN")
        self.timeout = env.get("XONSH_PROC_FREQUENCY")
        self.in_alt_mode = False
        self.stdin_mode = None
        self._tc_cc_vsusp = b"\x1a"  # default is usually ^Z
        self._disable_suspend_keybind()
        # stdout setup; raw integer descriptors are accepted the same way
        # they are for stdin.
        self.orig_stdout = stdout
        if stdout is None:
            self.stdout_fd = 1
        elif isinstance(stdout, int):
            self.stdout_fd = stdout
        else:
            self.stdout_fd = stdout.fileno()
        self._set_pty_size()
        # stderr setup
        self.orig_stderr = stderr
        # Set some signal handles, if we can. Must come before process
        # is started to prevent deadlock on windows
        self.proc = None  # has to be here for closure for handles
        self.old_int_handler = self.old_winch_handler = None
        self.old_tstp_handler = self.old_quit_handler = None
        if xt.on_main_thread():
            self.old_int_handler = signal.signal(signal.SIGINT, self._signal_int)
            if xp.ON_POSIX:
                self.old_tstp_handler = signal.signal(
                    signal.SIGTSTP, self._signal_tstp
                )
                self.old_quit_handler = signal.signal(
                    signal.SIGQUIT, self._signal_quit
                )
            if xp.CAN_RESIZE_WINDOW:
                self.old_winch_handler = signal.signal(
                    signal.SIGWINCH, self._signal_winch
                )
        # start up process
        if xp.ON_WINDOWS and stdout is not None and self.stdout_fd >= 0:
            os.set_inheritable(self.stdout_fd, False)

        try:
            self.proc = proc = subprocess.Popen(
                *args, stdin=stdin, stdout=stdout, stderr=stderr, **kwargs
            )
        except Exception:
            # No process was created, so nobody will ever call wait();
            # undo every handler installed above right now.
            self._clean_up()
            raise

        self.pid = proc.pid
        self.universal_newlines = uninew = proc.universal_newlines
        if uninew:
            self.encoding = enc = env.get("XONSH_ENCODING")
            self.encoding_errors = err = env.get("XONSH_ENCODING_ERRORS")
            self.stdin = io.BytesIO()  # stdin is always bytes!
            self.stdout = io.TextIOWrapper(io.BytesIO(), encoding=enc, errors=err)
            self.stderr = io.TextIOWrapper(io.BytesIO(), encoding=enc, errors=err)
        else:
            self.encoding = self.encoding_errors = None
            self.stdin = io.BytesIO()
            self.stdout = io.BytesIO()
            self.stderr = io.BytesIO()
        self.suspended = False
        self.prevs_are_closed = False
        # This is so the thread will use the same swapped values as the origin one.
        self.original_swapped_values = XSH.env.get_swapped_values()
        self.start()

    def run(self):
        """Runs the subprocess by performing a parallel read on stdin if allowed,
        and copying bytes from captured_stdout to stdout and bytes from
        captured_stderr to stderr.
        """
        # Set the thread-local swapped values.
        XSH.env.set_swapped_values(self.original_swapped_values)
        proc = self.proc
        spec = self._wait_and_getattr("spec")
        # get stdin and apply parallel reader if needed.
        stdin = self.stdin
        if self.orig_stdin is None:
            origin = None
        elif xp.ON_POSIX and self.store_stdin:
            origin = self.orig_stdin
            origfd = origin if isinstance(origin, int) else origin.fileno()
            # The reader is only constructed here; it is not used again
            # inside this method.
            origin = BufferedFDParallelReader(origfd, buffer=stdin)
        else:
            origin = None
        # get non-blocking stdout
        stdout = self.stdout.buffer if self.universal_newlines else self.stdout
        capout = spec.captured_stdout
        if capout is None:
            procout = None
        else:
            procout = NonBlockingFDReader(capout.fileno(), timeout=self.timeout)
        # get non-blocking stderr
        stderr = self.stderr.buffer if self.universal_newlines else self.stderr
        caperr = spec.captured_stderr
        if caperr is None:
            procerr = None
        else:
            procerr = NonBlockingFDReader(caperr.fileno(), timeout=self.timeout)
        # initial read from buffer
        self._read_write(procout, stdout, sys.__stdout__)
        self._read_write(procerr, stderr, sys.__stderr__)
        # loop over reads while process is running.
        i = j = cnt = 1
        while proc.poll() is None:
            # this is here for CPU performance reasons.
            if i + j == 0:
                # Nothing was read last round: lengthen the reader timeouts
                # (capped at 1000x the base value).
                cnt = min(cnt + 1, 1000)
                tout = self.timeout * cnt
                if procout is not None:
                    procout.timeout = tout
                if procerr is not None:
                    procerr.timeout = tout
            elif cnt == 1:
                pass
            else:
                # Data showed up again: go back to the base timeout.
                cnt = 1
                if procout is not None:
                    procout.timeout = self.timeout
                if procerr is not None:
                    procerr.timeout = self.timeout
            # redirect some output!
            i = self._read_write(procout, stdout, sys.__stdout__)
            j = self._read_write(procerr, stderr, sys.__stderr__)
            if self.suspended:
                break
        if self.suspended:
            return
        # close files to send EOF to non-blocking reader.
        # capout & caperr seem to be needed only by Windows, while
        # orig_stdout & orig_stderr are needed by posix and Windows.
        # Also, order seems to matter here,
        # with orig_* needed to be closed before cap*
        safe_fdclose(self.orig_stdout)
        safe_fdclose(self.orig_stderr)
        if xp.ON_WINDOWS:
            safe_fdclose(capout)
            safe_fdclose(caperr)
        # read in the remaining data in a blocking fashion.
        while (procout is not None and not procout.is_fully_read()) or (
            procerr is not None and not procerr.is_fully_read()
        ):
            self._read_write(procout, stdout, sys.__stdout__)
            self._read_write(procerr, stderr, sys.__stderr__)
        # kill the process if it is still alive. Happens when piping.
        if proc.poll() is None:
            proc.terminate()

    def _wait_and_getattr(self, name):
        """make sure the instance has a certain attr, and return it."""
        while not hasattr(self, name):
            time.sleep(1e-7)
        return getattr(self, name)

    def _read_write(self, reader, writer, stdbuf):
        """Reads a chunk of bytes from a buffer and write into memory or back
        down to the standard buffer, as appropriate. Returns the number of
        successful reads.
        """
        if reader is None:
            return 0
        i = -1
        # read_queue() is called repeatedly until it returns b"".
        for i, chunk in enumerate(iter(reader.read_queue, b"")):  # noqa
            self._alt_mode_switch(chunk, writer, stdbuf)
        if i >= 0:
            writer.flush()
            stdbuf.flush()
        return i + 1

    def _alt_mode_switch(self, chunk, membuf, stdbuf):
        """Enables recursively switching between normal capturing mode
        and 'alt' mode, which passes through values to the standard
        buffer. Pagers, text editors, curses applications, etc. use
        alternate mode.
        """
        i, flag = xt.findfirst(chunk, ALTERNATE_MODE_FLAGS)
        if flag is None:
            self._alt_mode_writer(chunk, membuf, stdbuf)
        else:
            # This code is executed when the child process switches the
            # terminal into or out of alternate mode. The line below assumes
            # that the user has opened vim, less, or similar, and writes
            # to stdin.
            j = i + len(flag)
            # write the first part of the chunk in the current mode.
            self._alt_mode_writer(chunk[:i], membuf, stdbuf)
            # switch modes
            # write the flag itself the current mode where alt mode is on
            # so that it is streamed to the terminal ASAP.
            # this is needed for terminal emulators to find the correct
            # positions before and after alt mode.
            alt_mode = flag in START_ALTERNATE_MODE
            if alt_mode:
                self.in_alt_mode = alt_mode
                self._alt_mode_writer(flag, membuf, stdbuf)
                self._enable_cbreak_stdin()
            else:
                self._alt_mode_writer(flag, membuf, stdbuf)
                self.in_alt_mode = alt_mode
                self._disable_cbreak_stdin()
            # recurse this function, but without the current flag.
            self._alt_mode_switch(chunk[j:], membuf, stdbuf)

    def _alt_mode_writer(self, chunk, membuf, stdbuf):
        """Write bytes to the standard buffer if in alt mode or otherwise
        to the in-memory buffer.
        """
        if not chunk:
            pass  # don't write empty values
        elif self.in_alt_mode:
            stdbuf.buffer.write(chunk)
        else:
            with self.lock:
                # Append at the end, then put the position back where it
                # was before the write.
                p = membuf.tell()
                membuf.seek(0, io.SEEK_END)
                membuf.write(chunk)
                membuf.seek(p)

    #
    # Window resize handlers
    #

    def _signal_winch(self, signum, frame):
        """Signal handler for SIGWINCH - window size has changed."""
        self.send_signal(signal.SIGWINCH)
        self._set_pty_size()

    def _restore_sigwinch(self):
        """Reinstall the SIGWINCH handler that was active before __init__.

        Does nothing when no handler was saved or when not called on the
        main thread.
        """
        old = self.old_winch_handler
        if old is not None and xt.on_main_thread():
            signal.signal(signal.SIGWINCH, old)
            self.old_winch_handler = None

    def _set_pty_size(self):
        """Sets the window size of the child pty based on the window size of
        our own controlling terminal.
        """
        if xp.ON_WINDOWS or not os.isatty(self.stdout_fd):
            return
        # Get the terminal size of the real terminal, set it on the
        # pseudoterminal.
        buf = array.array("h", [0, 0, 0, 0])
        # 1 = stdout here
        try:
            xli.fcntl.ioctl(1, xli.termios.TIOCGWINSZ, buf, True)
            xli.fcntl.ioctl(self.stdout_fd, xli.termios.TIOCSWINSZ, buf)
        except OSError:
            pass

    #
    # SIGINT handler
    #

    def _signal_int(self, signum, frame):
        """Signal handler for SIGINT - Ctrl+C may have been pressed."""
        self.send_signal(signal.CTRL_C_EVENT if xp.ON_WINDOWS else signum)
        if self.proc is not None and self.proc.poll() is not None:
            self._restore_sigint(frame=frame)
        if xt.on_main_thread() and not xp.ON_WINDOWS:
            signal.pthread_kill(threading.get_ident(), signal.SIGINT)

    def _restore_sigint(self, frame=None):
        """Reinstall the saved SIGINT handler.

        When ``frame`` is given, cbreak mode is also switched off and the
        saved handler (if any, and if it is not this object's own handler)
        is invoked with that frame.
        """
        old = self.old_int_handler
        if old is not None:
            if xt.on_main_thread():
                signal.signal(signal.SIGINT, old)
            self.old_int_handler = None
        if frame is not None:
            self._disable_cbreak_stdin()
            if old is not None and old is not self._signal_int:
                old(signal.SIGINT, frame)

    #
    # SIGTSTP handler
    #

    def _signal_tstp(self, signum, frame):
        """Signal handler for suspending SIGTSTP - Ctrl+Z may have been pressed."""
        self.suspended = True
        self.send_signal(signum)
        self._restore_sigtstp(frame=frame)

    def _restore_sigtstp(self, frame=None):
        """Reinstall the saved SIGTSTP handler and the suspend key binding.

        When ``frame`` is given, cbreak mode is also switched off.
        """
        old = self.old_tstp_handler
        if old is not None:
            if xt.on_main_thread():
                signal.signal(signal.SIGTSTP, old)
            self.old_tstp_handler = None
        if frame is not None:
            self._disable_cbreak_stdin()
        self._restore_suspend_keybind()

    def _disable_suspend_keybind(self):
        """Remember the terminal's suspend character and then unset it."""
        if xp.ON_WINDOWS:
            return
        try:
            mode = xli.termios.tcgetattr(0)  # only makes sense for stdin
            self._tc_cc_vsusp = mode[xp.CC][xli.termios.VSUSP]
            mode[xp.CC][xli.termios.VSUSP] = b"\x00"  # set ^Z (ie SIGTSTP) to undefined
            xli.termios.tcsetattr(0, xli.termios.TCSANOW, mode)
        except xli.termios.error:
            return

    def _restore_suspend_keybind(self):
        """Put back the suspend character saved by _disable_suspend_keybind."""
        if xp.ON_WINDOWS:
            return
        try:
            mode = xli.termios.tcgetattr(0)  # only makes sense for stdin
            mode[xp.CC][xli.termios.VSUSP] = (
                self._tc_cc_vsusp
            )  # set ^Z (ie SIGTSTP) to original
            # this usually doesn't work in interactive mode,
            # but we should try it anyway.
            xli.termios.tcsetattr(0, xli.termios.TCSANOW, mode)
        except xli.termios.error:
            pass

    #
    # SIGQUIT handler
    #

    def _signal_quit(self, signum, frame):
        r"""Signal handler for quiting SIGQUIT - Ctrl+\ may have been pressed."""
        self.send_signal(signum)
        self._restore_sigquit(frame=frame)

    def _restore_sigquit(self, frame=None):
        """Reinstall the saved SIGQUIT handler.

        When ``frame`` is given, cbreak mode is also switched off.
        """
        old = self.old_quit_handler
        if old is not None:
            if xt.on_main_thread():
                signal.signal(signal.SIGQUIT, old)
            self.old_quit_handler = None
        if frame is not None:
            self._disable_cbreak_stdin()

    #
    # cbreak mode handlers
    #

    def _enable_cbreak_stdin(self):
        """Save the stdin terminal mode, then turn off ECHO and ICANON."""
        if not xp.ON_POSIX:
            return
        try:
            self.stdin_mode = xli.termios.tcgetattr(self.stdin_fd)[:]
        except xli.termios.error:
            # this can happen for cases where another process is controlling
            # xonsh's tty device, such as in testing.
            self.stdin_mode = None
            return
        new = self.stdin_mode[:]
        new[xp.LFLAG] &= ~(xli.termios.ECHO | xli.termios.ICANON)
        new[xp.CC][xli.termios.VMIN] = 1
        new[xp.CC][xli.termios.VTIME] = 0
        try:
            # termios.TCSAFLUSH may be less reliable than termios.TCSANOW
            xli.termios.tcsetattr(self.stdin_fd, xli.termios.TCSANOW, new)
        except xli.termios.error:
            self._disable_cbreak_stdin()

    def _disable_cbreak_stdin(self):
        """Turn ECHO and ICANON back on, starting from the saved stdin mode.

        Does nothing when no mode has been saved.
        """
        if not xp.ON_POSIX or self.stdin_mode is None:
            return
        new = self.stdin_mode[:]
        new[xp.LFLAG] |= xli.termios.ECHO | xli.termios.ICANON
        new[xp.CC][xli.termios.VMIN] = 1
        new[xp.CC][xli.termios.VTIME] = 0
        try:
            xli.termios.tcsetattr(self.stdin_fd, xli.termios.TCSANOW, new)
        except xli.termios.error:
            pass

    #
    # Dispatch methods
    #

    def poll(self):
        """Dispatches to Popen.returncode."""
        return self.proc.returncode

    def wait(self, timeout=None):
        """Dispatches to Popen.wait(), but also does process cleanup such as
        joining this thread and replacing the original window size signal
        handler.
        """
        self._disable_cbreak_stdin()
        rtn = self.proc.wait(timeout=timeout)
        self.join()
        # need to replace the old signal handlers somewhere...
        self._clean_up()
        return rtn

    def _clean_up(self):
        """Put back every signal handler that __init__ replaced.

        SIGWINCH is included so that a failed ``subprocess.Popen`` call in
        __init__ does not leave this object's resize handler installed.
        """
        self._restore_sigwinch()
        self._restore_sigint()
        self._restore_sigtstp()
        self._restore_sigquit()

    @property
    def returncode(self):
        """Process return code."""
        return self.proc.returncode

    @returncode.setter
    def returncode(self, value):
        """Process return code."""
        self.proc.returncode = value

    @property
    def signal(self):
        """Process signal, or None."""
        s = getattr(self.proc, "signal", None)
        if s is None:
            rtn = self.returncode
            if rtn is not None and rtn != 0:
                # Two-item tuple: the negated return code, then a flag that
                # is ``rtn < 0`` on Windows and ``os.WCOREDUMP(rtn)``
                # elsewhere.
                s = (-1 * rtn, rtn < 0 if xp.ON_WINDOWS else os.WCOREDUMP(rtn))
        return s

    @signal.setter
    def signal(self, value):
        """Process signal, or None."""
        self.proc.signal = value

    def send_signal(self, signal):
        """Dispatches to Popen.send_signal().

        Returns None without sending anything if no process object exists
        once the short wait below is over.
        """
        # NOTE: the ``signal`` parameter shadows the ``signal`` module
        # inside this method.
        dt = 0.0
        while self.proc is None and dt < self.timeout:
            time.sleep(1e-7)
            dt += 1e-7
        if self.proc is None:
            return
        try:
            rtn = self.proc.send_signal(signal)
        except ProcessLookupError:
            # This can happen in the case of !(cmd) when the command has ended
            rtn = None
        return rtn

    def terminate(self):
        """Dispatches to Popen.terminate()."""
        return self.proc.terminate()

    def kill(self):
        """Dispatches to Popen.kill()."""
        return self.proc.kill()