#!/usr/bin/python3
#
# Univention Management Console
# Univention Directory Manager Module
#
# SPDX-FileCopyrightText: 2017-2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only
# TODO: use pydantic
import base64
import binascii
import copy
import functools
import inspect
import ldap.dn
import univention.admin.syntax as udm_syntax
import univention.admin.types as udm_types
import univention.admin.uexceptions as udm_errors
from univention.admin.rest.utils import parse_content_type
from univention.config_registry import ucr
from univention.lib.i18n import Translation
from univention.management.console.error import UnprocessableEntity
from univention.management.console.modules.sanitizers import ( # noqa: F401
BooleanSanitizer, ChoicesSanitizer, DictSanitizer as UMCDictSanitizer, DNSanitizer, EmailSanitizer,
IntegerSanitizer, LDAPSearchSanitizer, ListSanitizer, MultiValidationError, Sanitizer, SearchSanitizer,
StringSanitizer, ValidationError,
)
_ = Translation('univention-directory-manager-rest').translate
[docs]
class Param:
def __init__(self, sanitizer, alias=None, description=None, deprecated=None, example=None, examples=None, style=None, explode=None):
self.sanitizer = sanitizer
self.alias = alias
self.description = description
self.deprecated = deprecated
self.example = example
self.examples = examples
self.style = style
self.explode = explode
[docs]
class Path(Param):
pass
[docs]
class Body(Param):
def __init__(self, sanitizer, content_type='application/json', **kwargs):
super().__init__(sanitizer, **kwargs)
self.content_type = content_type
[docs]
class Query(Param):
pass
[docs]
class Payload:
def __init__(self, content_type, sanitizer=None):
self.content_type = content_type
self.sanitizer = sanitizer or DictSanitizer
[docs]
def make_sanitizer(self, body_params):
return self.sanitizer()
[docs]
class JSONPayload(Payload):
def __init__(self, **kwargs):
super().__init__('application/json', DictSanitizer)
self.kwargs = kwargs
[docs]
def make_sanitizer(self, body_params):
kwargs = self.kwargs.copy()
kwargs.update({
param.alias or key: param.sanitizer if param.content_type == self.content_type else Sanitizer() for key, param in body_params.items()
})
return self.sanitizer(self.kwargs, required=True, further_arguments=['resource'], _copy_value=False)
[docs]
class Base64EncodingSanitizer(Sanitizer):
def _sanitize(self, value, name, further_args):
try:
return base64.b64decode(value)
except binascii.Error:
self.raise_validation_error(_('Needs to have proper base64 encoding.'))
[docs]
class PatchDocument(Payload):
def __init__(self, cls=None, **kwargs):
super().__init__('application/json-patch+json', cls or PatchDocumentSanitizer, **kwargs)
[docs]
class PatchRepresentation(PatchDocument):
def __init__(self, **kwargs):
super().__init__(PatchRepresentationSanitizer, **kwargs)
[docs]
def sanitize(method):
args = inspect.getfullargspec(method)
all_args = dict(zip(reversed(args.args), reversed(args.defaults)))
def _get_args(ptype):
return {
key: param
for key, param in all_args.items()
if isinstance(param, ptype)
}
query_params = _get_args(Query)
body_params = _get_args(Body)
payload_params = _get_args(Payload)
if body_params and payload_params:
raise RuntimeError('body and payload sanitizer cannot be combined.')
method.params = {'query': query_params} # for openapi
method.sanitizers = {}
if query_params:
query_sanitizers = {param.alias or key: param.sanitizer for key, param in query_params.items()}
method.sanitizers['query_string'] = QueryStringSanitizer(query_sanitizers, required=True, further_arguments=['resource'], _copy_value=False)
if body_params:
content_type_sanitizers = {param.content_type: DictSanitizer({name: param.sanitizer}, further_arguments=['resource'], _copy_value=False) for name, param in body_params.items()}
method.sanitizers['body_arguments'] = DictSanitizer(content_type_sanitizers, required=True, further_arguments=['resource'], _copy_value=False)
if payload_params:
content_type_sanitizers = {param.content_type: param.make_sanitizer(body_params) for name, param in payload_params.items()}
method.sanitizers['body_arguments'] = DictSanitizer(content_type_sanitizers, required=True, further_arguments=['resource'], _copy_value=False)
method.sanitizer = DictSanitizer(method.sanitizers, further_arguments=['resource'], _copy_value=False)
@functools.wraps(method)
async def decorator(self, *args, **params):
content_type = parse_content_type(self.request.headers.get('Content-Type', ''))
payload = {
'query_string': {k: [v.decode('UTF-8') for v in val] for k, val in self.request.query_arguments.items()} if self.request.query_arguments else {},
'body_arguments': {
'application/json': {},
'application/json-patch+patch': [],
'application/x-www-form-urlencoded': {},
'multipart/form-data': {},
},
}
payload['body_arguments'] = {content_type: self.request.body_arguments}
if 'body_arguments' in method.sanitizers:
for key, san in method.sanitizers['body_arguments'].sanitizers.items():
san.required = content_type == key
def _result_func(x):
if x.get('body_arguments', {}).get(content_type):
x['body_arguments'] = x['body_arguments'][content_type]
return x
arguments = self.sanitize_arguments(method.sanitizer, 'request.arguments', {'request.arguments': payload, 'resource': self}, _result_func=_result_func)
self.request.decoded_query_arguments = {
key: arguments['query_string'][param.alias or key]
for key, param in query_params.items()
}
body_arguments = arguments['body_arguments'][content_type]
self.request.body_arguments = {
key: None if param.content_type != content_type else body_arguments
for key, param in payload_params.items()
}
self.request.body_arguments.update({
key: None if param.content_type != content_type else body_arguments[param.alias or key]
for key, param in body_params.items()
})
return await method(self, *self.path_args, **self.path_kwargs, **self.request.decoded_query_arguments, **self.request.body_arguments)
return decorator
[docs]
class PatchDocumentSanitizer(ListSanitizer):
def __init__(self, *args, **kwargs):
super().__init__(DictSanitizer({
'op': ChoicesSanitizer(('add', 'remove', 'replace', 'copy', 'move', 'test'), required=True),
'path': StringSanitizer('^/.*', required=True),
'value': Sanitizer(),
}), *args, **kwargs)
def _sanitize(self, value, name, further_arguments):
return list(self.parse_patch_document(super()._sanitize(value, name, further_arguments), name))
[docs]
def parse_patch_document(self, value, name):
for operation in value:
op = operation['op']
path = [x.replace('~1', '/').replace('~0', '~') for x in operation['path'].split('/')[1:]]
value = operation.get('value')
if op in ('copy', 'move', 'test'):
continue # ignore, currently not needed
yield op, path, value
[docs]
class PatchRepresentationSanitizer(PatchDocumentSanitizer):
def _sanitize(self, value, name, further_arguments):
patch_document = super()._sanitize(value, name, further_arguments)
multi_error = MultiValidationError()
patch = []
# TODO: restrict length?
for op, path, value in patch_document:
if path[0] not in ('superordinate', 'options', 'properties', 'policies'):
continue
if op in ('test', 'move', 'copy'):
try:
self.raise_formatted_validation_error('Operation %(name)s unsupported.', path[0], value)
except ValidationError as exc:
multi_error.add_error(exc, op)
continue
if path[0] == 'properties':
if len(path) != 2:
try:
self.raise_formatted_validation_error('Invalid property name given: %(name)s', '.'.join(path), value)
except ValidationError as exc:
multi_error.add_error(exc, path[0])
continue
patch.append((path[0], path[1], op, value))
elif len(path) != 1:
try:
self.raise_formatted_validation_error('Invalid name given: %(name)s', '.'.join(path), value)
except ValidationError as exc:
multi_error.add_error(exc, path[0])
else:
patch.append((path[0], None, op, value))
if multi_error.has_errors():
raise multi_error
return patch
[docs]
class DictSanitizer(UMCDictSanitizer):
def __init__(self, sanitizers, allow_other_keys=True, **kwargs):
self.default_sanitizer = kwargs.get('default_sanitizer')
self.key_sanitizer = kwargs.get('key_sanitizer')
super().__init__(sanitizers, allow_other_keys=allow_other_keys, **kwargs)
def _sanitize(self, value, name, further_arguments):
if not isinstance(value, dict):
self.raise_formatted_validation_error(_('Not a "dict"'), name, type(value).__name__)
if not self.allow_other_keys and any(key not in self.sanitizers for key in value):
self.raise_validation_error(_('Has more than the allowed keys'))
altered_value = copy.deepcopy(value) if self._copy_value else value
multi_error = MultiValidationError()
for attr in set(value) | set(self.sanitizers):
sanitizer = self.sanitizers.get(attr, self.default_sanitizer)
try:
if self.key_sanitizer:
attr = self.key_sanitizer.sanitize(attr, {attr: attr})
if sanitizer:
altered_value[attr] = sanitizer.sanitize(attr, value)
except ValidationError as e:
multi_error.add_error(e, attr)
if multi_error.has_errors():
raise multi_error
return altered_value
[docs]
class QueryStringSanitizer(DictSanitizer):
def _sanitize(self, value, name, further_arguments):
if isinstance(value, dict):
for key, sanitizer in self.sanitizers.items():
if len(value.get(key, [])) == 1 and not isinstance(sanitizer, ListSanitizer):
value[key] = value[key][0]
elif isinstance(sanitizer, DictSanitizer):
value[key] = {k[len(key) + 1:-1]: v[0] for k, v in value.items() if k.startswith(key + '[') and k.endswith(']')}
# value[key] = QueryStringSanitizer(sanitizer.sanitizers).sanitize(key, {key: value[key]})
return super()._sanitize(value, name, further_arguments)
[docs]
class ObjectPropertySanitizer(StringSanitizer):
def __init__(self, **kwargs):
"""
A LDAP attribute name.
must at least be 1 character long.
This sanitizer prevents LDAP search filter injections in the attribute name.
TODO: in theory we should only allow existing searchable properties for the requested object type
"""
args = {
"minimum": 0,
"regex_pattern": r'^[\w\d\-;]*$',
}
args.update(kwargs)
StringSanitizer.__init__(self, **args)
[docs]
class PropertiesSanitizer(DictSanitizer):
def __init__(self, *args, **kwargs):
super().__init__({}, *args, default_sanitizer=PropertySanitizer(), **kwargs)
[docs]
def sanitize(self, properties, module, obj):
# TODO: add sanitizer for e.g. required properties (respect options!)
self.default_sanitizer._module = module
self.default_sanitizer._obj = obj
try:
return super().sanitize('properties', {'properties': properties})
finally:
self.default_sanitizer._module = None
self.default_sanitizer._obj = None
[docs]
class PropertySanitizer(Sanitizer):
def __init__(self, *args, **kwargs):
self._module = None
self._obj = None
super().__init__(*args, **kwargs)
def _sanitize(self, value, name, further_arguments):
property_obj = self._module.get_property(name)
if property_obj is None:
if name == 'objectFlag':
return value # not every object type has the extended attribute for objectFlag
self.raise_validation_error(_('The %(module)s module has no property %(name)s.'), module=self._module.title)
if not self._obj.has_property(name):
return value # value will not be set, so no validation is required
codec = udm_types.TypeHint.detect(property_obj, name)
try:
return codec.encode_json(value)
except udm_errors.valueError as exc:
exc.message = ''
self.raise_validation_error(_('The property %(name)s has an invalid value: %(details)s'), details=str(exc))
[docs]
class BoolSanitizer(ChoicesSanitizer):
def __init__(self, **kwargs):
super().__init__(choices=['1', 'on', 'true', 'false', '0', 'off', '', None, True, False], **kwargs)
def _sanitize(self, value, name, further_arguments):
return super()._sanitize(value, name, further_arguments) in ('1', 'on', 'true', True)
[docs]
class LDAPFilterSanitizer(StringSanitizer):
def _sanitize(self, value, name, further_arguments):
value = super()._sanitize(value, name, further_arguments)
try:
return udm_syntax.ldapFilter.parse(value)
except udm_errors.valueError as exc:
exc.message = ''
self.raise_validation_error(str(exc))
[docs]
class DNSanitizer(DNSanitizer):
base = ldap.dn.str2dn(ucr['ldap/base'].lower())
baselen = len(base)
base_internal = ldap.dn.str2dn('cn=internal'.lower())
base_internal_len = len(base_internal)
def __init__(self, **kwargs):
self.enforce_correct_ldap_base = kwargs.get('enforce_correct_ldap_base', True)
super().__init__(**kwargs)
def _sanitize(self, value, name, further_arguments):
value = super()._sanitize(value, name, further_arguments)
if value and self.enforce_correct_ldap_base:
dn = ldap.dn.str2dn(value.lower())
if dn[-self.baselen:] != self.base and dn[-self.base_internal_len:] != self.base_internal:
self.raise_validation_error(_('The ldap base is invalid. Use %(details)s.'), details=ldap.dn.dn2str(self.base))
return value
[docs]
class SanitizerBase:
[docs]
def sanitize_arguments(self, sanitizer, *args, **kwargs):
field = kwargs.pop('_fieldname', 'request.arguments')
result = kwargs.pop('_result_func', lambda x: x)
try:
try:
return sanitizer.sanitize(*args, **kwargs)
except MultiValidationError:
raise
except ValidationError as exc:
multi_error = MultiValidationError()
multi_error.add_error(exc, field)
raise multi_error
except MultiValidationError as e:
raise UnprocessableEntity(str(e), result=result(e.result()))
[docs]
def raise_sanitization_errors(self, errors):
multi_error = MultiValidationError()
for field, message in errors:
property_name = field[-1]
try:
self.raise_sanitization_error(field, message)
except UnprocessableEntity as exc:
print(exc.result)
multi_error.add_error(ValidationError(message, property_name, None), property_name)
self.raise_sanitization_multi_error(multi_error)
[docs]
def raise_sanitization_multi_error(self, multi_error, field='properties', type='body'):
if multi_error.has_errors():
class FalseSanitizer(Sanitizer):
def sanitize(self):
raise multi_error
self.sanitize_arguments(FalseSanitizer(), _result_func=lambda x: {type: {field: x}}, _fieldname=field)
[docs]
def raise_sanitization_error(self, field, message, type='body'):
fields = field if isinstance(field, list | tuple) else (field,)
field = fields[-1]
def _result(x):
error = {type: {}}
err = error[type]
for f in fields:
if f == field:
break
err[f] = {}
err = err[f]
err.update(x)
return error
class FalseSanitizer(Sanitizer):
def sanitize(self):
self.raise_formatted_validation_error('%(message)s', field, None, message=message)
self.sanitize_arguments(FalseSanitizer(), _result_func=_result, _fieldname=field)