# Source code for univention.admin.handlers

# SPDX-FileCopyrightText: 2004-2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only

"""
This module is the base for all Univention Directory Management handler modules.
A UDM handler represents an abstraction of an LDAP object.

.. seealso:: :mod:`univention.admin.uldap`
.. seealso:: :mod:`univention.admin.modules`
.. seealso:: :mod:`univention.admin.objects`
.. seealso:: :mod:`univention.admin.mapping`
.. seealso:: :mod:`univention.admin.syntax`
.. seealso:: :mod:`univention.admin.uexceptions`

"""

import copy
import inspect
import re
import sys
import time
import traceback
import uuid
import warnings
from collections.abc import Iterable
from ipaddress import IPv4Address, IPv6Address, ip_address, ip_network
from typing import TYPE_CHECKING, Any, Self, overload

import ldap
from ldap.controls.readentry import PostReadControl
from ldap.controls.simple import RelaxRulesControl
from ldap.dn import dn2str, escape_dn_chars, explode_rdn, str2dn
from ldap.filter import filter_format

import univention.admin.allocators
import univention.admin.blocklist
import univention.admin.filter
import univention.admin.localization
import univention.admin.mapping
import univention.admin.modules
import univention.admin.recyclebin
import univention.admin.syntax
import univention.admin.uexceptions
import univention.admin.uldap
from univention.admin import configRegistry
from univention.admindiary.client import write_event
from univention.admindiary.events import DiaryEvent
from univention.dn import DN


if TYPE_CHECKING:
    import univention.admin.handlers.dns.forward_zone
    import univention.admin.handlers.dns.reverse_zone
    import univention.admin.handlers.networks.network

# ``_HookName`` is the closed set of hook method names, expressed as a
# ``Literal`` type so it can be used in annotations.
# If ``typing`` does not provide ``Literal`` the import error is swallowed on
# purpose and ``_HookName`` simply stays undefined.
try:
    from typing import Literal

    _HookName = Literal[
        'hook_open',
        'hook_ldap_pre_create',
        'hook_ldap_addlist',
        'hook_ldap_post_create',
        'hook_ldap_pre_modify',
        'hook_ldap_modlist',
        'hook_ldap_post_modify',
        'hook_ldap_pre_remove',
        'hook_ldap_post_remove',
    ]
except ImportError:
    pass


from univention.admin.log import log


# Module-wide flag, evaluated once at import time from
# ``univention.lib.admember.is_localhost_in_admember_mode()``.
# When the optional library cannot be imported a warning is logged and the
# flag falls back to ``False``.
# It can be overridden later through :func:`disable_ad_restrictions`.
try:
    import univention.lib.admember

    _prevent_to_change_ad_properties = univention.lib.admember.is_localhost_in_admember_mode()
except ImportError:
    log.warning('Failed to import univention.lib.admember')
    _prevent_to_change_ad_properties = False

# Type aliases used in the annotations of this module.
_Attributes = dict[str, list[bytes]]  # LDAP attribute name -> list of raw (bytes) values
_Properties = dict[str, str | list[str]]  # UDM property name -> single value or list of values
_Encoding = tuple[str, ...]  # NOTE(review): not used in the visible part of the file - meaning to be confirmed

# gettext-style translation function ``_`` for the user-facing messages of this module
translation = univention.admin.localization.translation('univention/admin/handlers')
_ = translation.translate

# global caching variable
# Pre-seeded from the UCR variable ``directory/manager/samba3/legacy``:
#   true            ==> False
#   false           ==> True
#   anything else   ==> None, i.e. "not determined yet"; ``simpleLdap.__init__``
#                       then fills the cache with the result of an LDAP search.
if configRegistry.is_true('directory/manager/samba3/legacy', False):
    s4connector_present: bool | None = False
elif configRegistry.is_false('directory/manager/samba3/legacy', False):
    s4connector_present = True
else:
    s4connector_present = None


def disable_ad_restrictions(disable: bool = True) -> None:
    """
    Overwrite the module-wide flag :data:`_prevent_to_change_ad_properties`.

    The flag is consulted by handler operations such as
    :meth:`simpleLdap.move`, which refuses to move synchronized objects while
    the flag is true.

    :param disable: The value which is stored in the flag, unchanged.

    .. note::
        NOTE(review): the value is stored as given, so ``disable=True`` turns
        the protection flag *on*. This reads inverted compared to the function
        name - confirm against the callers before changing it.
    """
    global _prevent_to_change_ad_properties

    _prevent_to_change_ad_properties = disable
class simpleLdap:
    """
    The base class for all UDM handler modules.

    :param co:
        *deprecated* parameter for a config. Please pass `None`.

    :param lo:
        A required LDAP connection object which is used for all LDAP operations (search, create, modify).
        It should be bound to a user which has the LDAP permissions to do the required operations.

    :param position:
        The LDAP container where a new object should be created in, or `None` for existing objects.

    :param dn:
        The DN of an existing LDAP object. If an object should be created the DN must not be passed here!

    :param superordinate:
        The superordinate object of this object. Can be omitted. It is automatically searched by the given DN or position.

    :param attributes:
        The LDAP attributes of the LDAP object as dict. This should by default be omitted. To save performance when an LDAP search is done this can be used, e.g. by the lookup() method.
        If given make sure the dict contains all attributes which are required by :meth:`_ldap_attributes`.

    The following attributes hold information about the state of this object:

    :ivar str dn:
        A LDAP distinguished name (DN) of this object (if exists, otherwise None)
    :ivar str module: the UDM handlers name (e.g. users/user)
    :ivar dict oldattr:
        The LDAP attributes of this object as dict. If the object does not exist the dict is empty.
    :ivar dict info:
        An internal dictionary which holds the values for every property.
    :ivar list options:
        A list of UDM options which are enabled on this object. Enabling options causes specific object classes and attributes to be added to the object.
    :ivar list policies:
        A list of DNs containing references to assigned policies.
    :ivar dict properties: a dict which maps all UDM properties to :class:`univention.admin.property` instances.
    :ivar univention.admin.mapping.mapping mapping:
        A :class:`univention.admin.mapping.mapping` instance containing a mapping of UDM property names to LDAP attribute names.
    :ivar dict oldinfo:
        A private copy of :attr:`info` containing the original properties which were set during object loading. This is only set by :func:`univention.admin.handlers.simpleLdap.save`.
    :ivar list old_options:
        A private copy of :attr:`options` containing the original options which were set during object loading. This is only set by :func:`univention.admin.handlers.simpleLdap.save`.
    :ivar list oldpolicies:
        A private copy of :attr:`policies` containing the original policies which were set during object loading. This is only set by :func:`univention.admin.handlers.simpleLdap.save`.

    .. caution::
        Do not operate on :attr:`info` directly because this would bypass syntax validations. This object should be used like a dict.
        Properties should be assigned in the following way: obj['name'] = 'value'
    """

    module = ''  # the name of the module
    use_performant_ldap_search_filter = False
    ldap_base = configRegistry['ldap/base']
    # class-level cache which is filled by the ``lo_machine_primary`` property
    _lo_machine_primary = None
    default_containers_attribute_name = None

    def __init__(
        self,
        co: None,
        lo: univention.admin.uldap.access,
        position: univention.admin.uldap.position | None,
        dn: str = '',
        superordinate: Self | None = None,
        attributes: _Attributes | None = None,
    ) -> None:
        """
        Initialize the handler and, for an existing object, load its state.

        See the class documentation for the meaning of the parameters.

        :raises TypeError: if `lo` is not a :class:`univention.admin.uldap.access` instance.
            A plain :class:`univention.uldap.access` is wrapped with a deprecation warning,
            unless UCR ``directory/manager/type-checking/strict`` is true.
        :raises univention.admin.uexceptions.noObject: if `dn` is given but the LDAP read reports that the object does not exist.
        :raises univention.admin.uexceptions.wrongObjectType: if the loaded attributes are not recognized as an object of :attr:`module`.
        """
        self.log = None  # replaced by a bound logger in _set_log() below
        self._exists = False
        self.co = None  # the `co` argument is ignored

        if isinstance(lo, univention.admin.uldap.access):
            self.lo: univention.admin.uldap.access = lo
        elif isinstance(lo, univention.uldap.access):
            log.error('using univention.uldap.access instance is deprecated. Use univention.admin.uldap.access instead.')
            warnings.warn('using univention.uldap.access instance is deprecated. Use univention.admin.uldap.access instead.', DeprecationWarning, stacklevel=3)
            self.lo = univention.admin.uldap.access(lo=lo)
            if configRegistry.is_true('directory/manager/type-checking/strict'):
                raise TypeError('Expect univention.admin.uldap.access!')
        else:
            raise TypeError('lo must be instance of univention.admin.uldap.access.')

        # a DN given as bytes is decoded, everything else is taken as is
        self.dn: str | None = dn.decode('utf-8') if isinstance(dn, bytes) else dn
        self.old_dn: str | None = self.dn
        self.superordinate: simpleLdap | None = superordinate

        self._set_log()

        self.set_defaults = not self.dn  # this object is newly created and so we can use the default values

        # without an explicit position fall back to the LDAP base and point it at the own DN (if any)
        self.position: univention.admin.uldap.position = position or univention.admin.uldap.position(self.ldap_base)
        if not position and self.dn:
            self.position.setDn(self.dn)

        self.info: _Properties = {}
        self.oldinfo: _Properties = {}
        self.policies: list[str] = []
        self.oldpolicies: list[str] = []
        self.policyObjects: dict[str, simplePolicy] = {}
        # properties listed here are no longer filled with their default by __getitem__()
        self.__no_default: list[str] = []

        self._open = False
        self.options: list[str] = []
        self.old_options: list[str] = []
        self.alloc: list[tuple[str, str] | tuple[str, str, bool]] = []  # name,value,updateLastUsedValue

        # s4connector_present is a global caching variable than can be
        # None ==> ldap has not been checked for servers with service "S4 Connector"
        # True ==> at least one server with IP address (aRecord) is present
        # False ==> no server is present
        global s4connector_present
        if s4connector_present is None:
            # the cache is set to False before searching, so it does not stay None if the search raises
            s4connector_present = False
            searchResult = self.lo.authz_connection.searchDn(
                '(&(|(objectClass=univentionDomainController)(objectClass=univentionMemberServer))(univentionService=S4 Connector)(|(aRecord=*)(aAAARecord=*)))',
            )
            s4connector_present = bool(searchResult)
        self.s4connector_present = s4connector_present

        if not univention.admin.modules.modules:
            self.log.warning('univention.admin.modules.update() was not called')
            univention.admin.modules.update()

        m = univention.admin.modules._get(self.module)
        # an already existing `mapping` attribute (e.g. defined on the class) wins over the module's mapping
        if not hasattr(self, 'mapping'):
            self.mapping: univention.admin.mapping.mapping | None = getattr(m, 'mapping', None)

        # prefer the attributes handed in by the caller; otherwise read them from LDAP
        self.oldattr: _Attributes = {}
        if attributes:
            self.oldattr = attributes
        elif self.dn:
            try:
                attr = self._ldap_attributes()
                self.oldattr = self.lo.authz_connection.get(self.dn, attr=attr, required=True)
            except ldap.NO_SUCH_OBJECT:
                raise univention.admin.uexceptions.noObject(self.dn)

        if self.oldattr:
            self._exists = True
            if not univention.admin.modules.virtual(self.module) and not univention.admin.modules.recognize(self.module, self.dn, self.oldattr):
                raise univention.admin.uexceptions.wrongObjectType('%s is not recognized as %s.' % (self.dn, self.module))
            # translate the LDAP attributes into UDM property values
            oldinfo = self.mapping.unmapValues(self.oldattr)
            oldinfo = self._post_unmap(oldinfo, self.oldattr)
            oldinfo = self._falsy_boolean_extended_attributes(oldinfo)
            self.info.update(oldinfo)

        self.policies = [x.decode('utf-8') for x in self.oldattr.get('univentionPolicyReference', [])]
        self.__set_options()
        # remember the freshly loaded state as "old" state
        self.save()

        self._validate_superordinate(False)

    def __init_subclass__(cls, **kwargs):
        """
        Register the `univentionObjectIdentifier` property for the module which defines the subclass.

        Nothing is registered when the defining module is not found in :data:`sys.modules`.
        """
        super().__init_subclass__(**kwargs)
        module = sys.modules.get(cls.__module__)
        if not module:
            return
        cls._register_univention_object_identifier_property(module)

    @classmethod
    def _register_univention_object_identifier_property(cls, module):
        """
        Add the `univentionObjectIdentifier` property to the given handler module.

        The property is added to `module.property_descriptions` (together with a
        mapping entry) and to `module.default_property_descriptions`, each only if
        the module has that attribute and does not already define the property.

        :param module: The Python module object of a UDM handler.
        """
        prop = {
            'univentionObjectIdentifier': univention.admin.property(
                short_description=_('Immutable Object Identifier'),
                long_description=_('Immutable attribute to track the identity of an object in UDM'),
                syntax=univention.admin.syntax.UUID,
                may_change=False,
                dontsearch=True,
                # a random UUID is only generated as default when UCR enables the autogeneration
                default=(lambda o, p: str(uuid.uuid4()) if configRegistry.is_true('directory/manager/object-identifier/autogeneration') else None, [], []),
            ),
        }
        if hasattr(module, 'property_descriptions') and 'univentionObjectIdentifier' not in module.property_descriptions:
            module.mapping.register('univentionObjectIdentifier', 'univentionObjectIdentifier', None, univention.admin.mapping.ListToString)
            module.property_descriptions.update(prop)
        if hasattr(module, 'default_property_descriptions') and 'univentionObjectIdentifier' not in module.default_property_descriptions:
            # a deep copy, so both dictionaries do not share the same property instance
            module.default_property_descriptions.update(copy.deepcopy(prop))

    @property
    def authz(self):
        """The `authz` attribute of the LDAP connection :attr:`lo`."""
        return self.lo.authz
def set_lo_machine_primary(self, lo: univention.admin.uldap.access) -> None:
    """
    Store the given LDAP connection as `_lo_machine_primary` of this object.

    The value is what the :attr:`lo_machine_primary` property checks and
    returns afterwards.

    :param lo: The LDAP connection to remember.
    """
    connection = lo
    self._lo_machine_primary = connection
@property
def lo_machine_primary(self) -> univention.admin.uldap.access:
    """
    A working LDAP connection which is stored in `_lo_machine_primary`.

    A remembered connection is reused as long as its `whoami()` succeeds and
    returns a non-empty value. Otherwise a new machine connection is created with
    :func:`univention.admin.uldap.getMachineConnection` and cached on the
    :class:`simpleLdap` class. If that fails with :class:`OSError` the
    connection :attr:`lo` of this object is used and remembered on the instance.

    :returns: The LDAP connection.
    """
    connection = self._lo_machine_primary
    try:
        # maybe check if we have long lived connections which are invalidated?!
        if connection is not None and not connection.whoami():
            raise univention.admin.uexceptions.base('invalid LDAP connection lo_machine_primary')
    except (ldap.LDAPError, univention.admin.uexceptions.base, AttributeError):
        connection = None

    if connection is None:
        # Drop a (stale) instance-level value instead of overwriting it with None:
        # an instance attribute would shadow the class-level cache which is filled
        # below, and the property would then hand out None.
        self.__dict__.pop('_lo_machine_primary', None)
        try:
            connection = univention.admin.uldap.getMachineConnection(ldap_master=True)[0]
            simpleLdap._lo_machine_primary = connection
        except OSError:
            # This is for joining UCS systems into the domain.
            # During join the joining system calls udm-cli via ssh on the primary
            # as, usually, domain admin. This account does not have read permission for machine.secret
            # so we use the user connection instead.
            # Problems:
            # * the join user (if not domain admins group) has no read permissions for blocklist, so he can bypass the blocklist check
            # * any other user with permissions to create user objects (connector's?) but without permissions to read
            #   blocklists can bypass the blocklist check with udm-cli
            connection = self.lo
            self._lo_machine_primary = connection
    return connection


def _set_log(self):
    """(Re-)create :attr:`log` as a child logger which is bound to the module name and the current DN."""
    self.log = log.getChild('object').bind(type=self.module, dn=self.dn)


@property
def descriptions(self) -> dict[str, univention.admin.property]:
    """The `property_descriptions` of the UDM module :attr:`module`."""
    return univention.admin.modules.get(self.module).property_descriptions


@property
def entry_uuid(self) -> str | None:
    """The entry UUID of the object (if object exists)"""
    if 'entryUUID' in self.oldattr:
        return self.oldattr['entryUUID'][0].decode('ASCII')
    return None


@property
def object_identifier(self) -> str | None:
    """The univentionObjectIdentifier of the object (if object exists)"""
    if 'univentionObjectIdentifier' in self.oldattr:
        return self.oldattr['univentionObjectIdentifier'][0].decode('ASCII')
    return None
def save(self) -> None:
    """
    Remember the current state of this object as its "old" state.

    The old state is what later comparisons (e.g. when modifying the object)
    are made against. Property values, policies and options are deep-copied,
    duplicate options are dropped beforehand.

    .. seealso::
        This method should be called by :func:`univention.admin.handlers.simpleLdap.open`
        and after further modifications in modify() / create().

    .. note::
        self.oldattr is not set and must be set manually
    """
    self.oldinfo = copy.deepcopy(self.info)
    self.old_dn = self.dn
    self.oldpolicies = copy.deepcopy(self.policies)

    unique_options = set(self.options)
    self.options = list(unique_options)

    # only an existing object has "old" options
    self.old_options = []
    if not self.exists():
        return
    self.old_options = copy.deepcopy(self.options)
def diff(self) -> list[tuple[str, Any, Any]]:
    """
    Build the UDM modlist, i.e. the difference between the old and the current state.

    A property which belongs only to options that are not enabled is reported as
    removed if it had a value before. Every other property is reported if at
    least one of old and new value is set and both differ.

    :returns: A list of 3-tuples (udm-property-name, old-property-value, new-property-values).
    """
    enabled_options = set(self.options)
    modlist: list[tuple[str, Any, Any]] = []

    for name, description in self.descriptions.items():
        # the "empty" value depends on the kind of property
        empty: list | None = [] if description.multivalue else None
        before = self.oldinfo.get(name, empty)

        # remove properties which are disabled by options
        if description.options and enabled_options.isdisjoint(description.options):
            if before not in (empty, None):
                self.log.debug('key not valid (option not set)', key=name)
                modlist.append((name, before, empty))
            continue

        after = self.info.get(name, empty)
        if (before or after) and before != after:
            modlist.append((name, before, after))

    return modlist
[docs] def hasChanged(self, key: str | Iterable[str]) -> bool: """ Checks if the given attribute(s) was (were) changed. :param key: The name of a property. :returns: True if the property changed, False otherwise. """ # FIXME: key can even be nested if not isinstance(key, str): return any(self.hasChanged(i) for i in key) if (not self.oldinfo.get(key, '') or self.oldinfo[key] == ['']) and (not self.info.get(key, '') or self.info[key] == ['']): return False return not univention.admin.mapping.mapCmp(self.mapping, key, self.oldinfo.get(key, ''), self.info.get(key, ''))
def ready(self) -> None:
    """
    Verify the preconditions for creating or modifying this object.

    * every required property of the enabled options must have a value
    * the position of a new object must be in the subtree of its superordinate
    * the superordinate itself must be valid

    :raises: :class:`univention.admin.uexceptions.insufficientInformation`
    """
    missing = []
    for name, description in self.descriptions.items():
        # skip if this property is not present in the current option set
        if description.options and set(description.options).isdisjoint(self.options):
            continue
        if not description.required:
            continue

        value = self[name]
        if not value or (isinstance(value, list) and value == ['']):
            self.log.debug('property is required but not set.', property=name)
            missing.append(name)

    if missing:
        message = _('The following properties are missing:\n%s') % ('\n'.join(missing),)
        raise univention.admin.uexceptions.insufficientInformation(message, missing_properties=missing)

    # when creating a object make sure that its position is underneath of its superordinate
    is_new_with_superordinate = not self.exists() and self.position and self.superordinate
    if is_new_with_superordinate and not self._ensure_dn_in_subtree(self.superordinate.dn, self.position.getDn()):
        raise univention.admin.uexceptions.insufficientInformation(_('The position must be in the subtree of the superordinate.'))

    self._validate_superordinate(True)
def has_property(self, key: str) -> bool:
    """
    Tell whether the property is known to this module and enabled by the current UDM options.

    A property without options is always enabled; otherwise at least one of
    its options must be set on this object.

    :param str key: The name of a property.
    :returns: True if the property exists and is enabled, False otherwise.
    """
    try:
        description = self.descriptions[key]
    except KeyError:
        return False

    if not description.options:
        return True
    return not set(description.options).isdisjoint(self.options)
def __setitem__(self, key: str, value: Any) -> None:
    """
    Sets or unsets the property to the given value.

    A property which exists but is not enabled by the current UDM options is
    ignored: a warning is logged and nothing is set.

    :param str key: The name of a property.
    :param value: The value to set.

    :raises KeyError: if the property does not exist in this module.
    :raises: :class:`univention.admin.uexceptions.valueRequired` if the value is unset but required.
    :raises: :class:`univention.admin.uexceptions.valueMayNotChange` if the values cannot be modified.
    :raises: :class:`univention.admin.uexceptions.valueInvalidSyntax` if the value is invalid.
    """
    def _changeable():
        # yields one boolean per condition; all of them must hold for the value to be changeable
        yield self.descriptions[key].editable
        if not self.descriptions[key].may_change:
            # immutable properties may only be set initially or to their unchanged old value
            yield key not in self.oldinfo or self.oldinfo[key] == value
        # if _prevent_to_change_ad_properties:  # FIXME: users.user.object.__init__ modifies firstname and lastname by hand
        #     yield not (self.descriptions[key].readonly_when_synced and self._is_synced_object() and self.exists())

    # property does not exist
    if not self.has_property(key):
        # don't set value if the option is not enabled
        self.log.warning('Ignoring property', func='__setitem__', property=key)
        try:
            self.descriptions[key]
        except KeyError:
            # raise univention.admin.uexceptions.noProperty(key)
            raise
        return
    # attribute may not be changed
    elif not all(_changeable()):
        raise univention.admin.uexceptions.valueMayNotChange(_('key=%(key)s old=%(old)s new=%(new)s') % {'key': key, 'old': self[key], 'new': value}, property=key)
    # required attribute may not be removed
    elif self.descriptions[key].required and not value:
        raise univention.admin.uexceptions.valueRequired(_('The property %s is required') % self.descriptions[key].short_description, property=key)
    # do nothing
    if self.info.get(key, None) == value:
        self.log.debug('values are identical', key=key, value=value)
        return

    # the current value equals the default and gets replaced:
    # remember the property, so that __getitem__() does not re-apply the default later
    if self.info.get(key, None) == self.descriptions[key].default(self):
        self.__no_default.append(key)

    if self.descriptions[key].multivalue:
        # make sure value is list
        if isinstance(value, str):
            value = [value]
        elif not isinstance(value, list):
            raise univention.admin.uexceptions.valueInvalidSyntax(_('The property %s must be a list') % (self.descriptions[key].short_description,), property=key)

        # NOTE(review): the stored list is reset before the values are validated, so an
        # invalid value leaves the property with only the values parsed so far.
        self.info[key] = []
        for v in value:
            if not v:
                # empty values are dropped silently
                continue
            err = ''
            p = None
            try:
                s = self.descriptions[key].syntax
                p = s.parse(v)
            except univention.admin.uexceptions.valueError as emsg:
                err = str(emsg)
            # a falsy parse result is treated like a syntax error
            if not p:
                if not err:
                    err = ''
                try:
                    raise univention.admin.uexceptions.valueInvalidSyntax('%s: %s' % (key, err), property=key)
                except UnicodeEncodeError:  # raise fails if err contains umlauts or other non-ASCII-characters
                    raise univention.admin.uexceptions.valueInvalidSyntax(self.descriptions[key].short_description, property=key)
            self.info[key].append(p)

    elif not value and key in self.info:
        # unset a single-value property
        del self.info[key]

    elif value:
        err = ''
        p = None
        try:
            s = self.descriptions[key].syntax
            p = s.parse(value)
        except univention.admin.uexceptions.valueError as e:
            err = str(e)
        # a falsy parse result is treated like a syntax error
        if not p:
            if not err:
                err = ''
            try:
                raise univention.admin.uexceptions.valueInvalidSyntax('%s: %s' % (self.descriptions[key].short_description, err), property=key)
            except UnicodeEncodeError:  # raise fails if err contains umlauts or other non-ASCII-characters
                raise univention.admin.uexceptions.valueInvalidSyntax('%s' % self.descriptions[key].short_description, property=key)
        self.info[key] = p


def __getitem__(self, key: str) -> Any:
    """
    Get the currently set value of the given property.

    :param str key: The name of a property.
    :returns: The currently set value. If the value is not set the default value is returned.
        For an empty `key` None is returned.

    .. warning::
        this method changes the set value to the default if it is unset.
        For a side effect free retrieval of the value use :func:`univention.admin.handlers.simpleLdap.get`.
    """
    if not key:
        return None

    if key in self.info:
        if self.descriptions[key].multivalue and not isinstance(self.info[key], list):
            # why isn't this correct in the first place?
            self.log.warning('The mapping for property is broken!', property=key)
            # repair the stored value, so the warning is not repeated on the next access
            self.info[key] = [self.info[key]]
        return self.info[key]
    elif key not in self.__no_default and self.descriptions[key].editable:
        # side effect: the default is stored as the value of the property
        self.info[key] = self.descriptions[key].default(self)
        return self.info[key]
    elif self.descriptions[key].multivalue:
        return []
    else:
        return None
