# -*- coding: utf-8 -*-
#
# Copyright 2018-2022 Univention GmbH
#
# https://www.univention.de/
#
# All rights reserved.
#
# The source code of this program is made available
# under the terms of the GNU Affero General Public License version 3
# (GNU AGPL V3) as published by the Free Software Foundation.
#
# Binary versions of this program provided by Univention to you as
# well as other copyrighted, protected or trademarked materials like
# Logos, graphics, fonts, specific documentations and configurations,
# cryptographic keys etc. are subject to a license agreement between
# you and Univention.
#
# This program is provided in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public
# License with the Debian GNU/Linux or Univention distribution in file
# /usr/share/common-licenses/AGPL-3; if not, see
# <https://www.gnu.org/licenses/>.
"""
En/Decoders for object properties.
"""
from __future__ import absolute_import, unicode_literals
import sys
import six
import datetime
import time
import lazy_object_proxy
from .binary_props import Base64BinaryProperty, Base64Bzip2BinaryProperty
from .udm import UDM
from .utils import UDebug
from .exceptions import NoObject, UnknownModuleType
from univention.admin.uexceptions import valueInvalidSyntax
from univention.admin.syntax import sambaGroupType
__dn_list_property_encoder_class_cache = {}
__dn_property_encoder_class_cache = {}
[docs]class BaseEncoder(object):
static = False # whether to create an instance or use a class/static method
def __init__(self, property_name=None, *args, **kwargs):
self.property_name = property_name
def __repr__(self):
return '{}({})'.format(self.__class__.__name__, self.property_name)
[docs] def encode(self, value=None):
raise NotImplementedError()
[docs] def decode(self, value=None):
raise NotImplementedError()
[docs]class Base64BinaryPropertyEncoder(BaseEncoder):
static = False
[docs] def decode(self, value=None):
if value:
return Base64BinaryProperty(self.property_name, value)
else:
return value
[docs] def encode(self, value=None):
if value:
if not isinstance(value, Base64BinaryProperty):
value = Base64BinaryProperty(self.property_name, raw_value=value)
return value.encoded
else:
return value
[docs]class Base64Bzip2BinaryPropertyEncoder(BaseEncoder):
static = False
[docs] def decode(self, value=None):
if value:
return Base64Bzip2BinaryProperty(self.property_name, value)
else:
return value
[docs] def encode(self, value=None):
if value:
return value.encoded
else:
return value
[docs]class DatePropertyEncoder(BaseEncoder):
static = True
[docs] @staticmethod
def decode(value=None):
if value:
return datetime.date(*time.strptime(value, '%Y-%m-%d')[0:3])
else:
return value
[docs] @staticmethod
def encode(value=None):
if value:
return value.strftime('%Y-%m-%d')
else:
return value
[docs]class DisabledPropertyEncoder(BaseEncoder):
static = True
[docs] @staticmethod
def decode(value=None):
return value == '1'
[docs] @staticmethod
def encode(value=None):
return '1' if value else '0'
[docs]class HomePostalAddressPropertyEncoder(BaseEncoder):
static = True
[docs] @staticmethod
def decode(value=None):
if value:
return [{'street': v[0], 'zipcode': v[1], 'city': v[2]} for v in value]
else:
return value
[docs] @staticmethod
def encode(value=None):
if value:
return [[v['street'], v['zipcode'], v['city']] for v in value]
else:
return value
[docs]class ListOfListOflTextToDictPropertyEncoder(BaseEncoder):
static = True
[docs] @staticmethod
def decode(value=None):
if value:
return dict(value)
else:
return value
[docs] @staticmethod
def encode(value=None):
if value:
return [[k, v] for k, v in value.items()]
else:
return value
[docs]class MultiLanguageTextAppcenterPropertyEncoder(BaseEncoder):
static = True
[docs] @staticmethod
def decode(value=None):
if value:
res = {}
for s in value:
lang, txt = s.split(' ', 1)
lang = lang.strip('[]')
res[lang] = txt
return res
else:
return value
[docs] @staticmethod
def encode(value=None):
if value:
return ['[{}] {}'.format(k, v) for k, v in value.items()]
else:
return value
[docs]class SambaGroupTypePropertyEncoder(BaseEncoder):
static = True
choices = dict(sambaGroupType.choices)
choices_reverted = {v: k for k, v in sambaGroupType.choices}
[docs] @classmethod
def decode(cls, value=None):
try:
return cls.choices[value]
except KeyError:
return value
[docs] @classmethod
def encode(cls, value=None):
try:
return cls.choices_reverted[value]
except KeyError:
return value
[docs]class SambaLogonHoursPropertyEncoder(BaseEncoder):
static = True
_weekdays = ('Sun', 'Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat')
[docs] @classmethod
def decode(cls, value=None):
if value:
return ['{} {}-{}'.format(cls._weekdays[v / 24], v % 24, v % 24 + 1) for v in value]
else:
return value
[docs] @classmethod
def encode(cls, value=None):
if value:
try:
values = [v.split() for v in value]
return [cls._weekdays.index(w) * 24 + int(h.split('-', 1)[0]) for w, h in values]
except (IndexError, ValueError):
six.reraise(valueInvalidSyntax, valueInvalidSyntax('One or more entries in sambaLogonHours have invalid syntax.'), sys.exc_info()[2])
else:
return value
[docs]class StringCaseInsensitiveResultLowerBooleanPropertyEncoder(BaseEncoder):
static = True
result_case_func = 'lower'
false_string = 'false'
true_string = 'true'
[docs] @classmethod
def decode(cls, value=''):
return isinstance(value, six.string_types) and value.lower() == cls.true_string
[docs] @classmethod
def encode(cls, value=None):
assert cls.result_case_func in ('lower', 'upper')
if value:
return getattr(cls.true_string, cls.result_case_func)()
else:
return getattr(cls.false_string, cls.result_case_func)()
[docs]class StringCaseInsensitiveResultUpperBooleanPropertyEncoder(StringCaseInsensitiveResultLowerBooleanPropertyEncoder):
result_case_func = 'upper'
[docs]class StringIntBooleanPropertyEncoder(BaseEncoder):
static = True
[docs] @staticmethod
def decode(value=None):
return value == '1'
[docs] @staticmethod
def encode(value=None):
if value:
return '1'
else:
return '0'
[docs]class StringIntPropertyEncoder(BaseEncoder):
static = False
[docs] def decode(self, value=None):
if value in ('', None):
return None
else:
try:
return int(value)
except ValueError:
six.reraise(valueInvalidSyntax, valueInvalidSyntax('Value of {!r} must be an int (is {!r}).'.format(self.property_name, value)), sys.exc_info()[2])
[docs] @staticmethod
def encode(value=None):
if value is None:
return value
else:
return str(value)
[docs]class StringListToList(BaseEncoder):
static = True
separator = ' '
[docs] @classmethod
def decode(cls, value=None):
if value:
return value.split(cls.separator)
else:
return value
[docs] @classmethod
def encode(cls, value=None):
if value:
return cls.separator.join(value)
else:
return value
[docs]class DnListPropertyEncoder(BaseEncoder):
"""
Given a list of DNs, return the same list with an additional member
``objs``. ``objs`` is a lazy object that will become the list of UDM
objects the DNs refer to, when accessed.
:py:func:`dn_list_property_encoder_for()` will dynamically produce
subclasses of this for every UDM module required.
"""
static = False
udm_module_name = ''
[docs] class DnsList(list):
# a list with an additional member variable
objs = None
def __deepcopy__(self, memodict=None):
return list(self)
[docs] class MyProxy(lazy_object_proxy.Proxy):
# overwrite __repr__ for better navigation in ipython
def __repr__(self, __getattr__=object.__getattribute__):
return super(DnListPropertyEncoder.MyProxy, self).__str__()
def __init__(self, property_name=None, connection=None, api_version=None, *args, **kwargs):
assert connection is not None, 'Argument "connection" must not be None.'
assert api_version is not None, 'Argument "api_version" must not be None.'
super(DnListPropertyEncoder, self).__init__(property_name, *args, **kwargs)
self._udm = UDM(connection, api_version)
def _list_of_dns_to_list_of_udm_objects(self, value):
udm_module = None
res = []
for dn in value:
try:
if self.udm_module_name == 'auto':
obj = self.udm.obj_by_dn(dn)
else:
if not udm_module:
udm_module = self.udm.get(self.udm_module_name)
obj = udm_module.get(dn)
except UnknownModuleType as exc:
UDebug.warn(str(exc))
except NoObject as exc:
UDebug.warn(str(exc))
else:
res.append(obj)
return res
[docs] def decode(self, value=None):
if value is None:
value = []
assert hasattr(value, '__iter__'), 'Value is not iterable: {!r}'.format(value)
new_list = self.DnsList(value)
new_list.objs = self.MyProxy(lambda: self._list_of_dns_to_list_of_udm_objects(value))
return new_list
[docs] @staticmethod
def encode(value=None):
try:
del value.objs
except AttributeError:
pass
return value
@property
def udm(self):
return self._udm
[docs]class CnameListPropertyEncoder(DnListPropertyEncoder):
"""
Given a list of CNAMEs, return the same list with an additional member
``objs``. ``objs`` is a lazy object that will become the list of UDM
objects the CNAMEs refer to, when accessed.
"""
udm_module_name = 'dns/alias'
def _list_of_dns_to_list_of_udm_objects(self, value):
udm_module = self.udm.get(self.udm_module_name)
return [list(udm_module.search('relativeDomainName={}'.format(cname)))[0] for cname in value]
[docs]class DnsEntryZoneAliasListPropertyEncoder(DnListPropertyEncoder):
"""
Given a list of dnsEntryZoneAlias entries, return the same list with an
additional member ``objs``. ``objs`` is a lazy object that will become
the list of UDM objects the dnsEntryZoneAlias entries refer to, when
accessed.
"""
udm_module_name = 'dns/alias'
def _list_of_dns_to_list_of_udm_objects(self, value):
udm_module = self.udm.get(self.udm_module_name)
return [udm_module.get('relativeDomainName={},{}'.format(v[2], v[1])) for v in value]
[docs]class DnsEntryZoneForwardListMultiplePropertyEncoder(DnListPropertyEncoder):
"""
Given a list of dnsEntryZoneForward entries, return the same list with an
additional member ``objs``. ``objs`` is a lazy object that will become
the list of UDM objects the dnsEntryZoneForward entries refer to, when
accessed.
"""
udm_module_name = 'dns/forward_zone'
@staticmethod
def _itemgetter(value):
return value[0]
def _list_of_dns_to_list_of_udm_objects(self, value):
udm_module = self.udm.get(self.udm_module_name)
return [udm_module.get(self._itemgetter(v)) for v in value]
[docs]class DnsEntryZoneForwardListSinglePropertyEncoder(DnsEntryZoneForwardListMultiplePropertyEncoder):
"""
Given a list of dnsEntryZoneForward entries, return the same list with an
additional member ``objs``. ``objs`` is a lazy object that will become
the list of UDM objects the dnsEntryZoneForward entries refer to, when
accessed.
"""
udm_module_name = 'dns/forward_zone'
@staticmethod
def _itemgetter(value):
return value
[docs]class DnsEntryZoneReverseListMultiplePropertyEncoder(DnsEntryZoneForwardListMultiplePropertyEncoder):
"""
Given a list of dnsEntryZoneReverse entries, return the same list with an
additional member ``objs``. ``objs`` is a lazy object that will become
the list of UDM objects the dnsEntryZoneReverse entries refer to, when
accessed.
"""
udm_module_name = 'dns/reverse_zone'
@staticmethod
def _itemgetter(value):
return value[0]
[docs]class DnsEntryZoneReverseListSinglePropertyEncoder(DnsEntryZoneReverseListMultiplePropertyEncoder):
"""
Given a list of dnsEntryZoneReverse entries, return the same list with an
additional member ``objs``. ``objs`` is a lazy object that will become
the list of UDM objects the dnsEntryZoneReverse entries refer to, when
accessed.
"""
udm_module_name = 'dns/reverse_zone'
@staticmethod
def _itemgetter(value):
return value
[docs]class DnPropertyEncoder(BaseEncoder):
"""
Given a DN, return a string object with the DN and an additional member
``obj``. ``obj`` is a lazy object that will become the UDM object the DN
refers to, when accessed.
:py:func:`dn_property_encoder_for()` will dynamically produce
subclasses of this for every UDM module required.
"""
static = False
udm_module_name = ''
[docs] class DnStr(str):
# a string with an additional member variable
obj = None
def __deepcopy__(self, memodict=None):
return str(self)
[docs] class MyProxy(lazy_object_proxy.Proxy):
# overwrite __repr__ for better navigation in ipython
def __repr__(self, __getattr__=object.__getattribute__):
return super(DnPropertyEncoder.MyProxy, self).__str__()
def __init__(self, property_name=None, connection=None, api_version=None, *args, **kwargs):
assert connection is not None, 'Argument "connection" must not be None.'
assert api_version is not None, 'Argument "api_version" must not be None.'
super(DnPropertyEncoder, self).__init__(property_name, *args, **kwargs)
self._udm = UDM(connection, api_version)
def _dn_to_udm_object(self, value):
try:
if self.udm_module_name == 'auto':
return self.udm.obj_by_dn(value)
else:
udm_module = self.udm.get(self.udm_module_name)
return udm_module.get(value)
except UnknownModuleType as exc:
UDebug.error(str(exc))
except NoObject as exc:
UDebug.warn(str(exc))
return None
[docs] def decode(self, value=None):
if value in (None, ''):
return None
new_str = self.DnStr(value)
if value:
new_str.obj = self.MyProxy(lambda: self._dn_to_udm_object(value))
return new_str
[docs] @staticmethod
def encode(value=None):
try:
del value.obj
except AttributeError:
pass
return value
@property
def udm(self):
return self._udm
def _classify_name(name):
mod_parts = name.split('/')
return ''.join('{}{}'.format(mp[0].upper(), mp[1:]) for mp in mod_parts)
[docs]def dn_list_property_encoder_for(udm_module_name):
"""
Create a (cached) subclass of DnListPropertyEncoder specific for each UDM
module.
:param str udm_module_name: name of UDM module (e.g. `users/user`) or
`auto` if auto-detection should be done. Auto-detection requires one
additional LDAP-query per object (still lazy though)!
:return: subclass of DnListPropertyEncoder
:rtype: type(DnListPropertyEncoder)
"""
if udm_module_name not in __dn_list_property_encoder_class_cache:
cls_name = str('DnListPropertyEncoder{}').format(_classify_name(udm_module_name))
specific_encoder_cls = type(cls_name, (DnListPropertyEncoder,), {})
specific_encoder_cls.udm_module_name = udm_module_name
__dn_list_property_encoder_class_cache[udm_module_name] = specific_encoder_cls
return __dn_list_property_encoder_class_cache[udm_module_name]
[docs]def dn_property_encoder_for(udm_module_name):
"""
Create a (cached) subclass of DnPropertyEncoder specific for each UDM
module.
:param str udm_module_name: name of UDM module (e.g. `users/user`) or
`auto` if auto-detection should be done. Auto-detection requires one
additional LDAP-query per object (still lazy though)!
:return: subclass of DnPropertyEncoder
:rtype: type(DnPropertyEncoder)
"""
if udm_module_name not in __dn_property_encoder_class_cache:
cls_name = str('DnPropertyEncoder{}').format(_classify_name(udm_module_name))
specific_encoder_cls = type(cls_name, (DnPropertyEncoder,), {})
specific_encoder_cls.udm_module_name = udm_module_name
__dn_property_encoder_class_cache[udm_module_name] = specific_encoder_cls
return __dn_property_encoder_class_cache[udm_module_name]