#!/usr/bin/python3
#
# Univention App Center
# Utility functions
#
# SPDX-FileCopyrightText: 2015-2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only
#
from __future__ import annotations
import http.client
import ipaddress
import os
import os.path
import re
import shutil
import socket
import ssl
import time
import urllib.request
from collections import OrderedDict
from configparser import ParsingError, RawConfigParser
from hashlib import md5, sha256
from locale import getlocale
from shlex import quote
from subprocess import PIPE, STDOUT, Popen, list2cmdline
from threading import Thread
from typing import TYPE_CHECKING, Any, TypeVar, overload
from urllib.parse import urlencode
from uuid import uuid4
from ldap.filter import filter_format
from univention.appcenter.log import get_base_logger
from univention.appcenter.ucr import ucr_get, ucr_keys
from univention.config_registry import interfaces
from univention.config_registry.misc import key_shell_escape
from univention.lib.i18n import Translation
if TYPE_CHECKING:
from collections.abc import Container, Iterable, Mapping, Sequence
from logging import Logger
from univention.appcenter.app import App
# Type variable for the parser class accepted by read_ini_file(); bound to RawConfigParser
_ConfigParser = TypeVar("_ConfigParser", bound=RawConfigParser)
# Generic element type for helpers that pass elements through unchanged (see unique())
_T = TypeVar("_T")
# "global" translation for univention-appcenter
# also provides translation for univention-appcenter-docker etc
_ = Translation('univention-appcenter').translate
# Logger used by the helpers in this module: the 'utils' child of the App Center base logger
utils_logger = get_base_logger().getChild('utils')
@overload
def read_ini_file(filename: str) -> RawConfigParser:
    ...


@overload
def read_ini_file(filename: str, parser_class: type[_ConfigParser]) -> _ConfigParser:
    ...


def read_ini_file(filename, parser_class=RawConfigParser):
    """
    Read an ini file into a freshly created parser.

    :param filename: path of the ini file
    :param parser_class: parser class to instantiate (default: ``RawConfigParser``)
    :returns: the filled parser; an empty ``parser_class()`` instance if the
        file could not be opened (``TypeError``/``OSError``) or could not be
        parsed (``ParsingError``, which is additionally logged as warning)
    """
    config = parser_class()
    try:
        with open(filename) as ini_file:
            config.read_file(ini_file)
    except (TypeError, OSError):
        # unusable file name or unreadable file: hand out an empty parser
        return parser_class()
    except ParsingError as exc:
        utils_logger.warning('Could not parse %s', filename)
        utils_logger.warning(str(exc))
        # the half-filled parser is discarded on purpose
        return parser_class()
    return config
[docs]
def docker_bridge_network_conflict() -> bool:
    """
    Tell whether the docker bridge network overlaps a configured IPv4 interface network.

    The bridge network is taken from the UCR variable
    ``docker/daemon/default/opts/bip`` (default ``172.17.42.1/16``).

    :returns: True as soon as one interface that has both ``address`` and
        ``netmask`` overlaps the bridge network, False otherwise
    """
    bridge_cidr = '%s' % (ucr_get('docker/daemon/default/opts/bip', '172.17.42.1/16'),)
    # strict=False: the configured value is a host address inside the network
    bridge_net = ipaddress.IPv4Network(bridge_cidr, False)
    for _name, iface in interfaces.Interfaces().ipv4_interfaces:
        if 'address' not in iface or 'netmask' not in iface:
            continue
        iface_net = ipaddress.IPv4Network('%s/%s' % (iface['address'], iface['netmask']), False)
        if iface_net.overlaps(bridge_net):
            return True
    return False
[docs]
def app_is_running(app: App | str) -> bool | None:
    """
    Report whether the docker container of an App is running.

    :param app: an App object or an App id (looked up via ``Apps().find()``)
    :returns: ``None`` if the App is unknown or the docker module cannot be
        imported, ``False`` for non-docker or not installed Apps, otherwise
        the result of ``Docker(app).is_running()``
    """
    from univention.appcenter.app_cache import Apps

    if isinstance(app, str):
        app = Apps().find(app)
    if not app:
        return None
    if not app.docker or not app.is_installed():
        return False
    try:
        from univention.appcenter.docker import Docker
    except ImportError:
        return None
    return Docker(app).is_running()
