# SPDX-FileCopyrightText: 2018-2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only
"""En/Decoders for object properties."""
from __future__ import annotations
import datetime
import logging
import sys
import time
from typing import TYPE_CHECKING, Any
import lazy_object_proxy
import univention.admin.modules
from univention.admin.syntax import sambaGroupType
from univention.admin.uexceptions import valueInvalidSyntax
from .binary_props import Base64BinaryProperty, Base64Bzip2BinaryProperty
from .exceptions import NoObject, UnknownModuleType
from .udm import UDM
if TYPE_CHECKING:
from collections.abc import Callable
__dn_list_property_encoder_class_cache = {}
__dn_property_encoder_class_cache = {}
[docs]
class BaseEncoder:
static = False # whether to create an instance or use a class/static method
def __init__(self, property_name: str | None = None, *args: Any, **kwargs: Any) -> None:
self.property_name = property_name
def __repr__(self) -> str:
return f'{self.__class__.__name__}({self.property_name})'
[docs]
def encode(self, value: Any | None = None) -> Any | None:
raise NotImplementedError()
[docs]
def decode(self, value: Any | None = None) -> Any | None:
raise NotImplementedError()
[docs]
class Base64BinaryPropertyEncoder(BaseEncoder):
static = False
[docs]
def decode(self, value: str | None = None) -> Base64BinaryProperty | None:
if value:
return Base64BinaryProperty(self.property_name, value)
else:
return value
[docs]
def encode(self, value: Base64BinaryProperty | None = None) -> str | None:
if value:
if not isinstance(value, Base64BinaryProperty):
value = Base64BinaryProperty(self.property_name, raw_value=value)
return value.encoded
else:
return value
[docs]
class Base64Bzip2BinaryPropertyEncoder(BaseEncoder):
static = False
[docs]
def decode(self, value: str | None = None) -> Base64Bzip2BinaryProperty | None:
if value:
return Base64Bzip2BinaryProperty(self.property_name, value)
else:
return value
[docs]
def encode(self, value: Base64Bzip2BinaryProperty | None = None) -> str | None:
if value:
return value.encoded
else:
return value
[docs]
class DatePropertyEncoder(BaseEncoder):
static = True
[docs]
@staticmethod
def decode(value: str | None = None) -> datetime.date | None:
if value:
return datetime.date(*time.strptime(value, '%Y-%m-%d')[0:3])
else:
return value
[docs]
@staticmethod
def encode(value: datetime.date | None = None) -> str | None:
if value:
return value.strftime('%Y-%m-%d')
else:
return value
[docs]
class DisabledPropertyEncoder(BaseEncoder):
static = True
[docs]
@staticmethod
def decode(value: str | None = None) -> bool:
return value == '1'
[docs]
@staticmethod
def encode(value: bool | None = None) -> str:
return '1' if value else '0'
[docs]
class HomePostalAddressPropertyEncoder(BaseEncoder):
static = True
[docs]
@staticmethod
def decode(value: list[list[str]] | None = None) -> list[dict[str, str]] | None:
if value:
return [{'street': v[0], 'zipcode': v[1], 'city': v[2]} for v in value]
else:
return value
[docs]
@staticmethod
def encode(value: list[dict[str, str]] | None = None) -> list[list[str]] | None:
if value:
return [[v['street'], v['zipcode'], v['city']] for v in value]
else:
return value
[docs]
class ListOfListOflTextToDictPropertyEncoder(BaseEncoder):
static = True
[docs]
@staticmethod
def decode(value: list[list[str]] | None = None) -> dict[str, str] | None:
if value is None:
return value
return dict(value)
[docs]
@staticmethod
def encode(value: dict[str, str] | None = None) -> list[list[str]] | None:
if value:
return [[k, v] for k, v in value.items()]
else:
return value
[docs]
class MultiLanguageTextAppcenterPropertyEncoder(BaseEncoder):
static = True
[docs]
@staticmethod
def decode(value: list[str] | None = None) -> dict[str, str] | None:
if value:
res = {}
for s in value:
lang, txt = s.split(' ', 1)
lang = lang.strip('[]')
res[lang] = txt
return res
else:
return value
[docs]
@staticmethod
def encode(value: dict[str, str] | None = None) -> list[str] | None:
if value:
return [f'[{k}] {v}' for k, v in value.items()]
else:
return value
[docs]
class SambaGroupTypePropertyEncoder(BaseEncoder):
static = True
choices = dict(sambaGroupType.choices)
choices_reverted = {v: k for k, v in sambaGroupType.choices}
[docs]
@classmethod
def decode(cls, value: list[str] | None = None) -> str | None:
try:
return cls.choices[value]
except KeyError:
return value
[docs]
@classmethod
def encode(cls, value: str | None = None) -> list[str] | None:
try:
return cls.choices_reverted[value]
except KeyError:
return value
[docs]
class SambaLogonHoursPropertyEncoder(BaseEncoder):
static = True
_weekdays = ('Sun', 'Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat')
[docs]
@classmethod
def decode(cls, value: list[int] | None = None) -> list[str] | None:
if value:
return [f'{cls._weekdays[dow]} {hour}-{hour + 1}' for dow, hour in (divmod(v, 24) for v in value)]
else:
return value
[docs]
@classmethod
def encode(cls, value: list[str] | None = None) -> list[int] | None:
if value:
try:
values = [v.split() for v in value]
return [cls._weekdays.index(w) * 24 + int(h.split('-', 1)[0]) for w, h in values]
except (IndexError, ValueError):
raise valueInvalidSyntax('One or more entries in sambaLogonHours have invalid syntax.').with_traceback(sys.exc_info()[2])
else:
return value
[docs]
class StringCaseInsensitiveResultLowerBooleanPropertyEncoder(BaseEncoder):
static = True
result_case_func = 'lower'
false_string = 'false'
true_string = 'true'
[docs]
@classmethod
def decode(cls, value: str | None = '') -> bool:
return isinstance(value, str) and value.lower() == cls.true_string
[docs]
@classmethod
def encode(cls, value: bool | None = None) -> str:
assert cls.result_case_func in ('lower', 'upper')
if value:
return getattr(cls.true_string, cls.result_case_func)()
else:
return getattr(cls.false_string, cls.result_case_func)()
[docs]
class StringCaseInsensitiveResultUpperBooleanPropertyEncoder(StringCaseInsensitiveResultLowerBooleanPropertyEncoder):
result_case_func = 'upper'
[docs]
class StringIntBooleanPropertyEncoder(BaseEncoder):
static = True
[docs]
@staticmethod
def decode(value: str | None = None) -> bool:
return value == '1'
[docs]
@staticmethod
def encode(value: bool | None = None) -> str:
if value:
return '1'
else:
return '0'
[docs]
class StringIntPropertyEncoder(BaseEncoder):
static = False
[docs]
def decode(self, value: str | None = None) -> int | None:
if value in ('', None):
return None
else:
try:
return int(value)
except ValueError:
raise valueInvalidSyntax(f'Value of {self.property_name!r} must be an int (is {value!r}).').with_traceback(sys.exc_info()[2])
[docs]
@staticmethod
def encode(value: int | None = None) -> str | None:
if value is None:
return value
else:
return str(value)
[docs]
class StringListToList(BaseEncoder):
static = True
separator = ' '
[docs]
@classmethod
def decode(cls, value: str | None = None) -> list[str] | None:
if value:
return value.split(cls.separator)
else:
return value
[docs]
@classmethod
def encode(cls, value: list[str] | None = None) -> str | None:
if value:
return cls.separator.join(value)
else:
return value
[docs]
class DnListPropertyEncoder(BaseEncoder):
"""
Given a list of DNs, return the same list with an additional member
``objs``. ``objs`` is a lazy object that will become the list of UDM
objects the DNs refer to, when accessed.
:py:func:`dn_list_property_encoder_for()` will dynamically produce
subclasses of this for every UDM module required.
"""
static = False
udm_module_name = ''
[docs]
class DnsList(list):
# a list with an additional member variable
objs = None
def __deepcopy__(self, memodict=None):
return list(self)
[docs]
class MyProxy(lazy_object_proxy.Proxy):
# overwrite __repr__ for better navigation in ipython
def __repr__(self, __getattr__: Callable[[Any, str], Any] = object.__getattribute__) -> str:
return super(DnListPropertyEncoder.MyProxy, self).__str__()
def __init__(self, property_name: str | None = None, connection: Any | None = None, api_version: int | None = None, *args: Any, **kwargs: Any) -> None:
assert connection is not None, 'Argument "connection" must not be None.'
assert api_version is not None, 'Argument "api_version" must not be None.'
super().__init__(property_name, *args, **kwargs)
self._udm = UDM(connection, api_version)
def _list_of_dns_to_list_of_udm_objects(self, value):
udm_module = None
res = []
for dn in value:
try:
if self.udm_module_name == 'auto':
obj = self.udm.obj_by_dn(dn)
else:
if not udm_module:
udm_module = self.udm.get(self.udm_module_name)
obj = udm_module.get(dn)
except UnknownModuleType as exc:
logging.getLogger('ADMIN').warning('%s', exc)
except NoObject as exc:
logging.getLogger('ADMIN').warning('%s', exc)
else:
res.append(obj)
return res
[docs]
def decode(self, value: list[str] | None = None) -> list[str] | None:
if value is None:
value = []
assert hasattr(value, '__iter__'), f'Value is not iterable: {value!r}'
new_list = self.DnsList(value)
new_list.objs = self.MyProxy(lambda: self._list_of_dns_to_list_of_udm_objects(value))
return new_list
[docs]
@staticmethod
def encode(value: list[str] | None = None) -> list[str] | None:
try:
del value.objs
except AttributeError:
pass
return value
@property
def udm(self) -> Any:
return self._udm
[docs]
class PoliciesEncoder(BaseEncoder):
static = False
def __init__(self, property_name: str | None = None, connection: Any | None = None, api_version: int | None = None, module_name: str | None = None, *args: Any, **kwargs: Any) -> None:
assert connection is not None, 'Argument "connection" must not be None.'
assert api_version is not None, 'Argument "api_version" must not be None.'
super().__init__(property_name, *args, **kwargs)
self._udm = UDM(connection, api_version)
self.module_name = module_name
[docs]
def decode(self, value: Any | None = None) -> dict[Any, list[Any]]:
policies = {}
policy_modules = univention.admin.modules.policyTypes(self.module_name)
if not policy_modules and self._udm.get(self.module_name)._orig_udm_module.childs: # container, which allows every policy-type
policy_modules = [x for x in univention.admin.modules.modules if x.startswith('policies/') and x != 'policies/policy']
for policy_module in policy_modules:
policies.setdefault(policy_module, [])
for policy_dn in value or []:
policy_module = self._udm.obj_by_dn(policy_dn)._udm_module.name
if policy_module not in policies:
continue
policies[policy_module].append(policy_dn)
return policies
[docs]
def encode(self, value: dict[Any, list[Any]] | None = None) -> list[Any]:
if value:
return [y for x in value.values() for y in x]
else:
return []
[docs]
class CnameListPropertyEncoder(DnListPropertyEncoder):
"""
Given a list of CNAMEs, return the same list with an additional member
``objs``. ``objs`` is a lazy object that will become the list of UDM
objects the CNAMEs refer to, when accessed.
"""
udm_module_name = 'dns/alias'
def _list_of_dns_to_list_of_udm_objects(self, value):
udm_module = self.udm.get(self.udm_module_name)
return [list(udm_module.search(f'relativeDomainName={cname}'))[0] for cname in value] # noqa: RUF015
[docs]
class DnsEntryZoneAliasListPropertyEncoder(DnListPropertyEncoder):
"""
Given a list of dnsEntryZoneAlias entries, return the same list with an
additional member ``objs``. ``objs`` is a lazy object that will become
the list of UDM objects the dnsEntryZoneAlias entries refer to, when
accessed.
"""
udm_module_name = 'dns/alias'
def _list_of_dns_to_list_of_udm_objects(self, value):
udm_module = self.udm.get(self.udm_module_name)
return [udm_module.get(f'relativeDomainName={v[2]},{v[1]}') for v in value]
[docs]
class DnsEntryZoneForwardListMultiplePropertyEncoder(DnListPropertyEncoder):
"""
Given a list of dnsEntryZoneForward entries, return the same list with an
additional member ``objs``. ``objs`` is a lazy object that will become
the list of UDM objects the dnsEntryZoneForward entries refer to, when
accessed.
"""
udm_module_name = 'dns/forward_zone'
@staticmethod
def _itemgetter(value):
return value[0]
def _list_of_dns_to_list_of_udm_objects(self, value):
udm_module = self.udm.get(self.udm_module_name)
return [udm_module.get(self._itemgetter(v)) for v in value]
[docs]
class DnsEntryZoneForwardListSinglePropertyEncoder(DnsEntryZoneForwardListMultiplePropertyEncoder):
"""
Given a list of dnsEntryZoneForward entries, return the same list with an
additional member ``objs``. ``objs`` is a lazy object that will become
the list of UDM objects the dnsEntryZoneForward entries refer to, when
accessed.
"""
udm_module_name = 'dns/forward_zone'
@staticmethod
def _itemgetter(value):
return value
[docs]
class DnsEntryZoneReverseListMultiplePropertyEncoder(DnsEntryZoneForwardListMultiplePropertyEncoder):
"""
Given a list of dnsEntryZoneReverse entries, return the same list with an
additional member ``objs``. ``objs`` is a lazy object that will become
the list of UDM objects the dnsEntryZoneReverse entries refer to, when
accessed.
"""
udm_module_name = 'dns/reverse_zone'
@staticmethod
def _itemgetter(value):
return value[0]
[docs]
class DnsEntryZoneReverseListSinglePropertyEncoder(DnsEntryZoneReverseListMultiplePropertyEncoder):
"""
Given a list of dnsEntryZoneReverse entries, return the same list with an
additional member ``objs``. ``objs`` is a lazy object that will become
the list of UDM objects the dnsEntryZoneReverse entries refer to, when
accessed.
"""
udm_module_name = 'dns/reverse_zone'
@staticmethod
def _itemgetter(value):
return value
[docs]
class DnPropertyEncoder(BaseEncoder):
"""
Given a DN, return a string object with the DN and an additional member
``obj``. ``obj`` is a lazy object that will become the UDM object the DN
refers to, when accessed.
:py:func:`dn_property_encoder_for()` will dynamically produce subclasses of this for every UDM module required.
"""
static = False
udm_module_name = ''
[docs]
class DnStr(str): # noqa: SLOT000
# a string with an additional member variable
obj = None
def __deepcopy__(self, memodict=None):
return str(self)
[docs]
class MyProxy(lazy_object_proxy.Proxy):
# overwrite __repr__ for better navigation in ipython
def __repr__(self, __getattr__: Callable[[Any, str], Any] = object.__getattribute__) -> str:
return super(DnPropertyEncoder.MyProxy, self).__str__()
def __init__(self, property_name: str | None = None, connection: Any = None, api_version: int | None = None, *args: Any, **kwargs: Any) -> None:
assert connection is not None, 'Argument "connection" must not be None.'
assert api_version is not None, 'Argument "api_version" must not be None.'
super().__init__(property_name, *args, **kwargs)
self._udm = UDM(connection, api_version)
def _dn_to_udm_object(self, value: Any) -> Any | None:
try:
if self.udm_module_name == 'auto':
return self.udm.obj_by_dn(value)
else:
udm_module = self.udm.get(self.udm_module_name)
return udm_module.get(value)
except UnknownModuleType as exc:
logging.getLogger('ADMIN').error('%s', exc)
except NoObject as exc:
logging.getLogger('ADMIN').warning('%s', exc)
return None
[docs]
def decode(self, value: str | None = None) -> str | None:
if value in (None, ''):
return None
new_str = self.DnStr(value)
if value:
new_str.obj = self.MyProxy(lambda: self._dn_to_udm_object(value))
return new_str
[docs]
@staticmethod
def encode(value: str | None = None) -> str | None:
try:
del value.obj
except AttributeError:
pass
return value
@property
def udm(self) -> UDM:
return self._udm
def _classify_name(name: str) -> str:
mod_parts = name.split('/')
return ''.join(f'{mp[0].upper()}{mp[1:]}' for mp in mod_parts)
[docs]
def dn_list_property_encoder_for(udm_module_name: str) -> type[DnListPropertyEncoder]:
"""
Create a (cached) subclass of DnListPropertyEncoder specific for each UDM
module.
:param str udm_module_name:
name of UDM module (e.g. `users/user`) or
`auto` if auto-detection should be done. Auto-detection requires one
additional LDAP-query per object (still lazy though)!
:return: subclass of DnListPropertyEncoder
"""
if udm_module_name not in __dn_list_property_encoder_class_cache:
cls_name = f'DnListPropertyEncoder{_classify_name(udm_module_name)}'
specific_encoder_cls = type(cls_name, (DnListPropertyEncoder,), {})
specific_encoder_cls.udm_module_name = udm_module_name # type: ignore[attr-defined]
__dn_list_property_encoder_class_cache[udm_module_name] = specific_encoder_cls
return __dn_list_property_encoder_class_cache[udm_module_name]
[docs]
def dn_property_encoder_for(udm_module_name: str) -> type[DnPropertyEncoder]:
"""
Create a (cached) subclass of DnPropertyEncoder specific for each UDM
module.
:param str udm_module_name:
name of UDM module (e.g. `users/user`) or
`auto` if auto-detection should be done. Auto-detection requires one
additional LDAP-query per object (still lazy though)!
:return: subclass of DnPropertyEncoder
"""
if udm_module_name not in __dn_property_encoder_class_cache:
cls_name = f'DnPropertyEncoder{_classify_name(udm_module_name)}'
specific_encoder_cls = type(cls_name, (DnPropertyEncoder,), {})
specific_encoder_cls.udm_module_name = udm_module_name # type: ignore[attr-defined]
__dn_property_encoder_class_cache[udm_module_name] = specific_encoder_cls
return __dn_property_encoder_class_cache[udm_module_name]