Source code for univention.admin.handlers.recyclebin.removedobject

# SPDX-FileCopyrightText: 2004-2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only

"""UDM module for recyclebin deleted objects"""

import copy

import ldap
from ldap import modlist

import univention.admin.filter
import univention.admin.localization
import univention.admin.mapping as udm_mapping
import univention.admin.syntax as udm_syntax
import univention.admin.uexceptions
from univention.admin._ucr import configRegistry
from univention.admin.handlers import simpleLdap
from univention.admin.layout import Group, Tab
from univention.admin.log import log
from univention.admin.modules import _ldap_operational_attribute_names
from univention.admin.recyclebin import IGNORE_ATTRS, RECYCLEBIN_BASE, Reference, create_references
from univention.admindiary.events import DiaryEvent


translation = univention.admin.localization.translation('univention.admin.handlers.recyclebin')
_ = translation.translate


module = 'recyclebin/removedobject'
operations = ['read', 'remove', 'search', 'restore']
childs = False
short_description = _('Recyclebin: Deleted Object')
object_name = _('Deleted Object')
object_name_plural = _('Deleted Objects')
long_description = _('Objects that have been moved to the recyclebin')

# during package upgrade. TODO: remove in UCS 5.3
udm_syntax.__dict__.setdefault('RecycleBinReference', udm_syntax.string)
udm_syntax.__dict__.setdefault('RecycleBinSupportedModules', udm_syntax.string)

# fmt: off
options = {
    'default': univention.admin.option(
        short_description=short_description,
        default=True,
        objectClasses=['top', 'extensibleObject', 'univentionRecycleBinObject', 'dynamicObject'],
    ),
}

property_descriptions = {
    'originalDN': univention.admin.property(
        short_description=_('Original DN'),
        long_description=_('Distinguished name of the original object before deletion.'),
        syntax=udm_syntax.ldapDn,
        may_change=False,
        required=True,
        include_in_default_search=True,
        identifies=True,
    ),
    'originalUniventionObjectIdentifier': univention.admin.property(
        short_description=_('Original Object Identifier'),
        long_description=_('UniventionObjectIdentifier of the deleted object.'),
        syntax=udm_syntax.UUID,
        may_change=False,
        required=True,
        identifies=True,
    ),
    'originalObjectType': univention.admin.property(
        short_description=_('Original Object Type'),
        long_description=_('UDM module type of the original object.'),
        syntax=udm_syntax.RecycleBinSupportedModules,
        may_change=False,
        required=True,
    ),
    'originalObjectClasses': univention.admin.property(
        short_description=_('Original object classes'),
        long_description=_('Object classes of the deleted object.'),
        syntax=udm_syntax.string,
        may_change=False,
        required=False,
        multivalue=True,
        dontsearch=True,
    ),
    'originalEntryUUID': univention.admin.property(
        short_description=_('Original EntryUUID'),
        long_description=_('EntryUUID of the deleted object.'),
        syntax=udm_syntax.UUID,
        may_change=False,
        required=True,
    ),
    'originalName': univention.admin.property(
        short_description=_('Original Name'),
        long_description=_('Original name of the deleted object (uid for users, cn for groups).'),
        syntax=udm_syntax.string,
        may_change=False,
        include_in_default_search=True,
    ),
    'purgeAt': univention.admin.property(
        short_description=_('Delete At'),
        long_description=_('Timestamp when the object should be permanently deleted based on retention policy. Note: Actual deletion is handled by OpenLDAP DDS and may occur slightly later.'),
        syntax=udm_syntax.GeneralizedTimeUTC,
        may_change=False,
        required=True,
    ),
    'removalDate': univention.admin.property(
        short_description=_('Deletion Date'),
        long_description=_('Timestamp when the object was deleted.'),
        syntax=udm_syntax.GeneralizedTimeUTC,
        may_change=False,
        required=True,
    ),
    'referencedBy': univention.admin.property(
        short_description=_('Referenced By'),
        long_description=_('List of objects that referenced this object at deletion time.'),
        syntax=udm_syntax.RecycleBinReference,
        multivalue=True,
        may_change=False,
        dontsearch=True,
    ),
}

default_property_descriptions = copy.deepcopy(property_descriptions)  # for later reset of descriptions
default_options = copy.deepcopy(options)  # for later reset of descriptions

layout = [
    Tab(_('Deleted Object'), _('Basic information'), layout=[
        Group(_('Object information'), layout=[
            'originalObjectType',
            'originalName',
            'originalDN',
            'originalUniventionObjectIdentifier',
            'removalDate',
            'purgeAt',
        ]),
    ]),
    Tab(_('Referencing objects'), _('Objects referencing this deleted object'), layout=[
        'referencedBy',
    ]),
]


