Source code for univention.updater.scripts.actualize

# -*- coding: utf-8 -*-
# Copyright 2004-2022 Univention GmbH
# All rights reserved.
# The source code of this program is made available
# under the terms of the GNU Affero General Public License version 3
# (GNU AGPL V3) as published by the Free Software Foundation.
# Binary versions of this program provided by Univention to you as
# well as other copyrighted, protected or trademarked materials like
# Logos, graphics, fonts, specific documentations and configurations,
# cryptographic keys etc. are subject to a license agreement between
# you and Univention and not subject to the GNU AGPL V3.
# In the case you use this program under the terms of the GNU AGPL V3,
# the program is provided in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public
# License with the Debian GNU/Linux or Univention distribution in file
# /usr/share/common-licenses/AGPL-3; if not, see
# <>.

# TODO: use UCR variables for update, upgrade, install and remove commands
from __future__ import print_function

import os
import re
import shlex
import subprocess
import sys
import time
from argparse import ArgumentParser, Namespace  # noqa: F401
from typing import Any, Container, List, NoReturn, Optional, Sequence  # noqa: F401

from univention.config_registry import ConfigRegistry, handler_set, handler_unset
from univention.lib.policy_result import PolicyResultFailed, policy_result
from univention.updater.commands import (
from univention.updater.locking import UpdaterLock, apt_lock

    from typing_extensions import Literal  # noqa: F401
    _JOB = Literal["add", "remove"]
except ImportError:
    _JOB = str  # type: ignore

LOGNAME = '/var/log/univention/actualise.log'
RE_STAT = re.compile(br'^(\d+) upgraded, (\d+) newly installed, (\d+) to remove and (?:\d+) not upgraded.')
RE_APT = re.compile(r'^(?:deb|deb-src)\s+(?:\[[^]]+\]\s+)?([^:]*):')

configRegistry = ConfigRegistry()

ldap_hostdn = configRegistry.get('ldap/hostdn')

[docs]class Tee(object): ''' Writes the given string to several files at once. Could by used with the print statement ''' def __init__(self, files: Sequence[str] = [], stdout: bool = True, filter: Optional[str] = None) -> None: self.stdout = stdout self.files = files self.filter = filter
[docs] def call(self, command: Sequence[str], **kwargs: Any) -> int: p = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, **kwargs) tee_command = ['tee', '-a'] + list(self.files) if self.stdout: if self.filter: tee = subprocess.Popen(tee_command, stdin=p.stdout, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) egrep = subprocess.Popen(['egrep', self.filter], stdin=tee.stdout) ret = egrep.wait() else: tee = subprocess.Popen(tee_command, stdin=p.stdout) else: tee = subprocess.Popen(tee_command, stdin=p.stdout, stdout=subprocess.DEVNULL) # Must wait for exit from back to front, only the exit status of p is relevant ret = tee.wait() ret = p.wait() return ret
[docs]def getUpdate() -> None: """ Small function waiting for apt lockfile to vanish then starts apt-get update """ print("Running apt-get update") with apt_lock(), open(LOGNAME, 'a') as logfile: res =, stdout=logfile, stderr=logfile) if res != 0: print("E: failed to update", file=sys.stderr) sys.exit(res)
[docs]def check(configRegistry: ConfigRegistry, dist_upgrade: bool = False) -> bool: """ Just probe if there are packages to add or remove :param dist_upgrade: Perform distribution upgrade instead of package updates. :returns: `True` if there are package changes. """ getUpdate() # Probe for packages to actualise cmd = cmd_dist_upgrade_sim if dist_upgrade else cmd_upgrade_sim with apt_lock(): proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE) assert proc.stdout for line in proc.stdout: match = RE_STAT.match(line) if match: upgraded, newlyinstalled, remove = (int(_) for _ in match.groups()) break else: return False return any(( upgraded, newlyinstalled, remove, getPackageList(configRegistry, 'remove'), getPackageList(configRegistry, 'add'), ))
[docs]def getPackageList(configRegistry: ConfigRegistry, job: _JOB) -> List[str]: """ Get a list of packages to remove or add, depending on the value of job. :param configRegistry: UCR instance. :param job: `add` or `remove`. :returns: List of package names. """ try: packages_name = 'univention%sPackages%s' % ( { "memberserver": "Member", "domaincontroller_slave": "Slave", "domaincontroller_master": "Master", "domaincontroller_backup": "Master", }[configRegistry['server/role']], { "remove": "Remove", "add": "", }[job], ) except LookupError: exit("E: no valid job defined") try: results, _policies = policy_result(ldap_hostdn) return results.get(packages_name, []) except PolicyResultFailed as ex: sys.exit('failed to execute univention_policy_result: %s' % ex)
[docs]def parse_args() -> Namespace: parser = ArgumentParser(description="Perform a (dist-)upgrade and (un-)install packages as set through policies") parser.add_argument("--dist-upgrade", action="store_true", help="Perform a dist-upgrade instead of a regular update") parser.add_argument("--silent", action="store_true", help="Don't show normal output, but error messages only") parser.add_argument("--check", action="store_true", help="Don't do anything, just check if updates are available") return parser.parse_args()
[docs]def run(opt: Namespace) -> int: os.putenv('PATH', '/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin') os.environ['LC_ALL'] = 'C.UTF-8' os.environ['DEBIAN_FRONTEND'] = 'noninteractive' check_failed() remove_temp() pkgdb = None pkgdb_scope = None if opt.silent: # redirect stdout to /dev/null sys.stdout = open("/dev/null", "w") try: if opt.check: # Only probe for packages to add/remove return check(configRegistry, opt.dist_upgrade) if ldap_hostdn: logfile = open(LOGNAME, 'a') logfile.write('***** Starting univention-actualise at %s\n' % time.ctime()) getUpdate() # temporarily disable pkgdb pkgdb_scan = configRegistry.get('pkgdb/scan', getscope=True) if pkgdb_scan: # get value and UCR scope of variable pkgdb/scan pkgdb_scope, pkgdb = pkgdb_scan if pkgdb: # disable pkgdb in UCR scope FORCED handler_set(['pkgdb/scan=no'], {'force': True}) rem_packages = getPackageList(configRegistry, 'remove') for package in rem_packages: # check if the package exists with apt_lock(): res = + [package], stdout=logfile, stderr=logfile) if res == 0: print("Removing packages: %s" % package) with apt_lock(): res =, stdout=logfile, stderr=logfile) if not res: res = + [package], stdout=logfile, stderr=logfile) else: print("The package %s doesn't exist." % package) res = 0 if res != 0: print("E: failed to remove %s" % package, file=sys.stderr) sys.exit(res) add_packages = getPackageList(configRegistry, 'add') for package in add_packages: with apt_lock(): res = + [package], stdout=logfile, stderr=logfile) if res == 0: print("Installing packages: %s" % package) with apt_lock(): res =, stdout=logfile, stderr=logfile) if not res: res = + [package], stdout=logfile, stderr=logfile) else: print("The package %s doesn't exist." % package) res = 0 if res != 0: print("E: failed to install %s" % package, file=sys.stderr) sys.exit(res) else: # ldap/hostdn is not set if configRegistry['server/role'] != 'basesystem': print("W: ldap/hostdn is not set - please run univention-join", file=sys.stderr) if opt.dist_upgrade: msg = "Dist-upgrading system" cmd = cmd_dist_upgrade else: msg = "Upgrading system" cmd = cmd_upgrade print(msg) # TODO: use mkstemp and close directly the file descriptor with apt_lock(): tee = Tee([LOGNAME], stdout=not opt.silent) res = if res != 0: print("E: failed to configure packets, see %s for details." % LOGNAME, file=sys.stderr) else: tee = Tee([LOGNAME], stdout=not opt.silent, filter='(^Get|^Unpacking|^Preparing|^Setting up|packages upgraded)') res =' ')) if res != 0: print("E: failed to upgrade, see %s for details." % LOGNAME, file=sys.stderr) sys.exit(res) finally: if pkgdb: if pkgdb_scope == ConfigRegistry.FORCED: # old value was set in FORCED scope handler_set(['pkgdb/scan=%s' % pkgdb], {'force': True}) else: # old value was set in any other scope ==> remove value in FORCED scope handler_unset(['pkgdb/scan'], {'force': True}) if str(pkgdb).lower() in ("yes", "enable", "enabled", "true", "1"):'/usr/sbin/univention-pkgdb-scan',))
[docs]def update_ucr_updatestatus() -> None: try:'/usr/share/univention-updater/univention-updater-check', stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) except Exception: print('Warning: calling univention-updater-check failed.')
[docs]def check_failed() -> None: failure = '/var/lib/univention-updater/update-failed' if os.path.exists(failure): print('univention-actualise: univention-updater failed, stopping...') print(' remove `%s\' to proceed' % failure) sys.exit(2)
[docs]def remove_temp() -> None: try: for root, dirs, files in os.walk('/etc/apt/sources.list.d'): for file in [file for file in files if file.startswith('00_ucs_temporary_')]: filename = os.path.join(root, file) print('Warning: Deleting `%s` from incomplete update.' % filename) os.remove(filename) del dirs[:] except EnvironmentError: print('Failed, aborting.') sys.exit(2)
[docs]def main() -> NoReturn: opt = parse_args() res = 0 try: with UpdaterLock(): res = run(opt) except SystemExit as ex: if ex.args[0] == 0: update_ucr_updatestatus() sys.exit(res)
if __name__ == '__main__': main()