# SPDX-FileCopyrightText: 2004-2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only
"""|UDM| wrapper around :py:mod:`univention.uldap` that replaces exceptions."""
from __future__ import annotations
import time
from typing import TYPE_CHECKING
import ldap
import univention.admin.license
import univention.uldap
from univention.admin import localization
from univention.admin._ucr import configRegistry
from univention.admin.log import log as udm_log, log_ldap as log
from univention.dn import DN
if TYPE_CHECKING:
from collections.abc import Callable
from typing import Any
# Names exported by a star-import of this module.
__all__ = ('DN', 'access', 'domain', 'explodeDn', 'getAdminConnection', 'getBaseDN', 'getMachineConnection', 'position')
# Translation catalog for the 'univention/admin' domain; `_` is the shortcut
# used throughout this module to translate user-visible messages.
translation = localization.translation('univention/admin')
_ = translation.translate
# Re-export the low-level helper so it is reachable through this module as well.
explodeDn = univention.uldap.explodeDn
[docs]
def getBaseDN(host: str = 'localhost', port: int | None = None, uri: str | None = None) -> str:
    """
    Look up the naming context announced by the LDAP server.

    If no `uri` is given, one is built from `host` and `port`; a missing port
    is taken from the UCR variable `ldap/server/port` (fallback `7389`).
    When the server is down, the lookup is repeated exactly once after
    waiting 60 seconds; a second failure is propagated to the caller.

    :param str host: The hostname of the LDAP server.
    :param int port: The TCP port number of the LDAP server.
    :param str uri: A complete LDAP URI.
    :returns: The distinguished name of the LDAP root.
    :raises ldap.SERVER_DOWN: if the server is still unreachable on the second attempt.
    """
    if not uri:
        if not port:
            port = int(configRegistry.get('ldap/server/port', 7389))
        uri = f'ldap://{host}:{port}'

    def _read_naming_context() -> str:
        # Read the root DSE (empty base, base scope) and decode the first naming context.
        connection = ldap.ldapobject.ReconnectLDAPObject(uri, trace_stack_limit=None)
        entries = connection.search_s('', ldap.SCOPE_BASE, 'objectClass=*', ['NamingContexts'])
        attributes = entries[0][1]
        return attributes['namingContexts'][0].decode('utf-8')

    try:
        return _read_naming_context()
    except ldap.SERVER_DOWN:
        # Give the server a minute to come up, then try one more time.
        time.sleep(60)
        return _read_naming_context()
[docs]
def getAdminConnection(start_tls: int | None = None, decode_ignorelist: None = None) -> tuple[univention.admin.uldap.access, univention.admin.uldap.position]:
    """
    Open a LDAP connection using the admin credentials.

    :param int start_tls: Negotiate TLS with server. If `2` is given, the command will require the operation to be successful.
    :param decode_ignorelist: Not used by this function.
    :return: A 2-tuple (LDAP-access, LDAP-position)
    """
    connection = univention.uldap.getAdminConnection(start_tls)
    # Build the position first: it rejects an empty base before the wrapper is created.
    ldap_position = position(connection.base)
    ldap_access = access(lo=connection)
    return ldap_access, ldap_position
[docs]
def getMachineConnection(start_tls: int | None = None, decode_ignorelist: None = None, ldap_master: bool = True) -> tuple[univention.admin.uldap.access, univention.admin.uldap.position]:
    """
    Open a LDAP connection using the machine credentials.

    :param int start_tls: Negotiate TLS with server. If `2` is given, the command will require the operation to be successful.
    :param decode_ignorelist: Not used by this function.
    :param bool ldap_master: Open a connection to the Primary if True, to the preferred LDAP server otherwise.
    :return: A 2-tuple (LDAP-access, LDAP-position)
    """
    connection = univention.uldap.getMachineConnection(start_tls, ldap_master=ldap_master)
    # Build the position first: it rejects an empty base before the wrapper is created.
    ldap_position = position(connection.base)
    ldap_access = access(lo=connection)
    return ldap_access, ldap_position
def _err2str(err: Exception) -> str:
"""
Convert exception arguments to string.
:param Exception err: An exception instance.
:returns: A concatenated string formatted from the exception
"""
msgs = []
for iarg in err.args:
if isinstance(iarg, dict):
msg = ': '.join([str(m) for m in (iarg.get('desc'), iarg.get('info')) if m])
else:
msg = str(iarg)
if msg:
msgs.append(msg)
if not msgs:
msgs.append(': '.join([str(type(err).__name__), str(err)]))
return '. '.join(msgs)
[docs]
class domain:
    """A |UDM| domain name."""

    def __init__(self, lo: univention.admin.uldap.access, position: univention.admin.uldap.position) -> None:
        """
        Fetch the Samba and Kerberos attributes of the domain object.

        The attributes `sambaDomain`, `sambaSID` and `krb5RealmName` of the
        domain DN of `position` are read through the authorization connection
        of `lo` and stored in `self.domain`.

        :param univention.admin.uldap.access lo: A LDAP connection object.
        :param univention.admin.uldap.position position: A UDM position specifying the LDAP base container.
        """
        self.lo = lo
        self.position = position
        wanted = ['sambaDomain', 'sambaSID', 'krb5RealmName']
        self.domain = self.lo.authz_connection.get(self.position.getDomain(), attr=wanted)

    def getKerberosRealm(self) -> str | None:
        """
        Return the name of the Kerberos realm.

        :returns: The first `krb5RealmName` value decoded as ASCII, or `None` if the attribute is not set.
        """
        if 'krb5RealmName' in self.domain:
            realm = self.domain['krb5RealmName'][0]
            return realm.decode('ASCII')
        return None