def get(self, key: str, default: Any = None) -> Any:
    """
    Look up the value of a property without any side effects.

    In contrast to the item access ``obj[key]`` no default value of the
    property is applied or stored.

    :param str key: The name of a property.
    :param default: The default to return if the property is not set.
    :returns: The currently set value. If the value is not set :attr:`default` is returned.
    """
    current_values = self.info
    return current_values.get(key, default)
def __contains__(self, key: str) -> bool:
    """
    Tell whether this module defines a property with the given name.

    :param key: The name of a property.
    :returns: True if the property exists, False otherwise.

    .. warning::
        This does not check if the property is also enabled by the UDM options.
        Use :func:`univention.admin.handlers.simpleLdap.has_property` instead.
    """
    known_properties = self.descriptions
    return key in known_properties
def keys(self) -> Iterable[str]:
    """
    Give the names of all properties of this module, whether or not they are enabled by the UDM options.

    :returns: The key view of the module's property descriptions.
    """
    known_properties = self.descriptions
    return known_properties.keys()
def items(self) -> Iterable[tuple[str, Any]]:
    """
    Collect name and value of every property which is enabled by the current options - even if it is empty.

    :returns: a list of 2-tuples (udm-property-name, property-value).

    .. warning::
        In certain circumstances this sets the default value for every property (e.g. when having a new object).
    """
    pairs = []
    for name in self.keys():
        if not self.has_property(name):
            continue
        # item access may apply and store the default of the property
        pairs.append((name, self[name]))
    return pairs
def create(self, serverctrls: list[ldap.controls.LDAPControl] | None = None, response: dict[str, Any] | None = None, ignore_license: bool = False) -> str:
    """
    Create the LDAP object if it does not exist yet.

    The list of attributes (addlist) is built and written to LDAP. Attributes
    which the server returns in a post-read control are merged into :attr:`oldattr`.
    If this call raises an exception it is necessary to instantiate a new object before trying to create it again.

    :raises: :class:`univention.admin.uexceptions.invalidOperation` if objects of this type do not support to be created.
    :raises: :class:`univention.admin.uexceptions.objectExists` if the object already exists.
    :raises: :class:`univention.admin.uexceptions.permissionDenied` if no permissions for creation exists
    :raises: :class:`univention.admin.uexceptions.insufficientInformation`

    :param serverctrls: a list of :py:class:`ldap.controls.LDAPControl` instances sent to the server along with the LDAP request.
    :param dict response: An optional dictionary to receive the server controls of the result.
    :param ignore_license: If the license is exceeded the modification may fail. Setting this to True causes license checks to be disabled
    :returns: The DN of the created object.
    """
    # if the licence is exceeded 'add' is removed from the modules operations. Blocklist objects may need to be added anyway.
    if not univention.admin.modules.supports(self.module, 'add') and not ignore_license:
        raise univention.admin.uexceptions.invalidOperation(_('Objects of the "%s" object type can not be created.') % (self.module,))

    self.authz.is_create_allowed(self)

    if self.exists():
        raise univention.admin.uexceptions.objectExists(self.dn)

    # always work on a dict, so the returned server controls can be evaluated below
    result_controls = response if isinstance(response, dict) else {}

    try:
        self._ldap_pre_ready()
        self.ready()
        dn = self._create(response=result_controls, serverctrls=serverctrls, ignore_license=ignore_license)
    except Exception:
        self._safe_cancel()
        raise

    for control in result_controls.get('ctrls', []):
        if control.controlType != PostReadControl.controlType:
            continue
        # take over the post-read attributes; values which are not bytes yet are encoded
        post_read = {}
        for attr_name, values in control.entry.items():
            post_read[attr_name] = [value if isinstance(value, bytes) else value.encode('ISO8859-1') for value in values]
        self.oldattr.update(post_read)

    self._write_admin_diary_create()
    return dn
def _get_admin_diary_event(self, event_name: str) -> DiaryEvent | None:
    """
    Look up the Admin Diary event for this module.

    The module specific event ``UDM_<MODULE>_<event_name>`` is preferred, the
    fallback is ``UDM_GENERIC_<event_name>``.

    :param event_name: The event suffix, e.g. ``CREATED``.
    :returns: The result of the lookup, which may be falsy if neither event is found.
    """
    module_part = self.module.replace('/', '_').upper()
    specific_event = DiaryEvent.get('UDM_%s_%s' % (module_part, event_name))
    return specific_event or DiaryEvent.get('UDM_GENERIC_%s' % event_name)


def _get_admin_diary_args_names(self, event: DiaryEvent) -> list[str]:
    """
    Select the properties of this module which are arguments of the event.

    :param event: The Admin Diary event.
    :returns: The names of the properties which are listed in `event.args`.
    """
    return [name for name in self.descriptions if name in event.args]


def _get_admin_diary_args(self, event: DiaryEvent) -> dict[str, Any]:
    """
    Build the arguments for an Admin Diary entry.

    For generic events the entry carries an `id`: the value of the first
    property flagged as `identifies`, or the DN if there is none. For
    specific events the stringified values of the event's properties are used.

    :param event: The Admin Diary event.
    :returns: The arguments, always containing the key `module`.
    """
    args: dict[str, Any] = {'module': self.module}
    if event.name.startswith('UDM_GENERIC_'):
        identifier = self.dn
        for name, description in self.descriptions.items():
            if description.identifies:
                identifier = self[name]
                break
        args['id'] = identifier
    else:
        for name in self._get_admin_diary_args_names(event):
            args[name] = str(self[name])
    return args


def _get_admin_diary_username(self) -> str:
    """
    Derive the user name for the Admin Diary from the bind DN of the LDAP connection.

    :returns: ``cn=admin`` for that account, otherwise the value of the first RDN of the bind DN.
    """
    username = ldap.dn.explode_rdn(self.lo.binddn)[0]
    if username != 'cn=admin':
        username = username.rsplit('=', 1)[1]
    return username


def _write_admin_diary_event(self, event: str, additional_args: dict[str, Any] | None = None) -> None:
    """
    Write an Admin Diary entry for this object.

    Writing the diary is best effort: any error is logged and not passed on
    to the caller.

    :param event: The event suffix, e.g. ``CREATED`` or ``MODIFIED``.
    :param additional_args: Further arguments which are added to the entry.
    """
    try:
        diary_event = self._get_admin_diary_event(event)
        if not diary_event:
            return
        args = self._get_admin_diary_args(diary_event)
        if args:
            if additional_args:
                args.update(additional_args)
            username = self._get_admin_diary_username()
            write_event(diary_event, args, username=username)
    except Exception:
        # deliberate: a failing diary must not break the LDAP operation itself
        self.log.exception('Failed to write Admin Diary entry')


def _write_admin_diary_create(self) -> None:
    """Write the ``CREATED`` Admin Diary entry for this object."""
    self._write_admin_diary_event('CREATED')
def modify(self, modify_childs: bool = True, ignore_license: bool = False, serverctrls: list[ldap.controls.LDAPControl] | None = None, response: dict[str, Any] | None = None) -> str:
    """
    Modify the LDAP object.

    The difference between the current state and the old state of this object
    is built and written to LDAP as modlist. Attributes which the server returns
    in a post-read control are merged into :attr:`oldattr`.

    :param modify_childs: Specifies if child objects should be modified as well.
    :param ignore_license: If the license is exceeded the modification may fail. Setting this to True causes license checks to be disabled
    :param serverctrls: a list of :py:class:`ldap.controls.LDAPControl` instances sent to the server along with the LDAP request.
    :param dict response: An optional dictionary to receive the server controls of the result.

    :raises: :class:`univention.admin.uexceptions.invalidOperation` if objects of this type do not support to be modified.
    :raises: :class:`univention.admin.uexceptions.noObject` if the object does not exists.
    :raises: :class:`univention.admin.uexceptions.permissionDenied` if no permissions for modification exists
    :raises: :class:`univention.admin.uexceptions.insufficientInformation`

    :returns: The DN of the modified object.
    """
    # if the licence is exceeded 'edit' is removed from the modules operations. Nevertheless we need a way to make modifications then.
    if not univention.admin.modules.supports(self.module, 'edit') and not ignore_license:
        raise univention.admin.uexceptions.invalidOperation(_('Objects of the "%s" object type can not be modified.') % (self.module,))

    self.authz.object_exists(self)
    self.authz.is_modify_allowed(self)

    if not self.exists():
        raise univention.admin.uexceptions.noObject(self.dn)

    # always work on a dict, so the returned server controls can be evaluated below
    result_controls = response if isinstance(response, dict) else {}

    try:
        self._ldap_pre_ready()
        self.ready()
        dn = self._modify(modify_childs, ignore_license=ignore_license, response=result_controls, serverctrls=serverctrls)
    except Exception:
        self._safe_cancel()
        raise

    for control in result_controls.get('ctrls', []):
        if control.controlType != PostReadControl.controlType:
            continue
        # take over the post-read attributes; values which are not bytes yet are encoded
        post_read = {}
        for attr_name, values in control.entry.items():
            post_read[attr_name] = [value if isinstance(value, bytes) else value.encode('ISO8859-1') for value in values]
        self.oldattr.update(post_read)

    return dn
def _write_admin_diary_modify(self) -> None:
    """Write the ``MODIFIED`` Admin Diary event for this object."""
    self._write_admin_diary_event('MODIFIED')

def _create_temporary_ou(self) -> str:
    """
    Create a uniquely named ``container/ou`` object directly below the LDAP base.

    :returns: The relative distinguished name (``ou=...``) of the created container.
    """
    ou_name = 'temporary_move_container_%s' % time.time()

    ou_module = univention.admin.modules.get('container/ou')
    base_position = univention.admin.uldap.position('%s' % self.lo.base)

    container = ou_module.object(None, self.lo, base_position)
    container.open()
    container['name'] = ou_name
    container.create()

    escaped_name = ldap.dn.escape_dn_chars(ou_name)
    return 'ou=%s' % escaped_name

def _delete_temporary_ou_if_empty(self, temporary_ou: str | None) -> None:
    """
    Try to delete the organizational unit entry if it is empty.

    Nothing happens if no container is given. A failing removal (LDAP error or
    container not empty) is ignored on purpose.

    :param str temporary_ou: The relative distinguished name of the container below the LDAP base.
    """
    if not temporary_ou:
        return

    container_dn = '%s,%s' % (temporary_ou, self.lo.base)
    ou_module = univention.admin.modules.get('container/ou')
    matches = univention.admin.modules.lookup(ou_module, None, self.lo, scope='base', base=container_dn, required=True, unique=True)
    container = matches[0]
    container.open()
    try:
        container.remove()
    except (univention.admin.uexceptions.ldapError, ldap.NOT_ALLOWED_ON_NONLEAF):
        # best effort: the container stays if it still has children
        pass
def move(self, newdn: str, ignore_license: bool = False, temporary_ou: str | None = None) -> str:
    """
    Moves the LDAP object to the target position.

    If only the letter casing of the DN changes, the object is first moved into a
    temporary container and then to its final position. If the module supports
    ``subtree_move`` and the object has children, a copy of the object is created at
    the target position, all children are moved below it and the old object is removed;
    on failure the already moved children are moved back.

    :param str newdn: The DN of the target position.
    :param bool ignore_license: If the license is exceeded the modification may fail. Setting this to True causes license checks to be disabled.
    :param str temporary_ou: The distinguished name of a temporary container which is used to rename the object if only the letter casing changes.

    :raises: :class:`univention.admin.uexceptions.invalidOperation` if objects of this type do not support to be moved.
    :raises: :class:`univention.admin.uexceptions.noObject` if the object does not exists.
    :raises: :class:`univention.admin.uexceptions.permissionDenied` if no permissions for moving exists
    :raises: :class:`univention.admin.uexceptions.ldapError` if old and new DN are identical or the target is underneath of the object itself.

    :returns: The new DN of the moved object
    """
    self.log.debug('Moving object', dn=self.dn, new_dn=newdn)

    if not (univention.admin.modules.supports(self.module, 'move') or univention.admin.modules.supports(self.module, 'subtree_move')):
        raise univention.admin.uexceptions.invalidOperation(_('Objects of the "%s" object type can not be moved.') % (self.module,))

    self.authz.object_exists(self)
    # FIXME: check for information leak via specifying arbitrary destinations
    self.authz.is_move_allowed(self, newdn)

    # the bind DN is checked for both the working and the authorization connection
    if self.lo.compare_dn(self.dn, self.lo.whoami()):
        raise univention.admin.uexceptions.invalidOperation(_('The own object cannot be moved.'))
    if self.lo.compare_dn(self.dn, self.lo.authz_connection.whoami()):
        raise univention.admin.uexceptions.invalidOperation(_('The own object cannot be moved.'))

    if not self.exists():
        raise univention.admin.uexceptions.noObject(self.dn)

    if _prevent_to_change_ad_properties and self._is_synced_object():
        raise univention.admin.uexceptions.invalidOperation(_('Objects from Active Directory can not be moved.'))

    def n(x: str) -> str:
        # normalize the string representation of a DN
        return dn2str(str2dn(x))

    assert self.dn is not None
    newdn = n(newdn)
    self.dn = n(self.dn)
    self._set_log()

    # the target container must be an object type which allows child objects
    goaldn = self.lo.parentDn(newdn)
    goalmodule = univention.admin.modules.identifyOne(goaldn, self.lo.authz_connection.get(goaldn))
    goalmodule = univention.admin.modules.get(goalmodule)
    if not goalmodule or not hasattr(goalmodule, 'childs') or goalmodule.childs != 1:
        raise univention.admin.uexceptions.invalidOperation(_("Destination object can't have sub objects."))

    if self.lo.compare_dn(self.dn.lower(), newdn.lower()):
        if self.dn == newdn:
            raise univention.admin.uexceptions.ldapError(_('Moving not possible: old and new DN are identical.'))
        else:
            # We must use a temporary folder because OpenLDAP does not allow a rename of an container with subobjects
            temporary_ou = self._create_temporary_ou()
            temp_dn = dn2str(str2dn(newdn)[:1] + str2dn(temporary_ou) + str2dn(self.lo.base))
            # first step: move into the temporary container; the move to `newdn` follows below
            self.dn = n(self.move(temp_dn, ignore_license, temporary_ou))
            self._set_log()

    if newdn.lower().endswith(self.dn.lower()):
        raise univention.admin.uexceptions.ldapError(_("Moving into one's own sub container not allowed."))

    if univention.admin.modules.supports(self.module, 'subtree_move'):
        # check if is subtree:
        subelements = self.lo.authz_connection.search(base=self.dn, scope='one', attr=[])
        if subelements:
            olddn = self.dn
            self.log.debug('move: found subelements, do subtree move', new_dn=newdn)
            # create copy of myself
            module = univention.admin.modules.get(self.module)
            position = univention.admin.uldap.position(self.lo.base)
            position.setDn(self.lo.parentDn(newdn))
            copyobject = module.object(None, self.lo, position)
            copyobject.options = self.options[:]
            copyobject.open()
            for key in self.keys():
                # the object identifier is not taken over to the copy
                if key == 'univentionObjectIdentifier':
                    continue
                copyobject[key] = self[key]
            copyobject.policies = self.policies
            copyobject.create()
            to_be_moved = []
            moved = []
            pattern = re.compile('%s$' % (re.escape(self.dn),), flags=re.I)
            try:
                # first pass: make sure every child can be moved before anything is moved
                for subolddn, suboldattrs in subelements:
                    # Convert the DNs to lowercase before the replacement. The cases might be mixed up if the Python lib is
                    # used by the connector, for example:
                    #   subolddn: uid=user_test_h80,ou=TEST_H81,$LDAP_BASE
                    #   self.dn: ou=test_h81,$LDAP_BASE
                    #   newdn: OU=TEST_H81,ou=test_h82,$LDAP_BASE
                    #   -> subnewdn: uid=user_test_h80,OU=TEST_H81,ou=test_h82,$LDAP_BASE
                    # NOTE(review): Pattern.sub() takes (repl, string), so the pattern is applied to `newdn` here
                    # and the parent DN is the replacement. As `newdn` ending with `self.dn` was rejected above, this
                    # looks like it always yields `newdn` unchanged - confirm that this is intended.
                    subnew_position = pattern.sub(dn2str(str2dn(self.lo.parentDn(subolddn))), newdn)
                    subnewdn = dn2str(str2dn(subolddn)[:1] + str2dn(subnew_position))
                    self.log.debug('Moving subelement', old=subolddn, new=subnewdn)

                    submodule = univention.admin.modules.identifyOne(subolddn, suboldattrs)
                    submodule = univention.admin.modules.get(submodule)
                    subobject = univention.admin.objects.get(submodule, None, self.lo, position='', dn=subolddn)
                    if not subobject or not (univention.admin.modules.supports(submodule, 'move') or univention.admin.modules.supports(submodule, 'subtree_move')):
                        subold_rdn = '+'.join(explode_rdn(subolddn, 1))
                        type_ = univention.admin.modules.identifyOne(subolddn, suboldattrs)
                        raise univention.admin.uexceptions.invalidOperation(
                            _('Unable to move object %(name)s (%(type)s) in subtree, trying to revert changes.') % {
                                'name': subold_rdn,
                                'type': type_ and type_.module,
                            },
                        )
                    to_be_moved.append((subobject, subolddn, subnewdn))

                # second pass: move the children and remember what has to be reverted on failure
                for subobject, subolddn, subnewdn in to_be_moved:
                    subobject.open()
                    subobject.move(subnewdn)
                    moved.append((subolddn, subnewdn))

                # all children are gone, so the old object itself can be removed
                univention.admin.objects.get(univention.admin.modules.get(self.module), None, self.lo, position='', dn=self.dn).remove()
                self._delete_temporary_ou_if_empty(temporary_ou)
            except BaseException:
                self.log.error('move: subtree move failed, trying to move back.')
                position = univention.admin.uldap.position(self.lo.base)
                position.setDn(self.lo.parentDn(olddn))
                for subolddn, subnewdn in moved:
                    submodule = univention.admin.modules.identifyOne(subnewdn, self.lo.authz_connection.get(subnewdn))
                    submodule = univention.admin.modules.get(submodule)
                    subobject = univention.admin.objects.get(submodule, None, self.lo, position='', dn=subnewdn)
                    subobject.open()
                    subobject.move(subolddn)
                copyobject.remove()
                self._delete_temporary_ou_if_empty(temporary_ou)
                raise
            self.dn = newdn
            self._set_log()
            return newdn
        else:
            # normal move, fails on subtrees
            res = n(self._move(newdn, ignore_license=ignore_license))
            self._delete_temporary_ou_if_empty(temporary_ou)
            return res

    else:
        res = n(self._move(newdn, ignore_license=ignore_license))
        self._delete_temporary_ou_if_empty(temporary_ou)
        return res
def move_subelements(self, olddn: str, newdn: str, subelements: list[tuple[str, dict]], ignore_license: bool = False) -> list[tuple[str, str]] | None:
    """
    Internal function to move all children of a container.

    If moving one of the children fails, the children which were already moved
    are moved back and the exception is re-raised.

    :param str olddn: The old distinguished name of the parent container.
    :param str newdn: The new distinguished name of the parent container.
    :param subelements: A list of 2-tuples (old-dn, old-attrs) for each child of the parent container.
    :param bool ignore_license: If the license is exceeded the modification may fail. Setting this to True causes license checks to be disabled.

    :raises: :class:`univention.admin.uexceptions.invalidOperation` if one of the children can not be moved.

    :returns: A list of 2-tuples (old-dn, new-dn) or `None` if there are no children.
    """
    # NOTE(review): `ignore_license` is accepted but not forwarded to the child moves - confirm whether it should be
    if not subelements:
        return None  # FIXME:

    self.log.debug('move: found subelements, do subtree move')
    # matches the old parent DN at the end of a child DN; compiled once for all children
    old_suffix = re.compile('%s$' % (re.escape(olddn),))
    moved: list[tuple[str, str]] = []
    try:
        for subolddn, suboldattrs in subelements:
            self.log.debug('move: subelement', dn=subolddn)
            # A callable replacement inserts `newdn` literally. A plain replacement string would be scanned
            # for backslash escapes, which breaks for DNs containing escaped characters such as "\," or "\2C".
            subnewdn = old_suffix.sub(lambda _match: newdn, subolddn)
            identified = univention.admin.modules.identifyOne(subolddn, suboldattrs)
            submodule = univention.admin.modules.get(identified)
            subobject = univention.admin.objects.get(submodule, None, self.lo, position='', dn=subolddn)
            if not subobject or not (univention.admin.modules.supports(submodule, 'move') or univention.admin.modules.supports(submodule, 'subtree_move')):
                subold_rdn = '+'.join(explode_rdn(subolddn, 1))
                raise univention.admin.uexceptions.invalidOperation(
                    _('Unable to move object %(name)s (%(type)s) in subtree, trying to revert changes.') % {
                        'name': subold_rdn,
                        # show the module name (as move() does), not the representation of the module object
                        'type': identified and identified.module,
                    },
                )
            subobject.open()
            subobject._move(subnewdn)
            moved.append((subolddn, subnewdn))
        return moved
    except Exception:
        self.log.error('move: subtree move failed, try to move back')
        for subolddn, subnewdn in moved:
            submodule = univention.admin.modules.identifyOne(subnewdn, self.lo.authz_connection.get(subnewdn))
            submodule = univention.admin.modules.get(submodule)
            subobject = univention.admin.objects.get(submodule, None, self.lo, position='', dn=subnewdn)
            subobject.open()
            subobject.move(subolddn)
        raise
def restore(self) -> str:  # TODO: support destination: str
    """
    Restore LDAP object from recyclebin with all the LDAP attributes from the deleted object (excluding operational and recyclebin attributes)

    The object is added again at its original DN via an admin connection and the
    recyclebin entry (this object) is deleted afterwards.

    :raises: :class:`univention.admin.uexceptions.restoreFailed` if e.g. parent DN doesn't exists.
    :raises: :class:`univention.admin.uexceptions.invalidOperation` if objects of this type do not support to be restored.
    :raises: :class:`univention.admin.uexceptions.noObject` if the object does not exist.
    :raises: :class:`univention.admin.uexceptions.permissionDenied` if no permissions for restore exists or no admin connection can be obtained.

    :returns: The original DN of the restored object.
    """
    if not univention.admin.modules.supports(self.module, 'restore'):
        raise univention.admin.uexceptions.invalidOperation(_('Objects of the "%s" object type can not be restored.') % (self.module,))

    self.authz.is_restore_allowed(self)

    if not self.exists():
        raise univention.admin.uexceptions.noObject(self.dn)

    original_dn = self.info['originalDN']
    operational_attributes = univention.admin.modules._ldap_operational_attribute_names(self.lo) | univention.admin.recyclebin.IGNORE_ATTRS

    # take over everything except operational and recyclebin bookkeeping attributes
    restore = {
        attr: value
        for attr, value in self.oldattr.items()
        if attr.lower() not in operational_attributes and not attr.startswith('univentionRecycleBin')
    }

    # the original values of these attributes were saved in dedicated recyclebin attributes
    if 'univentionRecycleBinOriginalObjectClass' in self.oldattr:
        restore['objectClass'] = self.oldattr['univentionRecycleBinOriginalObjectClass']
    if 'univentionRecycleBinOriginalType' in self.oldattr:
        restore['univentionObjectType'] = self.oldattr['univentionRecycleBinOriginalType']
    if 'univentionRecycleBinOriginalEntryUUID' in self.oldattr:
        restore['entryUUID'] = self.oldattr['univentionRecycleBinOriginalEntryUUID']
    if 'univentionRecycleBinOriginalUniventionObjectIdentifier' in self.oldattr:
        restore['univentionObjectIdentifier'] = self.oldattr['univentionRecycleBinOriginalUniventionObjectIdentifier']

    ml = list(restore.items())
    self.log.trace('Restoring object', original_dn=original_dn, modlist=ml)

    # check parent
    parent_dn = self.lo.parentDn(original_dn)
    try:
        self.lo.authz_connection.get(parent_dn, required=True)
    except ldap.NO_SUCH_OBJECT:
        raise univention.admin.uexceptions.restoreFailed(
            _('Cannot restore object to %s: Parent container %s does not exist.') % (original_dn, parent_dn),
        )

    # build an object of the original type which is only used to run the validation and the hooks
    mod = univention.admin.modules.get(self.info['originalObjectType'])
    position = univention.admin.uldap.position(configRegistry['ldap/base'])
    position.setDn(parent_dn)
    obj = mod.object(None, self.lo, position, attributes=restore)

    # FIXME:
    # for setting entryUUID we need relax controls and the manage permissions
    # we have manage currently only for the root_dn
    # so for now restore only on primary and backup?
    from univention.admin.uldap import getAdminConnection
    try:
        loa, pos = getAdminConnection()  # noqa: RUF059
    except Exception:
        raise univention.admin.uexceptions.permissionDenied(_('Restore not allowed on this machine.'))

    # NOTE: Running validation/hooks during restore:
    # Ensures restored objects meet current validation rules, allocates missing auto-generated
    # attributes (e.g., uidNumber if policy changed), runs extended attribute hooks, checks blocklist
    # May prevent restoring objects that were valid when deleted but violate current rules
    # (e.g., new password policy, changed syntax rules, blocklist additions)
    try:
        obj.open()
        obj._exists = False
        obj.oldinfo = {}
        univention.admin.blocklist.check_blocklistentry(obj)  # Check for conflicts with blocklisted attributes
        obj._ldap_pre_ready()  # Module-specific pre-validation
        obj.ready()  # Allocate auto-generated attributes (uidNumber, gidNumber, etc.)
        obj._ldap_pre_create()  # Final pre-create validation and hooks
        obj.call_udm_property_hook('hook_ldap_pre_create', obj)  # Extended attribute hooks
        # NOTE(review): this is called on `self` (the recyclebin entry), not on `obj` - confirm that this is intended
        self._update_policies()  # Update policy references

        relax_rules_control = RelaxRulesControl(criticality=True)
        # self.lo.authz_connection.add(original_dn, ml, serverctrls=[relax_rules_control])
        loa.authz_connection.add(original_dn, ml, serverctrls=[relax_rules_control])
    except Exception:
        obj.cancel()
        raise

    obj._ldap_post_create()  # Post-create hooks (e.g., group cache updates)
    self.log.info('Object restored from recyclebin', original_dn=original_dn, object_type=self.info.get('originalObjectType', 'unknown'))

    self.restore_references()
    # the recyclebin entry is not needed anymore
    self.lo.authz_connection.delete(self.dn)
    self._write_admin_diary_restore()
    # TODO: self.call_udm_property_hook('hook_ldap_post_remove', self)

    return original_dn
