Source code for univention.management.console.modules.setup.util

#!/usr/bin/python3
# SPDX-FileCopyrightText: 2011-2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only

"""
Univention Management Console
 module: system setup
"""

from __future__ import annotations

import copy
import csv
import ipaddress
import json
import os
import os.path
import random
import re
import socket
import subprocess
import tempfile
import time
import traceback
from contextlib import contextmanager
from re import Pattern
from typing import IO, TYPE_CHECKING, Any

import dns.exception
import dns.resolver
import dns.reversename
import ldap
import psutil

import univention.config_registry
from univention.lib import atjobs
from univention.lib.admember import (
    check_ad_account, check_connection, connectionFailed, do_time_sync, failedADConnect, lookup_adds_dc,
    notDomainAdminInAD,
)
from univention.lib.i18n import Locale, Translation
from univention.management.console.log import MODULE
from univention.management.console.modules import UMC_Error


if TYPE_CHECKING:
    from collections.abc import Container, Iterator, Mapping


# FIXME: this triggers imports from univention-lib during build time test execution.
# This in effect imports univention-ldap which is not an explicit dependency for
# univention-lib as of writing. (Bug #43388)
# The try except can be removed as soon as the dependency problem is resolved.
try:
    from univention.appcenter.actions import get_action
    from univention.appcenter.app_cache import AppCache, Apps
except ImportError as exc:
    MODULE.warning('Ignoring import error: %s', exc)
_ = Translation('univention-management-console-module-setup').translate

ucr = univention.config_registry.ConfigRegistry()
ucr.load()

PATH_SYS_CLASS_NET = '/sys/class/net'
PATH_SETUP_SCRIPTS = '/usr/lib/univention-system-setup/scripts/'
PATH_JOIN_SCRIPT = '/usr/lib/univention-system-setup/scripts/setup-join.sh'
PATH_JOIN_LOG = '/var/log/univention/join.log'
PATH_PROFILE = '/var/cache/univention-system-setup/profile'
LOG_FILE = '/var/log/univention/setup.log'
PATH_PASSWORD_FILE = '/var/cache/univention-system-setup/secret'
PATH_STATUS_FILE = '/var/www/ucs_setup_process_status.json'
CMD_ENABLE_EXEC = ['/usr/share/univention-updater/enable-apache2-umc', '--no-restart']
CMD_ENABLE_EXEC_WITH_RESTART = '/usr/share/univention-updater/enable-apache2-umc'
CMD_DISABLE_EXEC = '/usr/share/univention-updater/disable-apache2-umc'
CMD_CLEANUP_SCRIPT = '/usr/lib/univention-system-setup/scripts/cleanup.py >>/var/log/univention/setup.log 2>&1'
CMD_APPLIANCE_HOOKS = '/usr/lib/univention-system-setup/scripts/appliance_hooks.py >>/var/log/univention/setup.log 2>&1'
CITY_DATA_PATH = '/usr/share/univention-system-setup/city_data.json'
COUNTRY_DATA_PATH = '/usr/share/univention-system-setup/country_data.json'

RE_LOCALE = re.compile(r'([^.@ ]+).*')

# list of all needed UCR variables
UCR_VARIABLES = [
    # common
    'server/role',
    # language
    'locale', 'locale/default',
    # keyboard
    'xorg/keyboard/options/XkbLayout', 'xorg/keyboard/options/XkbModel',
    'xorg/keyboard/options/XkbVariant',
    # basis
    'hostname', 'domainname', 'ldap/base', 'windows/domain',
    # net: ipv4
    'gateway',
    'nameserver1', 'nameserver2', 'nameserver3',
    'dns/forwarder1', 'dns/forwarder2', 'dns/forwarder3',
    'proxy/http',
    # net: ipv6
    'ipv6/gateway',
    'interfaces/primary',
    # ssl
    'ssl/common', 'ssl/locality', 'ssl/country', 'ssl/state',
    'ssl/organization', 'ssl/organizationalunit', 'ssl/email',
    # fqdn
    'hostname',
    'domainname',
]


