#!/usr/bin/python3
# -*- coding: utf-8 -*-
#
# Univention Management Console
# Sanitizer Classes used in decorator
#
# Copyright 2012-2022 Univention GmbH
#
# https://www.univention.de/
#
# All rights reserved.
#
# The source code of this program is made available
# under the terms of the GNU Affero General Public License version 3
# (GNU AGPL V3) as published by the Free Software Foundation.
#
# Binary versions of this program provided by Univention to you as
# well as other copyrighted, protected or trademarked materials like
# Logos, graphics, fonts, specific documentations and configurations,
# cryptographic keys etc. are subject to a license agreement between
# you and Univention and not subject to the GNU AGPL V3.
#
# In the case you use this program under the terms of the GNU AGPL V3,
# the program is provided in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public
# License with the Debian GNU/Linux or Univention distribution in file
# /usr/share/common-licenses/AGPL-3; if not, see
# <https://www.gnu.org/licenses/>.
"""
Sanitize classes for the sanitize decorator
===========================================
This module provides the Sanitize base class as well as some important
and often used Sanitizers. They are used in the
:class:`~univention.management.console.modules.decorators.sanitize` function.
If the provided classes do not meet your requirements you can easily
make one yourself.
The main job of sanitizers is to alter values if needed so that they
cannot do something harmful in the exposed UMC-functions. But they are also
very helpful when one needs to just validate input.
"""
import copy
import re

import ldap
import ldap.dn
import ldap.filter
import six

from univention.lib.i18n import Translation

_ = Translation('univention.management.console').translate

try:
    from typing import Any, Dict, Iterable, Mapping, NoReturn, Optional, Pattern, Union, TypeVar  # noqa: F401
except ImportError:
    pass
class ValidationError(Exception):
    """
    Error raised when a sanitizer finds a value it cannot use at all
    (e.g. letters when an int is expected).

    :param str msg: the error message
    :param str name: name of the argument that failed validation
    :param object value: the value that failed validation
    """

    def __init__(self, msg, name, value):
        # type: (str, str, Any) -> None
        self.msg = msg
        self.name = name
        self.value = value

    def __str__(self):
        # type: () -> str
        return self.msg

    def number_of_errors(self):
        # type: () -> int
        """Return the number of errors this exception stands for: always 1."""
        return 1

    def result(self):
        # type: () -> Any
        """Return the error message."""
        return self.msg
class MultiValidationError(ValidationError):
    """
    Error used for validation of an arbitrary number of sanitizers.
    Used by :class:`~DictSanitizer` and :class:`~ListSanitizer`.

    Collects one :class:`ValidationError` per key (dict) or index (list).
    The constructor takes no arguments and does not call the base class
    constructor, so the attributes *msg*, *name* and *value* are not set
    on instances of this class.
    """

    def __init__(self):
        # type: () -> None
        self.validation_errors = {}  # type: Dict[Union[int, str], ValidationError]

    def add_error(self, e, name):
        # type: (ValidationError, Union[int, str]) -> None
        """Add a :class:`ValidationError` under the key *name*.

        An error previously stored under the same key is replaced.
        """
        self.validation_errors[name] = e

    def number_of_errors(self):
        # type: () -> int
        """Return the cumulative number of errors found.

        Nested :class:`MultiValidationError` instances contribute the
        number of errors they contain themselves.
        """
        return sum(error.number_of_errors() for error in self.validation_errors.values())

    def __str__(self):
        # type: () -> str
        return _('%d error(s) occurred') % self.number_of_errors()

    def has_errors(self):
        # type: () -> bool
        """Return whether at least one error was added."""
        return bool(self.validation_errors)

    def result(self):
        # type: () -> Dict[Union[int, str], str]
        """Return the errors in a structure similar to the one in which the
        arguments were passed to the sanitizers: a dict mapping each key
        to the result of the error stored for it."""
        return {name: error.result() for name, error in self.validation_errors.items()}
class UnformattedValidationError(Exception):
    """
    Validation failure raised from within :meth:`Sanitizer._sanitize`
    (through :meth:`Sanitizer.raise_validation_error`).

    It carries only the message template and extra formatting arguments.
    :meth:`Sanitizer.sanitize` catches it and turns it into a formatted
    :class:`ValidationError` that also knows the argument name and value.

    :param str msg: the (not yet formatted) error message
    :param dict kwargs: additional arguments for formatting the message
    """

    def __init__(self, msg, kwargs):
        # type: (str, Dict[str, Any]) -> None
        self.msg = msg
        self.kwargs = kwargs

    def __str__(self):
        # type: () -> str
        return self.msg


