Source code for univention.admin.rest.ldap_connection
#!/usr/bin/python3
#
# Univention Management Console
# Univention Directory Manager Module
#
# SPDX-FileCopyrightText: 2017-2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only
from univention.config_registry import ucr
from univention.management.console.ldap import get_connection, reset_cache # noqa: F401
[docs]
def get_user_ldap_write_connection(auth_type, binddn, bindpw):
    """
    Open the ``user-write`` LDAP connection for the given credentials.

    Thin wrapper around :func:`get_ldap_connection` with the connection
    type fixed to ``'user-write'``.

    :param auth_type: authentication scheme, passed through unchanged.
    :param binddn: bind identity, passed through unchanged.
    :param bindpw: bind secret, passed through unchanged.
    :returns: whatever :func:`get_ldap_connection` returns.
    """
    connection_type = 'user-write'
    return get_ldap_connection(connection_type, auth_type, binddn, bindpw)
[docs]
def get_user_ldap_read_connection(auth_type, binddn, bindpw):
    """
    Open the ``user-read`` LDAP connection for the given credentials.

    Thin wrapper around :func:`get_ldap_connection` with the connection
    type fixed to ``'user-read'``.

    :param auth_type: authentication scheme, passed through unchanged.
    :param binddn: bind identity, passed through unchanged.
    :param bindpw: bind secret, passed through unchanged.
    :returns: whatever :func:`get_ldap_connection` returns.
    """
    connection_type = 'user-read'
    return get_ldap_connection(connection_type, auth_type, binddn, bindpw)
def _get_service_ldap_connection(type_):
    """
    Open an LDAP connection for a service account of the given type.

    The bind DN and the path of the password file are read from the UCR
    variables ``directory/manager/rest/ldap-connection/<type_>/binddn`` and
    ``.../<type_>/password-file``. When they are unset, types starting with
    ``machine-`` default to the UCR value ``ldap/hostdn`` and
    ``/etc/machine.secret``; all other types default to
    ``cn=admin,<ldap/base>`` and ``/etc/ldap.secret``.

    :param type_: connection type, e.g. ``'machine-write'`` or ``'admin-read'``.
    :returns: the result of :func:`get_ldap_connection` called with no
        ``auth_type``, the resolved bind DN and the stripped file content
        as password.
    """
    # Pick both fallbacks in one place instead of repeating the prefix test.
    if type_.startswith('machine-'):
        fallback_dn = ucr['ldap/hostdn']
        fallback_secret_path = '/etc/machine.secret'
    else:
        fallback_dn = f'cn=admin,{ucr["ldap/base"]}'
        fallback_secret_path = '/etc/ldap.secret'

    binddn = ucr.get(f'directory/manager/rest/ldap-connection/{type_}/binddn', fallback_dn)
    secret_path = ucr.get(f'directory/manager/rest/ldap-connection/{type_}/password-file', fallback_secret_path)

    with open(secret_path) as secret_file:
        # Surrounding whitespace (e.g. a trailing newline) is not part of the password.
        password = secret_file.read().strip()
    return get_ldap_connection(type_, None, binddn, password)
[docs]
def get_machine_ldap_write_connection():
    """
    Open the ``machine-write`` service LDAP connection.

    :returns: the result of :func:`_get_service_ldap_connection` for the
        type ``'machine-write'``.
    """
    connection_type = 'machine-write'
    return _get_service_ldap_connection(connection_type)
[docs]
def get_machine_ldap_read_connection():
    """
    Open the ``machine-read`` service LDAP connection.

    :returns: the result of :func:`_get_service_ldap_connection` for the
        type ``'machine-read'``.
    """
    connection_type = 'machine-read'
    return _get_service_ldap_connection(connection_type)
[docs]
def get_admin_ldap_write_connection():
    """
    Open the ``admin-write`` service LDAP connection.

    :returns: the result of :func:`_get_service_ldap_connection` for the
        type ``'admin-write'``.
    """
    connection_type = 'admin-write'
    return _get_service_ldap_connection(connection_type)
[docs]
def get_admin_ldap_read_connection():
    """
    Open the ``admin-read`` service LDAP connection.

    :returns: the result of :func:`_get_service_ldap_connection` for the
        type ``'admin-read'``.
    """
    connection_type = 'admin-read'
    return _get_service_ldap_connection(connection_type)
[docs]
def get_ldap_connection(type_, auth_type, binddn, bindpw):
    """
    Open an LDAP connection of the given type via ``get_connection``.

    The server URI is read from the UCR variable
    ``directory/manager/rest/ldap-connection/<type_>/uri`` and defaults to
    ``ldap://<ldap/master>:<ldap/master/port>`` (port 7389 when unset).

    The StartTLS level is read from
    ``directory/manager/rest/ldap-connection/<type_>/start-tls``. When that
    variable is unset, the value of the ``user-read`` variable is used (the
    only key consulted previously), and finally ``2``.

    :param type_: connection type used to select the UCR variables, e.g.
        ``'user-read'`` or ``'machine-write'``.
    :param auth_type: ``'Bearer'`` binds via ``lo.bind_oauthbearer(binddn, bindpw)``;
        any other value performs a bind with ``binddn``/``bindpw``.
    :param binddn: bind identity.
    :param bindpw: bind secret (password, or the token for ``'Bearer'``).
    :returns: whatever ``get_connection`` returns.
    """
    # The default must be an int: ``get_int`` hands the default back unconverted
    # when the variable is unset, and ``%d`` raises TypeError for a str.
    default_port = ucr.get_int('ldap/master/port', 7389)
    default_uri = "ldap://%s:%d" % (ucr.get('ldap/master'), default_port)
    uri = ucr.get(f'directory/manager/rest/ldap-connection/{type_}/uri', default_uri)

    # Honour a per-type setting like the URI does, but keep the historical
    # 'user-read' variable as fallback so existing configurations behave as before.
    shared_start_tls = ucr.get_int('directory/manager/rest/ldap-connection/user-read/start-tls', 2)
    start_tls = ucr.get_int(f'directory/manager/rest/ldap-connection/{type_}/start-tls', shared_start_tls)

    if auth_type == 'Bearer':
        # No DN/password is handed over; the callback binds itself and the hash
        # of the credentials is passed as ``bindhash`` to identify this bind.
        return get_connection(
            bind=lambda lo: lo.bind_oauthbearer(binddn, bindpw),
            bindhash=hash((binddn, bindpw)),
            binddn=None,
            bindpw=None,
            host=None,
            port=None,
            base=ucr['ldap/base'],
            start_tls=start_tls,
            uri=uri,
        )
    return get_connection(
        bind=None,
        binddn=binddn,
        bindpw=bindpw,
        host=None,
        port=None,
        base=ucr['ldap/base'],
        start_tls=start_tls,
        uri=uri,
    )