Source code for univention.ldap_cache.frontend

#!/usr/bin/python3
# SPDX-FileCopyrightText: 2021-2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only

from typing import Any

from univention.ldap_cache.cache import Cache, get_cache  # noqa: F401


def _extract_id_from_dn(dn: str) -> str:
    """
    We know that this is wrong in general. But to speed up things
    we do not use explode_dn from ldap.
    We use the knowledge about users/user, groups/group, computers/computer objects:
    Their uid / cn must not contain a "," or a "=".
    %timeit dn.split(",", 1)[0].split("=", 1)[1]
    => 300ns
    %timeit ldap.explode_dn(dn, 1)[0]
    => 8µs
    """  # noqa: RUF002
    return dn.split(",", 1)[0].split("=", 1)[1]


[docs] def groups_for_user(user_dn: str, consider_nested_groups: bool = True, cache: dict[str, set[str]] | None = None) -> list[str]: user_dn = user_dn.lower() if cache is None: _cache = get_cache() subcache = _cache.get_sub_cache('uniqueMembers').load() cache = {key: {val.lower() for val in values} for key, values in subcache.items()} search_for_dns = [user_dn] found: set[str] = set() while search_for_dns: search_for = search_for_dns.pop().lower() for member, dns in cache.items(): if search_for in dns and member not in found: found.add(member) search_for_dns.append(member) if not consider_nested_groups: break return sorted(found)
[docs] def users_in_group(group_dn: str, consider_nested_groups: bool = True, readers: tuple[Any | None, Any | None] = (None, None), group_cache: dict[str, list[str]] = {}) -> list[str]: group_dn = group_dn.lower() cache = get_cache() member_uid_cache, unique_member_cache = (cache.get_sub_cache(name) for name in ['memberUids', 'uniqueMembers']) with member_uid_cache.reading(readers[0]) as member_uid_reader, unique_member_cache.reading(readers[1]) as unique_member_reader: ret: set[str] = set() members = unique_member_cache.get(group_dn, unique_member_reader) if not members: return [] uids = member_uid_cache.get(group_dn, member_uid_reader) or [] uids = {uid.lower() for uid in uids} for member in members: rdn = _extract_id_from_dn(member).lower() if rdn in uids: ret.add(member.lower()) elif '%s$' % rdn in uids: continue else: if consider_nested_groups: if member in group_cache: ret.update(group_cache[member]) else: members = users_in_group(member, consider_nested_groups, readers=(member_uid_reader, unique_member_reader), group_cache=group_cache) group_cache[member] = members ret.update(members) return sorted(ret)
[docs] def users_groups() -> dict[str, list[str]]: """ Find all user-group relationship, including implicit ones: if Group1 have Group2 as a subgroup, all users from Group2 are also considered members of Group1. """ cache = get_cache() member_uid_cache, unique_member_cache = (cache.get_sub_cache(name) for name in ['memberUids', 'uniqueMembers']) group_users: dict[str, list[str]] = {} _group_cache: dict[str, list[str]] = {} with member_uid_cache.reading() as member_uid_reader, unique_member_cache.reading() as unique_member_reader: for group in unique_member_cache.keys(): group_users[group] = users_in_group(group, readers=(member_uid_reader, unique_member_reader), group_cache=_group_cache) res: dict[str, set[str]] = {} for group, members in group_users.items(): for member in members: groups = res.setdefault(member, set()) groups.add(group) # return groups as sorted list return {_extract_id_from_dn(user): sorted(groups) for user, groups in res.items()}