Source code for xonsh.jobs

# -*- coding: utf-8 -*-
"""Job control for the xonsh shell."""
import os
import sys
import time
import ctypes
import signal
import builtins
import subprocess
import collections
import typing as tp

from xonsh.lazyasd import LazyObject
from xonsh.platform import FD_STDERR, ON_DARWIN, ON_WINDOWS, ON_CYGWIN, ON_MSYS, LIBC
from xonsh.tools import unthreadable


tasks = LazyObject(collections.deque, globals(), "tasks")
# Track time stamp of last exit command, so that two consecutive attempts to
# exit can kill all jobs and exit.
_last_exit_time: tp.Optional[float] = None


if ON_DARWIN:

    def _send_signal(job, signal):
        # On OS X, os.killpg() may cause PermissionError when there are
        # any zombie processes in the process group.
        # See github issue #1012 for details
        for pid in job["pids"]:
            if pid is None:  # the pid of an aliased proc is None
                continue
            try:
                os.kill(pid, signal)
            except ProcessLookupError:
                pass


elif ON_WINDOWS:
    pass
elif ON_CYGWIN or ON_MSYS:
    # Similar to what happened on OSX, more issues on Cygwin
    # (see Github issue #514).
    def _send_signal(job, signal):
        try:
            os.killpg(job["pgrp"], signal)
        except Exception:
            for pid in job["pids"]:
                try:
                    os.kill(pid, signal)
                except Exception:
                    pass


else:

    def _send_signal(job, signal):
        pgrp = job["pgrp"]
        if pgrp is None:
            for pid in job["pids"]:
                try:
                    os.kill(pid, signal)
                except Exception:
                    pass
        else:
            os.killpg(job["pgrp"], signal)


if ON_WINDOWS:

    def _continue(job):
        job["status"] = "running"

    def _kill(job):
        subprocess.check_output(
            ["taskkill", "/F", "/T", "/PID", str(job["obj"].pid)],
            stderr=subprocess.STDOUT,
        )

    def ignore_sigtstp():
        pass

    def give_terminal_to(pgid):
        pass

    def wait_for_active_job(last_task=None, backgrounded=False, return_error=False):
        """
        Wait for the active job to finish, to be killed by SIGINT, or to be
        suspended by ctrl-z.
        """
        active_task = get_next_task()
        # Return when there are no foreground active task
        if active_task is None:
            return last_task
        obj = active_task["obj"]
        _continue(active_task)
        while obj.returncode is None:
            try:
                obj.wait(0.01)
            except subprocess.TimeoutExpired:
                pass
            except KeyboardInterrupt:
                try:
                    _kill(active_task)
                except subprocess.CalledProcessError:
                    pass  # ignore error if process closed before we got here
        return wait_for_active_job(last_task=active_task)


else:

    def _continue(job):
        _send_signal(job, signal.SIGCONT)

    def _kill(job):
        _send_signal(job, signal.SIGKILL)

