#!/usr/bin/python3
#
# Univention Management Console
# module: updater
#
# SPDX-FileCopyrightText: 2011-2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only
import importlib.util
import os
from collections.abc import Callable, Iterable
from datetime import datetime
from hashlib import md5
from os import getpid, stat
from shlex import quote
from time import time
from traceback import format_exc
from typing import TYPE_CHECKING, Any
import psutil
from apt import Cache
from univention.lib import atjobs
from univention.lib.i18n import Translation
from univention.management.console.config import ucr
from univention.management.console.log import MODULE
from univention.management.console.modules import Base, UMC_Error
from univention.management.console.modules.decorators import sanitize, simple_response, threaded
from univention.management.console.modules.sanitizers import (
ChoicesSanitizer, IntegerSanitizer, ListSanitizer, StringSanitizer,
)
from univention.updater.errors import RequiredComponentError
from univention.updater.tools import UniventionUpdater
if TYPE_CHECKING:
from types import ModuleType
_ = Translation('univention-management-console-module-updater').translate
# the file whose file time is used as the 'serial' value for the 'Components' grid.
COMPONENTS_SERIAL_FILE = '/etc/apt/sources.list.d/20_ucs-online-component.list'
HOOK_DIRECTORY = '/usr/share/univention-updater/hooks'
# Description of the installer jobs this module can start. The top-level keys
# ('release', 'distupgrade') are the job ids accepted by the request sanitizers.
# Per job:
#   'purpose' ..... translated, human readable description (not referenced in
#                   this part of the file -- presumably used for display; TODO confirm)
#   'command' ..... shell command of the job; a '%s' placeholder is replaced by
#                   the shell-quoted 'detail' argument of run_installer(). The
#                   part before the first '%' is also used to recognize a
#                   running at job.
#   'prejob' ...... shell line put before 'command' in the at job script
#   'postjob' ..... shell line put after 'command' in the at job script
#   'logfile' ..... log file read by updater_log_file()
#   'statusfile' .. key=value file read by updater_job_status()
INSTALLERS = {
'release': {
'purpose': _("Release update to version %s"),
'command': "/usr/share/univention-updater/univention-updater net --updateto %s --ignoressh --ignoreterm",
'prejob': 'ucr set updater/maintenance=true',
'postjob': 'ucr set updater/maintenance=false',
'logfile': '/var/log/univention/updater.log',
'statusfile': '/var/lib/univention-updater/univention-updater.status',
},
'distupgrade': {
'purpose': _("Package update"),
'command': "/usr/share/univention-updater/univention-updater-umc-dist-upgrade; /usr/share/univention-updater/univention-updater-check",
'prejob': '/usr/share/univention-updater/disable-apache2-umc',
'postjob': '/usr/share/univention-updater/enable-apache2-umc --no-restart',
'logfile': '/var/log/univention/updater.log',
'statusfile': '/var/lib/univention-updater/umc-dist-upgrade.status',
},
}
[docs]
class Watched_File:
    """
    A class that takes a file name and watches changes to this file.

    We don't use any advanced technologies (FAM, inotify etc.) but
    rather basic 'stat' calls, monitoring mtime and size. Before a new
    timestamp is reported the MD5 of the file content is compared as well.
    """

    def __init__(self, path: str, count: int = 2) -> None:
        """
        :param path: name of the file to watch.
        :param count: number of consecutive calls that must observe the same timestamp before a change is reported.
        """
        self._file = path
        self._count = count

        self._last_returned_stamp = 0  # the last result we returned to the caller. will be returned as long as there are not enough changes.

        self._unchanged_count = 0  # incremented if size and timestamp didn't change

        self._last_stamp = 0  # last timestamp we've seen
        self._last_size = 0  # last size we've seen
        self._last_md5 = ''

    def timestamp(self) -> int:
        """
        Main function. Returns the current timestamp whenever size or mtime
        have changed. Defers returning the new value until changes have
        settled down, e.g. until the same values have appeared 'count' times.

        A file that does not exist (or cannot be stat'ed) is handled like a
        file with mtime 0 and size 0 instead of raising :class:`OSError`.

        :returns: the last reported timestamp (0 if nothing was reported yet).
        """
        current_stamp = 0
        current_size = 0
        try:
            st = stat(self._file)
        except OSError:
            # The watched file is optional: keep the "0" defaults so that
            # polling callers don't fail just because it is absent.
            st = None

        if st is not None:
            current_stamp = int(st.st_mtime)
            current_size = st.st_size
            # Fake a changed mtime if size is different. Subsequent processing
            # only depends on the mtime field.
            if current_size != self._last_size:
                current_stamp = int(time())
                MODULE.info("Size of '%s': %s -> %s", self._file, self._last_size, current_size)
                self._last_size = current_size

        if current_stamp == self._last_stamp:
            self._unchanged_count += 1
            if self._unchanged_count >= self._count:
                # Don't record new timestamp if MD5 of file is the same
                try:
                    with open(self._file, 'rb') as fd:
                        hash_ = md5(fd.read()).hexdigest()
                except OSError:
                    # unreadable right now: keep reporting the previous stamp
                    pass
                else:
                    if hash_ != self._last_md5:
                        self._last_md5 = hash_
                        self._last_returned_stamp = current_stamp
                    else:
                        MODULE.info("Hash of '%s' unchanged", self._file)
        else:
            # still changing: restart the settle counter
            self._unchanged_count = 0
            self._last_stamp = current_stamp

        return self._last_returned_stamp