class Sanitizer(object):
    r"""
    Base class of all sanitizers.

    For reasons of extensibility and for ease of subclassing, the
    parameters are \**kwargs. But only the following are meaningful:

    :param further_arguments: names of arguments that should be
        passed along with the actual argument in order to return something
        reasonable. Default: *None*
    :type further_arguments: list[str]
    :param bool required: if the argument is required. Default: *False*
    :param object default: if argument is not given and not
        :attr:`~Sanitizer.required`, default is returned - even when not
        :attr:`~Sanitizer.may_change_value`. Note that this value is not
        passing the sanitizing procedure, so make sure to be able to handle
        it. Default: *None*
    :param bool may_change_value: if the process of sanitizing is allowed
        to alter *request.options*. If not, the sanitizer can still be used
        for validation. Default: *True*
    :param bool allow_none: if None is allowed and not further validated.
        Default: *False*
    """

    def __init__(self, **kwargs):
        self.further_arguments = kwargs.get('further_arguments', None)
        self.required = kwargs.get('required', False)
        self.default = kwargs.get('default', None)
        self.may_change_value = kwargs.get('may_change_value', True)
        self.allow_none = kwargs.get('allow_none', False)

    def sanitize(self, name, options):
        # type: (str, Mapping[str, object]) -> Any
        """Sanitize the argument *name* found in *options*.

        Internally calls :meth:`~Sanitizer._sanitize` with the correct
        values and returns the new value. If you write your own Sanitize
        class, you probably want to override :meth:`~Sanitizer._sanitize`.

        :param str name: the name of the argument to sanitize
        :param options: all arguments of the request
        :returns: :attr:`~Sanitizer.default` if *name* is missing and not
            required; *None* if the value is *None* and
            :attr:`~Sanitizer.allow_none` is set; otherwise the sanitized
            value, or the unchanged one if not
            :attr:`~Sanitizer.may_change_value`
        :raises ValidationError: if a required argument is missing or the
            value did not pass :meth:`~Sanitizer._sanitize`

        .. document private functions
        .. automethod:: _sanitize
        """
        if name not in options:
            if self.required:
                self.raise_formatted_validation_error(_('Argument required'), name, None)
            return self.default

        value = options[name]
        if value is None and self.allow_none:
            return value

        if self.further_arguments:
            further_arguments = {field: options.get(field) for field in self.further_arguments}
        else:
            further_arguments = {}

        try:
            new_value = self._sanitize(value, name, further_arguments)
        except UnformattedValidationError as e:
            # enrich the raw error with the name and value of the argument
            self.raise_formatted_validation_error(str(e), name, value, **e.kwargs)

        # validation-only sanitizers hand back the untouched input
        return new_value if self.may_change_value else value

    def _sanitize(self, value, name, further_arguments):
        # type: (Any, str, Mapping[str, object]) -> object
        """The method where the actual sanitizing takes place.

        The standard method just returns *value* so be sure to
        override this method in your Sanitize class.

        :param object value: the value as found in *request.options*.
        :param str name: the name of the argument currently
            sanitized.
        :param further_arguments: dictionary
            holding the values of those additional arguments
            in *request.options* that are needed for sanitizing.
            The arguments come straight from the unaltered
            options dict (i.e. before potentially changing
            sanitizing happened).
        :type further_arguments: dict[str, object]
        """
        return value

    def raise_formatted_validation_error(self, msg, name, value, **kwargs):
        # type: (str, str, Any, **Any) -> NoReturn
        r"""Raise a :class:`~ValidationError` with a formatted message.

        *msg* is prefixed with the name and value of the argument and then
        %-formatted with a dict built from the attributes of this
        sanitizer, *kwargs*, *name* and *value* (later entries take
        precedence). Messages may therefore refer to sanitizer attributes,
        e.g. ``%(minimum)d``.

        *name* and *value* are passed in explicitly instead of being
        stored on the sanitizer instance.

        :param str msg: error message (template)
        :param str name: name of the argument
        :param object value: the argument which caused the error
        :param dict \**kwargs: additional arguments for formatting
        :raises ValidationError: always
        """
        format_dict = dict(self.__dict__)
        format_dict.update(kwargs)
        format_dict['name'] = name
        format_dict['value'] = value
        msg = '%(name)s (%(value)r): ' + msg
        raise ValidationError(msg % format_dict, name, value)

    def raise_validation_error(self, msg, **kwargs):
        # type: (str, **Any) -> NoReturn
        r"""Used to more or less uniformly raise a
        :class:`~ValidationError`. This will actually raise an
        :class:`~UnformattedValidationError` for your convenience.
        If used in :meth:`~Sanitizer._sanitize`, it will be
        automatically enriched with name, value and formatting in
        :meth:`~Sanitizer.sanitize`.

        :param str msg: error message (template)
        :param dict \**kwargs: additional arguments for formatting
        :raises UnformattedValidationError: always
        """
        raise UnformattedValidationError(msg, kwargs)
