#!/usr/bin/python3
# SPDX-FileCopyrightText: 2010-2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only
"""Install UCS release and errata updates."""
from __future__ import annotations
import logging
import os
import socket
import subprocess
import sys
import time
import traceback
from argparse import ArgumentParser, FileType, Namespace
from typing import TYPE_CHECKING, NoReturn
from univention.admindiary.client import write_event
from univention.admindiary.events import UPDATE_FINISHED_FAILURE, UPDATE_FINISHED_SUCCESS, UPDATE_STARTED
from univention.config_registry import ConfigRegistry, handler_set
from univention.lib.ucs import UCS_Version
from univention.updater.errors import ConfigurationError
from univention.updater.locking import UpdaterLock
from univention.updater.tools import UniventionUpdater
if TYPE_CHECKING:
from collections.abc import Iterable
# Status file that update_status() rewrites with one "key=value" line per entry.
FN_STATUS = '/var/lib/univention-updater/univention-upgrade.status'
# UCR variable that do_update() sets to "yes"/"no" when --setucr is given.
UCR_UPDATE_AVAILABLE = 'update/available'
# Log file that main() opens in append mode; dprint() mirrors its messages there.
LOGFN = '/var/log/univention/updater.log'
# Shared UCR instance; its content is loaded in main().
configRegistry = ConfigRegistry()
# File object for LOGFN; stays None until main() has opened the log file.
logfd = None
# Passed as the `silent` argument of dprint() by the helpers in this module;
# it is not reassigned in the code shown here.
silent = False
[docs]
def dprint(silent: bool, msg: str, newline: bool = True, debug: bool = False) -> None:
    """
    Emit a progress message to the updater log file and to standard output.

    :param silent: Suppress the message completely when true.
    :param msg: The text to emit.
    :param newline: When false the text is padded to a 55 character wide column
        and terminated by a single blank instead of a newline, so that a result
        can follow on the same line.
    :param debug: When true the message only goes to the log file, not to stdout.
    """
    if silent:
        return

    text = msg if newline else '%-55s' % msg
    terminator = '\n' if newline else ' '

    # The log file comes first, then the terminal; each stream is flushed right away.
    streams = []
    if logfd:
        streams.append(logfd)
    if not debug:
        streams.append(sys.stdout)

    for stream in streams:
        print(text, end=terminator, file=stream)
        stream.flush()
# Accumulated status values: update_status() merges its keyword arguments into
# this dict and then writes every entry to the status file.
updater_status = {}
[docs]
def update_status(**kwargs: str) -> None:
    """
    Update the global `updater_status` and write the complete status to disk.

    The status is first written to `FN_STATUS + '.new'`, which is then renamed
    over `FN_STATUS`. Failures are reported as warnings through
    :py:func:`dprint` and never raised.

    Keys:
    - current_version ==> UCS_Version ==> 2.3-1
    - next_version    ==> UCS_Version ==> 2.3-2
    - updatetype      ==> (LOCAL|NET)
    - status          ==> (RUNNING|FAILED|DONE)
    - errorsource     ==> (SETTINGS|PREPARATION|PREUP|UPDATE|POSTUP)

    :param kwargs: Status entries to add or overwrite.
    """
    updater_status.update(kwargs)

    # write temporary file
    fn = '%s.new' % FN_STATUS
    try:
        with open(fn, 'w+') as fd:
            fd.writelines('%s=%s\n' % (key, val) for key, val in updater_status.items())
    except OSError:
        dprint(silent, 'Warning: cannot update %s' % fn)
        # Stop here: renaming a missing, stale or truncated temporary file
        # would replace the last good status file with bad data.
        return

    try:
        os.rename(fn, FN_STATUS)
    except OSError:
        dprint(silent, 'Warning: cannot update %s' % FN_STATUS)
[docs]
def readcontinue(msg: str) -> bool:
    """
    Prompt with `msg` until a yes/no answer has been entered.

    :param msg: The prompt shown to the user.
    :returns: `True` for an empty answer, `y` or `j` (case and surrounding
        whitespace are ignored); `False` for `n` or when the prompt is
        interrupted with Ctrl-C. Any other answer repeats the prompt.
    """
    accepted = ('', 'y', 'j')
    try:
        while True:
            answer = input(msg).lower().strip()
            if answer in accepted:
                print('')
                return True
            if answer == 'n':
                print('')
                return False
            # anything else: ask again
    except KeyboardInterrupt:
        print('\n')
        return False