[docs]
def docker_is_running() -> bool:
    """Return True if ``invoke-rc.d docker status`` exits with return code 0."""
    status = call_process(['invoke-rc.d', 'docker', 'status'])
    return status.returncode == 0
[docs]
def app_ports() -> list[tuple[str, int, int]]:
    """
    Returns a list for ports of an App:
    [(app_id, container_port, host_port), ...]

    The list is built from all UCR keys matching
    ``appcenter/apps/<app_id>/ports/<container_port>`` and is sorted.
    Keys whose port or value is not an integer are skipped.
    """
    port_key = re.compile(r'^appcenter/apps/(.*)/ports/(\d*)')
    found = []
    for key in ucr_keys():
        hit = port_key.match(key)
        if hit is None:
            continue
        app_id, container_port = hit.groups()
        try:
            found.append((app_id, int(container_port), int(ucr_get(key))))
        except ValueError:
            # e.g. sub keys like .../protocol whose value is not a number
            continue
    found.sort()
    return found
[docs]
def app_ports_with_protocol() -> list[tuple[str, int, int, str]]:
    """
    Returns a list for ports of an App:
    [(app_id, container_port, host_port, protocol), ...]

    The protocol is read from the UCR variable
    ``appcenter/apps/<app_id>/ports/<container_port>/protocol`` (default
    ``tcp``); a value separated by ``', '`` yields one entry per protocol.
    """
    return [
        (app_id, container_port, host_port, proto)
        for app_id, container_port, host_port in app_ports()
        for proto in ucr_get('appcenter/apps/%s/ports/%s/protocol' % (app_id, container_port), 'tcp').split(', ')
    ]
[docs]
class NoMorePorts(Exception):
    """Raised by currently_free_port_in_range() when no port of the range could be bound."""
[docs]
def currently_free_port_in_range(lower_bound: int, upper_bound: int, blacklist: Container[int]) -> int:
    """
    Find the first port in a range that can currently be bound.

    :param lower_bound: first port to try (inclusive)
    :param upper_bound: end of the range (exclusive, as in ``range()``)
    :param blacklist: ports that must not be returned even if they are free
    :returns: the first port that could be bound on all interfaces
    :raises NoMorePorts: if every port of the range is blacklisted or in use
    """
    for port in range(lower_bound, upper_bound):
        if port in blacklist:
            continue
        # The context manager closes the probe socket on success AND on
        # failure; previously a failed bind() leaked the file descriptor.
        with socket.socket() as probe:
            try:
                probe.bind(('', port))
            except OSError:
                continue
        return port
    raise NoMorePorts()
[docs]
def generate_password() -> str:
    """Return the SHA-256 hex digest of a fresh UUID4 concatenated with the current time."""
    seed = "%s%s" % (uuid4(), time.time())
    encoded_seed = seed.encode("utf-8")
    return get_sha256(encoded_seed)
[docs]
def underscore(value: str) -> str:
    """
    Convert camelCase to snake_case.

    An underscore is inserted between a lowercase ASCII letter and a directly
    following uppercase ASCII letter; the result is lowercased.
    """
    separated = re.sub('([a-z])([A-Z])', lambda hit: hit.group(1) + '_' + hit.group(2), value)
    return separated.lower()
[docs]
def capfirst(value: str) -> str:
    """
    Return ``value`` with its first character uppercased.

    The rest of the string is left untouched. An empty string is returned
    unchanged; slicing instead of indexing avoids the ``IndexError`` that
    ``value[0]`` raised for empty input.
    """
    return value[:1].upper() + value[1:]
[docs]
def camelcase(value: str) -> str:
    """Convert snake_case to CamelCase: split at ``_``, capfirst() each part, join without separator."""
    parts = value.split('_')
    return ''.join(map(capfirst, parts))
[docs]
def shell_safe(value: str) -> str:
    """Return ``value`` passed through ``key_shell_escape()`` and then ``underscore()``."""
    escaped = key_shell_escape(value)
    return underscore(escaped)