[docs]
class position:
    """
    A location inside the |LDAP| tree.

    The location is kept as a distinguished name relative to a base DN.
    """

    def __init__(self, base: str, loginDomain: str = '') -> None:
        """
        Create a position that initially points at the base itself.

        :param str base: The base distinguished name.
        :param str loginDomain: The login domain name; defaults to `base` if empty.
        :raises univention.admin.uexceptions.insufficientInformation: if `base` is empty.
        """
        if not base:
            raise univention.admin.uexceptions.insufficientInformation(_('There was no LDAP base specified.'))
        self.__base = base
        self.__loginDomain = loginDomain if loginDomain else base
        self.__pos = ''
        self.__indomain = False

    def setBase(self, base: str) -> None:
        """
        Replace the base distinguished name.

        :param str base: The new base distinguished name.
        """
        self.__base = base

    def setLoginDomain(self, loginDomain: str) -> None:
        """
        Replace the login domain name.

        :param str loginDomain: The new login domain name.
        """
        self.__loginDomain = loginDomain

    def __setPosition(self, pos: str) -> None:
        # Store the relative DN and remember whether any of its components is a `dc` attribute.
        self.__pos = pos
        relative_rdns = ldap.dn.str2dn(self.__pos)
        self.__indomain = any(ava[0] == 'dc' for rdn in relative_rdns for ava in rdn)

    def getDn(self) -> str:
        """
        Return the absolute distinguished name (relative position followed by the base).

        :returns: The absolute DN.
        """
        relative_part = ldap.dn.str2dn(self.__pos)
        base_part = ldap.dn.str2dn(self.__base)
        return ldap.dn.dn2str(relative_part + base_part)

    def setDn(self, dn: str) -> None:
        """
        Point the position at a new distinguished name.

        A trailing base DN is stripped, so only the relative part is stored.

        :param str dn: The new distinguished name.
        """
        components = ldap.dn.str2dn(dn)
        base_components = ldap.dn.str2dn(self.getBase())
        base_length = len(base_components)
        if components[-base_length:] == base_components:
            components = components[: -base_length]
        self.__setPosition(ldap.dn.dn2str(components))

    def getRdn(self) -> str:
        """
        Return the first element of :py:func:`ldap.dn.explode_rdn` applied to the absolute DN.

        :returns: The leading RDN component of the absolute DN.
        """
        exploded = ldap.dn.explode_rdn(self.getDn())
        return exploded[0]

    def getBase(self) -> str:
        """
        Return the LDAP base DN.

        :returns: The distinguished name of the LDAP base.
        """
        return self.__base

    def isBase(self) -> bool:
        """
        Tell whether the position currently points at the LDAP base.

        :returns: True if the position equals the base DN, False otherwise.
        """
        return access.compare_dn(self.getDn(), self.getBase())

    def getDomain(self) -> str:
        """
        Return the distinguished name of the domain part of the position.

        That is the trailing run of DN components containing a `dc` attribute;
        the base DN is returned if the relative position holds no `dc`
        component or the position equals the base.

        :returns: The distinguished name.
        """
        if self.__indomain and self.getDn() != self.getBase():
            domain_components = []
            # Walk from the root towards the leaf and stop at the first non-`dc` component.
            for component in reversed(ldap.dn.str2dn(self.getDn())):
                if not any(ava[0] == 'dc' for ava in component):
                    break
                domain_components.insert(0, component)
            return ldap.dn.dn2str(domain_components)
        return self.getBase()

    def getDomainConfigBase(self) -> str:
        """
        Return the distinguished name of the configuration container (`cn=univention` below the domain).

        :returns: The distinguished name.
        """
        domain_dn = self.getDomain()
        return 'cn=univention,' + domain_dn

    def isDomain(self) -> bool:
        """
        Tell whether the position currently points at the domain DN.

        :returns: True if the position equals the domain DN, False otherwise.
        """
        current_dn = self.getDn()
        return current_dn == self.getDomain()

    def getLoginDomain(self) -> str:
        """
        Return the login domain name.

        :returns: The login domain name.
        """
        return self.__loginDomain

    def switchToParent(self) -> bool:
        """
        Move the position one level up towards the base.

        :returns: False if already at the Base, True otherwise.
        """
        if not self.isBase():
            parent_components = ldap.dn.str2dn(self.__pos)[1:]
            self.__setPosition(ldap.dn.dn2str(parent_components))
            return True
        return False
