#!/usr/bin/python3
# -*- coding: utf-8 -*-
# Copyright 2012-2022 Univention GmbH
#
# https://www.univention.de/
#
# All rights reserved.
#
# The source code of this program is made available
# under the terms of the GNU Affero General Public License version 3
# (GNU AGPL V3) as published by the Free Software Foundation.
#
# Binary versions of this program provided by Univention to you as
# well as other copyrighted, protected or trademarked materials like
# Logos, graphics, fonts, specific documentations and configurations,
# cryptographic keys etc. are subject to a license agreement between
# you and Univention and not subject to the GNU AGPL V3.
#
# In the case you use this program under the terms of the GNU AGPL V3,
# the program is provided in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public
# License with the Debian GNU/Linux or Univention distribution in file
# /usr/share/common-licenses/AGPL-3; if not, see
# <https://www.gnu.org/licenses/>.
"""
Univention common Python library for handling :program:`at` jobs.
This module abstracts the handling of at-jobs, each job is encapsulated by
the class :py:class:`AtJob`. Use the method :py:meth:`add` in order to add a new command to the
queue of at-jobs. Use the methods :py:meth:`list` and :py:meth:`load` to get a list of all
registered jobs or to load a specific job given an ID, respectively. The module
uses time stamps in seconds for scheduling jobs.
"""
import datetime
import subprocess
import re
import locale
from typing import Dict, List, Mapping, Optional, Union # noqa: F401
__all__ = ['add', 'list', 'load', 'remove', 'reschedule', 'AtJob']
# internal formatting strings and regexps
_regWhiteSpace = re.compile(r'\s+')
_regJobNr = re.compile(r'job\s+(\d+)'.encode('ASCII'))
_dateTimeFormatRead = '%a %b %d %H:%M:%S %Y'
_dateTimeFormatWrite = '%Y-%m-%d %H:%M'
_timeFormatWrite = '%H:%M'
_dateFormatWrite = '%Y-%m-%d'
SCRIPT_PREFIX = '# --- Univention-Lib at job ---'
COMMENT_PREFIX = '# Comment: '
[docs]def add(cmd, execTime=None, comments={}):
# type: (str, Union[None, int, float, datetime.datetime], Optional[Mapping[str, str]]) -> Optional[AtJob]
"""
Add a new command to the job queue given a time
at which the job will be executed.
:param execTime: execution time either as seconds since the epoch or as a :py:class:`datetime.datetime` instance. Defaults to `now`.
:type execTime: int or float or datetime.datetime or None
:param dict comments: A optional dictionary with comments to be associated with the job.
:returns: The created job or `None`.
:rtype: AtJob or None
"""
if isinstance(execTime, (int, float)):
start = datetime.datetime.fromtimestamp(execTime) # type: Optional[datetime.datetime]
else:
start = execTime
# launch the at job directly
atCmd = ['/usr/bin/at']
if start:
jobTime = start.strftime(_timeFormatWrite)
jobDate = start.strftime(_dateFormatWrite)
atCmd.extend([jobTime, jobDate])
else:
atCmd.append('now')
# prevent injections from user supplied input
# by encoding newlines
def _encode_comment(value):
if isinstance(value, bytes):
try:
value = value.decode('utf-8')
except UnicodeDecodeError:
value = value.decode('latin-1')
return (u'%s' % (value,)).encode('unicode_escape').decode('ASCII')
# add comments
if comments:
cmd = '\n'.join('%s%s:%s' % (COMMENT_PREFIX, _encode_comment(c[0]).replace(':', ''), _encode_comment(c[1])) for c in comments.items()) + '\n' + SCRIPT_PREFIX + '\n' + cmd
# add job
p = subprocess.Popen(atCmd, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE)
# send the job to stdin
out = p.communicate(cmd.encode('UTF-8'))
# parse output and return job
matches = _regJobNr.findall(b'\n'.join(out))
if matches:
return load(int(matches[0]))
return None
[docs]def reschedule(nr, execTime=None):
# type: (int, Optional[float]) -> Optional[AtJob]
"""
Re-schedules the at job with the given number for the specified time.
:param int nr: The job number.
:param execTime: execution time either as seconds since the epoch or as a :py:class:`datetime.datetime` instance. Defaults to `now`.
:type execTime: int or float or datetime.datetime or None
:returns: The created job or `None`.
:rtype: AtJob or None
:raises: AttributeError: if the job cannot be found.
"""
atjob = load(nr, extended=True)
if atjob is None:
raise AttributeError('Could not find at job %s' % nr)
if atjob.command is None:
raise AttributeError('The command of the at job is not available')
atjob.rm()
return add(atjob.command, execTime, atjob.comments)
[docs]def list(extended=False):
# type: (bool) -> List[AtJob]
"""
Returns a list of all registered jobs.
:param bool extended: If set to `True` also the comments and the command to execute are fetched.
:returns: A list of :py:class:`AtJob` instances.
:rtype: list[AtJob]
This can be used to re-schedule a job.
"""
p = subprocess.Popen('/usr/bin/atq', stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out = p.communicate()[0].decode('UTF-8', 'replace').splitlines()
jobs = []
for line in out:
ijob = _parseJob(line)
if ijob:
jobs.append(ijob)
if extended:
for job in jobs:
_parseScript(job)
return jobs
[docs]def load(nr, extended=False):
# type: (int, bool) -> Optional[AtJob]
"""
Load the job given.
:param nr: Job number.
:param bool extended: If set to `True` also the comments and the command to execute are fetched.
:returns: `None` if job does not exist, otherwise an instance of :py:class:`AtJob`.
:rtype: AtJob
"""
result = [p for p in list(extended) if p.nr == nr]
if len(result):
return result[0]
return None
[docs]def remove(nr):
# type: (int) -> Optional[int]
"""
Removes the at job with the given number.
:param int nr: Job number.
"""
for job in list():
if job.nr == nr:
return job.rm()
return None
def _parseScript(job):
# type: (AtJob) -> None
"""
Internal function to load the job details by parsing the job of :command:`atq`.
:param AtJob job: A job.
"""
# FIXME: This should be a method of the class.
p = subprocess.Popen(['/usr/bin/at', '-c', str(job.nr)], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out = p.communicate()[0].decode('UTF-8', 'replace').splitlines()
job.comments = {}
script = False
job.command = ''
for line in out:
if script:
job.command = '%s%s\n' % (job.command, line)
continue
if line.startswith(COMMENT_PREFIX):
line = line[len(COMMENT_PREFIX):]
try:
key, value = line.split(':', 1)
except ValueError:
continue
try:
key, value = key.encode('UTF-8').decode('unicode_escape'), value.encode('UTF-8').decode('unicode_escape')
except UnicodeDecodeError:
pass # can only happen if user manipulates/fakes atjob.
job.comments[key] = value
elif line.startswith(SCRIPT_PREFIX):
script = True
def _parseJob(string):
# type: (str) -> Optional[AtJob]
"""
Internal method to parse output of :command:`atq`.
:param str string: A output line of :command:`atq`.
:returns: A :py:class:`AtJob` instance or `None`
:rtype: AtJob
"""
timeLocale = locale.getlocale(locale.LC_TIME)
try:
# change the time locale temporarily to 'C' as atq uses English date format
# ignoring the currently set locale
locale.setlocale(locale.LC_TIME, 'C')
# parse string
tmp = _regWhiteSpace.split(string)
execTime = datetime.datetime.strptime(' '.join(tmp[1:6]), _dateTimeFormatRead)
isRunning = tmp[6] == '='
owner = tmp[7]
nr = int(tmp[0])
except (IndexError, ValueError):
# parsing failed
return None
finally:
# reset locale to default
locale.setlocale(locale.LC_TIME, timeLocale)
return AtJob(nr, owner, execTime, isRunning)
[docs]class AtJob(object):
"""
This class is an abstract representation of an at-job. Do not initiate
the class directly, but use the methods provided in this module.
:param int nr: Job number.
:param str owner: User owning the job.
:param datetime.datetime execTime: Planned job execution time.
:param bool isRunning: `True` is the jub is currently running, `False` otherwise.
"""
def __init__(self, nr, owner, execTime, isRunning):
# type: (int, str, datetime.datetime, bool) -> None
self.nr = nr
self.owner = owner
self.command = None # type: Optional[str]
self.execTime = execTime
self.isRunning = isRunning
self.comments = {} # type: Dict[str, str]
def __str__(self):
# type: () -> str
t = self.execTime.strftime(_dateTimeFormatWrite)
if self.isRunning:
t = 'running'
return 'Job #%d (%s)' % (self.nr, t)
def __repr__(self):
# type: () -> str
return self.__str__()
[docs] def rm(self):
# type: () -> int
"""
Remove the job from the queue.
"""
p = subprocess.Popen(['/usr/bin/atrm', str(self.nr)], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
p.communicate()
return p.returncode == 0