"""Interface for running Python functions as subprocess-mode commands.
Code for several helper methods in the `ProcProxy` class has been reproduced
without modification from `subprocess.py` in the Python 3.4.2 standard library.
The contents of `subprocess.py` (and, thus, the reproduced methods) are
Copyright (c) 2003-2005 by Peter Astrand <astrand@lysator.liu.se> and were
licensed to the Python Software Foundation under a Contributor Agreement.
"""
import collections.abc as cabc
import io
import os
import signal
import subprocess
import sys
import threading
import time
import xonsh.lib.lazyimps as xli
import xonsh.platform as xp
import xonsh.tools as xt
from xonsh.built_ins import XSH
from xonsh.cli_utils import run_with_partial_args
from xonsh.procs.readers import safe_fdclose
def still_writable(fd):
"""Determines whether a file descriptor is still writable by trying to
write an empty byte string and seeing if it fails.
"""
try:
os.write(fd, b"")
status = True
except OSError:
status = False
return status
def safe_flush(handle):
"""Attempts to safely flush a file handle, returns success bool."""
status = True
try:
handle.flush()
except OSError:
status = False
return status
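# Illustrative sketch (not part of xonsh itself; the helper name below is
# hypothetical): how the two utilities above behave. ``still_writable``
# reports False once a descriptor has been closed, and ``safe_flush``
# swallows OSError so callers never crash on a dead stream.
def _example_fd_helpers():
    r, w = os.pipe()
    assert still_writable(w)  # open pipe end: zero-byte write succeeds
    os.close(r)
    os.close(w)
    assert not still_writable(w)  # closed fd: os.write raises OSError
    assert safe_flush(io.StringIO())  # in-memory stream flushes fine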
class Handle(int):
closed = False
def Close(self, CloseHandle=None):
CloseHandle = CloseHandle or xli._winapi.CloseHandle
if not self.closed:
self.closed = True
CloseHandle(self)
def Detach(self):
if not self.closed:
self.closed = True
return int(self)
raise ValueError("already closed")
def __repr__(self):
return f"Handle({int(self)})"
__del__ = Close
__str__ = __repr__
class FileThreadDispatcher:
"""Dispatches to different file handles depending on the
current thread. Useful if you want file operations to go to different
places for different threads.
"""
def __init__(self, default=None):
"""
Parameters
----------
default : file-like or None, optional
The file handle to write to if a thread cannot be found in
the registry. If None, a new in-memory instance is used.
Attributes
----------
registry : dict
Maps thread idents to file handles.
"""
if default is None:
default = io.TextIOWrapper(io.BytesIO())
self.default = default
self.registry = {}
def register(self, handle):
"""Registers a file handle for the current thread. Returns self so
that this method can be used in a with-statement.
"""
if handle is self:
# prevent weird recursion errors
return self
self.registry[threading.get_ident()] = handle
return self
def deregister(self):
"""Removes the current thread from the registry."""
ident = threading.get_ident()
if ident in self.registry:
# don't remove if we have already been deregistered
del self.registry[threading.get_ident()]
@property
def available(self):
"""True if the thread is available in the registry."""
return threading.get_ident() in self.registry
@property
def handle(self):
"""Gets the current handle for the thread."""
return self.registry.get(threading.get_ident(), self.default)
def __enter__(self):
pass
def __exit__(self, ex_type, ex_value, ex_traceback):
self.deregister()
#
# io.TextIOBase interface
#
@property
def encoding(self):
"""Gets the encoding for this thread's handle."""
return self.handle.encoding
@property
def errors(self):
"""Gets the errors for this thread's handle."""
return self.handle.errors
@property
def newlines(self):
"""Gets the newlines for this thread's handle."""
return self.handle.newlines
@property
def buffer(self):
"""Gets the buffer for this thread's handle."""
return self.handle.buffer
def detach(self):
"""Detaches the buffer for the current thread."""
return self.handle.detach()
def read(self, size=None):
"""Reads from the handle for the current thread."""
return self.handle.read(size)
def readline(self, size=-1):
"""Reads a line from the handle for the current thread."""
return self.handle.readline(size)
def readlines(self, hint=-1):
"""Reads lines from the handle for the current thread."""
return self.handle.readlines(hint)
def seek(self, offset, whence=io.SEEK_SET):
"""Seeks the current file."""
return self.handle.seek(offset, whence)
def tell(self):
"""Reports the current position in the handle for the current thread."""
return self.handle.tell()
def write(self, s):
"""Writes to this thread's handle. This also flushes, just to be
extra sure the string was written.
"""
h = self.handle
try:
r = h.write(s)
h.flush()
except OSError:
r = None
return r
@property
def line_buffering(self):
"""Gets if line buffering for this thread's handle enabled."""
return self.handle.line_buffering
#
# io.IOBase interface
#
def close(self):
"""Closes the current thread's handle."""
return self.handle.close()
@property
def closed(self):
"""Is the thread's handle closed."""
return self.handle.closed
def fileno(self):
"""Returns the file descriptor for the current thread."""
return self.handle.fileno()
def flush(self):
"""Flushes the file descriptor for the current thread."""
return safe_flush(self.handle)
def isatty(self):
"""Returns if the file descriptor for the current thread is a tty."""
if self.default:
return self.default.isatty()
return self.handle.isatty()
def readable(self):
"""Returns if file descriptor for the current thread is readable."""
return self.handle.readable()
def seekable(self):
"""Returns if file descriptor for the current thread is seekable."""
return self.handle.seekable()
def truncate(self, size=None):
"""Truncates the file for for the current thread."""
return self.handle.truncate()
def writable(self):
"""Returns whether the file descriptor for the current thread is writable."""
return self.handle.writable()
def writelines(self, lines):
"""Writes lines to the file descriptor for the current thread."""
return self.handle.writelines(lines)
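# Illustrative sketch (not part of xonsh itself; the helper name below is
# hypothetical): two worker threads register their own in-memory buffers
# with one dispatcher, so writes made through the shared object land in
# per-thread destinations.
def _example_per_thread_dispatch():
    dispatcher = FileThreadDispatcher(default=io.StringIO())
    results = {}

    def worker(name):
        buf = io.StringIO()
        dispatcher.register(buf)
        try:
            dispatcher.write(f"hello from {name}\n")
            results[name] = buf.getvalue()
        finally:
            dispatcher.deregister()

    threads = [threading.Thread(target=worker, args=(n,)) for n in ("a", "b")]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results  # {'a': 'hello from a\n', 'b': 'hello from b\n'}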
# These should NOT be lazy since they *need* to get the true stdout from the
# main thread. Also their creation time should be negligible.
STDOUT_DISPATCHER = FileThreadDispatcher(default=sys.stdout)
STDERR_DISPATCHER = FileThreadDispatcher(default=sys.stderr)
def parse_proxy_return(r, stdout, stderr):
"""Proxies may return a variety of outputs. This handles them generally.
Parameters
----------
r : tuple, str, int, or None
Return from proxy function
stdout : file-like
Current stdout stream
stderr : file-like
Current stderr stream
Returns
-------
cmd_result : int
The return code of the proxy
"""
cmd_result = 0
if isinstance(r, str):
stdout.write(r)
stdout.flush()
elif isinstance(r, int):
cmd_result = r
elif isinstance(r, cabc.Sequence):
rlen = len(r)
if rlen > 0 and r[0] is not None:
stdout.write(str(r[0]))
stdout.flush()
if rlen > 1 and r[1] is not None:
stderr.write(xt.endswith_newline(str(r[1])))
stderr.flush()
if rlen > 2 and isinstance(r[2], int):
cmd_result = r[2]
elif r is not None:
# for the random object...
stdout.write(str(r))
stdout.flush()
return cmd_result
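# Illustrative sketch (not part of xonsh itself; the helper name below is
# hypothetical) of the return-value conventions handled above: a str is
# written to stdout, an int becomes the return code, a sequence spreads
# across (stdout, stderr, return code), and None means success with no output.
def _example_return_conventions():
    out, err = io.StringIO(), io.StringIO()
    assert parse_proxy_return("text\n", out, err) == 0
    assert parse_proxy_return(2, out, err) == 2
    assert parse_proxy_return(("ok\n", "warning", 1), out, err) == 1
    assert parse_proxy_return(None, out, err) == 0
    return out.getvalue(), err.getvalue()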
def get_proc_proxy_name(cls):
return repr(
{
"cls": cls.__class__.__name__,
"name": getattr(cls, "name", None),
"func": cls.f,
"alias": cls.env.get("__ALIAS_NAME", None),
"pid": cls.pid,
}
)
class ProcProxyThread(threading.Thread):
"""
Class representing a function to be run as a subprocess-mode command.
"""
def __init__(
self,
f,
args,
stdin=None,
stdout=None,
stderr=None,
universal_newlines=False,
close_fds=False,
env=None,
):
"""Parameters
----------
f : function
The function to be executed.
args : list
A (possibly empty) list containing the arguments that were given on
the command line.
stdin : file-like, optional
A file-like object representing stdin (input can be read from
here). If `stdin` is not provided or if it is explicitly set to
`None`, then an instance of `io.StringIO` representing an empty
file is used.
stdout : file-like, optional
A file-like object representing stdout (normal output can be
written here). If `stdout` is not provided or if it is explicitly
set to `None`, then `sys.stdout` is used.
stderr : file-like, optional
A file-like object representing stderr (error output can be
written here). If `stderr` is not provided or if it is explicitly
set to `None`, then `sys.stderr` is used.
universal_newlines : bool, optional
Whether or not to use universal newlines.
close_fds : bool, optional
Whether or not to close file descriptors. This is here for Popen
compatibility and currently does nothing.
env : Mapping, optional
Environment mapping.
"""
self.f = f
self.args = args
self.pid = None
self.returncode = None
self._closed_handle_cache = {}
handles = self._get_handles(stdin, stdout, stderr)
(
self.p2cread,
self.p2cwrite,
self.c2pread,
self.c2pwrite,
self.errread,
self.errwrite,
) = handles
# default values
self.stdin = stdin
self.stdout = stdout
self.stderr = stderr
self.close_fds = close_fds
self.env = env
self._interrupted = False
if xp.ON_WINDOWS:
if self.p2cwrite != -1:
self.p2cwrite = xli.msvcrt.open_osfhandle(self.p2cwrite.Detach(), 0)
if self.c2pread != -1:
self.c2pread = xli.msvcrt.open_osfhandle(self.c2pread.Detach(), 0)
if self.errread != -1:
self.errread = xli.msvcrt.open_osfhandle(self.errread.Detach(), 0)
if self.p2cwrite != -1:
self.stdin = open(self.p2cwrite, "wb", -1)
if universal_newlines:
self.stdin = io.TextIOWrapper(
self.stdin, write_through=True, line_buffering=False
)
elif isinstance(stdin, int) and stdin != 0:
self.stdin = open(stdin, "wb", -1)
if self.c2pread != -1:
self.stdout = open(self.c2pread, "rb", -1)
if universal_newlines:
self.stdout = io.TextIOWrapper(self.stdout)
if self.errread != -1:
self.stderr = open(self.errread, "rb", -1)
if universal_newlines:
self.stderr = io.TextIOWrapper(self.stderr)
# Set some signal handlers, if we can. Must come before the process
# is started to prevent a deadlock on Windows.
self.old_int_handler = None
if xt.on_main_thread():
self.old_int_handler = signal.signal(signal.SIGINT, self._signal_int)
# start up the proc
super().__init__()
# This is so the thread will use the same swapped values as the originating one.
self.original_swapped_values = XSH.env.get_swapped_values()
self.start()
def __del__(self):
self._restore_sigint()
def run(self):
"""Set up input/output streams and execute the child function in a new
thread. This is part of the `threading.Thread` interface and should
not be called directly.
"""
if self.f is None:
return
# Set the thread-local swapped values.
XSH.env.set_swapped_values(self.original_swapped_values)
spec = self._wait_and_getattr("spec")
last_in_pipeline = spec.last_in_pipeline
if last_in_pipeline:
capout = spec.captured_stdout # NOQA
caperr = spec.captured_stderr # NOQA
env = XSH.env
enc = env.get("XONSH_ENCODING")
err = env.get("XONSH_ENCODING_ERRORS")
if xp.ON_WINDOWS:
if self.p2cread != -1:
self.p2cread = xli.msvcrt.open_osfhandle(self.p2cread.Detach(), 0)
if self.c2pwrite != -1:
self.c2pwrite = xli.msvcrt.open_osfhandle(self.c2pwrite.Detach(), 0)
if self.errwrite != -1:
self.errwrite = xli.msvcrt.open_osfhandle(self.errwrite.Detach(), 0)
# get stdin
if self.stdin is None:
sp_stdin = None
elif self.p2cread != -1:
sp_stdin = io.TextIOWrapper(
open(self.p2cread, "rb", -1), encoding=enc, errors=err
)
else:
sp_stdin = sys.stdin
# stdout
if self.c2pwrite != -1:
sp_stdout = io.TextIOWrapper(
open(self.c2pwrite, "wb", -1), encoding=enc, errors=err
)
else:
sp_stdout = sys.stdout
# stderr
if self.errwrite == self.c2pwrite:
sp_stderr = sp_stdout
elif self.errwrite != -1:
sp_stderr = io.TextIOWrapper(
open(self.errwrite, "wb", -1), encoding=enc, errors=err
)
else:
sp_stderr = sys.stderr
# run the function itself
try:
alias_stack = XSH.env.get("__ALIAS_STACK", "")
if self.env and self.env.get("__ALIAS_NAME"):
alias_stack += ":" + self.env["__ALIAS_NAME"]
with (
STDOUT_DISPATCHER.register(sp_stdout),
STDERR_DISPATCHER.register(sp_stderr),
xt.redirect_stdout(STDOUT_DISPATCHER),
xt.redirect_stderr(STDERR_DISPATCHER),
XSH.env.swap(self.env, __ALIAS_STACK=alias_stack),
):
r = run_with_partial_args(
self.f,
{
"args": self.args,
"stdin": sp_stdin,
"stdout": sp_stdout,
"stderr": sp_stderr,
"spec": spec,
"stack": spec.stack,
},
)
except SystemExit as e:
r = e.code if isinstance(e.code, int) else int(bool(e.code))
except OSError:
status = still_writable(self.c2pwrite) and still_writable(self.errwrite)
if status:
# stdout and stderr are still writable, so the error must
# come from the function itself.
xt.print_exception(
source_msg="Exception in thread " + get_proc_proxy_name(self)
)
r = 1
else:
# stdout and stderr are no longer writable, so the error must
# come from the next process in the pipeline having closed the
# other side of the pipe. The function then attempted to write
# to this side of the pipe anyway. This is not truly an error
# and we should exit gracefully.
r = 0
except Exception:
xt.print_exception(
source_msg="Exception in thread " + get_proc_proxy_name(self)
)
r = 1
safe_flush(sp_stdout)
safe_flush(sp_stderr)
self.returncode = parse_proxy_return(r, sp_stdout, sp_stderr)
if not last_in_pipeline and not xp.ON_WINDOWS:
# macOS requires us *not* to close the handles here, while
# Windows requires us *to* close them here.
return
# clean up
# scopz: not sure why this is needed, but stdin cannot go here
# and stdout & stderr must.
if xp.ON_WINDOWS:
handles = [self.stdout, self.stderr]
else:
handles = [sp_stdout, sp_stderr]
for handle in handles:
safe_fdclose(handle, cache=self._closed_handle_cache)
def _wait_and_getattr(self, name):
"""make sure the instance has a certain attr, and return it."""
while not hasattr(self, name):
time.sleep(1e-7)
return getattr(self, name)
def poll(self):
"""Check if the function has completed.
Returns
-------
None if the function is still executing, and the returncode otherwise
"""
return self.returncode
def wait(self, timeout=None):
"""Waits for the process to finish and returns the return code."""
self.join()
self._restore_sigint()
return self.returncode
#
# SIGINT handler
#
def _signal_int(self, signum, frame):
"""Signal handler for SIGINT - Ctrl+C may have been pressed."""
# Check if we have already been interrupted. This should prevent
# the possibility of infinite recursion.
if self._interrupted:
return
self._interrupted = True
# close file handles here to stop any processes piped to us.
handles = (
self.p2cread,
self.p2cwrite,
self.c2pread,
self.c2pwrite,
self.errread,
self.errwrite,
)
for handle in handles:
safe_fdclose(handle)
if self.poll() is not None:
self._restore_sigint(frame=frame)
if xt.on_main_thread() and not xp.ON_WINDOWS:
signal.pthread_kill(threading.get_ident(), signal.SIGINT)
def _restore_sigint(self, frame=None):
old = self.old_int_handler
if old is not None:
if xt.on_main_thread():
signal.signal(signal.SIGINT, old)
self.old_int_handler = None
if frame is not None:
if old is not None and old is not self._signal_int:
old(signal.SIGINT, frame)
if self._interrupted:
self.returncode = 1
# The code below (_get_devnull, _get_handles, and _make_inheritable) comes
# from subprocess.py in the Python 3.4.2 Standard Library
def _get_devnull(self):
if not hasattr(self, "_devnull"):
self._devnull = os.open(os.devnull, os.O_RDWR)
return self._devnull
if xp.ON_WINDOWS:
def _make_inheritable(self, handle):
"""Return a duplicate of handle, which is inheritable"""
h = xli._winapi.DuplicateHandle(
xli._winapi.GetCurrentProcess(),
handle,
xli._winapi.GetCurrentProcess(),
0,
1,
xli._winapi.DUPLICATE_SAME_ACCESS,
)
return Handle(h)
def _get_handles(self, stdin, stdout, stderr):
"""Construct and return tuple with IO objects:
p2cread, p2cwrite, c2pread, c2pwrite, errread, errwrite
"""
if stdin is None and stdout is None and stderr is None:
return (-1, -1, -1, -1, -1, -1)
p2cread, p2cwrite = -1, -1
c2pread, c2pwrite = -1, -1
errread, errwrite = -1, -1
if stdin is None:
p2cread = xli._winapi.GetStdHandle(xli._winapi.STD_INPUT_HANDLE)
if p2cread is None:
p2cread, _ = xli._winapi.CreatePipe(None, 0)
p2cread = Handle(p2cread)
xli._winapi.CloseHandle(_)
elif stdin == subprocess.PIPE:
p2cread, p2cwrite = xli._winapi.CreatePipe(None, 0)
p2cread, p2cwrite = Handle(p2cread), Handle(p2cwrite)
elif stdin == subprocess.DEVNULL:
p2cread = xli.msvcrt.get_osfhandle(self._get_devnull())
elif isinstance(stdin, int):
p2cread = xli.msvcrt.get_osfhandle(stdin)
else:
# Assuming file-like object
p2cread = xli.msvcrt.get_osfhandle(stdin.fileno())
p2cread = self._make_inheritable(p2cread)
if stdout is None:
c2pwrite = xli._winapi.GetStdHandle(xli._winapi.STD_OUTPUT_HANDLE)
if c2pwrite is None:
_, c2pwrite = xli._winapi.CreatePipe(None, 0)
c2pwrite = Handle(c2pwrite)
xli._winapi.CloseHandle(_)
elif stdout == subprocess.PIPE:
c2pread, c2pwrite = xli._winapi.CreatePipe(None, 0)
c2pread, c2pwrite = Handle(c2pread), Handle(c2pwrite)
elif stdout == subprocess.DEVNULL:
c2pwrite = xli.msvcrt.get_osfhandle(self._get_devnull())
elif isinstance(stdout, int):
c2pwrite = xli.msvcrt.get_osfhandle(stdout)
else:
# Assuming file-like object
c2pwrite = xli.msvcrt.get_osfhandle(stdout.fileno())
c2pwrite = self._make_inheritable(c2pwrite)
if stderr is None:
errwrite = xli._winapi.GetStdHandle(xli._winapi.STD_ERROR_HANDLE)
if errwrite is None:
_, errwrite = xli._winapi.CreatePipe(None, 0)
errwrite = Handle(errwrite)
xli._winapi.CloseHandle(_)
elif stderr == subprocess.PIPE:
errread, errwrite = xli._winapi.CreatePipe(None, 0)
errread, errwrite = Handle(errread), Handle(errwrite)
elif stderr == subprocess.STDOUT:
errwrite = c2pwrite
elif stderr == subprocess.DEVNULL:
errwrite = xli.msvcrt.get_osfhandle(self._get_devnull())
elif isinstance(stderr, int):
errwrite = xli.msvcrt.get_osfhandle(stderr)
else:
# Assuming file-like object
errwrite = xli.msvcrt.get_osfhandle(stderr.fileno())
errwrite = self._make_inheritable(errwrite)
return (p2cread, p2cwrite, c2pread, c2pwrite, errread, errwrite)
else:
# POSIX versions
def _get_handles(self, stdin, stdout, stderr):
"""Construct and return tuple with IO objects:
p2cread, p2cwrite, c2pread, c2pwrite, errread, errwrite
"""
p2cread, p2cwrite = -1, -1
c2pread, c2pwrite = -1, -1
errread, errwrite = -1, -1
if stdin is None:
pass
elif stdin == subprocess.PIPE:
p2cread, p2cwrite = os.pipe()
elif stdin == subprocess.DEVNULL:
p2cread = self._get_devnull()
elif isinstance(stdin, int):
p2cread = stdin
else:
# Assuming file-like object
p2cread = stdin.fileno()
if stdout is None:
pass
elif stdout == subprocess.PIPE:
c2pread, c2pwrite = os.pipe()
elif stdout == subprocess.DEVNULL:
c2pwrite = self._get_devnull()
elif isinstance(stdout, int):
c2pwrite = stdout
else:
# Assuming file-like object
c2pwrite = stdout.fileno()
if stderr is None:
pass
elif stderr == subprocess.PIPE:
errread, errwrite = os.pipe()
elif stderr == subprocess.STDOUT:
errwrite = c2pwrite
elif stderr == subprocess.DEVNULL:
errwrite = self._get_devnull()
elif isinstance(stderr, int):
errwrite = stderr
else:
# Assuming file-like object
errwrite = stderr.fileno()
return (p2cread, p2cwrite, c2pread, c2pwrite, errread, errwrite)
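# Illustrative sketch (not part of xonsh itself; the function below is
# hypothetical) of a callable that ProcProxyThread can execute.
# ``run_with_partial_args`` forwards only the parameters the callable
# declares, so an alias may accept any subset of
# args/stdin/stdout/stderr/spec/stack.
def _example_threaded_alias(args, stdin=None, stdout=None, stderr=None):
    """Echo its arguments, like a minimal subprocess-mode command."""
    print(" ".join(args), file=stdout)
    return 0  # interpreted by parse_proxy_return as the command's return code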
#
# Foreground Thread Process Proxies
#
class ProcProxy:
"""This is process proxy class that runs its alias functions on the
same thread that it was called from, which is typically the main thread.
This prevents the process from running on a background thread, but enables
debugger and profiler tools (functions) be run on the same thread that they
are attempting to debug.
"""
def __init__(
self,
f,
args,
stdin=None,
stdout=None,
stderr=None,
universal_newlines=False,
close_fds=False,
env=None,
):
self.f = f
self.args = args
self.pid = os.getpid()
self.returncode = None
self.stdin = stdin
self.stdout = stdout
self.stderr = stderr
self.universal_newlines = universal_newlines
self.close_fds = close_fds
self.env = env
def poll(self):
"""Check if the function has completed via the returncode or None."""
return self.returncode
def wait(self, timeout=None):
"""Runs the function and returns the result. Timeout argument only
present for API compatibility.
"""
if self.f is None:
return 0
env = XSH.env
enc = env.get("XONSH_ENCODING")
err = env.get("XONSH_ENCODING_ERRORS")
spec = self._wait_and_getattr("spec")
# set file handles
if self.stdin is None:
stdin = None
else:
if isinstance(self.stdin, int):
inbuf = open(self.stdin, "rb", -1)
else:
inbuf = self.stdin
stdin = io.TextIOWrapper(inbuf, encoding=enc, errors=err)
stdout = self._pick_buf(self.stdout, sys.stdout, enc, err)
stderr = self._pick_buf(self.stderr, sys.stderr, enc, err)
# run the actual function
try:
with XSH.env.swap(self.env):
r = run_with_partial_args(
self.f,
{
"args": self.args,
"stdin": stdin,
"stdout": stdout,
"stderr": stderr,
"spec": spec,
"stack": spec.stack,
},
)
except SystemExit as e:
# the alias function is running in the main thread, so we need to
# catch SystemExit to prevent the entire shell from exiting (see #5689)
r = e.code if isinstance(e.code, int) else int(bool(e.code))
except Exception:
xt.print_exception(source_msg="Exception in " + get_proc_proxy_name(self))
r = 1
self.returncode = parse_proxy_return(r, stdout, stderr)
safe_flush(stdout)
safe_flush(stderr)
return self.returncode
@staticmethod
def _pick_buf(handle, sysbuf, enc, err):
if handle is None or handle is sysbuf:
buf = sysbuf
elif isinstance(handle, int):
if handle < 3:
buf = sysbuf
else:
buf = io.TextIOWrapper(open(handle, "wb", -1), encoding=enc, errors=err)
elif hasattr(handle, "encoding"):
# must be a text stream, no need to wrap.
buf = handle
else:
# must be a binary stream, should wrap it.
buf = io.TextIOWrapper(handle, encoding=enc, errors=err)
return buf
def _wait_and_getattr(self, name):
"""make sure the instance has a certain attr, and return it."""
while not hasattr(self, name):
time.sleep(1e-7)
return getattr(self, name)
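# Illustrative sketch (not part of xonsh itself; the helper name and the
# SimpleNamespace stand-in below are assumptions). It requires a running
# xonsh session so that ``XSH.env`` is populated; the SimpleNamespace plays
# the role of the SubprocSpec that xonsh normally attaches as ``.spec``
# before calling ``wait``.
def _example_unthreaded_proxy():
    from types import SimpleNamespace

    def greet(args, stdout):
        print("hello", *args, file=stdout)
        return 0

    proxy = ProcProxy(greet, ["world"], stdout=sys.stdout, env={})
    proxy.spec = SimpleNamespace(stack=None)  # normally set by SubprocSpec
    return proxy.wait()  # runs greet on the calling thread and returns 0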