Source code for univention.admin.authorization

#!/usr/bin/python3
#
# Univention Directory Manager
#
# SPDX-FileCopyrightText: 2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only
"""Authorization for UDM access."""

from logging import getLogger

import univention.admin.modules
import univention.admin.types
from univention.admin import configRegistry
from univention.admin.uexceptions import permissionDenied
from univention.authorization.authorization import LocalGuardianAuthorizationClient


__all__ = ('Authorization',)

log = getLogger('ADMIN').getChild(__name__)

LDAP_BASE = configRegistry['ldap/base']
ROLE_CACHE_SIZE = 1000


def auth_log(action, actor, target, **kwargs):
    msg = f'{action} by {actor["id"]} to {target.get("id")} not allowed'
    if kwargs:
        extra = '; '.join(f'{k}={v!r}' for k, v in kwargs.items())
        msg = f'{msg}: {extra}'
    log.debug('%s', msg % kwargs)


def get_user(lo, user_dn: str):
    data = lo.authz_connection.get(user_dn, attr=['univentionObjectType'])
    modname = data.get('univentionObjectType')
    if not modname:
        return

    mod = univention.admin.modules.get(modname[0].decode('UTF-8'))
    obj = mod.object(None, lo, None, user_dn)
    obj.open()
    return obj


def get_user_roles(obj) -> list[str]:
    if hasattr(obj, 'open_guardian'):
        obj.open_guardian()
    role_set = set(obj.get('guardianInheritedRoles', []) + obj.get('guardianRoles', []))
    return role_set


def _san_module(module):
    return module.replace('/', '-')


def _san_property(prop):
    return prop.lower()