[docs]
class Watched_Files:
    """
    Convenience class to monitor more than one file at a time.

    Every file gets its own :class:`Watched_File` (created with a settle
    count of 0); the settling logic is applied here to the newest timestamp
    of all files.
    """

    def __init__(self, files: Iterable[str], count: int = 2) -> None:
        """
        :param files: names of the files to watch.
        :param count: number of consecutive calls that must observe the same newest timestamp before it is reported.
        """
        self._count = count
        self._files = [Watched_File(f, 0) for f in files]

        self._last_returned_stamp = 0  # the last result we returned to the caller. will be returned as long as there are not enough changes.

        self._unchanged_count = 0  # incremented if size and timestamp didn't change

        self._last_stamp = 0  # last timestamp we've seen

    def timestamp(self) -> int:
        """
        Return the newest timestamp of all watched files once it has settled.

        :returns: the last reported timestamp (0 if nothing was reported yet or no files are watched).
        """
        # default=0: an empty file list must not raise ValueError
        latest = max((f.timestamp() for f in self._files), default=0)
        if latest == self._last_stamp:
            self._unchanged_count += 1
            if self._unchanged_count >= self._count:
                self._last_returned_stamp = latest
        else:
            self._unchanged_count = 0
            self._last_stamp = latest

        return self._last_returned_stamp
[docs]
class Instance(Base):
[docs]
def init(self) -> None:
    """
    Set up the per-process state of the module.

    Resets the job tracking attributes, creates the watcher for the
    components serial file and tries to create the ``UniventionUpdater``.
    If that fails, the error is logged and ``self.uu`` is ``None``.
    """
    MODULE.info("Initializing 'updater' module (PID = %d)", getpid())
    self._current_job = ''
    self._logfile_start_line = 0
    self._serial_file = Watched_File(COMPONENTS_SERIAL_FILE)
    try:
        updater = UniventionUpdater(False)
    except Exception as err:  # FIXME: let it raise
        updater = None
        MODULE.error("init() ERROR: %s", err)
    self.uu = updater
[docs]
@simple_response
def query_maintenance_information(self) -> dict[str, Any]:
    """Return the maintenance information merged with the state of the last update."""
    information = self._maintenance_information()
    last_update = self._last_update()
    information.update(last_update)
    return information