def _package_list(new_packages: Iterable[tuple[str, ...]]) -> str:
"""Return comma separated list of packages."""
return ",".join(p[0] for p in new_packages)
[docs]
def do_release_update(options: Namespace, checkForUpdates: bool, silent: bool) -> bool:
    """
    Look for the next release update and install it unless only checking.

    :param options: Parsed command line options; `updateto`, `noninteractive`,
        `ignoressh` and `ignoreterm` are evaluated.
    :param checkForUpdates: Only report whether an update exists.
    :param silent: Passed through to :py:func:`dprint`.
    :returns: `True` if an update was found (check mode) or installed;
        `False` if there is none, it lies beyond `options.updateto`, or the
        user declined.

    Terminates the process with exit status 1 when `univention-updater`
    returns a non-zero exit code.
    """
    updater = UniventionUpdater()

    # get next release update version
    dprint(silent, 'Checking for release updates: ', newline=False)
    version_next = updater.release_update_available()
    if not version_next:
        dprint(silent, 'none')
        return False

    limit = options.updateto
    if limit and UCS_Version(limit) < UCS_Version(version_next):
        dprint(silent, '%s is available but updater has been instructed to stop at version %s.' % (version_next, limit))
        return False

    dprint(silent, 'found: UCS %s' % version_next)
    if checkForUpdates:
        return True

    # Check mode has returned above, so only --noninteractive can suppress the prompt.
    if not options.noninteractive and not readcontinue('Do you want to update to %s [Y|n]?' % version_next):
        return False

    update_status(
        current_version=updater.current_version,
        next_version=version_next,
        status='RUNNING',
    )

    dprint(silent, 'Starting update to UCS version %s at %s...' % (version_next, time.ctime()), debug=True)
    dprint(silent, 'Starting update to UCS version %s' % (version_next))
    time.sleep(1)

    command = [
        '/usr/share/univention-updater/univention-updater',
        'net',
        '--updateto',
        '%s' % version_next,
        '--silent',
    ]
    if options.ignoressh:
        command.append('--ignoressh')
    if options.ignoreterm:
        command.append('--ignoreterm')

    retcode = subprocess.call(command, env=os.environ)
    if retcode:
        dprint(silent, 'exitcode of univention-updater: %s' % retcode, debug=True)
        dprint(silent, 'ERROR: update failed. Please check /var/log/univention/updater.log\n')
        update_status(status='FAILED', errorsource='UPDATE')
        sys.exit(1)

    dprint(silent, 'Update to UCS version %s finished at %s...' % (version_next, time.ctime()), debug=True)
    return True