def restore_references(self) -> None:
    """
    Restore references.

    The base implementation does nothing. It is called by :py:meth:`restore`
    after the object was added again and before the recyclebin entry is deleted.
    """
    # implemented in subclass

def _write_admin_diary_restore(self) -> None:
    """Write the ``RESTORED`` Admin Diary event for this object."""
    self._write_admin_diary_event('RESTORED')
def remove(self, remove_childs: bool = False) -> None:
    """
    Delete this object from LDAP.

    :param bool remove_childs: Specifies to remove children objects before removing this object.

    :raises: :class:`univention.admin.uexceptions.ldapError` (Operation not allowed on non-leaf: subordinate objects must be deleted first) if the object contains childrens and *remove_childs* is False.
    :raises: :class:`univention.admin.uexceptions.invalidOperation` if objects of this type do not support to be removed or the object is the one the connection is bound as.
    :raises: :class:`univention.admin.uexceptions.permissionDenied` if no permissions for removal exists
    :raises: :class:`univention.admin.uexceptions.noObject` if the object does not exists.

    :returns: The result of the internal removal.
    """
    if not univention.admin.modules.supports(self.module, 'remove'):
        raise univention.admin.uexceptions.invalidOperation(_('Objects of the "%s" object type can not be removed.') % (self.module,))

    self.authz.object_exists(self)
    self.authz.is_remove_allowed(self)

    if not (self.dn and self.lo.authz_connection.get(self.dn)):
        raise univention.admin.uexceptions.noObject(self.dn)

    # neither the working nor the authorization connection may be bound as this object
    for connection in (self.lo, self.lo.authz_connection):
        if self.lo.compare_dn(self.dn, connection.whoami()):
            raise univention.admin.uexceptions.invalidOperation(_('The own object cannot be removed.'))

    return self._remove(remove_childs)
def get_gid_for_primary_group(self) -> str:
    """
    Look up the numerical group ID of the primary group of this object.

    :returns: The ``gidNumber`` of the primary group as a string or "99999" if no primary group is set.
    :raises univention.admin.uexceptions.primaryGroup: if the primary group does not exist in LDAP.
    """
    if not self['primaryGroup']:
        return '99999'

    try:
        values = self.lo.authz_connection.getAttr(self['primaryGroup'], 'gidNumber', required=True)
    except ldap.NO_SUCH_OBJECT:
        raise univention.admin.uexceptions.primaryGroup(self['primaryGroup'])
    return values[0].decode('ASCII')
def get_sid_for_primary_group(self) -> str:
    """
    Look up the Windows security ID of the primary group of this object.

    :returns: The ``sambaSID`` of the primary group.
    :raises univention.admin.uexceptions.primaryGroupWithoutSamba: if the primary group does not exist in LDAP.
    """
    try:
        values = self.lo.authz_connection.getAttr(self['primaryGroup'], 'sambaSID', required=True)
    except ldap.NO_SUCH_OBJECT:
        raise univention.admin.uexceptions.primaryGroupWithoutSamba(self['primaryGroup'])
    return values[0].decode('ASCII')
def _ldap_pre_ready(self) -> None:
    """Hook which is called before :func:`univention.admin.handlers.simpleLdap.ready`."""

def _ldap_pre_create(self) -> None:
    """
    Hook which is called before the object creation.

    Sets the DN of the new object, requests the lock for that DN and, if the object
    has the property ``univentionObjectIdentifier``, requests a unique value for it.
    """
    self.dn = self._ldap_dn()
    self._set_log()
    self.request_lock('cn-uid-position', self.dn)
    try:
        if self.has_property('univentionObjectIdentifier'):
            self.request_unique('univentionObjectIdentifier', str(uuid.uuid4()))
    except univention.admin.uexceptions.permissionDenied:
        # not fatal: creation continues without the lock
        self.log.info('acquireUnique univentionObjectIdentifier got permissionDenied, continuing without lock, relying on slapo-unique')

def _ldap_dn(self) -> str:
    """
    Builds the LDAP DN of the object before creation by using the identifying properties to build the RDN.

    The parent part is taken from the current DN if the object already exists, otherwise from the position.

    :returns: the distinguished name.
    """
    # one (attribute, value, flags) triple per identifying property; all of them together form the RDN
    identifier = [
        (self.mapping.mapName(name), self.mapping.mapValueDecoded(name, self.info[name]), 2)
        for name, prop in self.descriptions.items()
        if prop.identifies
    ]
    return '%s,%s' % (dn2str([identifier]), dn2str(str2dn(self.dn)[1:]) if self.exists() else self.position.getDn())

def _ldap_post_create(self) -> None:
    """Hook which is called after the object creation."""
    self._confirm_locks()

def _ldap_pre_modify(self) -> None:
    """Hook which is called before the object modification."""

def _ldap_post_modify(self) -> None:
    """Hook which is called after the object modification."""
    self._confirm_locks()

def _ldap_pre_rename(self, newdn: str) -> None:
    """
    Hook which is called before renaming the object.

    :param str newdn: The new distinguished name the object will be renamed to.
    """
    self.request_lock('cn-uid-position', newdn)

def _ldap_post_rename(self, olddn: str) -> None:
    """
    Hook which is called after renaming the object.

    :param str olddn: The old distinguished name the object was renamed from.
    """

def _ldap_pre_move(self, newdn: str) -> None:
    """
    Hook which is called before the object moving.

    :param str newdn: The new distinguished name the object will be moved to.
    """
    self.request_lock('cn-uid-position', newdn)

def _ldap_post_move(self, olddn: str) -> None:
    """
    Hook which is called after the object moving.

    :param str olddn: The old distinguished name the object was moved from.
    """

def _ldap_pre_remove(self) -> None:
    """Hook which is called before the object removal."""

def _ldap_post_remove(self) -> None:
    """Hook which is called after the object removal."""
    self._release_locks()

def _safe_cancel(self) -> None:
    """
    Call :py:meth:`cancel` and log exceptions instead of raising them.

    :class:`KeyboardInterrupt`, :class:`SystemExit` and :class:`SyntaxError` are still raised.
    """
    try:
        self.cancel()
    except (KeyboardInterrupt, SystemExit, SyntaxError):
        raise
    except Exception:
        self.log.exception('cancel() failed:')

def _falsy_boolean_extended_attributes(self, info: _Properties) -> _Properties:
    """
    Set all extended attributes with syntax ``boolean`` which have no (or a falsy) value to ``'0'``.

    :param info: The UDM properties, which are modified in place.
    :returns: The given (modified) properties.
    """
    m = univention.admin.modules._get(self.module)
    for prop in getattr(m, 'extended_udm_attributes', []):
        if prop.syntax == 'boolean' and not info.get(prop.name):
            info[prop.name] = '0'
    return info
def exists(self) -> bool:
    """
    Tell whether this object is known to exist in LDAP.

    The answer is the existence flag kept on the object; no LDAP request is made.

    :returns: True if the object exists in LDAP, False otherwise.
    """
    return self._exists
def _validate_superordinate(self, must_exists: bool = True) -> None:
    """
    Make sure a valid superordinate is set if this module requires one.

    A missing superordinate is looked up from the DN (or the position) of this object.
    The superordinate must be of one of the object types the module allows
    and the object must be located underneath of it.

    :param bool must_exists: If False, a superordinate which can not be determined is accepted.

    :raises: :class:`univention.admin.uexceptions.insufficientInformation`
    :raises: :class:`univention.admin.uexceptions.noSuperordinate`
    """
    allowed_types = set(univention.admin.modules.superordinate_names(self.module))
    if not allowed_types:
        return  # module has no superodinates

    if not (self.dn or self.position):
        # this check existed in all modules with superordinates, so still check it here, too
        raise univention.admin.uexceptions.insufficientInformation(_('Neither DN nor position given.'))

    if not self.superordinate:
        self.superordinate = univention.admin.objects.get_superordinate(self.module, None, self.lo, self.dn or self.position.getDn())

    if not self.superordinate:
        if allowed_types == {'settings/cn'}:
            self.log.warning('No settings/cn superordinate was given.')
            return  # settings/cn might be misued as superordinate, don't risk currently
        if must_exists:
            raise univention.admin.uexceptions.noSuperordinate(_('No superordinate object given'))
        return

    # check if the superordinate is of the correct object type
    if self.superordinate.module not in allowed_types:
        message = _('The given %r superordinate is expected to be of type %s.') % (self.superordinate.module, ', '.join(allowed_types))
        raise univention.admin.uexceptions.insufficientInformation(message)

    if self.dn and not self._ensure_dn_in_subtree(self.superordinate.dn, self.lo.parentDn(self.dn)):
        raise univention.admin.uexceptions.insufficientInformation(_('The DN must be underneath of the superordinate.'))

def _ensure_dn_in_subtree(self, parent: str, dn: str | None) -> bool:
    """
    Check whether a DN is located in the subtree of a parent DN.

    The DN itself and then each of its ancestors is compared with the parent.

    :param str parent: The distinguished name of the parent container.
    :param str dn: The distinguished name to check.
    :returns: True if `dn` is underneath of `parent`, False otherwise.
    """
    current = dn
    while current:
        if self.lo.compare_dn(current, parent):
            return True
        current = self.lo.parentDn(current)
    return False
def call_udm_property_hook(
    self,
    hookname: _HookName,
    module: Self,
    changes=None,  # dict[str, tuple] | None
) -> dict[str, tuple] | None:
    """
    Internal method to run the hooks of all extended attributes of a module.

    Without `changes` every hook is called with the object only. Otherwise each hook
    receives the changes as returned by the previous hook and returns the new changes.

    :param str hookname: The name of the hook function to call.
    :param module: The UDM object the hooks are called for.
    :param dict changes: A list of changes.
    :returns: The (modified) list of changes.
    """
    udm_module = univention.admin.modules._get(module.module)
    extended_attributes = getattr(udm_module, 'extended_udm_attributes', None)
    for prop in extended_attributes or ():
        if prop.hook is None:
            continue
        hook_function = getattr(prop.hook, hookname)
        if changes is None:
            hook_function(module)
        else:
            changes = hook_function(module, changes)
    return changes
def open(self) -> None:
    """
    Open this object.

    The LDAP attributes were already mapped into :py:attr:`info` when the object was
    initialized. Opening allows to load data which is not part of the raw LDAP attributes
    of this object, e.g. references to other objects like the group memberships of a user.

    The default implementation marks the object as opened, calls the `open` hook
    of the extended attributes and saves the current state. Subclasses may extend this.

    .. warning::
            If this method changes anything in self.info it *must* call :py:meth:`save` afterwards.

    .. warning::
            Before creating, modifying, moving or removing the object this method must be called,
            directly after the constructor and before any property is modified.
    """
    self._open = True
    self.call_udm_property_hook('hook_open', self)
    self.save()
def _remove_option(self, name: str) -> None:
    """
    Disable a UDM option; nothing happens if it is not enabled.

    :param str name: The name of the option to remove.
    """
    if name not in self.options:
        return
    self.options.remove(name)

def __set_options(self) -> None:
    """
    Determine the enabled UDM options of this object.

    If the object has LDAP object classes, every non-disabled option matching them is enabled
    (app options additionally need to be activated). Otherwise the default options are enabled.
    """
    module_options = univention.admin.modules.options(self.module)
    if 'objectClass' not in self.oldattr:
        self.log.debug('reset options to default by _define_options')
        self.options = []
        self._define_options(module_options)
        return

    object_classes = {oc.decode('UTF-8') for oc in self.oldattr['objectClass']}
    enabled = []
    for opt_name, option in module_options.items():
        if option.disabled:
            continue
        if option.matches(object_classes) and self.__app_option_enabled(opt_name, option):
            enabled.append(opt_name)
    self.options = enabled

def _define_options(self, module_options: dict[str, Any]) -> None:
    """
    Add every option which is enabled by default and not disabled to the options of this object.

    :param dict module_options: A mapping of option-name to option.
    """
    self.log.debug('reset to default options')
    for opt_name, option in module_options.items():
        if not option.disabled and option.default:
            self.options.append(opt_name)
def option_toggled(self, option: str) -> bool:
    """
    Tell whether a UDM option was switched on or off compared to the old options.

    :param str option: The name of the option to check.
    :returns: True if the option was changed, False otherwise.

    .. warning::
            This does not work for not yet existing objects.
    """
    enabled_now = option in set(self.options)
    enabled_before = option in set(self.old_options)
    return enabled_now != enabled_before
def policy_reference(self, *policies):
    """
    Add references to the given policies to this object.

    All given DNs are validated before any reference is added.
    A policy which is already referenced is not added a second time.

    :param policies: The distinguished names of the policies.
    :raises: :class:`univention.admin.uexceptions.valueInvalidSyntax` if a value is not a DN.
    :raises: :class:`univention.admin.uexceptions.valueError` if an object is not a policy.
    :raises: :class:`univention.admin.uexceptions.noObject` if a policy does not exist.
    """
    for policy in policies:
        if not ldap.dn.is_dn(policy):
            raise univention.admin.uexceptions.valueInvalidSyntax(policy)
        try:
            object_classes = self.lo.authz_connection.getAttr(policy, 'objectClass', required=True)
            if b'univentionPolicy' not in object_classes:
                raise univention.admin.uexceptions.valueError('Object is not a policy', policy)
        except ldap.NO_SUCH_OBJECT:
            raise univention.admin.uexceptions.noObject('Policy does not exists', policy)

    # appended one by one, so a DN given twice is only referenced once
    for candidate in policies:
        already_referenced = any(self.lo.compare_dn(known, candidate) for known in self.policies)
        if not already_referenced:
            self.policies.append(candidate)
def policy_dereference(self, *policies):
    """
    Remove the references to the given policies from this object.

    :param policies: The distinguished names of the policies.
    :raises: :class:`univention.admin.uexceptions.valueInvalidSyntax` if a value is not a DN.
    """
    for policy in policies:
        if not ldap.dn.is_dn(policy):
            raise univention.admin.uexceptions.valueInvalidSyntax(policy)

    remaining = []
    for referenced in self.policies:
        if any(self.lo.compare_dn(unwanted, referenced) for unwanted in policies):
            continue
        remaining.append(referenced)
    self.policies = remaining
def policiesChanged(self) -> bool:
    """
    Tell whether the referenced policies differ from the old policies.

    Order and duplicates are not taken into account.

    :returns: True if the set of policies changed, False otherwise.
    """
    difference = set(self.oldpolicies).symmetric_difference(self.policies)
    return bool(difference)
def __app_option_enabled(self, name: str, option: univention.admin.option) -> bool:
    """
    Tell whether an option may be enabled with regard to app activation.

    Options which are no app options are always allowed. An app option is allowed if every
    property of that option with an app activation syntax has an activated value.

    :param str name: The name of the option.
    :param option: The option.
    :returns: False if the option is an app option which is not activated, True otherwise.
    """
    if not option.is_app_option:
        return True

    for pname, prop in self.descriptions.items():
        if name in prop.options and prop.syntax.name in ('AppActivatedBoolean', 'AppActivatedTrue', 'AppActivatedOK'):
            if self[pname] not in ('TRUE', '1', 'OK'):
                return False
    return True
def description(self) -> str:
    """
    Return a descriptive string for the object.

    If the object has a DN, the value(s) of its relative distinguished name are returned.
    Otherwise the value of the first identifying property is converted to a string by its syntax.

    :returns: A descriptive string or `none` as fallback.
    """
    if self.dn:
        rdn_values = explode_rdn(self.dn, 1)
        return '+'.join(rdn_values)

    identifying = next(
        ((name, prop) for name, prop in self.descriptions.items() if prop.identifies),
        None,
    )
    if identifying is None:
        return 'none'

    name, prop = identifying
    return prop.syntax.tostring(self[name] or '')
def _post_unmap(self, info: _Properties, values: _Attributes) -> _Properties:
    """
    This method can be overwritten to define special un-map methods to map
    back from LDAP to UDM that can not be done with the default mapping API.

    The default implementation returns `info` unchanged.

    :param info: The list of UDM properties.
    :param values: The list of LDAP attributes.
    :returns: The (modified) list of UDM properties.
    """
    return info

def _post_map(self, modlist: list[tuple[str, Any, Any]], diff: list[tuple[str, Any, Any]]) -> list[tuple[str, Any, Any]]:
    """
    This method can be overwritten to define special map methods to map from
    UDM to LDAP that can not be done with the default mapping API.

    The default implementation returns `modlist` unchanged.

    :param modlist: The list of LDAP modifications.
    :param list diff: A list of modified UDM properties.
    :returns: The (modified) list of LDAP modifications.
    """
    return modlist

def _ldap_addlist(self) -> list[tuple[str, Any]]:
    """
    Return additional LDAP attributes which are only set when creating the object.

    :returns: A list of (attribute-name, values) tuples; empty by default.
    """
    return []

def _ldap_modlist(self) -> list[tuple[str, Any, Any]]:
    """
    Builds the list of modifications when creating and modifying this object.

    It compares the old properties (:py:attr:`oldinfo`) with the new properties (:py:attr:`info`) and applies the LDAP mapping.
    Differences are added to the modlist which consists of a tuple with three items:

    ("LDAP attribute-name", [old, values], [new, values])

    ("LDAP attribute-name", old_value, new_value)

    ("LDAP attribute-name", None, added_value)

    .. seealso:: :mod:`univention.uldap` for further information about the format of the modlist.

    This method can be overridden in a subclass to add special behavior, e.g. for properties which have no mapping defined.

    .. caution:: The final modlist used for creation of objects is mixed with the :func:`univention.admin.handlers.simpleLdap._ldap_addlist`.
            Make sure this method doesn't add attributes which are already set.
    """
    diff_ml = self.diff()
    ml = univention.admin.mapping.mapDiff(self.mapping, diff_ml)
    ml = self._post_map(ml, diff_ml)

    if self.policiesChanged():
        # add or remove the object class depending on whether any policy is referenced afterwards
        policy_ocs_set = b'univentionPolicyReference' in self.oldattr.get('objectClass', [])
        if self.policies and not policy_ocs_set:
            ml.append(('objectClass', b'', [b'univentionPolicyReference']))
        elif not self.policies and policy_ocs_set:
            ml.append(('objectClass', b'univentionPolicyReference', b''))
        ml.append(('univentionPolicyReference', [x.encode('UTF-8') for x in self.oldpolicies], [x.encode('UTF-8') for x in self.policies]))

    return ml

def _create(self, response: dict[str, Any] | None = None, serverctrls: list[ldap.controls.LDAPControl] | None = None, ignore_license: bool = False) -> str:
    """
    Create the object. Should only be called by :func:`univention.admin.handlers.simpleLdap.create`.

    If adding the object or the post-create hook fails, the object is cancelled, an
    already added LDAP entry is removed again and the original exception is re-raised.

    :param response: Handed over to the LDAP add operation.
    :param serverctrls: LDAP server controls which are handed over to the LDAP add operation.
    :param ignore_license: Handed over to the LDAP add operation.
    :returns: The DN of the created object.
    """
    univention.admin.blocklist.check_blocklistentry(self)
    self._ldap_pre_create()
    self._update_policies()
    self.call_udm_property_hook('hook_ldap_pre_create', self)

    self.set_default_values()

    self._call_checkLdap_on_all_property_syntaxes()

    # the add list consists of the create-only attributes, the mapped properties and the object classes
    al = self._ldap_addlist()
    al.extend(self._ldap_modlist())
    al = self._ldap_object_classes_add(al)
    al = self.call_udm_property_hook('hook_ldap_addlist', self, al)

    # ensure univentionObject is set
    al.append(('objectClass', [b'univentionObject']))
    al.append(('univentionObjectType', [self.module.encode('utf-8')]))

    self.log.debug('Create object.', dn=self.dn)
    self.log.log(1, 'Create object', dn=self.dn, addlist=al)

    # if anything goes wrong we need to remove the already created object, otherwise we run into 'already exists' errors
    try:
        self.lo.authz_connection.add(self.dn, al, serverctrls=serverctrls, response=response, ignore_license=ignore_license)
        self._exists = True
        self._ldap_post_create()
    except Exception:
        # ensure that there is no lock left
        # remember the original error: the cleanup below must not replace it
        exc = sys.exc_info()
        self.log.info('Creation failed', dn=self.dn, error=exc[1])
        try:
            self.cancel()
        except Exception:
            self.log.exception('Post-create: cancel() failed:')
        try:
            if self._exists:  # add succeeded but _ldap_post_create failed!
                obj = univention.admin.objects.get(univention.admin.modules._get(self.module), None, self.lo, self.position, self.dn)
                obj.open()
                obj.remove()
        except Exception:
            self.log.exception('Post-create: remove() failed')
        raise exc[1].with_traceback(exc[2])

    self.call_udm_property_hook('hook_ldap_post_create', self)

    self.save()
    return self.dn

def _ldap_object_classes_add(self, al: list[tuple[str, Any]]) -> list[tuple[str, Any]]:
    """
    Append the object classes required by extended attributes and options to the add list.

    Object classes which are already contained in the add list are not added again.

    :param al: The add list, which is modified in place.
    :returns: The given (modified) add list.
    """
    m = univention.admin.modules._get(self.module)
    # evaluate extended attributes
    ocs: set[str] = set()
    for prop in getattr(m, 'extended_udm_attributes', []):
        self.log.debug('Creating extended attributes', property=prop.name, has_property=self.has_property(prop.name), value=self.info.get(prop.name))
        # a boolean which is '0' does not require the object class
        if prop.syntax == 'boolean' and self.info.get(prop.name) == '0':
            continue
        if self.has_property(prop.name) and self.info.get(prop.name):
            ocs.add(prop.objClass)

    module_options = univention.admin.modules.options(self.module)
    # add object classes of (especially extended) options
    for option in ['default', *self.options]:
        try:
            opt = module_options[option]
        except KeyError:
            self.log.debug('Unknown option.', type=m.module, option=option)
            continue
        ocs |= set(opt.objectClasses)

    # remove duplicated object classes
    for i in al:
        key, val = i[0], i[-1]  # might be a triple
        if val and key.lower() == 'objectclass':
            val_list = [val] if not isinstance(val, tuple | list) else val
            val_unicode = [x.decode('UTF-8') if isinstance(x, bytes) else x for x in val_list]
            ocs -= set(val_unicode)  # TODO: check str vs bytes everywhere for ocs calculations

    if ocs:
        al.append(('objectClass', [x.encode('UTF-8') for x in ocs]))

    return al

def _modify(self, modify_childs: bool = True, ignore_license: bool = False, response: dict[str, Any] | None = None, serverctrls: list[ldap.controls.LDAPControl] | None = None) -> str:
    """
    Modify the object. Should only be called by :func:`univention.admin.handlers.simpleLdap.modify`.

    If the modification would rename the object, the rename permission is checked and the
    rename hooks are called around a second modification attempt which performs the rename.

    :param modify_childs: Not used by this implementation.
    :param ignore_license: Handed over to the LDAP modify operation.
    :param response: Handed over to the LDAP modify operation.
    :param serverctrls: LDAP server controls which are handed over to the LDAP modify operation.
    :returns: The (possibly new) DN of the modified object.
    """
    self.__prevent_ad_property_change()
    univention.admin.blocklist.check_blocklistentry(self)
    self._ldap_pre_modify()
    self._update_policies()
    self.call_udm_property_hook('hook_ldap_pre_modify', self)

    self.set_default_values()
    self._fix_app_options()

    # iterate over all properties and call checkLdap() of corresponding syntax
    self._call_checkLdap_on_all_property_syntaxes()

    ml = self._ldap_modlist()
    ml = self.call_udm_property_hook('hook_ldap_modlist', self, ml)
    ml = self._ldap_object_classes(ml)

    class wouldRename(Exception):
        # raised from the rename callback to interrupt the first modification attempt
        @classmethod
        def on_rename(cls, dn, new_dn, ml):
            raise cls(dn, new_dn)

    self.log.log(1, 'Modify object', dn=self.dn, modlist=ml, oldattr=self.oldattr)

    blocklist_entries = univention.admin.blocklist.create_blocklistentry(self)
    try:
        try:
            self.dn = self.lo.authz_connection.modify(self.dn, ml, ignore_license=ignore_license, serverctrls=serverctrls, response=response, rename_callback=wouldRename.on_rename)
            self._set_log()
        except wouldRename as exc:
            # exc.args is (old DN, new DN)
            self.authz.is_rename_allowed(self)
            self._ldap_pre_rename(exc.args[1])
            self.dn = self.lo.authz_connection.modify(self.dn, ml, ignore_license=ignore_license, serverctrls=serverctrls, response=response)
            self._set_log()
            self._ldap_post_rename(exc.args[0])
    except Exception:
        univention.admin.blocklist.cleanup_blocklistentry(blocklist_entries, self)
        raise
    # only write a diary entry if something was changed
    if ml:
        self._write_admin_diary_modify()

    self._ldap_post_modify()
    self.call_udm_property_hook('hook_ldap_post_modify', self)

    self.save()
    return self.dn
