#!/usr/bin/python3
# -*- coding: utf-8 -*-
# Copyright 2004-2022 Univention GmbH
#
# https://www.univention.de/
#
# All rights reserved.
#
# The source code of this program is made available
# under the terms of the GNU Affero General Public License version 3
# (GNU AGPL V3) as published by the Free Software Foundation.
#
# Binary versions of this program provided by Univention to you as
# well as other copyrighted, protected or trademarked materials like
# Logos, graphics, fonts, specific documentations and configurations,
# cryptographic keys etc. are subject to a license agreement between
# you and Univention and not subject to the GNU AGPL V3.
#
# In the case you use this program under the terms of the GNU AGPL V3,
# the program is provided in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public
# License with the Debian GNU/Linux or Univention distribution in file
# /usr/share/common-licenses/AGPL-3; if not, see
# <https://www.gnu.org/licenses/>.
"""
Tool for updating local system
"""
from __future__ import print_function
import os
import re
import sys
from argparse import SUPPRESS, ArgumentParser, Namespace # noqa: F401
from datetime import datetime
from errno import ENOENT
from subprocess import DEVNULL, call
from textwrap import dedent, wrap
try:
import univention.debug as ud
except ImportError:
import univention.debug2 as ud # type: ignore
from univention.config_registry import ConfigRegistry
from univention.lib.ucs import UCS_Version
from univention.admindiary.client import write_event
from univention.admindiary.events import UPDATE_STARTED, UPDATE_FINISHED_SUCCESS, UPDATE_FINISHED_FAILURE
from univention.updater.errors import PreconditionError, ConfigurationError, RequiredComponentError, VerificationError, DownloadError
from univention.updater.tools import Component, UniventionUpdater, LocalUpdater # noqa: F401
from univention.updater.locking import UpdaterLock, apt_lock
from univention.updater.commands import cmd_update, cmd_dist_upgrade
try:
from typing import Container, Dict, IO, Iterable, Iterator, List, NoReturn, Optional, Set, Sequence, Tuple # noqa: F401
from typing_extensions import Literal # noqa: F401
_ESRC = Literal["SETTINGS", "PREPARATION", "PREUP", "UPDATE", "POSTUP"]
_CMDS = Literal["local", "net"]
except ImportError:
pass
# TODO:
# * check the local repository path /var/lib/univention-repository
# * changed variable update/server to repository/online/server
# Status file: written by update_status() and read by get_status() as "key=value" lines.
FN_STATUS = '/var/lib/univention-updater/univention-updater.status'
# Marker file: touched by main() when an update fails with UpdateError and
# removed by main() once a run completes without error.
failure = '/var/lib/univention-updater/update-failed'
# Marker file touched at the end of a successful run().
reboot_required = '/run/univention-updater-reboot'
# Temporary APT sources lists: both are deleted by remove_temporary_sources_list(),
# only TMPSOURCE is (re-)written by add_temporary_sources_list().
TMPSOURCE = '/etc/apt/sources.list.d/00_ucs_temporary_installation.list'
TMPSOURCE2 = '/etc/apt/sources.list.d/00_ucs_update_in_progress.list'
# Log file: opened for appending by setup_logging() and passed to the update scripts by run().
LOGNAME = '/var/log/univention/updater.log'
# Stream used by log() and dprint(); stderr until main() rebinds it to the opened log file.
fd_log = sys.stderr # type: IO[str]
# Original stdout, used by dprint() unless `nostdout` is set.
stdout_orig = sys.stdout
# Set to True by setup_logging() for --silent: dprint() then writes to the log only.
nostdout = False
# In-memory copy of the status that update_status() persists to FN_STATUS.
updater_status = {} # type: Dict[str, str]
# Matches the beginning of an APT source line: `deb` or `deb-src`, white space,
# an optional `[...]` option block, then the URI scheme (captured as group 1)
# up to the first colon. re.VERBOSE makes the white space inside the pattern insignificant.
RE_APT = re.compile(
r"""
^deb(?:-src)?
\s+
(?:\[ [^]]* \]\s*)?
([a-z][a-z0-9+.-]*)
:
""", re.VERBOSE)
class UpdateError(Exception):
    """
    Signal a failed update together with the stage in which it failed.

    :param msg: Human readable message.
    :param errorsource: One of 'SETTINGS', 'PREPARATION', 'PREUP', 'UPDATE', 'POSTUP'
    """

    def __init__(self, msg, errorsource):
        # type: (str, _ESRC) -> None
        super().__init__(msg)
        # Stage that caused the failure; main() stores it in the status file.
        self.errorsource = errorsource
