Source code for univention.updater.scripts.upgrade

#!/usr/bin/python3
# SPDX-FileCopyrightText: 2010-2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only

"""Install UCS release and errata updates."""

from __future__ import annotations

import logging
import os
import socket
import subprocess
import sys
import time
import traceback
from argparse import ArgumentParser, FileType, Namespace
from typing import TYPE_CHECKING, NoReturn

from univention.admindiary.client import write_event
from univention.admindiary.events import UPDATE_FINISHED_FAILURE, UPDATE_FINISHED_SUCCESS, UPDATE_STARTED
from univention.config_registry import ConfigRegistry, handler_set
from univention.lib.ucs import UCS_Version
from univention.updater.errors import ConfigurationError
from univention.updater.locking import UpdaterLock
from univention.updater.tools import UniventionUpdater


if TYPE_CHECKING:
    from collections.abc import Iterable


FN_STATUS = '/var/lib/univention-updater/univention-upgrade.status'

UCR_UPDATE_AVAILABLE = 'update/available'
LOGFN = '/var/log/univention/updater.log'

configRegistry = ConfigRegistry()
logfd = None
silent = False


[docs] def dprint(silent: bool, msg: str, newline: bool = True, debug: bool = False) -> None: """Print debug output.""" if silent: return if logfd: if newline: print(msg, file=logfd) else: print('%-55s' % msg, end=' ', file=logfd) logfd.flush() if not debug: if newline: print(msg) else: print('%-55s' % msg, end=' ') sys.stdout.flush()
updater_status = {}
[docs] def update_status(**kwargs: str) -> None: """ update updater_status and write status to disk Keys: - current_version ==> UCS_Version ==> 2.3-1 - next_version ==> UCS_Version ==> 2.3-2 - updatetype ==> (LOCAL|NET) - status ==> (RUNNING|FAILED|DONE) - errorsource ==> (SETTINGS|PREPARATION|PREUP|UPDATE|POSTUP) """ updater_status.update(kwargs) # write temporary file fn = '%s.new' % FN_STATUS try: with open(fn, 'w+') as fd: fd.writelines('%s=%s\n' % (key, val) for key, val in updater_status.items()) except OSError: dprint(silent, 'Warning: cannot update %s' % fn) try: os.rename(fn, FN_STATUS) except OSError: dprint(silent, 'Warning: cannot update %s' % FN_STATUS)
[docs] def readcontinue(msg: str) -> bool: """Print message and read yes/no/abort answer.""" while True: try: choice = input(msg).lower().strip() if choice in ('', 'y', 'j'): print('') return True elif choice == 'n': print('') return False else: continue except KeyboardInterrupt: print('\n') return False
def _package_list(new_packages: Iterable[tuple[str, ...]]) -> str: """Return comma separated list of packages.""" return ",".join(p[0] for p in new_packages)
[docs] def performUpdate(options: Namespace, checkForUpdates: bool = False, silent: bool = False) -> bool | None: for func in (do_package_updates, do_app_updates, do_release_update): if func(options, checkForUpdates, silent): if checkForUpdates: return True else: do_exec() return None
[docs] def do_release_update(options: Namespace, checkForUpdates: bool, silent: bool) -> bool: updater = UniventionUpdater() # get next release update version dprint(silent, 'Checking for release updates: ', newline=False) version_next = updater.release_update_available() if not version_next: dprint(silent, 'none') return False if options.updateto and UCS_Version(options.updateto) < UCS_Version(version_next): dprint(silent, '%s is available but updater has been instructed to stop at version %s.' % (version_next, options.updateto)) return False dprint(silent, 'found: UCS %s' % version_next) if checkForUpdates: return True interactive = not (options.noninteractive or checkForUpdates) if interactive and not readcontinue('Do you want to update to %s [Y|n]?' % version_next): return False update_status( current_version=updater.current_version, next_version=version_next, status='RUNNING') dprint(silent, 'Starting update to UCS version %s at %s...' % (version_next, time.ctime()), debug=True) dprint(silent, 'Starting update to UCS version %s' % (version_next)) time.sleep(1) params = ['--silent'] if options.ignoressh: params.append('--ignoressh') if options.ignoreterm: params.append('--ignoreterm') retcode = subprocess.call(['/usr/share/univention-updater/univention-updater', 'net', '--updateto', '%s' % version_next, *params], env=os.environ) if retcode: dprint(silent, 'exitcode of univention-updater: %s' % retcode, debug=True) dprint(silent, 'ERROR: update failed. Please check /var/log/univention/updater.log\n') update_status(status='FAILED', errorsource='UPDATE') sys.exit(1) dprint(silent, 'Update to UCS version %s finished at %s...' % (version_next, time.ctime()), debug=True) return True
[docs] def do_package_updates(options: Namespace, checkForUpdates: bool, silent: bool) -> bool: interactive = not (options.noninteractive or checkForUpdates) updater = UniventionUpdater() # check if component updates are available dprint(silent, 'Checking for package updates: ', newline=False) new_packages, upgraded_packages, removed_packages = updater.component_update_get_packages() update_available = bool(new_packages + upgraded_packages + removed_packages) if not update_available: dprint(silent, 'none') return False # updates available ==> stop here in "check-mode" if checkForUpdates: dprint(silent, 'found') return True dprint(silent, 'found\n') if len(removed_packages) > 0: dprint(silent, 'The following packages will be REMOVED:\n %s' % _package_list(removed_packages)) if len(new_packages) > 0: dprint(silent, 'The following packages will be installed:\n %s' % _package_list(new_packages)) if len(upgraded_packages) > 0: dprint(silent, 'The following packages will be upgraded:\n %s' % _package_list(upgraded_packages)) if interactive and not readcontinue('\nDo you want to continue [Y|n]?'): return False time.sleep(1) dprint(silent, 'Starting dist-update at %s...' % (time.ctime()), debug=True) dprint(silent, 'Starting package upgrade', newline=False) hostname = socket.gethostname() context_id = write_event(UPDATE_STARTED, {'hostname': hostname}) if context_id: os.environ['ADMINDIARY_CONTEXT'] = context_id returncode = updater.run_dist_upgrade() if returncode: dprint(silent, 'exitcode of apt-get dist-upgrade: %s' % returncode, debug=True) dprint(silent, 'ERROR: update failed. Please check /var/log/univention/updater.log\n') update_status(status='FAILED', errorsource='UPDATE') write_event(UPDATE_FINISHED_FAILURE, {'hostname': hostname}) sys.exit(1) dprint(silent, 'dist-update finished at %s...' % (time.ctime()), debug=True) dprint(silent, 'done') write_event(UPDATE_FINISHED_SUCCESS, {'hostname': hostname, 'version': 'UCS %(version/version)s-%(version/patchlevel)s errata%(version/erratalevel)s' % configRegistry}) time.sleep(1) return True
[docs] def do_app_updates(options: Namespace, checkForUpdates: bool, silent: bool) -> bool | None: dprint(silent, 'Checking for app updates: ', newline=False) if not options.app_updates: dprint(silent, 'skipped') return None interactive = not (options.noninteractive or checkForUpdates) try: import univention.appcenter.log as appcenter_log from univention.appcenter.actions import Abort, get_action from univention.appcenter.app_cache import Apps app_upgrade_search = get_action('upgrade-search') app_upgrade = get_action('upgrade') if app_upgrade is None: raise ImportError() except ImportError: # the new univention.appcenter package is not installed. Never mind # cannot be a dependency as the app center depends on updater... dprint(silent, 'unavailable') return False # check if component updates are available appcenter_log.log_to_logfile() # own logging logger = logging.getLogger('univention.appcenter.actions.upgrade.readme') handler = logging.StreamHandler(sys.stdout) handler.setLevel(logging.INFO) logger.addHandler(handler) try: app_upgrade_search.call() except Abort: pass new_apps = list(app_upgrade.iter_upgradable_apps()) if not new_apps: dprint(silent, 'none') return False elif checkForUpdates: dprint(silent, 'found') return True dprint(silent, 'found\n') dprint(silent, 'The following apps can be upgraded:\n') for app in new_apps: newer_app = Apps().find_candidate(app) if newer_app: dprint(silent, '%(name)s: Version %(old)s can be upgraded to %(new)s' % { 'name': app.name, 'old': app.version, 'new': newer_app.version, }) elif app.docker: dprint(silent, '%(name)s: The underlying container can be upgraded' % { 'name': app.name, }) else: # don't know how this is possible... but anyways dprint(silent, '%(name)s' % { 'name': app.name, }) dprint(silent, 'Starting univention-app upgrade at %s...' % time.ctime(), debug=True) dprint(silent, 'Most of the output for App upgrades goes to %s' % appcenter_log.LOG_FILE, debug=True) dprint(silent, '\nStarting app upgrade', newline=False) success = True for app in new_apps: if interactive and not readcontinue('\nDo you want to upgrade %s [Y|n]?' % app.name): continue success &= bool(app_upgrade.call_safe( app=[app], noninteractive=not interactive, username=options.username, pwdfile=options.pwdfile.name if options.pwdfile else None, )) if not success: dprint(silent, 'ERROR: app upgrade failed. Please check %s\n' % appcenter_log.LOG_FILE) return False # pretend no updates available; otherwise do_exec may result in infinite loop dprint(silent, 'univention-app upgrade finished at %s...' % time.ctime(), debug=True) dprint(silent, 'done') return not success # pending updates
[docs] def parse_args(argv: list[str] | None = None) -> Namespace: parser = ArgumentParser(description=__doc__) parser.add_argument( "--updateto", help="update up to specified version. Expected format is X.Y-Z") parser.add_argument( "--check", action="store_true", help="check if updates are available") parser.add_argument( "--setucr", action="store_true", help="if set, variable update/available will be updated") parser.add_argument( "--ignoressh", action="store_true", help="pass --ignoressh to univention-updater") parser.add_argument( "--ignoreterm", action="store_true", help="pass --ignoreterm to univention-updater") parser.add_argument( "--noninteractive", action="store_true", help="Perform a non-interactive update") group = parser.add_argument_group("App updates") group.add_argument( "--enable-app-updates", dest="app_updates", action="store_true", help="Update installed Apps") group.add_argument( "--disable-app-updates", dest="app_updates", action="store_false", help="Skip updating installed Apps") group.add_argument( "--username", help="Name of the user used for registering the app") group.add_argument( "--pwdfile", type=FileType("r"), help="Name of the file containing the user password") parser.set_defaults(app_updates=True) options = parser.parse_args() if options.updateto: try: UCS_Version(options.updateto) except ValueError: dprint(silent, "Unexpected format of updateto: %s .Expected format: X.Y-Z" % options.updateto) sys.exit(1) if options.app_updates: if options.pwdfile and not options.username: parser.error("--pwdfile without --username") elif options.username and not options.pwdfile: parser.error("--username without --pwdfile") return options
[docs] def main() -> None: global logfd options = parse_args() try: logfd = open(LOGFN, 'a+') FORMAT = "%(asctime)-15s " + logging.BASIC_FORMAT logger = logging.getLogger('updater') handler = logging.StreamHandler(logfd) formatter = logging.Formatter(FORMAT) handler.setFormatter(formatter) logger.addHandler(handler) logger.setLevel(logging.ERROR) except OSError: print('Cannot open %s for writing' % LOGFN) sys.exit(1) configRegistry.load() if options.noninteractive: os.environ['UCS_FRONTEND'] = 'noninteractive' dprint(silent, '\nStarting univention-upgrade. Current UCS version is %(version/version)s-%(version/patchlevel)s errata%(version/erratalevel)s\n' % configRegistry) with UpdaterLock(): if options.check: return do_update(options) update_local_repository(options) do_update(options)
[docs] def update_local_repository(options: Namespace) -> None: dprint(silent, 'Checking for local repository: ', newline=False) if configRegistry.is_true('local/repository', False) and configRegistry.is_true('repository/mirror', False): dprint(silent, 'found\n') if options.noninteractive or readcontinue('Update the local repository via network [Y|n]?'): if options.updateto: subprocess.call(('/usr/sbin/univention-repository-update', 'net', '--updateto', options.updateto)) else: subprocess.call(('/usr/sbin/univention-repository-update', 'net')) else: dprint(silent, 'none')
[docs] def do_update(options: Namespace) -> None: try: update_available = performUpdate(options, checkForUpdates=options.check, silent=False) except ConfigurationError as e: update_status(status='FAILED', errorsource='SETTINGS') print('The connection to the repository server failed: %s. Please check the repository configuration and the network connection.' % e, file=sys.stderr) sys.exit(3) except Exception: update_status(status='FAILED') print('An error occurred - see "%s" for details' % (LOGFN,)) print('Traceback in univention-upgrade:', file=logfd) print(traceback.format_exc(), file=logfd) sys.exit(2) update_status(status='DONE') if options.setucr and configRegistry.is_true(UCR_UPDATE_AVAILABLE) != update_available: handler_set([ '%s=%s' % (UCR_UPDATE_AVAILABLE, 'yes' if update_available else 'no'), ]) if options.check: if update_available: dprint(silent, 'Please rerun command without --check argument to install.') sys.exit(0) else: print('No update available.') sys.exit(1) if update_available: do_exec()
[docs] def do_exec() -> NoReturn: # The updater/UCR/libs might have been replaced - re-execute! cmd = [*sys.argv, '--setucr'] print("execv(%r)" % (cmd,), file=logfd) os.execv(sys.argv[0], cmd) # noqa: S606
if __name__ == '__main__': main()