# Source code for xonsh.jobs

# -*- coding: utf-8 -*-
"""Job control for the xonsh shell."""
import os
import sys
import time
import ctypes
import signal
import builtins
import subprocess
import collections

from xonsh.lazyasd import LazyObject
from xonsh.platform import FD_STDERR, ON_DARWIN, ON_WINDOWS, ON_CYGWIN, LIBC
from xonsh.tools import unthreadable


# Job ids ordered by recency of use: functions in this module push the most
# recently manipulated job with appendleft(), so tasks[0] is the current job.
# NOTE(review): LazyObject presumably defers building the deque until first
# access -- confirm against xonsh.lazyasd.
tasks = LazyObject(collections.deque, globals(), 'tasks')
# Track time stamp of last exit command, so that two consecutive attempts to
# exit can kill all jobs and exit.
_last_exit_time = None


if ON_DARWIN:
    def _send_signal(job, signal):
        # On OS X, os.killpg() may cause PermissionError when there are
        # any zombie processes in the process group.
        # See github issue #1012 for details
        for pid in job['pids']:
            if pid is None:  # the pid of an aliased proc is None
                continue
            try:
                os.kill(pid, signal)
            except ProcessLookupError:
                pass
elif ON_WINDOWS:
    pass
elif ON_CYGWIN:
    # Similar to what happened on OSX, more issues on Cygwin
    # (see Github issue #514).
    def _send_signal(job, signal):
        try:
            os.killpg(job['pgrp'], signal)
        except Exception:
            for pid in job['pids']:
                try:
                    os.kill(pid, signal)
                except Exception:
                    pass
else:
    def _send_signal(job, signal):
        pgrp = job['pgrp']
        if pgrp is None:
            for pid in job['pids']:
                try:
                    os.kill(pid, signal)
                except Exception:
                    pass
        else:
            os.killpg(job['pgrp'], signal)


if ON_WINDOWS:
    def _continue(job):
        """Mark ``job`` as running (no signal is sent on Windows)."""
        job['status'] = "running"

    def _kill(job):
        """Terminate the job's process with ``taskkill /F /T``.

        Raises ``subprocess.CalledProcessError`` if taskkill exits non-zero.
        """
        subprocess.check_output(['taskkill', '/F', '/T', '/PID',
                                 str(job['obj'].pid)])

    def ignore_sigtstp():
        """Do nothing; the Windows branch installs no SIGTSTP handler."""
        pass

    def give_terminal_to(pgid):
        """Do nothing; the Windows branch performs no terminal hand-over."""
        pass

    def wait_for_active_job(last_task=None, backgrounded=False):
        """
        Wait for the active job to finish, to be killed by SIGINT, or to be
        suspended by ctrl-z.

        Returns the last task that was waited on, or ``last_task`` unchanged
        when there is no running foreground task. ``backgrounded`` is
        accepted for signature parity with the POSIX variant and is unused.
        """
        _clear_dead_jobs()
        active_task = get_next_task()
        # Return when there are no foreground active task
        if active_task is None:
            return last_task
        obj = active_task['obj']
        _continue(active_task)
        # Poll with a short timeout so that ctrl-c (KeyboardInterrupt) is
        # noticed promptly while the child is still running.
        while obj.returncode is None:
            try:
                obj.wait(0.01)
            except subprocess.TimeoutExpired:
                pass
            except KeyboardInterrupt:
                _kill(active_task)
        # Look for a further foreground task, remembering this one.
        return wait_for_active_job(last_task=active_task)