def log(str):
    # type: (str) -> None
    """ Write one message line to the log stream `fd_log` and flush it immediately. """
    print(str, file=fd_log, flush=True)
def dprint(str):  # type: (object) -> None
    """ Write a message to the log stream and, unless silenced, to the original stdout. """
    # `nostdout` is switched on by --silent: then only the log receives the message.
    targets = (fd_log,) if nostdout else (stdout_orig, fd_log)
    for stream in targets:
        print(str, file=stream)
        stream.flush()
def update_status(**kwargs):  # type: (**str) -> None
    """
    Merge the given keys into `updater_status` and persist the result to disk.

    Known keys:

    - current_version ==> UCS_Version ==> 2.3-1
    - next_version    ==> UCS_Version ==> 2.3-2
    - target_version  ==> UCS_Version ==> 2.4-0
    - type            ==> (LOCAL|NET)
    - status          ==> (RUNNING|FAILED|DONE)
    - phase           ==> (PREPARATION|PREUP|UPDATE|POSTUP) ==> only valid if status=RUNNING
    - errorsource     ==> (SETTINGS|PREPARATION|PREUP|UPDATE|POSTUP)

    Failing to write the file only produces a warning.
    """
    updater_status.update(kwargs)
    # A phase is only meaningful while an update is running.
    if updater_status.get('status') != 'RUNNING':
        updater_status.pop('phase', None)

    # Write to a temporary file first, then rename it over the real status file.
    tmp_name = '%s.new' % FN_STATUS
    try:
        with open(tmp_name, 'w+') as tmp:
            tmp.writelines('%s=%s\n' % item for item in updater_status.items())
        os.rename(tmp_name, FN_STATUS)
    except EnvironmentError as err:
        dprint('Warning: cannot update status: %s' % (err,))
def get_status():
    # type: () -> Dict[str, str]
    """
    Load the persisted Updater status.

    Lines without a ``=`` are skipped; an unreadable or missing file yields
    whatever has been parsed so far (usually an empty dictionary).

    :returns: Dictionary with status

    .. seealso::
        :py:func:`update_status`
    """
    parsed = {}  # type: Dict[str, str]
    try:
        with open(FN_STATUS, 'r') as status_file:
            for raw in status_file:
                key, sep, value = raw.rstrip().partition('=')
                if not sep:
                    # not a "key=value" line
                    continue
                parsed[key] = value
    except EnvironmentError:
        pass
    return parsed
def remove_temporary_sources_list():
    # type: () -> None
    """ Remove the temporary sources.list files; files that do not exist are ignored. """
    for path in (TMPSOURCE, TMPSOURCE2):
        try:
            os.remove(path)
        except EnvironmentError as err:
            if err.errno == ENOENT:
                # already gone - nothing to do
                continue
            raise
def add_temporary_sources_list(temporary_sources_list):
    # type: (Iterable[str]) -> None
    """ Replace the temporary sources.list with the given entries, one per line. """
    # Start from a clean state: drop any left-over temporary files first.
    remove_temporary_sources_list()
    with open(TMPSOURCE, 'w') as sources:
        for line in temporary_sources_list:
            print(line, file=sources)
def update_available(opt, ucr):
    # type: (Namespace, ConfigRegistry) -> Tuple[UniventionUpdater, Optional[UCS_Version]]
    """
    Check the repository selected by `opt.mode` for a pending release update.

    :param opt: Parsed command line options; `mode` selects 'local' or 'net'.
    :param ucr: UCR instance, handed through to the mode specific check.
    :returns: The updater and the next version, or None as version if up-to-date.
    :raises UpdateError: if the next version can not be identified.
    :raises ValueError: for any other mode.
    """
    log('--->DBG:update_available(mode={0.mode})'.format(opt))
    if opt.mode == 'local':
        return update_local(opt, ucr)
    if opt.mode == 'net':
        return update_net(opt, ucr)
    raise ValueError(opt.mode)