class DictSanitizer(Sanitizer):
    """DictSanitizer makes sure that the value is a dict and sanitizes its fields.

    You can give the same parameters as the base class.
    Plus:

    :param sanitizers: will be applied to the content of the sanitized dict
    :param bool allow_other_keys: if other keys than those in
        :attr:`~DictSanitizer.sanitizers` are allowed.
    :param default_sanitizer: will be applied to the content if no sanitizer is defined
    :type sanitizers: dict[str, Sanitizer]
    :type default_sanitizer: Sanitizer
    """

    def __init__(self, sanitizers, allow_other_keys=True, default_sanitizer=None, **kwargs):
        # type: (Dict[str, Sanitizer], bool, Optional[Sanitizer], **Any) -> None
        # private switch: work on a deep copy of the incoming dict (default)
        # or alter the given dict in place
        self._copy_value = kwargs.pop('_copy_value', True)
        super(DictSanitizer, self).__init__(**kwargs)
        self.sanitizers = sanitizers
        self.default_sanitizer = default_sanitizer
        self.allow_other_keys = allow_other_keys

    def _sanitize(self, value, name, further_arguments):
        # type: (object, str, Mapping[str, object]) -> object
        """Sanitize every field of the dict *value*.

        Every key of *value* and every key of
        :attr:`~DictSanitizer.sanitizers` is visited; keys without a
        dedicated sanitizer fall back to
        :attr:`~DictSanitizer.default_sanitizer` (if any).

        :returns: the dict holding the sanitized fields
        :raises ValidationError: if *value* is not a dict
        :raises UnformattedValidationError: if other keys are present
            although not allowed
        :raises MultiValidationError: if at least one field is invalid
        """
        if not isinstance(value, dict):
            self.raise_formatted_validation_error(_('Not a "dict"'), name, type(value).__name__)

        if not self.allow_other_keys and any(key not in self.sanitizers for key in value):
            self.raise_validation_error(_('Has more than the allowed keys'))

        altered_value = copy.deepcopy(value) if self._copy_value else value

        multi_error = MultiValidationError()
        # also visit keys that are missing in value, so that "required"
        # and "default" of their sanitizers take effect
        for attr in set(value) | set(self.sanitizers):
            sanitizer = self.sanitizers.get(attr, self.default_sanitizer)
            if not sanitizer:
                continue
            try:
                altered_value[attr] = sanitizer.sanitize(attr, value)
            except ValidationError as e:
                # collect all errors instead of stopping at the first one
                multi_error.add_error(e, attr)

        if multi_error.has_errors():
            raise multi_error

        return altered_value

    def __add__(self, other):
        # type: (DictSanitizer) -> DictSanitizer
        """Return a deep copy of this sanitizer whose
        :attr:`~DictSanitizer.sanitizers` are extended (and overridden)
        by those of *other*. All other settings are taken from *self*."""
        new = copy.deepcopy(self)
        new.sanitizers.update(other.sanitizers)
        return new
