Source code for univention.admin.uldap

# SPDX-FileCopyrightText: 2004-2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only

"""|UDM| wrapper around :py:mod:`univention.uldap` that replaces exceptions."""

from __future__ import annotations

import time
from typing import TYPE_CHECKING

import ldap

import univention.admin.license
import univention.uldap
from univention.admin import localization
from univention.admin._ucr import configRegistry
from univention.admin.log import log as udm_log, log_ldap as log
from univention.dn import DN


if TYPE_CHECKING:
    from collections.abc import Callable
    from typing import Any

# Names exported by ``from univention.admin.uldap import *``.
__all__ = ('DN', 'access', 'domain', 'explodeDn', 'getAdminConnection', 'getBaseDN', 'getMachineConnection', 'position')

# Translation object for the 'univention/admin' message domain.
translation = localization.translation('univention/admin')
# ``_`` wraps the user-facing messages of the exceptions raised in this module.
_ = translation.translate

# Re-export of the helper from the wrapped low-level module.
explodeDn = univention.uldap.explodeDn


def getBaseDN(host: str = 'localhost', port: int | None = None, uri: str | None = None) -> str:
    """
    Return the naming context of the LDAP server.

    The root DSE of the server is queried anonymously. If the server is
    reported as down, the function sleeps for 60 seconds and tries exactly
    once more; a failure of the second attempt is propagated to the caller.

    :param str host: The hostname of the LDAP server (ignored if `uri` is given).
    :param int port: The TCP port number of the LDAP server. Falls back to the
        UCR variable `ldap/server/port` (default 7389) when not given.
    :param str uri: A complete LDAP URI. Takes precedence over `host` and `port`.
    :returns: The distinguished name of the LDAP root.
    """

    def _read_naming_context(ldap_uri: str) -> str:
        # One attempt: connect, read the root DSE, decode the first naming context.
        connection = ldap.ldapobject.ReconnectLDAPObject(ldap_uri, trace_stack_limit=None)
        entries = connection.search_s('', ldap.SCOPE_BASE, 'objectClass=*', ['NamingContexts'])
        return entries[0][1]['namingContexts'][0].decode('utf-8')

    if not uri:
        port = port or int(configRegistry.get('ldap/server/port', 7389))
        uri = 'ldap://%s:%s' % (host, port)

    try:
        return _read_naming_context(uri)
    except ldap.SERVER_DOWN:
        # Give the server time to come up before the single retry below.
        time.sleep(60)
    return _read_naming_context(uri)
def getAdminConnection(start_tls: int | None = None, decode_ignorelist: None = None) -> tuple[univention.admin.uldap.access, univention.admin.uldap.position]:
    """
    Open a LDAP connection using the admin credentials.

    :param int start_tls: Negotiate TLS with server. If `2` is given, the command will require the operation to be successful.
    :param decode_ignorelist: Not used by this function.
    :return: A 2-tuple (LDAP-access, LDAP-position)
    """
    raw_connection = univention.uldap.getAdminConnection(start_tls)
    # The position is created first so an empty base is reported before wrapping the connection.
    base_position = position(raw_connection.base)
    return access(lo=raw_connection), base_position
def getMachineConnection(start_tls: int | None = None, decode_ignorelist: None = None, ldap_master: bool = True) -> tuple[univention.admin.uldap.access, univention.admin.uldap.position]:
    """
    Open a LDAP connection using the machine credentials.

    :param int start_tls: Negotiate TLS with server. If `2` is given, the command will require the operation to be successful.
    :param decode_ignorelist: Not used by this function.
    :param bool ldap_master: Open a connection to the Primary if True, to the preferred LDAP server otherwise.
    :return: A 2-tuple (LDAP-access, LDAP-position)
    """
    raw_connection = univention.uldap.getMachineConnection(start_tls, ldap_master=ldap_master)
    # The position is created first so an empty base is reported before wrapping the connection.
    base_position = position(raw_connection.base)
    return access(lo=raw_connection), base_position
def _err2str(err: Exception) -> str: """ Convert exception arguments to string. :param Exception err: An exception instance. :returns: A concatenated string formatted from the exception """ msgs = [] for iarg in err.args: if isinstance(iarg, dict): msg = ': '.join([str(m) for m in (iarg.get('desc'), iarg.get('info')) if m]) else: msg = str(iarg) if msg: msgs.append(msg) if not msgs: msgs.append(': '.join([str(type(err).__name__), str(err)])) return '. '.join(msgs)
class domain:
    """A |UDM| domain name."""

    def __init__(self, lo: univention.admin.uldap.access, position: univention.admin.uldap.position) -> None:
        """
        Fetch the Samba and Kerberos attributes of the domain object.

        The lookup is done through `lo.authz_connection` at the DN returned by
        `position.getDomain()`.

        :param univention.admin.uldap.access lo: A LDAP connection object.
        :param univention.admin.uldap.position position: A UDM position specifying the LDAP base container.
        """
        self.lo = lo
        self.position = position
        fetch = self.lo.authz_connection.get
        self.domain = fetch(self.position.getDomain(), attr=['sambaDomain', 'sambaSID', 'krb5RealmName'])

    def getKerberosRealm(self) -> str | None:
        """
        Return the name of the Kerberos realm.

        :returns: The first `krb5RealmName` value decoded as ASCII, or `None` if the attribute is not set.
        """
        if 'krb5RealmName' in self.domain:
            return self.domain['krb5RealmName'][0].decode('ASCII')
        return None