def update_local(opt, ucr):
    # type: (Namespace, ConfigRegistry) -> Tuple[UniventionUpdater, Optional[UCS_Version]]
    """
    Check the local repository for a pending release update.

    :param opt: Parsed command line options (unused here).
    :param ucr: UCR instance (unused here).
    :returns: The local updater and the next version as reported by
        `release_update_available()`.
    :raises UpdateError: with error source 'SETTINGS' if the local repository
        can not be accessed.
    """
    dprint('Checking local repository')
    updater = LocalUpdater()
    not_found = (
        'A local repository was not found.\n'
        '         Please check the UCR variable repository/mirror/basepath\n'
        '         or try to install via "univention-updater net"'
    )
    try:
        # The probe must not be wrapped in `assert`: with `python -O` the whole
        # statement - including the access() call - would be dropped.
        # A falsy result is treated like an inaccessible repository.
        if not updater.server.access(None, ''):
            raise UpdateError(not_found, errorsource='SETTINGS')
        nextversion = updater.release_update_available(errorsto='exception')
    except DownloadError as ex:
        raise UpdateError(not_found, errorsource='SETTINGS') from ex
    return updater, nextversion
def update_net(opt, ucr):
    # type: (Namespace, ConfigRegistry) -> Tuple[UniventionUpdater, Optional[UCS_Version]]
    """
    Check the network repository for a pending release update.

    :param opt: Parsed command line options (unused here).
    :param ucr: UCR instance (unused here).
    :returns: The updater and the next version as reported by
        `release_update_available()`.
    :raises RequiredComponentError: passed through unchanged.
    :raises UpdateError: with error source 'SETTINGS' on any other configuration error.
    """
    dprint('Checking network repository')
    try:
        net_updater = UniventionUpdater()
        pending = net_updater.release_update_available(errorsto='exception')
    except RequiredComponentError:
        # Must stay in front of the ConfigurationError handler so it is never translated.
        raise
    except ConfigurationError as err:
        reason = 'The configured repository is unavailable: %s' % (err,)
        raise UpdateError(reason, errorsource='SETTINGS')
    return net_updater, pending
def update_ucr_updatestatus():
    # type: () -> None
    """ Run `univention-updater-check` quietly; a failure to start it only yields a warning. """
    checker = ('/usr/share/univention-updater/univention-updater-check',)
    try:
        call(checker, stdout=DEVNULL, stderr=DEVNULL)
    except EnvironmentError:
        dprint('Warning: calling univention-updater-check failed.')
def call_local(opt):
    # type: (Namespace) -> NoReturn
    """
    Call updater in "local" mode.

    Replaces the current process by a new invocation of this program
    (`sys.argv[0]`) in mode `local`, forwarding the relevant options.

    :param opt: Parsed command line options; `updateto`, `no_clean`, `ignoressh`,
        `ignoreterm` and `ignore_releasenotes` are forwarded.
    :raises SystemExit: with status 1 if the program can not be executed.
    """
    cmd = [sys.argv[0], 'local']
    if opt.updateto:
        cmd += ['--updateto', str(opt.updateto)]
    for flag, enabled in (
        ("--no-clean", opt.no_clean),
        ("--ignoressh", opt.ignoressh),
        ("--ignoreterm", opt.ignoreterm),
        ("--ignore-releasenotes", opt.ignore_releasenotes),
    ):
        if enabled:
            cmd.append(flag)

    try:
        # On success execv() never returns; it only raises when the exec fails.
        os.execv(sys.argv[0], cmd)
    except EnvironmentError:
        dprint('Fatal: failed to exec: %r' % cmd)
        sys.exit(1)
def parse_args(args=None):  # type: (Optional[Sequence[str]]) -> Namespace
    """
    Parse command line arguments.

    :param args: Argument list to parse; `None` lets argparse use `sys.argv[1:]`.
    :returns: Namespace with the attributes `reboot`, `updateto`, `no_clean`,
        `ignoressh`, `ignoreterm`, `ignore_releasenotes`, `noninteractive`,
        `silent`, `verbose`, `check` and `mode`.
    """
    parser = ArgumentParser(description=__doc__)
    parser.add_argument("--reboot", action="store_true", help=SUPPRESS)  # Deprecated
    # metavar is shown in the usage/help text (was misspelled "RELEAASE")
    parser.add_argument("--updateto", metavar="RELEASE", type=UCS_Version, help="Upper limit for version")
    parser.add_argument("--no-clean", action="store_true", help="Skip cleaning downloaded package file")

    group = parser.add_argument_group("Non-interactive usage")
    group.add_argument("--ignoressh", action="store_true", help="Skip check for SSH terminal")
    group.add_argument("--ignoreterm", action="store_true", help="Skip check for X11 Terminal")
    group.add_argument("--ignore-releasenotes", action="store_true", help="Skip showing release notes")
    group.add_argument("--noninteractive", action="store_true", help="Do not ask interactive questions")

    group = parser.add_argument_group("Verbosity options")
    group.add_argument("--silent", action="store_true", help="No output to STDOUT")
    # Counting starts at 2: every -v raises the level by one from there.
    group.add_argument("--verbose", "-v", action="count", default=2, help="Increase verbosity")

    parser.add_argument("--check", action="store_true", help="Check if system is up-to-date")
    parser.add_argument("mode", choices=("local", "net"), help="Update source")

    return parser.parse_args(args)