def set_default_values(self) -> None:
    """
    Make sure the default values of all properties of this object are set.

    Properties the object does not have are skipped. For properties depending on an
    option which was not enabled before, setting defaults is forced temporarily.
    """
    for name, prop in self.descriptions.items():
        # only properties without option or with any required option currently enabled
        if not self.has_property(name):
            continue

        previous = self.set_defaults
        # properties which depend on an option that wasn't activated prior modifying get their default as well
        force = not previous and prop.options and set(self.old_options).isdisjoint(prop.options)
        if force:
            self.set_defaults = True
        try:
            if prop.default(self):
                self[name]  # __getitem__ sets default value
        finally:
            self.set_defaults = previous
def _fix_app_options(self) -> None:
    """Activate the "app activated" property of every app option that was just switched on."""
    # for objects with objectClass=appObject and appObjectActivated=0 we must set appObjectActivated=1
    for option, opt in getattr(univention.admin.modules._get(self.module), 'options', {}).items():
        if not opt.is_app_option or not self.option_toggled(option) or option not in self.options:
            continue
        for pname, prop in self.descriptions.items():
            if option in prop.options and prop.syntax.name in ('AppActivatedBoolean', 'AppActivatedTrue', 'AppActivatedOK'):
                self[pname] = True


def _ldap_object_classes(self, ml: list[tuple]) -> list[tuple]:
    """
    Detects the attributes changed in the given modlist, calculates the changes of the object class and appends it to the modlist.

    :param ml: the modlist built so far, as ``(attribute, old, new)`` tuples.
    :returns: the modlist, extended by an ``objectClass`` entry when the set of
        object classes changes. If the resulting attributes are not allowed by,
        or do not satisfy, the new object classes, a warning is logged and the
        modlist is returned as built up to that point.
    """
    m = univention.admin.modules._get(self.module)

    def lowerset(vals: Iterable[str]) -> set[str]:
        return {x.lower() for x in vals}

    ocs = lowerset(x.decode('UTF-8') for x in _MergedAttributes(self, ml).get_attribute('objectClass'))
    unneeded_ocs: set[str] = set()
    required_ocs: set[str] = set()

    # evaluate (extended) options
    module_options = univention.admin.modules.options(self.module)
    available_options = set(module_options)
    options = set(self.options)
    if 'default' in available_options:
        options |= {'default'}
    old_options = set(self.old_options)
    if options != old_options:
        # log the previous option set next to the new one
        self.log.debug('Changed options', old_option=old_options, options=options)
    unavailable_options = (options - available_options) | (old_options - available_options)
    if unavailable_options:
        # Bug #46586: as we simulate legacy options, this is no longer an error
        self.log.debug('Unknown options', type=self.module, options=unavailable_options)
    added_options = options - old_options - unavailable_options
    removed_options = old_options - options - unavailable_options

    # evaluate extended attributes
    for prop in getattr(m, 'extended_udm_attributes', []):
        self.log.debug('Modify object classes for extended attribute', property=prop.name, objectclass=prop.objClass)

        if self.has_property(prop.name) and self.info.get(prop.name) and (True if prop.syntax != 'boolean' else self.info.get(prop.name) != '0'):
            required_ocs |= {prop.objClass}
            continue

        if prop.deleteObjClass:
            unneeded_ocs |= {prop.objClass}

        # if the value is unset we need to remove the attribute completely
        if self.oldattr.get(prop.ldapMapping):
            ml = [x for x in ml if x[0].lower() != prop.ldapMapping.lower()]
            ml.append((prop.ldapMapping, self.oldattr.get(prop.ldapMapping), b''))

    unneeded_ocs |= {oc for option in removed_options for oc in module_options[option].objectClasses}
    required_ocs |= {oc for option in added_options for oc in module_options[option].objectClasses}

    ocs -= lowerset(unneeded_ocs)
    ocs |= lowerset(required_ocs)
    if lowerset(x.decode('utf-8') for x in self.oldattr.get('objectClass', [])) == ocs:
        # nothing changes for the object classes
        return ml

    self.log.debug('Changed OCS', ocs=ocs, required=required_ocs, removed=unneeded_ocs)

    # case normalize object class names
    schema = self.lo.authz_connection.get_schema()
    ocs = {x.names[0] for x in (schema.get_obj(ldap.schema.models.ObjectClass, x) for x in ocs) if x}

    # make sure we still have a structural object class
    if not schema.get_structural_oc(ocs):
        structural_ocs = schema.get_structural_oc(unneeded_ocs)
        if not structural_ocs:
            self.log.error('missing structural object class. Modify will fail.')
            return ml
        self.log.warning('Preventing to remove last structural object class', object_class=structural_ocs)
        # NOTE(review): `ocs` contains no structural class at this point, so this
        # subtraction looks like a no-op, while the log message suggests the class
        # was meant to be kept - confirm the intent before changing it.
        ocs -= set(schema.get_obj(ldap.schema.models.ObjectClass, structural_ocs).names)

    # validate removal of object classes
    must, may = schema.attribute_types(ocs)
    allowed = {name.lower() for attr in may.values() for name in attr.names} | {name.lower() for attr in must.values() for name in attr.names}

    ml = [x for x in ml if x[0].lower() != 'objectclass']
    ml.append(('objectClass', self.oldattr.get('objectClass', []), [x.encode('utf-8') for x in ocs]))
    newattr = ldap.cidict.cidict(_MergedAttributes(self, ml).get_attributes())

    # make sure only attributes known by the object classes are set
    for attr, val in newattr.items():
        if not val:
            continue
        if re.sub(r';binary$', '', attr.lower()) not in allowed:
            self.log.warning('Attribute is not allowed by any object class.', attribute=attr)
            # ml.append((attr, val, [])) # TODO: Remove the now invalid attribute instead
            return ml

    # require all MUST attributes to be set
    for attr in must.values():
        if not any(newattr.get(name) or newattr.get('%s;binary' % (name,)) for name in attr.names):
            self.log.warning('The attribute is required by the current object classes.', attribute=attr.names)
            return ml

    ml = [x for x in ml if x[0].lower() != 'objectclass']
    ml.append(('objectClass', self.oldattr.get('objectClass', []), [x.encode('utf-8') for x in ocs]))

    return ml


def _move_in_subordinates(self, olddn: str) -> None:
    """Rewrite the ``secretary`` attribute of every person entry that still points at `olddn`."""
    result = self.lo.authz_connection.searchDn(base=self.lo.base, filter=filter_format('(&(objectclass=person)(secretary=%s))', [olddn]))
    for subordinate in result:
        self.lo.authz_connection.modify(subordinate, [('secretary', olddn.encode('utf-8'), self.dn.encode('utf-8'))])


def _move_in_groups(self, olddn: str) -> None:
    """Replace `olddn` by the current DN in ``uniqueMember`` of all groups recorded in ``oldinfo``."""
    for group in [*self.oldinfo.get('groups', []), self.oldinfo.get('machineAccountGroup', '')]:
        if group != '':
            try:
                self.lo.authz_connection.modify(group, [('uniqueMember', [olddn.encode('UTF-8')], None)])
            except univention.admin.uexceptions.ldapError as exc:
                # the old DN was not a member (anymore): nothing to remove
                if not isinstance(exc.original_exception, ldap.NO_SUCH_ATTRIBUTE):
                    raise
            try:
                self.lo.authz_connection.modify(group, [('uniqueMember', None, [self.dn.encode('UTF-8')])])
            except univention.admin.uexceptions.ldapError as exc:
                # the new DN is already a member: nothing to add
                if not isinstance(exc.original_exception, ldap.TYPE_OR_VALUE_EXISTS):
                    raise


def _move(self, newdn: str, modify_childs: bool = True, ignore_license: bool = False) -> str:
    """
    Moves this object to the new DN.

    Should only be called by :func:`univention.admin.handlers.simpleLdap.move`.

    :param newdn: the DN the object is renamed to.
    :param modify_childs: not used by this implementation.
    :param ignore_license: not used by this implementation.
    :returns: the DN of the object after the move.
    """
    self._ldap_pre_move(newdn)

    olddn = self.dn
    self.lo.authz_connection.rename(self.dn, newdn)
    self.dn = newdn
    self._set_log()

    try:
        self._move_in_groups(olddn)  # can be done always, will do nothing if oldinfo has no attribute 'groups'
        self._move_in_subordinates(olddn)
        self._ldap_post_move(olddn)
    except Exception:
        # move back
        self.log.warning('ldap_post_move failed, move object back', dn=newdn, old_dn=olddn)
        self.lo.authz_connection.rename(self.dn, olddn)
        self.dn = olddn
        self._set_log()
        raise
    self._write_admin_diary_move(newdn)
    return self.dn


def _write_admin_diary_move(self, position: str) -> None:
    """Write a ``MOVED`` admin diary event carrying the new position."""
    self._write_admin_diary_event('MOVED', {'position': position})


def _remove(self, remove_childs: bool = False) -> None:
    """
    Removes this object.

    Should only be called by :func:`univention.admin.handlers.simpleLdap.remove`.

    :param remove_childs: also remove the entries one level below this object
        (passed on to each child's own ``remove``).
    :raises univention.admin.uexceptions.invalidOperation: if the object is
        synced from Active Directory and changing such objects is prevented.
    """
    self.log.debug('Removing object', dn=self.dn, remove_childs=remove_childs)

    if _prevent_to_change_ad_properties and self._is_synced_object():
        raise univention.admin.uexceptions.invalidOperation(_('Objects from Active Directory can not be removed.'))

    self._ldap_pre_remove()
    self.call_udm_property_hook('hook_ldap_pre_remove', self)

    if remove_childs:
        subelements: list[tuple[str, dict[str, list[str]]]] = []
        # TODO: perf: instead of searching, get it from oldattr
        if b'FALSE' not in self.lo.authz_connection.getAttr(self.dn, 'hasSubordinates'):
            self.log.debug('Removing children', dn=self.dn)
            subelements = self.lo.authz_connection.search(base=self.dn, scope='one', attr=[])

        for subolddn, suboldattrs in subelements:
            self.log.debug('Removing child', dn=subolddn)
            for submodule in univention.admin.modules.identify(subolddn, suboldattrs):
                subobject = submodule.object(None, self.lo, None, dn=subolddn, attributes=suboldattrs)
                subobject.open()
                try:
                    subobject.remove(remove_childs)
                except univention.admin.uexceptions.base as exc:
                    self.log.error('Could not remove child', **{'dn': subolddn, 'error-name': type(exc).__name__, 'error': exc})

                # only the first identified module is used
                break
            else:
                self.log.warning('Could not remove child: could not identify UDM module', dn=subolddn)

    self._exists = False
    blocklist_entries = univention.admin.blocklist.create_blocklistentry(self)
    try:
        self.lo.authz_connection.delete(self.dn)
    except Exception:
        # undo the bookkeeping done above when the delete failed
        univention.admin.blocklist.cleanup_blocklistentry(blocklist_entries, self)
        self._exists = True
        raise
    self._ldap_post_remove()
    self.call_udm_property_hook('hook_ldap_post_remove', self)
    self.oldattr = {}
    self._write_admin_diary_remove()
    self.save()


def _write_admin_diary_remove(self) -> None:
    """Write a ``REMOVED`` admin diary event."""
    self._write_admin_diary_event('REMOVED')
def loadPolicyObject(self, policy_type: str, reset: int = 0) -> 'simplePolicy':
    """
    Return the policy object of type `policy_type` belonging to this object.

    An already loaded policy object is reused when one of the referenced
    policies is recognized as `policy_type`, it was cloned from that policy
    and `reset` is not set. Otherwise the first referenced policy identified
    as `policy_type` is loaded and cloned. Referenced policies for which no
    module can be identified are dropped from ``self.policies``. If nothing
    matches, a new policy object is prepared at the policy position.

    :param policy_type: UDM module name of the policy.
    :param reset: if true, do not reuse a previously loaded policy object.
    :returns: the policy object, also stored in ``self.policyObjects``.
    """
    self.log.debug('load policy object', type=policy_type)
    policy_module = univention.admin.modules._get(policy_type)

    # apply UCR overwrites to the property descriptions and rebuild the layout
    univention.admin.ucr_overwrite_properties(policy_module, self.lo)
    univention.admin.ucr_overwrite_module_layout(policy_module)

    # the configured policy containers live on 'cn=directory,cn=univention,<current domain>'
    # with 'cn=default containers,...' as fallback
    directory = self.lo.authz_connection.get('cn=directory,cn=univention,' + self.position.getDomain()) or self.lo.authz_connection.get(
        'cn=default containers,cn=univention,' + self.position.getDomain()
    )

    valid_paths = []
    for raw_dn in directory.get('univentionPolicyObject', []):
        container_dn = raw_dn.decode('utf-8')
        try:
            self.lo.authz_connection.searchDn(base=container_dn, scope='base')
            valid_paths.append(container_dn)
            self.log.debug('loadPolicyObject: added path', **{'policy-dn': container_dn})
        except Exception:
            self.log.debug('loadPolicyObject: invalid path setting: does not exist in LDAP', **{'policy-dn': container_dn})
            continue  # try the next configured container
        break  # only the first existing container is used

    if valid_paths:
        policy_position = univention.admin.uldap.position(self.position.getBase())
        container = valid_paths[0]
        try:
            prefix = univention.admin.modules.policyPositionDnPrefix(policy_module)
            prefixed_container = '%s,%s' % (prefix, container)
            self.lo.authz_connection.searchDn(base=prefixed_container, scope='base')
            policy_position.setDn(prefixed_container)
        except Exception:
            policy_position.setDn(container)
    else:
        policy_position = self.position

    # reuse an already loaded object if it still matches a referenced policy
    for dn in self.policies:
        if (
            univention.admin.modules.recognize(policy_module, dn, self.lo.authz_connection.get(dn))
            and self.policyObjects.get(policy_type, None)
            and self.policyObjects[policy_type].cloned == dn
            and not reset
        ):
            return self.policyObjects[policy_type]

    # otherwise load the first referenced policy of the requested type
    for dn in list(self.policies):
        candidates = univention.admin.modules.identify(dn, self.lo.authz_connection.get(dn))
        if not candidates:
            self.policies.remove(dn)
            continue
        for candidate in candidates:
            if univention.admin.modules.name(candidate) != policy_type:
                continue
            self.policyObjects[policy_type] = univention.admin.objects.get(candidate, None, self.lo, policy_position, dn=dn)
            self.policyObjects[policy_type].clone(self)
            self._init_ldap_search(self.policyObjects[policy_type])
            return self.policyObjects[policy_type]

    if reset or not self.policyObjects.get(policy_type, None):
        self.policyObjects[policy_type] = univention.admin.objects.get(policy_module, None, self.lo, policy_position)
        self.policyObjects[policy_type].copyIdentifier(self)
        self._init_ldap_search(self.policyObjects[policy_type])

    return self.policyObjects[policy_type]
def _init_ldap_search(self, policy: 'simplePolicy') -> None: properties: dict[str, univention.admin.property] = {} if hasattr(policy, 'property_descriptions'): properties = policy.property_descriptions elif hasattr(policy, 'descriptions'): properties = policy.descriptions for pname, prop in properties.items(): if prop.syntax.name == 'LDAP_Search': prop.syntax._load(self.lo) if prop.syntax.viewonly: policy.mapping.unregister(pname, False) def _update_policies(self) -> None: for policy_type, policy_object in self.policyObjects.items(): self.log.debug('processing policy', type=policy_type) if policy_object.changes: self.log.debug('Creating policy', type=policy_type, info=policy_object.info) policy_object.create() univention.admin.objects.replacePolicyReference(self, policy_type, policy_object.dn)
def closePolicyObjects(self) -> None:
    """Forget all loaded policy objects by replacing the cache with a new empty dict."""
    self.policyObjects = dict()
def savePolicyObjects(self) -> None:
    """Write pending policy changes via :meth:`_update_policies`, then drop the loaded policy objects."""
    # persist first, only afterwards discard the cache
    self._update_policies()
    self.closePolicyObjects()
def cancel(self) -> None:
    """
    Abort the creation or modification of the object.

    The base implementation releases all locks requested so far; subclasses
    may override it to revert further changes.
    """
    self._release_locks()
def _release_locks(self, name: str | None = None) -> None:
    """
    Release temporarily acquired locks.

    :param name: if given, only locks of this type are released; otherwise all.
    """
    for lock in list(self.alloc):
        key, value = lock[:2]
        if not name or key == name:
            self.alloc.remove(lock)
            self.log.debug('release lock', lock=key, value=value)
            univention.admin.allocators.release(self.lo, self.position, key, value)


def _confirm_locks(self) -> None:
    """
    Confirm all temporarily acquired locks and empty ``self.alloc``.

    Entries of ``self.alloc`` are either ``(name, value)`` or
    ``(name, value, updateLastUsedValue)``; a missing third element
    means ``True``.
    """
    while self.alloc:
        entry = self.alloc.pop()
        name, value = entry[:2]
        update_last_used = entry[2] if len(entry) > 2 else True
        univention.admin.allocators.confirm(self.lo, self.position, name, value, updateLastUsedValue=update_last_used)


@overload
def request_lock(self, name: univention.admin.allocators._TypesUidGid, value: str | None = None, updateLastUsedValue: bool = True) -> str:
    ...


@overload
def request_lock(self, name: univention.admin.allocators._Types, value: str, updateLastUsedValue: bool = True) -> str:
    ...
def request_lock(self, name: univention.admin.allocators._Types, value: str | None = None, updateLastUsedValue: bool = True) -> str:
    """
    Request a lock of type `name` and remember it in ``self.alloc``.

    :param name: lock type; ``'sid+user'`` requests a user SID and is stored as ``'sid'``.
    :param value: the value to lock.
    :param updateLastUsedValue: stored with the lock only when false.
    :returns: the value returned by the allocator.
    :raises univention.admin.uexceptions.noLock: if the lock could not be acquired;
        locks of type `name` held so far are released first.
    """
    try:
        if name == 'sid+user':
            value = univention.admin.allocators.requestUserSid(self.lo, self.position, value)
            name = 'sid'
        else:
            value = univention.admin.allocators.request(self.lo, self.position, name, value)
    except univention.admin.uexceptions.noLock:
        self._release_locks(name)
        raise

    # backwards compatibility: 2-tuples are required unless the flag is disabled
    lock = (name, value) if updateLastUsedValue else (name, value, updateLastUsedValue)
    self.alloc.append(lock)
    return value
[docs] def request_unique(self, name, value: str | None = None): # request a new $name or get lock for manually set $name if self[name]: univention.admin.allocators.acquireUnique(self.lo, univention.admin.uldap.position(configRegistry['ldap/base']), name, self[name], name, scope='base') # "False" ==> do not update univentionLastUsedValue in LDAP if a specific value has been specified self.alloc.append((name, self[name], False)) else: if name != 'univentionObjectIdentifier' or configRegistry.is_true('directory/manager/object-identifier/autogeneration'): self[name] = self.request_lock(name)
def _call_checkLdap_on_all_property_syntaxes(self) -> None: """ Calls checkLdap() method on every property if present. checkLdap() may raise an exception if the value does not match the constraints of the underlying syntax. .. deprecated:: 5.0-2 Univention internal use only! """ for pname, prop in self.descriptions.items(): if hasattr(prop.syntax, 'checkLdap') and (not self.exists() or self.hasChanged(pname)): if len(inspect.getfullargspec(prop.syntax.checkLdap).args) > 3: prop.syntax.checkLdap(self.lo, self.info.get(pname), pname) else: prop.syntax.checkLdap(self.lo, self.info.get(pname)) def __prevent_ad_property_change(self) -> None: if not _prevent_to_change_ad_properties or not self._is_synced_object(): return for key in self.descriptions: if self.descriptions[key].readonly_when_synced: value = self.info.get(key) oldval = self.oldinfo.get(key) null = [] if self.descriptions[key].multivalue else '' if oldval in (None, null) and value in (None, null): continue if oldval != value: raise univention.admin.uexceptions.valueMayNotChange(_('key=%(key)s old=%(old)s new=%(new)s') % {'key': key, 'old': oldval, 'new': value}, property=key) def _is_synced_object(self) -> bool: """Checks whether this object was synchronized from Active Directory to UCS.""" flags = self.oldattr.get('univentionObjectFlag', []) return b'synced' in flags and b'docker' not in flags
@classmethod
def get_default_containers(cls, lo: univention.admin.uldap.access) -> list[str]:
    """
    Returns list of default containers for this module.

    The containers defined by the module are extended by those configured
    on the ``settings/directory`` object and then filtered through
    ``lo._filter_ldap_search_dns``. If no settings object is found, the
    module's own containers are returned as they are.

    :param univention.admin.uldap.access lo: UDM LDAP access object.
    """
    containers = univention.admin.modules.defaultContainers(univention.admin.modules._get(cls.module))

    settings_directory = univention.admin.modules._get('settings/directory')
    position = univention.admin.uldap.position(lo.base)
    try:
        univention.admin.modules.init(lo, position, settings_directory)
    except univention.admin.uexceptions.noObject:
        pass

    try:
        # "management" operation to fetch the settings object; the resulting
        # default containers are filtered in the last step
        directory = settings_directory.lookup(None, lo.authz_connection, '', required=True)[0]
    except (univention.admin.uexceptions.noObject, IndexError):
        return containers

    # property name on the settings object: explicit attribute name or the module's top-level part
    attribute = cls.default_containers_attribute_name or cls.module.split('/', 1)[0]
    containers.extend(directory.info.get(attribute, []))
    return lo._filter_ldap_search_dns(containers)
@classmethod
def lookup(
    cls,
    co: None,
    lo: univention.admin.uldap.access,
    filter_s: str,
    base: str = '',
    superordinate: Self | None = None,
    scope: str = 'sub',
    unique: bool = False,
    required: bool = False,
    timeout: int = -1,
    sizelimit: int = 0,
    serverctrls: list | None = None,
    response: dict | None = None,
    authz: bool = True,
) -> list[Self]:
    """
    Perform a LDAP search and return a list of instances.

    :param co: obsolete config
    :param lo: UDM LDAP access object.
    :param filter_s: LDAP filter string.
    :param base: LDAP search base distinguished name; defaults to ``cls.ldap_base``.
    :param superordinate: The superordinate object, used to refine the filter and passed to the created instances.
    :param scope: Specify the scope of the search to be one of `base`, `base+one`, `one`, `sub`, or `domain` to specify a base object, base plus one-level, one-level, subtree, or children search.
    :param unique: Raise an exception if more than one object matches.
    :param required: Raise an exception instead of returning an empty list.
    :param timeout: wait at most `timeout` seconds for a search to complete. `-1` for no limit.
    :param sizelimit: retrieve at most `sizelimit` entries for a search. `0` for no limit.
    :param serverctrls: a list of :py:class:`ldap.controls.LDAPControl` instances sent to the server along with the LDAP request.
    :param response: An optional dictionary to receive the server controls of the result.
    :param authz: apply authorization checks (default). Passing `False` skips the verification of search base and filter and the filtering of the results (**dangerous!**).
    :return: A list of UDM objects. The list is empty (even with `required`) when the authorization check rejects the search base or filter.
    :raises univention.admin.uexceptions.noObject: if `required` is set and nothing was found.
    """
    if isinstance(lo, univention.uldap.access):
        log.error('Wrong access class in use! Use univention.admin.uldap instead of univention.uldap! %s', ''.join(traceback.format_stack()))
        warnings.warn('Wrong access class in use! Use univention.admin.uldap instead of univention.uldap!', DeprecationWarning, stacklevel=3)
        if configRegistry.is_true('directory/manager/type-checking/strict'):
            raise TypeError('Expect univention.admin.uldap.access!')

    filter_e = cls.lookup_filter(filter_s, lo)
    if superordinate:
        filter_e = cls.lookup_filter_superordinate(filter_e, superordinate)
    filter_text = str(filter_e or '')

    wanted_attrs = cls._ldap_attributes()
    found = []
    search_base = base or cls.ldap_base

    if authz and lo.authz.enabled:
        if not lo._verify_search_base(search_base) or not lo._verify_search_filter(filter_text):
            return found

    entries = lo.authz_connection.search(filter_text, search_base, scope, wanted_attrs, unique, required, timeout, sizelimit, serverctrls=serverctrls, response=response)
    for dn, attrs in entries:
        try:
            found.append(cls(co, lo, None, dn=dn, superordinate=superordinate, attributes=attrs))
        except univention.admin.uexceptions.base as exc:
            # a single broken entry must not abort the whole lookup
            log.error('lookup() of object failed', dn=dn, error=exc)

    if authz:
        found = lo.filter_lookup_results(found, {'module': cls.module, 'filter': filter_text, 'base': base or cls.ldap_base, 'scope': scope, 'attr': wanted_attrs})

    if required and not found:
        raise univention.admin.uexceptions.noObject('lookup(base=%r, filter_s=%r)' % (base, filter_e))
    return found
@classmethod
def lookup_filter(cls, filter_s: str | None = None, lo: univention.admin.uldap.access | None = None) -> univention.admin.filter.conjunction:
    """
    Return a LDAP filter as a UDM filter expression.

    The module's base filter is combined with `filter_s`, whose property
    names and values are rewritten through :py:meth:`rewrite_filter`.

    :param str filter_s: LDAP filter string.
    :param univention.admin.uldap.access lo: UDM LDAP access object; not used by this implementation.
    :returns: A LDAP filter expression.

    See :py:meth:`lookup`.
    """
    combined = cls.unmapped_lookup_filter()
    # The module is fetched by name because lookup()/lookup_filter() is sometimes
    # called before univention.admin.modules.update() was performed
    # (e.g. management/univention-directory-manager-modules/univention-dnsedit).
    mapping = univention.admin.modules._get(cls.module).mapping
    combined.append_unmapped_filter_string(filter_s, cls.rewrite_filter, mapping)
    return combined