[docs]
def mkdir(directory: str) -> None:
    """
    Create ``directory`` including all missing parent directories.

    Nothing happens if the path already exists (whatever its type) or if
    ``directory`` is empty.

    :param directory: absolute or relative path of the directory to create
    """
    # The former hand-written recursion never terminated for relative paths:
    # mkdir('foo') recursed into mkdir('') forever (RecursionError).
    if not directory or os.path.exists(directory):
        return
    # exist_ok guards against another process creating it in the meantime
    os.makedirs(directory, exist_ok=True)
[docs]
def rmdir(directory: str) -> None:
    """Remove ``directory`` and everything below it; do nothing if the path does not exist."""
    if not os.path.exists(directory):
        return
    shutil.rmtree(directory)
[docs]
def call_process2(cmd: Sequence[str], logger: Logger | None = None, env: Mapping[str, str] | None = None, cwd: str | None = None) -> tuple[int, str]:
    """
    Run a command and collect its combined stdout/stderr output.

    Every output line is logged with level INFO while the command runs.

    :param cmd: the command and its arguments
    :param logger: logger to use; defaults to the module logger
    :param env: environment for the child process (passed to ``Popen``)
    :param cwd: working directory for the child process (passed to ``Popen``)
    :returns: tuple ``(returncode, output)``. If the command could not be
        started or read, ``(1, <error message>)`` is returned instead
    """
    if logger is None:
        logger = utils_logger
    # make sure we log strings only
    str_cmd = [str(x) for x in cmd]
    if cwd:
        logger.debug('Running in %s:', cwd)
    logger.info('Running command: %s', ' '.join(str_cmd))
    out = ""
    ret = 0
    try:
        # The context manager closes the pipe and waits for the process.
        with Popen(cmd, stdout=PIPE, stderr=STDOUT, close_fds=True, env=env, cwd=cwd) as p:
            assert p.stdout is not None
            lines = []
            # Read until EOF instead of until the process has exited: polling
            # the exit status dropped everything that was still buffered in
            # the pipe when the process terminated.
            for raw_line in p.stdout:
                # undecodable bytes must not abort reading the output
                line = raw_line.decode('utf-8', 'replace')
                lines.append(line)
                logger.info(line.strip())
            out = ''.join(lines)
        ret = p.returncode
    except Exception as err:
        out = str(err)
        ret = 1
    if ret:
        logger.error('Command %s failed with: %s (%s)', ' '.join(str_cmd), out.strip(), ret)
    return ret, out
[docs]
# Matches ANSI CSI escape sequences (e.g. colors); compiled once for all calls
_ANSI_ESCAPE_SEQUENCE_RE = re.compile(r'\x1B\[[0-9;]*[a-zA-Z]')


def call_process(args: Sequence[str], logger: Logger | None = None, env: Mapping[str, str] | None = None, cwd: str | None = None) -> Any:
    """
    Run a command and wait for it to finish.

    With a logger, stdout lines are logged with level INFO and stderr lines
    with level WARNING while the command runs; ANSI escape sequences are
    replaced by a blank. Without a logger the output is read and discarded.

    :param args: the command and its arguments
    :param logger: optional logger receiving the output of the command
    :param env: environment for the child process (passed to ``Popen``)
    :param cwd: working directory for the child process (passed to ``Popen``)
    :returns: the finished ``Popen`` object (check ``returncode``)
    """
    process = Popen(args, stdout=PIPE, stderr=PIPE, close_fds=True, env=env, cwd=cwd)
    if logger is None:
        process.communicate()
        return process

    if cwd:
        logger.debug('Calling in %s:', cwd)
    logger.debug('Calling %s', ' '.join(quote(arg) for arg in args))

    def _handle_output(out, handler):
        # closes the pipe even if the handler raises
        with out:
            for raw_line in iter(out.readline, b''):
                # 'replace': undecodable bytes must not kill the reader
                # thread, which would leave the pipe undrained and could
                # block the child process (and thus wait()) forever
                line = raw_line.decode('utf-8', 'replace').removesuffix('\n')
                handler(_ANSI_ESCAPE_SEQUENCE_RE.sub(' ', line))

    # one reader per pipe so that neither of them can fill up and block
    readers = [
        Thread(target=_handle_output, args=(pipe, handler), daemon=True)
        for pipe, handler in ((process.stdout, logger.info), (process.stderr, logger.warning))
    ]
    for reader in readers:
        reader.start()
    # join() returns as soon as the pipes hit EOF; no sleep-polling needed
    for reader in readers:
        reader.join()
    process.wait()
    return process
