Source code for univention.directory.reports.admin

#
# Univention Directory Reports
#  write an interpreted token structure to a file
#
# SPDX-FileCopyrightText: 2007-2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only

from html import escape

import univention.admin.mapping as ua_mapping
import univention.admin.modules as ua_modules
import univention.admin.objects as ua_objects
import univention.admin.uexceptions as ua_exceptions
import univention.admin.uldap as ua_ldap
import univention.debug as ud
from univention.config_registry import ConfigRegistry
from univention.directory.reports.filter import filter_get


__all__ = ['cache_object', 'connect', 'connected', 'get_object', 'identify', 'set_format']

_admin = None

TEX_ESCAPE = {
    '€': 'EUR',
    '"': "''",
    '\\': '\\textbackslash{}',
    '&': '\\&',
    '%': '\\%',
    '#': '\\#',
    '_': '\\_',
    '{': '\\{',
    '}': '\\}',
    '~': '\\textasciitilde{}',
    '^': '\\^{\\,}',
    '$': '\\$',
    '°': '$^{\\circ}$',
    '´': '',
}


def texClean(str):
    """
    Escape string for use in LaTeX.

    >>> texClean('Test')
    'Test'
    >>> texClean('"\\&%#_{}~^$')
    "''\\\\textbackslash{}\\\\&\\\\%\\\\#\\\\_\\\\{\\\\}\\\\textasciitilde{}\\\\^{\\\\,}\\\\$"
    >>> texClean('€°´')
    'EUR$^{\\\\circ}$'
    """
    esc = ''.join([TEX_ESCAPE.get(c, c) for c in str])
    # str is NOT unicode, so '€°´' are non-ASCII characters, which use multiple bytes. See Bug #16637
    esc = esc.replace('€', 'EUR')
    esc = esc.replace('°', '$^{\\circ}$')
    esc = esc.replace('´', '')
    return esc