else:
    def _continue(job):
        """Resume ``job`` by sending it SIGCONT."""
        _send_signal(job, signal.SIGCONT)

    def _kill(job):
        """Kill ``job`` by sending it SIGKILL."""
        _send_signal(job, signal.SIGKILL)

    def ignore_sigtstp():
        """Make the current process ignore SIGTSTP."""
        signal.signal(signal.SIGTSTP, signal.SIG_IGN)

    # Process group of the shell itself, captured when the module is loaded.
    _shell_pgrp = os.getpgrp()

    # Signals blocked while the terminal is handed to another process group.
    _block_when_giving = LazyObject(lambda: (signal.SIGTTOU, signal.SIGTTIN,
                                             signal.SIGTSTP, signal.SIGCHLD),
                                    globals(), '_block_when_giving')

    # give_terminal_to is a simplified version of:
    #    give_terminal_to from bash 4.3 source, jobs.c, line 4030
    # this will give the terminal to the process group pgid
    if ON_CYGWIN:
        # on cygwin, signal.pthread_sigmask does not exist in Python, even
        # though pthread_sigmask is defined in the kernel.  thus, we use
        # ctypes to mimic the calls in the "normal" version below.
        def give_terminal_to(pgid):
            """Give the terminal to process group ``pgid`` via libc calls.

            Returns False when ``pgid`` is None (consistent with the
            non-Cygwin variant), True otherwise.
            """
            if pgid is None:
                return False
            omask = ctypes.c_ulong()
            mask = ctypes.c_ulong()
            LIBC.sigemptyset(ctypes.byref(mask))
            for i in _block_when_giving:
                LIBC.sigaddset(ctypes.byref(mask), ctypes.c_int(i))
            LIBC.sigemptyset(ctypes.byref(omask))
            LIBC.sigprocmask(ctypes.c_int(signal.SIG_BLOCK),
                             ctypes.byref(mask),
                             ctypes.byref(omask))
            LIBC.tcsetpgrp(ctypes.c_int(FD_STDERR), ctypes.c_int(pgid))
            # Restore the signal mask that was active before the hand-over.
            LIBC.sigprocmask(ctypes.c_int(signal.SIG_SETMASK),
                             ctypes.byref(omask), None)
            return True
    else:
        def give_terminal_to(pgid):
            """Give the terminal to process group ``pgid``.

            Returns True on success and False when ``pgid`` is None or the
            hand-over is not possible (group already gone, invalid argument,
            or no real TTY). Any other OSError is re-raised. The previous
            signal mask is always restored.
            """
            if pgid is None:
                return False
            oldmask = signal.pthread_sigmask(signal.SIG_BLOCK,
                                             _block_when_giving)
            try:
                os.tcsetpgrp(FD_STDERR, pgid)
                return True
            except ProcessLookupError:
                # when the process finished before giving terminal to it,
                # see issue #2288
                return False
            except OSError as e:
                if e.errno == 22:  # [Errno 22] Invalid argument
                    # there are cases that all the processes of pgid have
                    # finished, then we don't need to do anything here, see
                    # issue #2220
                    return False
                elif e.errno == 25:  # [Errno 25] Inappropriate ioctl for device
                    # There are also cases where we are not connected to a
                    # real TTY, even though we may be run in interactive
                    # mode. See issue #2267 for an example with emacs
                    return False
                else:
                    raise
            finally:
                signal.pthread_sigmask(signal.SIG_SETMASK, oldmask)

    def wait_for_active_job(last_task=None, backgrounded=False):
        """
        Wait for the active job to finish, to be killed by SIGINT, or to be
        suspended by ctrl-z.

        Returns the last task that was waited on, or ``last_task`` unchanged
        when there is no running foreground task. The incoming value of
        ``backgrounded`` is overwritten before use; it is only forwarded to
        the recursive call.
        """
        _clear_dead_jobs()
        active_task = get_next_task()
        # Return when there are no foreground active task
        if active_task is None:
            return last_task
        obj = active_task['obj']
        backgrounded = False
        try:
            # WUNTRACED: also report children that were stopped, not only
            # those that terminated.
            _, wcode = os.waitpid(obj.pid, os.WUNTRACED)
        except ChildProcessError:  # No child processes
            return wait_for_active_job(last_task=active_task,
                                       backgrounded=backgrounded)
        if os.WIFSTOPPED(wcode):
            print('^Z')
            active_task['status'] = "stopped"
            backgrounded = True
        elif os.WIFSIGNALED(wcode):
            print()  # get a newline because ^C will have been printed
            obj.signal = (os.WTERMSIG(wcode), os.WCOREDUMP(wcode))
            obj.returncode = None
        else:
            obj.returncode = os.WEXITSTATUS(wcode)
            obj.signal = None
        return wait_for_active_job(last_task=active_task,
                                   backgrounded=backgrounded)
def get_next_task():
    """Get the next active task and put it on top of the queue.

    The first task in ``tasks`` that is not backgrounded and whose status is
    "running" is moved to the front of ``tasks`` and its job dict returned.
    Returns None when no such task exists.
    """
    selected_task = None
    for tid in tasks:
        task = get_task(tid)
        if not task['bg'] and task['status'] == "running":
            selected_task = tid
            break
    if selected_task is None:
        return None
    # Re-insert at the left end so the task becomes the current one.
    tasks.remove(selected_task)
    tasks.appendleft(selected_task)
    return get_task(selected_task)
def get_task(tid):
    """Return the job stored under id ``tid`` in the global job table.

    Raises KeyError if ``builtins.__xonsh_all_jobs__`` has no such id.
    """
    return builtins.__xonsh_all_jobs__[tid]
def _clear_dead_jobs():
    """Forget every job whose process has finished.

    A job is considered finished when ``poll()`` on its ``'obj'`` returns a
    non-None value; its id is removed from ``tasks`` and its entry deleted
    from ``builtins.__xonsh_all_jobs__``.
    """
    # Collect the ids first: ``tasks`` must not be mutated while iterating.
    to_remove = {tid for tid in tasks
                 if get_task(tid)['obj'].poll() is not None}
    for tid in to_remove:
        tasks.remove(tid)
        del builtins.__xonsh_all_jobs__[tid]
def get_next_job_number():
    """Get the lowest available unique job number (for the next job created).

    Finished jobs are cleared first, so their numbers become available again.
    """
    _clear_dead_jobs()
    i = 1
    while i in builtins.__xonsh_all_jobs__:
        i += 1
    return i
