Source code for ucsschool.lib.models.dhcp

#!/usr/bin/python3
# -*- coding: utf-8 -*-
#
# UCS@school python lib: models
#
# Copyright 2014-2025 Univention GmbH
#
# http://www.univention.de/
#
# All rights reserved.
#
# The source code of this program is made available
# under the terms of the GNU Affero General Public License version 3
# (GNU AGPL V3) as published by the Free Software Foundation.
#
# Binary versions of this program provided by Univention to you as
# well as other copyrighted, protected or trademarked materials like
# Logos, graphics, fonts, specific documentations and configurations,
# cryptographic keys etc. are subject to a license agreement between
# you and Univention and not subject to the GNU AGPL V3.
#
# In the case you use this program under the terms of the GNU AGPL V3,
# the program is provided in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public
# License with the Debian GNU/Linux or Univention distribution in file
# /usr/share/common-licenses/AGPL-3; if not, see
# <http://www.gnu.org/licenses/>.

import ipaddress
from typing import TYPE_CHECKING, List, Optional  # noqa: F401

from ldap.dn import dn2str, str2dn
from ldap.filter import filter_format

from .attributes import (
    Attribute,
    BroadcastAddress,
    DHCPServerName,
    DHCPServiceAttribute,
    DHCPServiceName,
    DHCPSubnetMask,
    DHCPSubnetName,
)
from .base import UCSSchoolHelperAbstractClass
from .utils import _, ucr

if TYPE_CHECKING:
    from univention.admin.uldap import access as LoType  # noqa: F401

    from .base import UdmObject  # noqa: F401


