# Source code for univention.admin.mapping

# SPDX-FileCopyrightText: 2004-2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only

"""Functions to map between |UDM| properties and |LDAP| attributes."""

from __future__ import annotations

import base64
import inspect
from typing import TYPE_CHECKING, TypeVar

import univention.admin.uexceptions
from univention.admin import localization


if TYPE_CHECKING:
    from collections.abc import Callable, Iterable
    from typing import Any

from univention.admin.log import log


translation = localization.translation('univention/admin')

_ = translation.translate

_E = TypeVar('_E')
_Encoding = tuple[str, ...]


[docs] def MapToBytes(udm_value: list[str] | tuple[str, ...] | str, encoding: _Encoding = ()) -> list[bytes] | bytes: if isinstance(udm_value, list | tuple): return [MapToBytes(udm_val, encoding=encoding) for udm_val in udm_value] return str(udm_value).encode(*encoding)
[docs] def UnmapToUnicode(ldap_value: list[bytes] | tuple[bytes, ...] | bytes, encoding: _Encoding = ()) -> list[str] | str: if isinstance(ldap_value, list | tuple): return [UnmapToUnicode(ldap_val, encoding=encoding) for ldap_val in ldap_value] return ldap_value.decode(*encoding)
def DaysToSeconds(days: str) -> str:
    """
    Convert a number of days into the equivalent number of seconds.

    :param days: the number of days (anything accepted by :py:func:`int`).
    :returns: the number of seconds as a string.

    >>> DaysToSeconds('1')
    '86400'
    """
    seconds_per_day = 24 * 60 * 60
    return str(int(days) * seconds_per_day)
