#!/usr/bin/python3
# -*- coding: utf-8 -*-
# Copyright (C) 2010-2022 Univention GmbH
#
# https://www.univention.de/
#
# All rights reserved.
#
# The source code of this program is made available
# under the terms of the GNU Affero General Public License version 3
# (GNU AGPL V3) as published by the Free Software Foundation.
#
# Binary versions of this program provided by Univention to you as
# well as other copyrighted, protected or trademarked materials like
# Logos, graphics, fonts, specific documentations and configurations,
# cryptographic keys etc. are subject to a license agreement between
# you and Univention and not subject to the GNU AGPL V3.
#
# In the case you use this program under the terms of the GNU AGPL V3,
# the program is provided in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public
# License with the Debian GNU/Linux or Univention distribution in file
# /usr/share/common-licenses/AGPL-3; if not, see
# <https://www.gnu.org/licenses/>.
"""
Install UCS release and errata updates.
"""
from __future__ import print_function
import os
import sys
import time
import subprocess
import traceback
import logging
import socket
from argparse import ArgumentParser, FileType, Namespace
from typing import Iterable, List, Optional, NoReturn, Tuple
from univention.lib.ucs import UCS_Version
from univention.config_registry import ConfigRegistry, handler_set
from univention.admindiary.client import write_event
from univention.admindiary.events import UPDATE_STARTED, UPDATE_FINISHED_SUCCESS, UPDATE_FINISHED_FAILURE
from univention.updater.errors import ConfigurationError
from univention.updater.tools import UniventionUpdater
from univention.updater.locking import UpdaterLock
FN_STATUS = '/var/lib/univention-updater/univention-upgrade.status'
UCR_UPDATE_AVAILABLE = 'update/available'
LOGFN = '/var/log/univention/updater.log'
configRegistry = ConfigRegistry()
logfd = None
silent = False
[docs]def dprint(silent: bool, msg: str, newline: bool = True, debug: bool = False) -> None:
"""Print debug output."""
if silent:
return
if logfd:
if newline:
print(msg, file=logfd)
else:
print('%-55s' % msg, end=' ', file=logfd)
logfd.flush()
if not debug:
if newline:
print(msg)
else:
print('%-55s' % msg, end=' ')
sys.stdout.flush()
updater_status = {}
[docs]def update_status(**kwargs: str) -> None:
'''
update updater_status and write status to disk
Keys:
- current_version ==> UCS_Version ==> 2.3-1
- next_version ==> UCS_Version ==> 2.3-2
- updatetype ==> (LOCAL|NET)
- status ==> (RUNNING|FAILED|DONE)
- errorsource ==> (SETTINGS|PREPARATION|PREUP|UPDATE|POSTUP)
'''
global updater_status
updater_status.update(kwargs)
# write temporary file
fn = '%s.new' % FN_STATUS
try:
with open(fn, 'w+') as fd:
for key, val in updater_status.items():
fd.write('%s=%s\n' % (key, val))
except EnvironmentError:
dprint(silent, 'Warning: cannot update %s' % fn)
try:
os.rename(fn, FN_STATUS)
except EnvironmentError:
dprint(silent, 'Warning: cannot update %s' % FN_STATUS)
[docs]def readcontinue(msg: str) -> bool:
"""Print message and read yes/no/abort answer."""
while True:
try:
choice = input(msg).lower().strip()
if choice == '' or choice == 'y' or choice == 'j':
print('')
return True
elif choice == 'n':
print('')
return False
else:
continue
except KeyboardInterrupt:
print('\n')
return False
def _package_list(new_packages: Iterable[Tuple[str, ...]]) -> str:
"""Return comma separated list of packages."""
return ",".join(p[0] for p in new_packages)
[docs]def do_release_update(options: Namespace, checkForUpdates: bool, silent: bool) -> bool:
updater = UniventionUpdater()
# get next release update version
dprint(silent, 'Checking for release updates: ', newline=False)
version_next = updater.release_update_available()
if not version_next:
dprint(silent, 'none')
return False
if options.updateto and UCS_Version(options.updateto) < UCS_Version(version_next):
dprint(silent, '%s is available but updater has been instructed to stop at version %s.' % (version_next, options.updateto))
return False
dprint(silent, 'found: UCS %s' % version_next)
if checkForUpdates:
return True
interactive = not (options.noninteractive or checkForUpdates)
if interactive and not readcontinue('Do you want to update to %s [Y|n]?' % version_next):
return False
update_status(
current_version=updater.current_version,
next_version=version_next,
status='RUNNING')
dprint(silent, 'Starting update to UCS version %s at %s...' % (version_next, time.ctime()), debug=True)
dprint(silent, 'Starting update to UCS version %s' % (version_next))
time.sleep(1)
params = ['--silent']
if options.ignoressh:
params.append('--ignoressh')
if options.ignoreterm:
params.append('--ignoreterm')
retcode = subprocess.call(['/usr/share/univention-updater/univention-updater', 'net', '--updateto', '%s' % (version_next)] + params, env=os.environ)
if retcode:
dprint(silent, 'exitcode of univention-updater: %s' % retcode, debug=True)
dprint(silent, 'ERROR: update failed. Please check /var/log/univention/updater.log\n')
update_status(status='FAILED', errorsource='UPDATE')
sys.exit(1)
dprint(silent, 'Update to UCS version %s finished at %s...' % (version_next, time.ctime()), debug=True)
return True
[docs]def do_package_updates(options: Namespace, checkForUpdates: bool, silent: bool) -> bool:
interactive = not (options.noninteractive or checkForUpdates)
updater = UniventionUpdater()
# check if component updates are available
dprint(silent, 'Checking for package updates: ', newline=False)
new_packages, upgraded_packages, removed_packages = updater.component_update_get_packages()
update_available = bool(new_packages + upgraded_packages + removed_packages)
if not update_available:
dprint(silent, 'none')
return False
# updates available ==> stop here in "check-mode"
if checkForUpdates:
dprint(silent, 'found')
return True
dprint(silent, 'found\n')
if len(removed_packages) > 0:
dprint(silent, 'The following packages will be REMOVED:\n %s' % _package_list(removed_packages))
if len(new_packages) > 0:
dprint(silent, 'The following packages will be installed:\n %s' % _package_list(new_packages))
if len(upgraded_packages) > 0:
dprint(silent, 'The following packages will be upgraded:\n %s' % _package_list(upgraded_packages))
if interactive and not readcontinue('\nDo you want to continue [Y|n]?'):
return False
time.sleep(1)
dprint(silent, 'Starting dist-update at %s...' % (time.ctime()), debug=True)
dprint(silent, 'Starting package upgrade', newline=False)
hostname = socket.gethostname()
context_id = write_event(UPDATE_STARTED, {'hostname': hostname})
if context_id:
os.environ['ADMINDIARY_CONTEXT'] = context_id
returncode = updater.run_dist_upgrade()
if returncode:
dprint(silent, 'exitcode of apt-get dist-upgrade: %s' % returncode, debug=True)
dprint(silent, 'ERROR: update failed. Please check /var/log/univention/updater.log\n')
update_status(status='FAILED', errorsource='UPDATE')
write_event(UPDATE_FINISHED_FAILURE, {'hostname': hostname})
sys.exit(1)
dprint(silent, 'dist-update finished at %s...' % (time.ctime()), debug=True)
dprint(silent, 'done')
write_event(UPDATE_FINISHED_SUCCESS, {'hostname': hostname, 'version': 'UCS %(version/version)s-%(version/patchlevel)s errata%(version/erratalevel)s' % configRegistry})
time.sleep(1)
return True
[docs]def do_app_updates(options: Namespace, checkForUpdates: bool, silent: bool) -> Optional[bool]:
dprint(silent, 'Checking for app updates: ', newline=False)
if not options.app_updates:
dprint(silent, 'skipped')
return None
interactive = not (options.noninteractive or checkForUpdates)
try:
from univention.appcenter.actions import get_action, Abort
from univention.appcenter.app_cache import Apps
import univention.appcenter.log as appcenter_log
app_upgrade_search = get_action('upgrade-search')
app_upgrade = get_action('upgrade')
if app_upgrade is None:
raise ImportError()
except ImportError:
# the new univention.appcenter package is not installed. Never mind
# cannot be a dependency as the app center depends on updater...
dprint(silent, 'unavailable')
return False
# check if component updates are available
appcenter_log.log_to_logfile()
# own logging
logger = logging.getLogger('univention.appcenter.actions.upgrade.readme')
handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logging.INFO)
logger.addHandler(handler)
try:
app_upgrade_search.call()
except Abort:
pass
new_apps = list(app_upgrade.iter_upgradable_apps())
if not new_apps:
dprint(silent, 'none')
return False
elif checkForUpdates:
dprint(silent, 'found')
return True
dprint(silent, 'found\n')
dprint(silent, 'The following apps can be upgraded:\n')
for app in new_apps:
newer_app = Apps().find_candidate(app)
if newer_app:
dprint(silent, '%(name)s: Version %(old)s can be upgraded to %(new)s' % {
'name': app.name,
'old': app.version,
'new': newer_app.version,
})
elif app.docker:
dprint(silent, '%(name)s: The underlying container can be upgraded' % {
'name': app.name,
})
else:
# don't know how this is possible... but anyways
dprint(silent, '%(name)s' % {
'name': app.name,
})
dprint(silent, 'Starting univention-app upgrade at %s...' % time.ctime(), debug=True)
dprint(silent, 'Most of the output for App upgrades goes to %s' % appcenter_log.LOG_FILE, debug=True)
dprint(silent, '\nStarting app upgrade', newline=False)
success = True
for app in new_apps:
if interactive and not readcontinue('\nDo you want to upgrade %s [Y|n]?' % app.name):
continue
success &= bool(app_upgrade.call_safe(
app=[app],
noninteractive=not interactive,
username=options.username,
pwdfile=options.pwdfile.name if options.pwdfile else None,
))
if not success:
dprint(silent, 'ERROR: app upgrade failed. Please check %s\n' % appcenter_log.LOG_FILE)
return False # pretend no updates available; otherwise do_exec may result in infinite loop
dprint(silent, 'univention-app upgrade finished at %s...' % time.ctime(), debug=True)
dprint(silent, 'done')
return not success # pending updates
[docs]def parse_args(argv: Optional[List[str]] = None) -> Namespace:
parser = ArgumentParser(description=__doc__)
parser.add_argument(
"--updateto",
help="update up to specified version. Expected format is X.Y-Z")
parser.add_argument(
"--check",
action="store_true",
help="check if updates are available")
parser.add_argument(
"--setucr",
action="store_true",
help="if set, variable update/available will be updated")
parser.add_argument(
"--ignoressh",
action="store_true",
help="pass --ignoressh to univention-updater")
parser.add_argument(
"--ignoreterm",
action="store_true",
help="pass --ignoreterm to univention-updater")
parser.add_argument(
"--noninteractive",
action="store_true",
help="Perform a non-interactive update")
group = parser.add_argument_group("App updates")
group.add_argument(
"--enable-app-updates",
dest="app_updates",
action="store_true",
help="Update installed Apps")
group.add_argument(
"--disable-app-updates",
dest="app_updates",
action="store_false",
help="Skip updating installed Apps")
group.add_argument(
"--username",
help="Name of the user used for registering the app")
group.add_argument(
"--pwdfile",
type=FileType("r"),
help="Name of the file containing the user password")
parser.set_defaults(app_updates=True)
options = parser.parse_args()
if options.updateto:
try:
UCS_Version(options.updateto)
except ValueError:
dprint(silent, "Unexpected format of updateto: %s .Expected format: X.Y-Z" % options.updateto)
sys.exit(1)
if options.app_updates:
if options.pwdfile and not options.username:
parser.error("--pwdfile without --username")
elif options.username and not options.pwdfile:
parser.error("--username without --pwdfile")
return options
[docs]def main() -> None:
global logfd
options = parse_args()
try:
logfd = open(LOGFN, 'a+')
FORMAT = "%(asctime)-15s " + logging.BASIC_FORMAT
logger = logging.getLogger('updater')
handler = logging.StreamHandler(logfd)
formatter = logging.Formatter(FORMAT)
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(logging.ERROR)
except EnvironmentError:
print('Cannot open %s for writing' % LOGFN)
sys.exit(1)
configRegistry.load()
if options.noninteractive:
os.environ['UCS_FRONTEND'] = 'noninteractive'
dprint(silent, '\nStarting univention-upgrade. Current UCS version is %(version/version)s-%(version/patchlevel)s errata%(version/erratalevel)s\n' % configRegistry)
with UpdaterLock():
if options.check:
return do_update(options)
update_local_repository(options)
do_update(options)
[docs]def update_local_repository(options: Namespace) -> None:
dprint(silent, 'Checking for local repository: ', newline=False)
if configRegistry.is_true('local/repository', False) and configRegistry.is_true('repository/mirror', False):
dprint(silent, 'found\n')
if options.noninteractive or readcontinue('Update the local repository via network [Y|n]?'):
if options.updateto:
subprocess.call(('/usr/sbin/univention-repository-update', 'net', '--updateto', options.updateto))
else:
subprocess.call(('/usr/sbin/univention-repository-update', 'net'))
else:
dprint(silent, 'none')
[docs]def do_update(options: Namespace) -> None:
try:
update_available = performUpdate(options, checkForUpdates=options.check, silent=False)
except ConfigurationError as e:
update_status(status='FAILED', errorsource='SETTINGS')
print('The connection to the repository server failed: %s. Please check the repository configuration and the network connection.' % e, file=sys.stderr)
sys.exit(3)
except Exception:
update_status(status='FAILED')
print('An error occurred - see "%s" for details' % (LOGFN,))
print('Traceback in univention-upgrade:', file=logfd)
print(traceback.format_exc(), file=logfd)
sys.exit(2)
update_status(status='DONE')
if options.setucr and configRegistry.is_true(UCR_UPDATE_AVAILABLE) != update_available:
handler_set([
'%s=%s' % (UCR_UPDATE_AVAILABLE, 'yes' if update_available else 'no'),
])
if options.check:
if update_available:
dprint(silent, 'Please rerun command without --check argument to install.')
sys.exit(0)
else:
print('No update available.')
sys.exit(1)
if update_available:
do_exec()
[docs]def do_exec() -> NoReturn:
# The updater/UCR/libs might have been replaced - re-execute!
cmd = sys.argv + ['--setucr']
print("execv(%r)" % (cmd,), file=logfd)
os.execv(sys.argv[0], cmd)
if __name__ == '__main__':
main()