Source code for univention.management.console.modules.setup.netconf.conditions

# SPDX-FileCopyrightText: 2004-2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only

"""Univention Setup: network configuration conditions"""

import os
from abc import ABCMeta
from collections.abc import Iterator

from ldap import LDAPError
from ldap.filter import filter_format

from univention.config_registry.interfaces import Interfaces
from univention.management.console.modules.setup.netconf import Phase, SkipPhase
from univention.uldap import getMachineConnection


class AddressChange(Phase, metaclass=ABCMeta):
    """Check for at least one removed or added address."""

    def check(self) -> None:
        """
        Skip the phase unless the IPv4 or IPv6 address set differs.

        The comparison uses the ``ip`` attribute of every old and new
        address entry of the changeset, collected into sets, so neither
        ordering nor duplicates matter.

        :raises SkipPhase: if both the IPv4 and the IPv6 sets are unchanged.
        """
        super().check()

        changes = self.changeset
        ipv4_before = {entry.ip for entry in changes.old_ipv4s}
        ipv4_after = {entry.ip for entry in changes.new_ipv4s}
        ipv6_before = {entry.ip for entry in changes.old_ipv6s}
        ipv6_after = {entry.ip for entry in changes.new_ipv6s}

        # Any difference in either address family keeps the phase active.
        if ipv4_before != ipv4_after or ipv6_before != ipv6_after:
            return
        raise SkipPhase("No address change")
class Server(Phase, metaclass=ABCMeta):
    """Check server role for being a UCS server."""

    def check(self) -> None:
        """
        Skip the phase when ``server/role`` is not one of the accepted roles.

        :raises SkipPhase: if the role read from the changeset's UCR is not
            in the accepted list (this includes an unset role).
        """
        super().check()

        current_role = self.changeset.ucr.get("server/role")
        accepted_roles = (
            "domaincontroller_master",
            "domaincontroller_backup",
            "domaincontroller_slave",
            "memberserver",
        )
        if current_role in accepted_roles:
            return
        raise SkipPhase("Wrong server/role")
class Executable(Phase, metaclass=ABCMeta):
    """Check executable exists."""

    # Path tested by check(); the empty default never exists, so a subclass
    # is expected to override it.
    executable = ""

    def check(self) -> None:
        """
        Skip the phase when the path in :attr:`executable` does not exist.

        :raises SkipPhase: if ``os.path.exists`` reports the path as missing.
        """
        super().check()

        path = self.executable
        if os.path.exists(path):
            return
        raise SkipPhase("Missing executable %s" % (path,))
class Dhcp(Phase, metaclass=ABCMeta):
    """Check for interfaces using DHCP."""

    @property
    def old_dhcps(self) -> set[str]:
        """Names of DHCP-typed interfaces in the changeset's old interfaces."""
        previous = self.changeset.old_interfaces
        return set(self._find_dhcp_interfaces(previous))

    @property
    def new_dhcps(self) -> set[str]:
        """Names of DHCP-typed interfaces in the changeset's new interfaces."""
        upcoming = self.changeset.new_interfaces
        return set(self._find_dhcp_interfaces(upcoming))

    @staticmethod
    def _find_dhcp_interfaces(interfaces: Interfaces) -> Iterator[str]:
        """
        Yield the name of every IPv4 interface whose type is dynamic.

        :param interfaces: object whose ``ipv4_interfaces`` iterates over
            ``(name, interface)`` pairs.
        :returns: iterator over the names whose interface ``type`` is
            ``"dhcp"`` or ``"dynamic"``.
        """
        dynamic_types = ("dhcp", "dynamic")
        yield from (
            ifname
            for ifname, config in interfaces.ipv4_interfaces
            if config.type in dynamic_types
        )
class NotNetworkOnly(Phase, metaclass=ABCMeta):
    """Run only when network-only mode is off; skip the phase otherwise."""

    def check(self) -> None:
        """
        Skip the phase when the changeset options request network-only mode.

        :raises SkipPhase: if ``changeset.options.network_only`` is truthy.
        """
        super().check()

        network_only = self.changeset.options.network_only
        if not network_only:
            return
        raise SkipPhase("Network only mode")
class Ldap(Phase, metaclass=ABCMeta):
    """Check LDAP server is available."""

    # DN used for binding; set by load_admin_credentials() or lookup_user().
    binddn = None
    # Password used for binding; set by load_admin_credentials() or
    # load_remote_credentials().
    bindpwd = None
    # Tri-state flag: None means "not determined yet"; check() then calls
    # load_state(), which stores True or False.
    available = None

    def check(self) -> None:
        """
        Skip the phase when LDAP is not usable.

        The state is loaded only while :attr:`available` is still ``None``.

        :raises SkipPhase: if :attr:`available` is falsy after loading.
        """
        super().check()
        if self.available is None:
            self.load_state()
        if not self.available:
            raise SkipPhase("Missing LDAP")

    def load_state(self) -> None:
        """Determine availability and, if available, load the credentials."""
        self.check_available()
        if self.available:
            self.load_credentials()

    def check_available(self) -> None:
        """Mark LDAP available unless the system-setup marker file exists."""
        self.available = not os.path.exists("/var/run/univention-system-setup.ldap")

    def load_credentials(self) -> None:
        """Load admin credentials on a master/backup, remote ones otherwise."""
        if self.is_master_or_backup():
            self.load_admin_credentials()
        else:
            self.load_remote_credentials()

    def is_master(self) -> bool:
        """Return whether ``server/role`` is ``domaincontroller_master``."""
        role = self.changeset.ucr.get("server/role")
        return role == "domaincontroller_master"

    def is_master_or_backup(self) -> bool:
        """Return whether ``server/role`` is a master or backup role."""
        role = self.changeset.ucr.get("server/role")
        return role in (
            "domaincontroller_master",
            "domaincontroller_backup",
        )

    def load_admin_credentials(self) -> None:
        """
        Use ``cn=admin`` below ``ldap/base`` with the local secret file.

        If ``/etc/ldap.secret`` cannot be read, :attr:`available` is set to
        ``False`` instead of raising.
        """
        self.binddn = "cn=admin,%(ldap/base)s" % self.changeset.ucr
        try:
            # The context manager closes the handle even if read() fails.
            with open("/etc/ldap.secret") as secret_file:
                self.bindpwd = secret_file.read()
        except OSError:
            self.available = False

    def load_remote_credentials(self) -> None:
        """
        Take user name and password from the changeset profile.

        A missing ``ldap_username`` or ``ldap_password`` key sets
        :attr:`available` to ``False``.
        """
        try:
            username = self.changeset.profile["ldap_username"]
            self.bindpwd = self.changeset.profile["ldap_password"]
            self.lookup_user(username)
        except KeyError:
            self.available = False

    def lookup_user(self, username: str) -> None:
        """
        Resolve *username* to a DN and store it in :attr:`binddn`.

        An LDAP error or an empty search result is logged as a warning and
        sets :attr:`available` to ``False``.

        :param username: value matched against the ``uid`` attribute.
        """
        try:
            ldap = getMachineConnection(ldap_master=True)
            ldap_filter = filter_format(
                "(&(objectClass=person)(uid=%s))",
                (username,),
            )
            result = ldap.searchDn(ldap_filter)
        except LDAPError as ex:
            self.logger.warning("Failed LDAP search for '%s': %s", username, ex)
            self.available = False
            return

        # An unknown user yields no DN; handle it like a failed search
        # instead of raising IndexError on result[0].
        if not result:
            self.logger.warning("Failed LDAP search for '%s': %s", username, "no matching entry")
            self.available = False
            return
        self.binddn = result[0]