Source code for univention.management.console.message

#!/usr/bin/python3
#
# Univention Management Console
#
# SPDX-FileCopyrightText: 2006-2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only

"""
A backwards compatible layer to wrap HTTP request and response messages.
The API of the Python objects representing the messages are based on the class :class:`.Message`.
"""

import mimetypes
import sys
import time
import weakref
from typing import Any

import ldap

import univention.admin.uexceptions as udm_errors
from univention.management.console.error import PasswordRequired
from univention.management.console.ldap import get_user_connection
from univention.management.console.log import CORE, PARSER, PROTOCOL


RequestType = int
UmcpBody = dict | str | bytes

MIMETYPE_JSON = 'application/json'

__all__ = ('Request', 'Response')


class _bind_user_connection:
    """wrapper to prevent reference leak"""

    def __init__(self, weakself):
        self.weakself = weakself

    def __call__(self, lo):
        return self.weakself().bind_user_connection(lo)

    def __hash__(self):
        return self.weakself().__hash__()


class Message:
    """
    Represents a wrapper for a HTTP message.

    :param type: message type (RESPONSE or REQUEST)
    :param str command: type of request (UPLOAD or COMMAND)
    :param str mime_type: defines the MIME type of the message body
    :param data: binary data that should contain a message
    :param arguments: the URL path which is requested.
    :param options: options passed to the command handler. This works for request messages with MIME type application/json only.
    """

    RESPONSE, REQUEST = range(2)
    __counter = 0

    def __init__(self, type: RequestType = REQUEST, command: str = '', mime_type: str = MIMETYPE_JSON, data: bytes | None = None, arguments: list[str] | None = None, options: dict[str, Any] | None = None) -> None:
        self.id: str | None = None
        if mime_type == MIMETYPE_JSON:
            self.body: UmcpBody = {}
        else:
            self.body = b''
        self.command = command
        self.arguments = arguments if arguments is not None else []
        self.mimetype = mime_type
        if mime_type == MIMETYPE_JSON:
            self.options = options if options is not None else {}
        self.cookies = {}
        self.headers = {}
        self.http_method = None

    @classmethod
    def generate_id(cls) -> str:
        # cut off 'L' for long
        generated_id = '%lu-%d' % (int(time.time() * 100000), Message.__counter)
        Message.__counter += 1
        return generated_id

    def _create_id(self) -> None:
        self.id = self.generate_id()

    def recreate_id(self) -> None:
        """Creates a new unique ID for the message"""
        self._create_id()

    # JSON body properties
    def _set_key(self, key, value, cast=None):
        if self.mimetype == MIMETYPE_JSON:
            if cast is not None:
                self.body[key] = cast(value)
            else:
                self.body[key] = value
        else:
            PARSER.warning('Attribute %s just available for MIME type %s', key, MIMETYPE_JSON)

    def _get_key(self, key, default=None):
        if isinstance(default, dict):
            default = default.copy()
        if self.mimetype == MIMETYPE_JSON:
            if isinstance(default, dict):
                self.body.setdefault(key, default)
            return self.body.get(key, default)
        else:
            PARSER.info('Attribute %s just available for MIME type %s', key, MIMETYPE_JSON)
            return default

    #: contains a human readable error message
    message = property(lambda self: self._get_key('message'), lambda self, value: self._set_key('message', value))

    #: contains error information
    error = property(lambda self: self._get_key('error'), lambda self, value: self._set_key('error', value))

    #: contains the data that represents the result of the request
    result = property(lambda self: self._get_key('result'), lambda self, value: self._set_key('result', value))

    #: contains the HTTP status code defining the success or failure of a request
    status = property(lambda self: self._get_key('status'), lambda self, value: self._set_key('status', value, int))

    #: contains the reason phrase for the status code
    reason = property(lambda self: self._get_key('reason'), lambda self, value: self._set_key('reason', value))

    #: defines options to pass on to the module command
    options = property(lambda self: self._get_key('options'), lambda self, value: self._set_key('options', value))

    #: flavor of the request
    flavor = property(lambda self: self._get_key('flavor'), lambda self, value: self._set_key('flavor', value))


