Source code for univention.admin.rest.utils

#!/usr/bin/python3
#
# Univention Management Console
#  Univention Directory Manager Module
#
# SPDX-FileCopyrightText: 2017-2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only

import logging
import re
from urllib.parse import quote

import ldap.dn
from ldap.controls.readentry import PostReadControl
from tornado.web import HTTPError

import univention.admin.types as udm_types
from univention.config_registry import ucr
from univention.management.console.log import add_filter


RE_UUID = re.compile('[^A-Fa-f0-9-]')


[docs] def init_request_context_logging(request_context): structured = ucr.is_true('directory/manager/rest/debug/structured-logging', False) if not structured and not ucr.is_true('directory/manager/rest/debug/prefix-with-request-id', True): return add_filter(RequestContextFilter(request_context, structured))
[docs] class RequestContextFilter(logging.Filter): def __init__(self, request_context, structured_logging=False): self.request_context = request_context self.structured_logging = structured_logging
[docs] def filter(self, record): try: request_context = self.request_context.get() except LookupError: request_context = {} record.request_id = request_context.get('request_id', '-') if not self.structured_logging: record.prefix = f"[{(record.request_id or '')[:10]}] " # backwards compatibility if dn := request_context.get('requester_dn'): record.requester_dn = dn if ip := request_context.get('requester_ip'): record.requester_ip = ip if hostname := request_context.get('requester_hostname'): record.requester_hostname = hostname return True
[docs] def parse_content_type(content_type): return content_type.partition(';')[0].strip().lower()
[docs] class NotFound(HTTPError): def __init__(self, object_type=None, dn=None): super().__init__(404, None, '%r %r' % (object_type, dn or '')) # FIXME: create error message
[docs] def superordinate_names(module): superordinates = module.superordinate_names if set(superordinates) == {'settings/cn'}: return [] return superordinates
[docs] def decode_properties(module, obj, properties): for key, value in properties.items(): prop = module.get_property(key) codec = udm_types.TypeHint.detect(prop, key) yield key, codec.decode_json(value)
[docs] def encode_properties(module, obj, properties): for key, value in properties.items(): prop = module.get_property(key) codec = udm_types.TypeHint.detect(prop, key) yield key, codec.encode_json(value)
[docs] def quote_dn(dn): if isinstance(dn, str): dn = dn.encode('utf-8') # duplicated slashes in URI path's can be normalized to one slash. Therefore we need to escape the slashes. return quote(dn.replace(b'//', b',/=/,')) # .replace('/', quote('/', safe=''))
[docs] def unquote_dn(dn): # tornado already decoded it (UTF-8) return dn.replace(',/=/,', '//')
def _try(func, exceptions): def deco(*args, **kwargs): try: return func(*args, **kwargs) except exceptions: pass return deco def _map_try(values, func, exceptions): return filter(None, map(_try(func, exceptions), values)) def _map_normalized_dn(dns): return _map_try(dns, lambda dn: ldap.dn.dn2str(ldap.dn.str2dn(dn)), Exception) def _get_post_read_entry_uuid(response): for c in response.get('ctrls', []): if c.controlType == PostReadControl.controlType: uuid = c.entry['entryUUID'][0] if isinstance(uuid, bytes): # starting with python-ldap 4.0 uuid = uuid.decode('ASCII') return uuid