@classmethod
def lookup_filter_superordinate(cls, filter: univention.admin.filter.conjunction, superordinate: Self) -> univention.admin.filter.conjunction:
    """
    Hook to restrict a lookup filter to a superordinate.

    The base implementation returns `filter` unchanged.

    :param filter: the filter built by :py:meth:`lookup_filter`.
    :param superordinate: the superordinate object of the lookup.
    :returns: the very same filter object.
    """
    unchanged = filter
    return unchanged
@classmethod
def unmapped_lookup_filter(cls) -> univention.admin.filter.conjunction:
    """
    Return a LDAP filter UDM filter expression.

    Either a single match on ``univentionObjectType`` (when
    ``use_performant_ldap_search_filter`` is set) or one ``objectClass``
    match per object class of the module's ``default`` option, ignoring a
    few generic classes.

    :returns: A LDAP filter expression.

    See :py:meth:`lookup_filter`.
    """
    if cls.use_performant_ldap_search_filter:
        conditions = [univention.admin.filter.expression('univentionObjectType', cls.module, escape=True)]
    else:
        default_option = univention.admin.modules.options(cls.module).get('default', univention.admin.option())
        specific_classes = default_option.objectClasses - {
            'top',
            'univentionPolicy',
            'univentionObjectMetadata',
            'person',
        }
        conditions = [univention.admin.filter.expression('objectClass', oc) for oc in specific_classes]

    return univention.admin.filter.conjunction('&', conditions)
@classmethod
def rewrite_filter(cls, filter: univention.admin.filter.expression, mapping: univention.admin.mapping.mapping) -> None:
    """
    Rewrite a single filter expression from UDM property syntax to LDAP, in place.

    :param filter: the expression to rewrite; its ``variable`` and ``value`` are modified.
    :param mapping: the property mapping of the module.
    """
    key = filter.variable

    # a property unknown to the mapping is treated as "do not map"
    try:
        should_map = mapping.shouldMap(key)
    except KeyError:
        should_map = False

    if should_map:
        filter.variable = mapping.mapName(key)

    if filter.operator == '=*':
        # 1. presence match. We only need to change the variable name. value is not set
        # 2. special case for syntax classes IStates and boolean:
        #    properties that are represented as Checkboxes in the
        #    frontend should include '(!(propertyName=*))' in the ldap filter
        #    if the Checkbox is set to False to also find objects where the property
        #    is not set. In that case we don't want to map the '*' to a different value.
        return

    # management/univention-management-console/src/univention/management/console/acl.py does not call univention.admin.modules.update()
    mod = univention.admin.modules._get(cls.module)
    property_ = mod.property_descriptions.get(key)

    # map options to corresponding objectClass
    if not property_ and key == 'options' and filter.value in getattr(mod, 'options', {}):
        ocs = mod.options[filter.value]
        filter.variable = 'objectClass'
        if len(ocs.objectClasses) > 1:
            # several object classes: all of them have to match
            con = univention.admin.filter.conjunction('&', [univention.admin.filter.expression('objectClass', oc, escape=True) for oc in ocs.objectClasses])
            filter.transform_to_conjunction(con)
        elif ocs.objectClasses:
            filter.value = next(iter(ocs.objectClasses))
        return

    if not should_map:
        return

    # wrap the value the way the mapping function expects it
    if property_ and not isinstance(filter.value, list | tuple):
        if property_.multivalue:
            # special case: multivalue properties need to be a list when map()-ing
            filter.value = [filter.value]
        if issubclass(property_.syntax if inspect.isclass(property_.syntax) else type(property_.syntax), univention.admin.syntax.complex):
            # special case: complex syntax properties need to be a list (of lists, if multivalue)
            filter.value = [filter.value]

    filter.value = mapping.mapValueDecoded(key, filter.value, encoding_errors='ignore')

    # a non-empty sequence result is reduced to its first element
    if isinstance(filter.value, list | tuple) and filter.value:
        # complex syntax
        filter.value = filter.value[0]
@classmethod
def identify(cls, dn: str, attr: _Attributes, canonical: bool = False) -> bool:
    """
    Tell whether the LDAP entry carries all object classes specific to this module.

    The classes of the module's ``default`` option are compared, ignoring a
    few generic ones.

    :param dn: DN of the entry; not used by this implementation.
    :param attr: LDAP attributes of the entry.
    :param canonical: not used by this implementation.
    :returns: `True` if every required object class is present.
    """
    default_option = univention.admin.modules.options(cls.module).get('default', univention.admin.option())
    required = default_option.objectClasses - {
        'top',
        'univentionPolicy',
        'univentionObjectMetadata',
        'person',
    }
    present = {oc.decode('utf-8') for oc in attr.get('objectClass', [])}
    return required <= present
_static_ldap_attributes: set[str] = set() @classmethod def _ldap_attributes(cls) -> list[str]: """Get a list of additional (operational) LDAP attributes which needs to be fetched from the LDAP server when creating an instance of this object""" return list({'*', 'entryUUID', 'entryCSN', 'modifyTimestamp'} | cls._static_ldap_attributes)
class simpleComputer(simpleLdap):
    """:class:`simpleLdap` specialisation that additionally tracks the IP addresses and the FQDN of the object."""

    def __init__(
        self,
        co: None,
        lo: univention.admin.uldap.access,
        position: univention.admin.uldap.position | None,
        dn: str = '',
        superordinate: simpleLdap | None = None,
        attributes: _Attributes | None = None,
    ) -> None:
        """Initialise like :class:`simpleLdap`, then set up the IP, network and DNS alias bookkeeping."""
        simpleLdap.__init__(self, co, lo, position, dn, superordinate, attributes)

        self.__changes: dict[str, Any] = {}
        self.newPrimaryGroupDn = 0
        self.oldPrimaryGroupDn = 0
        self.ip: list[str] = []
        self.network_object: univention.admin.handlers.networks.network.object | None = None
        self.old_network = 'None'
        self.__saved_dhcp_entry = None

        # read-only attribute containing the FQDN of the host
        fqdn_property = univention.admin.property(
            short_description='FQDN',
            long_description='',
            syntax=univention.admin.syntax.string,
            may_change=False,
        )
        self.descriptions['fqdn'] = fqdn_property

        # defined here to avoid pseudo non-None value of [''] in modwizard search
        self['dnsAlias'] = []

        self.oldinfo['ip'] = []
        self.info['ip'] = []
        if self.exists():
            # collect the addresses of the A and AAAA records in exploded notation
            known_ips = []
            for record_type in ('aRecord', 'aAAARecord'):
                for raw_address in self.oldattr.get(record_type, []):
                    known_ips.append(ip_address(raw_address.decode('ASCII')).exploded)
            self.oldinfo['ip'] += known_ips
            self.info['ip'] += known_ips
def getMachineSid(self, lo: univention.admin.uldap.access, position: univention.admin.uldap.position, uidNum: str, rid: str | None = None) -> str:
    """
    Determine the SID for this machine account.

    :param lo: not used by this implementation (``self.lo`` is used).
    :param position: not used by this implementation.
    :param uidNum: the uid number; used for the local SID and as first candidate for the user SID lock.
    :param rid: optional relative identifier; if given it is appended to the domain SID.
    :returns: the SID.
    """
    if rid:
        # an explicit RID is used regardless of the S4 connector
        domains = self.lo.authz_connection.search(filter='objectClass=sambaDomain', attr=['sambaSID'])
        domain_sid = domains[0][1]['sambaSID'][0].decode('ASCII')
        return self.request_lock('sid', domain_sid + '-' + rid)

    # no RID given: local SID if the connector is present, domain SID otherwise
    if self.s4connector_present:
        return 'S-1-4-%s' % uidNum

    candidate = uidNum
    while True:
        try:
            return self.request_lock('sid+user', candidate)
        except univention.admin.uexceptions.noLock:
            # already taken: try the next number
            candidate = str(int(candidate) + 1)
# HELPER @classmethod def _ip_from_ptr(cls, zoneName: str, relativeDomainName: str) -> str: """ Extract IP address from reverse DNS record. >>> simpleComputer._ip_from_ptr("2.1.in-addr.arpa", "4.3") '1.2.3.4' >>> simpleComputer._ip_from_ptr("0.0.0.0.0.0.0.0.0.8.b.d.1.0.0.2.ip6.arpa", "1.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0") '2001:db80:0000:0000:0000:0000:0000:0001' """ if 'ip6' in zoneName: return cls._ipv6_from_ptr(zoneName, relativeDomainName) else: return cls._ipv4_from_ptr(zoneName, relativeDomainName) @staticmethod def _ipv4_from_ptr(zoneName: str, relativeDomainName: str) -> str: """ Extract IPv4 address from reverse DNS record. >>> simpleComputer._ipv4_from_ptr("2.1.in-addr.arpa", "4.3") '1.2.3.4' """ return '%s.%s' % ( '.'.join(reversed(zoneName.replace('.in-addr.arpa', '').split('.'))), '.'.join(reversed(relativeDomainName.split('.'))), ) @staticmethod def _ipv6_from_ptr(zoneName: str, relativeDomainName: str) -> str: """ Extract IPv6 address from reverse DNS record. >>> simpleComputer._ipv6_from_ptr("0.0.0.0.0.0.0.0.0.8.b.d.1.0.0.2.ip6.arpa", "1.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0") '2001:db80:0000:0000:0000:0000:0000:0001' """ fullName = relativeDomainName + '.' + zoneName.replace('.ip6.arpa', '') digits = fullName.split('.') blocks = [''.join(reversed(digits[i:i + 4])) for i in range(0, len(digits), 4)] return ':'.join(reversed(blocks)) @staticmethod def _is_ip(ip: str) -> bool: """ Check if valid IPv4 (0.0.0.0 is allowed) or IPv6 address. :param ip: string. :returns: `True` if it is a valid IPv4 or IPv6 address., `False` otherwise. >>> simpleComputer._is_ip('192.0.2.0') True >>> simpleComputer._is_ip('::1') True >>> simpleComputer._is_ip('') False """ try: ip_address('%s' % (ip,)) log.trace('IP[%s]? -> Yes', ip) return True except ValueError: log.debug('IP[%s]? -> No', ip) return False
def open(self) -> None:
    """
    Load the computer object from LDAP.

    After the generic :class:`simpleLdap` loading, the DNS forward, reverse
    and alias entries and the DHCP entries referring to this host are
    searched and stored in the corresponding properties. For existing
    objects the group memberships are loaded as well.
    """
    simpleLdap.open(self)

    self.newPrimaryGroupDn = 0
    self.oldPrimaryGroupDn = 0
    self.ip_already_requested = 0
    self.ip_freshly_set = False

    self.__multiip = len(self['mac']) > 1 or len(self['ip']) > 1

    # start from empty lists; they are filled from the searches below
    self['dnsEntryZoneForward'] = []
    self['dnsEntryZoneReverse'] = []
    self['dhcpEntryZone'] = []
    self['groups'] = []
    self['dnsEntryZoneAlias'] = []

    # search forward zone and insert into the object
    if self['name']:
        tmppos = univention.admin.uldap.position(self.position.getDomain())
        # list of (zone name, exploded IP addresses); one entry per record type found
        zones = []

        searchFilter = filter_format('(&(objectClass=dNSZone)(relativeDomainName=%s)(!(cNAMERecord=*)))', [self['name']])
        try:
            result = self.lo.authz_connection.search(base=tmppos.getBase(), scope='domain', filter=searchFilter, attr=['zoneName', 'aRecord', 'aAAARecord'], unique=False)

            for dn, attr in result:
                zoneName = attr['zoneName'][0].decode('UTF-8')
                for key in ('aRecord', 'aAAARecord'):
                    if key in attr:
                        zones.append((zoneName, [ip_address(x.decode('ASCII')).exploded for x in attr[key]]))

            self.log.debug('open zoneNames', zones=zones)

            # resolve each zone name to the DN of the zone object (the entry with the SOA record)
            for zoneName, ips in zones:
                searchFilter = filter_format('(&(objectClass=dNSZone)(zoneName=%s)(sOARecord=*))', [zoneName])
                for dn in self.lo.authz_connection.searchDn(base=tmppos.getBase(), scope='domain', filter=searchFilter, unique=False):
                    for ip in ips:
                        self['dnsEntryZoneForward'].append([dn, ip])
            self.log.debug('open dnsEntryZoneForward', value=self['dnsEntryZoneForward'])
        except univention.admin.uexceptions.insufficientInformation:
            self['dnsEntryZoneForward'] = []
            raise

        # reverse entries: PTR records pointing to the host name or its FQDN in the zone
        for zoneName, ips in zones:
            searchFilter = filter_format('(&(objectClass=dNSZone)(|(PTRRecord=%s)(PTRRecord=%s.%s.)))', (self['name'], self['name'], zoneName))
            try:
                for dn, attr in self.lo.authz_connection.search(base=tmppos.getBase(), scope='domain', attr=['relativeDomainName', 'zoneName'], filter=searchFilter, unique=False):
                    ip = self._ip_from_ptr(attr['zoneName'][0].decode('UTF-8'), attr['relativeDomainName'][0].decode('UTF-8'))
                    if not self._is_ip(ip):
                        self.log.warning('open dnsEntryZoneReverse: invalid IP address generated', ip=ip)
                        continue
                    entry = [self.lo.parentDn(dn), ip]
                    if entry not in self['dnsEntryZoneReverse']:
                        self['dnsEntryZoneReverse'].append(entry)
            except univention.admin.uexceptions.insufficientInformation:
                self['dnsEntryZoneReverse'] = []
                raise
        self.log.debug('open dnsEntryZoneReverse', value=self['dnsEntryZoneReverse'])

        # alias entries: CNAME records pointing to the host name or its FQDN in the zone
        # NOTE(review): `zones` may list the same zone name twice (A and AAAA) and
        # `dnsAlias` is appended without a duplicate check - confirm duplicates are acceptable.
        for zoneName, ips in zones:
            searchFilter = filter_format('(&(objectClass=dNSZone)(|(cNAMERecord=%s)(cNAMERecord=%s.%s.)))', (self['name'], self['name'], zoneName))
            try:
                for dn, attr in self.lo.authz_connection.search(base=tmppos.getBase(), scope='domain', attr=['relativeDomainName', 'cNAMERecord', 'zoneName'], filter=searchFilter, unique=False):
                    dnsAlias = attr['relativeDomainName'][0].decode('UTF-8')
                    self['dnsAlias'].append(dnsAlias)
                    dnsAliasZoneContainer = self.lo.parentDn(dn)
                    # a CNAME holding the bare host name takes the zone of the alias record itself
                    if attr['cNAMERecord'][0].decode('UTF-8') == self['name']:
                        dnsForwardZone = attr['zoneName'][0].decode('UTF-8')
                    else:
                        dnsForwardZone = zoneName

                    entry = [dnsForwardZone, dnsAliasZoneContainer, dnsAlias]
                    if entry not in self['dnsEntryZoneAlias']:
                        self['dnsEntryZoneAlias'].append(entry)
            except univention.admin.uexceptions.insufficientInformation:
                self['dnsEntryZoneAlias'] = []
                raise
        self.log.debug('open dnsEntryZoneAlias', value=self['dnsEntryZoneAlias'])

        # DHCP entries: host objects registered for one of the MAC addresses
        for macAddress in self['mac']:
            # mac address may be an empty string (Bug #21958)
            if not macAddress:
                continue

            ethernet = 'ethernet ' + macAddress
            searchFilter = filter_format('(&(dhcpHWAddress=%s)(objectClass=univentionDhcpHost))', (ethernet,))
            self.log.debug('open: DHCP mac address', value=macAddress, search=searchFilter)
            try:
                for dn, attr in self.lo.authz_connection.search(base=tmppos.getBase(), scope='domain', attr=['univentionDhcpFixedAddress'], filter=searchFilter, unique=False):
                    service = self.lo.parentDn(dn)
                    if 'univentionDhcpFixedAddress' in attr:
                        for ip in attr['univentionDhcpFixedAddress']:
                            entry = (service, ip.decode('ASCII'), macAddress)
                            if entry not in self['dhcpEntryZone']:
                                self['dhcpEntryZone'].append(entry)
                    else:
                        # host entry without fixed address
                        entry = (service, '', macAddress)
                        if entry not in self['dhcpEntryZone']:
                            self['dhcpEntryZone'].append(entry)
                self.log.debug('open: setting dhcpEntryZone', value=self['dhcpEntryZone'])
            except univention.admin.uexceptions.insufficientInformation:
                raise

    if self.exists():
        if self.has_property('network'):
            self.old_network = self['network']

        # get groupmembership
        self['groups'] = self.lo.authz_connection.searchDn(base=self.lo.base, filter=filter_format('(&(objectclass=univentionGroup)(uniqueMember=%s))', [self.dn]))

    # the FQDN is derived from name and domain
    if 'name' in self.info and 'domain' in self.info:
        self.info['fqdn'] = '%s.%s' % (self['name'], self['domain'])
def __modify_dhcp_object(self, position: str, mac: str, ip: str | None = None) -> None:
    """
    Create or extend the DHCP host entry belonging to the MAC address `mac`.

    The entry is searched by ``dhcpHWAddress`` below `position`.
    If none is found a new ``univentionDhcpHost`` object is added; when the host name is
    already used as ``cn`` below `position`, the next free ``<name>_uv<N>`` name is used.
    If an entry is found and `ip` is given, `ip` is added as ``univentionDhcpFixedAddress``
    to every entry that does not carry it yet.

    :param position: DN below which the entry is searched/created; the LDAP base is used if empty.
    :param mac: MAC address of the host; nothing is done if it (or the host name) is empty.
    :param ip: optional fixed IP address.
    """
    # identify the dhcp object with the mac address
    name = self['name']
    self.log.debug('modify DHCP object', position=position, host=name, mac=mac, ip=ip)
    if not all((name, mac)):
        return

    ethernet = 'ethernet %s' % mac
    bip = ip.encode('ASCII') if ip else b''

    tmppos = univention.admin.uldap.position(self.position.getDomain())
    if not position:
        self.log.warning('could not access network object and given position is "None", using LDAP root as position for DHCP entry')
        position = tmppos.getBase()
    results = self.lo.authz_connection.search(
        base=position,
        scope='domain',
        attr=['univentionDhcpFixedAddress'],
        filter=filter_format('dhcpHWAddress=%s', [ethernet]),
        unique=False,
    )

    if not results:
        # if the dhcp object doesn't exists, then we create it
        # but it is possible, that the hostname for the dhcp object is already used, so we use the _uv$NUM extension
        self.log.debug('The DHCP object with the MAC address does not exists, we create one', mac=ethernet)

        results = self.lo.authz_connection.searchDn(
            base=position,
            scope='domain',
            filter=filter_format('(&(objectClass=univentionDhcpHost)(|(cn=%s)(cn=%s_uv*)))', (name, name)),
            unique=False,
        )
        if results:
            self.log.debug('the host "%s" already has a dhcp object, so we search for the next free uv name', name)
            RE = re.compile(r'cn=[^,]+_uv(\d+),')
            taken = {int(m.group(1)) for m in (RE.match(dn) for dn in results) if m}
            # smallest non-negative number that is not used as _uv suffix yet
            n = min(set(range(max(taken) + 2)) - taken) if taken else 0
            name = '%s_uv%d' % (name, n)

        dn = 'cn=%s,%s' % (escape_dn_chars(name), position)
        ml = [
            ('objectClass', [b'top', b'univentionObject', b'univentionDhcpHost']),
            ('univentionObjectType', [b'dhcp/host']),
            ('cn', [name.encode('UTF-8')]),
            ('dhcpHWAddress', [ethernet.encode('ASCII')]),
        ]
        if ip:
            ml.append(('univentionDhcpFixedAddress', [bip]))
        self.lo.authz_connection.add(dn, ml)
        self.log.debug('Created dhcp/host object', dn=dn)
    elif ip:
        # if the object already exists, we append or remove the ip address
        self.log.debug('the dhcp object with the mac address "%s" exists, we change the ip', ethernet)
        for dn, attr in results:
            if bip in attr.get('univentionDhcpFixedAddress', []):
                continue
            self.lo.authz_connection.modify(dn, [('univentionDhcpFixedAddress', b'', bip)])
            self.log.debug('Modified dhcp/host object', ip=ip, dn=dn)


def __rename_dns_object(self, position: univention.admin.uldap.position | None = None, old_name: str | None = None, new_name: str | None = None) -> None:
    """
    Rename the DNS host, pointer and alias records of this computer.

    * host records named `old_name` in the zones of ``dnsEntryZoneForward`` get `new_name`,
    * pointer records in the zones of ``dnsEntryZoneReverse`` which point to `old_name` are re-pointed,
    * alias records of ``dnsEntryZoneAlias`` get ``<new_name>.<forward zone>.`` as canonical name.

    :param position: not used by this method; objects are opened with ``self.position``.
    :param old_name: previous host name; nothing is done if it is empty.
    :param new_name: new host name; nothing is done if it is empty.
    """
    if not old_name or not new_name:
        # without both names there is nothing which could be searched for or renamed to
        return

    for dns_line in self['dnsEntryZoneForward']:
        # dns_line may be the empty string
        if not dns_line:
            continue
        dn, ip = self.__split_dns_line(dns_line)
        if ip is None:
            # entry consists of the zone only: match the address record by its name alone
            # instead of failing on `':' in None`
            search_filter = filter_format('(&(relativeDomainName=%s)(|(aRecord=*)(aAAARecord=*)))', (old_name,))
        elif ':' in ip:  # IPv6
            search_filter = filter_format('(&(relativeDomainName=%s)(aAAARecord=%s))', (old_name, ip))
        else:
            search_filter = filter_format('(&(relativeDomainName=%s)(aRecord=%s))', (old_name, ip))
        results = self.lo.authz_connection.searchDn(base=dn, scope='domain', filter=search_filter, unique=False)
        for result in results:
            host = univention.admin.objects.get(univention.admin.modules.get('dns/host_record'), self.co, self.lo, position=self.position, dn=result)
            assert host is not None
            host.open()
            host['name'] = new_name
            host.modify()

    def renamed(ptr_record: str) -> str:
        # Only the leading host label is replaced. A plain str.replace() would also rewrite
        # records of other hosts and domain parts which merely contain the old name.
        if ptr_record == old_name:
            return new_name
        if ptr_record.startswith(old_name + '.'):
            return new_name + ptr_record[len(old_name):]
        return ptr_record

    for dns_line in self['dnsEntryZoneReverse']:
        # dns_line may be the empty string
        if not dns_line:
            continue
        dn, ip = self.__split_dns_line(dns_line)
        results = self.lo.authz_connection.searchDn(
            base=dn,
            scope='domain',
            filter=filter_format('(|(pTRRecord=%s)(pTRRecord=%s.*))', (old_name, old_name)),
            unique=False,
        )
        for result in results:
            ptr = univention.admin.objects.get(univention.admin.modules.get('dns/ptr_record'), self.co, self.lo, position=self.position, dn=result)
            assert ptr is not None
            ptr.open()
            ptr['ptr_record'] = [renamed(ptr_record) for ptr_record in ptr.get('ptr_record', [])]
            ptr.modify()

    for entry in self['dnsEntryZoneAlias']:
        # entry may be the empty string
        if not entry:
            continue
        dnsforwardzone, dnsaliaszonecontainer, alias = entry
        results = self.lo.authz_connection.searchDn(
            base=dnsaliaszonecontainer,
            scope='domain',
            filter=filter_format('relativedomainname=%s', [alias]),
            unique=False,
        )
        for result in results:
            alias_obj = univention.admin.objects.get(univention.admin.modules.get('dns/alias'), self.co, self.lo, position=self.position, dn=result)
            alias_obj.open()
            alias_obj['cname'] = '%s.%s.' % (new_name, dnsforwardzone)
            alias_obj.modify()


def __rename_dhcp_object(self, old_name: str, new_name: str) -> None:
    """
    Rename the ``dhcp/host`` objects which belong to the MAC addresses of this computer.

    :param old_name: previous host name; nothing is done if it is empty.
    :param new_name: new host name.
    """
    if not old_name:
        # str.replace() below needs a real string to search for
        return

    module = univention.admin.modules.get('dhcp/host')
    tmppos = univention.admin.uldap.position(self.position.getDomain())
    for mac in self['mac']:
        # mac may be the empty string
        if not mac:
            continue

        ethernet = 'ethernet %s' % mac
        results = self.lo.authz_connection.searchDn(
            base=tmppos.getBase(),
            scope='domain',
            filter=filter_format('dhcpHWAddress=%s', [ethernet]),
            unique=False,
        )
        if not results:
            continue
        self.log.debug('Found DHCP objects', dhcpHWAddress=ethernet, results=results)
        for result in results:
            dhcp = univention.admin.objects.get(module, self.co, self.lo, position=self.position, dn=result)
            assert dhcp is not None
            dhcp.open()
            dhcp['host'] = dhcp['host'].replace(old_name, new_name)
            dhcp.modify()