[docs] def timestamp() -> str: return time.strftime('%Y-%m-%d %H:%M:%S')
[docs] def is_system_joined() -> bool: return os.path.exists('/var/univention-join/joined')
[docs] def load_values(lang: str | None = None) -> dict[str, str]: ucr.load() values = {ikey: ucr[ikey] for ikey in UCR_VARIABLES} # net from univention.management.console.modules.setup.network import Interfaces interfaces = Interfaces() values['interfaces'] = interfaces.to_dict() values['physical_interfaces'] = [idev['name'] for idev in detect_interfaces()] # see whether the system has been joined or not values['joined'] = is_system_joined() # root password values['root_password'] = '' # memory values['memory_total'] = psutil.virtual_memory().total / 1024.0 / 1024.0 # MiB # get timezone values['timezone'] = '' if os.path.exists('/etc/timezone'): with open('/etc/timezone') as fd: values['timezone'] = fd.readline().strip() # read license agreement for app appliance if lang and ucr.get('umc/web/appliance/data_path'): prefix = ucr.get('umc/web/appliance/data_path') license_path = '%sLICENSE_AGREEMENT' % prefix localized_license_path = '%s_%s' % (license_path, lang.upper()) english_license_path = '%s_EN' % license_path for ipath in (localized_license_path, license_path, english_license_path): if os.path.exists(ipath): with open(ipath) as license_file: values['license_agreement'] = license_file.read() break # check for installed system activation values['system_activation_installed'] = os.path.exists('/usr/sbin/univention-system-activation') return values
[docs] def auto_complete_values_for_join(newValues: dict[str, str], current_locale: Locale | None = None) -> dict[str, str]: # try to automatically determine the domain, except on a dcmaster if newValues['server/role'] != 'domaincontroller_master' and not newValues.get('domainname'): ucr.load() for nameserver in ('nameserver1', 'nameserver2', 'nameserver3'): if newValues.get('domainname'): break nameserver = newValues.get(nameserver, ucr.get(nameserver)) if not nameserver: continue newValues['domainname'] = get_ucs_domain(nameserver) if not newValues['domainname']: raise Exception(_("Cannot automatically determine the domain. Please specify the server's fully qualified domain name.")) isAdMember = 'ad/member' in newValues and 'ad/address' in newValues if 'windows/domain' not in newValues and isAdMember: MODULE.process('Searching for NETBIOS domain in AD') for nameserver in ('nameserver1', 'nameserver2', 'nameserver3'): ns = newValues.get(nameserver, ucr.get(nameserver)) if ns: try: ad_domain_info = lookup_adds_dc(newValues.get('ad/address'), ucr={'nameserver1': ns}) except failedADConnect: pass else: newValues['windows/domain'] = ad_domain_info['Netbios Domain'] MODULE.process('Setting NETBIOS domain to AD value: %s', newValues['windows/domain']) break domainname = newValues.get("domainname") if 'windows/domain' not in newValues and domainname: newValues['windows/domain'] = domain2windowdomain(domainname) MODULE.process('Setting NETBIOS domain to default: %s', newValues['windows/domain']) # make sure that AD connector package is installed if AD member mode is chosen selectedComponents = set(newValues.get('components', [])) if isAdMember and newValues['server/role'] == 'domaincontroller_master': selectedComponents.add('univention-ad-connector') # make sure to install the memberof overlay if it is installed on the Primary Directory Node if newValues['server/role'] not in ('domaincontroller_master', 'memberserver'): selectedComponents.add('univention-ldap-overlay-memberof') # add lists with all packages that should be removed/installed on the system if selectedComponents: currentComponents: set[str] = set() for iapp in get_apps(): if iapp['is_installed']: for ipackages in (iapp['default_packages'], iapp['default_packages_master']): currentComponents = currentComponents.union(ipackages) # set of all available software packages allComponents = {'univention-ldap-overlay-memberof'} for iapp in get_apps(): for ipackages in (iapp['default_packages'], iapp['default_packages_master']): allComponents = allComponents.union(ipackages) # get all packages that shall be removed removeComponents = list(allComponents & (currentComponents - selectedComponents)) newValues['packages_remove'] = ' '.join(removeComponents) # get all packages that shall be installed installComponents = list(allComponents & (selectedComponents - currentComponents)) newValues['packages_install'] = ' '.join(installComponents) current_locale = Locale(ucr.get('locale/default', 'en_US.UTF-8:UTF-8').split(':', 1)[0]) if newValues['server/role'] == 'domaincontroller_master': # add newValues for SSL UCR variables default_locale = current_locale if 'locale/default' in newValues: default_locale = Locale(newValues['locale/default'].split(':', 1)[0]) newValues['ssl/state'] = default_locale.territory newValues['ssl/locality'] = default_locale.territory newValues['ssl/organization'] = newValues.get('organization', default_locale.territory) newValues['ssl/organizationalunit'] = 'Univention Corporate Server' newValues['ssl/email'] = 'ssl@{domainname}'.format(**newValues) # make sure that the locale of the current session is also supported # ... otherwise the setup scripts will fail after regenerating the # locale data (in 20_language/10language) with some strange python # exceptions about unsupported locale strings... if 'locale' not in newValues: newValues['locale'] = newValues.get('locale/default', '') forcedLocales = ['en_US.UTF-8:UTF-8', 'de_DE.UTF-8:UTF-8'] # we need en_US and de_DE locale as default language if current_locale: forcedLocales.append(f'{current_locale}:{current_locale.codeset}') for ilocale in forcedLocales: if ilocale not in newValues['locale']: newValues['locale'] = '%s %s' % (newValues['locale'], ilocale) return newValues
[docs] def pre_save(newValues: dict[str, str]) -> None: """Modify the final dict before saving it to the profile file.""" # network interfaces from univention.management.console.modules.setup.network import Interfaces if 'interfaces' in newValues: interfaces = Interfaces() interfaces.from_dict(newValues.pop('interfaces')) interfaces.check_consistency() newValues.update({key: value or '' for key, value in interfaces.to_ucr().items()})
[docs] def write_profile(values: dict[str, str]) -> None: pre_save(values) old_umask = os.umask(0o177) try: with open(PATH_PROFILE, "w+") as cache_file: for ikey, ival in values.items(): if isinstance(ival, bool): ival = str(ival) cache_file.write('%s="%s"\n' % (ikey, ival or '')) finally: os.umask(old_umask)
[docs] def run_networkscrips(demo_mode: bool = False) -> None: # write header before executing scripts f = open(LOG_FILE, 'a') f.write('\n\n=== RUNNING NETWORK APPLY SCRIPTS (%s) ===\n\n' % timestamp()) f.flush() # make sure that UMC servers and apache will not be restartet subprocess.call(CMD_DISABLE_EXEC, stdout=f, stderr=f) # If fast demo mode is used, no additional parameters must be provided, # as they will prevent ldap modification. The host object has to be updated script_parameters = [] if not demo_mode: script_parameters = ['--network-only', '--appliance-mode'] try: netpath = os.path.join(PATH_SETUP_SCRIPTS, '30_net') for scriptpath in sorted(os.listdir(netpath)): scriptpath = os.path.join(netpath, scriptpath) # launch script try: # appliance-mode for temporary saving the old ip address # network-only for not restarting all those services (time consuming!) p = subprocess.Popen([scriptpath, *script_parameters], stdout=f, stderr=subprocess.STDOUT) MODULE.info("Running script '%s': pid=%d", scriptpath, p.pid) p.wait() except OSError as ex: MODULE.error("Failed to run '%s': %s", scriptpath, ex) finally: # enable execution of servers again subprocess.call(CMD_ENABLE_EXEC, stdout=f, stderr=f) f.write('\n=== DONE (%s) ===\n\n' % timestamp()) f.close()
[docs] @contextmanager def written_profile(values: dict[str, str]) -> Iterator[None]: write_profile(values) try: yield finally: os.remove(PATH_PROFILE)
[docs] class ProgressState: # noqa: PLW1641 def __init__(self) -> None: self.reset()
[docs] def reset(self) -> None: self.name = '' self.message = '' self._percentage = 0.0 self.fraction = 0.0 self.fractionName = '' self.steps = 1 self.step = 0.0 self.max = 100 self.errors: list[str] = [] self.critical = False
@property def percentage(self) -> float: return (self._percentage + self.fraction * (self.step / float(self.steps))) / self.max * 100 def __eq__(self, other: object) -> bool: return self.name == other.name and self.message == other.message and self.percentage == other.percentage and self.fraction == other.fraction and self.steps == other.steps and self.step == other.step and self.errors == other.errors and self.critical == other.critical def __ne__(self, other: object) -> bool: return not self.__eq__(other) def __bool__(self) -> bool: return bool(self.name or self.message or self.percentage) __nonzero__ = __bool__
[docs] class ProgressParser: # regular expressions NAME = re.compile('^__NAME__: *(?P<key>[^ ]*) (?P<name>.*)\n$') MSG = re.compile('^__MSG__: *(?P<message>.*)\n$') STEPS = re.compile('^__STEPS__: *(?P<steps>.*)\n$') STEP = re.compile('^__STEP__: *(?P<step>.*)\n$') JOINERROR = re.compile('^__JOINERR__: *(?P<error_message>.*)\n$') ERROR = re.compile('^__ERR__: *(?P<error_message>.*)\n$') # fractions of setup scripts FRACTIONS = { '05_role/10role': 30, '10_basis/12domainname': 15, '10_basis/14ldap_basis': 20, '20_language/11default_locale': 5, '30_net/10interfaces': 20, '30_net/12gateway': 10, '30_net/13ipv6gateway': 10, '40_ssl/10ssl': 10, '50_software/10software': 30, '90_postjoin/10admember': 30, '90_postjoin/20upgrade': 10, } # current status def __init__(self) -> None: self.current = ProgressState() self.old = ProgressState() self.allowed_subdirs: Container[str] | None = None self.reset()
[docs] def reset(self, allowed_subdirs: Container[str] | None = None) -> None: self.allowed_subdirs = allowed_subdirs ucr.load() self.current.reset() self.old.reset() self.fractions = copy.copy(ProgressParser.FRACTIONS) self.calculateFractions()
[docs] def calculateFractions(self) -> None: MODULE.info('Calculating maximum value for fractions ...') for category in [x for x in os.listdir(PATH_SETUP_SCRIPTS) if os.path.isdir(os.path.join(PATH_SETUP_SCRIPTS, x))]: cat_path = os.path.join(PATH_SETUP_SCRIPTS, category) for script in [x for x in os.listdir(cat_path) if os.path.isfile(os.path.join(cat_path, x))]: name = '%s/%s' % (category, script) if name not in self.fractions: self.fractions[name] = 1 if self.allowed_subdirs and category not in self.allowed_subdirs: self.fractions[name] = 0 self.current.max = sum(self.fractions.values()) MODULE.info('Calculated a maximum value of %d', self.current.max) MODULE.info('Dumping all fractions:\n%s', self.fractions)
@property def changed(self) -> bool: if self.current != self.old: MODULE.info('Progress state has changed!') self.old = copy.copy(self.current) return True return False
[docs] def parse(self, line: str) -> bool: # start new component name match = ProgressParser.NAME.match(line) if match is not None: self.current.name, self.current.fractionName = match.groups() self.current.message = '' self.current._percentage += self.current.fraction self.current.fraction = self.fractions.get(self.current.name, 1.0) self.current.step = 0 # reset current step self.current.steps = 1 return True # new status message match = ProgressParser.MSG.match(line) if match is not None: self.current.message = match.groups()[0] return True # number of steps match = ProgressParser.STEPS.match(line) if match is not None: try: self.current.steps = int(match.groups()[0]) self.current.step = 0 return True except ValueError: pass # current step match = ProgressParser.STEP.match(line) if match is not None: try: self.current.step = float(match.groups()[0]) if self.current.step > self.current.steps: self.current.step = self.current.steps return True except ValueError: pass # error message: why did the join fail? match = ProgressParser.JOINERROR.match(line) if match is not None: error = '%s: %s\n' % (self.current.fractionName, match.groups()[0]) with open(PATH_JOIN_LOG, 'rb') as join_log: log = join_log.read().decode('UTF-8', 'replace').splitlines(True) error_log = [] for line in reversed(log): error_log.append(line) if line.startswith('Configure'): break for line in reversed(error_log): error += line self.current.errors.append(error) self.current.critical = True return True # error message: why did the script fail? match = ProgressParser.ERROR.match(line) if match is not None: error = '%s: %s' % (self.current.fractionName, match.groups()[0]) self.current.errors.append(error) return True return False
[docs] def sorted_files_in_subdirs(directory: str, allowed_subdirs: Container[str] | None = None) -> Iterator[str]: for entry in sorted(os.listdir(directory)): if allowed_subdirs and entry not in allowed_subdirs: continue path = os.path.join(directory, entry) if os.path.isdir(path): for filename in sorted(os.listdir(path)): yield os.path.join(path, filename)
[docs] def run_scripts(progressParser: ProgressParser, restartServer: bool = False, allowed_subdirs: Container[str] | None = None, lang: str = 'C', args: list[str] = []) -> None: # write header before executing scripts f = open(LOG_FILE, 'a') f.write('\n\n=== RUNNING SETUP SCRIPTS (%s) ===\n\n' % timestamp()) f.flush() # read-only handle to LOG_FILE for observing file end fr = open(LOG_FILE) # start observing at the end of the file fr.seek(0, os.SEEK_END) lastPos = fr.tell() # next full line to pass to the progressParser fullLine = '' # make sure that UMC servers and apache will not be restartet subprocess.call(CMD_DISABLE_EXEC, stdout=f, stderr=f) for scriptpath in sorted_files_in_subdirs(PATH_SETUP_SCRIPTS, allowed_subdirs): # launch script icmd = [scriptpath, *args] f.write('== script: %s\n' % icmd) try: p = subprocess.Popen(icmd, stdout=f, stderr=subprocess.STDOUT, env={ 'PATH': '/bin:/sbin:/usr/bin:/usr/sbin', 'LANG': lang, }) MODULE.info("Running script '%s': pid=%d", icmd, p.pid) except OSError as exc: MODULE.error("Failed to run '%s': %s", icmd, exc) continue while p.poll() is None: fr.seek(0, os.SEEK_END) # update file handle fr.seek(lastPos, os.SEEK_SET) # continue reading at last position currentLine = fr.readline() # try to read until next line break if not currentLine: continue fullLine += currentLine lastPos += len(currentLine) if currentLine[-1] == '\n': progressParser.parse(fullLine) fullLine = '' fr.close() # Deactivate login message univention.config_registry.handler_set(['system/setup/showloginmessage=false']) # enable execution of servers again subprocess.call(CMD_ENABLE_EXEC, stdout=f, stderr=f) if restartServer: f.write('=== Restart of UMC server and web server (%s) ===\n' % timestamp()) f.flush() p = subprocess.Popen(['/usr/bin/at', 'now'], stdin=subprocess.PIPE, stderr=f, stdout=f) p.communicate(b'''#!/bin/sh sleep 5; # leave enough time to display error messages or indicate success deb-systemd-invoke restart univention-management-console-server;''') f.write('\n=== DONE (%s) ===\n\n' % timestamp()) f.close()
@contextmanager def _temporary_password_file(password: str) -> Iterator[str]: # write password file with open(PATH_PASSWORD_FILE, 'w') as fp: fp.write('%s' % password) os.chmod(PATH_PASSWORD_FILE, 0o600) try: yield PATH_PASSWORD_FILE finally: # remove password file os.remove(PATH_PASSWORD_FILE)
[docs] def run_joinscript(progressParser: ProgressParser, values: dict[str, str], _username: str, password: str, dcname: str | None = None, lang: str = 'C') -> None: # write header before executing join script f = open(LOG_FILE, 'a') f.write('\n\n=== RUNNING SETUP JOIN SCRIPT (%s) ===\n\n' % timestamp()) f.flush() # the following scripts will not be called via setup-join.sh progressParser.fractions['10_basis/10hostname'] = 0 progressParser.fractions['10_basis/12domainname'] = 0 progressParser.fractions['10_basis/14ldap_basis'] = 0 progressParser.fractions['10_basis/16windows_domain'] = 0 # check whether particular scripts are called if not values.get('ad/member'): progressParser.fractions['90_postjoin/10admember'] = 0 if not values.get('update/system/after/setup'): progressParser.fractions['90_postjoin/20upgrade'] = 0 if not values.get('packages_remove') and not values.get('packages_install'): progressParser.fractions['50_software/10software'] = 0 # additional entries that will be called via setup-join.sh progressParser.fractions['domain-join'] = 50 progressParser.fractions['appliance-hooks.d'] = 1 progressParser.fractions['create-ssh-keys'] = 10 # recompute sum progressParser.current.max = sum(progressParser.fractions.values()) def runit(command): p = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env={ 'PATH': '/bin:/sbin:/usr/bin:/usr/sbin', 'LANG': lang, }) while True: line = p.stdout.readline().decode("UTF-8", "replace") if not line: break progressParser.parse(line) f.write(line) f.flush() p.wait() cmd = [PATH_JOIN_SCRIPT] if _username and password: if dcname: cmd.extend(['--dcname', dcname]) with _temporary_password_file(password) as password_file: # sanitize username reg = re.compile('[^ a-zA-Z_0-9-]') username = reg.sub('_', _username) # run join scripts without the cleanup scripts runit([*cmd, '--dcaccount', username, '--password_file', password_file, '--run_cleanup_as_atjob']) else: # run join scripts without the cleanup scripts runit([*cmd, '--run_cleanup_as_atjob']) f.write('\n=== DONE (%s) ===\n\n' % timestamp()) f.close()
[docs] def cleanup(with_appliance_hooks: bool = False) -> None: # add delay of 1 sec before actually executing the commands # in order to avoid problems with restarting the UMC server # and thus killing the setup module process cmd = 'sleep 1; ' if with_appliance_hooks: cmd += CMD_APPLIANCE_HOOKS + '; ' cmd += CMD_CLEANUP_SCRIPT # start an at job in the background atjobs.add(cmd)
[docs] def run_scripts_in_path(path: str, logfile: IO[str], category_name: str = "") -> None: logfile.write('\n=== Running %s scripts (%s) ===\n' % (category_name, timestamp())) logfile.flush() if os.path.isdir(path): for filename in sorted(os.listdir(path)): logfile.write('= Running %s\n' % filename) logfile.flush() try: subprocess.call(os.path.join(path, filename), stdout=logfile, stderr=logfile) except OSError: logfile.write('%s' % (traceback.format_exc(),)) logfile.flush() logfile.write('\n=== done (%s) ===\n' % timestamp()) logfile.flush()
[docs] def create_status_file() -> None: with open(PATH_STATUS_FILE, 'w') as status_file: status_file.write('"setup-scripts"')
[docs] def detect_interfaces() -> list[dict[str, str | None]]: """ Function to detect network interfaces in local sysfs. The loopback interface "lo" will be filtered out. Returns a list of dicts with the entries 'name' and 'mac'. """ interfaces: list[dict[str, str | None]] = [] if not os.path.exists(PATH_SYS_CLASS_NET): return interfaces for dirname in os.listdir(PATH_SYS_CLASS_NET): pathname = os.path.join(PATH_SYS_CLASS_NET, dirname) if not os.path.isdir(pathname): continue # filter out lo, etc. interfaces if open(os.path.join(pathname, 'type')).read().strip() not in ('1', '2', '3', '4', '5', '6', '7', '8', '15', '19'): continue # filter out bridge, bond, tun/tap interfaces if any(os.path.exists(os.path.join(pathname, path)) for path in ('bridge', 'bonding', 'brport', 'tun_flags')): continue # filter out vlan devices if '.' in dirname: continue mac = None try: # try to read mac address mac = open(os.path.join(pathname, 'address')).read().strip() except OSError: pass interfaces.append({'name': dirname, 'mac': mac}) return interfaces
[docs] def dhclient(interface: str, timeout: float = 10.0) -> dict[str, str]: """ perform DHCP request for specified interface. If successful, returns a dict similar to the following:: { 'address': '10.200.26.51', 'broadcast': '10.200.26.255', 'domainname': 'univention.qa', 'gateway': '', 'nameserver_1': '10.200.26.27', 'nameserver_2': '', 'nameserver_3': '', 'netmask': '255.255.255.0' } """ dhcp = {} with tempfile.NamedTemporaryFile("w+") as tmp: cmd = ( '/usr/bin/timeout', '-k', '1', str(timeout), '/sbin/dhclient', "-q", "-d", # force dhclient to always run as a foreground process '-1', '-lf', '/tmp/dhclient.leases', '-sf', '/usr/share/univention-system-setup/dhclient-script-wrapper', '-e', 'dhclientscript_outputfile=%s' % (tmp.name,), interface, ) MODULE.info('Launch dhclient query via command: %s', cmd) subprocess.call(cmd, stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) for line in tmp: key, _, value = line.strip().partition('=') dhcp[key] = value[1:-1] MODULE.info('dhclient returned the following values: %r', dhcp) return dhcp
[docs] def get_apps(no_cache: bool = False) -> list[dict[str, Any]]: if no_cache: AppCache().clear_cache() get = get_action('get') return [get.to_dict(app) for app in Apps().get_all_apps() if app.is_ucs_component()]
[docs] def is_proxy(proxy: str) -> bool: return not (proxy and proxy not in {'http://', 'https://'} and not proxy.startswith('http://') and not proxy.startswith('https://'))
[docs] def is_ipaddr(addr: str) -> bool: try: ipaddress.ip_address('%s' % (addr,)) except ValueError: return False return True
[docs] def is_ipv4addr(addr: str) -> bool: try: ipaddress.IPv4Address('%s' % (addr,)) except ValueError: return False return True
[docs] def is_ipv4netmask(addr_netmask: str) -> bool: try: ipaddress.IPv4Network('%s' % (addr_netmask,), False) except (ValueError, ipaddress.NetmaskValueError, ipaddress.AddressValueError): return False return True
[docs] def is_ipv6addr(addr: str) -> bool: try: ipaddress.IPv6Address('%s' % (addr,)) except ValueError: return False return True
[docs] def is_ipv6netmask(addr_netmask: str) -> bool: try: ipaddress.IPv6Network('%s' % (addr_netmask,), False) except (ValueError, ipaddress.NetmaskValueError, ipaddress.AddressValueError): return False return True
[docs] def is_hostname(hostname: str) -> bool: return is_hostname.RE.match(hostname) is not None # type: ignore
is_hostname.RE = re.compile("^[a-z0-9]([a-z0-9-]*[a-z0-9])?$", re.IGNORECASE) # type: ignore
[docs] def is_domainname(domainname: str) -> bool: """ Check if domainname is a valid DNS domainname according to RFC952/1123. >>> is_domainname('foo') True >>> is_domainname('f00.bar') True >>> is_domainname('-f.bar') False >>> is_domainname('f-.bar') False >>> is_domainname('f..bar') False >>> is_domainname('#.bar') False >>> is_domainname('1234567890123456789012345678901234567890123456789012345678901234.bar') False """ return all(is_domainname.RE.match(_) for _ in domainname.split('.')) # type: ignore
is_domainname.RE = re.compile(r'^[a-z0-9]([a-z0-9-]{0,61}[a-z0-9])?$', re.I) # type: ignore
[docs] def is_windowsdomainname(domainname: str) -> bool: return is_windowsdomainname.RE.match(domainname) is not None and len(domainname) < 14 # type: ignore
is_windowsdomainname.RE = re.compile(r"^[A-Z](?:[A-Z0-9-]*[A-Z0-9])?$") # type: ignore
[docs] def domain2windowdomain(domainname: str) -> str: windomain = domainname.split('.', 1)[0].upper() invalidChars = re.compile(r"^[^A-Z]*([A-Z0-9-]*?)[^A-Z0-9]*$") match = invalidChars.match(windomain) windomain = match.group(1) if match else '' windomain = windomain[:15] # enforce netbios limit if not windomain: # fallback name windomain = 'UCSDOMAIN' return windomain
[docs] def is_domaincontroller(domaincontroller: str) -> bool: return is_domaincontroller.RE.match(domaincontroller) is not None # type: ignore
is_domaincontroller.RE = re.compile(r"^[a-zA-Z].*\..*$") # type: ignore
[docs] def is_ldap_base(ldap_base: str) -> bool: """ >>> is_ldap_base('dc=foo,dc=bar') True >>> is_ldap_base('cn=foo,c=De,dc=foo,dc=bar') True >>> is_ldap_base('cn=foo,c=DED,dc=foo,dc=bar') False >>> is_ldap_base('dc=foo,') False >>> is_ldap_base(',dc=bar') False >>> is_ldap_base('dc=foo') False >>> is_ldap_base('cn=foo,c=ZZ,dc=foo,dc=bar') False """ match = is_ldap_base.RE.match(ldap_base) # type: ignore return match is not None and not any(part.upper().startswith('C=') and part.upper()[2:] not in is_ldap_base.CC for part in ldap.dn.explode_dn(ldap_base)) # type: ignore
is_ldap_base.RE = re.compile('^(c=[A-Za-z]{2}|(dc|cn|o|l)=[a-zA-Z0-9-]+)(,(c=[A-Za-z]{2}|((dc|cn|o|l)=[a-zA-Z0-9-]+)))+$') # type: ignore is_ldap_base.CC = ['AD', 'AE', 'AF', 'AG', 'AI', 'AL', 'AM', 'AO', 'AQ', 'AR', 'AS', 'AT', 'AU', 'AW', 'AX', 'AZ', 'BA', 'BB', 'BD', 'BE', 'BF', 'BG', 'BH', 'BI', 'BJ', 'BL', 'BM', 'BN', 'BO', 'BQ', 'BR', 'BS', 'BT', 'BV', 'BW', 'BY', 'BZ', 'CA', 'CC', 'CD', 'CF', 'CG', 'CH', 'CI', 'CK', 'CL', 'CM', 'CN', 'CO', 'CR', 'CU', 'CV', 'CW', 'CX', 'CY', 'CZ', 'DE', 'DJ', 'DK', 'DM', 'DO', 'DZ', 'EC', 'EE', 'EG', 'EH', 'ER', 'ES', 'ET', 'FI', 'FJ', 'FK', 'FM', 'FO', 'FR', 'GA', 'GB', 'GD', 'GE', 'GF', 'GG', 'GH', 'GI', 'GL', 'GM', 'GN', 'GP', 'GQ', 'GR', 'GS', 'GT', 'GU', 'GW', 'GY', 'HK', 'HM', 'HN', 'HR', 'HT', 'HU', 'ID', 'IE', 'IL', 'IM', 'IN', 'IO', 'IQ', 'IR', 'IS', 'IT', 'JE', 'JM', 'JO', 'JP', 'KE', 'KG', 'KH', 'KI', 'KM', 'KN', 'KP', 'KR', 'KW', 'KY', 'KZ', 'LA', 'LB', 'LC', 'LI', 'LK', 'LR', 'LS', 'LT', 'LU', 'LV', 'LY', 'MA', 'MC', 'MD', 'ME', 'MF', 'MG', 'MH', 'MK', 'ML', 'MM', 'MN', 'MO', 'MP', 'MQ', 'MR', 'MS', 'MT', 'MU', 'MV', 'MW', 'MX', 'MY', 'MZ', 'NA', 'NC', 'NE', 'NF', 'NG', 'NI', 'NL', 'NO', 'NP', 'NR', 'NU', 'NZ', 'OM', 'PA', 'PE', 'PF', 'PG', 'PH', 'PK', 'PL', 'PM', 'PN', 'PR', 'PS', 'PT', 'PW', 'PY', 'QA', 'RE', 'RO', 'RS', 'RU', 'RW', 'SA', 'SB', 'SC', 'SD', 'SE', 'SG', 'SH', 'SI', 'SJ', 'SK', 'SL', 'SM', 'SN', 'SO', 'SR', 'SS', 'ST', 'SV', 'SX', 'SY', 'SZ', 'TC', 'TD', 'TF', 'TG', 'TH', 'TJ', 'TK', 'TL', 'TM', 'TN', 'TO', 'TR', 'TT', 'TV', 'TW', 'TZ', 'UA', 'UG', 'UM', 'US', 'UY', 'UZ', 'VA', 'VC', 'VE', 'VG', 'VI', 'VN', 'VU', 'WF', 'WS', 'YE', 'YT', 'ZA', 'ZM', 'ZW'] # type: ignore # new defined methods
[docs] def is_ascii(string: str) -> bool: try: string.encode("ascii") return True except UnicodeEncodeError: return False
def _get_dns_resolver(nameserver: str) -> dns.resolver.Resolver: resolver = dns.resolver.Resolver() resolver.lifetime = 10 # make sure that we get an early timeout resolver.nameservers = [nameserver] return resolver
[docs] def is_ucs_domain(nameserver: str, domain: str) -> bool: return bool(get_ucs_domaincontroller_master_query(nameserver, domain))
[docs] def get_ucs_domaincontroller_master_query(nameserver: str, domain: str) -> dns.resolver.Answer | None: if not nameserver or not domain: return None # register nameserver resolver = _get_dns_resolver(nameserver) # perform a SRV lookup try: return resolver.query('_domaincontroller_master._tcp.%s.' % domain, 'SRV') except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer, dns.resolver.NoNameservers): MODULE.warning('No valid UCS domain (%s) at nameserver %s!', domain, nameserver) except dns.exception.Timeout as exc: MODULE.warning('Lookup for Primary Directory Node record at nameserver %s timed out: %s', nameserver, exc) except dns.exception.DNSException: MODULE.exception('DNS Exception') return None
[docs] def resolve_domaincontroller_master_srv_record(nameserver: str, domain: str) -> bool: response = get_ucs_domaincontroller_master_query(nameserver, domain) if not response: return False try: return response[0].target.to_text().rstrip('.') except LookupError: return False
[docs] def is_ssh_reachable(host: str) -> bool: if not host: return False s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) try: # TODO: timeout? s.connect((host, 22)) return True except OSError: pass finally: try: s.close() except OSError: pass return False
[docs] def get_ucs_domain(nameserver: str) -> str: domain = get_domain(nameserver) if not is_ucs_domain(nameserver, domain): return "" return domain
[docs] def get_domain(nameserver: str) -> str: fqdn = get_fqdn(nameserver) or "" _, _, domain = fqdn.partition(".") return domain
[docs] def get_fqdn(nameserver: str) -> str | None: # register nameserver resolver = _get_dns_resolver(nameserver) # perform a reverse lookup try: reverse_address = dns.reversename.from_address(nameserver) MODULE.info('Found reverse address: %s', reverse_address) reverse_lookup = resolver.query(reverse_address, 'PTR') if not len(reverse_lookup): return None fqdn = reverse_lookup[0] parts = [i.decode('ASCII') for i in fqdn.target.labels if i] domain = '.'.join(parts) return domain except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer, dns.resolver.NoNameservers) as exc: MODULE.warning('Lookup for nameserver %s failed: %s %s', nameserver, type(exc).__name__, exc) except dns.exception.Timeout as exc: MODULE.warning('Lookup for nameserver %s timed out: %s', nameserver, exc) except dns.exception.DNSException: MODULE.exception('DNS Exception') return None
[docs] def get_available_locales(pattern: Pattern[str], category: str = 'language_en') -> list[dict[str, str]] | None: """Return a list of all available locales.""" try: fsupported = open('/usr/share/i18n/SUPPORTED') flanguages = open('/usr/share/univention-system-setup/locale/languagelist') except Exception: MODULE.error('Cannot find locale data for languages in /usr/share/univention-system-setup/locale') return None # get all locales that are supported rsupported = csv.reader(fsupported, delimiter=' ') supportedLocales = {'C': True} for ilocale in rsupported: # we only support UTF-8 if ilocale[1] != 'UTF-8': continue # get the locale m = RE_LOCALE.match(ilocale[0]) if m: supportedLocales[m.groups()[0]] = True column = {'langcode': 0, 'language_en': 1, 'language': 2, 'countrycode': 4, 'fallbacklocale': 5}.get(category, 1) # open all languages rlanguages = csv.reader(flanguages, delimiter=';') locales = [] for ilang in rlanguages: if ilang[0].startswith('#'): continue if not pattern.match(ilang[column]): continue # each language might be spoken in several countries ipath = '/usr/share/univention-system-setup/locale/short-list/%s.short' % ilang[0] if os.path.exists(ipath): try: # open the short list with countries belonging to the language fshort = open(ipath) rshort = csv.reader(fshort, delimiter='\t') # create for each country a locale entry for jcountry in rshort: code = '%s_%s' % (ilang[0], jcountry[0]) if code in supportedLocales: locales.append({ 'id': '%s.UTF-8:UTF-8' % code, 'label': '%s (%s)' % (ilang[1], jcountry[2]), }) continue except Exception: pass # get the locale code code = ilang[0] if code.find('_') < 0 and code != 'C': # no underscore -> we need to build the locale ourself code = '%s_%s' % (ilang[0], ilang[4]) # final entry if code in supportedLocales: locales.append({ 'id': '%s.UTF-8:UTF-8' % code, 'label': ilang[1], }) return locales
_city_data = None
[docs] def get_city_data() -> Any: global _city_data if not _city_data: with open(CITY_DATA_PATH) as infile: _city_data = json.load(infile) return _city_data
_country_data = None
[docs] def get_country_data() -> Any: global _country_data if not _country_data: with open(COUNTRY_DATA_PATH) as infile: _country_data = json.load(infile) return _country_data
[docs] def get_random_nameserver(country: Mapping[str, Any]) -> dict[str, str | None]: ipv4_servers = country.get('ipv4') or country.get('ipv4_erroneous') or [None] ipv6_servers = country.get('ipv6') or country.get('ipv6_erroneous') or [None] return { "ipv4_nameserver": random.choice(ipv4_servers), "ipv6_nameserver": random.choice(ipv6_servers), }
[docs] def check_credentials_ad(nameserver: str, address: str, username: str, password: str) -> str: try: ad_domain_info = lookup_adds_dc(address, ucr={'nameserver1': nameserver}) check_connection(ad_domain_info, username, password) do_time_sync(address) check_ad_account(ad_domain_info, username, password) except failedADConnect: # Not checked... no AD! raise UMC_Error(_('The connection to the Active Directory server failed. Please recheck the address.')) except connectionFailed: # checked: failed! raise UMC_Error(_('The connection to the Active Directory server was refused. Please recheck the password.')) except notDomainAdminInAD: # check_ad_account() # checked: Not a Domain Administrator! raise UMC_Error(_("The given user is not member of the Domain Admins group in Active Directory. This is a requirement for the Active Directory domain join.")) else: return ad_domain_info['Domain']