class ListSanitizer(Sanitizer):
    """ListSanitizer makes sure that the value is a list and sanitizes its elements.

    You can give the same parameters as the base class.
    Plus:

    :param sanitizer: sanitizes each of the sanitized list's elements.
        If *None*, no sanitizing of elements takes place.
    :param int min_elements: must have at least this number of elements
    :param int max_elements: must have at most this number of elements
    :type sanitizer: Sanitizer
    """

    def __init__(self, sanitizer=None, min_elements=None, max_elements=None, **kwargs):
        # type: (Optional[Sanitizer], Optional[int], Optional[int], **Any) -> None
        super(ListSanitizer, self).__init__(**kwargs)
        self.sanitizer = sanitizer
        self.min_elements = min_elements
        self.max_elements = max_elements

    def _sanitize(self, value, name, further_arguments):
        # type: (Any, str, Mapping[str, object]) -> object
        """Check type and length of the list *value* and sanitize its elements.

        :returns: *value* itself if no element sanitizer is set, otherwise
            a new list holding the sanitized elements
        :raises ValidationError: if *value* is not a list
        :raises UnformattedValidationError: if the list is too short or
            too long
        :raises MultiValidationError: if at least one element is invalid;
            the errors are stored under the index of the element
        """
        if not isinstance(value, list):
            self.raise_formatted_validation_error(_('Not a "list"'), name, type(value).__name__)

        if self.min_elements is not None and len(value) < self.min_elements:
            self.raise_validation_error(_('Must have at least %(min_elements)d element(s)'))

        if self.max_elements is not None and len(value) > self.max_elements:
            self.raise_validation_error(_('May have at most %(max_elements)d element(s)'))

        if self.sanitizer is None:
            # no sanitizer given: we can only
            # check instance and min/max elements
            return value

        multi_error = MultiValidationError()
        altered_value = []
        for i, item in enumerate(value):
            # every element is sanitized as a one-item options dict
            # under an artificial name
            element_name = 'Element #%d' % i
            try:
                altered_value.append(self.sanitizer.sanitize(element_name, {element_name: item}))
            except ValidationError as e:
                multi_error.add_error(e, i)

        if multi_error.has_errors():
            raise multi_error

        return altered_value
class BooleanSanitizer(Sanitizer):
    """BooleanSanitizer makes sure that the value is a bool.
    It converts other data types if possible.
    """

    def _sanitize(self, value, name, further_arguments):
        # type: (Any, str, Mapping[str, object]) -> bool
        """Return the truth value of *value*.

        :raises UnformattedValidationError: if evaluating the truth value
            of *value* raises an exception
        """
        try:
            return bool(value)
        except Exception:
            # Only regular exceptions (e.g. from a custom __bool__/__len__)
            # become validation errors; KeyboardInterrupt and SystemExit
            # must not be swallowed.
            self.raise_validation_error(_('Cannot be converted to a boolean'))
class IntegerSanitizer(Sanitizer):
    """IntegerSanitizer makes sure that the value is an int.
    It converts other data types if possible and is able
    to validate boundaries.

    You can give the same parameters as the base class.
    Plus:

    :param int minimum: minimal value allowed
    :param bool minimum_strict: if the value must be > minimum
        (>= otherwise)
    :param int maximum: maximal value allowed
    :param bool maximum_strict: if the value must be < maximum
        (<= otherwise)
    """

    def __init__(self, minimum=None, maximum=None, minimum_strict=None, maximum_strict=None, **kwargs):
        # type: (Optional[int], Optional[int], Optional[bool], Optional[bool], **Any) -> None
        super(IntegerSanitizer, self).__init__(**kwargs)
        self.minimum = minimum
        self.maximum = maximum
        self.minimum_strict = minimum_strict
        self.maximum_strict = maximum_strict

    def _sanitize(self, value, name, further_arguments):
        # type: (Any, str, Mapping[str, object]) -> int
        """Convert *value* to an int and check the configured boundaries.

        :returns: the converted value
        :raises UnformattedValidationError: if *value* cannot be converted
            or violates a boundary
        """
        try:
            value = int(value)
            if not isinstance(value, int):
                # value is of type 'long' (Python 2)
                raise ValueError()
        except (ValueError, TypeError, OverflowError):
            # OverflowError: int() of an infinite float
            self.raise_validation_error(_('Cannot be converted to a number'))

        if self.minimum is not None:
            if self.minimum_strict:
                if value <= self.minimum:
                    self.raise_validation_error(_('Should stay %s') % '> %(minimum)d')
            elif value < self.minimum:
                self.raise_validation_error(_('Should stay %s') % '>= %(minimum)d')

        if self.maximum is not None:
            if self.maximum_strict:
                if value >= self.maximum:
                    self.raise_validation_error(_('Should stay %s') % '< %(maximum)d')
            elif value > self.maximum:
                self.raise_validation_error(_('Should stay %s') % '<= %(maximum)d')

        return value