[docs] def map_reference(value): """ Map a reference list to encoded LDAP attribute format. Input: List of lists (from complex syntax): [['groups', 'groups/group', 'users', 'dn', 'cn=...']] Output: Encoded string: b'groups:groups%2Fgroup:users:dn:cn%3D...' """ if not value: return [] result = [] for ref in value: if len(ref) != 5: continue result.append(bytes(Reference(*ref))) return result
[docs] def unmap_reference(value): """ Unmap encoded LDAP attribute to list of lists for complex syntax. Input: Encoded strings: ['groups:groups%2Fgroup:users:dn:cn%3D...'] Output: List of lists: [['groups', 'groups/group', 'users', 'dn', 'cn=...']] """ if not value: return [] result = [] for ref in value: parsed = Reference.parse(ref.decode('UTF-8')) if parsed: result.append(tuple(parsed)) return result
mapping = udm_mapping.mapping() mapping.register('originalObjectType', 'univentionRecycleBinOriginalType', None, udm_mapping.ListToString) mapping.register('purgeAt', 'univentionRecycleBinDeleteAt', None, udm_mapping.ListToString) mapping.register('removalDate', 'univentionRecycleBinDeletionDate', None, udm_mapping.ListToString) mapping.register('referencedBy', 'univentionRecycleBinReference', map_reference, unmap_reference) mapping.register('originalUniventionObjectIdentifier', 'univentionRecycleBinOriginalUniventionObjectIdentifier', None, udm_mapping.ListToString) mapping.register('originalDN', 'univentionRecycleBinOriginalDN', None, udm_mapping.ListToString) mapping.register('originalObjectClasses', 'univentionRecycleBinOriginalObjectClass') mapping.register('originalEntryUUID', 'univentionRecycleBinOriginalEntryUUID', None, udm_mapping.ListToString) # fmt: on
[docs] class object(simpleLdap): module = module ldap_base = RECYCLEBIN_BASE def __init__(self, *args, **kwargs) -> None: self.foreign_policies = [] self.foreign_options = [] super().__init__(*args, **kwargs) self.options.extend(self.foreign_options) self.policies.extend(self.foreign_policies) self.save()
[docs] def open_guardian(self): pass # lazy loading calls this for users/user
[docs] def description(self) -> str: """Return the original name for display in UMC grid""" # return self['originalName'] # return ldap.dn.explode_rdn(self.dn, True)[0] return self.oldattr.get('uid', self.oldattr.get('cn', [self.dn.encode('UTF-8')]))[0].decode('UTF-8')
@property def descriptions(self): # caution! We are modifying the module (not object's!) property_descriptions here # descriptions = super().descriptions descriptions = copy.deepcopy(default_property_descriptions) if 'originalObjectType' not in self.info: return descriptions module = univention.admin.modules.get(self.info['originalObjectType']) if not module: log.error('Original object type %s not found', self.info['originalObjectType']) return descriptions # add original properties to description for pname, prop in module.property_descriptions.items(): if pname not in descriptions and pname != 'objectFlag': # we need to do this for restore in UMC # otherwise UMC complains about required properties # of the original object that we don't have on the deleted object descriptions[pname] = copy.deepcopy(prop) # don't change original description! descriptions[pname].may_change = False descriptions[pname].readonly = True descriptions[pname].identifies = False descriptions[pname].required = False descriptions[pname].prevent_umc_default_popup = True return descriptions def _post_unmap(self, info: dict, oldattr: dict) -> dict: """Add computed originalName property""" # we can't store operational attribute memberOf at the deleted object, so we store it as reference, which we can convert back to memberOf references = [Reference(*ref) for ref in info.get('referencedBy', [])] memberof_references = [ref for ref in references if ref.target_module == 'groups/group' and ref.target_property in ('users', 'hosts', 'nestedGroup') and ref.source_attr == 'dn'] if memberof_references: member_of = [ ref.resolve(self.lo) for ref in memberof_references ] oldattr['memberOf'] = [x.encode('UTF-8') for x in member_of if x] info = super()._post_unmap(info, oldattr) info['originalName'] = oldattr.get('uid', oldattr.get('cn', [self.dn.encode('UTF-8')]))[0].decode('UTF-8') self._unmap_original_properties(info, oldattr) return info def _unmap_original_properties(self, info, oldattr): global options, property_descriptions options = copy.deepcopy(default_options) property_descriptions = copy.deepcopy(default_property_descriptions) # reset to original state for each object! if not self.dn or 'originalObjectType' not in info: return self.info['originalObjectType'] = info['originalObjectType'] property_descriptions.update(self.descriptions) # overwrite the module property descriptions!! module = univention.admin.modules.get(info['originalObjectType']) if not module: log.error('Original object type %s not found', info['originalObjectType']) return base = self.lo.base self.lo.lo.base = configRegistry['ldap/base'] try: oldattr = oldattr.copy() oldattr['objectClass'] = oldattr['univentionRecycleBinOriginalObjectClass'] obj = module.object(None, self.lo, None, info['originalDN'], attributes=oldattr) # some properties are only unmapped in open() e.g. users/user:primaryGroup, groups/group:users,... obj.open() # we are in hell.. there are potential errors here. # except univention.admin.uexceptions.primaryGroup: !? except univention.admin.uexceptions.wrongObjectType: # ignore? return finally: self.lo.lo.base = base props = obj.info for opt in module.options: if opt not in options and opt != 'defaut': options[opt] = copy.deepcopy(module.options[opt]) options[opt].editable = False self.foreign_options = obj.options self.foreign_policies = obj.policies props['univentionObjectIdentifier'] = info['originalUniventionObjectIdentifier'] info.update(props) def _ldap_pre_create(self) -> None: super()._ldap_pre_create() if not univention.dn.DN(self.dn).endswith(univention.dn.DN(self.ldap_base)): raise univention.admin.uexceptions.valueError(_('%s objects need to be created in %s: %s') % (module, self.ldap_base, self.dn)) self._operational_attributes = _ldap_operational_attribute_names(self.lo) def _ldap_modlist(self): ml = super()._ldap_modlist() self.oldattr.setdefault('univentionRecycleBinReference', []).extend( bytes(ref) for ref in self.get_references() ) # filter attributes, remove operational attributes and IGNORE_ATTRS orig_attr = { attr: value for attr, value in self.oldattr.items() if attr.lower() not in IGNORE_ATTRS | self._operational_attributes } ml += modlist.addModlist(orig_attr) return ml
[docs] def get_references(self): return create_references(self.lo, self.info['originalObjectType'], self.info.get('originalDN'), self.oldattr)
[docs] def restore_references(self) -> None: """Restore generic references from recyclebin object""" references = self['referencedBy'] if not references: self.log.debug('No preserved references found') return self.log.info('Starting reference restoration', reference_count=len(references)) for reference in references: ref = Reference(*reference) target_dn = ref.resolve(self.lo, verify_exists=True) if not target_dn: self.log.warning( 'Target object no longer exists, skipping reference restoration', reference=reference, reason='target_not_found', ) continue source_attr_aliases = {'dn': 'originalDN'} source_attr_key = source_attr_aliases.get(ref.source_attr, ref.source_attr) source_value = self.info.get(source_attr_key) if not source_value: self.log.warning( 'Restored attribute not found or empty', reference=reference, source_attr=ref.source_attr, reason='source_attr_not_found', ) continue mod = univention.admin.modules.get(ref.target_module) # e.g. groups/group target_obj = mod.object(None, self.lo.authz_connection, None, target_dn) target_obj.open() prop = ref.target_property # e.g. users current_value = target_obj.get(prop) if target_obj.descriptions[prop].multivalue: if source_value in current_value or []: continue target_obj[prop].append(source_value) else: if current_value == source_value: continue target_obj[prop] = source_value try: target_obj.modify() except (univention.admin.uexceptions.base, ldap.LDAPError) as exc: self.log.warning( 'Failed to restore reference', target_dn=target_dn, target_module=ref.target_module, target_property=prop, error=str(exc), reason='modify_failed', ) # raise else: self.log.info( 'Successfully restored reference', target_dn=target_dn, target_module=ref.target_module, target_property=prop, )
def _get_admin_diary_event(self, event_name: str) -> DiaryEvent: name = self['originalObjectType'].replace('/', '_').upper() return DiaryEvent.get('UDM_%s_%s' % (name, event_name)) or DiaryEvent.get('UDM_GENERIC_%s' % event_name)
[docs] @classmethod def rewrite_filter(cls, filter_expr, mapping) -> None: """Make originalName searchable by rewriting to uid/cn search""" super().rewrite_filter(filter_expr, mapping) if filter_expr.variable == 'originalName' and filter_expr.value: uid = copy.copy(filter_expr) uid.variable = 'uid' cn = copy.copy(filter_expr) cn.variable = 'cn' filter_s = '(|%s%s)' % (uid, cn) filter_expr.transform_to_conjunction(univention.admin.filter.parse(filter_s))
lookup = object.lookup lookup_filter = object.lookup_filter identify = object.identify