Source code for univention.udm.connections

# SPDX-FileCopyrightText: 2018-2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only


from __future__ import annotations

import sys
from typing import TYPE_CHECKING, Any, TypeVar

import ldap
from ldap.filter import filter_format

import univention.admin.uexceptions
import univention.admin.uldap
import univention.config_registry

from .exceptions import ConnectionError  # noqa: A004


if TYPE_CHECKING:
    from collections.abc import Callable


_T = TypeVar("_T")  # noqa: PYI018


[docs] class LDAP_connection: """Caching LDAP connection factory.""" _ucr: univention.config_registry.ConfigRegistry = None _connection_admin: univention.admin.uldap.access | None = None _connection_account: dict[tuple[str, str, str | None, int | None, str | None], univention.admin.uldap.access] = {} @classmethod def _clear(cls) -> None: # used in tests cls._ucr = None cls._connection_admin = None cls._connection_account.clear() @classmethod def _wrap_connection(cls, func: Callable[..., _T], **kwargs: Any) -> _T: try: return func(**kwargs) except OSError: raise ConnectionError('Could not read secret file').with_traceback(sys.exc_info()[2]) except univention.admin.uexceptions.authFail: raise ConnectionError('Credentials invalid').with_traceback(sys.exc_info()[2]) except ldap.INVALID_CREDENTIALS: raise ConnectionError('Credentials invalid').with_traceback(sys.exc_info()[2]) except ldap.CONNECT_ERROR: raise ConnectionError('Connection refused').with_traceback(sys.exc_info()[2]) except ldap.SERVER_DOWN: raise ConnectionError('The LDAP Server is not running').with_traceback(sys.exc_info()[2])
[docs] @classmethod def get_admin_connection(cls) -> univention.admin.uldap.access: if not cls._connection_admin: cls._connection_admin, _po = cls._wrap_connection(univention.admin.uldap.getAdminConnection) return cls._connection_admin
[docs] @classmethod def get_machine_connection(cls, ldap_master: bool = True) -> univention.admin.uldap.access: # do not cache the machine connection as this breaks on server-password-change co, _po = cls._wrap_connection(univention.admin.uldap.getMachineConnection, ldap_master=ldap_master) return co
[docs] @classmethod def get_credentials_connection( cls, identity: str, password: str, base: str | None = None, server: str | None = None, port: int | None = None, ) -> univention.admin.uldap.access: if not cls._ucr: cls._ucr = univention.config_registry.ConfigRegistry() cls._ucr.load() if '=' not in identity: lo = cls.get_machine_connection() dns = lo.searchDn(filter_format('uid=%s', (identity,))) try: identity = dns[0] except IndexError: raise ConnectionError('Cannot get DN for username').with_traceback(sys.exc_info()[2]) access_kwargs = {'binddn': identity, 'bindpw': password, 'base': base or cls._ucr['ldap/base']} if server: access_kwargs['host'] = server if port: access_kwargs['port'] = port key = (identity, password, server, port, base) if key not in cls._connection_account: cls._connection_account[key] = cls._wrap_connection(univention.admin.uldap.access, **access_kwargs) return cls._connection_account[key]