[docs]
def do_package_updates(options: Namespace, checkForUpdates: bool, silent: bool) -> bool:
    """
    Look for pending package changes and run a dist-upgrade unless only checking.

    :param options: Parsed command line options; `noninteractive` is evaluated.
    :param checkForUpdates: Only report whether package updates exist.
    :param silent: Passed through to :py:func:`dprint`.
    :returns: `True` if updates were found (check mode) or installed;
        `False` if there are none or the user declined.

    Terminates the process with exit status 1 when the dist-upgrade returns a
    non-zero exit code. Start, success and failure are recorded via
    `write_event`.
    """
    interactive = not options.noninteractive and not checkForUpdates
    updater = UniventionUpdater()

    # check if component updates are available
    dprint(silent, 'Checking for package updates: ', newline=False)
    new_packages, upgraded_packages, removed_packages = updater.component_update_get_packages()
    if not (new_packages + upgraded_packages + removed_packages):
        dprint(silent, 'none')
        return False

    # updates available ==> stop here in "check-mode"
    if checkForUpdates:
        dprint(silent, 'found')
        return True

    dprint(silent, 'found\n')
    announcements = (
        ('The following packages will be REMOVED:\n %s', removed_packages),
        ('The following packages will be installed:\n %s', new_packages),
        ('The following packages will be upgraded:\n %s', upgraded_packages),
    )
    for template, packages in announcements:
        if len(packages) > 0:
            dprint(silent, template % _package_list(packages))

    if interactive and not readcontinue('\nDo you want to continue [Y|n]?'):
        return False

    time.sleep(1)

    dprint(silent, 'Starting dist-update at %s...' % (time.ctime()), debug=True)
    dprint(silent, 'Starting package upgrade', newline=False)

    hostname = socket.gethostname()
    context_id = write_event(UPDATE_STARTED, {'hostname': hostname})
    if context_id:
        # exported through the environment for everything started from here on
        os.environ['ADMINDIARY_CONTEXT'] = context_id

    returncode = updater.run_dist_upgrade()
    if returncode:
        dprint(silent, 'exitcode of apt-get dist-upgrade: %s' % returncode, debug=True)
        dprint(silent, 'ERROR: update failed. Please check /var/log/univention/updater.log\n')
        update_status(status='FAILED', errorsource='UPDATE')
        write_event(UPDATE_FINISHED_FAILURE, {'hostname': hostname})
        sys.exit(1)

    dprint(silent, 'dist-update finished at %s...' % (time.ctime()), debug=True)
    dprint(silent, 'done')
    installed_version = 'UCS %(version/version)s-%(version/patchlevel)s errata%(version/erratalevel)s' % configRegistry
    write_event(UPDATE_FINISHED_SUCCESS, {'hostname': hostname, 'version': installed_version})
    time.sleep(1)

    return True
[docs]
def do_app_updates(options: Namespace, checkForUpdates: bool, silent: bool) -> bool | None:
    """
    Look for upgradable Apps and upgrade them unless only checking.

    :param options: Parsed command line options; `app_updates`,
        `noninteractive`, `username` and `pwdfile` are evaluated.
    :param checkForUpdates: Only report whether App upgrades exist.
    :param silent: Passed through to :py:func:`dprint`.
    :returns: `None` if App updates are disabled; `True` only in check mode
        when upgradable Apps were found; `False` in every other case (App
        Center unavailable, nothing to upgrade, upgrade failed or finished).
    """
    dprint(silent, 'Checking for app updates: ', newline=False)
    if not options.app_updates:
        dprint(silent, 'skipped')
        return None

    interactive = not options.noninteractive and not checkForUpdates

    try:
        import univention.appcenter.log as appcenter_log
        from univention.appcenter.actions import Abort, get_action
        from univention.appcenter.app_cache import Apps
        app_upgrade_search = get_action('upgrade-search')
        app_upgrade = get_action('upgrade')
        if app_upgrade is None:
            raise ImportError()
    except ImportError:
        # the new univention.appcenter package is not installed. Never mind
        # cannot be a dependency as the app center depends on updater...
        dprint(silent, 'unavailable')
        return False

    # check if component updates are available
    appcenter_log.log_to_logfile()

    # own logging: show messages of the "readme" logger on stdout from INFO upwards
    readme_handler = logging.StreamHandler(sys.stdout)
    readme_handler.setLevel(logging.INFO)
    logging.getLogger('univention.appcenter.actions.upgrade.readme').addHandler(readme_handler)

    try:
        app_upgrade_search.call()
    except Abort:
        pass

    new_apps = list(app_upgrade.iter_upgradable_apps())
    if not new_apps:
        dprint(silent, 'none')
        return False
    if checkForUpdates:
        dprint(silent, 'found')
        return True

    dprint(silent, 'found\n')
    dprint(silent, 'The following apps can be upgraded:\n')
    for app in new_apps:
        candidate = Apps().find_candidate(app)
        if candidate:
            summary = '%(name)s: Version %(old)s can be upgraded to %(new)s' % {
                'name': app.name,
                'old': app.version,
                'new': candidate.version,
            }
        elif app.docker:
            summary = '%(name)s: The underlying container can be upgraded' % {
                'name': app.name,
            }
        else:
            # don't know how this is possible... but anyways
            summary = '%(name)s' % {
                'name': app.name,
            }
        dprint(silent, summary)

    dprint(silent, 'Starting univention-app upgrade at %s...' % time.ctime(), debug=True)
    dprint(silent, 'Most of the output for App upgrades goes to %s' % appcenter_log.LOG_FILE, debug=True)
    dprint(silent, '\nStarting app upgrade', newline=False)

    pwdfile = options.pwdfile.name if options.pwdfile else None
    success = True
    for app in new_apps:
        if interactive and not readcontinue('\nDo you want to upgrade %s [Y|n]?' % app.name):
            continue
        upgraded = app_upgrade.call_safe(
            app=[app],
            noninteractive=not interactive,
            username=options.username,
            pwdfile=pwdfile,
        )
        # one failed App marks the whole run as failed, the remaining Apps are still tried
        if not upgraded:
            success = False

    if not success:
        dprint(silent, 'ERROR: app upgrade failed. Please check %s\n' % appcenter_log.LOG_FILE)
        return False  # pretend no updates available; otherwise do_exec may result in infinite loop

    dprint(silent, 'univention-app upgrade finished at %s...' % time.ctime(), debug=True)
    dprint(silent, 'done')
    # `success` is always true at this point, so this evaluates to False.
    return not success  # pending updates