[docs] def SecondsToDays(seconds: str) -> str: """ Convert number of seconds to number of complete days. :param seconds: 1-tuple with the number of seconds. :returns: the number of complete days. >>> SecondsToDays(('86401',)) '1' """ return str((int(seconds[0])) // (60 * 60 * 24))
def StringToLower(string: str) -> str:
    """
    Return the lower-cased form of a string.

    :param string: a string.
    :returns: the result of calling `lower()` on it.

    >>> StringToLower("Aa")
    'aa'
    """
    lowered = string.lower()
    return lowered
def ListUniq(list: list[_E]) -> list[_E]:
    """
    Remove duplicates from a list while keeping the order of first occurrence.

    :param list: A list of elements (a false value such as `None` or `[]` yields an empty list).
    :returns: a new list with duplicate elements removed.

    >>> ListUniq(['1', '1', '2'])
    ['1', '2']
    """
    unique: list[_E] = []
    # Membership is tested by equality on purpose, so unhashable elements work as well.
    for candidate in list or ():
        if candidate in unique:
            continue
        unique.append(candidate)
    return unique
def ListToString(value: list[bytes], encoding: _Encoding = ()) -> str:
    """
    Return the first element of a list, decoded to a string.

    This is the right mapping for single-valued properties, as |LDAP| always returns lists of values.

    :param value: A list of elements.
    :param encoding: passed on to :py:func:`UnmapToUnicode`.
    :returns: the first element or the empty string.

    >>> ListToString([])
    ''
    >>> ListToString([b'value'])
    'value'
    """
    if not value:
        return ''
    decoded = UnmapToUnicode(value, encoding)
    return decoded[0]
def ListToIntToString(list_: list[bytes]) -> str:
    """
    Return the first element of a list if it can be parsed as an integer.

    :param list_: A list of elements.
    :returns: the first element normalized through :py:func:`int`, or the empty string if the list is empty or the element is not an integer.

    >>> ListToIntToString([])
    ''
    >>> ListToIntToString([b'x'])
    ''
    >>> ListToIntToString([b'1'])
    '1'
    """
    if not list_:
        return ''
    try:
        number = int(list_[0])
    except (ValueError, TypeError):
        return ''
    return str(number)
def ListToLowerString(list: list[bytes]) -> str:
    """
    Return the first element of a list as a lower-cased string.

    :param list: A list of elements.
    :returns: the first element lower-cased or the empty string.

    >>> ListToLowerString([])
    ''
    >>> ListToLowerString([b'Value'])
    'value'
    """
    first = ListToString(list)
    return StringToLower(first)
def ListToLowerList(list: list[str]) -> list[str]:
    """
    Return a new list with every element converted to lower-case.

    :param list: A list of elements.
    :returns: a list of the elements converted to lower case.

    >>> ListToLowerList(['A', 'a'])
    ['a', 'a']
    """
    lowered = []
    for string in list:
        lowered.append(StringToLower(string))
    return lowered
def ListToLowerListUniq(list: list[str]) -> list[str]:
    """
    Return the list with all elements lower-cased and duplicates removed afterwards.

    :param list: A list of elements.
    :returns: a list of the elements converted to lower case with duplicates removed.

    >>> ListToLowerListUniq(['A', 'a'])
    ['a']
    """
    lowered = ListToLowerList(list)
    return ListUniq(lowered)
def nothing(a: Any) -> None:
    """
    'Do nothing' mapping: ignore the argument and return `None`.

    :param a: any value; it is not used.
    """
    return None
def IgnoreNone(value: str, encoding: _Encoding = ()) -> bytes | None:
    """
    Encode the value unless it is the literal string `'None'`.

    :param value: a string.
    :param encoding: positional arguments passed on to :py:meth:`str.encode`.
    :returns: the encoded value, or `None` if the value is the string `'None'`.

    >>> IgnoreNone('1')
    b'1'
    >>> IgnoreNone('None')
    """
    if value == 'None':
        return None  # FIXME:
    return value.encode(*encoding)
def _stringToInt(value: bytes | str) -> int:
    """
    Convert a string into an integer, falling back to `0`.

    :param value: a string.
    :returns: the integer value, or `0` if :py:func:`int` rejects the value with `ValueError` or `TypeError`.

    >>> _stringToInt('1')
    1
    >>> _stringToInt('ucs')
    0
    """
    try:
        number = int(value)
    except (ValueError, TypeError):
        number = 0
    return number
def unmapUNIX_TimeInterval(seconds: list[bytes] | tuple[bytes] | bytes) -> list[str]:
    """
    Map number of seconds to a human understandable time interval.

    The value is expressed in the largest unit (seconds, minutes, hours, days) that divides it without remainder.
    Values which cannot be parsed as an integer are treated as `0`.

    :param seconds: number of seconds, either a single value or a list/tuple whose first element is used.
    :returns: a 2-element list `[value, unit]`.

    >>> unmapUNIX_TimeInterval(['0'])  # doctest: +ALLOW_UNICODE
    ['0', 'days']
    >>> unmapUNIX_TimeInterval(('1',))  # doctest: +ALLOW_UNICODE
    ['1', 'seconds']
    >>> unmapUNIX_TimeInterval('60')  # doctest: +ALLOW_UNICODE
    ['1', 'minutes']
    >>> unmapUNIX_TimeInterval('3600')  # doctest: +ALLOW_UNICODE
    ['1', 'hours']
    >>> unmapUNIX_TimeInterval('86400')  # doctest: +ALLOW_UNICODE
    ['1', 'days']
    """
    if isinstance(seconds, list | tuple):
        seconds = seconds[0]
    amount = _stringToInt(seconds)
    unit = 'seconds'
    # Promote to the next larger unit for as long as the amount divides evenly.
    for divisor, larger_unit in ((60, 'minutes'), (60, 'hours'), (24, 'days')):
        if amount % divisor != 0:
            break
        amount //= divisor
        unit = larger_unit
    return [str(amount), unit]
def mapUNIX_TimeInterval(value: list[str] | tuple[str] | tuple[str, str] | str) -> bytes:
    """
    Unmap a human understandable time interval back to number of seconds.

    :param value: a 2-tuple (value, unit); a 1-element sequence or a bare value is interpreted as seconds.
        Units other than `days`, `hours` and `minutes` are treated as seconds.
    :returns: the number of seconds as ASCII encoded bytes.

    >>> mapUNIX_TimeInterval(0)
    b'0'
    >>> mapUNIX_TimeInterval([1, 'days'])
    b'86400'
    >>> mapUNIX_TimeInterval((1, 'hours'))
    b'3600'
    >>> mapUNIX_TimeInterval((1, 'minutes'))
    b'60'
    """
    unit = 'seconds'
    if isinstance(value, tuple | list):
        if len(value) > 1:
            unit = value[1]
        value = value[0]
    total = _stringToInt(value)
    # Compared by equality (not via dict lookup), so an unhashable unit simply matches nothing.
    for name, factor in (('days', 24 * 60 * 60), ('hours', 60 * 60), ('minutes', 60)):
        if unit == name:
            total *= factor
            break
    return str(total).encode('ASCII')
def unmapBase64(value: list[bytes] | tuple[bytes, ...] | bytes) -> list[str] | str:
    """
    Convert binary data (as found in |LDAP|) to Base64 encoded |UDM| property value(s).

    :param value: some binary data: a list/tuple of byte strings or a single byte string.
    :returns: the base64 encoded data (a list if more than one value was given, otherwise a single string) or the empty string on errors.

    >>> unmapBase64([b'a'])
    'YQ=='
    >>> unmapBase64([b'a', b'b'])
    ['YQ==', 'Yg==']
    >>> unmapBase64(b'a')
    'YQ=='
    >>> unmapBase64([None])
    ''
    """
    if isinstance(value, bytes):
        # Indexing/iterating a bare byte string yields integers, which cannot be encoded;
        # treat it as the single value it is.
        value = [value]
    multi_valued = len(value) > 1
    try:
        if multi_valued:
            return [base64.b64encode(x).decode('ASCII') for x in value]
        return base64.b64encode(value[0]).decode('ASCII')
    except Exception as exc:
        # Best effort by design: broken values (and empty lists) are logged and mapped to ''.
        log.error('failed unmapBase64', error=exc)
    return ''
def mapBase64(value: list[str] | str) -> list[bytes] | bytes:
    # @overload (list[str]) -> list[bytes]
    # @overload (str) -> bytes
    """
    Convert Base64 encoded |UDM| property values to binary data (for storage in |LDAP|).

    :param value: some base64 encoded value, or a list of such values.
    :returns: the decoded binary data. The filter pattern `'*'` is returned unchanged and decoding errors yield the empty string.

    >>> mapBase64('*')
    '*'
    >>> mapBase64(['YQ=='])
    [b'a']
    >>> mapBase64('YQ==')
    b'a'
    """
    if value == '*':
        # special case for filter pattern '*'
        return value
    try:
        if isinstance(value, list):
            return [base64.b64decode(item) for item in value]
        return base64.b64decode(value)
    except Exception as exc:
        log.error('failed mapBase64', error=exc)
    return ''
def BooleanListToString(list: list[bytes], encoding: _Encoding = ()) -> str:
    """
    Convert |LDAP| boolean to |UDM|.

    :param list: list of |LDAP| attribute values.
    :param encoding: passed on to :py:func:`ListToString`.
    :returns: the empty string for `False` (the value `'0'`) or otherwise the first element.

    >>> BooleanListToString([b'0'])
    ''
    >>> BooleanListToString([b'1'])
    '1'
    """
    first = ListToString(list, encoding=encoding)
    return '' if first == '0' else first
def BooleanUnMap(value: str, encoding: _Encoding = ()) -> bytes:
    """
    Convert |UDM| boolean to |LDAP|.

    :param value: one |UDM| property value.
    :param encoding: positional arguments passed on to :py:meth:`str.encode`.
    :returns: empty bytes for `False` (the value `'0'`) or otherwise the encoded value.

    >>> BooleanUnMap('0')
    b''
    >>> BooleanUnMap('1')
    b'1'
    """
    return b'' if value == '0' else value.encode(*encoding)
class dontMap:
    """
    'Do nothing' mapping.

    :py:meth:`mapping.shouldMap` and :py:meth:`mapping.shouldUnmap` return `False` when the
    registered value function is an instance of this class.
    """
[docs] class mapping: """Map |LDAP| attribute names and values to |UDM| property names and values and back.""" def __init__(self) -> None: self._map: dict[str, tuple[str, Callable[[Any], Any] | None]] = {} self._unmap: dict[str, tuple[str, Callable[[Any], Any] | None]] = {} self._unmap_func: dict[str, Callable[[Any], Any]] = {} self._map_encoding: dict[str, tuple[str, str]] = {} self._unmap_encoding: dict[str, tuple[str, str]] = {}
[docs] def register( self, map_name: str, unmap_name: str, map_value: Callable[[Any], Any] | None = None, unmap_value: Callable[[Any], Any] | None = None, encoding: str = 'UTF-8', encoding_errors: str = 'strict', ) -> None: """ Register a new mapping. :param map_name: |UDM| property name. :param unmap_name: |LDAP| attribute name. :param map_value: function to map |UDM| property values to |LDAP| attribute values. :param unmap_value: function to map |LDAP| attribute values to |UDM| property values. """ self._map[map_name] = (unmap_name, map_value) self._unmap[unmap_name] = (map_name, unmap_value) self._map_encoding[map_name] = (encoding, encoding_errors) self._unmap_encoding[unmap_name] = (encoding, encoding_errors)
[docs] def unregister(self, map_name: str, pop_unmap: bool = True) -> None: """ Remove a mapping |UDM| to |LDAP| (and also the reverse). :param map_name: |UDM| property name. :param pop_unmap: `False` prevents the removal of the mapping from |LDAP| to |UDM|, which the default `True` also does. """ # unregister(pop_unmap=False) is used by LDAP_Search syntax classes with viewonly=True. # See SimpleLdap._init_ldap_search(). unmap_name, _map_value = self._map.pop(map_name, ('', None)) self._map_encoding.pop(map_name, None) if pop_unmap: self._unmap.pop(unmap_name, None) self._unmap_encoding.pop(unmap_name, None)
[docs] def registerUnmapping( self, unmap_name: str, unmap_value: Callable[[Any], Any], encoding: str = 'UTF-8', encoding_errors: str = 'strict', ) -> None: """ Register a new unmapping from |LDAP| to |UDM|. :param unmap_name: |LDAP| attribute name. :param unmap_value: function to map |LDAP| attribute values to |UDM| property values. """ self._unmap_func[unmap_name] = unmap_value self._unmap_encoding[unmap_name] = (encoding, encoding_errors)
[docs] def mapName(self, map_name: str) -> str: """ Map |UDM| property name to |LDAP| attribute name. >>> map = mapping() >>> map.mapName('unknown') '' >>> map.register('udm', 'ldap') >>> map.mapName('udm') 'ldap' """ return self._map.get(map_name, [''])[0]
[docs] def unmapName(self, unmap_name: str) -> str: """ Map |LDAP| attribute name to |UDM| property name. >>> map = mapping() >>> map.unmapName('unknown') '' >>> map.register('udm', 'ldap') >>> map.unmapName('ldap') 'udm' """ return self._unmap.get(unmap_name, [''])[0]
[docs] def mapValue( self, map_name: str, value: Any, encoding_errors=None, ) -> bytes: """ Map |UDM| property value to |LDAP| attribute value. >>> map = mapping() >>> map.mapValue('unknown', None) #doctest: +IGNORE_EXCEPTION_DETAIL Traceback (most recent call last): ... KeyError: >>> map.register('udm', 'ldap') >>> map.mapValue('udm', 'value') b'value' >>> map.register('udm', 'ldap', lambda udm: udm.lower().encode('utf-8'), None) >>> map.mapValue('udm', None) b'' >>> map.mapValue('udm', [0]) b'' >>> map.mapValue('udm', 'UDM') b'udm' >>> map.register('sambaLogonHours', 'ldap') >>> map.mapValue('sambaLogonHours', [0]) [b'0'] """ map_value = self._map[map_name][1] if not value: return b'' if not any(value) and map_name != 'sambaLogonHours': # sambaLogonHours might be [0], see Bug #33703 return b'' encoding, strictness = self._map_encoding.get(map_name, ('UTF-8', 'strict')) strictness = encoding_errors or strictness if not map_value: map_value = MapToBytes kwargs = {} if 'encoding' in inspect.getfullargspec(map_value).args: kwargs['encoding'] = (encoding, strictness) try: value = map_value(value, **kwargs) except UnicodeEncodeError: raise univention.admin.uexceptions.valueInvalidSyntax(_('Invalid encoding for %s') % (map_name,)) return value
[docs] def mapValueDecoded( self, map_name: str, value: Any, encoding_errors=None, ) -> Any: encoding, errors = self.getEncoding(map_name) errors = encoding_errors or errors value = self.mapValue(map_name, value, encoding_errors=errors) if isinstance(value, list | tuple): log.warning('mapValueDecoded returned a list. This is probably not wanted?', property=map_name) # TODO: debug via stack_info = True value = [val.decode(encoding, errors) for val in value] else: value = value.decode(encoding, errors) return value
[docs] def unmapValue(self, unmap_name: str, value: Any) -> Any: """ Map |LDAP| attribute value to |UDM| property value. >>> map = mapping() >>> map.unmapValue('unknown', None) #doctest: +IGNORE_EXCEPTION_DETAIL Traceback (most recent call last): ... KeyError: >>> map.register('udm', 'ldap') >>> map.unmapValue('ldap', b'value') 'value' >>> map.register('udm', 'ldap', None, lambda ldap: ldap.decode('utf-8').upper()) >>> map.unmapValue('ldap', b'ldap') 'LDAP' """ unmap_value = self._unmap[unmap_name][1] if not unmap_value: unmap_value = UnmapToUnicode encoding, strictness = self._unmap_encoding.get(unmap_name, ('UTF-8', 'strict')) kwargs = {} if 'encoding' in inspect.getfullargspec(unmap_value).args: kwargs['encoding'] = (encoding, strictness) try: return unmap_value(value, **kwargs) except UnicodeDecodeError: raise univention.admin.uexceptions.valueInvalidSyntax(_('Invalid encoding for %s') % (unmap_name,))
[docs] def unmapValues(self, oldattr: dict[str, Any]) -> dict[str, Any]: """Unmaps |LDAP| attribute values to |UDM| property values.""" info = mapDict(self, oldattr) for key, func in self._unmap_func.items(): kwargs = {} if 'encoding' in inspect.getfullargspec(func).args: kwargs['encoding'] = self._unmap_encoding.get(key, ('UTF-8', 'strict')) info[key] = func(oldattr, **kwargs) return info
[docs] def shouldMap(self, map_name: str) -> bool: return not isinstance(self._map[map_name][1], dontMap)
[docs] def shouldUnmap(self, unmap_name: str) -> bool: return not isinstance(self._unmap[unmap_name][1], dontMap)
[docs] def getEncoding(self, map_name: str) -> _Encoding: return self._map_encoding.get(map_name, self._unmap_encoding.get(map_name, ()))
def mapCmp(mapping: mapping, key: str, old: Any, new: Any) -> bool:
    """
    Compare old and new for equality (mapping back to LDAP value if possible).

    :param mapping: the mapping to consult.
    :param key: |UDM| property name.
    :param old: the old |UDM| value.
    :param new: the new |UDM| value.
    :returns: `True` if both values are equal, compared after applying the registered map function where one exists and should be used.

    >>> map = mapping()
    >>> mapCmp(map, 'unknown', 'old', 'new')
    False
    >>> mapCmp(map, 'unknown', 'same', 'same')
    True
    >>> map.register('udm', 'ldap')
    >>> mapCmp(map, 'udm', 'old', 'new')
    False
    >>> mapCmp(map, 'udm', 'same', 'same')
    True
    >>> map.register('udm', 'ldap', lambda udm: udm.lower(), None)
    >>> mapCmp(map, 'udm', 'case', 'CASE')
    True
    """
    try:
        map_func = mapping._map[key][1]
        if not (mapping.shouldMap(key) and map_func):
            return old == new
        return map_func(old) == map_func(new)
    except KeyError:
        # Unknown property (or a KeyError raised while mapping): fall back to comparing the raw values.
        return old == new
def mapDict(mapping: mapping, old: dict[str, Any]) -> dict[str, Any]:
    """
    Convert dictionary mapping LDAP_attribute_name to LDAP_value to a (partial) dictionary mapping UDM_property_name to UDM_value.

    Attributes without a registered mapping and attributes which should not be unmapped are left out.

    :param mapping: the mapping to consult.
    :param old: dictionary of |LDAP| attribute names to values (may be empty or `None`).
    :returns: dictionary of |UDM| property names to values.

    >>> map = mapping()
    >>> map.register('udm', 'ldap', None, lambda ldap: ldap.decode('utf-8').upper())
    >>> mapDict(map, {'ldap': b'ldap', 'unknown': None})
    {'udm': 'LDAP'}
    """
    properties = {}
    if not old:
        return properties
    for attr_name, attr_value in old.items():
        try:
            if not mapping.shouldUnmap(attr_name):
                continue
            prop_name = mapping.unmapName(attr_name)
            prop_value = mapping.unmapValue(attr_name, attr_value)
        except KeyError:
            # No mapping registered for this attribute.
            continue
        properties[prop_name] = prop_value
    return properties
def mapList(mapping: mapping, old: list[Any] | None) -> list[Any]:  # UNUSED
    """
    Convert list of LDAP attribute names to list of UDM property names.

    :param mapping: the mapping to consult.
    :param old: list of |LDAP| attribute names (may be empty or `None`).
    :returns: list of the names returned by `mapping.unmapName()`, in the same order.

    >>> map = mapping()
    >>> mapList(map, None)
    []
    >>> mapList(map, ['unknown'])
    ['']
    >>> map.register('udm', 'ldap', None, None)
    >>> mapList(map, ['ldap', 'unknown'])
    ['udm', '']
    """
    names = []
    for attr_name in old or ():
        try:
            prop_name = mapping.unmapName(attr_name)
        except KeyError:  # BUG: never happens because unmapName() returns ''
            continue
        names.append(prop_name)
    return names
def mapDiff(mapping: mapping, diff: Iterable[tuple[str, Any, Any]]) -> list[tuple[str, Any, Any]]:
    """
    Convert mod-list of UDM property names/values to mod-list of LDAP attribute names/values.

    Properties without a registered mapping, properties which should not be mapped and
    entries whose mapped old and new values are equal are left out.

    :param mapping: the mapping to consult.
    :param diff: iterable of `(property, old value, new value)` tuples (may be empty or `None`).
    :returns: list of `(attribute, old value, new value)` tuples.

    >>> map = mapping()
    >>> mapDiff(map, None)
    []
    >>> mapDiff(map, [('unknown', None, None)])
    []
    >>> map.register('udm', 'ldap', lambda udm: udm.lower().encode('utf-8'), None)
    >>> mapDiff(map, [('udm', 'OLD', 'NEW')])
    [('ldap', b'old', b'new')]
    >>> mapDiff(map, [('udm', 'case', 'CASE')])
    []
    """
    modlist = []
    for prop_name, old_udm, new_udm in diff or ():
        try:
            if not mapping.shouldMap(prop_name):
                continue
            attr_name = mapping.mapName(prop_name)
            old_ldap = mapping.mapValue(prop_name, old_udm)
            new_ldap = mapping.mapValue(prop_name, new_udm)
        except KeyError:
            # No mapping registered for this property.
            continue
        if attr_name and old_ldap != new_ldap:
            modlist.append((attr_name, old_ldap, new_ldap))
    return modlist
def mapDiffAl(mapping: mapping, diff: Iterable[tuple[str, Any, Any]]) -> list[tuple[str, Any]]:  # UNUSED
    """
    Convert mod-list of UDM property names/values to add-list of LDAP attribute names/values.

    Only the new value of each entry is used. Properties without a registered mapping and
    properties which should not be mapped are left out.

    :param mapping: the mapping to consult.
    :param diff: iterable of `(property, old value, new value)` tuples (may be empty or `None`).
    :returns: list of `(attribute, new value)` tuples.

    >>> map = mapping()
    >>> mapDiffAl(map, None)
    []
    >>> mapDiffAl(map, [('unknown', None, None)])
    []
    >>> map.register('udm', 'ldap', lambda udm: udm.lower().encode('utf-8'), None)
    >>> mapDiffAl(map, [('udm', 'OLD', 'NEW'), ('unknown', None, None)])
    [('ldap', b'new')]
    """
    addlist = []
    for prop_name, _old_udm, new_udm in diff or ():
        try:
            if not mapping.shouldMap(prop_name):
                continue
            attr_name = mapping.mapName(prop_name)
            new_ldap = mapping.mapValue(prop_name, new_udm)
        except KeyError:
            # No mapping registered for this property.
            continue
        addlist.append((attr_name, new_ldap))
    return addlist
def mapRewrite(filter: univention.admin.filter.expression, mapping: mapping) -> None:
    """
    Re-write UDM property name/value in UDM filter expression to LDAP attribute name/value.

    The expression is modified in place. It is left untouched if the property has no
    registered mapping, should not be mapped, or maps to an empty attribute name.

    :param filter: the filter expression; its `variable` and `value` attributes are read and replaced.
    :param mapping: the mapping to consult.

    >>> from argparse import Namespace
    >>> map = mapping()
    >>> f = Namespace(variable='unknown', value=None); mapRewrite(f, map); (f.variable, f.value)
    ('unknown', None)
    >>> map.register('udm', 'ldap', lambda udm: udm.lower().encode('utf-8'), None)
    >>> f = Namespace(variable='udm', value='UDM'); mapRewrite(f, map); (f.variable, f.value)
    ('ldap', b'udm')
    """
    try:
        prop_name = filter.variable
        if not mapping.shouldMap(prop_name):
            return
        attr_name = mapping.mapName(prop_name)
        ldap_value = mapping.mapValueDecoded(prop_name, filter.value, encoding_errors='ignore')
    except KeyError:
        # No mapping registered for this property.
        return
    if not attr_name:
        return
    filter.variable = attr_name
    filter.value = ldap_value