class SearchSanitizer(Sanitizer):
    r"""Baseclass for other Sanitizers that are used for a simple search.
    That means that everything is escaped except for asterisks that are
    considered as wildcards for any number of characters. (If
    :attr:`~SearchSanitizer.use_asterisks` is True, which is default)

    Handles adding of asterisks and some simple sanity checks.
    Real logic is done in a to-be-overridden method named
    :meth:`~SearchSanitizer._escape_and_return`.

    Currently used for :class:`~LDAPSearchSanitizer` and
    :class:`~PatternSanitizer`.

    Like the Baseclass of all Sanitizers, it accepts only keyword-arguments
    (derived classes may vary). You may specify the same as in the Baseclass
    plus:

    :param bool add_asterisks: add asterisks at the beginning and the end
        of the value if needed. Examples:

        * "string" -> "\*string*"
        * "" -> "*"
        * "string*" -> "string*"

        Default: True
    :param int max_number_of_asterisks: An error will be raised if
        the number of * in the string exceeds this limit. Useful because
        searching with too many of these patterns in a search query
        can be very expensive. Note that * from
        :attr:`~SearchSanitizer.add_asterisks` do count. *None* means an
        arbitrary number is allowed. Default: 5
    :param bool use_asterisks: treat asterisks special, i.e. as a
        substring of arbitrary length. If *False*, it will be escaped as
        any other character. If *False* the defaults change:

        * :attr:`~SearchSanitizer.add_asterisks` to *False*
        * :attr:`~SearchSanitizer.max_number_of_asterisks` to *None*.

        Default: True
    """

    def __init__(self, **kwargs):
        self.use_asterisks = kwargs.get('use_asterisks', True)
        # the defaults of the other two options depend on use_asterisks
        if self.use_asterisks:
            self.add_asterisks = kwargs.get('add_asterisks', True)
            self.max_number_of_asterisks = kwargs.get('max_number_of_asterisks', 5)
        else:
            self.add_asterisks = kwargs.get('add_asterisks', False)
            self.max_number_of_asterisks = kwargs.get('max_number_of_asterisks', None)
        super(SearchSanitizer, self).__init__(**kwargs)

    def _escape_and_return(self, value):
        # type: (str) -> str
        """Hook for derived classes: escape the prepared search string
        and return the final result. The base implementation returns
        *value* unchanged."""
        return value

    def _sanitize(self, value, name, further_fields):
        # type: (Any, str, Mapping[str, object]) -> object
        """Normalize the search string and pass it on to
        :meth:`~SearchSanitizer._escape_and_return`.

        *None* is treated as the empty string, everything else is
        converted with ``str()``.

        :raises ValidationError: if the string contains more asterisks
            than allowed
        """
        if value is None:
            value = ''
        value = str(value)

        if self.use_asterisks:
            # a run of asterisks means the same as a single one
            value = re.sub(r'\*+', '*', value)

        if self.add_asterisks and '*' not in value:
            # an empty search string becomes a single wildcard, not '**'
            value = '*%s*' % value if value else '*'

        if self.max_number_of_asterisks is not None:
            if value.count('*') > self.max_number_of_asterisks:
                # show the possibly changed value
                self.raise_formatted_validation_error(_('The maximum number of asterisks (*) in the search string is %(max_number_of_asterisks)d'), name, value)

        return self._escape_and_return(value)
class LDAPSearchSanitizer(SearchSanitizer):
    r"""Sanitizer for LDAP-Searches. Everything that
    could possibly confuse an LDAP-Search is escaped
    except for \*.
    """

    # what an asterisk looks like after escape_filter_chars()
    ESCAPED_WILDCARD = ldap.filter.escape_filter_chars('*')

    def _escape_and_return(self, value):
        # type: (str) -> str
        """Escape *value* for use in an LDAP filter. If
        :attr:`~SearchSanitizer.use_asterisks` is set, the escaped
        asterisks are turned back into ``*`` afterwards."""
        value = ldap.filter.escape_filter_chars(value)
        if self.use_asterisks:
            value = value.replace(self.ESCAPED_WILDCARD, '*')
        return value