class position:
    """
    The position of a |LDAP| container.

    The position is stored as a DN relative to a base DN; the absolute DN is
    assembled on demand.
    """

    def __init__(self, base: str, loginDomain: str = '') -> None:
        """
        :param str base: The base distinguished name.
        :param str loginDomain: The login domain name. Defaults to `base` when empty.
        :raises univention.admin.uexceptions.insufficientInformation: if `base` is empty.
        """
        if not base:
            raise univention.admin.uexceptions.insufficientInformation(_('There was no LDAP base specified.'))
        self.__loginDomain = loginDomain or base
        self.__base = base
        # Relative part of the position; empty means "at the base".
        self.__pos = ''
        # True once the relative part contains at least one `dc` component.
        self.__indomain = False

    def setBase(self, base: str) -> None:
        """
        Set a new base distinguished name.

        :param str base: The new base distinguished name.
        """
        self.__base = base

    def setLoginDomain(self, loginDomain: str) -> None:
        """
        Set a new login domain name.

        :param str loginDomain: The new login domain name.
        """
        self.__loginDomain = loginDomain

    def __setPosition(self, pos: str) -> None:
        """Store the relative DN and record whether it contains a `dc` component."""
        self.__pos = pos
        attribute_types = (ava[0] for rdn in ldap.dn.str2dn(pos) for ava in rdn)
        self.__indomain = 'dc' in attribute_types

    def getDn(self) -> str:
        """
        Return the distinguished name.

        :returns: The absolute DN, i.e. the relative position followed by the base.
        """
        relative_rdns = ldap.dn.str2dn(self.__pos)
        base_rdns = ldap.dn.str2dn(self.__base)
        return ldap.dn.dn2str(relative_rdns + base_rdns)

    def setDn(self, dn: str) -> None:
        """
        Set a new distinguished name.

        A trailing base DN is stripped so that only the relative part is stored.

        :param str dn: The new distinguished name.
        """
        rdns = ldap.dn.str2dn(dn)
        base_rdns = ldap.dn.str2dn(self.getBase())
        base_length = len(base_rdns)
        if rdns[-base_length:] == base_rdns:
            rdns = rdns[:-base_length]
        self.__setPosition(ldap.dn.dn2str(rdns))

    def getRdn(self) -> str:
        """
        Return the leading relative distinguished name of the position.

        :returns: The first element of `ldap.dn.explode_rdn()` applied to the
            absolute DN (presumably the left-most RDN component; verify against python-ldap).
        """
        return ldap.dn.explode_rdn(self.getDn())[0]

    def getBase(self) -> str:
        """
        Return the LDAP base DN.

        :returns: The distinguished name of the LDAP base.
        """
        return self.__base

    def isBase(self) -> bool:
        """
        Check if the position equals the LDAP base DN.

        :returns: True if the position equals the base DN, False otherwise.
        """
        return access.compare_dn(self.getDn(), self.getBase())

    def getDomain(self) -> str:
        """
        Return the distinguished name of the domain part of the position.

        This is the longest trailing run of RDNs of the absolute DN that
        contain a `dc` component. If the position has no `dc` component in its
        relative part, or equals the base, the base DN is returned.

        :returns: The distinguished name.
        """
        if not self.__indomain:
            return self.getBase()
        if self.getDn() == self.getBase():
            return self.getBase()
        rdns = ldap.dn.str2dn(self.getDn())
        # Walk from the right-most RDN to the left while the RDN has a `dc` component.
        start = len(rdns)
        while start > 0 and any(ava[0] == 'dc' for ava in rdns[start - 1]):
            start -= 1
        return ldap.dn.dn2str(rdns[start:])

    def getDomainConfigBase(self) -> str:
        """
        Return the distinguished name of the configuration container.

        :returns: The domain DN prefixed with `cn=univention,`.
        """
        return 'cn=univention,' + self.getDomain()

    def isDomain(self) -> bool:
        """
        Check if the position equals the domain DN.

        :returns: True if the position equals the domain DN, False otherwise.
        """
        return self.getDn() == self.getDomain()

    def getLoginDomain(self) -> str:
        """
        Return the login domain name.

        :returns: The login domain name.
        """
        return self.__loginDomain

    def switchToParent(self) -> bool:
        """
        Switch position to parent container.

        :returns: False if already at the Base, True otherwise.
        """
        if self.isBase():
            return False
        parent_rdns = ldap.dn.str2dn(self.__pos)[1:]
        self.__setPosition(ldap.dn.dn2str(parent_rdns))
        return True