[docs] class Request(Message): """Wraps a HTTP request message in a backwards compatible Python API format""" _user_connections = set() # prevent garbage collection def __init__(self, command: str, arguments: Any = None, options: Any = None, mime_type: str = MIMETYPE_JSON) -> None: Message.__init__(self, Message.REQUEST, command, arguments=arguments, options=options, mime_type=mime_type) self._create_id() self.username = None self.password = None self.user_dn = None self.auth_type = None self.locale = None self._request_handler = None self.roles = None self.federated_account = False
[docs] def require_password(self): if self.auth_type is not None: raise PasswordRequired()
[docs] def get_user_ldap_connection(self, no_cache=False, **kwargs): if not self.user_dn: return # local user (probably root) try: lo, _po = get_user_connection(bind=_bind_user_connection(weakref.ref(self)), bindhash=hash((self.auth_type, self.username, self.password)), write=kwargs.pop('write', False), follow_referral=True, no_cache=no_cache, **kwargs) if not no_cache: self._user_connections.add(lo) return lo except (ldap.LDAPError, udm_errors.base) as exc: CORE.warning('Failed to open LDAP connection for user %s: %s', self.user_dn, exc)
[docs] def bind_user_connection(self, lo): CORE.process('LDAP bind', dn=self.user_dn, username=self.username) try: if self.auth_type == 'OIDC': lo.lo.bind_oauthbearer(None, self.password) if not lo.lo.compare_dn(lo.binddn, self.user_dn): CORE.warning('OIDC binddn does not match: %r != %r', lo.binddn, self.user_dn) self.user_dn = lo.binddn elif self.auth_type == 'SAML': lo.lo.bind_saml(self.password) if not lo.lo.compare_dn(lo.binddn, self.user_dn): CORE.warning('SAML binddn does not match: %r != %r', lo.binddn, self.user_dn) self.user_dn = lo.binddn else: try: lo.lo.bind(self.user_dn, self.password) except ldap.INVALID_CREDENTIALS: # workaround for Bug #44382: the password might be a SAML message, try to authenticate via SAML etype, exc, etraceback = sys.exc_info() CORE.error('LDAP authentication for %r failed: %s', self.user_dn, exc) if len(self.password) < 25: raise CORE.warning('Trying to authenticate via SAML.') try: lo.lo.bind_saml(self.password) except ldap.OTHER: CORE.error('SAML authentication failed.') raise exc.with_traceback(etraceback) CORE.error('Wrong authentication type. Resetting.') self.auth_type = 'SAML' except ldap.INVALID_CREDENTIALS: etype, exc, etraceback = sys.exc_info() exc = etype('An error during LDAP authentication happened. Auth type: %s; SAML message length: %s; DN length: %s; Original Error: %s' % (self.auth_type, len(self.password or '') if len(self.password or '') > 25 else False, len(self.user_dn or ''), exc)) raise exc.with_traceback(etraceback)
[docs] class Response(Message): """ This class describes a response to a request from the console frontend to the console daemon """ def __init__(self, request: Request = None, data: Any = None, mime_type: str = MIMETYPE_JSON) -> None: Message.__init__(self, Message.RESPONSE, mime_type=mime_type) if request: self.id = request.id self.command = request.command self.arguments = request.arguments if request.mimetype == MIMETYPE_JSON: self.options = request.options elif data: self.parse(data) recreate_id = None
[docs] def set_body(self, filename: str, mimetype: str | None = None) -> None: """ Set body of response by guessing the mime type of the given file if not specified and adding the content of the file to the body. The mime type is guessed using the extension of the filename. """ if mimetype is None: self.mimetype, _encoding = mimetypes.guess_type(filename) else: self.mimetype = mimetype if self.mimetype is None: PROTOCOL.process('Failed to guess MIME type of %s', filename) raise TypeError('Unknown mime type') with open(filename, 'rb') as fd: # FIXME: should check size first self.body = fd.read()