Source code for univention.lib.atjobs

#!/usr/bin/python3
# SPDX-FileCopyrightText: 2012-2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only
"""
Univention common Python library for handling :program:`at` jobs.

This module abstracts the handling of at-jobs, each job is encapsulated by
the class :py:class:`AtJob`. Use the method :py:meth:`add` in order to add a new command to the
queue of at-jobs. Use the methods :py:meth:`list` and :py:meth:`load` to get a list of all
registered jobs or to load a specific job given an ID, respectively. The module
uses time stamps in seconds for scheduling jobs.
"""
from __future__ import annotations

import datetime
import locale
import re
import subprocess
from typing import TYPE_CHECKING


if TYPE_CHECKING:
    import builtins
    from collections.abc import Mapping


# Public API of this module. Note that the exported function `list` shadows
# the builtin of the same name inside this module.
__all__ = ['AtJob', 'add', 'list', 'load', 'remove', 'reschedule']

# internal formatting strings and regexps
# Splits one line of `atq` output into its columns (see _parseJob()).
_regWhiteSpace = re.compile(r'\s+')
# Bytes pattern used by add() to find the job number in the output of `at`.
_regJobNr = re.compile(r'job\s+(\d+)'.encode('ASCII'))
# strptime() format of the timestamp columns printed by `atq` (see _parseJob()).
_dateTimeFormatRead = '%a %b %d %H:%M:%S %Y'
# strftime() format used by AtJob.__str__() to display the execution time.
_dateTimeFormatWrite = '%Y-%m-%d %H:%M'
# strftime() formats of the time and date arguments passed to `at` by add().
_timeFormatWrite = '%H:%M'
_dateFormatWrite = '%Y-%m-%d'

# Marker line separating the comment header from the command in the job
# script; _parseScript() treats all lines after it as the command.
SCRIPT_PREFIX = '# --- Univention-Lib at job  ---'
# Prefix of every comment line ("<prefix><key>:<value>") in the job script.
COMMENT_PREFIX = '# Comment: '


def add(cmd: str, execTime: float | datetime.datetime | None = None, comments: Mapping[str, str] | None = None) -> AtJob | None:
    """
    Add a new command to the job queue given a time at which the job will be
    executed.

    :param str cmd: The shell command(s) to execute. They are passed to :command:`at` on standard input.
    :param execTime: execution time either as seconds since the epoch or as a :py:class:`datetime.datetime` instance. Defaults to `now`.
    :type execTime: int or float or datetime.datetime or None
    :param dict comments: An optional dictionary with comments to be associated with the job.
    :returns: The created job or `None` if no job number could be found in the output of :command:`at`.
    :rtype: AtJob or None
    """
    if isinstance(execTime, int | float):
        start: datetime.datetime | None = datetime.datetime.fromtimestamp(execTime)
    else:
        start = execTime

    # launch the at job directly
    atCmd = ['/usr/bin/at']
    if start is not None:
        jobTime = start.strftime(_timeFormatWrite)
        jobDate = start.strftime(_dateFormatWrite)
        atCmd.extend([jobTime, jobDate])
    else:
        atCmd.append('now')

    # prevent injections from user supplied input
    # by encoding newlines
    def _encode_comment(value):
        if isinstance(value, bytes):
            try:
                value = value.decode('utf-8')
            except UnicodeDecodeError:
                value = value.decode('latin-1')
        return ('%s' % (value,)).encode('unicode_escape').decode('ASCII')

    # Build the script header: one line per comment followed by the marker
    # line. The marker is written even when there are no comments, because
    # _parseScript() only recognises the lines after it as the command;
    # without it the command of the job could not be read back.
    header = [
        '%s%s:%s' % (COMMENT_PREFIX, _encode_comment(key).replace(':', ''), _encode_comment(value))
        for key, value in (comments or {}).items()
    ]
    header.append(SCRIPT_PREFIX)
    cmd = '\n'.join(header) + '\n' + cmd

    # add job
    p = subprocess.Popen(atCmd, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE)

    # send the job to stdin
    out = p.communicate(cmd.encode('UTF-8'))

    # parse output (stdout and stderr are searched together) and return job
    matches = _regJobNr.findall(b'\n'.join(out))
    if matches:
        return load(int(matches[0]))
    return None
def reschedule(nr: int, execTime: float | datetime.datetime | None = None) -> AtJob | None:
    """
    Re-schedules the at job with the given number for the specified time.

    The existing job is removed and a new job with the same command and
    comments is added, so the returned job has a different job number.

    :param int nr: The job number.
    :param execTime: execution time either as seconds since the epoch or as a :py:class:`datetime.datetime` instance. Defaults to `now`.
    :type execTime: int or float or datetime.datetime or None
    :returns: The created job or `None`.
    :rtype: AtJob or None
    :raises: AttributeError: if the job cannot be found or its command cannot be determined.
    """
    atjob = load(nr, extended=True)
    if atjob is None:
        raise AttributeError('Could not find at job %s' % nr)
    # An extended load leaves the command as an empty string (not None) when
    # the job script contains no script marker. Check for both before
    # removing the job, otherwise it would be replaced by an empty script.
    if not atjob.command:
        raise AttributeError('The command of the at job is not available')
    atjob.rm()
    return add(atjob.command, execTime, atjob.comments)