def setup_logging(opt, ucr):
    # type: (Namespace, ConfigRegistry) -> IO[str]
    """
    Initialise univention.debug and open the updater log file.

    :param opt: Parsed command line options; `verbose` is the fall-back log level
        and `silent` suppresses output to stdout.
    :param ucr: UCR instance; `update/debug/level` overrides the log level.
    :returns: The log file opened for appending; closing it is up to the caller.
    """
    global nostdout

    ud.init(LOGNAME, 0, 0)
    try:
        level = int(ucr.get('update/debug/level', opt.verbose))
    except ValueError:
        # not a number: fall back to the command line verbosity
        level = opt.verbose
    ud.set_level(ud.NETWORK, level)

    if opt.silent:
        nostdout = True

    return open(LOGNAME, 'a+')
def check(opt, ucr):
    # type: (Namespace, ConfigRegistry) -> bool
    """
    Report whether a release update is pending.

    :param opt: Parsed command line options.
    :param ucr: UCR instance.
    :returns: True if a next version is available, False if the system is
        up-to-date or the check itself failed.
    """
    try:
        _updater, pending = update_available(opt, ucr)
    except UpdateError as err:
        # Errors are handled as "update currently no available"
        dprint("Error: %s" % err)
        print('Error: Please check "%s" for details.' % LOGNAME, file=sys.stderr)
        return False
    except RequiredComponentError as err:
        dprint("%s" % err)
        return False

    if pending:
        dprint('Next version is %s' % pending)
        return True

    dprint('System is up to date')  # Sync with /etc/cron.d/univention-maintenance
    return False
def find(opt, ucr):
    # type: (Namespace, ConfigRegistry) -> Optional[Tuple[UniventionUpdater, UCS_Version]]
    """
    Determine the release to install next.

    :param opt: Parsed command line options; `updateto` is an optional upper limit.
    :param ucr: UCR instance providing the currently installed version.
    :returns: The updater and the next version, or None if there is nothing to
        install (up-to-date or next version is beyond `updateto`).
    :raises SystemExit: with status 1 if the previous run failed in its postup.sh.
    """
    lastversion = '%(version/version)s-%(version/patchlevel)s' % ucr
    log('**** Starting univention-updater %s with parameter=%s' % (lastversion, sys.argv))

    # Bug #51880: if last postup.sh failed
    previous = get_status()
    postup_failed = previous.get('status') == 'FAILED' and previous.get('errorsource') == 'POSTUP'
    if postup_failed:
        for line in (
            "ERROR: The postup.sh of the last update was not executed successfully.",
            "       Please check https://help.univention.com/t/what-to-do-if-postup-failed/15885 for further information.",
            "       The update can be started after the postup.sh has been successfully re-executed and ",
            "       /var/lib/univention-updater/univention-updater.status has been removed.",
        ):
            dprint(line)
        sys.exit(1)

    update_status(current_version=lastversion, type=opt.mode.upper(), status='RUNNING', phase='PREPARATION')

    updater, nextversion = update_available(opt, ucr)
    if not nextversion:
        dprint('System is up to date (UCS %s)' % lastversion)
        return None

    limit = opt.updateto
    if limit and nextversion > limit:
        dprint('Update hold at %s, next %s is after %s' % (lastversion, nextversion, limit))
        return None

    return updater, nextversion