class PatternSanitizer(SearchSanitizer):
    """PatternSanitizer converts the input into a regular expression.
    It can handle anything (through the inputs __str__ method), but
    only strings seem to make sense.

    The input should be a string with asterisks (*) if needed. An
    asterisk stands for anything at any length (regular expression: .*).

    The sanitizer escapes the input, replaces * with .* and applies
    the params.

    You can give the same parameters as the base class.

    If you specify a string as :attr:`~Sanitizer.default`, it will be
    compiled to a regular expression. Hints:
    default='.*' -> matches everything;
    default='(?!)' -> matches nothing

    Plus:

    :param bool ignore_case: pattern is compiled with re.IGNORECASE flag
        to search case insensitive.
    :param bool multiline: pattern is compiled with re.MULTILINE flag
        to search across multiple lines.
    """

    def __init__(self, ignore_case=True, multiline=True, **kwargs):
        # type: (bool, bool, **Any) -> None
        default = kwargs.get('default')
        if isinstance(default, six.string_types):
            # a string default is compiled as given (no flags, no escaping)
            kwargs['default'] = re.compile(default)
        super(PatternSanitizer, self).__init__(**kwargs)
        self.ignore_case = ignore_case
        self.multiline = multiline

    def __deepcopy__(self, memo):
        # type: (Any) -> PatternSanitizer
        """Return a copy of this sanitizer for :func:`copy.deepcopy`.

        The copy has the same class and the same attributes (including
        :attr:`~Sanitizer.allow_none`). Attribute values are shared with
        the original - in particular :attr:`~Sanitizer.default`, which may
        be a compiled pattern - except for
        :attr:`~Sanitizer.further_arguments`, which is copied.
        """
        cls = type(self)
        new = cls.__new__(cls)
        memo[id(self)] = new
        new.__dict__.update(self.__dict__)
        new.further_arguments = copy.copy(self.further_arguments)
        return new

    def _escape_and_return(self, value):
        # type: (str) -> Pattern[str]
        """Compile the search string *value* into a regular expression
        that has to match from start (``^``) to end (``$``)."""
        value = re.escape(value)
        if self.use_asterisks:
            # re.escape() turned every wildcard into '\*'
            value = value.replace(r'\*', '.*')
        flags = 0
        if self.ignore_case:
            flags |= re.IGNORECASE
        if self.multiline:
            flags |= re.MULTILINE
        return re.compile('^%s$' % value, flags)
class StringSanitizer(Sanitizer):
    """StringSanitizer makes sure that the input is a string.
    The input can be validated by a regular expression and by string length.

    You can give the same parameters as the base class.
    Plus:

    :param regex_pattern: a regex pattern or a string which will be
        compiled into a regex pattern
    :type regex_pattern: re.Pattern or str
    :param int re_flags: additional regex flags for the regex_pattern
        which will be compiled if :attr:`~StringSanitizer.regex_pattern`
        is a string
    :param int minimum: the minimum length of the string
    :param int maximum: the maximum length of the string
    """

    def __init__(self, regex_pattern=None, re_flags=0, minimum=None, maximum=None, **kwargs):
        # type: (Union[None, Pattern[str], str], int, Optional[int], Optional[int], **Any) -> None
        super(StringSanitizer, self).__init__(**kwargs)
        if isinstance(regex_pattern, six.string_types):
            regex_pattern = re.compile(regex_pattern, flags=re_flags)
        self.minimum = minimum
        self.maximum = maximum
        self.regex_pattern = regex_pattern

    def __deepcopy__(self, memo):
        # type: (Any) -> StringSanitizer
        """Return a copy of this sanitizer for :func:`copy.deepcopy`.

        The copy has the same class - so copies of derived sanitizers keep
        their own ``_sanitize`` - and the same attributes (including
        :attr:`~Sanitizer.allow_none`). Attribute values are shared with
        the original - in particular the compiled
        :attr:`~StringSanitizer.regex_pattern` - except for
        :attr:`~Sanitizer.further_arguments`, which is copied.
        """
        cls = type(self)
        new = cls.__new__(cls)
        memo[id(self)] = new
        new.__dict__.update(self.__dict__)
        new.further_arguments = copy.copy(self.further_arguments)
        return new

    def _sanitize(self, value, name, further_args):
        # type: (Any, str, Mapping[str, object]) -> object
        """Validate type, length and pattern of *value*.

        A *minimum* or *maximum* of *None* or 0 disables the respective
        length check. The pattern only has to match somewhere in the
        string (``search``).

        :returns: *value*, unchanged
        :raises UnformattedValidationError: if a check fails
        """
        if not isinstance(value, six.string_types):
            self.raise_validation_error(_('Value is not a string'))

        if self.minimum and len(value) < self.minimum:
            self.raise_validation_error(_('Value is too short, it has to be at least of length %(minimum)d'))

        if self.maximum and len(value) > self.maximum:
            self.raise_validation_error(_('Value is too long, it has to be at most of length %(maximum)d'))

        if self.regex_pattern and not self.regex_pattern.search(value):
            self.raise_validation_error(_('Value is invalid'))

        return value