[docs]
def parse_args(argv: list[str] | None = None) -> Namespace:
    """
    Parse the command line of univention-upgrade.

    :param argv: Arguments to parse, without the program name. `None` makes
        :py:mod:`argparse` fall back to `sys.argv[1:]`.
    :returns: The parsed options.

    Exits with status 1 if `--updateto` is not a valid UCS version and with
    status 2 (via `parser.error`) if only one of `--username` / `--pwdfile`
    is given while App updates are enabled.
    """
    parser = ArgumentParser(description=__doc__)
    parser.add_argument(
        "--updateto",
        help="update up to specified version. Expected format is X.Y-Z")
    parser.add_argument(
        "--check",
        action="store_true",
        help="check if updates are available")
    parser.add_argument(
        "--setucr",
        action="store_true",
        help="if set, variable update/available will be updated")
    parser.add_argument(
        "--ignoressh",
        action="store_true",
        help="pass --ignoressh to univention-updater")
    parser.add_argument(
        "--ignoreterm",
        action="store_true",
        help="pass --ignoreterm to univention-updater")
    parser.add_argument(
        "--noninteractive",
        action="store_true",
        help="Perform a non-interactive update")

    group = parser.add_argument_group("App updates")
    # Both switches write to the same destination; the default is set below.
    group.add_argument(
        "--enable-app-updates",
        dest="app_updates",
        action="store_true",
        help="Update installed Apps")
    group.add_argument(
        "--disable-app-updates",
        dest="app_updates",
        action="store_false",
        help="Skip updating installed Apps")
    group.add_argument(
        "--username",
        help="Name of the user used for registering the app")
    group.add_argument(
        "--pwdfile",
        type=FileType("r"),
        help="Name of the file containing the user password")
    parser.set_defaults(app_updates=True)

    # Honour the caller supplied argument list instead of always reading sys.argv.
    options = parser.parse_args(argv)

    if options.updateto:
        # Validate the format early; the parsed value itself is not needed here.
        try:
            UCS_Version(options.updateto)
        except ValueError:
            dprint(silent, "Unexpected format of updateto: %s .Expected format: X.Y-Z" % options.updateto)
            sys.exit(1)

    if options.app_updates:
        # Credentials are only usable as a pair.
        if options.pwdfile and not options.username:
            parser.error("--pwdfile without --username")
        elif options.username and not options.pwdfile:
            parser.error("--username without --pwdfile")

    return options
[docs]
def main() -> None:
    """
    Entry point: parse options, set up logging and run the check or upgrade.

    Opens the log file and stores it in the global `logfd`, attaches it as a
    handler to the `updater` logger, loads UCR and then works while holding
    the :py:class:`UpdaterLock`. Exits with status 1 if the log file cannot be
    opened.
    """
    global logfd

    options = parse_args()

    try:
        logfd = open(LOGFN, 'a+')
        log_handler = logging.StreamHandler(logfd)
        log_handler.setFormatter(logging.Formatter("%(asctime)-15s " + logging.BASIC_FORMAT))
        updater_logger = logging.getLogger('updater')
        updater_logger.addHandler(log_handler)
        updater_logger.setLevel(logging.ERROR)
    except OSError:
        print('Cannot open %s for writing' % LOGFN)
        sys.exit(1)

    configRegistry.load()

    if options.noninteractive:
        # exported through the environment for all child processes
        os.environ['UCS_FRONTEND'] = 'noninteractive'

    dprint(silent, '\nStarting univention-upgrade. Current UCS version is %(version/version)s-%(version/patchlevel)s errata%(version/erratalevel)s\n' % configRegistry)

    with UpdaterLock():
        # The local repository is only refreshed when something is going to be installed.
        if not options.check:
            update_local_repository(options)
        do_update(options)