def run(opt, ucr, updater, nextversion):
    # type: (Namespace, ConfigRegistry, UniventionUpdater, UCS_Version) -> None
    """
    Perform the update to `nextversion`.

    Writes the temporary sources list, then walks through the (phase, order)
    steps yielded by `updater.call_sh_files()` and performs the matching action
    for each step; afterwards cleans up and records the successful update.

    :param opt: Parsed command line options.
    :param ucr: UCR instance; used for the host name and version of the diary events.
    :param updater: Updater supplying the temporary sources list and the update scripts.
    :param nextversion: Release to install.
    :raises UpdateError: with error source 'UPDATE' if `cmd_update` or
        `cmd_dist_upgrade` returns a non-zero exit status.
    """
    if opt.noninteractive:
        opt.ignore_releasenotes = True
        os.environ['UCS_FRONTEND'] = 'noninteractive'
        # Replace stdin of this process (and thereby of its children) by /dev/null.
        with open(os.path.devnull, 'r') as null:
            os.dup2(null.fileno(), sys.stdin.fileno())

    dprint('Update to = %s' % nextversion)
    update_status(next_version=nextversion)
    if opt.updateto:
        update_status(target_version=opt.updateto)

    # The following options are exported through the environment
    # NOTE(review): presumably evaluated by the release's update scripts - confirm.
    if opt.ignore_releasenotes:
        os.environ['update_warning_releasenotes_internal'] = 'no'
    if opt.ignoressh:
        os.environ['update%d%d_ignoressh' % nextversion.mm] = 'yes'
    if opt.ignoreterm:
        os.environ['update%d%d_ignoreterm' % nextversion.mm] = 'yes'

    add_temporary_sources_list(updater.release_update_temporary_sources_list(nextversion))
    try:
        # Initial value, so the exception handler below can inspect `phase`
        # even if the failure happens before the first loop iteration.
        phase = 'preup'
        update_status(phase='PREUP')

        scripts = updater.get_sh_files(nextversion, nextversion)
        for phase, order in updater.call_sh_files(scripts, LOGNAME, str(nextversion)):
            # do not switch back and forth between PRE and UPDATE phase resp. UPDATE and POST phase.
            if (phase, order) not in (
                ('update', 'pre'),
                ('update', 'post'),
            ):
                update_status(phase=phase.upper())

            if (phase, order) == ('update', 'pre'):
                log('**** Downloading scripts at %s' % datetime.now().ctime())
            elif (phase, order) == ('preup', 'pre'):
                log('**** Starting actual update at %s' % datetime.now().ctime())
            elif (phase, order) == ('update', 'main'):
                # Step 1: run `cmd_update` while holding the APT lock.
                with apt_lock():
                    if call(cmd_update, shell=True, stdout=fd_log, stderr=fd_log):
                        raise UpdateError('Failed to execute "%s"' % cmd_update, errorsource='UPDATE')

                # Record the start in the admin diary; the returned context id (if any)
                # is exported through the environment.
                context_id = write_event(UPDATE_STARTED, {'hostname': ucr.get('hostname')})
                if context_id:
                    os.environ['ADMINDIARY_CONTEXT'] = context_id

                # Used by ../univention-maintenance-mode/univention-maintenance-mode-update-progress
                detailed_status = FN_STATUS + '.details'
                # Step 2: run `cmd_dist_upgrade`; APT is told to write its status to the
                # open file descriptor, which is explicitly kept open for the child.
                with apt_lock(), open(detailed_status, 'w+b') as detailed_status_fd:
                    fno = detailed_status_fd.fileno()
                    env = dict(os.environ, DEBIAN_FRONTEND="noninteractive")
                    cmd2 = "%s -o APT::Status-FD=%s" % (cmd_dist_upgrade, fno)
                    resultCode = call(cmd2, shell=True, stdout=fd_log, stderr=fd_log, env=env, pass_fds=(fno,))

                # The details file is removed before the exit status is evaluated,
                # i.e. also when the upgrade failed.
                if os.path.exists(detailed_status):
                    os.unlink(detailed_status)

                if resultCode != 0:
                    raise UpdateError('Failed to execute "%s"' % cmd_dist_upgrade, errorsource='UPDATE')
            elif (phase, order) == ('postup', 'main'):
                # Bug #23202: After an update of Python ucr.handler_set() may not work any more
                cmd = [
                    'univention-config-registry', 'set',
                    'version/version={}'.format(nextversion.FORMAT % nextversion),
                    'version/patchlevel={0.patchlevel}'.format(nextversion),
                ]
                call(cmd, stdout=fd_log, stderr=fd_log)
    except BaseException:
        # The temporary sources list is only removed when the failure happened in the
        # 'preup' phase or in the ('update', 'pre') step; for later failures it is kept.
        # `order` is only evaluated if `phase` is 'update', i.e. after the loop assigned it.
        if phase == 'preup' or (phase == 'update' and order == 'pre'):
            remove_temporary_sources_list()
        raise

    remove_temporary_sources_list()

    if os.path.exists('/usr/sbin/univention-pkgdb-scan'):
        call(['/usr/sbin/univention-pkgdb-scan'], stdout=fd_log, stderr=fd_log)

    if not opt.no_clean:
        call(['apt-get', 'clean'])

    # Create the marker file named by `reboot_required`.
    call(['touch', reboot_required])
    write_event(UPDATE_FINISHED_SUCCESS, {'hostname': ucr.get('hostname'), 'version': 'UCS %(version/version)s-%(version/patchlevel)s errata%(version/erratalevel)s' % ucr})