[docs]
def call_process_as(user: str, args: Sequence[str], logger: Logger | None = None, env: Mapping[str, str] | None = None) -> Any:
    """
    Run a command as another user via ``/bin/su - <user> -c <command>``.

    :param user: name of the user to run the command as
    :param args: the command and its arguments
    :param logger: optional logger, handed to call_process()
    :param env: environment, handed to call_process()
    :returns: whatever call_process() returns
    """
    # The command line is interpreted by the login shell of the user, so it
    # needs POSIX shell quoting. subprocess.list2cmdline() implements the
    # quoting rules of the Microsoft C runtime and leaves shell
    # metacharacters such as ';', '$' or '`' unescaped.
    command = ' '.join(quote(os.fsdecode(arg)) for arg in args)
    su_args = ['/bin/su', '-', user, '-c', command]
    return call_process(su_args, logger, env)
[docs]
def verbose_http_error(exc: Exception) -> str:
    """
    Build a translated, human readable message for an error that occurred
    while talking to the App Center server.

    The exception is inspected by attribute (``getcode``, ``url``,
    ``reason``, ``errno``, ``strerror``) rather than by type, so any
    exception may be passed.

    :param exc: the exception to describe
    :returns: the message; if nothing specific could be derived, ``str()``
        of the innermost object reached by following ``reason``
    """
    strerror = ''
    # HTTP status based messages (only if the exception offers getcode())
    getcode = getattr(exc, "getcode", None)
    if getcode is not None:
        code = getcode()
        if code == 404:
            url = getattr(exc, "url", None)
            strerror = _('%s could not be downloaded. This seems to be a problem with the App Center server. Please try again later.') % url
        elif code >= 500:
            strerror = _('This is a problem with the App Center server. Please try again later.')
    # a wrapped SSL error replaces whatever message was set above
    reason = getattr(exc, "reason", None)
    if reason is not None and isinstance(reason, ssl.SSLError):
        strerror = _('There is a problem with the certificate of the App Center server %s.') % get_server()
        strerror += ' (%s)' % (reason,)
    # unwrap nested reasons: afterwards exc is the innermost object of the chain
    # NOTE(review): ssl.SSLError itself has a string ``reason`` attribute, so exc
    # may end up being that string instead of an exception - confirm this is intended
    while reason:
        exc = reason
        reason = getattr(exc, "reason", None)
    # OS level errors: append the system message and, for known codes, a hint
    errno = getattr(exc, "errno", None)
    if errno is not None:
        version = ucr_get('version/version')
        strerror += getattr(exc, 'strerror', '') or ''
        if errno == 1:  # gaierror(1, something like 'SSL Unknown protocol') SSLError(1, '_ssl.c:504: error:14090086:SSL routines:ssl3_get_server_certificate:certificate verify failed')
            link_to_doc = _('https://docs.software-univention.de/manual-%s.html#ip-config:Web_proxy_for_caching_and_policy_management__virus_scan') % version
            strerror += '. ' + _('This may be a problem with the firewall or proxy of your system. You may find help at %s.') % link_to_doc
        if errno == -2:  # gaierror(-2, 'Name or service not known')
            link_to_doc = _('https://docs.software-univention.de/manual-%s.html#networks:dns') % version
            strerror += '. ' + _('This is probably due to the DNS settings of your server. You may find help at %s.') % link_to_doc
    # fallback: plain string representation of the (unwrapped) exception
    if not strerror.strip():
        strerror = str(exc)
    # the following checks look at the unwrapped exception
    if isinstance(exc, ssl.CertificateError):
        strerror = _('There is a problem with the certificate of the App Center server %s.') % get_server() + ' (%s)' % (strerror,)
    if isinstance(exc, http.client.BadStatusLine):
        strerror = _('There was a problem with the HTTP response of the server (BadStatusLine). Please try again later.')
    return strerror
