Source code for univention.admin.recyclebin

# SPDX-FileCopyrightText: 2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only

"""|UDM| functions to handle deleted objects in recyclebin"""

import string
from collections import namedtuple
from typing import Self
from urllib.parse import quote, unquote

from ldap.filter import filter_format

import univention.admin.filter
import univention.admin.localization
from univention.admin._ucr import configRegistry
from univention.admin.log import log as admin_log


# LDAP search base used by create_references() and to_uuid() in this module
# when looking up deleted objects.
RECYCLEBIN_BASE = 'cn=recyclebin,cn=internal'
# Lower-cased LDAP attribute names. Not referenced in the visible part of this
# module.
# NOTE(review): presumably attributes that are not taken over verbatim from a
# deleted object - confirm against the callers.
IGNORE_ATTRS = {
    'objectclass',
    'univentionobjecttype',
    'univentionobjectidentifier',
    'memberof',
}

# Translation object created for 'univention.admin.handlers'; `_` is bound to
# its translate callable.
translation = univention.admin.localization.translation('univention.admin.handlers')
_ = translation.translate


[docs] class Reference(namedtuple('Reference', ['source_attr', 'target_module', 'target_property', 'lookup_attribute', 'lookup_value'])): __slots__ = () SAFE_CHARS = string.printable.replace(':', '').replace('%', '')
[docs] @classmethod def parse(cls, reference: str) -> Self | None: """ Parse a univentionRecycleBinReference value. Format: source_attr:target_module:target_property:lookup_attribute:lookup_value Example: dn:groups/group:users:uuid:550e8400-e29b-41d4-a716-446655440000 which reads as: The LDAP attribute `DN` of the to-be-restored object should go into the `users` UDM property of the `groups/group` object identified by the LDAP attribute `uuid` $UUID. Some LDAP attributes have aliases: uuid = univentionObjectIdentifier, dn = 1.1. Fields are URL-encoded to handle special characters like colons. Returns dict with keys: source_attr, target_module, target_property, lookup_attribute, lookup_value Returns None if invalid format """ parts = reference.split(':') if len(parts) < 5: return None return cls(*(unquote(p) for p in parts[:5]))
def __str__(self) -> str: """ Format a univentionRecycleBinReference value. Format: source_attr:target_module:target_property:lookup_attribute:lookup_value Fields are URL-encoded to handle special characters like colons. """ return ':'.join(quote(field, safe=self.SAFE_CHARS) for field in self) def __bytes__(self) -> bytes: return str(self).encode('UTF-8')
[docs] def resolve(self, lo: univention.admin.uldap.access, verify_exists: bool = True, include_recyclebin: bool = False) -> dict | None: """ Resolve a single reference string to its target DN. :param lo: LDAP connection :param verify_exists: Whether to verify the target object exists :param include_recyclebin: Whether to also search in the recyclebin for deleted objects :return: str target_dn, or None if not found """ lookup_attribute_aliases = {'uuid': 'univentionObjectIdentifier'} ldap_attr = lookup_attribute_aliases.get(self.lookup_attribute, self.lookup_attribute) target_dn = None if self.lookup_attribute == 'dn': target_dn = self.lookup_value if verify_exists: results = lo.authz_connection.getAttr(target_dn, 'objectClass') if not results: target_dn = None else: # TODO: doesn't respect self.target_module filter_str = filter_format('(%s=%s)', [ldap_attr, self.lookup_value]) results = lo.authz_connection.searchDn(base=configRegistry['ldap/base'], filter=filter_str) if results: target_dn = results[0] # if not found in normal LDAP, search in recyclebin if requested if not target_dn and include_recyclebin: ldap_attr = {'dn': 'univentionRecycleBinOriginalDN'}.get(ldap_attr, ldap_attr) filter_str = filter_format('(&(objectClass=univentionRecycleBinObject)(%s=%s))', [ldap_attr, self.lookup_value]) from univention.admin.handlers.recyclebin.deletedobject import RECYCLEBIN_BASE results = lo.authz_connection.searchDn(base=RECYCLEBIN_BASE, filter=filter_str) if results: target_dn = results[0] return target_dn
[docs] def create_references(lo, object_type: str, original_dn: str | None, oldattr: dict[str, list[bytes]]): """Create univentionRecycleBinReference for a given object.""" refs = [] # TODO: move into some definition of groups/group # handle group references (memberOf) if member_of := oldattr.get('memberOf'): groups = [x.decode('UTF-8') for x in member_of] refs.extend( Reference('dn', 'groups/group', {'groups/group': 'nestedGroup', 'users/user': 'users'}.get(object_type), *to_uuid(dn, lo)) for dn in groups ) # handle groups references, where the group was deleted first and is already in the recyclebin if object_type in ('users/user', 'groups/group') and original_dn: results = lo.authz_connection.search( base=RECYCLEBIN_BASE, scope='one', filter=filter_format( '(&(objectClass=univentionRecycleBinObject)(univentionRecycleBinOriginalType=groups/group)(uniqueMember=%s))', [original_dn], ), attr=['univentionRecycleBinOriginalDN', 'univentionRecycleBinOriginalUniventionObjectIdentifier'], ) for _dn, attrs in results: group_original_dn = attrs.get('univentionRecycleBinOriginalDN', [b''])[0].decode('UTF-8') group_uuid = attrs.get('univentionRecycleBinOriginalUniventionObjectIdentifier', [b''])[0].decode('UTF-8') if group_uuid: admin_log.debug('Found deleted group with member', group_dn=group_original_dn, member=original_dn, uuid=group_uuid) refs.append( Reference('dn', 'groups/group', {'groups/group': 'nestedGroup', 'users/user': 'users'}.get(object_type), 'uuid', group_uuid), ) return refs
def to_uuid(dn: str, lo: 'univention.admin.uldap.access') -> tuple[str, str]:
    """
    Convert a DN into the `(lookup_attribute, lookup_value)` pair of a univentionRecycleBinReference.

    Tries to store references by UUID for stability.

    Search order:
    1. Active LDAP - get UUID from current object
    2. Recyclebin - get UUID from deleted object
    3. Fallback - store DN

    :param dn: the DN of the referenced object
    :param lo: LDAP connection
    :return: `('uuid', <UUID>)` if a UUID was found, otherwise `('dn', dn)`
    """
    entry = lo.authz_connection.get(dn, attr=['univentionObjectIdentifier'])
    # An attribute without values is treated like a missing attribute, the same
    # way the recyclebin lookup below does, instead of failing on `[0]`.
    identifiers = entry.get('univentionObjectIdentifier') if entry else None
    if identifiers:
        return 'uuid', identifiers[0].decode('utf-8')

    results = lo.authz_connection.search(
        base=RECYCLEBIN_BASE,
        scope='one',
        filter=filter_format(
            '(&(objectClass=univentionRecycleBinObject)(univentionRecycleBinOriginalDN=%s))',
            [dn],
        ),
        attr=['univentionRecycleBinOriginalUniventionObjectIdentifier'],
    )
    # only the first matching recyclebin entry is considered
    original_identifiers = results[0][1].get('univentionRecycleBinOriginalUniventionObjectIdentifier') if results else None
    if original_identifiers:
        uuid = original_identifiers[0].decode('utf-8')
        admin_log.debug('Found UUID in recyclebin', dn=dn, uuid=uuid)
        return 'uuid', uuid

    admin_log.debug('Storing reference by DN (UUID not found)', dn=dn)
    return 'dn', dn