def main():
    # type: () -> None
    """
    Command line entry point.

    Parses the arguments, opens the log and then - while holding the updater
    lock - either only checks for a pending update (``--check``) or looks for
    the next release and installs it. Updater specific exceptions are
    translated into :py:class:`UpdateError`, which is recorded in the status
    file and the admin diary before the process exits.
    """
    # PATH does not contain */sbin when called from cron
    os.environ['PATH'] = '/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin'

    opt = parse_args()
    ucr = ConfigRegistry()
    ucr.load()

    # Rebind the module level log stream, so log() and dprint() write to the log file.
    global fd_log
    with setup_logging(opt, ucr) as fd_log:
        try:
            # Inner try: translate updater specific errors into UpdateError.
            try:
                with UpdaterLock():
                    if opt.check:
                        # check() returns a bool: exit status 1 if an update is pending, else 0.
                        sys.exit(check(opt, ucr))

                    ret = find(opt, ucr)
                    if ret:
                        run(opt, ucr, ret[0], ret[1])

                    update_status(status='DONE')

                    if os.path.exists(failure):
                        os.unlink(failure)

                    if ret:
                        # An update was installed: replace this process by a fresh
                        # invocation with the same arguments.
                        # NOTE(review): presumably to continue with the following release - confirm.
                        os.execv(sys.argv[0], sys.argv)
            except VerificationError as ex:
                # The text literal below is flush-left; dedent() leaves it unchanged.
                msg = '\n'.join([
                    "Update aborted due to verification error:",
                    "%s" % (ex,),
                ] + wrap(dedent(
                    """\
This can and should only be disabled temporarily
using the UCR variable 'repository/online/verify'.
"""
                )))
                raise UpdateError(msg, errorsource='SETTINGS')
            except ConfigurationError as e:
                msg = 'Update aborted due to configuration error: %s' % e
                raise UpdateError(msg, errorsource='SETTINGS')
            except RequiredComponentError as ex:
                # Not treated as a failure: the status becomes DONE and the message is printed.
                update_status(status='DONE', errorsource='PREPARATION')
                dprint(ex)
            except PreconditionError as ex:
                # Build a human readable description of the script which aborted the update.
                (phase, order, component, script) = ex.args
                if phase == 'preup':
                    phase = 'pre-update'
                    errorsource = 'PREUP' # type: _ESRC
                elif phase == 'postup':
                    phase = 'post-update'
                    errorsource = 'POSTUP'
                else:
                    errorsource = 'UPDATE'

                if order == 'main':
                    order = 'release %s' % component
                elif order == 'pre':
                    order = 'component %s before calling release script' % component
                elif order == 'post':
                    order = 'component %s after calling release script' % component

                msg = 'Update aborted by %s script of %s' % (phase, order)
                raise UpdateError(msg, errorsource=errorsource)

            # Only reached if the inner block did not raise (or re-raise) an exception.
            update_ucr_updatestatus()
        except UpdateError as msg:
            # Record the failure in the admin diary, the status file and the failure marker file.
            write_event(UPDATE_FINISHED_FAILURE, {'hostname': ucr.get('hostname')})
            update_status(status='FAILED', errorsource=msg.errorsource)
            dprint("Error: %s" % msg)
            call(['touch', failure])
            # A string argument is printed to stderr and yields exit status 1.
            sys.exit('Error: Please check "%s" for details.' % LOGNAME)
        except KeyboardInterrupt:
            update_status(status='FAILED')
            dprint("\nUpdate aborted by user (ctrl-c)\n")
            sys.exit(1)
if __name__ == '__main__':
main()