[docs]
class HTTPSConnection(http.client.HTTPSConnection):
    """
    Verified HTTP Connection, Bug #30620

    Every connection gets its own SSL context which requires a valid server
    certificate, checks the host name, offers ``http/1.1`` via ALPN and
    loads ``/etc/ssl/certs/ca-certificates.crt`` as CA bundle.
    """

    def __init__(self, *args, **kwargs):
        context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
        context.set_alpn_protocols(['http/1.1'])
        # stated explicitly, regardless of the defaults of the context
        context.check_hostname = True
        context.verify_mode = ssl.CERT_REQUIRED
        context.load_verify_locations("/etc/ssl/certs/ca-certificates.crt")
        super().__init__(*args, context=context, **kwargs)
[docs]
class HTTPSHandler(urllib.request.HTTPSHandler):
    """urllib handler that opens https URLs with the HTTPSConnection class of this module."""

    def https_open(self, req):
        """Open ``req`` via ``do_open()`` using HTTPSConnection as connection class."""
        connection_class = HTTPSConnection
        return self.do_open(connection_class, req)
[docs]
def urlopen(request):
    """
    Open ``request`` with a timeout of 60 seconds.

    On the first call a global urllib opener is installed: it uses
    HTTPSHandler and, if the UCR variable ``proxy/http`` is set, that proxy
    for http and https. The flag ``urlopen._opener_installed`` remembers
    that this has been done.
    """
    if not urlopen._opener_installed:
        proxy_http = ucr_get('proxy/http')
        handlers = []
        if proxy_http:
            proxies = {'http': proxy_http, 'https': proxy_http}
            handlers.append(urllib.request.ProxyHandler(proxies))
        handlers.append(HTTPSHandler())
        urllib.request.install_opener(urllib.request.build_opener(*handlers))
        urlopen._opener_installed = True
    return urllib.request.urlopen(request, timeout=60)  # noqa: S310


# the opener is installed lazily by the first call of urlopen()
urlopen._opener_installed = False  # type: ignore
[docs]
def get_md5(content: bytes) -> str:
    """Return the MD5 hex digest of ``content``; a ``str`` is encoded as UTF-8 first."""
    data = content.encode('utf-8') if isinstance(content, str) else content
    return md5(data).hexdigest()
[docs]
def get_md5_from_file(filename: str) -> str | None:
    """Return the MD5 hex digest of the content of ``filename`` or None if the path does not exist."""
    if not os.path.exists(filename):
        return None
    with open(filename, 'rb') as fh:
        data = fh.read()
    return get_md5(data)
[docs]
def get_sha256(content: bytes) -> str:
    """Return the SHA-256 hex digest of ``content``; a ``str`` is encoded as UTF-8 first."""
    data = content.encode('utf-8') if isinstance(content, str) else content
    return sha256(data).hexdigest()
[docs]
def get_sha256_from_file(filename: str) -> str | None:
    """Return the SHA-256 hex digest of the content of ``filename`` or None if the path does not exist."""
    if not os.path.exists(filename):
        return None
    with open(filename, 'rb') as fh:
        data = fh.read()
    return get_sha256(data)
[docs]
def get_current_ram_available() -> float:
    """
    Returns RAM currently available in MB, excluding Swap

    The value is the sum of ``MemFree``, ``Buffers`` and ``Cached`` from
    ``/proc/meminfo``.
    """
    # Calculated here instead of using psutil, see
    # http://code.google.com/p/psutil/source/diff?spec=svn550&r=550&format=side&path=/trunk/psutil/_pslinux.py
    meminfo = {}
    with open('/proc/meminfo') as f:
        for line in f.readlines():
            fields = line.split()
            meminfo[fields[0]] = int(fields[1]) * 1024  # bytes
    # at least MemFree is required
    total = meminfo['MemFree:']
    # see also http://code.google.com/p/psutil/issues/detail?id=313
    # OpenVZ does not have Buffers, calculation still correct, see Bug #30659
    total += meminfo.get('Buffers:', 0)
    # OpenVZ might not even have Cached; better than raising KeyError
    total += meminfo.get('Cached:', 0)
    return total / (1024 * 1024)