class AdminConnection:

    def __init__(self, userdn=None, password=None, host='localhost', base=None, start_tls=2, access=None, format=None):
        self._cached = {}
        self._modules = {}
        self._policies = {}
        self._format = format
        self._bc = ConfigRegistry()
        self._bc.load()
        self.__reverse = {}
        if not base:
            self._base = self._bc['ldap/base']
        else:
            self._base = base
        self._position = ua_ldap.position(self._base)
        if access:
            self._access = access
        else:
            self._access = ua_ldap.access(host=host, base=self._base, binddn=userdn, bindpw=password, start_tls=start_tls)
        ua_modules.update()

    def __repr__(self):
        fmt = '%s(userdn=%r, password=%r, host=%r, base=%r, start_tls=%r, access=%r, format=%r)'
        val = (self.__class__.__name__, self._access.binddn, self._access.bindpw, self._access.host, self._access.base, self._access.start_tls, self._access, self._format)
        return fmt % val

    def cache_object(self, obj):
        return self.get_object(ua_objects.module(obj), obj.dn)

    def clear_cache(self):
        del self._cached
        self._cached = {}

    def get_object(self, module, dn):
        if dn in self.__reverse:  # this value has been escaped => use <self.__reverse> to unescape
            possible_real_DNs = set()
            for possible_real_DN_set in self.__reverse[dn].values():
                possible_real_DNs |= possible_real_DN_set  # collect every distinct possible value
            possible_real_DNs = tuple(possible_real_DNs)
            if not len(possible_real_DNs) == 1:
                raise ValueError('ambiguous DNs, cannot unescape %s (possibilities: %s)' % (repr(dn), repr(possible_real_DNs)))
            dn = possible_real_DNs[0]
        try:
            return self.get_object_real(module, dn)
        # FIXME: get rid of this ua_exceptions.wrongObjectType
        except (ua_exceptions.noObject, ua_exceptions.wrongObjectType):
            return None

    def get_object_real(self, module, dn):
        if dn in self._cached:
            return self._cached[dn]
        if isinstance(module, str):
            if module in self._modules:
                module = self._modules[module]
            else:
                name = module
                module = ua_modules.get(name)
                ua_modules.init(self._access, self._position, module)
                self._modules[name] = module
        elif module is None:
            module = self.identify(dn)
            if not module:
                return None
            ua_modules.init(self._access, self._position, module)
        new = ua_objects.get(module, None, self._access, position=self._position, dn=dn)
        # if the object is not valid it should be displayed as an empty object
        try:
            new.open()
        except Exception:
            # write the traceback in the logfile
            import traceback

            ud.debug(ud.ADMIN, ud.ERROR, 'The object %s could not be opened' % dn)
            ud.debug(ud.ADMIN, ud.ERROR, 'Traceback: %s' % (traceback.format_exc(),))
        for key, value in new.items():
            from univention.directory.reports.document import Document
            if self._format in (Document.TYPE_LATEX, Document.TYPE_RML):
                i, j = self.format_property(new.descriptions, key, value)
                new.info[i] = j
            else:
                new.info[key] = value

        self._get_policies(new)
        self._cached[dn] = new

        return new

    def identify(self, dn):
        res = self._access.authz_connection.search(base=dn, scope='base')
        if res:
            mods = ua_modules.identify(dn, res[0][1])
            if mods:
                return mods[0]
        return None

    # store the old value of every attribute (if it is a string) in <self.__reverse> to enable <get_object()> to reverse the escaping
    def format_property(self, props, oldkey, oldvalue):
        (newkey, newvalue) = self.format_property_real(props, oldkey, oldvalue)
        assert newkey == oldkey
        key = oldkey
        if isinstance(newvalue, list | tuple):  # multivalue => unpack
            for (newv, oldv) in zip(newvalue, oldvalue):
                if isinstance(oldv, str) and newv != oldv:  # only consider strings, because DNs are always strings
                    if newv not in self.__reverse:
                        self.__reverse[newv] = {}
                    oldvalues = self.__reverse[newv].get(key, set())
                    oldvalues.add(oldv)
                    self.__reverse[newv][key] = oldvalues
        else:
            if isinstance(oldvalue, str) and newvalue != oldvalue:  # only consider strings, because DNs are always strings
                if newvalue not in self.__reverse:
                    self.__reverse[newvalue] = {}
                oldvalues = self.__reverse[newvalue].get(key, set())
                oldvalues.add(oldvalue)
                self.__reverse[newvalue][key] = oldvalues
        return (key, newvalue)

    def format_property_real(self, props, key, value):
        prop = props.get(key, None)

        if not prop:
            return (key, value)
        else:
            if isinstance(value, list | tuple):
                result = []
                for v in value:
                    if isinstance(v, list | tuple):
                        for i in v:
                            result.append(self.escape(str(i)))
                    else:
                        result.append(self.escape(str(v)))
                value = result
            elif value:
                value = self.escape(value)
            filter = filter_get(prop.syntax)
            if filter:
                return filter(prop, key, value)

        return (key, value)

    def escape(self, value):
        from univention.directory.reports.document import Document
        if self._format == Document.TYPE_LATEX:
            return texClean(value)
        elif self._format == Document.TYPE_RML:
            return escape(value, quote=True)
        return value

    def _get_policies(self, obj):
        dict = {}
        policies = self._access.authz_connection.getPolicies(obj.dn)
        for policy_oc, attrs in policies.items():
            module_name = ua_objects.ocToType(policy_oc)
            module = ua_modules.get(module_name)
            if not module:
                continue
            for attr_name, value_dict in attrs.items():
                dict[attr_name] = value_dict['value']

            for key, value in ua_mapping.mapDict(module.mapping, dict).items():
                from univention.directory.reports.document import Document
                if self._format in (Document.TYPE_LATEX, Document.TYPE_RML):
                    i, j = self.format_property(module.property_descriptions, key, value)
                    obj.info[i] = j
                else:
                    obj.info[key] = value


[docs] def connect(userdn=None, password=None, host='localhost', base=None, start_tls=2, access=None): global _admin if _admin: return _admin = AdminConnection(userdn, password, host, base, start_tls, access)
[docs] def cache_object(obj): if not _admin: return None return _admin.cache_object(obj)
def clear_cache(): if not _admin: return _admin.clear_cache()
[docs] def get_object(module, dn): if not _admin: return None try: return _admin.get_object(module, dn) except ua_exceptions.ldapError: return None
[docs] def set_format(format): if _admin: _admin._format = format
[docs] def identify(dn): return _admin.identfy(dn)
[docs] def connected(): return _admin is not None
if __name__ == '__main__': import doctest doctest.testmod()