Source code for xonsh.procs.pipes

"""Pipe channel for single-owner fd management."""

import os
import threading


[docs] class PipeChannel: """Single-owner pipe fd manager. Owner creates the pipe and is responsible for closing both ends. Consumers get non-owning wrappers via open_writer() / open_reader(). """ def __init__(self, read_fd, write_fd): self._read_fd = read_fd self._write_fd = write_fd self._lock = threading.Lock()
[docs] @classmethod def from_pipe(cls): """Create a PipeChannel from os.pipe().""" r, w = os.pipe() return cls(r, w)
[docs] @classmethod def from_pty(cls): """Create a PipeChannel from pty.openpty(). Falls back to os.pipe() if PTY devices are exhausted. """ import xonsh.lib.lazyimps as xli try: r, w = xli.pty.openpty() except OSError: r, w = os.pipe() return cls(r, w)
@property def write_fd(self): return self._write_fd @property def read_fd(self): return self._read_fd
[docs] def open_writer(self, mode="wb", buffering=-1): """Non-owning file wrapper for the write end.""" # Hold the lock to prevent close_writer() from closing the fd # between our read and the open() call (fd reuse race). with self._lock: fd = self._write_fd if fd is None: raise OSError("write end is closed") return open(fd, mode, buffering=buffering, closefd=False)
[docs] def open_reader(self, mode="rb", buffering=-1): """Non-owning file wrapper for the read end.""" # Hold the lock to prevent close_reader() from closing the fd # between our read and the open() call (fd reuse race). with self._lock: fd = self._read_fd if fd is None: raise OSError("read end is closed") return open(fd, mode, buffering=buffering, closefd=False)
[docs] def close_writer(self): """Close the write end fd. Thread-safe and idempotent.""" with self._lock: fd = self._write_fd self._write_fd = None if fd is not None: try: os.close(fd) except OSError: pass
[docs] def close_reader(self): """Close the read end fd. Thread-safe and idempotent.""" with self._lock: fd = self._read_fd self._read_fd = None if fd is not None: try: os.close(fd) except OSError: pass
[docs] def close(self): """Close both ends.""" self.close_writer() self.close_reader()
def __del__(self): """Safety net: close any fds that were not explicitly closed.""" self.close()