Source code for univention.management.console.modules.setup.netconf.common

# SPDX-FileCopyrightText: 2004-2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only

"""Univention Setup: network configuration abstract common classes"""

import os
from abc import ABCMeta
from ipaddress import IPv4Interface, IPv4Network, IPv6Interface, IPv6Network

from univention.admin import uldap
from univention.management.console.modules.setup.netconf import ChangeSet
from univention.management.console.modules.setup.netconf.conditions import AddressChange, Executable, Ldap


[docs] class RestartService(Executable, metaclass=ABCMeta): """Helper to restart a single service.""" service = "" PREFIX = "/etc/init.d" @property def executable(self) -> str: return os.path.join(self.PREFIX, self.service)
[docs] def pre(self) -> None: super().pre() self.call(["systemctl", "stop", self.service])
[docs] def post(self) -> None: super().pre() self.call(["systemctl", "start", self.service])
[docs] class AddressMap(AddressChange, metaclass=ABCMeta): """Helper to provide a mapping from old addresses to new addresses.""" def __init__(self, changeset: ChangeSet) -> None: super().__init__(changeset) self.old_primary, self.new_primary = ( iface.get_default_ip_address() for iface in ( self.changeset.old_interfaces, self.changeset.new_interfaces, ) ) self.net_changes = self._map_ip() self.ip_mapping = self._get_address_mapping() def _map_ip(self) -> dict[IPv4Interface | IPv6Interface, IPv4Interface | IPv6Interface | None]: ipv4_changes = self.ipv4_changes() ipv6_changes = self.ipv6_changes() net_changes: dict[IPv4Interface | IPv6Interface, IPv4Interface | IPv6Interface | None] = {} net_changes.update(ipv4_changes) # type: ignore net_changes.update(ipv6_changes) # type: ignore return net_changes
[docs] def ipv4_changes(self) -> dict[IPv4Interface, IPv4Interface | None]: ipv4s = { name: iface.ipv4_address() for name, iface in self.changeset.new_interfaces.ipv4_interfaces } default = self.changeset.new_interfaces.get_default_ipv4_address() mapping = {} for name, iface in self.changeset.old_interfaces.ipv4_interfaces: old_addr = iface.ipv4_address() new_addr = ipv4s.get(name, default) if new_addr is None or old_addr.ip != new_addr.ip: mapping[old_addr] = new_addr return mapping
[docs] def ipv6_changes(self) -> dict[IPv6Interface, IPv6Interface | None]: ipv6s = { (iface.name, name): iface.ipv6_address(name) for (iface, name) in self.changeset.new_interfaces.ipv6_interfaces } default = self.changeset.new_interfaces.get_default_ipv6_address() mapping = {} for iface, name in self.changeset.old_interfaces.ipv6_interfaces: old_addr = iface.ipv6_address(name) new_addr = ipv6s.get((iface.name, name), default) if new_addr is None or old_addr.ip != new_addr.ip: mapping[old_addr] = new_addr return mapping
def _get_address_mapping(self) -> dict[str, str | None]: mapping = { str(old_ip.ip): str(new_ip.ip) if new_ip else None for (old_ip, new_ip) in self.net_changes.items() } return mapping
[docs] class LdapChange(AddressChange, Ldap, metaclass=ABCMeta): """Helper to provide access to LDAP through UDM.""" def __init__(self, changeset: ChangeSet) -> None: super().__init__(changeset) self.ldap = None self.position = None
[docs] def open_ldap(self) -> None: ldap_host = self.changeset.ucr["ldap/master"] ldap_base = self.changeset.ucr["ldap/base"] self.ldap = uldap.access( host=ldap_host, base=ldap_base, binddn=self.binddn, bindpw=self.bindpwd, ) self.position = uldap.position(ldap_base)
[docs] def convert_udm_subnet_to_network(subnet: str) -> IPv4Network | IPv6Network: if ":" in subnet: return convert_udm_subnet_to_ipv6_network(subnet) else: return convert_udm_subnet_to_ipv4_network(subnet)
[docs] def convert_udm_subnet_to_ipv4_network(subnet: str) -> IPv4Network: octets = subnet.split('.') count = len(octets) assert 1 <= count <= 4 prefix_length = 8 * count octets += ["0"] * (4 - count) address = '.'.join(octets) return IPv4Network("%s/%d" % (address, prefix_length), False)
[docs] def convert_udm_subnet_to_ipv6_network(subnet: str) -> IPv6Network: prefix = subnet.replace(":", "") count = len(prefix) assert 1 <= count <= 32 prefix_length = 4 * count address = subnet if count <= 28: address += "::" return IPv6Network("%s/%d" % (address, prefix_length), False)