def __remove_from_dhcp_object(self, mac: str | None = None, ip: str | None = None) -> str | None:
    """
    Remove DHCP host entries or a single fixed address from them.

    * `ip` and `mac` given: `ip` is removed from the matching entries; an entry left without
      any fixed address is removed completely.
    * only `mac` given: all entries with that hardware address are removed.
    * only `ip` given: all entries with that fixed address are removed.

    :param mac: MAC address of the entry.
    :param ip: fixed IP address of the entry.
    :returns: the DN of the last entry which was found, or `None` if there was none.
    """
    # if we got the mac address, then we remove the object
    # if we only got the ip address, we remove the ip address
    self.log.debug('we should remove a dhcp object', mac=mac, ip=ip)

    dn = None
    tmppos = univention.admin.uldap.position(self.position.getDomain())
    if ip and mac:
        ethernet = 'ethernet %s' % mac
        self.log.debug('we only remove the ip from the dhcp object', ip=ip)
        results = self.lo.authz_connection.search(
            base=tmppos.getBase(),
            scope='domain',
            attr=['univentionDhcpFixedAddress'],
            filter=filter_format('(&(dhcpHWAddress=%s)(univentionDhcpFixedAddress=%s))', (ethernet, ip)),
            unique=False,
        )
        for dn, _attr in results:
            host = univention.admin.objects.get(univention.admin.modules.get('dhcp/host'), self.co, self.lo, position=self.position, dn=dn)
            assert host is not None
            host.open()
            if ip in host['fixedaddress']:
                self.log.debug('fixedaddress: "%s"', host['fixedaddress'])
                host['fixedaddress'].remove(ip)
                if not host['fixedaddress']:
                    host.remove()
                else:
                    host.modify()
                dn = host.dn
    elif mac:
        ethernet = 'ethernet %s' % mac
        self.log.debug('Remove the following mac', mac=mac)
        results = self.lo.authz_connection.search(
            base=tmppos.getBase(),
            scope='domain',
            attr=['univentionDhcpFixedAddress'],
            filter=filter_format('dhcpHWAddress=%s', [ethernet]),
            unique=False,
        )
        for dn, _attr in results:
            self.log.debug('... done')
            host = univention.admin.objects.get(univention.admin.modules.get('dhcp/host'), self.co, self.lo, position=self.position, dn=dn)
            assert host is not None
            host.remove()
            dn = host.dn
    elif ip:
        self.log.debug('Remove the following ip', ip=ip)
        results = self.lo.authz_connection.search(
            base=tmppos.getBase(),
            scope='domain',
            attr=['univentionDhcpFixedAddress'],
            filter=filter_format('univentionDhcpFixedAddress=%s', [ip]),
            unique=False,
        )
        for dn, _attr in results:
            self.log.debug('... done')
            host = univention.admin.objects.get(univention.admin.modules.get('dhcp/host'), self.co, self.lo, position=self.position, dn=dn)
            assert host is not None
            host.remove()
            dn = host.dn

    return dn


@classmethod
def __split_dhcp_line(cls, entry: list[str]) -> tuple[str, str, str]:
    """
    Split a ``dhcpEntryZone`` value into service, IP address and MAC address.

    The MAC address is taken from the last element and normalized; if it cannot be parsed
    the MAC address (and the IP address) are returned as empty strings.

    :param entry: list of the form ``[service, mac]`` or ``[service, ip, mac]``.
    :returns: 3-tuple (service, ip, mac).

    >>> simpleComputer._simpleComputer__split_dhcp_line(["service", "0011.2233.4455"])
    ('service', '', '00:11:22:33:44:55')
    >>> simpleComputer._simpleComputer__split_dhcp_line(["service", "1.2.3.4", "00:11:22:33:44:55"])
    ('service', '1.2.3.4', '00:11:22:33:44:55')
    """
    service = entry[0]
    ip = ''
    try:
        # sanitize mac address
        #   0011.2233.4455 -> 00:11:22:33:44:55 -> is guaranteed to work together with our DHCP server
        #   __split_dhcp_line may be used outside of UDM which means that MAC_Address.parse may not be called.
        mac = univention.admin.syntax.MAC_Address.parse(entry[-1])
        if cls._is_ip(entry[-2]):
            ip = entry[-2]
    except univention.admin.uexceptions.valueError:
        mac = ''
    return (service, ip, mac)


@classmethod
def __split_dns_line(cls, entry: list[str]) -> tuple[str, str | None]:
    """
    Split a DNS entry value into zone and IP address.

    :param entry: list of the form ``[zone]`` or ``[zone, ip]``.
    :returns: 2-tuple (zone, ip); `ip` is `None` if it is missing or not a valid IP address.

    >>> simpleComputer._simpleComputer__split_dns_line(["zoneName"])
    ('zoneName', None)
    >>> simpleComputer._simpleComputer__split_dns_line(["zoneName", "1.2.3.4"])
    ('zoneName', '1.2.3.4')
    """
    zone = entry[0]
    ip = entry[1] if len(entry) > 1 and cls._is_ip(entry[1]) else None
    log.debug('Split entry into zone and ip', entry=entry, zone=zone, ip=ip)
    return (zone, ip)


def __remove_dns_reverse_object(self, name: str, dnsEntryZoneReverse: str | None, ip: str) -> None:
    """
    Remove the pointer record(s) of this host for the address `ip`.

    If `dnsEntryZoneReverse` is given only that reverse zone is handled, otherwise all
    zones which contain a pointer record for `name` are searched.

    :param name: host name the pointer record points to.
    :param dnsEntryZoneReverse: DN of the reverse zone or `None`.
    :param ip: IP address whose pointer record should be removed.
    """
    def modify(rdn: str, zoneDN: str) -> None:
        zone_name = explode_rdn(zoneDN, True)[0]
        for dn, attributes in self.lo.authz_connection.search(
            scope='domain',
            attr=['pTRRecord'],
            filter=filter_format('(&(relativeDomainName=%s)(zoneName=%s))', (rdn, zone_name)),
        ):
            ptr_records = attributes.get('pTRRecord', [])
            removals = []
            if len(ptr_records) > 1:
                # the entry is shared: only the values "<name>.<zone>." of this host are candidates
                removals = [
                    b'%s.%s.' % (name.encode('UTF-8'), attributes2['zoneName'][0])
                    for dn2, attributes2 in self.lo.authz_connection.search(
                        scope='domain',
                        attr=['zoneName'],
                        filter=filter_format('(&(relativeDomainName=%s)(objectClass=dNSZone))', [name]),
                        unique=False,
                    )
                ]

            if len(ptr_records) <= 1 or set(ptr_records) == set(removals):
                self.lo.authz_connection.delete('relativeDomainName=%s,%s' % (escape_dn_chars(rdn), zoneDN))
            else:
                self.lo.authz_connection.modify(dn, [('pTRRecord', removals, b'')])

            # opening and modifying the zone object after the change
            # (presumably this bumps the zone serial, cf. "update Serial" in __add_dns_reverse_object)
            zone = univention.admin.handlers.dns.reverse_zone.object(self.co, self.lo, self.position, zoneDN)
            zone.open()
            zone.modify()

    self.log.debug('we should remove a dns reverse object', dnsEntryZoneReverse=dnsEntryZoneReverse, record_name=name, ip=ip)
    if dnsEntryZoneReverse:
        try:
            rdn = self.calc_dns_reverse_entry_name(ip, dnsEntryZoneReverse)
        except ValueError:
            # zone and IP address do not fit together: nothing to remove
            pass
        else:
            modify(rdn, dnsEntryZoneReverse)
    elif ip:
        tmppos = univention.admin.uldap.position(self.position.getDomain())
        results = self.lo.authz_connection.search(
            base=tmppos.getBase(),
            scope='domain',
            attr=['zoneDn'],
            filter=filter_format('(&(objectClass=dNSZone)(|(pTRRecord=%s)(pTRRecord=%s.*)))', (name, name)),
            unique=False,
        )
        for dn, _attr in results:
            zone = self.lo.parentDn(dn)
            try:
                rdn = self.calc_dns_reverse_entry_name(ip, zone)
                modify(rdn, zone)
            except ValueError as ex:
                self.log.debug('DEBUG: rdn', error=ex)
            except univention.admin.uexceptions.noObject:
                pass


def __add_dns_reverse_object(self, name: str, zoneDn: str, ip: str) -> None:
    """
    Create the pointer record for `ip` in the reverse zone `zoneDn`.

    The record points to all ``<name>.<zone>.`` names for which an address record with
    `ip` exists. Nothing is created if no such address record is found or if an entry
    for the address already exists.

    :param name: host name.
    :param zoneDn: DN of the reverse zone.
    :param ip: IP address.
    :raises univention.admin.uexceptions.missingInformation: if `ip` does not belong to the reverse zone.
    """
    self.log.debug('we should create a dns reverse object', zone=zoneDn, record_name=name, ip=ip)
    if not all((name, zoneDn, ip)):
        return

    addr, attr = self._ip2dns(ip)
    try:
        ipPart = self.calc_dns_reverse_entry_name(ip, zoneDn)
    except ValueError:
        raise univention.admin.uexceptions.missingInformation(_('Reverse zone and IP address are incompatible.'))

    tmppos = univention.admin.uldap.position(self.position.getDomain())
    results = self.lo.authz_connection.search(
        base=tmppos.getBase(),
        scope='domain',
        attr=['zoneName'],
        filter=filter_format('(&(relativeDomainName=%s)(zoneName=*)(%s=%s))', (name, attr, addr.exploded)),
        unique=False,
    )
    hostname_list = {
        '%s.%s.' % (name, host_attr['zoneName'][0].decode('UTF-8'))
        for _dn, host_attr in results
    }
    if not hostname_list:
        self.log.error('Could not determine host record. Not creating pointer record.', record_name=name, ip=ip)
        return

    # the second and third filter argument are attribute name and value of the first RDN of the zone DN
    results = self.lo.authz_connection.searchDn(
        base=tmppos.getBase(),
        scope='domain',
        filter=filter_format('(&(relativeDomainName=%s)(%s=%s))', [ipPart, *str2dn(zoneDn)[0][0][:2]]),
        unique=False,
    )
    if not results:
        self.lo.authz_connection.add(
            'relativeDomainName=%s,%s' % (escape_dn_chars(ipPart), zoneDn),
            [
                ('objectClass', [b'top', b'dNSZone', b'univentionObject']),
                ('univentionObjectType', [b'dns/ptr_record']),
                ('zoneName', [explode_rdn(zoneDn, True)[0].encode('UTF-8')]),
                ('relativeDomainName', [ipPart.encode('ASCII')]),
                ('PTRRecord', [x.encode('UTF-8') for x in hostname_list]),
            ],
        )

        # update Serial
        zone = univention.admin.handlers.dns.reverse_zone.object(self.co, self.lo, self.position, zoneDn)
        zone.open()
        zone.modify()


def __remove_dns_forward_object(self, name: str, zoneDn: str | None, ip: str | None = None) -> None:
    """
    Remove the address record of this host or a single IP address from it.

    Without `ip` the whole record ``relativeDomainName=<name>,<zoneDn>`` is deleted (only if
    `zoneDn` is given). With `ip` the address is removed from all matching records; a record
    whose only address is `ip` is deleted.

    :param name: host name; nothing is done if it is empty.
    :param zoneDn: DN of the forward zone; the whole domain is searched if it is empty and `ip` is given.
    :param ip: IP address to remove.
    """
    self.log.debug('we should remove a dns forward object', zone=zoneDn, record_name=name, ip=ip)
    if not name:
        return

    # check if dns forward object has more than one ip address
    if not ip:
        if zoneDn:
            self.lo.authz_connection.delete('relativeDomainName=%s,%s' % (escape_dn_chars(name), zoneDn))
            fzo = univention.admin.handlers.dns.forward_zone.object(self.co, self.lo, self.position, zoneDn)
            fzo.open()
            fzo.modify()
        return

    if zoneDn:
        base = zoneDn
    else:
        tmppos = univention.admin.uldap.position(self.position.getDomain())
        base = tmppos.getBase()
    self.log.trace('search DNS forward object', base=base)
    if ':' in ip:
        ip = IPv6Address('%s' % (ip,)).exploded
        (attrEdit, attrOther) = ('aAAARecord', 'aRecord')
    else:
        (attrEdit, attrOther) = ('aRecord', 'aAAARecord')
    results = self.lo.authz_connection.search(
        base=base,
        scope='domain',
        attr=['aRecord', 'aAAARecord'],
        filter=filter_format('(&(relativeDomainName=%s)(%s=%s))', (name, attrEdit, ip)),
        unique=False,
        required=False,
    )
    for dn, attr in results:
        if [x.decode('ASCII') for x in attr[attrEdit]] == [ip] and not attr.get(attrOther):
            # the <ip> to be removed is the last on the object
            # remove the object
            self.lo.authz_connection.delete(dn)
        else:
            # remove only the ip address attribute
            new_ip_list = copy.deepcopy(attr[attrEdit])
            new_ip_list.remove(ip.encode('ASCII'))
            self.lo.authz_connection.modify(dn, [(attrEdit, attr[attrEdit], new_ip_list)])

        zone = zoneDn or self.lo.parentDn(dn)
        fzo = univention.admin.handlers.dns.forward_zone.object(self.co, self.lo, self.position, zone)
        fzo.open()
        fzo.modify()


@staticmethod
def __related_ptr_filter(ip: str) -> str:
    """
    Build the LDAP filter matching the relative names an IPv4 address can have in a reverse zone.

    For ``1.2.3.4`` these are ``4``, ``4.3`` and ``4.3.2``.

    :param ip: a non-empty IP address string; it is split at ``.``.
    :returns: the LDAP filter string.
    """
    labels = ip.split('.')
    labels.reverse()
    return filter_format(
        '(|(relativeDomainName=%s)(relativeDomainName=%s)(relativeDomainName=%s))',
        (labels[0], '.'.join(labels[:2]), '.'.join(labels[:3])),
    )


def __add_related_ptrrecords(self, zoneDN: str, ip: str) -> None:
    """
    Add ``<name>.<zone>.`` as ``pTRRecord`` value to the entries below `zoneDN` which are named after `ip`.

    :param zoneDN: DN of the zone which is searched; its first RDN value is used as zone name.
    :param ip: IP address; nothing is done if it or `zoneDN` is empty.
    """
    if not all((zoneDN, ip)):
        return
    # LDAP attribute values are bytes
    ptrrecord = ('%s.%s.' % (self.info['name'], explode_rdn(zoneDN, True)[0])).encode('UTF-8')
    for dn, attributes in self.lo.authz_connection.search(base=zoneDN, scope='domain', attr=['pTRRecord'], filter=self.__related_ptr_filter(ip)):
        if ptrrecord in attributes.get('pTRRecord', []):
            # already present, adding it again would be rejected
            continue
        self.lo.authz_connection.modify(dn, [('pTRRecord', b'', ptrrecord)])


def __remove_related_ptrrecords(self, zoneDN: str, ip: str) -> None:
    """
    Remove the ``pTRRecord`` value ``<name>.<zone>.`` from the entries below `zoneDN` which are named after `ip`.

    :param zoneDN: DN of the zone which is searched; its first RDN value is used as zone name.
    :param ip: IP address; nothing is done if it or `zoneDN` is empty.
    """
    if not all((zoneDN, ip)):
        # callers may pass ip=None for entries which consist of the zone only
        return
    # LDAP attribute values are bytes
    ptrrecord = ('%s.%s.' % (self.info['name'], explode_rdn(zoneDN, True)[0])).encode('UTF-8')
    for dn, attributes in self.lo.authz_connection.search(base=zoneDN, scope='domain', attr=['pTRRecord'], filter=self.__related_ptr_filter(ip)):
        if ptrrecord in attributes.get('pTRRecord', []):
            self.lo.authz_connection.modify(dn, [('pTRRecord', ptrrecord, b'')])
def check_common_name_length(self) -> None:
    """
    Verify that host name plus forward zone name stay below the common name limit.

    Every non-empty ``dnsEntryZoneForward`` entry is checked, but only if an IP address is set.

    :raises univention.admin.uexceptions.commonNameTooLong: if the length of the host name plus the length of a zone name is 63 or more.
    """
    self.log.debug('check_common_name_length', ip=self['ip'], dnsEntryZoneForward=self['dnsEntryZoneForward'])
    if not (self['ip'] and self['dnsEntryZoneForward']):
        return

    for forward_entry in self['dnsEntryZoneForward']:
        if forward_entry == '':
            continue
        # the first element of the entry is the zone DN, its first RDN value is the zone name
        zone_name = explode_rdn(forward_entry[0], True)[0]
        if len(zone_name) + len(self['name']) < 63:
            continue
        # the logged length additionally counts one separator character
        self.log.warning('length of Common Name is too long', length=len(zone_name) + len(self['name']) + 1)
        raise univention.admin.uexceptions.commonNameTooLong()
@staticmethod
def _ip2dns(addr: str) -> tuple[IPv4Address | IPv6Address, str]:
    """
    Convert IP address string to 2-tuple (IPAddress, LdapAttributeName).

    :param addr: an IPv4 or IPv6 address.
    :returns: 2-tuple (IPAddress, LdapAttributeName)

    >>> simpleComputer._ip2dns('127.0.0.1')
    (IPv4Address('127.0.0.1'), 'aRecord')
    >>> simpleComputer._ip2dns('::1')
    (IPv6Address('::1'), 'aAAARecord')
    """
    ip = ip_address('%s' % (addr,))
    return (ip, 'aAAARecord' if isinstance(ip, IPv6Address) else 'aRecord')


def __modify_dns_forward_object(self, name: str, zoneDn: str | None, new_ip: str, old_ip: str) -> None:
    """
    Replace `old_ip` by `new_ip` in the address records of this host.

    :param name: host name.
    :param zoneDn: DN of the forward zone; the whole domain is searched if it is empty.
    :param new_ip: new IP address.
    :param old_ip: IP address to replace; nothing is done unless both addresses are given.
    """
    self.log.debug('we should modify a dns forward object', zone=zoneDn, record_name=name, new_ip=new_ip, old_ip=old_ip)
    zone: str | None = None
    if old_ip and new_ip:
        if not zoneDn:
            tmppos = univention.admin.uldap.position(self.position.getDomain())
            base = tmppos.getBase()
        else:
            base = zoneDn

        naddr, _nattr = self._ip2dns(new_ip)
        oaddr, oattr = self._ip2dns(old_ip)
        results = self.lo.authz_connection.search(
            base=base,
            scope='domain',
            attr=['aRecord', 'aAAARecord'],
            filter=filter_format('(&(relativeDomainName=%s)(%s=%s))', (name, oattr, old_ip)),
            unique=False,
        )

        for dn, attr in results:
            old_aRecord = attr.get('aRecord', [])
            new_aRecord = copy.deepcopy(old_aRecord)
            old_aAAARecord = attr.get('aAAARecord', [])
            new_aAAARecord = copy.deepcopy(old_aAAARecord)
            if isinstance(oaddr, IPv6Address):
                new_aAAARecord.remove(old_ip.encode('ASCII'))
            else:
                new_aRecord.remove(old_ip.encode('ASCII'))

            ip = naddr.exploded.encode('ASCII')
            if isinstance(naddr, IPv6Address):
                if ip not in new_aAAARecord:
                    new_aAAARecord.append(ip)
            else:
                if ip not in new_aRecord:
                    new_aRecord.append(ip)

            modlist = []
            if old_aAAARecord != new_aAAARecord:
                modlist.append(('aAAARecord', old_aAAARecord, new_aAAARecord))
            if old_aRecord != new_aRecord:
                modlist.append(('aRecord', old_aRecord, new_aRecord))
            self.lo.authz_connection.modify(dn, modlist)
            if not zoneDn:
                zone = self.lo.parentDn(dn)

        if zoneDn:
            zone = zoneDn

        if zone:
            self.log.debug('update the zone sOARecord for the zone', zone=zone)
            fzo = univention.admin.handlers.dns.forward_zone.object(self.co, self.lo, self.position, zone)
            fzo.open()
            fzo.modify()


def __add_dns_forward_object(self, name: str, zoneDn: str, ip: str) -> None:
    """
    Add `ip` to the address record of this host in the forward zone `zoneDn`.

    :param name: host name.
    :param zoneDn: DN of the forward zone.
    :param ip: IPv4 or IPv6 address; nothing is done if any argument is empty.
    """
    self.log.debug('we should add a dns forward object', zone=zoneDn, record_name=name, ip=ip)
    if not all((name, ip, zoneDn)):
        return
    addr = ip_address('%s' % (ip,))
    if isinstance(addr, IPv6Address):
        self.__add_dns_forward_object_ipv6(name, zoneDn, addr)
    elif isinstance(addr, IPv4Address):
        self.__add_dns_forward_object_ipv4(name, zoneDn, addr)


def __add_dns_forward_record(self, name: str, zoneDn: str, ip: bytes, attr_name: str, add_attr_name: str) -> None:
    """
    Shared implementation for adding an IPv4 or IPv6 address record.

    :param name: host name.
    :param zoneDn: DN of the forward zone.
    :param ip: the encoded IP address.
    :param attr_name: LDAP attribute used for searching and modifying existing records.
    :param add_attr_name: spelling of the LDAP attribute used when a new record is added.
    :raises univention.admin.uexceptions.dnsAliasRecordExists: if adding fails because the object already exists.
    """
    results = self.lo.authz_connection.search(
        base=zoneDn,
        scope='domain',
        attr=[attr_name],
        filter=filter_format('(&(relativeDomainName=%s)(!(cNAMERecord=*)))', (name,)),
        unique=False,
    )
    if results:
        for dn, attr in results:
            if attr_name in attr:
                if ip not in attr[attr_name]:
                    self.lo.authz_connection.modify(dn, [(attr_name, attr[attr_name], [*attr[attr_name], ip])])
            else:
                self.lo.authz_connection.modify(dn, [(attr_name, b'', ip)])
        return

    try:
        self.lo.authz_connection.add(
            'relativeDomainName=%s,%s' % (escape_dn_chars(name), zoneDn),
            [
                ('objectClass', [b'top', b'dNSZone', b'univentionObject']),
                ('univentionObjectType', [b'dns/host_record']),
                ('zoneName', [explode_rdn(zoneDn, True)[0].encode('UTF-8')]),
                (add_attr_name, [ip]),
                ('relativeDomainName', [name.encode('UTF-8')]),
            ],
        )
    except univention.admin.uexceptions.objectExists as ex:
        # the search above skipped entries with a cNAMERecord, so the existing object is reported as alias
        raise univention.admin.uexceptions.dnsAliasRecordExists(ex.dn) from ex
    # TODO: check if zoneDn really a forwardZone, maybe it is a container under a zone
    zone = univention.admin.handlers.dns.forward_zone.object(self.co, self.lo, self.position, zoneDn)
    zone.open()
    zone.modify()


def __add_dns_forward_object_ipv6(self, name: str, zoneDn: str, addr: IPv6Address) -> None:
    """
    Add the IPv6 address `addr` (in exploded form) to the host record `name` in `zoneDn`.

    :raises univention.admin.uexceptions.dnsAliasRecordExists: if adding fails because the object already exists.
    """
    self.__add_dns_forward_record(name, zoneDn, addr.exploded.encode('ASCII'), 'aAAARecord', 'aAAARecord')


def __add_dns_forward_object_ipv4(self, name: str, zoneDn: str, addr: IPv4Address) -> None:
    """
    Add the IPv4 address `addr` to the host record `name` in `zoneDn`.

    :raises univention.admin.uexceptions.dnsAliasRecordExists: if adding fails because the object already exists.
    """
    self.__add_dns_forward_record(name, zoneDn, addr.exploded.encode('ASCII'), 'aRecord', 'ARecord')


def __add_dns_alias_object(self, name: str, dnsForwardZone: str, dnsAliasZoneContainer: str, alias: str) -> None:
    """
    Create the alias record `alias` pointing to ``<name>.<dnsForwardZone>.``.

    :param name: host name.
    :param dnsForwardZone: name of the forward zone of the host.
    :param dnsAliasZoneContainer: DN below which the alias record is created.
    :param alias: alias name; trailing dots are stripped.
    :raises univention.admin.uexceptions.dnsAliasAlreadyUsed: if an entry with that name already exists.
    """
    self.log.debug('add a dns alias object', record_name=name, dnsForwardZone=dnsForwardZone, dnsAliasZoneContainer=dnsAliasZoneContainer, alias=alias)
    alias = alias.rstrip('.')
    if not (name and dnsForwardZone and dnsAliasZoneContainer and alias):
        return

    results = self.lo.authz_connection.search(
        base=dnsAliasZoneContainer,
        scope='domain',
        attr=['cNAMERecord'],
        filter=filter_format('relativeDomainName=%s', (alias,)),
        unique=False,
    )
    if results:
        # throw exception, cNAMERecord is single value
        raise univention.admin.uexceptions.dnsAliasAlreadyUsed(_('DNS alias is already in use.'))

    self.lo.authz_connection.add(
        'relativeDomainName=%s,%s' % (escape_dn_chars(alias), dnsAliasZoneContainer),
        [
            ('objectClass', [b'top', b'dNSZone', b'univentionObject']),
            ('univentionObjectType', [b'dns/alias']),
            ('zoneName', [explode_rdn(dnsAliasZoneContainer, True)[0].encode('UTF-8')]),
            ('cNAMERecord', [b'%s.%s.' % (name.encode('UTF-8'), dnsForwardZone.encode('UTF-8'))]),
            ('relativeDomainName', [alias.encode('UTF-8')]),
        ],
    )

    # TODO: check if dnsAliasZoneContainer really is a forwardZone, maybe it is a container under a zone
    zone = univention.admin.handlers.dns.forward_zone.object(self.co, self.lo, self.position, dnsAliasZoneContainer)
    zone.open()
    zone.modify()