[docs] class DHCPService(UCSSchoolHelperAbstractClass): name = DHCPServiceName(_("Service")) # type: str hostname = Attribute(_("Hostname")) # type: str domainname = Attribute(_("Domain")) # type: str
[docs] def do_create(self, udm_obj, lo): # type: (UdmObject, LoType) -> None udm_obj.options.append("options") udm_obj["option"] = ['wpad "http://%s.%s/proxy.pac"' % (self.hostname, self.domainname)] super(DHCPService, self).do_create(udm_obj, lo)
[docs] @classmethod def get_container(cls, school): # type: (str) -> str return cls.get_search_base(school).dhcp
[docs] def add_server(self, dc_name, lo, force_dhcp_server_move=False): # type: (str, LoType, Optional[bool]) -> None """ Create the given DHCP server within the DHCP service. If the DHCP server object already exists somewhere else within the LDAP tree, it may be moved to the DHCP service. PLEASE NOTE: In multiserver environments an existing DHCP server object is always moved to the current DHCP service. In single server environments the DHCP server object is *ONLY* moved, if the UCR variable dhcpd/ldap/base matches to the current DHCP service. """ from ucsschool.lib.models.school import School # create dhcp-server if not exsistant school = School.cache(self.school) dhcp_server = DHCPServer(name=dc_name, school=school.name, dhcp_service=self) existing_dhcp_server_dn = DHCPServer.find_any_dn_with_name(dc_name, lo) if existing_dhcp_server_dn: self.logger.info("DHCP server %s exists!", existing_dhcp_server_dn) old_dhcp_server_container = lo.parentDn(existing_dhcp_server_dn) dhcpd_ldap_base = ucr.get("dhcpd/ldap/base", "") # only move if # - forced via kwargs OR # - in multiserver environments OR # - desired dhcp server DN matches with UCR config if ( force_dhcp_server_move or not ucr.is_true("ucsschool/singlemaster", False) or dhcp_server.dn.lower().endswith(",%s" % dhcpd_ldap_base.lower()) ): # move if existing DN does not match with desired DN if existing_dhcp_server_dn != dhcp_server.dn: # move existing dhcp server object to OU/DHCP service self.logger.info( "DHCP server %s not in school %r! Removing and creating new one at %s!", existing_dhcp_server_dn, school, dhcp_server.dn, ) old_superordinate = DHCPServer.find_udm_superordinate(existing_dhcp_server_dn, lo) old_dhcp_server = DHCPServer.from_dn( existing_dhcp_server_dn, None, lo, superordinate=old_superordinate ) old_dhcp_server.remove(lo) dhcp_server.create(lo) # copy subnets # find local interfaces interfaces = [] for interface_name in { key.split("/")[1] for key in ucr.keys() if key.startswith("interfaces/eth") }: try: address = ipaddress.IPv4Network( u"%s/%s" % ( ucr["interfaces/%s/address" % interface_name], ucr["interfaces/%s/netmask" % interface_name], ), strict=False, ) interfaces.append(address) except ValueError as exc: self.logger.info("Skipping invalid interface %s:\n%s", interface_name, exc) subnet_dns = DHCPSubnet.find_all_dns_below_base(old_dhcp_server_container, lo) for subnet_dn in subnet_dns: dhcp_service = DHCPSubnet.find_udm_superordinate(subnet_dn, lo) dhcp_subnet = DHCPSubnet.from_dn(subnet_dn, self.school, lo, superordinate=dhcp_service) subnet = dhcp_subnet.get_ipv4_subnet() if subnet in interfaces: # subnet matches any local subnet self.logger.info("Creating new DHCPSubnet from %s", subnet_dn) new_dhcp_subnet = DHCPSubnet(**dhcp_subnet.to_dict()) new_dhcp_subnet.dhcp_service = self new_dhcp_subnet.position = new_dhcp_subnet.get_own_container() new_dhcp_subnet.set_dn(new_dhcp_subnet.dn) new_dhcp_subnet.create(lo) else: self.logger.info("Skipping non-local subnet %s", subnet) else: self.logger.info("No DHCP server named %s found! Creating new one!", dc_name) dhcp_server.create(lo)
[docs] def get_servers(self, lo): # type: (LoType) -> List[DHCPServer] ret = [] for dhcp_server in DHCPServer.get_all(lo, self.school, superordinate=self): dhcp_server.dhcp_service = self ret.append(dhcp_server) return ret
[docs] class Meta: udm_module = "dhcp/service"
[docs] class AnyDHCPService(DHCPService): school = None
[docs] @classmethod def get_container(cls, school=None): # type: (str) -> str return ucr.get("ldap/base")
[docs] def get_servers(self, lo): # type: (LoType) -> List[DHCPServer] old_name = self.name old_position = self.position old_dn = str2dn(self.old_dn or self.dn) self.position = dn2str(old_dn[1:]) self.name = old_dn[0][0][1] try: return super(AnyDHCPService, self).get_servers(lo) finally: self.position = old_position self.name = old_name
[docs] class DHCPServer(UCSSchoolHelperAbstractClass): name = DHCPServerName(_("Server name")) # type: str dhcp_service = DHCPServiceAttribute(_("DHCP service"), required=True) # type: DHCPService
[docs] def get_own_container(self): # type: () -> str if self.dhcp_service: return self.dhcp_service.dn
[docs] @classmethod def get_container(cls, school): # type: (str) -> str return cls.get_search_base(school).dhcp
[docs] def get_superordinate(self, lo): # type: (LoType) -> UdmObject if self.dhcp_service: return self.dhcp_service.get_udm_object(lo)
[docs] @classmethod def find_any_dn_with_name(cls, name, lo): # type: (str, LoType) -> str cls.logger.debug("Searching first dhcpServer with cn=%s", name) try: dn = lo.searchDn( filter=filter_format("(&(objectClass=dhcpServer)(cn=%s))", [name]), base=ucr.get("ldap/base"), )[0] except IndexError: dn = None cls.logger.debug("... %r found", dn) return dn
[docs] class Meta: udm_module = "dhcp/server" name_is_unique = True
[docs] class DHCPSubnet(UCSSchoolHelperAbstractClass): name = DHCPSubnetName(_("Subnet address")) # type: str subnet_mask = DHCPSubnetMask(_("Netmask")) # type: str broadcast = BroadcastAddress(_("Broadcast")) # type: str dhcp_service = DHCPServiceAttribute(_("DHCP service"), required=True) # type: DHCPService
[docs] def get_own_container(self): # type: () -> str if self.dhcp_service: return self.dhcp_service.dn
[docs] @classmethod def get_container(cls, school): # type: (str) -> str return cls.get_search_base(school).dhcp
[docs] def get_superordinate(self, lo): # type: (LoType) -> UdmObject if self.dhcp_service: return self.dhcp_service.get_udm_object(lo)
[docs] def get_ipv4_subnet(self): # type: () -> ipaddress.IPv4Network network_str = u"%s/%s" % (self.name, self.subnet_mask) try: return ipaddress.IPv4Network(network_str, strict=False) except ValueError as exc: self.logger.info("%r is no valid IPv4Network:\n%s", network_str, exc)
[docs] @classmethod def find_all_dns_below_base(cls, dn, lo): # type: (str, LoType) -> List[str] cls.logger.debug("Searching all univentionDhcpSubnet in %r", dn) return lo.searchDn(filter="(objectClass=univentionDhcpSubnet)", base=dn)
[docs] class Meta: udm_module = "dhcp/subnet"