def _last_update(self) -> dict[str, Any]:
    """
    Report whether a release update failed within the last day.

    The updater status file is only evaluated if it was modified less than
    one day ago. Lines of the form ``key=value`` are parsed; lines without
    a ``=`` (e.g. blank lines) are skipped instead of invalidating the
    whole file.

    :returns: dict with ``last_update_failed`` (bool) and ``last_update_version`` (``next_version`` from the status file if the update failed, else ``None``).
    """
    status_file = '/var/lib/univention-updater/univention-updater.status'
    ret: dict[str, Any] = {'last_update_failed': False, 'last_update_version': None}
    try:
        fstat = stat(status_file)
        mtime = datetime.fromtimestamp(fstat.st_mtime)
        delta = datetime.now() - mtime
        if delta.days != 0:  # no fresh failure
            return ret

        info: dict[str, str] = {}
        with open(status_file) as fd:
            for line in fd:
                key, sep, value = line.strip().partition('=')
                if sep:
                    info[key] = value

        ret['last_update_failed'] = info.get('status') == 'FAILED'
        if ret['last_update_failed']:
            ret['last_update_version'] = info.get('next_version')
    except (OSError, ValueError) as exc:
        # best effort only: missing/unreadable status file means "no failure known"
        MODULE.error(str(exc))

    return ret
def _maintenance_information(self) -> dict[str, Any]:
    """
    Collect the maintenance status of the currently installed UCS version.

    :returns: ``{'show_warning': False}`` if no updater is available, the
        warning is disabled via UCR or no release data was found; otherwise
        a dict with ``ucs_version``, ``show_warning``, ``maintenance_extended``
        and ``base_dn`` built from the first release entry.
    """
    default = {'show_warning': False}
    if not self.uu:
        return default

    ucr.load()
    if ucr.is_true('license/extended_maintenance/disable_warning'):
        return default

    version = self.uu.current_version
    # only the first entry for the current version is evaluated
    first = next(iter(self.uu.get_releases(version, version)), None)
    if first is None:
        return default

    _ver, data = first
    status = data.get('status', 'unmaintained')
    is_extended = status == 'extended'
    return {
        'ucs_version': str(version),
        'show_warning': is_extended or status != 'maintained',
        'maintenance_extended': is_extended,
        'base_dn': ucr.get('license/base'),
    }
[docs]
@simple_response
def query_releases(self) -> list[dict[str, str]]:
    """
    Build the entries (``id``/``label``) of the release ComboBox from
    the available release updates.
    """
    # be as current as possible.
    self.uu.ucr_reinit()
    ucr.load()

    is_appliance = ucr.is_true('server/appliance')
    versions, blockers = self.uu.get_all_available_release_updates()

    releases = []
    for rel in versions:
        releases.append({'id': str(rel), 'label': 'UCS %s' % (rel,)})

    #
    # appliance_mode=no ; blocking_comp=no  → add "latest version"
    # appliance_mode=no ; blocking_comp=yes →  no "latest version"
    # appliance_mode=yes; blocking_comp=no  → add "latest version"
    # appliance_mode=yes; blocking_comp=yes → add "latest version"
    #
    if not releases:
        return releases
    if blockers and not is_appliance:
        return releases

    # UniventionUpdater returns available version in ascending order, so
    # the last returned entry is the one to be flagged as 'latest' if there's
    # no blocking component.
    newest = releases[-1]
    newest['label'] = '%s (%s)' % (newest['label'], _('latest version'))
    return releases
[docs]
@sanitize(
    hooks=ListSanitizer(StringSanitizer(minimum=1), required=True),
)
@threaded
def call_hooks(self, request) -> Any:
    """
    Call every hook named in ``request.options['hooks']``.

    :returns: dict mapping each hook name to the list returned by ``HookManager.call_hook()``.
    """
    manager = HookManager(HOOK_DIRECTORY)  # , raise_exceptions=False
    requested = request.options['hooks']
    MODULE.info('requested hooks: %s', requested)

    outcome = {}
    for name in requested:
        MODULE.info('calling hook %s', name)
        outcome[name] = manager.call_hook(name)

    MODULE.info('result: %r', outcome)
    return outcome