def __delete_dns_aliases(self, base: str, search_filter: str) -> None:
    """
    Delete all entries below `base` matching `search_filter` and modify the zone objects they belong to.

    :param base: search base.
    :param search_filter: LDAP filter selecting the alias entries.
    """
    results = self.lo.authz_connection.search(
        base=base,
        scope='domain',
        attr=['zoneName'],
        filter=search_filter,
        unique=False,
        required=False,
    )
    for dn, attr in results:
        # remove the object
        self.lo.authz_connection.delete(dn)
        # and update the SOA version number for the zone
        zone_dns = self.lo.authz_connection.searchDn(
            base=base,
            scope='domain',
            filter=filter_format('(&(objectClass=dNSZone)(zoneName=%s)(sOARecord=*))', (attr['zoneName'][0].decode('UTF-8'),)),
            unique=False,
        )
        for zoneDn in zone_dns:
            zone = univention.admin.handlers.dns.forward_zone.object(self.co, self.lo, self.position, zoneDn)
            zone.open()
            zone.modify()


def __remove_dns_alias_object(self, name: str, dnsForwardZone: str, dnsAliasZoneContainer: str, alias: str | None = None) -> None:
    """
    Remove alias records pointing to this host.

    * `alias` and `dnsAliasZoneContainer` given: that entry is deleted directly.
    * `alias` and `dnsForwardZone` given: entries named `alias` pointing to ``<name>.<dnsForwardZone>.`` are searched and deleted.
    * only `dnsForwardZone` given: all entries pointing to `name` or ``<name>.<dnsForwardZone>.`` are searched and deleted.

    :param name: host name; nothing is done if it is empty.
    :param dnsForwardZone: name of the forward zone of the host.
    :param dnsAliasZoneContainer: DN of the container of the alias record.
    :param alias: alias name.
    """
    self.log.debug('remove a dns alias object', record_name=name, dnsForwardZone=dnsForwardZone, dnsAliasZoneContainer=dnsAliasZoneContainer, alias=alias)
    if not name:
        return

    if alias:
        if dnsAliasZoneContainer:
            self.lo.authz_connection.delete('relativeDomainName=%s,%s' % (escape_dn_chars(alias), dnsAliasZoneContainer))
            zone = univention.admin.handlers.dns.forward_zone.object(self.co, self.lo, self.position, dnsAliasZoneContainer)
            zone.open()
            zone.modify()
        elif dnsForwardZone:
            tmppos = univention.admin.uldap.position(self.position.getDomain())
            base = tmppos.getBase()
            self.log.trace('search DNS Alias object', base=base)
            self.__delete_dns_aliases(
                base,
                filter_format('(&(objectClass=dNSZone)(relativeDomainName=%s)(cNAMERecord=%s.%s.))', (alias, name, dnsForwardZone)),
            )
    elif dnsForwardZone:
        tmppos = univention.admin.uldap.position(self.position.getDomain())
        base = tmppos.getBase()
        self.log.debug('search DNS Alias object', base=base)
        # The filter used to have unbalanced parentheses and AND-ed two different values of
        # cNAMERecord. It now uses the same "either spelling" match as the alias lookup in open().
        self.__delete_dns_aliases(
            base,
            filter_format('(&(objectClass=dNSZone)(|(cNAMERecord=%s)(cNAMERecord=%s.%s.)))', (name, name, dnsForwardZone)),
        )
    # else: not enough info to remove alias entries


def _ldap_post_modify(self) -> None:
    """
    Apply the DHCP and DNS changes collected by :py:meth:`_ldap_modlist` after the computer object was modified.

    Entries without explicit IP/MAC address fall back to the first IP/MAC address of the
    computer as long as it does not have multiple IP or MAC addresses.
    """
    super()._ldap_post_modify()

    self.__multiip |= len(self['mac']) > 1 or len(self['ip']) > 1

    for entry in self.__changes['dhcpEntryZone']['remove']:
        self.log.debug('dhcp check', removed=entry)
        dn, ip, mac = self.__split_dhcp_line(entry)
        if not ip and not mac and not self.__multiip:
            mac = self['mac'][0] if self['mac'] else ''
            self.__remove_from_dhcp_object(mac=mac)
        else:
            self.__remove_from_dhcp_object(ip=ip, mac=mac)

    for entry in self.__changes['dhcpEntryZone']['add']:
        self.log.debug('dhcp check', added=entry)
        dn, ip, mac = self.__split_dhcp_line(entry)
        if not ip and not mac and not self.__multiip:
            ip = self['ip'][0] if self['ip'] else ''
            mac = self['mac'][0] if self['mac'] else ''
        self.__modify_dhcp_object(dn, mac, ip=ip)

    for entry in self.__changes['dnsEntryZoneForward']['remove']:
        dn, ip = self.__split_dns_line(entry)
        if not ip and not self.__multiip:
            ip = self['ip'][0] if self['ip'] else ''
        self.__remove_dns_forward_object(self['name'], dn, ip)
        self.__remove_related_ptrrecords(dn, ip)

    for entry in self.__changes['dnsEntryZoneForward']['add']:
        dn, ip = self.__split_dns_line(entry)
        self.log.debug('we should add a dns forward object', entry=entry, dn=dn, ip=ip)
        if not ip and not self.__multiip:
            self.log.trace('no multiip environment')
            ip = self['ip'][0] if self['ip'] else ''
        self.__add_dns_forward_object(self['name'], dn, ip)
        self.__add_related_ptrrecords(dn, ip)

    for entry in self.__changes['dnsEntryZoneReverse']['remove']:
        dn, ip = self.__split_dns_line(entry)
        if not ip and not self.__multiip:
            ip = self['ip'][0] if self['ip'] else ''
        self.__remove_dns_reverse_object(self['name'], dn, ip)

    for entry in self.__changes['dnsEntryZoneReverse']['add']:
        dn, ip = self.__split_dns_line(entry)
        if not ip and not self.__multiip:
            ip = self['ip'][0] if self['ip'] else ''
        self.__add_dns_reverse_object(self['name'], dn, ip)

    for entry in self.__changes['dnsEntryZoneAlias']['remove']:
        dnsForwardZone, dnsAliasZoneContainer, alias = entry
        if not alias:
            # fall back to the first alias of the computer; this case does not seem to occur
            alias = self['dnsAlias'][0]
        self.__remove_dns_alias_object(self['name'], dnsForwardZone, dnsAliasZoneContainer, alias)

    for entry in self.__changes['dnsEntryZoneAlias']['add']:
        dnsForwardZone, dnsAliasZoneContainer, alias = entry
        self.log.debug('we should add a dns alias object', entry=entry, dnsForwardZone=dnsForwardZone, dnsAliasZoneContainer=dnsAliasZoneContainer, alias=alias)
        if not alias:
            alias = self['dnsAlias'][0]
        self.__add_dns_alias_object(self['name'], dnsForwardZone, dnsAliasZoneContainer, alias)

    for entry in self.__changes['mac']['remove']:
        self.__remove_from_dhcp_object(mac=entry)

    changed_ip = False
    for entry in self.__changes['ip']['remove']:
        # self.__remove_from_dhcp_object(ip=entry)
        if not self.__multiip:
            if self.__changes['ip']['add']:
                # we change
                single_ip = self.__changes['ip']['add'][0]
                self.__modify_dns_forward_object(self['name'], None, single_ip, entry)
                changed_ip = True
                for mac in self['mac']:
                    dn = self.__remove_from_dhcp_object(ip=entry, mac=mac)
                    try:
                        dn = self.lo.parentDn(dn)
                        self.__modify_dhcp_object(dn, mac, ip=single_ip)
                    except Exception as exc:
                        # best effort: a failure here must not abort the remaining cleanup
                        self.log.debug('could not re-create DHCP entry for changed IP address', mac=mac, ip=single_ip, error=exc)
            else:
                # remove the dns objects
                self.__remove_dns_forward_object(self['name'], None, entry)
        else:
            self.__remove_dns_forward_object(self['name'], None, entry)
            self.__remove_from_dhcp_object(ip=entry)

        self.__remove_dns_reverse_object(self['name'], None, entry)

    for entry in self.__changes['ip']['add']:
        if not self.__multiip:
            if self.get('dnsEntryZoneForward', []) and not changed_ip:
                self.__add_dns_forward_object(self['name'], self['dnsEntryZoneForward'][0][0], entry)
            for dnsEntryZoneReverse in self.get('dnsEntryZoneReverse', []):
                x, ip = self.__split_dns_line(dnsEntryZoneReverse)
                # only add the address to reverse zones of the same IP version
                zoneIsV6 = explode_rdn(x, True)[0].endswith('.ip6.arpa')
                entryIsV6 = ':' in entry
                if zoneIsV6 == entryIsV6:
                    self.__add_dns_reverse_object(self['name'], x, entry)

    if self.__changes['name']:
        self.log.debug('simpleComputer: name has changed')
        self.__update_groups_after_namechange()
        self.__rename_dhcp_object(old_name=self.__changes['name'][0], new_name=self.__changes['name'][1])
        self.__rename_dns_object(position=None, old_name=self.__changes['name'][0], new_name=self.__changes['name'][1])

    self.update_groups()


def __remove_associated_domain(self, entry: list[str]) -> None:
    """
    Unset ``domain`` if it equals the zone name of the forward zone entry `entry`.

    :param entry: a ``dnsEntryZoneForward`` value.
    """
    dn, _ip = self.__split_dns_line(entry)
    domain = explode_rdn(dn, True)[0]
    if self.info.get('domain', None) == domain:
        self.info['domain'] = None


def __set_associated_domain(self, entry: list[str]) -> None:
    """
    Set ``domain`` to the zone name of the forward zone entry `entry` unless a domain is already set.

    :param entry: a ``dnsEntryZoneForward`` value.
    """
    dn, _ip = self.__split_dns_line(entry)
    domain = explode_rdn(dn, True)[0]
    if not self.info.get('domain', None):
        self.info['domain'] = domain


def _ldap_modlist(self) -> list[tuple[str, Any, Any]]:
    """
    Build the LDAP modification list and collect the pending DHCP/DNS changes.

    The added and removed MAC addresses, IP addresses and DNS/DHCP entries as well as a
    name change are stored in ``self.__changes``, which is read by :py:meth:`_ldap_post_modify`.

    :returns: the modification list, extended by the one of the parent class.
    :raises univention.admin.uexceptions.macAlreadyUsed: if a new MAC address could not be locked.
    :raises univention.admin.uexceptions.ipAlreadyUsed: if a new IP address could not be locked.
    :raises univention.admin.uexceptions.invalidDhcpEntry: if a DHCP entry lacks the service or MAC address.
    :raises univention.admin.uexceptions.invalidDNSAliasEntry: if a DNS alias entry is incomplete.
    """
    self.__changes = {
        'mac': {'remove': [], 'add': []},
        'ip': {'remove': [], 'add': []},
        'name': None,
        'dnsEntryZoneForward': {'remove': [], 'add': []},
        'dnsEntryZoneReverse': {'remove': [], 'add': []},
        'dnsEntryZoneAlias': {'remove': [], 'add': []},
        'dhcpEntryZone': {'remove': [], 'add': []},
    }
    ml: list[tuple[str, Any, Any]] = []
    if self.hasChanged('mac'):
        for macAddress in self.info.get('mac', []):
            if macAddress in self.oldinfo.get('mac', []):
                continue
            try:
                self.__changes['mac']['add'].append(self.request_lock('mac', macAddress))
            except univention.admin.uexceptions.noLock:
                raise univention.admin.uexceptions.macAlreadyUsed(macAddress)
        for macAddress in self.oldinfo.get('mac', []):
            if macAddress in self.info.get('mac', []):
                continue
            self.__changes['mac']['remove'].append(macAddress)

    oldAddresses = self.oldinfo.get('ip') or ()
    newAddresses = self.info.get('ip') or ()
    if oldAddresses != newAddresses:
        old_addr = [ip_address('%s' % addr) for addr in oldAddresses]
        old_ipv4 = [addr.exploded.encode('ASCII') for addr in old_addr if isinstance(addr, IPv4Address)]
        old_ipv6 = [addr.exploded.encode('ASCII') for addr in old_addr if isinstance(addr, IPv6Address)]
        new_addr = [ip_address('%s' % addr) for addr in newAddresses]
        new_ipv4 = [addr.exploded.encode('ASCII') for addr in new_addr if isinstance(addr, IPv4Address)]
        new_ipv6 = [addr.exploded.encode('ASCII') for addr in new_addr if isinstance(addr, IPv6Address)]
        ml.append(('aRecord', old_ipv4, new_ipv4))
        ml.append(('aAAARecord', old_ipv6, new_ipv6))

    if self.hasChanged('ip'):
        for ipAddress in self['ip']:
            if not ipAddress:
                continue
            # oldAddresses is never None, unlike self.oldinfo.get('ip')
            if ipAddress in oldAddresses:
                continue
            if not self.ip_already_requested:
                try:
                    ipAddress = self.request_lock('aRecord', ipAddress)
                except univention.admin.uexceptions.noLock:
                    self.ip_already_requested = 0
                    raise univention.admin.uexceptions.ipAlreadyUsed(ipAddress)
            self.__changes['ip']['add'].append(ipAddress)

        for ipAddress in self.oldinfo.get('ip', []):
            if ipAddress in newAddresses:
                continue
            self.__changes['ip']['remove'].append(ipAddress)

    if self.hasChanged('name'):
        ml.append(('sn', self.oldattr.get('sn', [None])[0], self['name'].encode('UTF-8')))
        self.__changes['name'] = (self.oldattr.get('sn', [b''])[0].decode('UTF-8') or None, self['name'])

    if self.hasChanged('ip') or self.hasChanged('mac'):
        dhcp = [self.__split_dhcp_line(entry) for entry in self.info.get('dhcpEntryZone', [])]
        if len(newAddresses) <= 1 and len(self.info.get('mac', [])) == 1 and dhcp:
            # In this special case, we assume the mapping between ip/mac address to be
            # unique. The dhcp entry needs to contain the mac address (as specified by
            # the ldap search for dhcp entries), the ip address may not correspond to
            # the ip address associated with the computer ldap object, but this would
            # be erroneous anyway. We therefore update the dhcp entry to correspond to
            # the current ip and mac address. (Bug #20315)
            self.info['dhcpEntryZone'] = [
                (dn, newAddresses[0] if newAddresses else '', self.info['mac'][0])
                for (dn, ip, _mac) in dhcp
            ]
        else:
            # in all other cases, we remove old dhcp entries that do not match ip or
            # mac addresses (Bug #18966)
            removedIPs = set(self.oldinfo.get('ip', [])) - set(self['ip'])
            removedMACs = set(self.oldinfo.get('mac', [])) - set(self['mac'])
            self.info['dhcpEntryZone'] = [
                (dn, ip, _mac)
                for (dn, ip, _mac) in dhcp
                if not (ip in removedIPs or _mac in removedMACs)
            ]

    if self.hasChanged('dhcpEntryZone'):
        if 'dhcpEntryZone' in self.oldinfo:
            if 'dhcpEntryZone' in self.info:
                for entry in self.oldinfo['dhcpEntryZone']:
                    if entry not in self.info['dhcpEntryZone']:
                        self.__changes['dhcpEntryZone']['remove'].append(entry)
            else:
                for entry in self.oldinfo['dhcpEntryZone']:
                    self.__changes['dhcpEntryZone']['remove'].append(entry)
        if 'dhcpEntryZone' in self.info:
            for entry in self.info['dhcpEntryZone']:
                # check if line is valid
                dn, _ip, mac = self.__split_dhcp_line(entry)
                if dn and mac:
                    if entry not in self.oldinfo.get('dhcpEntryZone', []):
                        self.__changes['dhcpEntryZone']['add'].append(entry)
                else:
                    raise univention.admin.uexceptions.invalidDhcpEntry(_('The DHCP entry for this host should contain the zone LDAP-DN, the IP address and the MAC address.'))

    if self.hasChanged('dnsEntryZoneForward'):
        for entry in self.oldinfo.get('dnsEntryZoneForward', []):
            if entry not in self.info.get('dnsEntryZoneForward', []):
                self.__changes['dnsEntryZoneForward']['remove'].append(entry)
                self.__remove_associated_domain(entry)
        for entry in self.info.get('dnsEntryZoneForward', []):
            if entry == '':
                continue
            if entry not in self.oldinfo.get('dnsEntryZoneForward', []):
                self.__changes['dnsEntryZoneForward']['add'].append(entry)
            self.__set_associated_domain(entry)

    if self.hasChanged('dnsEntryZoneReverse'):
        for entry in self.oldinfo.get('dnsEntryZoneReverse', []):
            if entry not in self.info.get('dnsEntryZoneReverse', []):
                self.__changes['dnsEntryZoneReverse']['remove'].append(entry)
        for entry in self.info.get('dnsEntryZoneReverse', []):
            if entry not in self.oldinfo.get('dnsEntryZoneReverse', []):
                self.__changes['dnsEntryZoneReverse']['add'].append(entry)

    if self.hasChanged('dnsEntryZoneAlias'):
        for entry in self.oldinfo.get('dnsEntryZoneAlias', []):
            if entry not in self.info.get('dnsEntryZoneAlias', []):
                self.__changes['dnsEntryZoneAlias']['remove'].append(entry)
        for entry in self.info.get('dnsEntryZoneAlias', []):
            # check if line is valid
            dnsForwardZone, dnsAliasZoneContainer, alias = entry
            if dnsForwardZone and dnsAliasZoneContainer and alias:
                if entry not in self.oldinfo.get('dnsEntryZoneAlias', []):
                    self.__changes['dnsEntryZoneAlias']['add'].append(entry)
            else:
                raise univention.admin.uexceptions.invalidDNSAliasEntry(
                    _('The DNS alias entry for this host should contain the zone name, the alias zone container LDAP-DN and the alias.'),
                )

    self.__multiip = len(self['mac']) > 1 or len(self['ip']) > 1

    ml += super()._ldap_modlist()

    return ml
@classmethod
def calc_dns_reverse_entry_name(cls, sip: str, reverseDN: str) -> str:
    """
    Return the part of an address' reverse pointer name which is relative to a reverse zone.

    The address is converted into its reverse pointer name and the zone name,
    taken from the first value of `reverseDN`, is stripped from its end.

    :param sip: The IPv4 or IPv6 address as a string.
    :param reverseDN: The DN of the reverse zone.
    :returns: The leading labels of the reverse pointer name, without the zone suffix.
    :raises ValueError: if `sip` is no valid IP address or its reverse pointer name does not end with the zone name.

    >>> simpleComputer.calc_dns_reverse_entry_name('10.200.2.5', 'subnet=2.200.10.in-addr.arpa')
    '5'
    >>> simpleComputer.calc_dns_reverse_entry_name('10.200.2.5', 'subnet=200.10.in-addr.arpa')
    '5.2'
    >>> simpleComputer.calc_dns_reverse_entry_name('2001:db8::3', 'subnet=0.0.0.0.0.0.0.0.8.b.d.0.1.0.0.2.ip6.arpa')
    '3.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0'
    >>> simpleComputer.calc_dns_reverse_entry_name('1.2.3.4', 'subnet=2.in-addr.arpa')
    Traceback (most recent call last):
            ...
    ValueError: 4.3.2.1.in-addr.arpa not in .2.in-addr.arpa
    """
    addr = ip_address('%s' % (sip,))
    rev = addr.reverse_pointer
    # the leading dot makes sure that only complete labels are matched
    subnet = '.%s' % (explode_rdn(reverseDN, True)[0],)
    if not rev.endswith(subnet):
        raise ValueError('%s not in %s' % (rev, subnet))
    return rev[: -len(subnet)]
def _ldap_pre_create(self) -> None:
    """Run the inherited pre-create hook, then call :py:meth:`check_common_name_length`."""
    super()._ldap_pre_create()
    self.check_common_name_length()

def _ldap_pre_modify(self) -> None:
    """Run the inherited pre-modify hook, then call :py:meth:`check_common_name_length`."""
    super()._ldap_pre_modify()
    self.check_common_name_length()

def _ldap_post_create(self) -> None:
    """
    Apply the DHCP and DNS changes collected in `self.__changes` and update the group memberships.

    Entries which carry no address of their own fall back to the first IP/MAC
    address of this object, unless `self.__multiip` is set.
    """
    super()._ldap_post_create()

    for entry in self.__changes['dhcpEntryZone']['remove']:
        self.log.debug('dhcp check', removed=entry)
        dn, ip, mac = self.__split_dhcp_line(entry)
        if not ip and not mac and not self.__multiip:
            mac = self['mac'][0] if self['mac'] else ''
            self.__remove_from_dhcp_object(mac=mac)
        else:
            self.__remove_from_dhcp_object(ip=ip, mac=mac)

    for entry in self.__changes['dhcpEntryZone']['add']:
        self.log.debug('dhcp check', added=entry)
        dn, ip, mac = self.__split_dhcp_line(entry)
        if not ip and not mac and not self.__multiip:
            # nothing is written when the object has no IP or no MAC address to fall back to
            if self['ip'] and self['mac']:
                self.__modify_dhcp_object(dn, self['mac'][0], ip=self['ip'][0])
        else:
            self.__modify_dhcp_object(dn, mac, ip=ip)

    for entry in self.__changes['dnsEntryZoneForward']['remove']:
        dn, ip = self.__split_dns_line(entry)
        if not ip and not self.__multiip:
            ip = self['ip'][0] if self['ip'] else ''
        self.__remove_dns_forward_object(self['name'], dn, ip)

    for entry in self.__changes['dnsEntryZoneForward']['add']:
        dn, ip = self.__split_dns_line(entry)
        self.log.debug('we should add a dns forward object', entry=entry, dn=dn, ip=ip)
        if not ip and not self.__multiip:
            self.log.trace('no multiip environment')
            ip = self['ip'][0] if self['ip'] else ''
        self.__add_dns_forward_object(self['name'], dn, ip)

    for entry in self.__changes['dnsEntryZoneReverse']['remove']:
        dn, ip = self.__split_dns_line(entry)
        if not ip and not self.__multiip:
            ip = self['ip'][0] if self['ip'] else ''
        self.__remove_dns_reverse_object(self['name'], dn, ip)

    for entry in self.__changes['dnsEntryZoneReverse']['add']:
        dn, ip = self.__split_dns_line(entry)
        if not ip and not self.__multiip:
            ip = self['ip'][0] if self['ip'] else ''
        self.__add_dns_reverse_object(self['name'], dn, ip)

    # with a single address pair the first DHCP entry is updated for every added MAC and IP address
    if not self.__multiip and self.get('dhcpEntryZone', []):
        dn, ip, mac = self['dhcpEntryZone'][0]
        for entry in self.__changes['mac']['add']:
            if self['ip']:
                self.__modify_dhcp_object(dn, entry, ip=self['ip'][0])
            else:
                self.__modify_dhcp_object(dn, entry)
        if self['mac']:
            for entry in self.__changes['ip']['add']:
                self.__modify_dhcp_object(dn, self['mac'][0], ip=entry)

    for entry in self.__changes['dnsEntryZoneAlias']['remove']:
        dnsForwardZone, dnsAliasZoneContainer, alias = entry
        # nonfunctional code since self['alias'] should be self['dnsAlias'], but this case does not seem to occur
        self.__remove_dns_alias_object(self['name'], dnsForwardZone, dnsAliasZoneContainer, alias or self['alias'][0])

    for entry in self.__changes['dnsEntryZoneAlias']['add']:
        dnsForwardZone, dnsAliasZoneContainer, alias = entry
        self.log.debug('we should add a dns alias object', entry=entry, dnsForwardZone=dnsForwardZone, dnsAliasZoneContainer=dnsAliasZoneContainer, alias=alias)
        self.__add_dns_alias_object(self['name'], dnsForwardZone, dnsAliasZoneContainer, alias or self['alias'][0])

    self.update_groups()

def _ldap_post_remove(self) -> None:
    """
    Queue the MAC and IP addresses in `self.alloc`, run the inherited hook and
    remove the computer from its groups, including the former primary group.
    """
    if self['mac']:
        for macAddress in self['mac']:
            if macAddress:
                self.alloc.append(('mac', macAddress))
    if self['ip']:
        for ipAddress in self['ip']:
            if ipAddress:
                self.alloc.append(('aRecord', ipAddress))
    super()._ldap_post_remove()

    # remove computer from groups
    groups = copy.deepcopy(self['groups'])
    if self.oldinfo.get('primaryGroup'):
        groups.append(self.oldinfo.get('primaryGroup'))
    for group in groups:
        groupObject = univention.admin.objects.get(univention.admin.modules.get('groups/group'), self.co, self.lo, self.position, group)
        groupObject.fast_member_remove([self.dn], [x.decode('UTF-8') for x in self.oldattr.get('uid', [])], ignore_license=True)

