"""Interface for running subprocess-mode commands on posix systems."""
import array
import io
import os
import signal
import subprocess
import sys
import threading
import time
import xonsh.lib.lazyasd as xl
import xonsh.lib.lazyimps as xli
import xonsh.platform as xp
import xonsh.tools as xt
from xonsh.built_ins import XSH
from xonsh.procs.jobs import proc_untraced_waitpid
from xonsh.procs.readers import (
BufferedFDParallelReader,
NonBlockingFDReader,
safe_fdclose,
)
# The following escape codes are xterm codes.
# See http://rtfm.etla.org/xterm/ctlseq.html for more.
MODE_NUMS = ("1049", "47", "1047")
@xl.lazyobject
def START_ALTERNATE_MODE():
return frozenset(f"\x1b[?{i}h".encode() for i in MODE_NUMS)
@xl.lazyobject
def END_ALTERNATE_MODE():
return frozenset(f"\x1b[?{i}l".encode() for i in MODE_NUMS)
@xl.lazyobject
def ALTERNATE_MODE_FLAGS():
return tuple(START_ALTERNATE_MODE) + tuple(END_ALTERNATE_MODE)
[docs]
class PopenThread(threading.Thread):
"""A thread for running and managing subprocess. This allows reading
from the stdin, stdout, and stderr streams in a non-blocking fashion.
This takes the same arguments and keyword arguments as regular Popen.
This requires that the captured_stdout and captured_stderr attributes
to be set following instantiation.
"""
def __init__(self, *args, stdin=None, stdout=None, stderr=None, **kwargs):
super().__init__()
self.daemon = True
self.lock = threading.RLock()
env = XSH.env
# stdin setup
self.orig_stdin = stdin
if stdin is None:
self.stdin_fd = 0
elif isinstance(stdin, int):
self.stdin_fd = stdin
else:
self.stdin_fd = stdin.fileno()
self.store_stdin = env.get("XONSH_STORE_STDIN")
self.timeout = env.get("XONSH_PROC_FREQUENCY")
self.in_alt_mode = False
self.stdin_mode = None
self._tc_cc_vsusp = b"\x1a" # default is usually ^Z
self._disable_suspend_keybind()
# stdout setup
self.orig_stdout = stdout
self.stdout_fd = 1 if stdout is None else stdout.fileno()
self._set_pty_size()
# stderr setup
self.orig_stderr = stderr
# Set some signal handles, if we can. Must come before process
# is started to prevent deadlock on windows
self.proc = None # has to be here for closure for handles
self.old_int_handler = self.old_winch_handler = None
self.old_tstp_handler = self.old_quit_handler = None
if xt.on_main_thread():
self.old_int_handler = signal.signal(signal.SIGINT, self._signal_int)
if xp.ON_POSIX:
self.old_tstp_handler = signal.signal(signal.SIGTSTP, self._signal_tstp)
self.old_quit_handler = signal.signal(signal.SIGQUIT, self._signal_quit)
if xp.CAN_RESIZE_WINDOW:
self.old_winch_handler = signal.signal(
signal.SIGWINCH, self._signal_winch
)
# start up process
if xp.ON_WINDOWS and stdout is not None:
os.set_inheritable(stdout.fileno(), False)
try:
self.proc = proc = subprocess.Popen(
*args, stdin=stdin, stdout=stdout, stderr=stderr, **kwargs
)
except Exception:
self._clean_up()
raise
self.pid = proc.pid
self.name = repr(
{
"cls": self.__class__.__name__,
"name": self.name,
"cmd": args,
"pid": self.pid,
}
)
self.universal_newlines = uninew = proc.universal_newlines
if uninew:
self.encoding = enc = env.get("XONSH_ENCODING")
self.encoding_errors = err = env.get("XONSH_ENCODING_ERRORS")
self.stdin = io.BytesIO() # stdin is always bytes!
self.stdout = io.TextIOWrapper(io.BytesIO(), encoding=enc, errors=err)
self.stderr = io.TextIOWrapper(io.BytesIO(), encoding=enc, errors=err)
else:
self.encoding = self.encoding_errors = None
self.stdin = io.BytesIO()
self.stdout = io.BytesIO()
self.stderr = io.BytesIO()
self.suspended = False
self.prevs_are_closed = False
# This is so the thread will use the same swapped values as the origin one.
self.original_swapped_values = XSH.env.get_swapped_values()
self.start()
[docs]
def run(self):
"""Runs the subprocess by performing a parallel read on stdin if allowed,
and copying bytes from captured_stdout to stdout and bytes from
captured_stderr to stderr.
"""
# Set the thread-local swapped values.
XSH.env.set_swapped_values(self.original_swapped_values)
proc = self.proc
spec = self._wait_and_getattr("spec")
# get stdin and apply parallel reader if needed.
stdin = self.stdin
if self.orig_stdin is None:
origin = None
elif xp.ON_POSIX and self.store_stdin:
origin = self.orig_stdin
origfd = origin if isinstance(origin, int) else origin.fileno()
origin = BufferedFDParallelReader(origfd, buffer=stdin)
else:
origin = None
# get non-blocking stdout
stdout = self.stdout.buffer if self.universal_newlines else self.stdout
capout = spec.captured_stdout
if capout is None:
procout = None
else:
procout = NonBlockingFDReader(capout.fileno(), timeout=self.timeout)
# get non-blocking stderr
stderr = self.stderr.buffer if self.universal_newlines else self.stderr
caperr = spec.captured_stderr
if caperr is None:
procerr = None
else:
procerr = NonBlockingFDReader(caperr.fileno(), timeout=self.timeout)
# initial read from buffer
self._read_write(procout, stdout, sys.__stdout__)
self._read_write(procerr, stderr, sys.__stderr__)
# loop over reads while process is running.
i = j = cnt = 1
while proc.poll() is None:
info = proc_untraced_waitpid(proc, hang=False)
if getattr(proc, "suspended", False):
self.suspended = True
if XSH.env.get("XONSH_DEBUG", False):
procname = f"{getattr(proc, 'args', '')} {proc.pid}".strip()
print(
f"Process {procname} suspended with signal {info['signal_name']}.",
file=sys.stderr,
)
# this is here for CPU performance reasons.
if i + j == 0:
cnt = min(cnt + 1, 1000)
tout = self.timeout * cnt
if procout is not None:
procout.timeout = tout
if procerr is not None:
procerr.timeout = tout
elif cnt == 1:
pass
else:
cnt = 1
if procout is not None:
procout.timeout = self.timeout
if procerr is not None:
procerr.timeout = self.timeout
# redirect some output!
i = self._read_write(procout, stdout, sys.__stdout__)
j = self._read_write(procerr, stderr, sys.__stderr__)
if self.suspended:
break
if self.suspended:
return
# close files to send EOF to non-blocking reader.
# capout & caperr seem to be needed only by Windows, while
# orig_stdout & orig_stderr are need by posix and Windows.
# Also, order seems to matter here,
# with orig_* needed to be closed before cap*
safe_fdclose(self.orig_stdout)
safe_fdclose(self.orig_stderr)
if xp.ON_WINDOWS:
safe_fdclose(capout)
safe_fdclose(caperr)
# read in the remaining data in a blocking fashion.
while (procout is not None and not procout.is_fully_read()) or (
procerr is not None and not procerr.is_fully_read()
):
self._read_write(procout, stdout, sys.__stdout__)
self._read_write(procerr, stderr, sys.__stderr__)
# kill the process if it is still alive. Happens when piping.
if proc.poll() is None:
proc.terminate()
def _wait_and_getattr(self, name):
"""make sure the instance has a certain attr, and return it."""
while not hasattr(self, name):
time.sleep(1e-7)
return getattr(self, name)
def _read_write(self, reader, writer, stdbuf):
"""Reads a chunk of bytes from a buffer and write into memory or back
down to the standard buffer, as appropriate. Returns the number of
successful reads.
"""
if reader is None:
return 0
i = -1
for i, chunk in enumerate(iter(reader.read_queue, b"")): # noqa
self._alt_mode_switch(chunk, writer, stdbuf)
if i >= 0:
writer.flush()
stdbuf.flush()
return i + 1
def _alt_mode_switch(self, chunk, membuf, stdbuf):
"""Enables recursively switching between normal capturing mode
and 'alt' mode, which passes through values to the standard
buffer. Pagers, text editors, curses applications, etc. use
alternate mode.
"""
i, flag = xt.findfirst(chunk, ALTERNATE_MODE_FLAGS)
if flag is None:
self._alt_mode_writer(chunk, membuf, stdbuf)
else:
# This code is executed when the child process switches the
# terminal into or out of alternate mode. The line below assumes
# that the user has opened vim, less, or similar, and writes writes
# to stdin.
j = i + len(flag)
# write the first part of the chunk in the current mode.
self._alt_mode_writer(chunk[:i], membuf, stdbuf)
# switch modes
# write the flag itself the current mode where alt mode is on
# so that it is streamed to the terminal ASAP.
# this is needed for terminal emulators to find the correct
# positions before and after alt mode.
alt_mode = flag in START_ALTERNATE_MODE
if alt_mode:
self.in_alt_mode = alt_mode
self._alt_mode_writer(flag, membuf, stdbuf)
self._enable_cbreak_stdin()
else:
self._alt_mode_writer(flag, membuf, stdbuf)
self.in_alt_mode = alt_mode
self._disable_cbreak_stdin()
# recurse this function, but without the current flag.
self._alt_mode_switch(chunk[j:], membuf, stdbuf)
def _alt_mode_writer(self, chunk, membuf, stdbuf):
"""Write bytes to the standard buffer if in alt mode or otherwise
to the in-memory buffer.
"""
if not chunk:
pass # don't write empty values
elif self.in_alt_mode:
stdbuf.buffer.write(chunk)
else:
with self.lock:
p = membuf.tell()
membuf.seek(0, io.SEEK_END)
membuf.write(chunk)
membuf.seek(p)
#
# Window resize handlers
#
def _signal_winch(self, signum, frame):
"""Signal handler for SIGWINCH - window size has changed."""
self.send_signal(signal.SIGWINCH)
self._set_pty_size()
def _set_pty_size(self):
"""Sets the window size of the child pty based on the window size of
our own controlling terminal.
"""
if xp.ON_WINDOWS or not os.isatty(self.stdout_fd):
return
# Get the terminal size of the real terminal, set it on the
# pseudoterminal.
buf = array.array("h", [0, 0, 0, 0])
# 1 = stdout here
try:
xli.fcntl.ioctl(1, xli.termios.TIOCGWINSZ, buf, True)
xli.fcntl.ioctl(self.stdout_fd, xli.termios.TIOCSWINSZ, buf)
except OSError:
pass
#
# SIGINT handler
#
def _signal_int(self, signum, frame):
"""Signal handler for SIGINT - Ctrl+C may have been pressed."""
self.send_signal(signal.CTRL_C_EVENT if xp.ON_WINDOWS else signum)
if self.proc is not None and self.proc.poll() is not None:
self._restore_sigint(frame=frame)
if xt.on_main_thread() and not xp.ON_WINDOWS:
signal.pthread_kill(threading.get_ident(), signal.SIGINT)
def _restore_sigint(self, frame=None):
old = self.old_int_handler
if old is not None:
if xt.on_main_thread():
signal.signal(signal.SIGINT, old)
self.old_int_handler = None
if frame is not None:
self._disable_cbreak_stdin()
if old is not None and old is not self._signal_int:
old(signal.SIGINT, frame)
#
# SIGTSTP handler
#
def _signal_tstp(self, signum, frame):
"""Signal handler for suspending SIGTSTP - Ctrl+Z may have been pressed."""
self.suspended = True
self.send_signal(signum)
self._restore_sigtstp(frame=frame)
def _restore_sigtstp(self, frame=None):
old = self.old_tstp_handler
if old is not None:
if xt.on_main_thread():
signal.signal(signal.SIGTSTP, old)
self.old_tstp_handler = None
if frame is not None:
self._disable_cbreak_stdin()
self._restore_suspend_keybind()
def _disable_suspend_keybind(self):
if xp.ON_WINDOWS:
return
try:
mode = xli.termios.tcgetattr(0) # only makes sense for stdin
self._tc_cc_vsusp = mode[xp.CC][xli.termios.VSUSP]
mode[xp.CC][xli.termios.VSUSP] = b"\x00" # set ^Z (ie SIGSTOP) to undefined
xli.termios.tcsetattr(0, xli.termios.TCSANOW, mode)
except xli.termios.error:
return
def _restore_suspend_keybind(self):
if xp.ON_WINDOWS:
return
try:
mode = xli.termios.tcgetattr(0) # only makes sense for stdin
mode[xp.CC][xli.termios.VSUSP] = (
self._tc_cc_vsusp
) # set ^Z (ie SIGSTOP) to original
# this usually doesn't work in interactive mode,
# but we should try it anyway.
xli.termios.tcsetattr(0, xli.termios.TCSANOW, mode)
except xli.termios.error:
pass
#
# SIGQUIT handler
#
def _signal_quit(self, signum, frame):
r"""Signal handler for quiting SIGQUIT - Ctrl+\ may have been pressed."""
self.send_signal(signum)
self._restore_sigquit(frame=frame)
def _restore_sigquit(self, frame=None):
old = self.old_quit_handler
if old is not None:
if xt.on_main_thread():
signal.signal(signal.SIGQUIT, old)
self.old_quit_handler = None
if frame is not None:
self._disable_cbreak_stdin()
#
# cbreak mode handlers
#
def _enable_cbreak_stdin(self):
if not xp.ON_POSIX:
return
try:
self.stdin_mode = xli.termios.tcgetattr(self.stdin_fd)[:]
except xli.termios.error:
# this can happen for cases where another process is controlling
# xonsh's tty device, such as in testing.
self.stdin_mode = None
return
new = self.stdin_mode[:]
new[xp.LFLAG] &= ~(xli.termios.ECHO | xli.termios.ICANON)
new[xp.CC][xli.termios.VMIN] = 1
new[xp.CC][xli.termios.VTIME] = 0
try:
# termios.TCSAFLUSH may be less reliable than termios.TCSANOW
xli.termios.tcsetattr(self.stdin_fd, xli.termios.TCSANOW, new)
except xli.termios.error:
self._disable_cbreak_stdin()
def _disable_cbreak_stdin(self):
if not xp.ON_POSIX or self.stdin_mode is None:
return
new = self.stdin_mode[:]
new[xp.LFLAG] |= xli.termios.ECHO | xli.termios.ICANON
new[xp.CC][xli.termios.VMIN] = 1
new[xp.CC][xli.termios.VTIME] = 0
try:
xli.termios.tcsetattr(self.stdin_fd, xli.termios.TCSANOW, new)
except xli.termios.error:
pass
#
# Dispatch methods
#
[docs]
def poll(self):
"""Dispatches to Popen.returncode."""
return self.proc.returncode
[docs]
def wait(self, timeout=None):
"""Dispatches to Popen.wait(), but also does process cleanup such as
joining this thread and replacing the original window size signal
handler.
"""
self._disable_cbreak_stdin()
rtn = self.proc.wait(timeout=timeout)
self.join()
# need to replace the old signal handlers somewhere...
if self.old_winch_handler is not None and xt.on_main_thread():
signal.signal(signal.SIGWINCH, self.old_winch_handler)
self.old_winch_handler = None
self._clean_up()
return rtn
def _clean_up(self):
self._restore_sigint()
self._restore_sigtstp()
self._restore_sigquit()
@property
def returncode(self):
"""Process return code."""
return self.proc.returncode
@returncode.setter
def returncode(self, value):
"""Process return code."""
self.proc.returncode = value
@property
def signal(self):
"""Process signal, or None."""
s = getattr(self.proc, "signal", None)
if s is None:
rtn = self.returncode
if rtn is not None and rtn != 0:
s = (-1 * rtn, rtn < 0 if xp.ON_WINDOWS else os.WCOREDUMP(rtn))
return s
@signal.setter
def signal(self, value):
"""Process signal, or None."""
self.proc.signal = value
[docs]
def send_signal(self, signal):
"""Dispatches to Popen.send_signal()."""
dt = 0.0
while self.proc is None and dt < self.timeout:
time.sleep(1e-7)
dt += 1e-7
if self.proc is None:
return
try:
rtn = self.proc.send_signal(signal)
except ProcessLookupError:
# This can happen in the case of !(cmd) when the command has ended
rtn = None
return rtn
[docs]
def terminate(self):
"""Dispatches to Popen.terminate()."""
return self.proc.terminate()
[docs]
def kill(self):
"""Dispatches to Popen.kill()."""
return self.proc.kill()