[docs]
@simple_response
def updates_check(self) -> dict[str, list[tuple[str, str]]]:
    """
    Simulate a dist-upgrade on an in-memory apt cache and report the
    packages that would be updated, installed or removed.

    :returns: dict with the keys ``update``, ``install`` and ``remove``, each a sorted list of ``(package name, version)`` tuples.
    """
    cache = Cache(memonly=True)
    cache.update()
    cache.open()
    cache.clear()
    cache.upgrade(dist_upgrade=True)

    to_install = []
    to_update = []
    to_remove = []
    for package in cache.get_changes():
        name = package.name
        if package.marked_install:
            candidate = package.candidate
            assert candidate is not None
            to_install.append((name, candidate.version))
        if package.marked_upgrade:
            candidate = package.candidate
            assert candidate is not None
            to_update.append((name, candidate.version))
        if package.marked_delete:
            installed = package.installed
            assert installed is not None
            to_remove.append((name, installed.version))

    to_update.sort()
    to_install.sort()
    to_remove.sort()
    return {
        "update": to_update,
        "install": to_install,
        "remove": to_remove,
    }
[docs]
@simple_response
def updates_available(self) -> bool:
    """
    Asks if there are package updates available. (don't get confused
    by the name of the UniventionUpdater function that is called here.)
    This is a separate call since it can take an amount of time, thus
    being invoked by a separate button (and not in the background)

    :returns: ``True`` if any package would be installed, upgraded or removed; ``False`` otherwise or if the check failed (the error is logged).
    """
    ucr.load()
    try:
        # be as current as possible.
        what = 'reinitializing UniventionUpdater'
        self.uu.ucr_reinit()

        what = 'checking update availability'
        new, upgrade, removed = self.uu.component_update_get_packages()
        return bool(new or upgrade or removed)
    except Exception as err:
        type_name = str(type(err)).strip('<>')
        MODULE.error('[while %s] [%s] %s' % (what, type_name, str(err)))
        return False
[docs]
def status(self, request) -> None:  # TODO: remove unneeded things
    """
    One call for all single-value variables.

    Collects UCR based values, the current UCS version, the next available
    release update, update-blocking components/apps, component counts and
    the serial of the components file, and finishes the request with a
    one-element list containing that dict.

    :raises UMC_Error: if the app check cannot be performed, or (with a generic "update server" message) if any other step fails.
    """
    result: dict[str, Any] = {}
    ucr.load()

    try:
        result['erratalevel'] = int(ucr.get('version/erratalevel', 0))
    except ValueError:
        result['erratalevel'] = 0

    result['appliance_mode'] = ucr.is_true('server/appliance')
    result['timestamp'] = int(time())
    result['reboot_required'] = ucr.is_true('update/reboot/required', False)

    try:
        # be as current as possible.
        what = 'reinitializing UniventionUpdater'
        self.uu.ucr_reinit()

        what = 'getting UCS version'
        result['ucs_version'] = str(self.uu.current_version)

        # if nothing is returned -> convert to empty string.
        what = 'querying available release updates'
        try:
            ver = self.uu.release_update_available(errorsto='exception')
            result['release_update_available'] = '' if ver is None else str(ver)
        except RequiredComponentError as exc:
            result['release_update_available'] = exc.version

        what = 'querying update-blocking components'
        blocking_components = self.uu.get_all_available_release_updates()[1] or set()
        # check apps
        if result['release_update_available']:
            try:
                from univention.appcenter.actions import get_action
                update_check = get_action('update-check')
                if update_check:
                    blocking_apps = update_check.get_blocking_apps(ucs_version=result['release_update_available'])
                    if blocking_apps:
                        blocking_components.update(set(blocking_apps))
            except (ImportError, ValueError) as err:
                # the new univention.appcenter package is not installed.
                # Cannot be a dependency as the app center depends on updater...
                raise UMC_Error(_('Error checking if installed apps are available for next UCS version.')) from err
        result['release_update_blocking_components'] = ' '.join(blocking_components)

        # Component counts are now part of the general 'status' data.
        what = "counting components"
        components = [bool(comp) for comp in self.uu.get_components(all=True)]
        result['components'] = len(components)
        result['enabled'] = sum(components)

        # HACK: the 'Updates' form polls on the serial file
        #       to refresh itself. Including the serial value
        #       into the form helps us to have a dependent field
        #       that can trigger the refresh of the "Releases"
        #       combobox and the 'package updates available' field.
        result['serial'] = self._serial_file.timestamp()
    except UMC_Error:
        # already a specific, user-presentable error (e.g. the app check
        # above): don't disguise it as an update server problem.
        raise
    except Exception as exc:  # FIXME: don't catch everything
        MODULE.error("status(): error while %s: %s" % (what, exc))
        raise UMC_Error("%s %s %s" % (
            _('Error contacting the update server. Please check your proxy or firewall settings, if any. Or it may be a problem with your configured DNS server.'),
            _('This is the error message:'),
            exc,
        ), traceback=format_exc())

    self.finished(request.id, [result])
