Source code for xonsh.procs.pipes
"""Pipe channel for single-owner fd management."""
import os
import threading
[docs]
class PipeChannel:
    """Single-owner manager for the two file descriptors of a pipe.

    The channel owns a read fd and a write fd and is the only object
    responsible for closing them.  Consumers obtain *non-owning* file
    objects through :meth:`open_writer` / :meth:`open_reader`; closing
    those file objects flushes them but leaves the underlying fd open
    (they are created with ``closefd=False``).

    Each end can be closed independently and repeatedly.  The channel can
    also be used as a context manager, in which case both ends are closed
    on exit::

        with PipeChannel.from_pipe() as chan:
            ...
    """

    def __init__(self, read_fd, write_fd):
        """Take ownership of an already-open fd pair.

        Parameters
        ----------
        read_fd : int
            Descriptor of the read end.
        write_fd : int
            Descriptor of the write end.
        """
        self._read_fd = read_fd
        self._write_fd = write_fd
        # Serializes every "look at an fd attribute and act on it" step so
        # an end cannot be swapped to None while a wrapper is being opened.
        # Assigned last: __del__ treats its presence as "fully initialized".
        self._lock = threading.Lock()

    @classmethod
    def from_pipe(cls):
        """Create a PipeChannel from os.pipe()."""
        r, w = os.pipe()
        return cls(r, w)

    @classmethod
    def from_pty(cls):
        """Create a PipeChannel from pty.openpty().

        The first descriptor returned by ``openpty()`` becomes the read
        end and the second becomes the write end.  Falls back to
        os.pipe() if ``openpty()`` raises ``OSError`` (e.g. PTY devices
        are exhausted).
        """
        # Imported here rather than at module level, as in the original;
        # presumably to keep the lazy pty import lazy.
        import xonsh.lib.lazyimps as xli

        try:
            r, w = xli.pty.openpty()
        except OSError:
            r, w = os.pipe()
        return cls(r, w)

    @property
    def write_fd(self):
        """The write-end descriptor, or ``None`` once it has been closed."""
        return self._write_fd

    @property
    def read_fd(self):
        """The read-end descriptor, or ``None`` once it has been closed."""
        return self._read_fd

    def _open_end(self, attr, label, mode, buffering):
        """Open a non-owning file object on the fd stored in ``attr``.

        ``label`` ("read" / "write") is only used in the error message.
        Raises ``OSError`` if that end has already been closed.
        """
        # Hold the lock across both the attribute read and the open() call
        # so a concurrent close cannot hand us a descriptor that is closed
        # (and possibly reused by something else) before open() wraps it.
        with self._lock:
            fd = getattr(self, attr)
            if fd is None:
                raise OSError(f"{label} end is closed")
            return open(fd, mode, buffering=buffering, closefd=False)

    def _close_end(self, attr):
        """Close the fd stored in ``attr`` at most once; never raises OSError."""
        # Detach the descriptor under the lock; only the caller that saw a
        # non-None value goes on to close it, which makes this idempotent.
        with self._lock:
            fd = getattr(self, attr)
            setattr(self, attr, None)
        if fd is None:
            return
        try:
            os.close(fd)
        except OSError:
            # Deliberate best-effort: the fd may already be invalid, and a
            # close path (also reached from __del__) must not raise.
            pass

    def open_writer(self, mode="wb", buffering=-1):
        """Non-owning file wrapper for the write end.

        Parameters
        ----------
        mode : str
            Mode passed to ``open()``.
        buffering : int
            Buffering policy passed to ``open()``.

        Raises
        ------
        OSError
            If the write end has already been closed.
        """
        return self._open_end("_write_fd", "write", mode, buffering)

    def open_reader(self, mode="rb", buffering=-1):
        """Non-owning file wrapper for the read end.

        Parameters
        ----------
        mode : str
            Mode passed to ``open()``.
        buffering : int
            Buffering policy passed to ``open()``.

        Raises
        ------
        OSError
            If the read end has already been closed.
        """
        return self._open_end("_read_fd", "read", mode, buffering)

    def close_writer(self):
        """Close the write end fd. Thread-safe and idempotent."""
        self._close_end("_write_fd")

    def close_reader(self):
        """Close the read end fd. Thread-safe and idempotent."""
        self._close_end("_read_fd")

    def close(self):
        """Close both ends (write end first, then read end)."""
        self.close_writer()
        self.close_reader()

    def __enter__(self):
        """Return the channel itself; no resources are acquired here."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close both ends on leaving the ``with`` block.

        Returns ``None`` so an in-flight exception is never suppressed.
        """
        self.close()

    def __del__(self):
        """Safety net: close any fds that were not explicitly closed."""
        # If __init__ never completed, the lock (assigned last) is missing
        # and there is nothing this object can safely close.  Without this
        # guard the finalizer itself would raise AttributeError.
        if getattr(self, "_lock", None) is None:
            return
        self.close()