class access:
    """
    A |UDM| class to access a |LDAP| server.

    Wraps a :py:class:`univention.uldap.access` connection (stored in `lo`),
    translates low level :py:mod:`ldap` exceptions into
    :py:mod:`univention.admin.uexceptions` and enforces the UCS licence on
    modifying operations.
    """

    # Result codes of `univention.admin.license.init_select()` which disable
    # modifications, mapped to the name of the exception class in
    # `univention.admin.uexceptions`. Names are resolved lazily when raising.
    # Code 5 (free for personal use) is handled separately as it keeps
    # modifications enabled.
    _LICENSE_VIOLATIONS = {
        1: 'licenseClients',
        2: 'licenseAccounts',
        3: 'licenseDesktops',
        4: 'licenseGroupware',
        # License Version 2:
        6: 'licenseUsers',
        7: 'licenseServers',
        8: 'licenseManagedClients',
        9: 'licenseCorporateClients',
        10: 'licenseDVSUsers',
        11: 'licenseDVSClients',
    }

    @property
    def binddn(self) -> str | None:
        """
        Return the distinguished name of the account.

        :returns: The distinguished name of the account (or `None` with |SAML|).
        """
        return self.lo.binddn

    @property
    def bindpw(self) -> str:
        """
        Return the user password or credentials.

        :returns: The user password or credentials.
        """
        return self.lo.bindpw

    @property
    def host(self) -> str:
        """
        Return the host name of the LDAP server.

        :returns: the host name of the LDAP server.
        """
        return self.lo.host

    @property
    def port(self) -> int:
        """
        Return the TCP port number of the LDAP server.

        :returns: the TCP port number of the LDAP server.
        """
        return self.lo.port

    @property
    def base(self) -> str:
        """
        Return the LDAP base of the LDAP server.

        :returns: the LDAP base of the LDAP server.
        """
        return self.lo.base

    @property
    def start_tls(self) -> int:
        """
        Return the TLS negotiation setting of the underlying connection.

        :returns: the `start_tls` value of the wrapped connection.
        """
        return self.lo.start_tls

    def __init__(
        self,
        host: str = 'localhost',
        port: int | None = None,
        base: str = '',
        binddn: str = '',
        bindpw: str = '',
        start_tls: int | None = None,
        lo: univention.uldap.access | None = None,
        follow_referral: bool = False,
        uri: str | None = None,
    ) -> None:
        """
        Wrap an existing connection or open a new one.

        :param host: The hostname of the |LDAP| server.
        :param port: The |TCP| port number of the |LDAP| server. Falls back to the UCR variable `ldap/server/port` (default 7389).
        :param base: The base distinguished name.
        :param binddn: The distinguished name of the account.
        :param bindpw: The user password for simple authentication.
        :param start_tls: Negotiate |TLS| with server. If `2` is given, the command will require the operation to be successful.
        :param lo: |LDAP| connection. If given, all other connection parameters are ignored.
        :param follow_referral: Follow |LDAP| referrals.
        :param uri: LDAP connection string.
        :raises univention.admin.uexceptions.authFail: if the credentials are rejected by the server.
        """
        if lo:
            self.lo = lo
        else:
            if not port:
                port = int(configRegistry.get('ldap/server/port', 7389))
            try:
                self.lo = univention.uldap.access(host, port, base, binddn, bindpw, start_tls, uri=uri, follow_referral=follow_referral)
            except (ldap.INVALID_CREDENTIALS, ldap.UNWILLING_TO_PERFORM):
                raise univention.admin.uexceptions.authFail(_('Authentication failed'))

        self.require_license = False
        self.allow_modify = True
        self.licensetypes = ['UCS']

        # Imported here rather than at module level (presumably to avoid a circular import).
        from univention.admin.authorization import Authorization
        self.authz = Authorization()

    @property
    def authz_connection(self):
        """
        A LDAP connection running with cn=admin privileges (in case authorization engine is enabled, otherwise just self)

        .. warning ::
                use carefully: only in combination with manual access control checks to prevent information leak or privilege escalation
        """
        return self.authz.get_authz_connection(self)

    def bind(self, binddn: str, bindpw: str) -> None:
        """
        Do simple LDAP bind using DN and password.

        :param str binddn: The distinguished name of the account.
        :param str bindpw: The user password for simple authentication.
        :raises univention.admin.uexceptions.authFail: if the credentials are rejected by the server.
        """
        try:
            self.lo.bind(binddn, bindpw)
        except (ldap.INVALID_CREDENTIALS, ldap.UNWILLING_TO_PERFORM):
            raise univention.admin.uexceptions.authFail(_('Authentication failed'))
        self.__require_licence()

    def bind_saml(self, bindpw: str) -> None:
        """
        Do LDAP bind using SAML message.

        :param str bindpw: The SAML authentication cookie.
        :raises univention.admin.uexceptions.authFail: if the credentials are rejected by the server.
        """
        try:
            # Do not return from here: the licence check below must run after a successful bind.
            self.lo.bind_saml(bindpw)
        except (ldap.INVALID_CREDENTIALS, ldap.UNWILLING_TO_PERFORM):
            raise univention.admin.uexceptions.authFail(_('Authentication failed'))
        self.__require_licence()

    def bind_oauthbearer(self, authzid: str | None, bindpw: str) -> None:
        """
        Do LDAP bind using OAuth 2.0 Access Token.

        :param str authzid: Authorization Identifier
        :param str bindpw: The Access Token (as JWT)
        :raises univention.admin.uexceptions.authFail: if the credentials are rejected by the server.
        """
        try:
            # Do not return from here: the licence check below must run after a successful bind.
            self.lo.bind_oauthbearer(authzid, bindpw)
        except (ldap.INVALID_CREDENTIALS, ldap.UNWILLING_TO_PERFORM) as exc:
            log.debug('OAUTHBEARER authentication failed', error=repr(exc))
            raise univention.admin.uexceptions.authFail(_('Authentication failed'))
        self.__require_licence()

    def __require_licence(self) -> None:
        """
        Initialise the licence and raise if it is violated.

        Does nothing unless the licence check was enabled via :py:meth:`requireLicense`.
        For all violations except "free for personal use" modifications are disabled
        (`allow_modify = False`) before the matching exception is raised.
        """
        if not self.require_license:
            return

        res = univention.admin.license.init_select(self.lo, 'admin')
        assert univention.admin.license._license
        self.licensetypes = univention.admin.license._license.types

        if res == 5:
            # Free for personal use edition: modifications stay enabled.
            raise univention.admin.uexceptions.freeForPersonalUse()

        violation = self._LICENSE_VIOLATIONS.get(res)
        if violation is not None:
            self.allow_modify = False
            raise getattr(univention.admin.uexceptions, violation)()

    def unbind(self) -> None:
        """Unauthenticate."""
        self.lo.unbind()

    def whoami(self) -> str:
        """
        Return the distinguished name of the authenticated user.

        :returns: The distinguished name.
        """
        return self.lo.whoami()

    def requireLicense(self, require: bool = True) -> None:
        """
        Enable or disable the UCS licence check.

        :param bool require: `True` to require a valid licence.
        """
        self.require_license = require

    def _validateLicense(self) -> None:
        """Check if the UCS licence is valid."""
        if self.require_license:
            univention.admin.license.select('admin')

    def get_schema(self) -> ldap.schema.subentry.SubSchema:
        """
        Retrieve |LDAP| schema information from |LDAP| server.

        :returns: The |LDAP| schema.
        """
        return self.lo.get_schema()

    @classmethod
    def compare_dn(cls, a: str, b: str) -> bool:
        """
        Compare two distinguished names for equality.

        :param str a: The first distinguished name.
        :param str b: A second distinguished name.
        :returns: True if the DNs are the same, False otherwise.
        """
        return univention.uldap.access.compare_dn(a, b)

    def get(self, dn: str, attr: list[str] = [], required: bool = False, exceptions: bool = False) -> dict[str, list[bytes]]:
        """
        Return multiple attributes of a single LDAP object.

        :param str dn: The distinguished name of the object to lookup.
        :param attr: The list of attributes to fetch. The default list is only passed through and not modified here.
        :param bool required: Raise an exception instead of returning an empty dictionary.
        :param bool exceptions: Ignore.
        :returns: A dictionary mapping the requested attributes to a list of their values.
        :raises ldap.NO_SUCH_OBJECT: If the LDAP object is not accessible.
        """
        return self.lo.get(dn, attr, required)

    def getAttr(self, dn: str, attr: str, required: bool = False, exceptions: bool = False) -> list[bytes]:
        """
        Return a single attribute of a single LDAP object.

        :param str dn: The distinguished name of the object to lookup.
        :param str attr: The attribute to fetch.
        :param bool required: Raise an exception instead of returning an empty dictionary.
        :param bool exceptions: Ignore.
        :returns: A list of values.
        :raises ldap.NO_SUCH_OBJECT: If the LDAP object is not accessible.
        """
        return self.lo.getAttr(dn, attr, required)

    def search(self, filter: str = '(objectClass=*)', base: str = '', scope: str = 'sub', attr: list[str] = [], unique: bool = False, required: bool = False, timeout: int = -1, sizelimit: int = 0, serverctrls: list[ldap.controls.LDAPControl] | None = None, response: dict[str, ldap.controls.LDAPControl] | None = None) -> list[tuple[str, dict[str, list[bytes]]]]:
        """
        Perform LDAP search and return values.

        :param str filter: LDAP search filter.
        :param str base: the starting point for the search.
        :param str scope: Specify the scope of the search to be one of `base`, `base+one`, `one`, `sub`, or `domain` to specify a base object, base plus one-level, one-level, subtree, or children search.
        :param attr: The list of attributes to fetch. The default list is only passed through and not modified here.
        :param bool unique: Raise an exception if more than one object matches.
        :param bool required: Raise an exception instead of returning an empty dictionary.
        :param int timeout: wait at most `timeout` seconds for a search to complete. `-1` for no limit.
        :param int sizelimit: retrieve at most `sizelimit` entries for a search. `0` for no limit.
        :param serverctrls: a list of ldap.controls.LDAPControl instances sent to the server along with the LDAP request
        :param dict response: An optional dictionary to receive the server controls of the result.
        :returns: A list of 2-tuples (dn, values) for each LDAP object, where values is a dictionary mapping attribute names to a list of values.
        :raises univention.admin.uexceptions.noObject: Indicates the target object cannot be found.
        :raises univention.admin.uexceptions.insufficientInformation: Indicates that the matching rule specified in the search filter does not match a rule defined for the attribute's syntax.
        :raises univention.admin.uexceptions.ldapTimeout: Indicates that the time limit of the LDAP client was exceeded while waiting for a result.
        :raises univention.admin.uexceptions.ldapSizelimitExceeded: Indicates that in a search operation, the size limit specified by the client or the server has been exceeded.
        :raises univention.admin.uexceptions.ldapError: Indicates that the search method was called with an invalid search filter.
        :raises univention.admin.uexceptions.ldapError: Indicates that the syntax of the DN is incorrect.
        :raises univention.admin.uexceptions.ldapError: on any other LDAP error.
        """
        try:
            return self.lo.search(filter, base, scope, attr, unique, required, timeout, sizelimit, serverctrls=serverctrls, response=response)
        except ldap.NO_SUCH_OBJECT as msg:
            raise univention.admin.uexceptions.noObject(_err2str(msg))
        except ldap.INAPPROPRIATE_MATCHING as msg:
            raise univention.admin.uexceptions.insufficientInformation(_err2str(msg))
        except (ldap.TIMEOUT, ldap.TIMELIMIT_EXCEEDED) as msg:
            raise univention.admin.uexceptions.ldapTimeout(_err2str(msg))
        except (ldap.SIZELIMIT_EXCEEDED, ldap.ADMINLIMIT_EXCEEDED) as msg:
            raise univention.admin.uexceptions.ldapSizelimitExceeded(_err2str(msg))
        except ldap.FILTER_ERROR as msg:
            raise univention.admin.uexceptions.ldapError('%s: %s' % (_err2str(msg), filter))
        except ldap.INVALID_DN_SYNTAX as msg:
            raise univention.admin.uexceptions.ldapError('%s: %s' % (_err2str(msg), base), original_exception=msg)
        except ldap.LDAPError as msg:
            # Catch-all: must stay last, all handlers above are more specific.
            raise univention.admin.uexceptions.ldapError(_err2str(msg), original_exception=msg)

    def searchDn(self, filter: str = '(objectClass=*)', base: str = '', scope: str = 'sub', unique: bool = False, required: bool = False, timeout: int = -1, sizelimit: int = 0, serverctrls: list[ldap.controls.LDAPControl] | None = None, response: dict[str, ldap.controls.LDAPControl] | None = None) -> list[str]:
        """
        Perform LDAP search and return distinguished names only.

        :param str filter: LDAP search filter.
        :param str base: the starting point for the search.
        :param str scope: Specify the scope of the search to be one of `base`, `base+one`, `one`, `sub`, or `domain` to specify a base object, base plus one-level, one-level, subtree, or children search.
        :param bool unique: Raise an exception if more than one object matches.
        :param bool required: Raise an exception instead of returning an empty dictionary.
        :param int timeout: wait at most timeout seconds for a search to complete. `-1` for no limit.
        :param int sizelimit: retrieve at most sizelimit entries for a search. `0` for no limit.
        :param serverctrls: a list of :py:class:`ldap.controls.LDAPControl` instances sent to the server along with the LDAP request.
        :param dict response: An optional dictionary to receive the server controls of the result.
        :returns: A list of distinguished names.
        :raises univention.admin.uexceptions.noObject: Indicates the target object cannot be found.
        :raises univention.admin.uexceptions.insufficientInformation: Indicates that the matching rule specified in the search filter does not match a rule defined for the attribute's syntax.
        :raises univention.admin.uexceptions.ldapTimeout: Indicates that the time limit of the LDAP client was exceeded while waiting for a result.
        :raises univention.admin.uexceptions.ldapSizelimitExceeded: Indicates that in a search operation, the size limit specified by the client or the server has been exceeded.
        :raises univention.admin.uexceptions.ldapError: Indicates that the search method was called with an invalid search filter.
        :raises univention.admin.uexceptions.ldapError: Indicates that the syntax of the DN is incorrect.
        :raises univention.admin.uexceptions.ldapError: on any other LDAP error.
        """
        try:
            return self.lo.searchDn(filter, base, scope, unique, required, timeout, sizelimit, serverctrls=serverctrls, response=response)
        except ldap.NO_SUCH_OBJECT as msg:
            raise univention.admin.uexceptions.noObject(_err2str(msg))
        except ldap.INAPPROPRIATE_MATCHING as msg:
            raise univention.admin.uexceptions.insufficientInformation(_err2str(msg))
        except (ldap.TIMEOUT, ldap.TIMELIMIT_EXCEEDED) as msg:
            raise univention.admin.uexceptions.ldapTimeout(_err2str(msg))
        except (ldap.SIZELIMIT_EXCEEDED, ldap.ADMINLIMIT_EXCEEDED) as msg:
            raise univention.admin.uexceptions.ldapSizelimitExceeded(_err2str(msg))
        except ldap.FILTER_ERROR as msg:
            raise univention.admin.uexceptions.ldapError('%s: %s' % (_err2str(msg), filter))
        except ldap.INVALID_DN_SYNTAX as msg:
            raise univention.admin.uexceptions.ldapError('%s: %s' % (_err2str(msg), base), original_exception=msg)
        except ldap.LDAPError as msg:
            # Catch-all: must stay last, all handlers above are more specific.
            raise univention.admin.uexceptions.ldapError(_err2str(msg), original_exception=msg)

    def getPolicies(self, dn: str, policies: list[str] | None = None, attrs: dict[str, list[Any]] | None = None, result: Any = None, fixedattrs: Any = None) -> dict[str, dict[str, Any]]:
        """
        Return |UCS| policies for |LDAP| entry.

        :param str dn: The distinguished name of the |LDAP| entry.
        :param list policies: List of policy object classes...
        :param dict attrs: |LDAP| attributes. If not given, the data is fetched from LDAP.
        :param result: UNUSED!
        :param fixedattrs: UNUSED!
        :returns: A mapping of policy names to their settings, as returned by the underlying connection.
        """
        udm_log.debug('get policies', dn=dn)
        return self.lo.getPolicies(dn, policies, attrs, result, fixedattrs)

    def add(self, dn: str, al: list[tuple[str, Any]], exceptions: bool = False, serverctrls: list[ldap.controls.LDAPControl] | None = None, response: dict | None = None, ignore_license: bool = False) -> None:
        """
        Add LDAP entry at distinguished name and attributes in add_list=(attribute-name, old-values, new-values) or (attribute-name, new-values).

        :param str dn: The distinguished name of the object to add.
        :param al: The add-list of 2-tuples (attribute-name, new-values).
        :param bool exceptions: Raise the low level exception instead of the wrapping UDM exceptions.
        :param serverctrls: a list of ldap.controls.LDAPControl instances sent to the server along with the LDAP request
        :param bool ignore_license: Ignore license check if True.
        :param dict response: An optional dictionary to receive the server controls of the result.
        :raises univention.admin.uexceptions.licenseDisableModify: if the UCS licence prohibits any modification
        :raises univention.admin.uexceptions.objectExists: if the LDAP object already exists.
        :raises univention.admin.uexceptions.permissionDenied: if the user does not have the required permissions.
        :raises univention.admin.uexceptions.ldapError: if the syntax of the DN is invalid.
        :raises univention.admin.uexceptions.ldapError: on any other LDAP error.
        """
        self._validateLicense()
        if not self.allow_modify and not ignore_license:
            udm_log.error('adding disabled by license', dn=dn)
            raise univention.admin.uexceptions.licenseDisableModify()
        log.debug('add', dn=dn, addlist=al)
        if exceptions:
            # Caller asked for the raw python-ldap exceptions.
            return self.lo.add(dn, al, serverctrls=serverctrls, response=response)
        try:
            return self.lo.add(dn, al, serverctrls=serverctrls, response=response)
        except ldap.ALREADY_EXISTS as msg:
            log.debug('add failed', dn=dn, error=msg)
            raise univention.admin.uexceptions.objectExists(dn)
        except ldap.INSUFFICIENT_ACCESS as msg:
            log.debug('add failed', dn=dn, error=msg)
            raise univention.admin.uexceptions.permissionDenied()
        except ldap.INVALID_DN_SYNTAX as msg:
            raise univention.admin.uexceptions.ldapError('%s: %s' % (_err2str(msg), dn), original_exception=msg)
        except ldap.LDAPError as msg:
            log.debug('add failed', dn=dn, error=msg)
            raise univention.admin.uexceptions.ldapError(_err2str(msg), original_exception=msg)

    def modify(self, dn: str, changes: list[tuple[str, Any, Any]], exceptions: bool = False, ignore_license: bool = False, serverctrls: list[ldap.controls.LDAPControl] | None = None, response: dict | None = None, rename_callback: Callable | None = None) -> str:
        """
        Modify LDAP entry DN with attributes in changes=(attribute-name, old-values, new-values).

        :param str dn: The distinguished name of the object to modify.
        :param changes: The modify-list of 3-tuples (attribute-name, old-values, new-values).
        :param bool exceptions: Raise the low level exception instead of the wrapping UDM exceptions.
        :param bool ignore_license: Ignore license check if True.
        :param serverctrls: a list of ldap.controls.LDAPControl instances sent to the server along with the LDAP request
        :param dict response: An optional dictionary to receive the server controls of the result.
        :param rename_callback: Passed through unchanged to the `modify()` of the underlying connection.
        :returns: The distinguished name.
        :raises univention.admin.uexceptions.licenseDisableModify: if the UCS licence prohibits any modification
        :raises univention.admin.uexceptions.noObject: if the object does not exist.
        :raises univention.admin.uexceptions.permissionDenied: if the user does not have the required permissions.
        :raises univention.admin.uexceptions.ldapError: if the syntax of the DN is invalid.
        :raises univention.admin.uexceptions.ldapError: on any other LDAP error.
        """
        self._validateLicense()
        if not self.allow_modify and not ignore_license:
            udm_log.error('modify disabled by license', dn=dn)
            raise univention.admin.uexceptions.licenseDisableModify()
        log.debug('modify', dn=dn, modlist=changes)
        if exceptions:
            # Caller asked for the raw python-ldap exceptions.
            return self.lo.modify(dn, changes, serverctrls=serverctrls, response=response, rename_callback=rename_callback)
        try:
            return self.lo.modify(dn, changes, serverctrls=serverctrls, response=response, rename_callback=rename_callback)
        except ldap.NO_SUCH_OBJECT as msg:
            log.debug('modify failed', dn=dn, error=msg)
            raise univention.admin.uexceptions.noObject(dn)
        except ldap.INSUFFICIENT_ACCESS as msg:
            log.debug('modify failed', dn=dn, error=msg)
            raise univention.admin.uexceptions.permissionDenied()
        except ldap.INVALID_DN_SYNTAX as msg:
            raise univention.admin.uexceptions.ldapError('%s: %s' % (_err2str(msg), dn), original_exception=msg)
        except ldap.LDAPError as msg:
            log.debug('modify failed', dn=dn, error=msg)
            raise univention.admin.uexceptions.ldapError(_err2str(msg), original_exception=msg)

    def rename(self, dn: str, newdn: str, move_childs: int = 0, ignore_license: bool = False, serverctrls: list[ldap.controls.LDAPControl] | None = None, response: dict | None = None) -> None:
        """
        Rename a LDAP object.

        :param dn: The old distinguished name of the object to rename.
        :param newdn: The new distinguished name of the object to rename.
        :param move_childs: Also rename the sub children. Must be `0` always as `1` is not implemented.
        :param ignore_license: Ignore license check if True.
        :param serverctrls: a list of ldap.controls.LDAPControl instances sent to the server along with the LDAP request
        :param response: An optional dictionary to receive the server controls of the result.
        :raises univention.admin.uexceptions.noObject: if `move_childs` is not `0` or the object does not exist.
        :raises univention.admin.uexceptions.licenseDisableModify: if the UCS licence prohibits any modification
        :raises univention.admin.uexceptions.permissionDenied: if the user does not have the required permissions.
        :raises univention.admin.uexceptions.ldapError: if the syntax of the DN is invalid.
        :raises univention.admin.uexceptions.ldapError: on any other LDAP error.
        """
        if move_childs != 0:
            raise univention.admin.uexceptions.noObject(_('Moving children is not supported.'))

        self._validateLicense()
        if not self.allow_modify and not ignore_license:
            udm_log.warning('rename disabled by license', dn=dn)
            raise univention.admin.uexceptions.licenseDisableModify()

        log.debug('rename', dn=dn, newdn=newdn)
        try:
            return self.lo.rename(dn, newdn, serverctrls=serverctrls, response=response)
        except ldap.NO_SUCH_OBJECT as msg:
            log.debug('rename failed', dn=dn, error=msg)
            raise univention.admin.uexceptions.noObject(dn)
        except ldap.INSUFFICIENT_ACCESS as msg:
            log.debug('rename failed', dn=dn, error=msg)
            raise univention.admin.uexceptions.permissionDenied()
        except ldap.INVALID_DN_SYNTAX as msg:
            raise univention.admin.uexceptions.ldapError('%s: %s' % (_err2str(msg), dn), original_exception=msg)
        except ldap.LDAPError as msg:
            log.debug('rename failed', dn=dn, error=msg)
            raise univention.admin.uexceptions.ldapError(_err2str(msg), original_exception=msg)

    def delete(self, dn: str, exceptions: bool = False) -> None:
        """
        Delete a LDAP object.

        :param str dn: The distinguished name of the object to remove.
        :param bool exceptions: Raise the low level exception instead of the wrapping UDM exceptions.
            Insufficient access is still reported as `permissionDenied`.
        :raises univention.admin.uexceptions.noObject: if the object does not exist.
        :raises univention.admin.uexceptions.permissionDenied: if the user does not have the required permissions.
        :raises univention.admin.uexceptions.ldapError: if the syntax of the DN is invalid.
        :raises univention.admin.uexceptions.ldapError: on any other LDAP error.
        """
        self._validateLicense()
        if exceptions:
            try:
                return self.lo.delete(dn)
            except ldap.INSUFFICIENT_ACCESS:
                raise univention.admin.uexceptions.permissionDenied()

        log.debug('delete', dn=dn)
        try:
            return self.lo.delete(dn)
        except ldap.NO_SUCH_OBJECT as msg:
            log.debug('delete failed', dn=dn, error=msg)
            raise univention.admin.uexceptions.noObject(dn)
        except ldap.INSUFFICIENT_ACCESS as msg:
            log.debug('delete failed', dn=dn, error=msg, binddn=self.binddn)
            raise univention.admin.uexceptions.permissionDenied()
        except ldap.INVALID_DN_SYNTAX as msg:
            raise univention.admin.uexceptions.ldapError('%s: %s' % (_err2str(msg), dn), original_exception=msg)
        except ldap.LDAPError as msg:
            log.debug('delete failed', dn=dn, error=msg)
            raise univention.admin.uexceptions.ldapError(_err2str(msg), original_exception=msg)

    def parentDn(self, dn: str) -> str | None:
        """
        Return the parent container of a distinguished name.

        :param str dn: The distinguished name.
        :return: The parent distinguished name or None if the LDAP base is reached.
        """
        return self.lo.parentDn(dn)

    def explodeDn(self, dn: str, notypes: bool = False) -> list[str]:
        """
        Break up a DN into its component parts.

        :param str dn: The distinguished name.
        :param bool notypes: Return only the component's attribute values if True. Also the attribute types if False.
        :return: A list of relative distinguished names.
        """
        return self.lo.explodeDn(dn, notypes)

    def filter_lookup_results(self, results, context=None):
        """
        Evaluate access control rules for filtering of results.

        :param results: The search results to filter.
        :param context: Optional context handed to the authorization engine.
        :returns: The result of `authz.filter_search_results()`.
        """
        # TODO: check if we are allowed at all to search in the base, with the scope and the given filter for the attrs
        return self.authz.filter_search_results(self, results, context=context)

    def search_filtered(self, context, filter='(objectClass=*)', base='', *args, **kwargs):
        """
        Search via the authorization connection and filter the results by the access control rules.

        :param context: Optional mapping merged with `kwargs` and handed to the result filter.
        :param str filter: LDAP search filter.
        :param str base: the starting point for the search.
        :returns: The filtered search results, or an empty list if base or filter are rejected.
        :raises univention.admin.uexceptions.noObject: if the search base is not accessible.
        """
        if not self._verify_search_base(base) or not self._verify_search_filter(filter):
            return []
        results = self.authz_connection.search(filter, base, *args, **kwargs)
        return self._filter_ldap_search_results(results, dict(kwargs, **(context or {})))

    def search_dn_filtered(self, context, filter='(objectClass=*)', base='', *args, **kwargs):
        """
        Search DNs via the authorization connection and filter them by the access control rules.

        :param context: Optional mapping merged with `kwargs` and handed to the result filter.
        :param str filter: LDAP search filter.
        :param str base: the starting point for the search.
        :returns: The filtered distinguished names, or an empty list if base or filter are rejected.
        :raises univention.admin.uexceptions.noObject: if the search base is not accessible.
        """
        if not self._verify_search_base(base) or not self._verify_search_filter(filter):
            return []
        results = self.authz_connection.searchDn(filter, base, *args, **kwargs)
        return self._filter_ldap_search_dns(results, dict(kwargs, **(context or {})))

    def _filter_ldap_search_results(self, results, options=None):
        """Evaluate access control rules for filtering of results (`options` is currently not used)."""
        return self.authz.filter_search_results_attrs(self, results)

    def _filter_ldap_search_dns(self, results, context=None):
        """Evaluate access control rules for filtering of distinguished names."""
        return self.authz.filter_search_results_dn(self, results, context=context)

    def _verify_search_base(self, base):
        """
        Check that the search base is visible to the current user.

        :returns: True if the base passes the DN filter.
        :raises univention.admin.uexceptions.noObject: if the base is filtered out.
        """
        # mimic ldapsearch behavior: an empty base means the LDAP base
        base = base or configRegistry['ldap/base']
        if not self._filter_ldap_search_dns([base]):
            raise univention.admin.uexceptions.noObject(base)
        return True

    def _verify_search_filter(self, filter_s):
        """Check that the search filter is allowed (currently accepts every filter)."""
        # TODO: parse search filter and disallow certain filters for UDM property names where no "search" or "read" permission exists
        return True