[docs]
class access:
    """
    A |UDM| class to access a |LDAP| server.

    Wraps a :py:class:`univention.uldap.access` connection (`self.lo`) and
    converts low-level :py:mod:`ldap` exceptions into |UDM| exceptions.
    """

    @property
    def binddn(self) -> str | None:
        """
        Return the distinguished name of the account.

        :returns: The distinguished name of the account (or `None` with |SAML|).
        """
        return self.lo.binddn

    @property
    def bindpw(self) -> str:
        """
        Return the user password or credentials.

        :returns: The user password or credentials.
        """
        return self.lo.bindpw

    @property
    def host(self) -> str:
        """
        Return the host name of the LDAP server.

        :returns: the host name of the LDAP server.
        """
        return self.lo.host

    @property
    def port(self) -> int:
        """
        Return the TCP port number of the LDAP server.

        :returns: the TCP port number of the LDAP server.
        """
        return self.lo.port

    @property
    def base(self) -> str:
        """
        Return the LDAP base of the LDAP server.

        :returns: the LDAP base of the LDAP server.
        """
        return self.lo.base

    @property
    def start_tls(self) -> int:
        """
        Return the `start_tls` setting of the wrapped connection.

        :returns: the `start_tls` value of the underlying connection.
        """
        return self.lo.start_tls

    def __init__(
        self,
        host: str = 'localhost',
        port: int | None = None,
        base: str = '',
        binddn: str = '',
        bindpw: str = '',
        start_tls: int | None = None,
        lo: univention.uldap.access | None = None,
        follow_referral: bool = False,
        uri: str | None = None,
    ) -> None:
        """
        Wrap an existing connection or open a new one.

        If `lo` is given it is used as is and all other connection parameters
        are ignored. Otherwise a new :py:class:`univention.uldap.access` is
        created; a missing port is read from the UCR variable
        `ldap/server/port` (fallback `7389`).

        :param host: The hostname of the |LDAP| server.
        :param port: The |TCP| port number of the |LDAP| server.
        :param base: The base distinguished name.
        :param binddn: The distinguished name of the account.
        :param bindpw: The user password for simple authentication.
        :param start_tls: Negotiate |TLS| with server. If `2` is given, the command will require the operation to be successful.
        :param lo: |LDAP| connection.
        :param follow_referral: Follow |LDAP| referrals.
        :param uri: LDAP connection string.
        :raises univention.admin.uexceptions.authFail: if the server rejects the credentials.
        """
        if lo:
            self.lo = lo
        else:
            if not port:
                port = int(configRegistry.get('ldap/server/port', 7389))
            try:
                self.lo = univention.uldap.access(host, port, base, binddn, bindpw, start_tls, uri=uri, follow_referral=follow_referral)
            except (ldap.INVALID_CREDENTIALS, ldap.UNWILLING_TO_PERFORM):
                raise univention.admin.uexceptions.authFail(_('Authentication failed'))

        self.require_license = False
        self.allow_modify = True
        self.licensetypes = ['UCS']

        # Imported here rather than at module level (presumably to avoid a circular import).
        from univention.admin.authorization import Authorization
        self.authz = Authorization()

    @property
    def authz_connection(self):
        """
        A LDAP connection running with cn=admin privileges (in case authorization engine is enabled, otherwise just self)

        .. warning :: use carefully: only in combination with manual access control checks to prevent information leak or privilege escalation
        """
        return self.authz.get_authz_connection(self)

    def bind(self, binddn: str, bindpw: str) -> None:
        """
        Do simple LDAP bind using DN and password.

        After a successful bind the licence is checked if that was requested
        via :py:meth:`requireLicense`.

        :param str binddn: The distinguished name of the account.
        :param str bindpw: The user password for simple authentication.
        :raises univention.admin.uexceptions.authFail: if the server rejects the credentials.
        """
        try:
            self.lo.bind(binddn, bindpw)
        except (ldap.INVALID_CREDENTIALS, ldap.UNWILLING_TO_PERFORM):
            raise univention.admin.uexceptions.authFail(_('Authentication failed'))
        self.__require_licence()

    def bind_saml(self, bindpw: str) -> None:
        """
        Do LDAP bind using SAML message.

        After a successful bind the licence is checked if that was requested
        via :py:meth:`requireLicense`.

        :param str bindpw: The SAML authentication cookie.
        :raises univention.admin.uexceptions.authFail: if the server rejects the credentials.
        """
        try:
            result = self.lo.bind_saml(bindpw)
        except (ldap.INVALID_CREDENTIALS, ldap.UNWILLING_TO_PERFORM):
            raise univention.admin.uexceptions.authFail(_('Authentication failed'))
        # Must run after the bind; returning from inside the `try` made this check unreachable.
        self.__require_licence()
        # Hand back whatever the underlying bind returned, as before.
        return result

    def bind_oauthbearer(self, authzid: str | None, bindpw: str) -> None:
        """
        Do LDAP bind using OAuth 2.0 Access Token.

        After a successful bind the licence is checked if that was requested
        via :py:meth:`requireLicense`.

        :param str authzid: Authorization Identifier
        :param str bindpw: The Access Token (as JWT)
        :raises univention.admin.uexceptions.authFail: if the server rejects the credentials.
        """
        try:
            result = self.lo.bind_oauthbearer(authzid, bindpw)
        except (ldap.INVALID_CREDENTIALS, ldap.UNWILLING_TO_PERFORM) as exc:
            log.debug('OAUTHBEARER authentication failed', error=repr(exc))
            raise univention.admin.uexceptions.authFail(_('Authentication failed'))
        # Must run after the bind; returning from inside the `try` made this check unreachable.
        self.__require_licence()
        # Hand back whatever the underlying bind returned, as before.
        return result

    def __require_licence(self) -> None:
        """
        Initialise the licence and raise the matching exception on a violation.

        Does nothing unless the licence check was enabled with
        :py:meth:`requireLicense`. All violations except "free for personal
        use" additionally disable further modifications through this
        connection.
        """
        if not self.require_license:
            return

        res = univention.admin.license.init_select(self.lo, 'admin')
        assert univention.admin.license._license
        self.licensetypes = univention.admin.license._license.types

        # Result code -> (exception name in univention.admin.uexceptions, disable modifications?).
        # Names are resolved lazily so only the matching exception class is looked up.
        violations = {
            1: ('licenseClients', True),
            2: ('licenseAccounts', True),
            3: ('licenseDesktops', True),
            4: ('licenseGroupware', True),
            # Free for personal use edition
            5: ('freeForPersonalUse', False),
            # License Version 2:
            6: ('licenseUsers', True),
            7: ('licenseServers', True),
            8: ('licenseManagedClients', True),
            9: ('licenseCorporateClients', True),
            10: ('licenseDVSUsers', True),
            11: ('licenseDVSClients', True),
        }
        if res not in violations:
            return
        exception_name, disable_modify = violations[res]
        if disable_modify:
            self.allow_modify = False
        raise getattr(univention.admin.uexceptions, exception_name)()

    def unbind(self) -> None:
        """Unauthenticate."""
        self.lo.unbind()

    def whoami(self) -> str:
        """
        Return the distinguished name of the authenticated user.

        :returns: The distinguished name.
        """
        return self.lo.whoami()

    def requireLicense(self, require: bool = True) -> None:
        """
        Enable or disable the UCS licence check.

        :param bool require: `True` to require a valid licence.
        """
        self.require_license = require

    def _validateLicense(self) -> None:
        """Check if the UCS licence is valid (only if the licence check is enabled)."""
        if self.require_license:
            univention.admin.license.select('admin')

    def get_schema(self) -> ldap.schema.subentry.SubSchema:
        """
        Retrieve |LDAP| schema information from |LDAP| server.

        :returns: The |LDAP| schema.
        """
        return self.lo.get_schema()

    @classmethod
    def compare_dn(cls, a: str, b: str) -> bool:
        """
        Compare two distinguished names for equality.

        :param str a: The first distinguished name.
        :param str b: A second distinguished name.
        :returns: True if the DNs are the same, False otherwise.
        """
        return univention.uldap.access.compare_dn(a, b)

    def get(self, dn: str, attr: list[str] = [], required: bool = False, exceptions: bool = False) -> dict[str, list[bytes]]:
        """
        Return multiple attributes of a single LDAP object.

        :param str dn: The distinguished name of the object to lookup.
        :param attr: The list of attributes to fetch.
        :param bool required: Raise an exception instead of returning an empty dictionary.
        :param bool exceptions: Ignore.
        :returns: A dictionary mapping the requested attributes to a list of their values.
        :raises ldap.NO_SUCH_OBJECT: If the LDAP object is not accessible.
        """
        return self.lo.get(dn, attr, required)

    def getAttr(self, dn: str, attr: str, required: bool = False, exceptions: bool = False) -> list[bytes]:
        """
        Return a single attribute of a single LDAP object.

        :param str dn: The distinguished name of the object to lookup.
        :param str attr: The attribute to fetch.
        :param bool required: Raise an exception instead of returning an empty dictionary.
        :param bool exceptions: Ignore.
        :returns: A list of values.
        :raises ldap.NO_SUCH_OBJECT: If the LDAP object is not accessible.
        """
        return self.lo.getAttr(dn, attr, required)

    def search(
        self,
        filter: str = '(objectClass=*)',
        base: str = '',
        scope: str = 'sub',
        attr: list[str] = [],
        unique: bool = False,
        required: bool = False,
        timeout: int = -1,
        sizelimit: int = 0,
        serverctrls: list[ldap.controls.LDAPControl] | None = None,
        response: dict[str, ldap.controls.LDAPControl] | None = None,
    ) -> list[tuple[str, dict[str, list[bytes]]]]:
        """
        Perform LDAP search and return values.

        :param str filter: LDAP search filter.
        :param str base: the starting point for the search.
        :param str scope: Specify the scope of the search to be one of `base`, `base+one`, `one`, `sub`, or `domain` to specify a base object, base plus one-level, one-level, subtree, or children search.
        :param attr: The list of attributes to fetch.
        :param bool unique: Raise an exception if more than one object matches.
        :param bool required: Raise an exception instead of returning an empty dictionary.
        :param int timeout: wait at most `timeout` seconds for a search to complete. `-1` for no limit.
        :param int sizelimit: retrieve at most `sizelimit` entries for a search. `0` for no limit.
        :param serverctrls: a list of ldap.controls.LDAPControl instances sent to the server along with the LDAP request
        :param dict response: An optional dictionary to receive the server controls of the result.
        :returns: A list of 2-tuples (dn, values) for each LDAP object, where values is a dictionary mapping attribute names to a list of values.
        :raises univention.admin.uexceptions.noObject: Indicates the target object cannot be found.
        :raises univention.admin.uexceptions.insufficientInformation: Indicates that the matching rule specified in the search filter does not match a rule defined for the attribute's syntax.
        :raises univention.admin.uexceptions.ldapTimeout: Indicates that the time limit of the LDAP client was exceeded while waiting for a result.
        :raises univention.admin.uexceptions.ldapSizelimitExceeded: Indicates that in a search operation, the size limit specified by the client or the server has been exceeded.
        :raises univention.admin.uexceptions.ldapError: Indicates that the search method was called with an invalid search filter.
        :raises univention.admin.uexceptions.ldapError: Indicates that the syntax of the DN is incorrect.
        :raises univention.admin.uexceptions.ldapError: on any other LDAP error.
        """
        try:
            return self.lo.search(filter, base, scope, attr, unique, required, timeout, sizelimit, serverctrls=serverctrls, response=response)
        except ldap.NO_SUCH_OBJECT as msg:
            raise univention.admin.uexceptions.noObject(_err2str(msg))
        except ldap.INAPPROPRIATE_MATCHING as msg:
            raise univention.admin.uexceptions.insufficientInformation(_err2str(msg))
        except (ldap.TIMEOUT, ldap.TIMELIMIT_EXCEEDED) as msg:
            raise univention.admin.uexceptions.ldapTimeout(_err2str(msg))
        except (ldap.SIZELIMIT_EXCEEDED, ldap.ADMINLIMIT_EXCEEDED) as msg:
            raise univention.admin.uexceptions.ldapSizelimitExceeded(_err2str(msg))
        except ldap.FILTER_ERROR as msg:
            # Keep the original exception attached, like the other ldapError branches do.
            raise univention.admin.uexceptions.ldapError('%s: %s' % (_err2str(msg), filter), original_exception=msg)
        except ldap.INVALID_DN_SYNTAX as msg:
            raise univention.admin.uexceptions.ldapError('%s: %s' % (_err2str(msg), base), original_exception=msg)
        except ldap.LDAPError as msg:
            raise univention.admin.uexceptions.ldapError(_err2str(msg), original_exception=msg)

    def searchDn(
        self,
        filter: str = '(objectClass=*)',
        base: str = '',
        scope: str = 'sub',
        unique: bool = False,
        required: bool = False,
        timeout: int = -1,
        sizelimit: int = 0,
        serverctrls: list[ldap.controls.LDAPControl] | None = None,
        response: dict[str, ldap.controls.LDAPControl] | None = None,
    ) -> list[str]:
        """
        Perform LDAP search and return distinguished names only.

        :param str filter: LDAP search filter.
        :param str base: the starting point for the search.
        :param str scope: Specify the scope of the search to be one of `base`, `base+one`, `one`, `sub`, or `domain` to specify a base object, base plus one-level, one-level, subtree, or children search.
        :param bool unique: Raise an exception if more than one object matches.
        :param bool required: Raise an exception instead of returning an empty dictionary.
        :param int timeout: wait at most timeout seconds for a search to complete. `-1` for no limit.
        :param int sizelimit: retrieve at most sizelimit entries for a search. `0` for no limit.
        :param serverctrls: a list of :py:class:`ldap.controls.LDAPControl` instances sent to the server along with the LDAP request.
        :param dict response: An optional dictionary to receive the server controls of the result.
        :returns: A list of distinguished names.
        :raises univention.admin.uexceptions.noObject: Indicates the target object cannot be found.
        :raises univention.admin.uexceptions.insufficientInformation: Indicates that the matching rule specified in the search filter does not match a rule defined for the attribute's syntax.
        :raises univention.admin.uexceptions.ldapTimeout: Indicates that the time limit of the LDAP client was exceeded while waiting for a result.
        :raises univention.admin.uexceptions.ldapSizelimitExceeded: Indicates that in a search operation, the size limit specified by the client or the server has been exceeded.
        :raises univention.admin.uexceptions.ldapError: Indicates that the search method was called with an invalid search filter.
        :raises univention.admin.uexceptions.ldapError: Indicates that the syntax of the DN is incorrect.
        :raises univention.admin.uexceptions.ldapError: on any other LDAP error.
        """
        try:
            return self.lo.searchDn(filter, base, scope, unique, required, timeout, sizelimit, serverctrls=serverctrls, response=response)
        except ldap.NO_SUCH_OBJECT as msg:
            raise univention.admin.uexceptions.noObject(_err2str(msg))
        except ldap.INAPPROPRIATE_MATCHING as msg:
            raise univention.admin.uexceptions.insufficientInformation(_err2str(msg))
        except (ldap.TIMEOUT, ldap.TIMELIMIT_EXCEEDED) as msg:
            raise univention.admin.uexceptions.ldapTimeout(_err2str(msg))
        except (ldap.SIZELIMIT_EXCEEDED, ldap.ADMINLIMIT_EXCEEDED) as msg:
            raise univention.admin.uexceptions.ldapSizelimitExceeded(_err2str(msg))
        except ldap.FILTER_ERROR as msg:
            # Keep the original exception attached, like the other ldapError branches do.
            raise univention.admin.uexceptions.ldapError('%s: %s' % (_err2str(msg), filter), original_exception=msg)
        except ldap.INVALID_DN_SYNTAX as msg:
            raise univention.admin.uexceptions.ldapError('%s: %s' % (_err2str(msg), base), original_exception=msg)
        except ldap.LDAPError as msg:
            raise univention.admin.uexceptions.ldapError(_err2str(msg), original_exception=msg)

    def getPolicies(self, dn: str, policies: list[str] | None = None, attrs: dict[str, list[Any]] | None = None, result: Any = None, fixedattrs: Any = None) -> dict[str, dict[str, Any]]:
        """
        Return |UCS| policies for |LDAP| entry.

        :param str dn: The distinguished name of the |LDAP| entry.
        :param list policies: List of policy object classes...
        :param dict attrs: |LDAP| attributes. If not given, the data is fetched from LDAP.
        :param result: UNUSED!
        :param fixedattrs: UNUSED!
        :returns: The policy mapping exactly as returned by the underlying connection's `getPolicies()`.
        """
        udm_log.debug('get policies', dn=dn)
        return self.lo.getPolicies(dn, policies, attrs, result, fixedattrs)

    def add(self, dn: str, al: list[tuple[str, Any]], exceptions: bool = False, serverctrls: list[ldap.controls.LDAPControl] | None = None, response: dict | None = None, ignore_license: bool = False) -> None:
        """
        Add a LDAP entry with the given distinguished name and attributes.

        :param str dn: The distinguished name of the object to add.
        :param al: The add-list of 2-tuples (attribute-name, new-values).
        :param bool exceptions: Raise the low level exception instead of the wrapping UDM exceptions.
        :param serverctrls: a list of ldap.controls.LDAPControl instances sent to the server along with the LDAP request
        :param bool ignore_license: Ignore license check if True.
        :param dict response: An optional dictionary to receive the server controls of the result.
        :raises univention.admin.uexceptions.licenseDisableModify: if the UCS licence prohibits any modification
        :raises univention.admin.uexceptions.objectExists: if the LDAP object already exists.
        :raises univention.admin.uexceptions.permissionDenied: if the user does not have the required permissions.
        :raises univention.admin.uexceptions.ldapError: if the syntax of the DN is invalid.
        :raises univention.admin.uexceptions.ldapError: on any other LDAP error.
        """
        self._validateLicense()
        if not self.allow_modify and not ignore_license:
            udm_log.error('adding disabled by license', dn=dn)
            raise univention.admin.uexceptions.licenseDisableModify()
        log.debug('add', dn=dn, addlist=al)
        if exceptions:
            # Caller wants the raw python-ldap exceptions.
            return self.lo.add(dn, al, serverctrls=serverctrls, response=response)
        try:
            return self.lo.add(dn, al, serverctrls=serverctrls, response=response)
        except ldap.ALREADY_EXISTS as msg:
            log.debug('add failed', dn=dn, error=msg)
            raise univention.admin.uexceptions.objectExists(dn)
        except ldap.INSUFFICIENT_ACCESS as msg:
            log.debug('add failed', dn=dn, error=msg)
            raise univention.admin.uexceptions.permissionDenied()
        except ldap.INVALID_DN_SYNTAX as msg:
            raise univention.admin.uexceptions.ldapError('%s: %s' % (_err2str(msg), dn), original_exception=msg)
        except ldap.LDAPError as msg:
            log.debug('add failed', dn=dn, error=msg)
            raise univention.admin.uexceptions.ldapError(_err2str(msg), original_exception=msg)

    def modify(self, dn: str, changes: list[tuple[str, Any, Any]], exceptions: bool = False, ignore_license: bool = False, serverctrls: list[ldap.controls.LDAPControl] | None = None, response: dict | None = None, rename_callback: Callable | None = None) -> str:
        """
        Modify LDAP entry DN with attributes in changes=(attribute-name, old-values, new-values).

        :param str dn: The distinguished name of the object to modify.
        :param changes: The modify-list of 3-tuples (attribute-name, old-values, new-values).
        :param bool exceptions: Raise the low level exception instead of the wrapping UDM exceptions.
        :param bool ignore_license: Ignore license check if True.
        :param serverctrls: a list of ldap.controls.LDAPControl instances sent to the server along with the LDAP request
        :param dict response: An optional dictionary to receive the server controls of the result.
        :param rename_callback: Passed through unchanged to the underlying connection's `modify()`.
        :returns: The distinguished name.
        :raises univention.admin.uexceptions.licenseDisableModify: if the UCS licence prohibits any modification
        :raises univention.admin.uexceptions.noObject: if the object does not exist.
        :raises univention.admin.uexceptions.permissionDenied: if the user does not have the required permissions.
        :raises univention.admin.uexceptions.ldapError: if the syntax of the DN is invalid.
        :raises univention.admin.uexceptions.ldapError: on any other LDAP error.
        """
        self._validateLicense()
        if not self.allow_modify and not ignore_license:
            udm_log.error('modify disabled by license', dn=dn)
            raise univention.admin.uexceptions.licenseDisableModify()
        log.debug('modify', dn=dn, modlist=changes)
        if exceptions:
            # Caller wants the raw python-ldap exceptions.
            return self.lo.modify(dn, changes, serverctrls=serverctrls, response=response, rename_callback=rename_callback)
        try:
            return self.lo.modify(dn, changes, serverctrls=serverctrls, response=response, rename_callback=rename_callback)
        except ldap.NO_SUCH_OBJECT as msg:
            log.debug('modify failed', dn=dn, error=msg)
            raise univention.admin.uexceptions.noObject(dn)
        except ldap.INSUFFICIENT_ACCESS as msg:
            log.debug('modify failed', dn=dn, error=msg)
            raise univention.admin.uexceptions.permissionDenied()
        except ldap.INVALID_DN_SYNTAX as msg:
            raise univention.admin.uexceptions.ldapError('%s: %s' % (_err2str(msg), dn), original_exception=msg)
        except ldap.LDAPError as msg:
            log.debug('modify failed', dn=dn, error=msg)
            raise univention.admin.uexceptions.ldapError(_err2str(msg), original_exception=msg)

    def rename(self, dn: str, newdn: str, move_childs: int = 0, ignore_license: bool = False, serverctrls: list[ldap.controls.LDAPControl] | None = None, response: dict | None = None) -> None:
        """
        Rename a LDAP object.

        :param dn: The old distinguished name of the object to rename.
        :param newdn: The new distinguished name of the object to rename.
        :param move_childs: Also rename the sub children. Must be `0` always as `1` is not implemented.
        :param ignore_license: Ignore license check if True.
        :param serverctrls: a list of ldap.controls.LDAPControl instances sent to the server along with the LDAP request
        :param response: An optional dictionary to receive the server controls of the result.
        :raises univention.admin.uexceptions.noObject: if `move_childs` is not `0` or the object does not exist.
        :raises univention.admin.uexceptions.licenseDisableModify: if the UCS licence prohibits any modification
        :raises univention.admin.uexceptions.permissionDenied: if the user does not have the required permissions.
        :raises univention.admin.uexceptions.ldapError: if the syntax of the DN is invalid.
        :raises univention.admin.uexceptions.ldapError: on any other LDAP error.
        """
        if move_childs != 0:
            raise univention.admin.uexceptions.noObject(_('Moving children is not supported.'))
        self._validateLicense()
        if not self.allow_modify and not ignore_license:
            udm_log.warning('rename disabled by license', dn=dn)
            raise univention.admin.uexceptions.licenseDisableModify()
        log.debug('rename', dn=dn, newdn=newdn)
        try:
            return self.lo.rename(dn, newdn, serverctrls=serverctrls, response=response)
        except ldap.NO_SUCH_OBJECT as msg:
            log.debug('rename failed', dn=dn, error=msg)
            raise univention.admin.uexceptions.noObject(dn)
        except ldap.INSUFFICIENT_ACCESS as msg:
            log.debug('rename failed', dn=dn, error=msg)
            raise univention.admin.uexceptions.permissionDenied()
        except ldap.INVALID_DN_SYNTAX as msg:
            raise univention.admin.uexceptions.ldapError('%s: %s' % (_err2str(msg), dn), original_exception=msg)
        except ldap.LDAPError as msg:
            log.debug('rename failed', dn=dn, error=msg)
            raise univention.admin.uexceptions.ldapError(_err2str(msg), original_exception=msg)

    def delete(self, dn: str, exceptions: bool = False) -> None:
        """
        Delete a LDAP object.

        :param str dn: The distinguished name of the object to remove.
        :param bool exceptions: Raise the low level exception instead of the wrapping UDM exceptions; only `ldap.INSUFFICIENT_ACCESS` is still converted to `permissionDenied`.
        :raises univention.admin.uexceptions.noObject: if the object does not exist.
        :raises univention.admin.uexceptions.permissionDenied: if the user does not have the required permissions.
        :raises univention.admin.uexceptions.ldapError: if the syntax of the DN is invalid.
        :raises univention.admin.uexceptions.ldapError: on any other LDAP error.
        """
        self._validateLicense()
        if exceptions:
            try:
                return self.lo.delete(dn)
            except ldap.INSUFFICIENT_ACCESS:
                raise univention.admin.uexceptions.permissionDenied()
        log.debug('delete', dn=dn)
        try:
            return self.lo.delete(dn)
        except ldap.NO_SUCH_OBJECT as msg:
            log.debug('delete failed', dn=dn, error=msg)
            raise univention.admin.uexceptions.noObject(dn)
        except ldap.INSUFFICIENT_ACCESS as msg:
            log.debug('delete failed', dn=dn, error=msg, binddn=self.binddn)
            raise univention.admin.uexceptions.permissionDenied()
        except ldap.INVALID_DN_SYNTAX as msg:
            raise univention.admin.uexceptions.ldapError('%s: %s' % (_err2str(msg), dn), original_exception=msg)
        except ldap.LDAPError as msg:
            log.debug('delete failed', dn=dn, error=msg)
            raise univention.admin.uexceptions.ldapError(_err2str(msg), original_exception=msg)

    def parentDn(self, dn: str) -> str | None:
        """
        Return the parent container of a distinguished name.

        :param str dn: The distinguished name.
        :return: The parent distinguished name or None if the LDAP base is reached.
        """
        return self.lo.parentDn(dn)

    def explodeDn(self, dn: str, notypes: bool = False) -> list[str]:
        """
        Break up a DN into its component parts.

        :param str dn: The distinguished name.
        :param bool notypes: Return only the component's attribute values if True. Also the attribute types if False.
        :return: A list of relative distinguished names.
        """
        return self.lo.explodeDn(dn, notypes)

    def filter_lookup_results(self, results, context=None):
        """Evaluate access control rules for filtering of results"""
        # TODO: check if we are allowed at all to search in the base, with the scope and the given filter for the attrs
        return self.authz.filter_search_results(self, results, context=context)

    def search_filtered(self, context, filter='(objectClass=*)', base='', *args, **kwargs):
        """
        Search through the authorization connection and filter the results by the access control rules.

        :param context: Optional mapping merged with the keyword arguments and handed to the result filter.
        :param str filter: LDAP search filter.
        :param str base: the starting point for the search.
        :returns: The filtered results of :py:meth:`search`, or an empty list if base or filter fail verification.
        :raises univention.admin.uexceptions.noObject: if the search base is filtered out by the access control rules.
        """
        if not self._verify_search_base(base) or not self._verify_search_filter(filter):
            return []
        results = self.authz_connection.search(filter, base, *args, **kwargs)
        return self._filter_ldap_search_results(results, dict(kwargs, **(context or {})))

    def search_dn_filtered(self, context, filter='(objectClass=*)', base='', *args, **kwargs):
        """
        Search DNs through the authorization connection and filter them by the access control rules.

        :param context: Optional mapping merged with the keyword arguments and handed to the result filter.
        :param str filter: LDAP search filter.
        :param str base: the starting point for the search.
        :returns: The filtered results of :py:meth:`searchDn`, or an empty list if base or filter fail verification.
        :raises univention.admin.uexceptions.noObject: if the search base is filtered out by the access control rules.
        """
        if not self._verify_search_base(base) or not self._verify_search_filter(filter):
            return []
        results = self.authz_connection.searchDn(filter, base, *args, **kwargs)
        return self._filter_ldap_search_dns(results, dict(kwargs, **(context or {})))

    def _filter_ldap_search_results(self, results, options=None):
        """Evaluate access control rules for filtering of results"""
        # NOTE(review): `options` is accepted but not forwarded to the authorization engine; confirm this is intended.
        return self.authz.filter_search_results_attrs(self, results)

    def _filter_ldap_search_dns(self, results, context=None):
        """Evaluate access control rules for filtering of results"""
        return self.authz.filter_search_results_dn(self, results, context=context)

    def _verify_search_base(self, base):
        """
        Ensure the search base survives the access control filter.

        An empty base falls back to the UCR variable `ldap/base`.

        :returns: Always True; a rejected base raises instead.
        :raises univention.admin.uexceptions.noObject: if the base is filtered out.
        """
        # mimic ldapsearch behavior
        base = base or configRegistry['ldap/base']
        if not self._filter_ldap_search_dns([base]):
            raise univention.admin.uexceptions.noObject(base)
        return True

    def _verify_search_filter(self, filter_s):
        """Accept every search filter for now; always returns True."""
        # TODO: parse search filter and disallow certain filters for UDM property names where no "search" or "read" permission exists
        return True