def list(extended: bool = False) -> builtins.list[AtJob]:
    """
    Returns a list of all registered jobs.

    :param bool extended: If set to `True` also the comments and the command to execute are fetched. This can be used to re-schedule a job.
    :returns: A list of :py:class:`AtJob` instances.
    :rtype: list[AtJob]
    """
    atq = subprocess.Popen('/usr/bin/atq', stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    stdout = atq.communicate()[0]

    # every line that can be parsed describes one job; other lines are skipped
    parsed = [_parseJob(row) for row in stdout.decode('UTF-8', 'replace').splitlines()]
    jobs = [entry for entry in parsed if entry]

    if extended:
        # fetch comments and command of each job
        for entry in jobs:
            _parseScript(entry)
    return jobs
def load(nr: int, extended: bool = False) -> AtJob | None:
    """
    Load the job given.

    :param nr: Job number.
    :param bool extended: If set to `True` also the comments and the command to execute are fetched.
    :returns: `None` if job does not exist, otherwise an instance of :py:class:`AtJob`.
    :rtype: AtJob or None
    """
    # `list` is this module's function, not the builtin. The queue is listed
    # without details and the script is parsed only for the requested job,
    # instead of running `at -c` once for every job in the queue.
    for job in list():  # noqa: C408
        if job.nr == nr:
            if extended:
                _parseScript(job)
            return job
    return None
def remove(nr: int) -> int | None:
    """
    Removes the at job with the given number.

    :param int nr: Job number.
    :returns: The result of :py:meth:`AtJob.rm` for the first job with that number, or `None` if there is no such job.
    """
    # `list` is this module's function, not the builtin
    candidates = (entry for entry in list() if entry.nr == nr)  # noqa: C408
    target = next(candidates, None)
    if target is None:
        return None
    return target.rm()
def _parseScript(job: AtJob) -> None:
    """
    Internal function to load the job details by parsing the output of :command:`at -c`.

    Sets `job.comments` and `job.command` in place. The command consists of
    all lines following the script marker line; it is an empty string if the
    marker is missing.

    :param AtJob job: A job.
    """
    # FIXME: This should be a method of the class.
    p = subprocess.Popen(['/usr/bin/at', '-c', str(job.nr)], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    out = p.communicate()[0].decode('UTF-8', 'replace').splitlines()
    job.comments = {}
    script_lines = []
    in_script = False
    for line in out:
        if in_script:
            # everything after the marker belongs to the command
            script_lines.append(line)
        elif line.startswith(COMMENT_PREFIX):
            key, sep, value = line[len(COMMENT_PREFIX):].partition(':')
            if not sep:
                # not of the form "<key>:<value>"
                continue
            try:
                key, value = key.encode('UTF-8').decode('unicode_escape'), value.encode('UTF-8').decode('unicode_escape')
            except UnicodeDecodeError:
                pass  # can only happen if user manipulates/fakes atjob; keep the raw text.
            job.comments[key] = value
        elif line.startswith(SCRIPT_PREFIX):
            in_script = True
    # join once instead of re-building the string for every line
    job.command = ''.join('%s\n' % (line,) for line in script_lines)


def _parseJob(string: str) -> AtJob | None:
    """
    Internal method to parse output of :command:`atq`.

    :param str string: A output line of :command:`atq`.
    :returns: A :py:class:`AtJob` instance or `None` if the line cannot be parsed.
    :rtype: AtJob or None
    """
    # Query the current setting as the raw string known to the C library.
    # The normalised tuple from locale.getlocale() is not guaranteed to be
    # accepted by setlocale() again, which could make the restore below fail.
    timeLocale = locale.setlocale(locale.LC_TIME)
    try:
        # change the time locale temporarily to 'C' as atq uses English date format
        # ignoring the currently set locale
        locale.setlocale(locale.LC_TIME, 'C')

        # columns: job number, five timestamp fields, queue letter, owner
        tmp = _regWhiteSpace.split(string)
        execTime = datetime.datetime.strptime(' '.join(tmp[1:6]), _dateTimeFormatRead)
        isRunning = tmp[6] == '='
        owner = tmp[7]
        nr = int(tmp[0])
    except (IndexError, ValueError):
        # parsing failed
        return None
    finally:
        # reset locale to the previous setting
        locale.setlocale(locale.LC_TIME, timeLocale)
    return AtJob(nr, owner, execTime, isRunning)
class AtJob:
    """
    This class is an abstract representation of an at-job. Do not instantiate
    the class directly, but use the methods provided in this module.

    :param int nr: Job number.
    :param str owner: User owning the job.
    :param datetime.datetime execTime: Planned job execution time.
    :param bool isRunning: `True` if the job is currently running, `False` otherwise.
    """

    def __init__(self, nr: int, owner: str, execTime: datetime.datetime, isRunning: bool) -> None:
        # values taken from the job listing
        self.nr = nr
        self.owner = owner
        self.execTime = execTime
        self.isRunning = isRunning
        # details that are only filled in when the job script is parsed
        self.command: str | None = None
        self.comments: dict[str, str] = {}

    def __str__(self) -> str:
        scheduled = self.execTime.strftime(_dateTimeFormatWrite)
        state = 'running' if self.isRunning else scheduled
        return 'Job #%d (%s)' % (self.nr, state)

    def __repr__(self) -> str:
        return self.__str__()

    def rm(self) -> int:
        """
        Remove the job from the queue.

        :returns: `True` if :command:`atrm` exited with status 0, `False` otherwise.
        """
        atrm = subprocess.Popen(['/usr/bin/atrm', str(self.nr)], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        atrm.communicate()
        succeeded = atrm.returncode == 0
        return succeeded