[docs]
@simple_response
def running(self) -> str:
    """
    Tell which job is currently running.

    :returns: the id (key into INSTALLERS) of the running job, or the empty string if nothing is running.
    """
    current = self.__which_job_is_running()
    return current
[docs]
@sanitize(
    job=ChoicesSanitizer([*list(INSTALLERS), ''], required=True),
    count=IntegerSanitizer(default=0),
)
@simple_response
def updater_log_file(self, job: str, count: int) -> float | list[str] | None:
    """
    Return (part of) the log file that belongs to a job.

    :param job: Job name. A job remembered from an earlier lookup takes precedence over this argument.
    :param count: selects what is returned:

        <0 ...... return the ``st_ctime`` of the file (for polling), 0 if it cannot be stat'ed
        0 ....... return the file as a string list
        >0 ...... ignore this many lines, return the rest of the file

        Lines that existed before the job was started are always skipped.
    :returns: ``None`` if no job is known.

    .. note::
        As soon as we have looked for a running job at least once,
        we know the job key and can associate it here.

    TODO: honor a given 'job' argument
    """
    effective_job = self._current_job or job
    if not effective_job:
        return None

    logfile = INSTALLERS[effective_job]['logfile']
    if count >= 0:
        # don't read complete file if we have an 'ignore' count
        skip = count + self._logfile_start_line
        return self._logview(logfile, -skip)

    try:
        return stat(logfile).st_ctime
    except OSError:
        return 0
def _logview(self, fname: str, count: int) -> list[str]:
    """
    Contains all functions needed to view or 'tail' an arbitrary text file.

    Lines are stripped of trailing whitespace and decoded as UTF-8 with
    undecodable bytes replaced. A file that cannot be opened or read yields
    the lines collected so far (possibly none).

    :param fname: name of the file to read.
    :param count: can have different values:

        < 0 ... ignore this many lines, return the rest of the file
        0 ..... return the whole file, split into lines.
        > 0 ... return the last 'count' lines of the file. (a.k.a. tail -n <count>)
    :returns: the selected lines.
    """
    from collections import deque

    to_skip = -count if count < 0 else 0
    # a bounded deque drops the oldest line in O(1) when tailing
    lines: deque[str] = deque(maxlen=count if count > 0 else None)
    try:
        with open(fname, 'rb') as fd:
            for line in fd:
                if to_skip > 0:
                    to_skip -= 1
                    continue
                lines.append(line.rstrip().decode('utf-8', 'replace'))
    except OSError:
        pass
    return list(lines)
[docs]
@sanitize(
    job=ChoicesSanitizer(INSTALLERS, required=True),
)
@simple_response
def updater_job_status(self, job: str) -> dict[str, Any]:  # TODO: remove this completely
    """
    Returns the status of the current/last update even if the job is not running anymore.

    Every ``key=value`` line of the job's status file (exactly one ``=``)
    is returned as ``_key_``; ``running`` tells whether any job is running.
    """
    status: dict[str, Any] = {}
    try:
        with open(INSTALLERS[job]['statusfile']) as fd:
            for raw in fd:
                key, sep, value = raw.strip().partition('=')
                # only lines with exactly one '=' are taken over
                if sep and '=' not in value:
                    status['_%s_' % key] = value
    except OSError:
        pass

    status['running'] = self.__which_job_is_running() != ''
    return status