class DNSanitizer(StringSanitizer):
    """DNSanitizer is a sanitizer that checks if the value has correct LDAP
    Distinguished Name syntax."""

    def _sanitize(self, value, name, further_args):
        # type: (Any, str, Mapping[str, object]) -> object
        """Run the string checks of the base class, then try to parse
        *value* as a DN.

        :returns: *value*, unchanged
        :raises UnformattedValidationError: if a string check fails or
            *value* cannot be parsed as a DN
        """
        value = super(DNSanitizer, self)._sanitize(value, name, further_args)
        try:
            # only the syntax is checked, the parsed DN is discarded
            ldap.dn.str2dn(value)
        except ldap.DECODING_ERROR:
            self.raise_validation_error(_('Value is not a LDAP DN.'))
        return value
class EmailSanitizer(StringSanitizer):
    """EmailSanitizer is a very simple sanitizer that checks
    the very basics of an email address: At least 3 characters and
    somewhere in the middle has to be an @-sign.

    Accepts the keyword arguments of :class:`StringSanitizer` except for
    *regex_pattern*, which is fixed.
    """

    def __init__(self, **kwargs):
        # type: (**Any) -> None
        # '.@.': an @-sign with at least one character on either side
        super(EmailSanitizer, self).__init__(r'.@.', **kwargs)
class ChoicesSanitizer(Sanitizer):
    """ChoicesSanitizer makes sure that the input is in a given set of
    choices.

    You can give the same parameters as the base class.
    Plus:

    :param object choices: the allowed choices used.
    """

    def __init__(self, choices, **kwargs):
        # type: (Iterable[str], **Any) -> None
        super(ChoicesSanitizer, self).__init__(**kwargs)
        # makes sure to have an iterable and unifies errors msg
        # because list has a different representation than tuple
        self.choices = list(choices)

    def _sanitize(self, value, name, further_args):
        # type: (Any, str, Mapping[str, object]) -> object
        """Return the first choice that compares equal to *value*.

        :raises UnformattedValidationError: if no choice equals *value*
        """
        for choice in self.choices:
            if choice == value:
                # return element from choices
                # not value itself: 1 == True
                return choice
        self.raise_validation_error(_('Value has to be one of %(choices)r'))
class MappingSanitizer(ChoicesSanitizer):
    """MappingSanitizer makes sure that the input is a key in a
    dictionary and returns the corresponding value.

    You can give the same parameters as the base class.
    Plus:

    :param mapping: the dictionary that is used for sanitizing
    :type mapping: {object : object}
    """

    def __init__(self, mapping, **kwargs):
        # type: (Mapping[str, object], **Any) -> None
        try:
            # sort allowed values to have reproducible error messages
            choices = sorted(mapping.keys())
        except Exception:
            # keys that cannot be ordered (e.g. of mixed types):
            # fall back to the order of the mapping
            choices = list(mapping.keys())
        super(MappingSanitizer, self).__init__(choices, **kwargs)
        self.mapping = mapping

    def _sanitize(self, value, name, further_args):
        # type: (Any, str, Mapping[str, object]) -> object
        """Return the value that the mapping holds for the key *value*.

        :raises UnformattedValidationError: if *value* is not one of the
            keys of the mapping
        """
        value = super(MappingSanitizer, self)._sanitize(value, name, further_args)
        return self.mapping[value]
# public API of this module
__all__ = [
    'UnformattedValidationError',
    'ValidationError',
    'MultiValidationError',
    'Sanitizer',
    'DictSanitizer',
    'ListSanitizer',
    'BooleanSanitizer',
    'IntegerSanitizer',
    'SearchSanitizer',
    'LDAPSearchSanitizer',
    'PatternSanitizer',
    'StringSanitizer',
    'DNSanitizer',
    'EmailSanitizer',
    'ChoicesSanitizer',
    'MappingSanitizer',
]