[docs] class Authorization: """Check authorization via access control lists""" global_enabled = False engine = None get_privileged_connection = lambda: None # noqa: E731 _user_roles_cache = {}
[docs] @classmethod def enable(cls, get_privileged_connection): """Enables ACL checking globally if the running service supports it""" cls.global_enabled = True cls.get_privileged_connection = get_privileged_connection
[docs] @classmethod def inject_ldap_connection(cls, user_connection, metadata=None): """Extends the user connection to get admin powers and store metadata per connection""" if cls.global_enabled: user_connection.authz.enabled = True user_connection.metadata = metadata return user_connection
[docs] @classmethod def get_authz_connection(cls, lo): if cls.global_enabled: return cls.get_privileged_connection() return lo
@property def lo(self): return self.__class__.get_privileged_connection() def __init__(self): self.enabled = False if self.engine is None: self.__class__.engine = LocalGuardianAuthorizationClient('/var/lib/univention-directory-manager-modules/guardian/')
[docs] @classmethod def clear_caches(cls): if cls.engine: cls.engine.reload() cls._user_roles_cache.clear()
@classmethod def _get_cached_actor(cls, lo): actor_dn = lo.binddn # FIXME: memory leak, use weakref.ref() ? actor = get_user(cls.get_privileged_connection(), actor_dn) if getattr(lo, 'actor_roles', None) is not None: return lambda: (actor, lo.actor_roles) return lambda: (actor, get_user_roles(actor)) # FIXME: don't cache actor roles as we don't know when to invalidate the cache. Roles of groups can be changed at any time. if cls._user_roles_cache.get(actor_dn) is None: cls._user_roles_cache[actor_dn] = (actor, get_user_roles(actor)) return lambda: cls._user_roles_cache[actor_dn] # @functools.lru_cache(maxsize=ROLE_CACHE_SIZE) def _get_cached_roles(self, lo, dn): user = get_user(lo, dn) if not user: return [] return get_user_roles(user)
[docs] def is_receive_allowed(self, obj, raise_exception=True): if not self.enabled: return True mod = _san_module(obj.module) actor, targets = self._get_actor_and_targets(obj.lo, obj) allowed = self._check_permissions( actor, targets, *self._get_extras({mod}), targeted_permissions_to_check=[f'udm:{mod}:read'], ) if not allowed: auth_log('read', actor, targets[0]) if raise_exception: raise permissionDenied() return allowed
[docs] def filter_object_properties(self, obj): return self.filter_search_results(obj.lo, [obj])[0]
[docs] def filter_search_results_dn(self, lo, results): if not self.enabled: return results # TODO: how could we realize filterting without receiving the object # TODO: skip authorization in get_object() ? # FIXME: remove this performance intensive search!!! results = [univention.admin.objects.get_object(lo, dn) for dn in results] results = [x for x in results if x is not None] # cn=admin and others is not a UDM object filtered = self.filter_search_results(lo, results) return [obj.dn for obj in filtered]
[docs] def filter_search_results_attrs(self, lo, results): if not self.enabled: return results targets = [] results_ext = [] for result in results: dn, attrs = result module = attrs['univentionObjectType'][0].decode('UTF-8') # cn=admin and others is not a UDM object mod = univention.admin.modules.get(module) mapping = mod.mapping props = {} for attr in list(attrs): prop = mapping.unmapName(attr) props[prop] = attrs[attr] target = { 'id': dn, 'roles': self._get_target_roles(module, dn), 'attributes': { 'dn': dn, 'id': dn, 'objectType': module, 'position': self.lo.parentDn(dn) or LDAP_BASE, 'properties': props, # 'options': ..., 'policies': None, 'uuid': None, }, } targets.append({'old_target': target, 'new_target': self._empty_target()}) results_ext.append(( module, dn, result, set(mod.property_descriptions), )) filtered = self._filter_search_results(lo, results_ext, targets) response = [] for result, module, readable_attributes in filtered: _, attrs = result for attr in list(attrs): prop = univention.admin.modules.get(module).mapping.unmapName(attr) if not self._is_readable(readable_attributes, module, prop): # FIXME: is module correct? attrs.pop(attr) response.append(result) return response
[docs] def filter_search_results(self, lo, results): if not self.enabled: return results targets = [ self._get_targets(lo, None, target_obj) for target_obj in results ] results_ext = [ (result.module, result.dn, result, set(result.descriptions)) for result in results ] filtered = self._filter_search_results(lo, results_ext, targets) response = [] for result, module, readable_attributes in filtered: for prop in list(result.info): if not self._is_readable(readable_attributes, module, prop): # TODO: remove from oldattr # FIXME: what if the object is open()ed afterwards? result.info.pop(prop) result.oldinfo.pop(prop, None) response.append(result) return response
def _filter_search_results(self, lo, results, targets): if not results: return results # FIXME: less error prone but allows side channel timing attacks actor = self._get_actor(lo) allowed, permissions_result = self._get_and_check_permissions( actor, targets, *self._get_extras({x[0] for x in results}), # general_permissions_to_check=[f'udm:{mod}:read'], # FIXME: no general permission can be granted, as the object type might differ ) if not permissions_result['actor_has_all_general_permissions']: auth_log('search', actor, {'id': 'multiple targets'}, general=allowed) return [] # raise permissionDenied() filtered = [] for i, (module, dn, result, all_properties) in enumerate(results): target_permissions = permissions_result['target_permissions'][i] assert target_permissions['target_id'] == dn, (target_permissions['target_id'], dn) # TODO: replace with UUID mod = _san_module(module) if not {f'udm:{mod}:read', f'udm:{mod}:search'} & target_permissions['permissions']: auth_log('search', actor, {'id': target_permissions['target_id']}) continue readable_attributes = self._get_readable_properties(target_permissions['permissions'], mod, all_properties) filtered.append((result, module, readable_attributes)) return filtered
[docs] def is_create_allowed(self, obj, raise_exception=True): if self.enabled: # is_create_allowed is called to early, so that we have to compute the LDAP DN obj.ready() # all required properties / DN identifying property must be set obj.dn = obj._ldap_dn() return self._is_write_action_allowed('create', obj, raise_exception=raise_exception)
[docs] def is_modify_allowed(self, obj, raise_exception=True): return self._is_write_action_allowed('modify', obj, raise_exception=raise_exception)
[docs] def is_rename_allowed(self, obj, raise_exception=True): return self._is_write_action_allowed('rename', obj, raise_exception=raise_exception)
[docs] def is_move_allowed(self, obj, dest, raise_exception=True): if not self.enabled: return True # FIXME: deepcopy is expensive import copy moved_obj = copy.deepcopy(obj) moved_obj.dn = dest mod = _san_module(obj.module) actor, targets = self._get_actor_and_targets(obj.lo, obj, moved_obj) if not self._check_permissions( actor, targets, *self._get_extras({mod}), targeted_permissions_to_check=[f'udm:{mod}:move'], ): auth_log('move', actor, targets[0]) if raise_exception: raise permissionDenied() return False return True
[docs] def is_remove_allowed(self, obj, raise_exception=True): if not self.enabled: return mod = _san_module(obj.module) actor, targets = self._get_actor_and_targets(obj.lo, obj, None) if not self._check_permissions( actor, targets, *self._get_extras({mod}), targeted_permissions_to_check=[f'udm:{mod}:remove'], ): auth_log('remove', actor, targets[0]) if raise_exception: raise permissionDenied() return False return True
[docs] def object_exists(self, obj): if not self.is_receive_allowed(obj, raise_exception=False): raise univention.admin.uexceptions.noObject(obj.dn)
[docs] def is_report_create_allowed(self, lo, module, report_type, raise_exception=True): if not self.enabled: return True mod = _san_module(module) actor = self._get_actor(lo) if not self._check_permissions( actor, [{ 'old_target': { 'id': report_type, 'roles': [], 'attributes': { 'objectType': module, # 'position': obj.lo.parentDn(obj.old_dn) or LDAP_BASE if old else obj.lo.parentDn(obj.dn) or LDAP_BASE, }, }, 'new_target': self._empty_target(), }], *self._get_extras({mod}), targeted_permissions_to_check=[f'udm:{mod}:report-create'], ): auth_log('report-create', actor, {}) if raise_exception: raise permissionDenied() return False return True
def _get_and_check_permissions(self, *args, **kwargs): result = self.engine.get_and_check_permissions(*args, **kwargs) if not kwargs.get('general_permissions_to_check'): result['actor_has_all_general_permissions'] = True if not kwargs.get('targeted_permissions_to_check'): result['actor_has_all_targeted_permissions'] = True return result['actor_has_all_general_permissions'] and result['actor_has_all_targeted_permissions'], result def _check_permissions(self, *args, **kwargs): result = self.engine.check_permissions(*args, **kwargs) if not kwargs.get('general_permissions_to_check'): result['actor_has_all_general_permissions'] = True if not kwargs.get('targeted_permissions_to_check'): result['actor_has_all_targeted_permissions'] = True return result['actor_has_all_general_permissions'] and result['actor_has_all_targeted_permissions'] def _is_write_action_allowed(self, action, obj, raise_exception=True): if not self.enabled: return mod = _san_module(obj.module) changed_properties = [ prop for prop in obj.descriptions if obj.has_property(prop) and obj.hasChanged(prop) ] actor, targets = self._get_actor_and_targets(obj.lo, obj, obj) allowed, permissions_result = self._get_and_check_permissions( actor, targets, *self._get_extras({mod}), targeted_permissions_to_check=[f'udm:{mod}:{action}'], ) writeable_attributes = self._get_writeable_properties( permissions_result['general_permissions'] | permissions_result['target_permissions'][0]['permissions'], mod, set(obj.descriptions), ) all_allowed = allowed and self._is_all_writeable(writeable_attributes, obj.module, changed_properties) if not all_allowed: auth_log(action, actor, targets[0], general=allowed, changed_properties=changed_properties) if raise_exception: raise permissionDenied() return all_allowed def _is_readable(self, readable_attributes, module, prop): return _san_property(prop) in readable_attributes def _is_writable(self, writeable_attributes, module, prop): return _san_property(prop) in writeable_attributes def _is_all_writeable(self, writeable_attributes, module, changed_props): return all(self._is_writable(writeable_attributes, module, prop) for prop in changed_props) def _get_readable_properties(self, permissions, module, all_properties): readable = {} unreadable = {} for mod, perms in self._parse_permissions(permissions).items(): for action in ('write', 'read'): readable.setdefault(mod, set()).update(perms.get(action, set())) for action in ('none', 'writeonly'): unreadable.setdefault(mod, set()).update(perms.get(action, set())) props = readable.get(module, set()) if '*' in props: props |= {_san_property(p) for p in all_properties} props -= {'*'} props -= unreadable.get(module, set()) return props def _get_writeable_properties(self, permissions, module, all_properties): writeable = {} unwriteable = {} for mod, perms in self._parse_permissions(permissions).items(): for action in ('write',): writeable.setdefault(mod, set()).update(perms.get(action, set())) for action in ('none', 'readonly'): unwriteable.setdefault(mod, set()).update(perms.get(action, set())) props = writeable.get(module, set()) if '*' in props: props |= {_san_property(p) for p in all_properties} props -= {'*'} props -= unwriteable.get(module, set()) return props def _parse_permissions(self, permissions): parsed = {} for permission in permissions: app_name, mod, perm = permission.split(':', 2) if app_name != 'udm' or not mod: continue action, _, prop = perm.partition('-property-') if not _: continue if action in ('write', 'writeonly', 'read', 'readonly', 'none'): parsed.setdefault(mod, {}).setdefault(action, set()).add(prop) return parsed def _get_targets(self, lo, old_target, new_target=None): return { 'old_target': self._get_target(old_target, old=True) if old_target is not None and old_target.exists() else self._empty_target(), 'new_target': self._get_target(new_target) if new_target is not None else self._empty_target(), } def _get_actor_and_targets(self, lo, old_target, new_target=None): return self._get_actor(lo), [self._get_targets(lo, old_target, new_target)] def _get_extras(self, modules): contexts = [] namespaces = [f'udm:{_san_module(mod)}' for mod in modules] extra_request_data = { 'ldap_base': LDAP_BASE, } return contexts, namespaces, extra_request_data def _get_actor(self, lo): actor, actor_roles = self._get_cached_actor(lo)() return { 'id': actor.dn, 'roles': actor_roles, 'attributes': self._get_representation(actor), } def _get_target(self, obj, old=False): return { 'id': obj.old_dn if old else obj.dn, 'roles': self._get_target_roles(obj.module, obj.old_dn), 'attributes': self._get_representation(obj, old), } def _get_target_roles(self, module, dn): if module != 'users/user': return [] return self._get_cached_roles(self.lo, dn) def _empty_target(self): return {'id': '', 'roles': [], 'attributes': {}} def _get_representation(self, obj, old=False): """Get a represenation of the object like UDM REST API would serve it""" return { 'dn': obj.old_dn if old else obj.dn, 'id': None, 'objectType': obj.module, 'position': obj.lo.parentDn(obj.old_dn) or LDAP_BASE if old else obj.lo.parentDn(obj.dn) or LDAP_BASE, 'properties': self._decode_properties(obj, obj.oldinfo) if old else self._decode_properties(obj, obj.info), 'options': self._decode_options(obj, obj.old_options) if old else self._decode_options(obj, obj.options), 'policies': None, 'uuid': None, } def _decode_properties(self, obj, props): return { key: univention.admin.types.TypeHint.detect(obj.descriptions[key], key).decode_json(value) for key, value in props.items() } def _decode_options(self, obj, options): mod = univention.admin.modules.get(obj.module) return { opt: opt in options for opt in getattr(mod, 'options', {}) if opt != 'default' }