def add_job(info):
    """Add a new job to the jobs dictionary.

    ``info`` is updated in place with a ``'started'`` timestamp and the
    status "running", registered under the next free job number, and made
    the current task. Background jobs are announced when the shell is
    interactive.
    """
    num = get_next_job_number()
    info['started'] = time.time()
    info['status'] = "running"
    tasks.appendleft(num)
    builtins.__xonsh_all_jobs__[num] = info
    if info['bg'] and builtins.__xonsh_env__.get('XONSH_INTERACTIVE'):
        print_one_job(num)
def clean_jobs():
    """Clean up jobs for exiting shell

    In non-interactive mode, kill all jobs.

    In interactive mode, check for suspended or background jobs, print a
    warning if any exist, and return False. Otherwise, return True.

    If exit is requested again before another command has started, the
    remaining jobs are killed without a warning and True is returned.
    """
    # Declared up front: the name is both read and assigned further down.
    global _last_exit_time
    jobs_clean = True
    if builtins.__xonsh_env__['XONSH_INTERACTIVE']:
        _clear_dead_jobs()

        if builtins.__xonsh_all_jobs__:
            hist = builtins.__xonsh_history__
            if hist is not None and len(hist.tss) > 0:
                last_cmd_start = hist.tss[-1][0]
            else:
                last_cmd_start = None

            if (_last_exit_time and last_cmd_start and
                    _last_exit_time > last_cmd_start):
                # Exit occurred after last command started, so it was called as
                # part of the last command and is now being called again
                # immediately. Kill jobs and exit without reminder about
                # unfinished jobs in this case.
                kill_all_jobs()
            else:
                if len(builtins.__xonsh_all_jobs__) > 1:
                    msg = 'there are unfinished jobs'
                else:
                    msg = 'there is an unfinished job'

                if builtins.__xonsh_env__['SHELL_TYPE'] != 'prompt_toolkit':
                    # The Ctrl+D binding for prompt_toolkit already inserts a
                    # newline
                    print()
                print('xonsh: {}'.format(msg), file=sys.stderr)
                print('-' * 5, file=sys.stderr)
                jobs([], stdout=sys.stderr)
                print('-' * 5, file=sys.stderr)
                print('Type "exit" or press "ctrl-d" again to force quit.',
                      file=sys.stderr)
                jobs_clean = False
                # Remember this attempt so an immediate second exit forces quit.
                _last_exit_time = time.time()
    else:
        kill_all_jobs()

    return jobs_clean
def kill_all_jobs():
    """
    Send SIGKILL to all child processes (called when exiting xonsh).

    Finished jobs are cleared first; every remaining job is then passed to
    the platform-specific ``_kill``.
    """
    _clear_dead_jobs()
    for job in builtins.__xonsh_all_jobs__.values():
        _kill(job)
def jobs(args, stdin=None, stdout=sys.stdout, stderr=None):
    """
    xonsh command: jobs

    Display a list of all current jobs.

    Each job in ``tasks`` is printed to ``stdout``; ``args``, ``stdin`` and
    ``stderr`` are not used. Always returns ``(None, None)``.
    """
    _clear_dead_jobs()
    for j in tasks:
        print_one_job(j, outfile=stdout)
    return None, None
@unthreadable
def fg(args, stdin=None):
    """
    xonsh command: fg

    Bring the currently active job to the foreground, or, if a single number is
    given as an argument, bring that job to the foreground. Additionally,
    specify "+" for the most recent job and "-" for the second most recent job.

    Returns None after resuming the job's pipeline, or a ``('', message)``
    tuple describing the problem when the job cannot be selected.
    """
    _clear_dead_jobs()
    if len(tasks) == 0:
        return '', 'Cannot bring nonexistent job to foreground.\n'

    if len(args) == 0:
        tid = tasks[0]  # take the last manipulated task by default
    elif len(args) == 1:
        invalid = '', 'Invalid job: {}\n'.format(args[0])
        try:
            if args[0] == '+':  # take the last manipulated task
                tid = tasks[0]
            elif args[0] == '-':  # take the second to last manipulated task
                tid = tasks[1]
            else:
                tid = int(args[0])
        except (ValueError, IndexError):
            return invalid

        if tid not in builtins.__xonsh_all_jobs__:
            return invalid
    else:
        return '', 'fg expects 0 or 1 arguments, not {}\n'.format(len(args))

    # Put this one on top of the queue
    tasks.remove(tid)
    tasks.appendleft(tid)

    job = get_task(tid)
    job['bg'] = False
    job['status'] = "running"
    if builtins.__xonsh_env__.get('XONSH_INTERACTIVE'):
        print_one_job(tid)
    pipeline = job['pipeline']
    pipeline.resume(job)
def bg(args, stdin=None):
    """xonsh command: bg

    Resume execution of the currently active job in the background, or, if a
    single number is given as an argument, resume that job in the background.

    Job selection is delegated to ``fg``; if ``fg`` reports an error, that
    ``('', message)`` tuple is returned unchanged. Otherwise the selected job
    is flagged as a background job and continued, and None is returned.
    """
    res = fg(args, stdin)
    if res is not None:
        return res
    # fg() moved the selected job to the front of ``tasks``.
    curtask = get_task(tasks[0])
    curtask['bg'] = True
    _continue(curtask)
    return None