[docs]
def update_local_repository(options: Namespace) -> None:
    """
    Offer to refresh a local repository mirror over the network.

    Nothing is updated unless both UCR variables `local/repository` and
    `repository/mirror` are true. In interactive mode the user is asked first.
    The exit code of `univention-repository-update` is not evaluated.

    :param options: Parsed command line options; `noninteractive` and
        `updateto` are evaluated.
    """
    dprint(silent, 'Checking for local repository: ', newline=False)

    has_local_mirror = configRegistry.is_true('local/repository', False) and configRegistry.is_true('repository/mirror', False)
    if not has_local_mirror:
        dprint(silent, 'none')
        return

    dprint(silent, 'found\n')
    if not (options.noninteractive or readcontinue('Update the local repository via network [Y|n]?')):
        return

    command = ['/usr/sbin/univention-repository-update', 'net']
    if options.updateto:
        command += ['--updateto', options.updateto]
    subprocess.call(command)
[docs]
def do_update(options: Namespace) -> None:
    """
    Run `performUpdate` and handle its outcome.

    :param options: Parsed command line options; `check` and `setucr` are
        evaluated.

    Exit codes set by this function:

    - 3: `ConfigurationError` while updating.
    - 2: any other exception; the traceback is written to the log file.
    - 0 / 1: in check mode, an update is / is not available.

    Without `--check` the script is re-executed via :py:func:`do_exec` when
    updates were reported; otherwise the function simply returns.
    """
    try:
        update_available = performUpdate(options, checkForUpdates=options.check, silent=False)
    except ConfigurationError as exc:
        update_status(status='FAILED', errorsource='SETTINGS')
        print('The connection to the repository server failed: %s. Please check the repository configuration and the network connection.' % exc, file=sys.stderr)
        sys.exit(3)
    except Exception:
        update_status(status='FAILED')
        print('An error occurred - see "%s" for details' % (LOGFN,))
        print('Traceback in univention-upgrade:', file=logfd)
        print(traceback.format_exc(), file=logfd)
        sys.exit(2)

    update_status(status='DONE')

    # Only touch the UCR variable when its current value differs from the result.
    if options.setucr and configRegistry.is_true(UCR_UPDATE_AVAILABLE) != update_available:
        new_value = 'yes' if update_available else 'no'
        handler_set([
            '%s=%s' % (UCR_UPDATE_AVAILABLE, new_value),
        ])

    if options.check:
        if not update_available:
            print('No update available.')
            sys.exit(1)
        dprint(silent, 'Please rerun command without --check argument to install.')
        sys.exit(0)

    if update_available:
        do_exec()
[docs]
def do_exec() -> NoReturn:
    """
    Re-execute this script with the original arguments plus `--setucr`.

    The updater/UCR/libs might have been replaced by the update that just
    finished, so the process image is replaced instead of continuing with the
    code already loaded. This function does not return.
    """
    cmd = list(sys.argv)
    # Every round re-executes the previous command line: add the flag only once
    # instead of growing the argument list with each re-execution.
    if '--setucr' not in cmd:
        cmd.append('--setucr')
    print("execv(%r)" % (cmd,), file=logfd)

    # execv() replaces the process immediately and does not flush Python's
    # file buffers; flush explicitly so that no pending output is lost.
    for stream in (logfd, sys.stdout, sys.stderr):
        if stream:
            stream.flush()

    os.execv(sys.argv[0], cmd)  # noqa: S606
# Run the upgrade only when executed as a script, not when imported.
if __name__ == "__main__":
    main()