[docs]
@sanitize(
    job=ChoicesSanitizer(INSTALLERS, required=True),
    detail=StringSanitizer(r'^[A-Za-z0-9\.\- ]*$'),
)
@simple_response
def run_installer(self, job: str, detail: str = '') -> dict[str, int]:
    """
    This is the function that invokes any kind of installer. Arguments accepted:

    :param job: ..... the main thing to do. can be one of:
        'release' ...... perform a release update
        'distupgrade' .. update all currently installed packages (distupgrade)

    :param detail: ....... an argument that specifies the subject of the installer:
        for 'release' .... the target release number,
        for all other subjects: detail has no meaning.

    :returns: ``{'status': 0}`` after the at job has been scheduled.
    """
    MODULE.info("Starting function %r", job)
    self._current_job = job
    spec = INSTALLERS[job]

    # remember initial lines of logfile before starting update to not show it in the frontend
    logfile = spec['logfile']
    try:
        with open(logfile, 'rb') as fd:
            self._logfile_start_line = sum(1 for line in fd)
    except OSError:
        # No readable log file yet: nothing to hide. Don't keep the
        # offset of a previous job, it would hide the new job's output.
        self._logfile_start_line = 0

    command = spec['command']
    if '%' in command:
        # shell-quote the argument; NUL, LF and CR are dropped
        command = command % (quote(detail).translate({0: None, 10: None, 13: None}),)
    MODULE.info("Creating job: %r", command)
    # at job script: prejob, the installer itself (stdin detached), postjob
    command = '\n%s\n%s < /dev/null\n%s' % (spec["prejob"], command, spec["postjob"])
    atjobs.add(command, comments={"lines": self._logfile_start_line})

    return {'status': 0}
def __which_job_is_running(self) -> str:
    """
    Find out which installer job is currently running.

    First the at jobs are searched for a known installer command; if none
    matches, the process list is searched for the updater executables.
    On a match ``self._current_job`` and ``self._logfile_start_line`` are
    updated as a side effect.

    :returns: the job id (key into INSTALLERS) or the empty string if nothing is running.
    """
    # first check running at jobs
    for atjob in atjobs.list(True):
        for job, inst in INSTALLERS.items():
            # compare only the part before the '%s' placeholder
            cmd = inst['command'].split('%')[0]
            if cmd in atjob.command:
                self._current_job = job
                try:
                    self._logfile_start_line = int(atjob.comments.get('lines', 0))
                except ValueError:
                    pass
                return job

    # no atjob found, parse process list (if univention-upgrade was started via CLI)
    commands = [
        ('/usr/share/univention-updater/univention-updater-umc-dist-upgrade', 'distupgrade'),
        ('/usr/share/univention-updater/univention-updater', 'release'),
        ('/usr/sbin/univention-upgrade', 'distupgrade'),  # we don't know if it is a dist-upgrade or a release upgrade
    ]
    # collect the command lines once; processes that vanished or may not be
    # inspected are skipped instead of re-using a stale/unbound 'cmdline'
    cmdlines = []
    for process in psutil.process_iter():
        try:
            cmdline = process.cmdline() if callable(process.cmdline) else process.cmdline
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            continue
        cmdlines.append(cmdline)

    # the order of 'commands' defines the priority
    for cmd, job in commands:
        if any(cmd in cmdline for cmdline in cmdlines):
            self._current_job = job
            self._logfile_start_line = 0
            return job
    return ''