[docs]
def get_free_disk_space() -> float:
    """
    Returns disk space currently free in MB

    The file system containing ``/var/lib/docker`` is examined. If that
    fails for any reason, a debug message is logged and 0.0 is returned.
    """
    docker_path = '/var/lib/docker'
    try:
        fd = os.open(docker_path, os.O_RDONLY)
        try:
            stats = os.fstatvfs(fd)
        finally:
            os.close(fd)
    except Exception:
        utils_logger.debug('Free disk space could not be determined.')
        return 0.0
    # block size * number of free blocks, converted from bytes
    return stats.f_frsize * stats.f_bavail * 1e-6
[docs]
def flatten(list_of_lists: Iterable[Any]) -> list[Any]:
    """
    Flatten arbitrarily nested lists and tuples into one list.

    Only ``list`` and ``tuple`` elements are descended into; everything else
    (strings in particular) is kept as a single element.
    """
    def _leaves(items):
        for item in items:
            if isinstance(item, (list, tuple)):
                yield from _leaves(item)
            else:
                yield item

    return list(_leaves(list_of_lists))
[docs]
def unique(sequence: Iterable[_T]) -> list[_T]:
    """Return the elements of ``sequence`` without duplicates, in order of first occurrence."""
    # dict keys are unique and keep their insertion order
    first_seen = dict.fromkeys(sequence)
    return list(first_seen)
[docs]
def get_locale() -> str | None:
# returns currently set locale: de_AT.UTF-8 -> de
# may return None if not set (i.e. 'C')
locale = getlocale()[0]
if locale:
locale = locale.split('_', 1)[0]
return locale
[docs]
def gpg_verify(filename: str, signature: str | None = None) -> tuple[int, str]:
    """
    Verify the signature of a file by calling ``apt-key verify --verbose``.

    :param filename: the signed file
    :param signature: the signature file; defaults to ``filename + '.gpg'``
    :returns: tuple of the return code and the decoded stderr of ``apt-key``
    """
    signature_file = filename + '.gpg' if signature is None else signature
    cmd = ('apt-key', 'verify', '--verbose', signature_file, filename)
    process = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE, close_fds=True)
    stderr = process.communicate()[1]
    return (process.returncode, stderr.decode('utf-8'))
[docs]
def get_local_fqdn() -> str:
    """Return ``<hostname>.<domainname>`` built from the UCR variables of the same names."""
    hostname = ucr_get('hostname')
    domainname = ucr_get('domainname')
    return '%s.%s' % (hostname, domainname)
[docs]
def get_server() -> str:
    """Return the result of ``univention.appcenter.app_cache.default_server()``."""
    # imported inside the function, as in the other helpers that use app_cache
    from univention.appcenter.app_cache import default_server

    server = default_server()
    return server
[docs]
def container_mode() -> bool:
    """returns True if this system is a container, i.e. the UCR variable ``docker/container/uuid`` is set"""
    container_uuid = ucr_get('docker/container/uuid')
    return bool(container_uuid)
[docs]
def find_hosts_for_master_packages() -> list[tuple[str, bool]]:
    """
    List the other ``domaincontroller_master`` and ``domaincontroller_backup`` hosts.

    :returns: list of ``(fqdn, is_master)`` tuples, masters first; the entry
        of the local system is removed if it is part of the list
    """
    from univention.appcenter.udm import get_machine_connection, search_objects

    lo, pos = get_machine_connection()
    hosts = [
        (host.info.get('fqdn'), True)
        for host in search_objects('computers/domaincontroller_master', lo, pos)
    ]
    hosts.extend(
        (host.info.get('fqdn'), False)
        for host in search_objects('computers/domaincontroller_backup', lo, pos)
    )
    try:
        local_fqdn = '%s.%s' % (ucr_get('hostname'), ucr_get('domainname'))
        local_is_master = ucr_get('server/role') == 'domaincontroller_master'
        hosts.remove((local_fqdn, local_is_master))
    except ValueError:
        # the local system is not in the list
        pass
    return hosts