def __update_groups_after_namechange(self) -> None:
    """
    Rewrite `memberUid` and `uniqueMember` of all old and new groups after the computer name changed.

    Groups which are still assigned get the old values replaced by the new ones,
    groups which are no longer assigned get the values of this computer removed.
    """
    oldname = self.oldinfo.get('name')
    newname = self.info.get('name')
    if not oldname:
        self.log.error('__update_groups_after_namechange: oldname is empty')
        return

    olddn = self.old_dn.encode('UTF-8')
    newdn = self.dn.encode('UTF-8')
    oldUid = b'%s$' % oldname.encode('UTF-8')
    newUid = b'%s$' % newname.encode('UTF-8')
    self.log.debug('__update_groups_after_namechange', olddn=olddn, newdn=newdn)

    new_groups = set(self.info.get('groups', []))
    old_groups = set(self.oldinfo.get('groups', []))
    for group in new_groups | old_groups:
        # Using the UDM groups/group object does not work at this point. The computer object has already been renamed.
        # During open() of groups/group each member is checked if it exists. Because the computer object with "olddn" is missing,
        # it won't show up in groupobj['hosts']. That's why the uniqueMember/memberUid updates is done directly via
        # self.lo.authz_connection.modify()
        oldMemberUids = self.lo.authz_connection.getAttr(group, 'memberUid')
        if group in new_groups:
            self.log.debug('__update_groups_after_namechange: changing memberUid', group=group)
            newMemberUids = copy.deepcopy(oldMemberUids)
            if oldUid in newMemberUids:
                newMemberUids.remove(oldUid)
            if newUid not in newMemberUids:
                newMemberUids.append(newUid)
            self.lo.authz_connection.modify(group, [('memberUid', oldMemberUids, newMemberUids)])
        else:
            self.log.debug('__update_groups_after_namechange: removing memberUid', group=group)
            if oldUid in oldMemberUids:
                self.lo.authz_connection.modify(group, [('memberUid', oldUid, b'')])

        # we are doing the uniqueMember separately because of a potential refint overlay that already changed the dn for us
        oldUniqueMembers = self.lo.authz_connection.getAttr(group, 'uniqueMember')
        if group in new_groups:
            self.log.debug('__update_groups_after_namechange: changing uniqueMember', group=group)
            newUniqueMembers = copy.deepcopy(oldUniqueMembers)
            if olddn in newUniqueMembers:
                newUniqueMembers.remove(olddn)
            if newdn not in newUniqueMembers:
                newUniqueMembers.append(newdn)
            self.lo.authz_connection.modify(group, [('uniqueMember', oldUniqueMembers, newUniqueMembers)])
        else:
            # Both DNs are tested against the list read from LDAP. Rebinding that
            # name to a single DN in between would turn the second test into a
            # bytes substring check. dict.fromkeys() drops a duplicate DN so the
            # same value is never removed twice.
            for stale_dn in dict.fromkeys((olddn, newdn)):
                if stale_dn in oldUniqueMembers:
                    self.log.debug('__update_groups_after_namechange: removing uniqueMember', group=group)
                    self.lo.authz_connection.modify(group, [('uniqueMember', stale_dn, b'')])
def update_groups(self) -> None:
    """
    Synchronize the `hosts` of all groups whose membership of this computer changed.

    Nothing is done unless the `groups` property changed or an old/new primary
    group DN is set. Every group which is in exactly one of the old and new
    group sets is opened and modified: this computer is added when the group is
    in the new set, otherwise its old DN is removed.
    """
    if not self.hasChanged('groups') and not self.oldPrimaryGroupDn and not self.newPrimaryGroupDn:
        return
    self.log.debug('updating groups')

    old_groups = DN.set(self.oldinfo.get('groups', []))
    new_groups = DN.set(self.info.get('groups', []))

    # sets are merged with `|=`: the built-in set type does not implement `+=`
    if self.oldPrimaryGroupDn:
        old_groups |= DN.set([self.oldPrimaryGroupDn])
    if self.newPrimaryGroupDn:
        new_groups.add(DN(self.newPrimaryGroupDn))

    # prevent machineAccountGroup from being removed
    if self.has_property('machineAccountGroup'):
        machine_account_group = DN.set([self['machineAccountGroup']])
        new_groups |= machine_account_group
        old_groups -= machine_account_group

    for group in old_groups ^ new_groups:
        groupdn = str(group)
        groupObject = univention.admin.objects.get(univention.admin.modules.get('groups/group'), self.co, self.lo, self.position, groupdn)
        assert groupObject is not None
        groupObject.open()

        hosts = DN.set(groupObject['hosts'])
        if group not in new_groups:
            # remove this computer from the group
            hosts.discard(DN(self.old_dn))
        else:
            # add this computer to the group
            hosts.add(DN(self.dn))
        groupObject['hosts'] = list(DN.values(hosts))
        groupObject.modify(ignore_license=True)
def primary_group(self) -> None:
    """
    Write the IDs of a changed primary group to this object.

    When the `primaryGroup` property changed, the `gidNumber` of that group is
    stored as `gidNumber` of this object and the group DN is remembered in
    `newPrimaryGroupDn`. With the `samba` option the `sambaSID` of the group is
    stored as `sambaPrimaryGroupSID` as well.
    """
    if self.hasChanged('primaryGroup'):
        self.log.debug('updating primary groups')

        gid_numbers = self.lo.authz_connection.getAttr(self['primaryGroup'], 'gidNumber', required=True)
        self.newPrimaryGroupDn = self['primaryGroup']
        gid_change = ('gidNumber', b'None', gid_numbers[0])
        self.lo.authz_connection.modify(self.dn, [gid_change])

        if 'samba' in self.options:
            samba_sids = self.lo.authz_connection.getAttr(self['primaryGroup'], 'sambaSID', required=True)
            sid_change = ('sambaPrimaryGroupSID', b'None', samba_sids[0])
            self.lo.authz_connection.modify(self.dn, [sid_change])
def cleanup(self) -> None:
    """
    Remove the DNS and DHCP data which refers to this computer.

    The object is opened first. Then its forward, reverse, DHCP and alias
    entries are removed; an error for a single entry is logged as a warning and
    does not stop the cleanup. Afterwards the name server entries of the
    assigned reverse and forward zones, the address records of the forward
    zones and the SRV and host records below the forward zones are cleaned
    from the FQDN and the IP addresses of this computer.
    """
    self.open()

    for forward_entry in self['dnsEntryZoneForward'] or []:
        zone_dn, _unused_ip = self.__split_dns_line(forward_entry)
        try:
            self.__remove_dns_forward_object(self['name'], zone_dn, None)
        except Exception as e:
            self.log.warning('error removing dnsEntryZoneForward', dnsEntryZoneForward=forward_entry, error=e)

    for reverse_entry in self['dnsEntryZoneReverse'] or []:
        zone_dn, address = self.__split_dns_line(reverse_entry)
        try:
            self.__remove_dns_reverse_object(self['name'], zone_dn, address)
        except Exception as e:
            self.log.warning('error removing dnsEntryZoneReverse', dnsEntryZoneReverse=reverse_entry, error=e)

    for dhcp_entry in self['dhcpEntryZone'] or []:
        _unused_dn, _unused_ip, mac_address = self.__split_dhcp_line(dhcp_entry)
        try:
            self.__remove_from_dhcp_object(mac=mac_address)
        except Exception as e:
            self.log.warning('error removing dhcpEntryZone', dhcpEntryZone=dhcp_entry, error=e)

    for alias_entry in self['dnsEntryZoneAlias'] or []:
        forward_zone, alias_container, alias_name = alias_entry
        try:
            self.__remove_dns_alias_object(self['name'], forward_zone, alias_container, alias_name)
        except Exception as e:
            self.log.warning('error removing dnsEntryZoneAlias', dnsEntryZoneAlias=alias_entry, error=e)

    # remove service record entries (see Bug #26400)
    self.log.debug('_ldap_post_remove: clean up service records, host records, and IP address saved at the forward zone')
    addresses = set(self['ip'] or [])
    fqdn = self['fqdn']
    # entries may be stored with or without the trailing dot
    fqdn_dotted = '%s.' % fqdn

    # first pass: the reverse zones
    reverse_zone_module = univention.admin.modules.get
    for zone in self['dnsEntryZoneReverse'] or []:
        self.log.debug('clean up entries for zone', zone=zone)
        if not zone:
            continue
        zone_object = univention.admin.objects.get(reverse_zone_module('dns/reverse_zone'), self.co, self.lo, self.position, dn=zone[0])
        assert zone_object is not None
        zone_object.open()

        if 'nameserver' not in zone_object or fqdn_dotted not in zone_object['nameserver']:
            continue
        self.log.debug('removing from dns zone', value=fqdn_dotted, zone=zone[0])
        # a reverse zone requires a name server: never remove the last one
        if len(zone_object['nameserver']) > 1:
            zone_object['nameserver'].remove(fqdn_dotted)
            zone_object.modify()

    # second pass: the forward zones (they have been visited above already)
    for zone in self['dnsEntryZoneForward'] or []:
        self.log.debug('clean up entries for zone', zone=zone)
        if not zone:
            continue
        zone_object = univention.admin.objects.get(univention.admin.modules.get('dns/forward_zone'), self.co, self.lo, self.position, dn=zone[0])
        assert zone_object is not None
        zone_object.open()
        self.log.debug('zone', aRecords=zone_object['a'])

        needs_modify = False

        if 'nameserver' in zone_object and fqdn_dotted in zone_object['nameserver']:
            self.log.debug('removing from dns zone', value=fqdn_dotted, zone=zone)
            # a forward zone requires a name server: never remove the last one
            if len(zone_object['nameserver']) > 1:
                zone_object['nameserver'].remove(fqdn_dotted)
                needs_modify = True

        # address records of the zone itself
        remaining_addresses = list(set(zone_object['a']) - addresses)
        if len(remaining_addresses) != len(zone_object['a']):
            self.log.debug('Clean up zone records', old=zone_object['a'], new=remaining_addresses)
            zone_object['a'] = remaining_addresses
            needs_modify = True

        if needs_modify:
            zone_object.modify()

        # service records below the zone
        srv_records = univention.admin.modules.lookup('dns/srv_record', self.co, self.lo, base=self.lo.base, scope='sub', superordinate=zone_object)
        for srv_record in srv_records:
            srv_record.open()
            remaining_locations = [location for location in srv_record['location'] if not (fqdn in location or fqdn_dotted in location)]
            if len(remaining_locations) != len(srv_record['location']):
                self.log.debug('Entry found in SRV record', record=srv_record.dn, old=srv_record['location'], new=remaining_locations)
                srv_record['location'] = remaining_locations
                srv_record.modify()

        # host records again (that should probably be done correctly by Samba4);
        # note: this will usually not find anything as this is already done above
        address_expressions = [univention.admin.filter.expression('a', _ip, escape=True) for _ip in addresses]
        host_filter = univention.admin.filter.conjunction('|', address_expressions)
        host_records = univention.admin.modules.lookup('dns/host_record', self.co, self.lo, str(host_filter), base=self.lo.base, scope='sub', superordinate=zone_object)
        for host_record in host_records:
            host_record.open()
            remaining_addresses = list(set(host_record['a']) - addresses)
            if len(remaining_addresses) != len(host_record['a']):
                self.log.debug('Entry found in Host record', record=host_record.dn, old=host_record['a'], new=remaining_addresses)
                host_record['a'] = remaining_addresses
                host_record.modify()
def __setitem__(self, key: str, value: object) -> None:
    """
    Set a property and derive dependent DNS/DHCP properties from the network and the addresses.

    Setting `network` may pick the next free IP address of that network and
    copies the zone settings of the network object into the `dnsEntryZoneForward`,
    `dnsEntryZoneReverse` and `dhcpEntryZone` properties. Setting `ip` releases a
    previously requested address lock and refreshes those properties from the
    remembered network object. Setting `mac` completes a DHCP entry which was
    saved because an address was still missing.

    :param key: The name of the property.
    :param value: The new value of the property.
    :raises univention.admin.uexceptions.ipOverridesNetwork: after the value has been stored, if `network` was set while a freshly set IP address is missing or not part of that network.
    """
    # exception which is raised only after the value has been stored
    raise_after = None
    # current non-empty addresses; ip1/mac1 are only set if there is exactly one of them
    ips = [ip for ip in self['ip'] if ip] if self.has_property('ip') and self['ip'] else []
    ip1 = self['ip'][0] if len(ips) == 1 else ''
    macs = [mac for mac in self['mac'] if mac] if self.has_property('mac') and self['mac'] else []
    mac1 = self['mac'][0] if len(macs) == 1 else ''

    if key == 'network':
        # 'None' as string is treated like no network
        if self.old_network != value and value and value != 'None':
            assert isinstance(value, str)
            network_object = univention.admin.handlers.networks.network.object(self.co, self.lo, self.position, value)
            network_object.open()
            subnet = ip_network('%(network)s/%(netmask)s' % network_object, strict=False)

            if not ips or ip_address('%s' % (ip1,)) not in subnet:
                if self.ip_freshly_set:
                    # an explicitly given address wins; the conflict is reported at the end
                    raise_after = univention.admin.uexceptions.ipOverridesNetwork
                else:
                    # get next IP
                    network_object.refreshNextIp()
                    self['ip'] = network_object['nextIp']
                    ips = [ip for ip in self['ip'] if ip] if self.has_property('ip') and self['ip'] else []
                    ip1 = self['ip'][0] if len(ips) == 1 else ''

                    try:
                        self.ip = self.request_lock('aRecord', self['ip'][0])
                        self.ip_already_requested = True
                    except univention.admin.uexceptions.noLock:
                        # a failed lock request is ignored here
                        pass

                self.network_object = network_object
            if network_object['dnsEntryZoneForward'] and ip1:
                self['dnsEntryZoneForward'] = [[network_object['dnsEntryZoneForward'], ip1]]
            if network_object['dnsEntryZoneReverse'] and ip1:
                self['dnsEntryZoneReverse'] = [[network_object['dnsEntryZoneReverse'], ip1]]
            if network_object['dhcpEntryZone']:
                if ip1 and mac1:
                    self['dhcpEntryZone'] = [(network_object['dhcpEntryZone'], ip1, mac1)]
                else:
                    # remember the zone until the missing address is set
                    self.__saved_dhcp_entry = network_object['dhcpEntryZone']

        self.old_network = value

    elif key == 'ip':
        # here the addresses are taken from the new value, not from the stored property
        ips = [ip for ip in value if ip] if self.has_property('ip') else []
        ip1 = ips[0] if len(ips) >= 1 else ''
        self.ip_freshly_set = True
        if not self.ip or self.ip != value:
            if self.ip_already_requested:
                # give back the lock of the address which was requested before
                univention.admin.allocators.release(self.lo, self.position, 'aRecord', self.ip)
                self.ip_already_requested = 0
            if value and self.network_object:
                if self.network_object['dnsEntryZoneForward'] and ip1:
                    self['dnsEntryZoneForward'] = [[self.network_object['dnsEntryZoneForward'], ip1]]
                if self.network_object['dnsEntryZoneReverse'] and ip1:
                    self['dnsEntryZoneReverse'] = [[self.network_object['dnsEntryZoneReverse'], ip1]]
                if self.network_object['dhcpEntryZone']:
                    if ip1 and macs:
                        self['dhcpEntryZone'] = [(self.network_object['dhcpEntryZone'], ip1, mac1)]
                    else:
                        self.__saved_dhcp_entry = self.network_object['dhcpEntryZone']
        if not self.ip:
            self.ip_freshly_set = False

    elif key == 'mac' and self.__saved_dhcp_entry and ip1 and macs:
        # complete the saved DHCP entry with the first given MAC address
        if isinstance(value, list):
            self['dhcpEntryZone'] = [(self.__saved_dhcp_entry, ip1, value[0])]
        else:
            self['dhcpEntryZone'] = [(self.__saved_dhcp_entry, ip1, value)]

    super().__setitem__(key, value)
    if raise_after:
        raise raise_after
class simplePolicy(simpleLdap):
    """
    Base class for policies/* UDM modules

    A policy object works in two modes. By default it behaves like any other
    LDAP object. In *result mode* (see :py:meth:`copyIdentifier` and
    :py:meth:`clone`) property access additionally evaluates the policies which
    are effective for a referring object.
    """

    def __init__(
        self,
        co: None,
        lo: univention.admin.uldap.access,
        position: univention.admin.uldap.position,
        dn: str = '',
        superordinate: simpleLdap | None = None,
        attributes: _Attributes | None = None,
    ) -> None:
        """
        Initialize the policy state and the underlying LDAP object.

        Attributes which already exist on the instance are not overwritten.
        """
        self.resultmode = 0

        if not hasattr(self, 'cloned'):
            self.cloned: str | None = None

        if not hasattr(self, 'changes'):
            self.changes = 0

        if not hasattr(self, 'policy_attrs'):
            self.policy_attrs: dict[str, dict[str, Any]] = {}

        if not hasattr(self, 'referring_object_dn'):
            self.referring_object_dn: str | None = None

        simpleLdap.__init__(self, co, lo, position, dn, superordinate, attributes)

    def _ldap_post_remove(self) -> None:
        """Remove the references to this policy from all objects which refer to it; failures are logged."""
        super()._ldap_post_remove()
        for object_dn in self.lo.authz_connection.searchDn(filter_format('univentionPolicyReference=%s', [self.dn])):
            try:
                self.lo.authz_connection.modify(object_dn, [('univentionPolicyReference', self.dn.encode('UTF-8'), None)])
            except (univention.admin.uexceptions.base, ldap.LDAPError) as exc:
                self.log.error('Could not remove policy reference', policy=self.dn, dn=object_dn, error=exc)

    def copyIdentifier(self, from_object: simpleLdap) -> None:
        """
        Activate the result mode and set the referring object

        The value of every identifying property of `from_object` is copied to
        all identifying properties of this policy.

        :param from_object: The object the policies are evaluated for.
        """
        self.resultmode = 1
        own_identifiers = [name for name, prop in self.descriptions.items() if prop.identifies]
        for key, prop in from_object.descriptions.items():
            if not prop.identifies:
                continue
            for own_key in own_identifiers:
                self.info[own_key] = from_object.info[key]

        # an object without DN is represented by its position
        self.referring_object_dn = from_object.dn
        if not self.referring_object_dn:
            self.referring_object_dn = from_object.position.getDn()
        self.referring_object_position_dn = from_object.position.getDn()

    def clone(self, referring_object: simpleLdap) -> None:
        """
        Marks the object as a not existing one containing values
        retrieved by evaluating the policies for the given object

        :param referring_object: The object the policies are evaluated for.
        """
        self.cloned = self.dn
        self.dn = ''
        self._set_log()
        self.copyIdentifier(referring_object)

    def getIdentifier(self) -> str:
        """
        Return the name of the first identifying property which has a value.

        :raises ValueError: if no identifying property has a value.
        """
        for key, prop in self.descriptions.items():
            if prop.identifies and key in self.info and self.info[key]:
                return key
        raise ValueError('no identifying property with a value found')

    def __makeUnique(self) -> None:
        """
        Append or increase the `_uv<number>` suffix of the identifying property.

        Only a trailing `_uv<number>` is treated as counter. A `_uv` somewhere
        else in the name is kept as part of the name.
        """
        identifier = self.getIdentifier()
        name = self.info[identifier]
        base, separator, suffix = name.rpartition('_uv')
        counter = 0
        if separator:
            try:
                counter = int(suffix) + 1
            except ValueError:
                # the text after the last "_uv" is no counter
                base = name
        else:
            base = name
        self.info[identifier] = '%s_uv%d' % (base, counter)
        self.log.debug('make unique', result=self.info[identifier])

    def create(self, serverctrls: list[ldap.controls.LDAPControl] | None = None, response: dict[str, Any] | None = None) -> str:
        """
        Create the policy object.

        In result mode the object is created as a new object. If the name is
        already in use it is made unique and the creation is retried with the
        same `serverctrls` and `response`.

        :param serverctrls: LDAP controls which are passed on to the inherited method.
        :param response: Dictionary which is passed on to the inherited method.
        :returns: The DN as returned by the inherited method.
        """
        if not self.resultmode:
            return super().create(serverctrls=serverctrls, response=response)

        self._exists = False
        try:
            self.oldinfo = {}
            dn = super().create(serverctrls=serverctrls, response=response)
            self.log.debug('Created policy', properties=self.info)
        except univention.admin.uexceptions.objectExists:
            self.__makeUnique()
            # keep the controls and the response container of the caller for the retry
            dn = self.create(serverctrls=serverctrls, response=response)
        return dn

    def policy_result(self, faked_policy_reference: str | list[str] | None = None) -> None:
        """
        This method retrieves the policy values currently effective
        for this object. If the 'resultmode' is not active the evaluation
        is cancelled.

        If faked_policy_reference is given at the top object
        (referring_object_dn) this policy object temporarily referenced.

        faked_policy_reference can be a string or a list of strings.
        """
        if not self.resultmode:
            return

        self.polinfo_more = {}
        if not self.policy_attrs:
            policies = []
            if isinstance(faked_policy_reference, list | tuple):
                policies.extend(faked_policy_reference)
            elif faked_policy_reference:
                policies.append(faked_policy_reference)
            self.__load_policies(policies)

        if hasattr(self, '_custom_policy_result_map'):
            self._custom_policy_result_map()
        else:
            values = {}
            for attr_name, value_dict in self.policy_attrs.items():
                value_dict = copy.deepcopy(value_dict)
                # the raw values are unmapped below, polinfo_more gets the decoded ones
                values[attr_name] = copy.copy(value_dict['value'])
                value_dict['value'] = [x.decode('UTF-8') for x in value_dict['value']]
                self.polinfo_more[self.mapping.unmapName(attr_name)] = value_dict

            self.polinfo = univention.admin.mapping.mapDict(self.mapping, values)
            self.polinfo = self._post_unmap(self.polinfo, values)

    def __load_policies(self, policies: list[str] | None = None) -> None:
        """
        Load the policy attributes of this policy type for the referring object, unless they are loaded already.

        :param policies: DNs of policies which are passed on to `getPolicies()`.
        """
        if not self.policy_attrs:
            # the referring object does not exist yet
            if self.referring_object_dn != self.referring_object_position_dn:
                result = self.lo.authz_connection.getPolicies(self.lo.parentDn(self.referring_object_dn), policies=policies)
            else:
                result = self.lo.authz_connection.getPolicies(self.referring_object_position_dn, policies=policies)
            for policy_oc, attrs in result.items():
                if univention.admin.objects.ocToType(policy_oc) == self.module:
                    self.policy_attrs = attrs

    def __getitem__(self, key: str) -> object:
        """
        Return the value of a property.

        Outside of the result mode a property which is listed in
        `emptyAttributes` is returned as empty value. In result mode the value
        from the policy evaluation is returned if the property is not set on
        this object or if the policy value is fixed.
        """
        if not self.resultmode:
            if self.has_property('emptyAttributes') and self.mapping.mapName(key) and self.mapping.mapName(key) in simpleLdap.__getitem__(self, 'emptyAttributes'):
                self.log.debug('simplePolicy.__getitem__: Empty Attribute', property=key)
                if self.descriptions[key].multivalue:
                    return []
                else:
                    return ''
            return simpleLdap.__getitem__(self, key)

        self.policy_result()

        if (key in self.polinfo and not (key in self.info or key in self.oldinfo)) or (
            key in self.polinfo_more and 'fixed' in self.polinfo_more[key] and self.polinfo_more[key]['fixed']
        ):
            if self.descriptions[key].multivalue and not isinstance(self.polinfo[key], list):
                # why isn't this correct in the first place?
                self.polinfo[key] = [self.polinfo[key]]
            self.log.debug('simplePolicy.__getitem__', property=key, value=self.polinfo[key])
            return self.polinfo[key]

        result = simpleLdap.__getitem__(self, key)
        self.log.debug('simplePolicy.__getitem__', property=key, value=result)
        return result

    def fixedAttributes(self) -> dict[str, bool]:
        """Return effectively fixed attributes."""
        if not self.resultmode:
            return {}

        self.__load_policies(None)
        return {
            self.mapping.unmapName(attr_name): value_dict.get('fixed', False)
            for attr_name, value_dict in self.policy_attrs.items()
        }

    def emptyAttributes(self) -> dict[str, bool]:
        """return effectively empty attributes."""
        if not self.has_property('emptyAttributes'):
            return {}

        return {
            self.mapping.unmapName(attrib): True
            for attrib in simpleLdap.__getitem__(self, 'emptyAttributes') or ()
        }

    def __setitem__(self, key: str, newvalue: object) -> None:
        """
        Set the value of a property.

        In result mode `self.changes` is set when the value differs from the
        effective one.

        :raises univention.admin.uexceptions.policyFixedAttribute: if the value is fixed by a policy other than the cloned one.
        """
        if not self.resultmode:
            simpleLdap.__setitem__(self, key, newvalue)
            return

        self.policy_result()

        if key in self.polinfo:
            if self.polinfo[key] != newvalue or self.polinfo_more[key]['policy'] == self.cloned or (key in self.info and self.info[key] != newvalue):
                if self.polinfo_more[key]['fixed'] and self.polinfo_more[key]['policy'] != self.cloned:
                    raise univention.admin.uexceptions.policyFixedAttribute(key)
                simpleLdap.__setitem__(self, key, newvalue)
                self.log.debug('polinfo: setting property', property=key, value=newvalue)
                if self.hasChanged(key):
                    self.log.debug('polinfo: has changed', property=key)
                    self.changes = 1
            return

        # this object did not exist before
        if not self.oldinfo:
            # if this attribute is of type boolean and the new value is equal to the default, then ignore this "change"
            if isinstance(self.descriptions[key].syntax, univention.admin.syntax.boolean):
                default = self.descriptions[key].base_default
                if isinstance(self.descriptions[key].base_default, tuple | list):
                    default = self.descriptions[key].base_default[0]
                if (not default and newvalue == '0') or default == newvalue:
                    return

        simpleLdap.__setitem__(self, key, newvalue)
        if self.hasChanged(key):
            self.changes = 1
class _MergedAttributes: """Evaluates old attributes and the modlist to get a new representation of the object.""" def __init__(self, obj: simpleLdap, modlist: Iterable[tuple[Any, ...]]) -> None: self.obj = obj self.modlist = [x if len(x) == 3 else (x[0], None, x[-1]) for x in modlist] self.case_insensitive_attributes = ['objectClass'] def get_attributes(self) -> dict[str, list[bytes]]: attributes = set(self.obj.oldattr.keys()) | {x[0] for x in self.modlist} return {attr: self.get_attribute(attr) for attr in attributes} def get_attribute(self, attr: str) -> list[bytes]: values = set(self.obj.oldattr.get(attr, [])) # evaluate the modlist and apply all changes to the current values for att, old, new in self.modlist: if att.lower() != attr.lower(): continue new = [] if not new else [new] if isinstance(new, bytes) else new old = [] if not old else [old] if isinstance(old, bytes) else old if not old and new: # MOD_ADD values |= set(new) elif not new and old: # MOD_DELETE values -= set(old) elif old and new: # MOD_REPLACE values = set(new) return list(values)