[docs]
class HookManager:
    """
    This class tries to provide a simple interface to load and call hooks within existing code.
    Python modules are loaded from specified `module_dir` and automatically registered.
    These Python modules have to contain at least a global method `register_hooks()` that returns
    a list of tuples (`hook_name`, `callable`).

    Modules are loaded in sorted file name order, so the order in which the
    functions of one hook are called is reproducible.

    Simple hook file example::

        def test_hook(*args, **kwargs):
            print('1ST_TEST_HOOK:', args, kwargs)
            return ('Result', 1)

        def other_hook(*args, **kwargs):
            print('OTHER_HOOK:', args, kwargs)
            return 'Other result'

        def register_hooks():
            return [
                ('test_hook', test_hook),
                ('pre_hook', other_hook),
            ]

    The method `call_hook(hookname, *args, **kwargs)` calls all registered methods for specified
    hookname and passes `*args` and `**kwargs` to them. The return value of each method will be
    saved and returned by `call_hook()` as a list. If no method has been registered for
    specified hookname, an empty list will be returned.

    If `raise_exceptions` has been set to `False`, exceptions while loading Python modules will be
    discarded silently. If a hook raises an exception, it will be caught and returned in
    result list of `call_hooks()` instead of corresponding return value. E.g.::

        [['Mein', 'Result', 123], <exceptions.ValueError instance at 0x7f80496f6638>]

    How to use HookManager:

    >>> hm = HookManager(TESTDIR)
    >>> list(hm.get_hook_list())
    ['test_hook', 'pre_hook']
    >>> result = hm.call_hook('test_hook', 'abc', 123, x=1)
    1ST_TEST_HOOK: ('abc', 123) {'x': 1}
    2ND_TEST_HOOK: ('abc', 123) {'x': 1}
    >>> result
    [('Result', 1), ('Result', 2)]
    >>> hm.call_hook('unknown_hook')
    []
    """

    def __init__(self, module_dir: str, raise_exceptions: bool = True) -> None:
        """
        :param module_dir: path to directory that contains Python modules with hook functions
        :param raise_exceptions: if `False`, all exceptions while loading Python modules will be dropped and all exceptions while calling hooks will be caught and returned in result list
        """
        self.__loaded_modules: dict[str, ModuleType] = {}
        self.__registered_hooks: dict[str, list[Callable[..., Any]]] = {}
        self.__module_dir = module_dir
        self.__raise_exceptions = raise_exceptions
        self.__load_hooks()
        self.__register_hooks()

    def __load_hooks(self) -> None:
        """loads all Python modules in specified module directory."""
        if not os.path.isdir(self.__module_dir):
            return
        # sorted: os.listdir() returns entries in arbitrary order
        for f in sorted(os.listdir(self.__module_dir)):
            if f.endswith('.py') and len(f) > 3:
                modname = f[0:-3]
                try:
                    spec = importlib.util.spec_from_file_location(modname, os.path.join(self.__module_dir, f))
                    module = importlib.util.module_from_spec(spec)
                    spec.loader.exec_module(module)  # type: ignore
                    self.__loaded_modules[modname] = module
                except Exception:
                    if self.__raise_exceptions:
                        raise

    def __register_hooks(self) -> None:
        """Collect the ``(hook name, callable)`` pairs of every loaded module."""
        for module in self.__loaded_modules.values():
            try:
                hooklist = module.register_hooks()  # type: ignore
                for hookname, func in hooklist:
                    # if returned function is not callable then continue
                    if not callable(func):
                        continue
                    self.__registered_hooks.setdefault(hookname, []).append(func)
            except Exception:
                if self.__raise_exceptions:
                    raise

    def set_raise_exceptions(self, val: bool) -> None:
        """
        Enable or disable raising exceptions.

        :param val: `True` to pass exceptions through, `False` to return them instead of the return value.
        :raises ValueError: if `val` is neither `True` nor `False`.
        """
        if val in (True, False):
            self.__raise_exceptions = val
        else:
            raise ValueError('boolean value required')

    def get_hook_list(self) -> Iterable[str]:
        """returns the hook names that have been defined by loaded Python modules."""
        return self.__registered_hooks.keys()

    def call_hook(self, name: str, *args: Any, **kwargs: Any) -> list[Any]:
        """
        All additional arguments are passed to hook methods.
        If `self.__raise_exceptions` is `False`, all exceptions while calling hooks will be caught and returned in result list.
        If return value is an empty list, no hook has been called.

        :param name: name of the hook to call.
        :returns: the return values (or caught exceptions) of all functions registered for `name`.
        """
        result = []
        for func in self.__registered_hooks.get(name, []):
            try:
                res = func(*args, **kwargs)
                result.append(res)
            except Exception as e:
                if self.__raise_exceptions:
                    raise
                else:
                    result.append(e)
        return result