[docs]
def resolve_dependencies(apps: list[App], action: str) -> list[App]:
    """
    Bring Apps into an order that respects their dependencies.

    For every action except ``'remove'`` the list is extended first: Apps
    from ``required_apps`` that are not installed locally are added, Apps
    from ``required_apps_in_domain`` are added if they are neither installed
    locally nor found by the LDAP search for an installation in the domain.
    This is repeated for the added Apps.

    For ``'remove'`` nothing is added; only dependencies between the given
    Apps are considered and the resulting order is reversed.

    :param apps: the Apps to process; the list itself is not modified
    :param action: name of the action; only ``'remove'`` is treated specially
    :returns: the Apps ordered such that required Apps come before the Apps
        requiring them (the other way round for ``'remove'``)
    :raises RuntimeError: if the ordering does not finish within the
        iteration limit, i.e. there is a dependency cycle
    """
    from univention.appcenter.app_cache import Apps
    from univention.appcenter.udm import get_machine_connection
    lo, _pos = get_machine_connection()
    utils_logger.info('Resolving dependencies for %s', ', '.join(app.id for app in apps))
    apps_with_their_dependencies = []
    # maps an App id to the ids of the Apps that still have to come before it
    # NOTE(review): annotated with int, but App ids are joined as strings above - confirm
    depends: dict[int, list[int]] = {}
    # Apps whose dependencies have been collected; input of the ordering below
    checked = []
    # work on a copy, the list is consumed / replaced below
    apps = apps[:]
    if action == 'remove':
        # special case: do not resolve dependencies as
        # we are going to uninstall the app
        # do not remove dependent apps either: the admin may want to keep them
        # => will get an error afterwards
        # BUT: reorder the apps if needed
        original_app_ids = [_app.id for _app in apps]
        for app in apps:
            checked.append(app)
            depends[app.id] = []
            for app_id in app.required_apps:
                if app_id not in original_app_ids:
                    continue
                depends[app.id].append(app_id)
            for app_id in app.required_apps_in_domain:
                if app_id not in original_app_ids:
                    continue
                depends[app.id].append(app_id)
        # everything is in checked already: skip the resolving loop below
        apps = []
    # collect the dependencies, adding missing Apps to the work list on the way
    while apps:
        app = apps.pop()
        if app in checked:
            continue
        checked.insert(0, app)
        dependencies = depends[app.id] = []
        for app_id in app.required_apps:
            required_app = Apps().find(app_id)
            if required_app is None:
                utils_logger.warning('Could not find required App %s', app_id)
                continue
            if not required_app.is_installed():
                utils_logger.info('Adding %s to the list of Apps', required_app.id)
                apps.append(required_app)
                dependencies.append(app_id)
        for app_id in app.required_apps_in_domain:
            required_app = Apps().find(app_id)
            if required_app is None:
                utils_logger.warning('Could not find required App %s', app_id)
                continue
            if required_app.is_installed():
                continue
            # an installation anywhere in the domain is sufficient
            if lo.search(filter_format('(&(univentionObjectType=appcenter/app)(univentionAppInstalledOnServer=*)(univentionAppID=%s_*))', [required_app.id])):
                continue
            utils_logger.info('Adding %s to the list of Apps', required_app.id)
            apps.append(required_app)
            dependencies.append(app_id)
    # order the Apps: repeatedly take an App without open dependencies and
    # remove it from the dependency lists of all other Apps
    max_loop = len(checked) ** 2
    i = 0
    while checked:
        app = checked.pop(0)
        if not depends[app.id]:
            apps_with_their_dependencies.append(app)
            for app_id, required_apps in depends.items():
                try:
                    required_apps.remove(app.id)
                except ValueError:
                    pass
        else:
            # still has open dependencies: try again later
            checked.append(app)
        i += 1
        if i > max_loop:
            # this should never happen unless we release apps with dependency cycles
            raise RuntimeError('Cannot resolve dependency cycle!')
    if action == 'remove':
        # another special case:
        # we need to reverse the order: the app with the dependencies needs to be
        # removed first
        apps_with_their_dependencies.reverse()
    return apps_with_their_dependencies