# -*- coding: utf-8 -*-
# Copyright 2008-2022 Univention GmbH
# https://www.univention.de/
# All rights reserved.
# The source code of this program is made available
# under the terms of the GNU Affero General Public License version 3
# (GNU AGPL V3) as published by the Free Software Foundation.
# Binary versions of this program provided by Univention to you as
# well as other copyrighted, protected or trademarked materials like
# Logos, graphics, fonts, specific documentations and configurations,
# cryptographic keys etc. are subject to a license agreement between
# you and Univention and not subject to the GNU AGPL V3.
# In the case you use this program under the terms of the GNU AGPL V3,
# the program is provided in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public
# License with the Debian GNU/Linux or Univention distribution in file
# /usr/share/common-licenses/AGPL-3; if not, see
# <https://www.gnu.org/licenses/>.
Univention Update tools.
from __future__ import absolute_import
from __future__ import print_function
import univention.debug as ud
except ImportError:
import univention.debug2 as ud # type: ignore
# TODO: Convert to absolute imports only AFTER the unit test has been adopted
from .commands import (
from .errors import (
from .repo_url import UcsRepoUrl
from univention.lib.ucs import UCS_Version
import errno
import sys
import re
import os
import copy
from six.moves import http_client as httplib
import socket
from univention.config_registry import ConfigRegistry
from six.moves import urllib_request as urllib2, urllib_error
import json
import subprocess
import tempfile
import logging
import functools
import six
import base64
from typing import Any, AnyStr, Dict, Generator, Iterable, Iterator, List, Optional, Sequence, Set, Text, Tuple, Type, TypeVar, Union # noqa: F401
from typing_extensions import Literal # noqa: F401
_TS = TypeVar("_TS", bound="_UCSServer")
except ImportError:
if six.PY2:
from new import instancemethod
from backports.tempfile import TemporaryDirectory
from tempfile import TemporaryDirectory
RE_ALLOWED_DEBIAN_PKGNAMES = re.compile('^[a-z0-9][a-z0-9.+-]+$')
RE_SPLIT_MULTI = re.compile('[ ,]+')
RE_COMPONENT = re.compile(r'^repository/online/component/([^/]+)$')
RE_CREDENTIALS = re.compile(r'^repository/credentials/(?:(?P<realm>[^/]+)/)?(?P<key>[^/]+)$')
MIN_GZIP = 100 # size of non-empty gzip file
UUID_NULL = '00000000-0000-0000-0000-000000000000'
[docs]def verify_script(script, signature):
# type: (bytes, bytes) -> Optional[bytes]
Verify detached signature of script:
.. code-block: sh
gpg -a -u 6B6E7E3259A9F44F1452D1BE36602BA86B8BFD3C --passphrase-file /etc/archive-keys/ucs4.0.txt -o script.sh.gpg -b script.sh
repo-ng-sign-release-file --debug -k 6B6E7E3259A9F44F1452D1BE36602BA86B8BFD3C -p /etc/archive-keys/ucs4.0.txt -i script.sh -o script.sh.gpg
.. code-block: python
verify_script(open("script.sh", "r").read(), open("script.sh.gpg", "r").read())
:param str script: The script text to verify.
:param str signature: The detached signature.
:return: None or the error output.
:rtype: None or str
# write signature to temporary file
sig_fd, sig_name = tempfile.mkstemp()
os.write(sig_fd, signature)
# verify script
cmd = ["apt-key", "verify", sig_name, "-"]
proc = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT, close_fds=True)
stdout, _stderr = proc.communicate(script)
ret = proc.wait()
return stdout if ret != 0 else None
class _UCSRepo(UCS_Version):
Super class to build URLs for APT repositories.
ARCHS = {'all', 'amd64'}
def __init__(self, release=None, **kwargs):
# type: (Optional[UCS_Version], **Any) -> None
if release:
super(_UCSRepo, self).__init__(release)
for (k, v) in kwargs.items():
if isinstance(v, str) and '%(' in v:
self.__dict__[k] = _UCSRepo._substitution(v, self.__dict__)
self.__dict__[k] = v
def __repr__(self):
# type: () -> str
return '%s(**%r)' % (self.__class__.__name__, self.__dict__)
def __eq__(self, other):
# type: (object) -> bool
return isinstance(other, _UCSRepo) and self.path() == other.path()
def __ne__(self, other):
# type: (object) -> bool
return not isinstance(other, _UCSRepo) or self.path() != other.path()
def _format(self, format):
# type: (str) -> str
Format longest path for directory/file access.
while True:
return format % self
except KeyError as ex:
(k,) = ex.args
# strip missing part
i = format.index('%%(%s)' % k)
format = format[:i]
# strip partial part
i = format.rindex('/') + 1
except ValueError:
i = 0
format = format[:i]
class _substitution(object):
Helper to print dynamically substituted variable.
>>> h={'major':2}
>>> h['version'] = _UCSRepo._substitution('%(major)d.%(minor)d', h)
>>> h['minor'] = 3
>>> '%(version)s' % h
def __init__(self, format, values):
# type: (str, Any) -> None
self.format = format
self.values = values
def __str__(self):
# type: () -> str
return self.format % self.values
except KeyError as e:
for (k, v) in self.values.items():
if self == v:
raise KeyError(k)
raise e
def __repr__(self):
# type: () -> str
return repr(self.format)
def deb(self, server, type="deb"):
# type: (_UCSServer, str) -> str
Format for :file:`/etc/apt/sources.list`.
:param str server: The URL of the repository server.
:param str type: The repository type, e.g. `deb` for a binary and `deb-src` for source package repository.
:returns: The APT repository stanza.
:rtype: str
raise NotImplementedError()
def path(self, filename=None):
# type: (str) -> str
Format pool for directory/file access.
:param filename: The name of a file in the repository.
:returns: relative path.
:rtype: str
raise NotImplementedError()
def clean(self, server):
# type: (_UCSServer) -> str
Format for :file:`/etc/apt/mirror.list`
:param str server: The URL of the repository server.
:returns: The APT repository stanza.
:rtype: str
raise NotImplementedError()
[docs]class UCSRepoPool5(_UCSRepo):
APT repository using the debian pool structure (ucs5 and above).
def __init__(self, release=None, **kwargs):
# type: (UCS_Version, **Any) -> None
kwargs.setdefault('version', UCS_Version.FORMAT)
kwargs.setdefault('patch', UCS_Version.FULLFORMAT)
kwargs.setdefault('errata', False)
super(UCSRepoPool5, self).__init__(release, **kwargs)
def _suite(self): # type: () -> str
Format suite.
:returns: UCS suite name.
:rtype: str
>>> UCSRepoPool5(major=5, minor=1, patchlevel=0)._suite
>>> UCSRepoPool5(major=5, minor=1, patchlevel=0, errata=True)._suite
return "{1}{0.major}{0.minor}{0.patchlevel}".format(self, "errata" if self.errata else "ucs")
[docs] def deb(self, server, type="deb", mirror=False):
# type: (_UCSServer, str, bool) -> str
Format for :file:`/etc/apt/sources.list`.
:param str server: The URL of the repository server.
:param str type: The repository type, e.g. `deb` for a binary and `deb-src` for source package repository.
:param bool mirror: Also mirror files for Debian installer.
:returns: The APT repository stanza.
:rtype: str
>>> r=UCSRepoPool5(major=5, minor=1, patchlevel=0)
>>> r.deb('https://updates.software-univention.de/')
'deb https://updates.software-univention.de/ ucs510 main'
>>> r.deb('https://updates.software-univention.de/', mirror=True)
'deb https://updates.software-univention.de/ ucs510 main main/debian-installer'
>>> r=UCSRepoPool5(major=5, minor=1, patchlevel=0, errata=True)
>>> r.deb('https://updates.software-univention.de/')
'deb https://updates.software-univention.de/ errata510 main'
components = "main main/debian-installer" if mirror and not self.errata and type == "deb" else "main"
return "%s %s %s %s" % (type, server, self._suite, components)
[docs] def path(self, filename=None):
# type: (str) -> str
Format pool for directory/file access.
:param filename: The name of a file in the repository.
:returns: relative path.
:rtype: str
>>> UCSRepoPool5(major=5, minor=1, patchlevel=0).path()
>>> UCSRepoPool5(major=5, minor=1, patchlevel=0, errata=True).path()
return "dists/{}/{}".format(self._suite, filename or 'InRelease')
[docs]class UCSRepoPool(_UCSRepo):
Flat Debian APT repository.
def __init__(self, **kw):
# type: (**Any) -> None
kw.setdefault('version', UCS_Version.FORMAT)
kw.setdefault('patch', UCS_Version.FULLFORMAT)
super(UCSRepoPool, self).__init__(**kw)
[docs] def deb(self, server, type="deb"):
# type: (_UCSServer, str) -> str
Format for :file:`/etc/apt/sources.list`.
:param str server: The URL of the repository server.
:param str type: The repository type, e.g. `deb` for a binary and `deb-src` for source package repository.
:returns: The APT repository stanza.
:rtype: str
>>> r=UCSRepoPool(major=2,minor=3,patchlevel=1,part='maintained',arch='amd64')
>>> r.deb('https://updates.software-univention.de/')
'deb https://updates.software-univention.de/2.3/maintained/ 2.3-1/amd64/'
fmt = "%(version)s/%(part)s/ %(patch)s/%(arch)s/"
return "%s %s%s" % (type, server, super(UCSRepoPool, self)._format(fmt))
[docs] def path(self, filename=None):
# type: (str) -> str
Format pool for directory/file access.
:param filename: The name of a file in the repository.
:returns: relative path.
:rtype: str
>>> UCSRepoPool(major=2,minor=3).path()
>>> UCSRepoPool(major=2,minor=3,part='maintained').path()
>>> UCSRepoPool(major=2,minor=3,patchlevel=1,part='maintained').path()
>>> UCSRepoPool(major=2,minor=3,patchlevel=1,part='maintained',arch='amd64').path()
fmt = "%(version)s/%(part)s/%(patch)s/%(arch)s/" + (filename or 'Packages.gz')
return super(UCSRepoPool, self)._format(fmt)
[docs] def clean(self, server):
# type: (_UCSServer) -> str
Format for :file:`/etc/apt/mirror.list`
:param str server: The URL of the repository server.
:returns: The APT repository stanza.
:rtype: str
fmt = "%(version)s/%(part)s/%(patch)s/" # %(arch)s/
return "clean %s%s" % (server, super(UCSRepoPool, self)._format(fmt))
[docs]class UCSRepoPoolNoArch(_UCSRepo):
Flat Debian APT repository without explicit architecture subdirectory.
ARCHS = {''}
def __init__(self, **kw):
# type: (**Any) -> None
kw.setdefault('version', UCS_Version.FORMAT)
kw.setdefault('patch', UCS_Version.FULLFORMAT)
super(UCSRepoPoolNoArch, self).__init__(**kw)
[docs] def deb(self, server, type="deb"):
# type: (_UCSServer, str) -> str
Format for :file:`/etc/apt/sources.list`.
:param str server: The URL of the repository server.
:param str type: The repository type, e.g. `deb` for a binary and `deb-src` for source package repository.
:returns: The APT repository stanza.
:rtype: str
>>> r=UCSRepoPoolNoArch(major=2,minor=3,patch='comp',part='maintained/component',arch='all')
>>> r.deb('https://updates.software-univention.de/')
'deb https://updates.software-univention.de/2.3/maintained/component/comp/ ./'
fmt = "%(version)s/%(part)s/%(patch)s/ ./"
return "%s %s%s" % (type, server, super(UCSRepoPoolNoArch, self)._format(fmt))
[docs] def path(self, filename=None):
# type: (str) -> str
Format pool for directory/file access. Returns relative path.
:param filename: The name of a file in the repository.
:returns: relative path.
:rtype: str
>>> UCSRepoPoolNoArch(major=2,minor=3).path()
>>> UCSRepoPoolNoArch(major=2,minor=3,part='maintained/component').path()
>>> UCSRepoPoolNoArch(major=2,minor=3,part='maintained/component',patch='comp').path()
>>> UCSRepoPoolNoArch(major=2,minor=3,part='maintained/component',patch='comp',arch='all').path()
fmt = "%(version)s/%(part)s/%(patch)s/" + (filename or 'Packages.gz')
return super(UCSRepoPoolNoArch, self)._format(fmt)
[docs] def clean(self, server):
# type: (_UCSServer) -> str
Format for :file:`/etc/apt/mirror.list`
:param str server: The URL of the repository server.
:returns: The APT repository stanza.
:rtype: str
fmt = "%(version)s/%(part)s/%(patch)s/"
return "clean %s%s" % (server, super(UCSRepoPoolNoArch, self)._format(fmt))
class _UCSServer(object):
Abstrace base class to access UCS compatible update server.
def load_credentials(self, ucr):
# type: (ConfigRegistry) -> None
Load credentials from UCR.
:param ConfigRegistry ucr: An UCR instance.
def join(self, rel):
# type: (str) -> str
Return joined URI without credential.
:param str rel: relative URI.
:return: The joined URI.
:rtype: str
raise NotImplementedError()
def access(self, repo, filename=None, get=False):
# type: (Optional[_UCSRepo], str, bool) -> Tuple[int, int, bytes]
Access URI and optionally get data.
:param _UCSRepo repo: the URI to access as an instance of :py:class:`_UCSRepo`.
:param str filename: An optional relative path.
:param bool get: Fetch data if True - otherwise check only.
:return: a 3-tuple (code, size, content) or None on errors.
:rtype: tuple(int, int, bytes)
:raises DownloadError: if the server is unreachable.
:raises ValueError: if the credentials use an invalid encoding.
:raises ConfigurationError: if a permanent error in the configuration occurs, e.g. the credentials are invalid or the host is unresolvable.
:raises ProxyError: if the HTTP proxy returned an error.
raise NotImplementedError()
def __add__(self, rel):
# type: (_TS, str) -> _TS
Append relative path component.
:param str rel: Relative path.
:return: A clone of this instance using the new base path.
:rtype: UCSHttpServer
raise NotImplementedError()
def prefix(self):
# type: () -> str
raise NotImplementedError()
def __eq__(self, other):
# type: (object) -> bool
return isinstance(other, _UCSServer) and self.prefix == other.prefix
def __ne__(self, other):
# type: (object) -> bool
return not isinstance(other, _UCSServer) or self.prefix != other.prefix
[docs]class UCSHttpServer(_UCSServer):
Access to UCS compatible remote update server.
[docs] class HTTPHeadHandler(urllib2.BaseHandler):
Handle fallback from HEAD to GET if unimplemented.
[docs] def http_error_501(self, req, fp, code, msg, headers): # httplib.NOT_IMPLEMENTED
# type: (urllib2.Request, Any, int, str, Dict) -> Any
m = req.get_method()
if m == 'HEAD' == UCSHttpServer.http_method:
ud.debug(ud.NETWORK, ud.INFO, "HEAD not implemented at %s, switching to GET." % req)
UCSHttpServer.http_method = 'GET'
return self.parent.open(req, timeout=req.timeout)
return None
def __init__(self, baseurl, user_agent=None, timeout=None):
# type: (UcsRepoUrl, str, float) -> None
Setup URL handler for accessing a UCS repository server.
:param UcsRepoUrl baseurl: the base URL.
:param str user_agent: optional user agent string.
:param int timeout: optional timeout for network access.
self.baseurl = baseurl
self.user_agent = user_agent
self.timeout = timeout
log = logging.getLogger('updater.UCSHttp')
http_method = 'HEAD'
head_handler = HTTPHeadHandler()
password_manager = urllib2.HTTPPasswordMgrWithDefaultRealm()
auth_handler = urllib2.HTTPBasicAuthHandler(password_manager)
proxy_handler = urllib2.ProxyHandler()
# No need for ProxyBasicAuthHandler, since ProxyHandler parses netloc for @
opener = urllib2.build_opener(head_handler, auth_handler, proxy_handler)
failed_hosts = set() # type: Set[str]
def prefix(self):
# type: () -> str
return self.baseurl.path.lstrip('/')
[docs] @classmethod
def reinit(self):
# type: () -> None
Reload proxy settings and reset failed hosts.
self.proxy_handler = urllib2.ProxyHandler()
self.opener = urllib2.build_opener(self.head_handler, self.auth_handler, self.proxy_handler)
[docs] @classmethod
def load_credentials(self, ucr):
# type: (ConfigRegistry) -> None
Load credentials from UCR.
:param ConfigRegistry ucr: An UCR instance.
uuid = ucr.get('uuid/license', UUID_NULL)
groups = {} # type: Dict[str, Dict[str, str]]
for key, value in ucr.items():
match = RE_CREDENTIALS.match(key)
if match:
realm, key = match.groups()
cfg = groups.setdefault(realm, {})
cfg[key] = value
for realm, cfg in groups.items():
uris = cfg.pop('uris').split()
except KeyError:
self.log.error('Incomplete credentials for realm "%s": %r', realm, cfg)
username = cfg.pop('username', uuid)
password = cfg.pop('password', uuid)
if cfg:
self.log.warn('Extra credentials for realm "%s": %r', realm, cfg)
self.password_manager.add_password(realm, uris, username, password)
self.log.info('Loaded credentials for realm "%s"', realm)
def __str__(self):
# type: () -> str
URI with credentials.
return self.baseurl.private()
def __repr__(self):
# type: () -> str
Return canonical string representation.
return '%s(%r, timeout=%r)' % (
def __add__(self, rel):
# type: (str) -> UCSHttpServer
Append relative path component.
:param str rel: Relative path.
:return: A clone of this instance using the new base path.
:rtype: UCSHttpServer
uri = copy.copy(self)
uri.baseurl += rel
return uri
[docs] def join(self, rel):
# type: (str) -> str
Return joined URI without credential.
:param str rel: relative URI.
:return: The joined URI.
:rtype: str
return (self.baseurl + rel).public()
[docs] def access(self, repo, filename=None, get=False):
# type: (Optional[_UCSRepo], str, bool) -> Tuple[int, int, bytes]
Access URI and optionally get data.
:param _UCSRepo repo: the URI to access as an instance of :py:class:`_UCSRepo`.
:param str filename: An optional relative path.
:param bool get: Fetch data if True - otherwise check only.
:return: a 3-tuple (code, size, content)
:rtype: tuple(int, int, bytes)
:raises DownloadError: if the server is unreachable.
:raises ValueError: if the credentials use an invalid encoding.
:raises ConfigurationError: if a permanent error in the configuration occurs, e.g. the credentials are invalid or the host is unresolvable.
:raises ProxyError: if the HTTP proxy returned an error.
rel = filename if repo is None else repo.path(filename)
assert rel is not None
if self.user_agent:
UCSHttpServer.opener.addheaders = [('User-agent', self.user_agent)]
uri = self.join(rel)
if self.baseurl.username and self.baseurl.password:
UCSHttpServer.password_manager.add_password(realm=None, uri=uri, user=self.baseurl.username, passwd=self.baseurl.password)
req = urllib2.Request(uri)
def get_host():
# type: () -> str
return req.host if six.PY3 else req.get_host() # type: ignore
if get_host() in self.failed_hosts:
self.log.error('Already failed %s', get_host())
raise DownloadError(uri, -1)
if not get and UCSHttpServer.http_method != 'GET':
# Overwrite get_method() to return "HEAD"
def get_method(self, orig=req.get_method):
method = orig()
if method == 'GET':
return UCSHttpServer.http_method
return method
req.get_method = functools.partial(get_method, req) if six.PY3 else instancemethod(get_method, req, urllib2.Request) # type: ignore
self.log.info('Requesting %s', req.get_full_url())
ud.debug(ud.NETWORK, ud.ALL, "updater: %s %s" % (req.get_method(), req.get_full_url()))
res = UCSHttpServer.opener.open(req, timeout=self.timeout)
assert res
# <http://tools.ietf.org/html/rfc2617#section-2>
auth = req.unredirected_hdrs['Authorization']
scheme, credentials = auth.split(' ', 1)
if scheme.lower() != 'basic':
raise ValueError('Only "Basic" authorization is supported')
basic = base64.b64decode(credentials).decode('ISO8859-1')
except Exception:
raise ValueError('Invalid base64')
self.baseurl.username, self.baseurl.password = basic.split(':', 1)
except KeyError:
except ValueError as ex:
self.log.info("Failed to decode %s: %s", auth, ex)
code = res.getcode()
assert code
size = int(res.info().get('content-length', 0))
content = res.read()
self.log.info("Got %s %s: %d %d", req.get_method(), req.get_full_url(), code, size)
return (code, size, content)
# direct | proxy | Error cause
# | valid closed filter DNS auth |
# HTTP:200 | HTTP:200 URL:111 URL:110 GAI:-2 HTTP:407 | OK
# HTTP:404 | HTTP:404 URL:111 URL:110 GAI:-2 HTTP:407 | Path unknown
# ---------+----------------------------------------------+----------------------
# URL:111 | HTTP:404 URL:111 URL:110 GAI:-2 HTTP:407 | Port closed
# URL:110 | HTTP:404 URL:111 URL:110 GAI:-2 HTTP:407 | Port filtered
# GAI:-2 | HTTP:502/4URL:111 URL:110 GAI:-2 HTTP:407 | Host name unknown
# HTTP:401 | HTTP:401 URL:111 URL:110 GAI:-2 HTTP:407 | Authorization required
except urllib_error.HTTPError as res:
self.log.debug("Failed %s %s: %s", req.get_method(), req.get_full_url(), res, exc_info=True)
if res.code == httplib.UNAUTHORIZED: # 401
raise ConfigurationError(uri, 'credentials not accepted')
if res.code == httplib.PROXY_AUTHENTICATION_REQUIRED: # 407
raise ProxyError(uri, 'credentials not accepted')
if res.code in (httplib.BAD_GATEWAY, httplib.GATEWAY_TIMEOUT): # 502 504
raise ConfigurationError(uri, 'host is unresolvable')
raise DownloadError(uri, res.code)
except urllib_error.URLError as e:
self.log.debug("Failed %s %s: %s", req.get_method(), req.get_full_url(), e, exc_info=True)
if isinstance(e.reason, six.string_types):
reason = e.reason
elif isinstance(e.reason, socket.timeout):
raise ConfigurationError(uri, 'timeout in network connection')
reason = e.reason.args[1] # default value for error message
except IndexError:
reason = str(e) # unknown
if isinstance(e.reason, socket.gaierror):
if e.reason.args[0] == socket.EAI_NONAME: # -2
reason = 'host is unresolvable'
if e.reason.args[0] == errno.ETIMEDOUT: # 110
reason = 'port is blocked'
elif e.reason.args[0] == errno.ECONNREFUSED: # 111
reason = 'port is closed'
selector = req.selector if six.PY3 else req.get_selector() # type: ignore
if selector.startswith('/'): # direct
raise ConfigurationError(uri, reason)
else: # proxy
raise ProxyError(uri, reason)
except socket.timeout as ex:
self.log.debug("Failed %s %s: %s", req.get_method(), req.get_full_url(), ex, exc_info=True)
raise ConfigurationError(uri, 'timeout in network connection')
[docs]class UCSLocalServer(_UCSServer):
Access to UCS compatible local update server.
def __init__(self, prefix):
# type: (str) -> None
Setup URL handler for accessing a UCS repository server.
:param str prefix: The local path of the repository.
self.log = logging.getLogger('updater.UCSFile')
prefix = str(prefix).strip('/')
self._prefix = '%s/' % prefix if prefix else ''
def prefix(self):
# type: () -> str
return self._prefix
def __str__(self):
# type: () -> str
Absolute file-URI.
return 'file:///%s' % self.prefix
def __repr__(self):
# type: () -> str
Return canonical string representation.
return 'UCSLocalServer(prefix=%r)' % (self.prefix,)
def __add__(self, rel):
# type: (str) -> UCSLocalServer
Append relative path component.
:param str rel: Relative path.
:return: A clone of this instance using the new base path.
:rtype: UCSLocalServer
uri = copy.copy(self)
uri._prefix += str(rel).lstrip('/')
return uri
[docs] def join(self, rel):
# type: (str) -> str
Return joined URI without credential.
:param str rel: relative URI.
:return: The joined URI.
:rtype: str
uri = self.__str__()
uri += str(rel).lstrip('/')
return uri
[docs] def access(self, repo, filename=None, get=False):
# type: (Optional[_UCSRepo], str, bool) -> Tuple[int, int, bytes]
Access URI and optionally get data.
:param _UCSRepo repo: the URI to access as an instance of :py:class:`_UCSRepo`.
:param str filename: An optional relative path.
:param bool get: Fetch data if True - otherwise check only.
:return: a 3-tuple (code, size, content)
:rtype: tuple(int, int, bytes)
:raises DownloadError: if the server is unreachable.
:raises ValueError: if the credentials use an invalid encoding.
:raises ConfigurationError: if a permanent error in the configuration occurs, e.g. the credentials are invalid or the host is unresolvable.
:raises ProxyError: if the HTTP proxy returned an error.
rel = filename if repo is None else repo.path(filename)
assert rel is not None
uri = self.join(rel)
ud.debug(ud.NETWORK, ud.ALL, "updater: %s" % (uri,))
# urllib2.urlopen() doesn't work for directories
assert uri.startswith('file://')
path = uri[len('file://'):]
if os.path.exists(path):
if os.path.isdir(path):
self.log.info("Got %s", path)
return (httplib.OK, 0, b'') # 200
elif os.path.isfile(path):
with open(path, 'rb') as f:
data = f.read()
self.log.info("Got %s: %d", path, len(data))
return (httplib.OK, len(data), data) # 200
self.log.error("Failed %s", path)
raise DownloadError(uri, -1)
[docs]class Component(object):
FN_APTSOURCES = '/etc/apt/sources.list.d/20_ucs-online-component.list'
UCRV = "repository/online/component/{}/{}"
AVAILABLE = 'available'
NOT_FOUND = 'not_found'
DISABLED = 'disabled'
UNKNOWN = 'unknown'
PERMISSION_DENIED = 'permission_denied'
def __init__(self, updater, name):
# type: (UniventionUpdater, str) -> None
self.updater = updater
self.name = name
def __lt__(self, other):
return self.name < other.name if isinstance(other, Component) else NotImplemented
def __le__(self, other):
return self.name <= other.name if isinstance(other, Component) else NotImplemented
def __eq__(self, other):
return isinstance(other, Component) and self.name == other.name
def __ne__(self, other):
return not isinstance(other, Component) or self.name != other.name
def __ge__(self, other):
return self.name >= other.name if isinstance(other, Component) else NotImplemented
def __gt__(self, other):
return self.name > other.name if isinstance(other, Component) else NotImplemented
def __hash__(self):
return hash(self.name)
def __str__(self):
# type: () -> str
return "Component({0.name})".format(self)
[docs] def ucrv(self, key=""):
# type: (str) -> str
return "/".join(filter(None, ("repository", "online", "component", self.name, key)))
def __getitem__(self, key):
# type: (str) -> str
return self.updater.configRegistry.get(self.ucrv(key)) or ""
def __bool__(self):
# type: () -> bool
return self.updater.configRegistry.is_true(self.ucrv())
__nonzero__ = __bool__
def _versions(self, start=None, end=None):
# type: (Optional[UCS_Version], Optional[UCS_Version]) -> Set[UCS_Version]
version = self["version"]
versions = set(RE_SPLIT_MULTI.split(version))
return {
UCS_Version(ver.mm + (0,))
for ver, _data in self.updater.get_releases(start, end)
if {ver.FORMAT % ver, "current", ""} & versions
def current(self):
# type: () -> bool
version = self["version"]
versions = set(RE_SPLIT_MULTI.split(version))
return bool(versions & {"current"})
def default_packages(self):
# type: () -> Set[str]
Returns a set of (meta) package names to be installed for this component.
:returns: a set of package names.
return set((
for var in ('defaultpackages', 'defaultpackage')
for pkg in RE_SPLIT_MULTI.split(self[var])
)) - {""}
[docs] def defaultpackage_installed(self, ignore_invalid_package_names=True):
# type: (bool) -> Optional[bool]
Returns installation status of component's default packages
:param bool ignore_invalid_package_names: Ignore invalid package names.
:returns: On of the values:
no default packages are defined
all default packages are installed
at least one package is not installed
:rtype: None or bool
:raises ValueError: if UCR variable contains invalid package names if ignore_invalid_package_names=False
pkglist = self.default_packages
if not pkglist:
return None
# check package names
for pkg in pkglist:
match = RE_ALLOWED_DEBIAN_PKGNAMES.search(pkg)
if not match:
if ignore_invalid_package_names:
raise ValueError('invalid package name (%s)' % pkg)
cmd = ['/usr/bin/dpkg-query', '-W', '-f', '${Status}\\n']
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = (data.decode("UTF-8", errors="replace") for data in p.communicate())
# count number of "Status: install ok installed" lines
installed_correctly = [x for x in stdout.splitlines() if x.endswith(' ok installed')]
# if pkg count and number of counted lines match, all packages are installed
return len(pkglist) == len(installed_correctly)
[docs] def baseurl(self, for_mirror_list=False):
# type: (bool) -> UcsRepoUrl
Calculate the base URL for a component.
:param bool for_mirror_list: Use external or local repository.
CS (component server)
value of `repository/online/component/%s/server`
MS (mirror server)
value of `repository/mirror/server`
RS (repository server)
value of `repository/online/server`
value is unset or no entry
value is irrelevant
| UCR configuration |Result | |
+-------------+----------+------------+---------+------------+------------+ |
| isRepoServer|enabled |localmirror |server |sources.list mirror.list | |
| False |False |False |\- |\- |\- |no |
+ +----------+------------+---------+------------+------------+local |
| |True | |\- |RS |\- |repository |
+ +----------+------------+---------+------------+------------+mirror |
| |True | |CS |CS |\- | |
| True |False |False | |\- |\- |local |
+ +----------+------------+---------+------------+------------+repository |
| |False |True |\- |\- |MS |mirror |
+ +----------+------------+---------+------------+------------+ |
| |False |True |CS |\- |CS | |
+ +----------+------------+---------+------------+------------+ |
| |True |False |\- |MS |\- | |
+ +----------+------------+---------+------------+------------+ |
| |True |False |CS |CS |\- | |
+ +----------+------------+---------+------------+------------+ |
| |True |True |\- |RS |MS | |
+ +----------+------------+---------+------------+------------+ |
| |True |True |CS |RS |CS | |
+ +----------+------------+---------+------------+------------+-------------+
| |False |\- |\- |\- |\- |backward |
+ +----------+ +---------+------------+------------+compabibility|
| |True | |\- |RS |MS |[1]_ |
+ +----------+ +---------+------------+------------+ |
| |True | |CS |RS |CS | |
.. [1] if `repository/online/component/%s/localmirror` is unset, then the value of `repository/online/component/%s` will be used to achieve backward compatibility.
c_prefix = self.ucrv()
if self.updater.is_repository_server:
m_url = UcsRepoUrl(self.updater.configRegistry, 'repository/mirror')
c_enabled = bool(self)
c_localmirror = self.updater.configRegistry.is_true(self.ucrv("localmirror"), c_enabled)
if for_mirror_list: # mirror.list
if c_localmirror:
return UcsRepoUrl(self.updater.configRegistry, c_prefix, m_url)
else: # sources.list
if c_enabled:
if c_localmirror:
return self.updater.repourl
return UcsRepoUrl(self.updater.configRegistry, c_prefix, m_url)
return UcsRepoUrl(self.updater.configRegistry, c_prefix, self.updater.repourl)
raise CannotResolveComponentServerError(self.name, for_mirror_list)
[docs] def server(self, for_mirror_list=False):
# type: (bool) -> UCSHttpServer
Return :py:class:`UCSHttpServer` for component as configures via UCR.
:param bool for_mirror_list: component entries for `mirror.list` will be returned, otherwise component entries for local `sources.list`.
:returns: The repository server for the component.
:rtype: UCSHttpServer
:raises ConfigurationError: if the configured server is not usable.
c_url = copy.copy(self.baseurl(for_mirror_list))
c_url.path = ''
prefix = self["prefix"]
user_agent = self.updater._get_user_agent_string()
server = UCSHttpServer(
# if prefix.lower() == 'none' ==> use no prefix
if prefix and prefix.lower() == 'none':
assert server.access(None, '')
except DownloadError as e:
uri, code = e.args
raise ConfigurationError(uri, 'absent prefix forced - component %s not found: %s' % (self.name, uri))
# FIXME: PMH stop iterating
for testserver in [
server + '/univention-repository/',
server + self.updater.repourl.path if self.updater.repourl.path else None,
if not testserver:
if prefix: # append prefix if defined
testserver = testserver + '%s/' % (prefix.strip('/'),)
assert testserver.access(None, '')
return testserver
except DownloadError as e:
ud.debug(ud.NETWORK, ud.ALL, "%s" % e)
uri, code = e.args
raise ConfigurationError(uri, 'non-existing component prefix: %s' % (uri,))
except ConfigurationError:
if self.updater.check_access:
return server
[docs] def versions(self, start, end, for_mirror_list=False):
# type: (UCS_Version, UCS_Version, bool) -> Iterator[Tuple[UCSHttpServer, _UCSRepo]]
Iterate component versions.
:param start: Minimum requried version.
:param end: Maximum allowed version.
:param bool clean: Add additional `clean` statements for `apt-mirror`.
:param bool for_mirror_list: component entries for `mirror.list` will be returned, otherwise component entries for local `sources.list`.
:returns: A iterator returning 2-tuples (server, ver).
server = self.server(for_mirror_list=for_mirror_list)
struct = self.layout(prefix=server, patch=self.name)
versions = self._versions(start, end)
parts = self._parts
for ver in sorted(versions):
struct.mmp = ver.mmp
for struct.part in parts:
yield server, struct
[docs] def repositories(self, start, end, clean=False, for_mirror_list=False, failed=None):
# type: (UCS_Version, UCS_Version, bool, bool, Optional[Set[Tuple[Component, str]]]) -> Iterator[str]
Return list of Debian repository statements for requested component.
:param start: Minimum requried version.
:param end: Maximum allowed version.
:param bool clean: Add additional `clean` statements for `apt-mirror`.
:param bool for_mirror_list: component entries for `mirror.list` will be returned, otherwise component entries for local `sources.list`.
:param failed: A set to recive the failed component names.
:returns: A list of strings with APT statements.
for server, struct in self.versions(start, end, for_mirror_list):
for struct.arch in sorted(struct.ARCHS):
assert server.access(struct, "Packages.gz")
yield struct.deb(server)
if clean:
yield struct.clean(server)
if self.updater.sources:
struct.arch = "source"
assert server.access(struct, "Sources.gz")
yield struct.deb(server, "deb-src")
except DownloadError as ex:
if failed is not None:
failed.add((self, str(ex)))
[docs] def status(self):
# type: () -> str
Returns the current status of specified component based on :file:`/etc/apt/sources.list.d/20_ucs-online-component.list`
:returns: One of the strings:
component has been disabled via UCR
component is enabled and at least one valid repo string has been found in .list file
component is enabled but no valid repo string has been found in .list file
component is enabled but authentication failed
component's status is unknown
:rtype: str
if not bool(self):
return self.DISABLED
comp_file = open(self.FN_APTSOURCES, 'r')
except IOError:
return self.UNKNOWN
rePath = re.compile('(un)?maintained/component/ ?%s/' % self.name)
reDenied = re.compile('credentials not accepted: %s$' % self.name)
# default: file contains no valid repo entry
result = self.NOT_FOUND
for line in comp_file:
if line.startswith('deb ') and rePath.search(line):
# at least one repo has been found
result = self.AVAILABLE
elif reDenied.search(line):
# stop immediately if at least one repo has authentication problems
# return result
return result
def layout(self):
# type: () -> Type[_UCSRepo]
value = self["layout"]
layouts = {
"": UCSRepoPool,
"arch": UCSRepoPool,
"flat": UCSRepoPoolNoArch,
} # type: Dict[str, Type[_UCSRepo]]
return layouts[value]
except LookupError:
raise ValueError(value)
def _parts(self):
# type: () -> List[str]
parts = ["maintained"] + ["unmaintained"][:self.updater.configRegistry.is_true(self.ucrv('unmaintained'))]
return ['%s/component' % (part,) for part in parts]
[docs]class UniventionUpdater(object):
Handle UCS package repositories.
def __init__(self, check_access=True):
# type: (bool) -> None
Create new updater with settings from UCR.
:param bool check_access: Check if repository server is reachable on init.
:raises ConfigurationError: if configured server is not available immediately.
self.log = logging.getLogger('updater.Updater')
self.check_access = check_access
self.connection = None
self.configRegistry = ConfigRegistry()
[docs] def config_repository(self):
# type: () -> None
Retrieve configuration to access repository. Overridden by :py:class:`univention.updater.UniventionMirror`.
self.online_repository = self.configRegistry.is_true('repository/online', True)
self.repourl = UcsRepoUrl(self.configRegistry, 'repository/online')
self.sources = self.configRegistry.is_true('repository/online/sources', False)
self.timeout = float(self.configRegistry.get('repository/online/timeout', 30))
self.script_verify = self.configRegistry.is_true('repository/online/verify', True)
UCSHttpServer.http_method = self.configRegistry.get('repository/online/httpmethod', 'HEAD').upper()
[docs] def ucr_reinit(self):
# type: () -> None
Re-initialize settings.
self.is_repository_server = self.configRegistry.is_true('local/repository', False)
reinitUCSHttpServer = False
if 'proxy/http' in self.configRegistry and self.configRegistry['proxy/http']:
os.environ['http_proxy'] = self.configRegistry['proxy/http']
os.environ['https_proxy'] = self.configRegistry['proxy/http']
reinitUCSHttpServer = True
if 'proxy/https' in self.configRegistry and self.configRegistry['proxy/https']:
os.environ['https_proxy'] = self.configRegistry['proxy/https']
reinitUCSHttpServer = True
if 'proxy/no_proxy' in self.configRegistry and self.configRegistry['proxy/no_proxy']:
os.environ['no_proxy'] = self.configRegistry['proxy/no_proxy']
reinitUCSHttpServer = True
if reinitUCSHttpServer:
# UCS version
self.current_version = UCS_Version("%(version/version)s-%(version/patchlevel)s" % self.configRegistry)
self.erratalevel = int(self.configRegistry.get('version/erratalevel', 0))
# UniventionMirror needs to provide its own settings
if not self.online_repository:
self.server = UCSLocalServer('') # type: _UCSServer
self.releases = {"error": "offline"}
# generate user agent string
user_agent = self._get_user_agent_string()
self.server = UCSHttpServer(
def _get_releases(self):
# type: () -> None
Detect server prefix and download `ucs-releases.json` file.
if not self.repourl.path:
_code, _size, data = self.server.access(None, '/univention-repository/ucs-releases.json', get=True)
self.server += '/univention-repository/'
self.log.info('Using detected prefix /univention-repository/')
self.releases = json.loads(data)
except DownloadError as e:
self.log.info('No prefix /univention-repository/ detected, using /')
ud.debug(ud.NETWORK, ud.ALL, "%s" % e)
# Validate server settings
_code, _size, data = self.server.access(None, 'ucs-releases.json', get=True)
self.log.info('Using configured prefix %s', self.repourl.path)
self.releases = json.loads(data)
except DownloadError as e:
self.log.error('Failed configured prefix %s', self.repourl.path, exc_info=True)
uri, code = e.args
raise ConfigurationError(uri, 'non-existing prefix "%s": %s' % (self.repourl.path, uri))
except ConfigurationError as e:
if self.check_access:
self.log.fatal('Failed server detection: %s', e, exc_info=True)
self.releases = {"error": str(e)}
except (ValueError, LookupError) as exc:
ud.debug(ud.NETWORK, ud.ERROR, 'Querying maintenance information failed: %s' % (exc,))
self.releases = {"error": str(exc)}
[docs] def get_releases(self, start=None, end=None):
# type: (Optional[UCS_Version], Optional[UCS_Version]) -> Iterator[Tuple[UCS_Version, Dict[str, Any]]]
Return UCS releases in range.
:param start: Minimum requried version.
:param end: Maximum allowed version.
:returns: Iterator of 2-tuples (UCS_Version, data).
for major_release in self.releases.get('releases', []):
for minor_release in major_release['minors']:
for patchlevel_release in minor_release['patchlevels']:
ver = UCS_Version((
if start and ver < start:
if end and ver > end:
yield (ver, dict(patchlevel_release, major=major_release['major'], minor=minor_release['minor']))
[docs] def get_next_version(self, version, components=[], errorsto='stderr'):
# type: (UCS_Version, Iterable[Component], Literal["stderr", "exception", "none"]) -> Optional[UCS_Version]
Check if a new patchlevel, minor or major release is available for the given version.
Components must be available for the same major.minor version.
:param UCS_Version version: A UCS release version.
:param components: A list of components, which must be available for the next release.
:param str errorsto: Select method of reporting errors; on of 'stderr', 'exception', 'none'.
:returns: The next UCS release or None.
:rtype: UCS_Version or None
:raises RequiredComponentError: if a required component is missing
ver = min(ver for ver, _data in self.get_releases() if ver > version)
except ValueError:
return None
self.log.info('Found version %s', ver)
failed = set() # type: Set[Tuple[Component, str]]
for component in components:
self.log.info('Checking for component %s', component.name)
any(component.repositories(ver, ver, failed=failed if component.current else set()))
if failed:
ex = RequiredComponentError(str(ver), {comp.name for comp, ex in failed})
if errorsto == 'exception':
raise ex
elif errorsto == 'stderr':
print(ex, file=sys.stderr)
return None
self.log.info('Going for version %s', ver)
return ver
[docs] def get_all_available_release_updates(self, ucs_version=None):
# type: (Optional[UCS_Version]) -> Tuple[List[UCS_Version], Optional[Set[str]]]
Returns a list of all available release updates - the function takes required components into account
and stops if a required component is missing
:param ucs_version: starts travelling through available version from version.
:type ucs_version: UCS_Version or None
:returns: a list of 2-tuple `(versions, blocking_component)`, where `versions` is a list of UCS release and `blocking_component` is the first missing component blocking the update.
:rtype: tuple(list[str], str or None)
ucs_version = ucs_version or self.current_version
components = self.get_components(only_current=True)
result = [] # type: List[UCS_Version]
while ucs_version:
ucs_version = self.get_next_version(ucs_version, components, errorsto='exception')
except RequiredComponentError as ex:
self.log.warning('Update blocked by components %s', ', '.join(ex.components))
# ex.components blocks update to next version ==> return current list and blocking component
return result, ex.components
if not ucs_version:
self.log.info('Found release updates %r', result)
return result, None
[docs] def release_update_available(self, ucs_version=None, errorsto='stderr'):
# type: (Optional[UCS_Version], Literal["stderr", "exception", "none"]) -> Optional[UCS_Version]
Check if an update is available for the `ucs_version`.
:param str ucs_version: The UCS release to check.
:param str errorsto: Select method of reporting errors; on of 'stderr', 'exception', 'none'.
:returns: The next UCS release or None.
:rtype: str or None
ucs_version = ucs_version or self.current_version
components = self.get_components(only_current=True)
return self.get_next_version(UCS_Version(ucs_version), components, errorsto)
[docs] def release_update_temporary_sources_list(self, version):
# type: (UCS_Version) -> List[str]
Return list of Debian repository statements for the release update including all enabled components.
:param version: The UCS release.
:returns: A list of Debian APT `sources.list` lines.
:rtype: list[str]
result = [UCSRepoPool5(version).deb(self.server)]
for comp in self.get_components():
result += list(comp.repositories(version, version, failed=set()))
except (ConfigurationError, ProxyError):
if comp.current:
return result
[docs] def component(self, name):
# type: (str) -> Component
return Component(self, name)
[docs] def get_components(self, only_localmirror_enabled=False, all=False, only_current=False):
# type: (bool, bool, bool) -> Set[Component]
Retrieve all (enabled) components from registry as set().
By default, only "enabled" components will be returned (repository/online/component/%s=$TRUE).
:param bool only_localmirror_enabled:
Only the components enabled for local mirroring.
If only_`localmirror`_enabled is `True`, then all components with `repository/online/component/%s/localmirror=$TRUE` will be returned.
If `repository/online/component/%s/localmirror` is not set, then the value of `repository/online/component/%s` is used for backward compatibility.
:param bool all: Also return not enabled components.
:param bool only_current: Only return components marked as "current".
:returns: The set of (enabled) components.
components = set()
for key in self.configRegistry:
match = RE_COMPONENT.match(key)
if not match:
component, = match.groups()
comp = self.component(component)
enabled = bool(comp)
if only_localmirror_enabled:
enabled = self.configRegistry.is_true(comp.ucrv("localmirror"), enabled)
if only_current and not comp.current:
if all or enabled:
return components
[docs] def component_update_get_packages(self):
# type: () -> Tuple[List[Tuple[Text, Text]], List[Tuple[Text, Text, Text]], List[Tuple[Text, Text]]]
Return tuple with list of (new, upgradeable, removed) packages.
:return: A 3-tuple (new, upgraded, removed).
:rtype: tuple(list[str], list[str], list[str])
env = dict(os.environ, LC_ALL="C.UTF-8")
proc = subprocess.Popen(("univention-config-registry", "commit", Component.FN_APTSOURCES), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = (data.decode("UTF-8", errors="replace") for data in proc.communicate())
if stderr:
ud.debug(ud.NETWORK, ud.PROCESS, 'stderr=%s' % stderr)
if stdout:
ud.debug(ud.NETWORK, ud.INFO, 'stdout=%s' % stdout)
# FIXME: error handling
proc = subprocess.Popen(cmd_update, shell=True, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = (data.decode("UTF-8", errors="replace") for data in proc.communicate())
if stderr:
ud.debug(ud.NETWORK, ud.PROCESS, 'stderr=%s' % stderr)
if stdout:
ud.debug(ud.NETWORK, ud.INFO, 'stdout=%s' % stdout)
# FIXME: error handling
proc = subprocess.Popen(cmd_dist_upgrade_sim, shell=True, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = (data.decode("UTF-8", errors="replace") for data in proc.communicate())
if stderr:
ud.debug(ud.NETWORK, ud.PROCESS, 'stderr=%s' % stderr)
if stdout:
ud.debug(ud.NETWORK, ud.INFO, 'stdout=%s' % stdout)
if proc.returncode == 100:
raise UnmetDependencyError(stderr)
new_packages = [] # type: List[Tuple[Text, Text]]
upgraded_packages = [] # type: List[Tuple[Text, Text, Text]]
removed_packages = [] # type: List[Tuple[Text, Text]]
for line in stdout.splitlines():
line_split = line.split(' ')
if line.startswith('Inst '):
# upgrade:
# Inst univention-updater [3.1.1-5] (3.1.1-6.408.200810311159
# inst:
# Inst mc (1:4.6.1-6.12.200710211124 oxae-update.open-xchange.com)
if len(line_split) > 3:
if line_split[2].startswith('[') and line_split[2].endswith(']'):
ud.debug(ud.NETWORK, ud.PROCESS, 'Added %s to the list of upgraded packages' % line_split[1])
upgraded_packages.append((line_split[1], line_split[2].replace('[', '').replace(']', ''), line_split[3].replace('(', '')))
ud.debug(ud.NETWORK, ud.PROCESS, 'Added %s to the list of new packages' % line_split[1])
new_packages.append((line_split[1], line_split[2].replace('(', '')))
ud.debug(ud.NETWORK, ud.WARN, 'unable to parse the update line: %s' % line)
elif line.startswith('Remv '):
if len(line_split) > 3:
ud.debug(ud.NETWORK, ud.PROCESS, 'Added %s to the list of removed packages' % line_split[1])
removed_packages.append((line_split[1], line_split[2].replace('(', '')))
elif len(line_split) > 2:
ud.debug(ud.NETWORK, ud.PROCESS, 'Added %s to the list of removed packages' % line_split[1])
removed_packages.append((line_split[1], 'unknown'))
ud.debug(ud.NETWORK, ud.WARN, 'unable to parse the update line: %s' % line)
return (new_packages, upgraded_packages, removed_packages)
[docs] def run_dist_upgrade(self):
# type: () -> int
Run `apt-get dist-upgrade` command.
:returns: a 3-tuple (return_code, stdout, stderr)
:rtype: tuple(int, str, str)
env = dict(os.environ, DEBIAN_FRONTEND="noninteractive")
with open("/var/log/univention/updater.log", "a") as log:
return subprocess.call(cmd_dist_upgrade, shell=True, env=env, stdout=log, stderr=log)
[docs] def print_component_repositories(self, clean=False, start=None, end=None, for_mirror_list=False):
# type: (bool, Optional[UCS_Version], Optional[UCS_Version], bool) -> str
Return a string of Debian repository statements for all enabled components.
:param bool clean: Add additional `clean` statements for `apt-mirror` if enabled by UCRV `repository/online/component/%s/clean`.
:param UCS_Version start: optional smallest UCS release to return.
:param UCS_Version end: optional largest UCS release to return.
:param bool for_mirror_list: component entries for `mirror.list` will be returned, otherwise component entries for local `sources.list`.
:returns: A string with APT statement lines.
:rtype: str
if not self.online_repository:
return ''
if clean:
clean = self.configRegistry.is_true('online/repository/clean', False)
result = [] # type: List[str]
failed = set() # type: Set[Tuple[Component, str]]
for comp in sorted(self.get_components(only_localmirror_enabled=for_mirror_list)):
result += comp.repositories(start, end, clean=clean, for_mirror_list=for_mirror_list, failed=failed)
result += ["# Component %s: %s" % (comp.name, ex) for comp, ex in failed]
return '\n'.join(result)
def _get_user_agent_string(self):
# type: () -> str
Return the HTTP user agent string encoding the enabled components.
:returns: A HTTP user agent string.
:rtype: str
# USER_AGENT='updater/identify - version/version-version/patchlevel errata version/erratalevel - uuid/system - uuid/license'
# USER_AGENT='UCS updater - 3.1-0 errata28 - 77e6406d-7a3e-40b3-a398-81cf119c9ef7 - 4c52d2da-d04d-4b05-a593-1974ee851fc8'
# USER_AGENT='UCS updater - 3.1-0 errata28 - 77e6406d-7a3e-40b3-a398-81cf119c9ef7 - 00000000-0000-0000-0000-000000000000'
return '%s - %s-%s errata%s - %s - %s - %s - %s' % (
self.configRegistry.get('updater/identify', 'UCS'),
self.configRegistry.get('version/version'), self.configRegistry.get('version/patchlevel'),
self.configRegistry.get('uuid/system', UUID_NULL),
self.configRegistry.get('uuid/license', UUID_NULL),
','.join(self.configRegistry.get('repository/app_center/installed', '').split('-')),
self.configRegistry.get('updater/statistics', ''),
[docs] @staticmethod
def call_sh_files(scripts, logname, *args):
# type: (Iterable[Tuple[_UCSServer, _UCSRepo, Optional[str], str, bytes]], str, *str) -> Iterator[Tuple[str, str]]
Get pre- and postup.sh files and call them in the right order::
u = UniventionUpdater()
ver = u.get_next_version(u.current_version)
scripts = u.get_sh_files(ver, ver)
for phase, order in u.call_sh_files(scripts, '/var/log/univention/updater.log', ver):
if (phase, order) == ('update', 'main'):
:param scripts: A generator returning the script to call, e.g. :py:meth:`get_sh_files`
:param str logname: The file name of the log file.
:param args: Additional arguments to pass through to the scripts.
:returns: A generator returning 2-tuples (phase, part)
def call(*cmd):
# type: (*str) -> int
Execute script.
:param cmd: The command to execute in a sub-process.
:type cmd: list(str)
:returns: The exit code of the child process.
:rtype: int
commandline = ' '.join(["'%s'" % a.replace("'", "'\\''") for a in cmd])
ud.debug(ud.NETWORK, ud.INFO, "Calling %s" % commandline)
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
tee = subprocess.Popen(('tee', '-a', logname), stdin=p.stdout)
# Order is important! See bug #16454
return p.returncode
# download scripts
yield "update", "pre"
main = {'preup': [], 'postup': []} # type: Dict[str, List[Tuple[str, str]]]
comp = {'preup': [], 'postup': []} # type: Dict[str, List[Tuple[str, str]]]
# save scripts to temporary files
with TemporaryDirectory() as tempdir:
for server, struct, phase, path, data in scripts:
if phase is None:
assert data is not None
uri = server.join(path)
name = os.path.join(tempdir, uri.replace("/", "_"))
with open(name, "wb") as fd:
os.fchmod(fd.fileno(), 0o744)
ud.debug(ud.NETWORK, ud.INFO, "%s saved to %s" % (uri, name))
is_component = hasattr(struct, 'part') and struct.part.endswith('/component')
memo = comp if is_component else main
memo[phase].append((name, str(struct.patch)))
except EnvironmentError as ex:
ud.debug(ud.NETWORK, ud.ERROR, "Error saving %s to %s: %s" % (uri, name, ex))
# call component/preup.sh pre $args
yield "preup", "pre"
for (script, patch) in comp['preup']:
if call(script, 'pre', *args) != 0:
raise PreconditionError('preup', 'pre', patch, script)
# call $next_version/preup.sh
yield "preup", "main"
for (script, patch) in main['preup']:
if call(script, *args) != 0:
raise PreconditionError('preup', 'main', patch, script)
# call component/preup.sh post $args
yield "preup", "post"
for (script, patch) in comp['preup']:
if call(script, 'post', *args) != 0:
raise PreconditionError('preup', 'post', patch, script)
# call $update/commands/distupgrade or $update/commands/upgrade
yield "update", "main"
# call component/postup.sh pos $args
yield "postup", "pre"
for (script, patch) in comp['postup']:
if call(script, 'pre', *args) != 0:
raise PreconditionError('postup', 'pre', patch, script)
# call $next_version/postup.sh
yield "postup", "main"
for (script, patch) in main['postup']:
if call(script, *args) != 0:
raise PreconditionError('postup', 'main', patch, script)
# call component/postup.sh post $args
yield "postup", "post"
for (script, patch) in comp['postup']:
if call(script, 'post', *args) != 0:
raise PreconditionError('postup', 'post', patch, script)
# clean up
yield "update", "post"
[docs] def get_sh_files(self, start, end, mirror=False):
# type: (UCS_Version, UCS_Version, bool) -> Iterator[Tuple[_UCSServer, _UCSRepo, Optional[str], str, bytes]]
Return all preup- and postup-scripts of repositories.
:param UCS_Version start: The UCS release to start from.
:param UCS_Version end: The UCS release where to stop.
:param bool mirror: Use the settings for mirroring.
:returns: iteratable (server, struct, phase, path, script)
:raises VerificationError: if the PGP signature is invalid.
See :py:meth:`call_sh_files` for an example.
def all_repos():
# type: () -> Iterator[Tuple[_UCSServer, _UCSRepo, bool]]
self.log.info('Searching releases [%s..%s]', start, end)
for ver, _data in self.get_releases(start, end):
yield self.server, UCSRepoPool5(release=ver, prefix=self.server), True
self.log.info('Searching components [%s..%s]', start, end)
components = self.get_components(only_localmirror_enabled=mirror)
for comp in components:
for server, struct in comp.versions(start, end, mirror):
struct.arch = "all"
self.log.info('Component %s from %s versions %r', comp.name, server, struct)
yield server, struct, comp.current
for server, struct, critical in all_repos():
uses_proxy = hasattr(server, "proxy_handler") and server.proxy_handler.proxies # type: ignore
for phase in ('preup', 'postup'):
name = '%s.sh' % phase
path = struct.path(name)
ud.debug(ud.NETWORK, ud.ALL, "Accessing %s" % path)
_code, _size, script = server.access(struct, name, get=True)
# Bug #37031: dansguarding is lying and returns 200 even for blocked content
if not script.startswith(b'#!') and uses_proxy:
uri = server.join(path)
raise ProxyError(uri, "download blocked by proxy?")
if self.script_verify and struct >= UCS_Version((3, 2, 0)):
name_gpg = name + '.gpg'
path_gpg = struct.path(name_gpg)
_code, _size, signature = server.access(struct, name_gpg, get=True)
if not signature.startswith(b"-----BEGIN PGP SIGNATURE-----") and uses_proxy:
uri = server.join(path_gpg)
raise ProxyError(uri, "download blocked by proxy?")
except DownloadError:
raise VerificationError(path_gpg, "Signature download failed")
error = verify_script(script, signature)
if error is not None:
raise VerificationError(path, "Invalid signature: %r" % error)
yield server, struct, None, path_gpg, signature
yield server, struct, phase, path, script
except DownloadError as e:
ud.debug(ud.NETWORK, ud.ALL, "%s" % e)
except ConfigurationError:
if critical:
[docs]class LocalUpdater(UniventionUpdater):
Direct file access to local repository.
def __init__(self):
# type: () -> None
self.log = logging.getLogger('updater.LocalUpdater')
repository_path = self.configRegistry.get('repository/mirror/basepath', '/var/lib/univention-repository')
self.server = UCSLocalServer("%s/mirror/" % repository_path) # type: _UCSServer