[docs] def ignore_sigtstp(): signal.signal(signal.SIGTSTP, signal.SIG_IGN)
_shell_pgrp = os.getpgrp() # type:ignore _block_when_giving = LazyObject( lambda: ( signal.SIGTTOU, # type:ignore signal.SIGTTIN, # type:ignore signal.SIGTSTP, # type:ignore signal.SIGCHLD, # type:ignore ), globals(), "_block_when_giving", ) if ON_CYGWIN or ON_MSYS: # on cygwin, signal.pthread_sigmask does not exist in Python, even # though pthread_sigmask is defined in the kernel. thus, we use # ctypes to mimic the calls in the "normal" version below. LIBC.pthread_sigmask.restype = ctypes.c_int LIBC.pthread_sigmask.argtypes = [ ctypes.c_int, ctypes.POINTER(ctypes.c_ulong), ctypes.POINTER(ctypes.c_ulong), ] def _pthread_sigmask(how, signals): mask = 0 for sig in signals: mask |= 1 << sig oldmask = ctypes.c_ulong() mask = ctypes.c_ulong(mask) result = LIBC.pthread_sigmask( how, ctypes.byref(mask), ctypes.byref(oldmask) ) if result: raise OSError(result, "Sigmask error.") return { sig for sig in getattr(signal, "Signals", range(0, 65)) if (oldmask.value >> sig) & 1 } else: _pthread_sigmask = signal.pthread_sigmask # type:ignore # give_terminal_to is a simplified version of: # give_terminal_to from bash 4.3 source, jobs.c, line 4030 # this will give the terminal to the process group pgid
[docs] def give_terminal_to(pgid): if pgid is None: return False oldmask = _pthread_sigmask(signal.SIG_BLOCK, _block_when_giving) try: os.tcsetpgrp(FD_STDERR, pgid) return True except ProcessLookupError: # when the process finished before giving terminal to it, # see issue #2288 return False except OSError as e: if e.errno == 22: # [Errno 22] Invalid argument # there are cases that all the processes of pgid have # finished, then we don't need to do anything here, see # issue #2220 return False elif e.errno == 25: # [Errno 25] Inappropriate ioctl for device # There are also cases where we are not connected to a # real TTY, even though we may be run in interactive # mode. See issue #2267 for an example with emacs return False else: raise finally: _pthread_sigmask(signal.SIG_SETMASK, oldmask)
[docs] def wait_for_active_job(last_task=None, backgrounded=False, return_error=False): """ Wait for the active job to finish, to be killed by SIGINT, or to be suspended by ctrl-z. """ active_task = get_next_task() # Return when there are no foreground active task if active_task is None: return last_task obj = active_task["obj"] backgrounded = False try: _, wcode = os.waitpid(obj.pid, os.WUNTRACED) except ChildProcessError as e: # No child processes if return_error: return e else: return _safe_wait_for_active_job( last_task=active_task, backgrounded=backgrounded ) if os.WIFSTOPPED(wcode): active_task["status"] = "stopped" backgrounded = True elif os.WIFSIGNALED(wcode): print() # get a newline because ^C will have been printed obj.signal = (os.WTERMSIG(wcode), os.WCOREDUMP(wcode)) obj.returncode = None else: obj.returncode = os.WEXITSTATUS(wcode) obj.signal = None return wait_for_active_job(last_task=active_task, backgrounded=backgrounded)
def _safe_wait_for_active_job(last_task=None, backgrounded=False): """Safely call wait_for_active_job()""" have_error = True while have_error: try: rtn = wait_for_active_job( last_task=last_task, backgrounded=backgrounded, return_error=True ) except ChildProcessError as e: rtn = e have_error = isinstance(rtn, ChildProcessError) return rtn
[docs]def get_next_task(): """ Get the next active task and put it on top of the queue""" _clear_dead_jobs() selected_task = None for tid in tasks: task = get_task(tid) if not task["bg"] and task["status"] == "running": selected_task = tid break if selected_task is None: return tasks.remove(selected_task) tasks.appendleft(selected_task) return get_task(selected_task)
[docs]def get_task(tid): return builtins.__xonsh__.all_jobs[tid]
def _clear_dead_jobs(): to_remove = set() for tid in tasks: obj = get_task(tid)["obj"] if obj is None or obj.poll() is not None: to_remove.add(tid) for job in to_remove: tasks.remove(job) del builtins.__xonsh__.all_jobs[job]
[docs]def get_next_job_number(): """Get the lowest available unique job number (for the next job created).""" _clear_dead_jobs() i = 1 while i in builtins.__xonsh__.all_jobs: i += 1 return i
[docs]def add_job(info): """Add a new job to the jobs dictionary.""" num = get_next_job_number() info["started"] = time.time() info["status"] = "running" tasks.appendleft(num) builtins.__xonsh__.all_jobs[num] = info if info["bg"] and builtins.__xonsh__.env.get("XONSH_INTERACTIVE"): print_one_job(num)
[docs]def clean_jobs(): """Clean up jobs for exiting shell In non-interactive mode, kill all jobs. In interactive mode, check for suspended or background jobs, print a warning if any exist, and return False. Otherwise, return True. """ jobs_clean = True if builtins.__xonsh__.env["XONSH_INTERACTIVE"]: _clear_dead_jobs() if builtins.__xonsh__.all_jobs: global _last_exit_time hist = builtins.__xonsh__.history if hist is not None and len(hist.tss) > 0: last_cmd_start = hist.tss[-1][0] else: last_cmd_start = None if _last_exit_time and last_cmd_start and _last_exit_time > last_cmd_start: # Exit occurred after last command started, so it was called as # part of the last command and is now being called again # immediately. Kill jobs and exit without reminder about # unfinished jobs in this case. kill_all_jobs() else: if len(builtins.__xonsh__.all_jobs) > 1: msg = "there are unfinished jobs" else: msg = "there is an unfinished job" if builtins.__xonsh__.env["SHELL_TYPE"] != "prompt_toolkit": # The Ctrl+D binding for prompt_toolkit already inserts a # newline print() print("xonsh: {}".format(msg), file=sys.stderr) print("-" * 5, file=sys.stderr) jobs([], stdout=sys.stderr) print("-" * 5, file=sys.stderr) print( 'Type "exit" or press "ctrl-d" again to force quit.', file=sys.stderr, ) jobs_clean = False _last_exit_time = time.time() else: kill_all_jobs() return jobs_clean
[docs]def kill_all_jobs(): """ Send SIGKILL to all child processes (called when exiting xonsh). """ _clear_dead_jobs() for job in builtins.__xonsh__.all_jobs.values(): _kill(job)
[docs]def jobs(args, stdin=None, stdout=sys.stdout, stderr=None): """ xonsh command: jobs Display a list of all current jobs. """ _clear_dead_jobs() for j in tasks: print_one_job(j, outfile=stdout) return None, None
[docs]def resume_job(args, wording): """ used by fg and bg to resume a job either in the foreground or in the background. """ _clear_dead_jobs() if len(tasks) == 0: return "", "There are currently no suspended jobs" if len(args) == 0: tid = tasks[0] # take the last manipulated task by default elif len(args) == 1: try: if args[0] == "+": # take the last manipulated task tid = tasks[0] elif args[0] == "-": # take the second to last manipulated task tid = tasks[1] else: tid = int(args[0]) except (ValueError, IndexError): return "", "Invalid job: {}\n".format(args[0]) if tid not in builtins.__xonsh__.all_jobs: return "", "Invalid job: {}\n".format(args[0]) else: return "", "{} expects 0 or 1 arguments, not {}\n".format(wording, len(args)) # Put this one on top of the queue tasks.remove(tid) tasks.appendleft(tid) job = get_task(tid) job["bg"] = False job["status"] = "running" if builtins.__xonsh__.env.get("XONSH_INTERACTIVE"): print_one_job(tid) pipeline = job["pipeline"] pipeline.resume(job)
[docs]@unthreadable def fg(args, stdin=None): """ xonsh command: fg Bring the currently active job to the foreground, or, if a single number is given as an argument, bring that job to the foreground. Additionally, specify "+" for the most recent job and "-" for the second most recent job. """ return resume_job(args, wording="fg")
[docs]def bg(args, stdin=None): """xonsh command: bg Resume execution of the currently active job in the background, or, if a single number is given as an argument, resume that job in the background. """ res = resume_job(args, wording="bg") if res is None: curtask = get_task(tasks[0]) curtask["bg"] = True _continue(curtask) else: return res