Source code for univention.management.console.modules.decorators

#!/usr/bin/python3
# -*- coding: utf-8 -*-
#
# Univention Management Console
#  Decorators for functions in UMC 2.0 modules
#
# Copyright 2012-2022 Univention GmbH
#
# https://www.univention.de/
#
# All rights reserved.
#
# The source code of this program is made available
# under the terms of the GNU Affero General Public License version 3
# (GNU AGPL V3) as published by the Free Software Foundation.
#
# Binary versions of this program provided by Univention to you as
# well as other copyrighted, protected or trademarked materials like
# Logos, graphics, fonts, specific documentations and configurations,
# cryptographic keys etc. are subject to a license agreement between
# you and Univention and not subject to the GNU AGPL V3.
#
# In the case you use this program under the terms of the GNU AGPL V3,
# the program is provided in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public
# License with the Debian GNU/Linux or Univention distribution in file
# /usr/share/common-licenses/AGPL-3; if not, see
# <https://www.gnu.org/licenses/>.

"""
Convenience decorators for developers of UMC modules
====================================================

Functions exposed by UMC modules often share some logic. They check the
existence and formatting of variables or check permissions. If anything
fails, they react in a similar way. If everything is correct, the real
logic is often as simple as returning one single value.

This module provides functions that can be used to separate repeating
tasks from the actual business logic. This means:

* less time to code
* fewer bugs
* consistent behavior throughout the UMC in standard cases

Note that the functions defined herein do not cover every corner case during
UMC module development. You are not bound to use them if you need more
flexibility.
"""

import sys
import inspect
import time
import types
import notifier
import notifier.threads
import functools
from threading import Thread

from univention.lib.i18n import Translation

from univention.management.console.error import UMC_Error, UnprocessableEntity
from univention.management.console.log import MODULE

from univention.management.console.modules.sanitizers import MultiValidationError, ValidationError, Sanitizer, DictSanitizer, ListSanitizer

_ = Translation('univention.management.console').translate


def sanitize(*args, **kwargs):
    """
    Decorator that validates (and possibly rewrites) the user input
    before the decorated function runs.

    Dispatching: if a positional argument is given, the first one is
    treated as the sanitizer for every element of a list and is handed
    to :func:`sanitize_list` (keyword arguments are then forwarded as
    parameters). Otherwise the keyword arguments are taken as a mapping
    of attribute names to sanitizers and are handed to
    :func:`sanitize_dict`.

    Note that changing a value here will actually alter the request
    object. This should be no problem though.

    If the validation step fails an error will be passed to the user
    instead of executing the function. This step should not raise
    anything other than
    :class:`~univention.management.console.modules.sanitizers.ValidationError`
    or
    :class:`~univention.management.console.modules.sanitizers.UnformattedValidationError`
    (one should use the method
    :meth:`~univention.management.console.modules.sanitizers.Sanitizer.raise_validation_error`).

    You can find some predefined Sanitize classes in the corresponding
    module or you define one yourself, deriving it from
    :class:`~univention.management.console.modules.sanitizers.Sanitizer`::

        class SplitPathSanitizer(Sanitizer):
            def __init__(self):
                super(SplitPathSanitizer, self).__init__(
                    validate_none=True,
                    may_change_value=True)

            def _sanitize(self, value, name, further_fields):
                if value is None:
                    return []
                try:
                    return value.split('/')
                except BaseException:
                    self.raise_validation_error('Split failed')

    Before::

        def my_func(self, request):
            var1 = request.options.get('var1')
            var2 = request.options.get('var2', 20)
            try:
                var1 = int(var1)
                var2 = int(var2)
            except (ValueError, TypeError):
                self.finished(request.id, None, 'Cannot convert to int', status=400)
                return
            if var2 < 10:
                self.finished(request.id, None, 'var2 must be >= 10', status=400)
                return
            self.finished(request.id, var1 + var2)

    After::

        @sanitize(
            var1=IntegerSanitizer(required=True),
            var2=IntegerSanitizer(required=True, minimum=10, default=20)
        )
        def add(self, request):
            var1 = request.options.get('var1')  # could now use ['var1']
            var2 = request.options.get('var2')
            self.finished(request.id, var1 + var2)

    The decorator can be combined with other decorators like
    :func:`simple_response` (be careful with ordering of decorators
    here)::

        @sanitize(
            var1=IntegerSanitizer(required=True),
            var2=IntegerSanitizer(required=True, minimum=10)
        )
        @simple_response
        def add(self, var1, var2):
            return var1 + var2

    Note that you lose the capability of specifying defaults in
    *@simple_response*. You need to do it in *@sanitize* now.
    """
    # no positional sanitizer: the keywords describe a dict of attributes
    if not args:
        return sanitize_dict(kwargs)
    # only the first positional argument is used; it sanitizes list elements
    return sanitize_list(args[0], **kwargs)
def sanitize_list(sanitizer, **kwargs):
    """
    Decorator factory: sanitize *request.options* as a list.

    :param sanitizer: sanitizer that is applied through a ``ListSanitizer``.
    :param kwargs: parameters overriding the ``ListSanitizer`` defaults
        chosen by ``_sanitize_list``.
    :returns: a decorator taking the function to wrap.
    """
    def _decorator(function):
        return _sanitize_list(function, sanitizer, kwargs)
    return _decorator
def sanitize_dict(sanitized_attrs, **kwargs):
    """
    Decorator factory: sanitize *request.options* as a dictionary.

    :param sanitized_attrs: mapping of attribute name to sanitizer, applied
        through a ``DictSanitizer``.
    :param kwargs: parameters overriding the ``DictSanitizer`` defaults
        chosen by ``_sanitize_dict``.
    :returns: a decorator taking the function to wrap.
    """
    def _decorator(function):
        return _sanitize_dict(function, sanitized_attrs, kwargs)
    return _decorator
def _sanitize_dict(function, sanitized_attrs, sanitizer_parameters):
    """Wrap *function* so that request.options is run through a DictSanitizer."""
    # caller supplied parameters win over the built-in defaults
    parameters = dict({'default': {}, 'required': True, 'may_change_value': True}, **sanitizer_parameters)
    return _sanitize(function, DictSanitizer(sanitized_attrs, **parameters))


def _sanitize_list(function, sanitizer, sanitizer_parameters):
    """Wrap *function* so that request.options is run through a ListSanitizer."""
    # caller supplied parameters win over the built-in defaults
    parameters = dict({'default': [], 'required': True, 'may_change_value': True}, **sanitizer_parameters)
    return _sanitize(function, ListSanitizer(sanitizer, **parameters))


def _sanitize(function, sanitizer):
    """Return a ``(self, request)`` wrapper that replaces *request.options*
    by its sanitized version and then calls *function*."""
    options_key = 'request.options'

    def _response(self, request):
        # the options are wrapped into a one-element dict so that the
        # sanitizer can look them up by name
        request.options = sanitize_args(sanitizer, options_key, {options_key: request.options})
        return function(self, request)

    copy_function_meta_data(function, _response)
    return _response


def _sanitize_or_raise_multi(sanitizer, name, args):
    """Run the sanitizer; a single ValidationError is converted into a
    MultiValidationError holding that one error under *name*."""
    try:
        return sanitizer.sanitize(name, args)
    except MultiValidationError:
        # already in the desired shape
        raise
    except ValidationError as exc:
        collected = MultiValidationError()
        collected.add_error(exc, name)
        raise collected


def sanitize_args(sanitizer, name, args):
    """
    Sanitize ``args[name]`` with *sanitizer* and return the result.

    :raises UnprocessableEntity: if validation fails; the message is the
        string form of the (multi) validation error and ``result`` carries
        its ``result()``.
    """
    try:
        return _sanitize_or_raise_multi(sanitizer, name, args)
    except MultiValidationError as exc:
        raise UnprocessableEntity(str(exc), result=exc.result())
def simple_response(function=None, with_flavor=None, with_progress=False):
    '''If your function is as simple as: "Just return some variables"
    this decorator is for you.

    Instead of defining the function

    .. code-block:: python

        def my_func(self, request): pass

    you now define a function with the variables you would expect in
    *request.options*. Default values are supported:

    .. code-block:: python

        @simple_response
        def my_func(self, var1, var2='default'): pass

    The decorator extracts variables from *request.options*. If the
    variable is not found, it either returns a failure or sets it to a
    default value (if specified by you).

    If you need to get the flavor passed to the function you can do it
    like this::

        @simple_response(with_flavor=True)
        def my_func(self, flavor, var1, var2='default'): pass

    With *with_flavor* set, the flavor is extracted from the *request*.
    You can also set with_flavor='varname', in which case the variable
    name for the flavor is *varname*. *True* means 'flavor'. As with
    ordinary option arguments, you may specify a default value for
    flavor in the function definition::

        @simple_response(with_flavor='module_flavor')
        def my_func(self, flavor='this comes from request.options',
                    module_flavor='this is the flavor (and its default value)'): pass

    With *with_progress* set, an object obtained from
    ``self.new_progress()`` is stored in the options under the given name
    (*True* means 'progress'), the function is executed in a separate
    thread and the request is answered immediately with
    ``progress_obj.initialised()``. The return value of the function is
    handed to ``progress_obj.finish_with_result()``; an exception is
    handed to ``progress_obj.exception()``.

    Instead of stating at the end of your function

    .. code-block:: python

        self.finished(request.id, some_value)

    you now just

    .. code-block:: python

        return some_value

    Before::

        def my_func(self, request):
            variable1 = request.options.get('variable1')
            variable2 = request.options.get('variable2')
            flavor = request.flavor or 'default flavor'
            if variable1 is None:
                self.finished(request.id, None, message='variable1 is required', success=False)
                return
            if variable2 is None:
                variable2 = ''
            try:
                value = '%s_%s_%s' % (self._saved_dict[variable1], variable2, flavor)
            except KeyError:
                self.finished(request.id, None, message='Something went wrong', success=False, status=500)
                return
            self.finished(request.id, value)

    After::

        @simple_response(with_flavor=True)
        def my_func(self, variable1, variable2='', flavor='default_flavor'):
            try:
                return '%s_%s_%s' % (self._saved_dict[variable1], variable2, flavor)
            except KeyError:
                raise UMC_Error('Something went wrong')
    '''
    if function is None:
        # used as @simple_response(...): wait for the function
        def _decorator(f):
            return simple_response(f, with_flavor, with_progress)
        return _decorator

    if with_progress is True:
        with_progress = 'progress'

    # Pretend to be a generator function that yields whatever the
    # original function returned, so the multi-response machinery can
    # be reused.
    def _fake_func(self, iterator, *args):
        call_args = args
        # take the first (and only) set of values, if there is one
        for call_args in iterator:
            break
        yield function(self, *call_args)

    copy_function_meta_data(function, _fake_func, copy_arg_inspect=True)
    # Insert a second argument name. The name is not important as it is
    # removed from the list while being processed. Even a variable named
    # 'iterator' in the original function does not break anything.
    original_names = _fake_func._original_argument_names
    _fake_func._original_argument_names = ['self', 'iterator'] + original_names[1:]

    _multi_response = _eval_simple_decorated_function(_fake_func, with_flavor)

    def _response(self, request, *args, **kwargs):
        # other arguments than request won't be propagated
        # needed for @LDAP_Connection

        # fake a multi_request
        request.options = [request.options]

        if not with_progress:
            result = _multi_response(self, request)
            value = result[0]
            if isinstance(value, types.FunctionType):
                # return value is a function which is meant to be executed as thread
                # TODO: replace notifier by threading
                thread = notifier.threads.Simple(
                    'simple_response',
                    notifier.Callback(value, self, request),
                    notifier.Callback(self.thread_finished_callback, request),
                )
                thread.run()
            else:
                self.finished(request.id, value)
            return

        progress_obj = self.new_progress()
        request.options[0][with_progress] = progress_obj

        def _thread(self, progress_obj, _multi_response, request):
            try:
                result = _multi_response(self, request)
            except Exception:
                # report the failure through the progress object
                progress_obj.exception(sys.exc_info())
            else:
                progress_obj.finish_with_result(result[0])

        worker = Thread(target=_thread, args=[self, progress_obj, _multi_response, request])
        worker.start()
        self.finished(request.id, progress_obj.initialised())

    if with_progress:
        _response = sanitize_dict({})(_response)

    copy_function_meta_data(function, _response)
    return _response
def multi_response(function=None, with_flavor=None, single_values=False, progress=False):
    '''
    This decorator acts similar to :func:`simple_response` but can
    handle a list of dicts instead of a single dict.

    Technically another object is passed to the function that you can
    name as you like. You can iterate over this object and get the values
    from each dictionary in *request.options*.

    Default values and flavors are supported.

    You do not return a value, you yield them (and you are supposed to
    yield!)::

        @multi_response
        def my_multi_func(self, iterator, variable1, variable2=''):
            # here, variable1 and variable2 are yet to be initialised
            # i.e. variable1 and variable2 will be None!
            do_some_initial_stuff()
            try:
                for variable1, variable2 in iterator:
                    # now they are set
                    yield '%s_%s' % (self._saved_dict[variable1], variable2)
            except KeyError:
                raise UMC_Error('Something went wrong')
            else:
                # only when everything went right...
                do_some_cleanup_stuff()

    The above code will send a list of answers to the client as soon as
    the function is finished (i.e. after *do_some_cleanup_stuff()*)
    filled with values yielded.

    If you have just one variable in your dictionary, do not forget to
    add a comma, otherwise Python will assign the first value a list of
    one element::

        for var, in iterator:
            # now var is set correctly
            pass
    '''
    if function is None:
        # used as @multi_response(...): wait for the function
        def _decorator(f):
            return multi_response(f, with_flavor, single_values, progress)
        return _decorator

    evaluate = _eval_simple_decorated_function(function, with_flavor, single_values, progress)

    def _response(self, request):
        answers = evaluate(self, request)
        self.finished(request.id, answers)

    copy_function_meta_data(function, _response)
    return _response
def _eval_simple_decorated_function(function, with_flavor, single_values=False, progress=False):
    """
    Build the ``(self, request)`` function shared by
    :func:`simple_response` and :func:`multi_response`.

    :param function: generator function called as
        ``function(self, iterator, *placeholders)``; its argument names
        (after the first two) are the option names to extract.
    :param with_flavor: name of the flavor argument, *True* for
        ``'flavor'`` or a false value for no flavor handling.
    :param single_values: if true, *request.options* is a plain list of
        values (e.g. ``["id1", "id2"]``) instead of a list of dicts.
    :param progress: false value to run synchronously; *True*, a title
        or a ``(title, message)`` pair to run in a thread and report via
        ``self.new_progress()``. A ``'%d'`` in the title is filled with
        the number of options; the message is ``%``-formatted with every
        yielded value.
    :returns: a function returning the list of yielded values, or
        ``progress_obj.initialised()`` when *progress* is used.
    """
    # name of flavor argument. default: 'flavor' (if given, of course)
    if with_flavor is True:
        with_flavor = 'flavor'

    # argument names of the function, including 'self'
    arguments, defaults = arginspect(function)
    # remove self, remove iterator
    arguments = arguments[2:]
    # use defaults as dict
    if defaults:
        defaults = dict(zip(arguments[-len(defaults):], defaults))
    else:
        defaults = {}

    if single_values:
        element_sanitizer = None
    else:
        # an argument is required unless it has a default or is the flavor
        # (the flavor is filled in from the request further below)
        element_sanitizer = DictSanitizer(
            dict((arg, Sanitizer(required=arg not in defaults and arg != with_flavor, default=defaults.get(arg))) for arg in arguments),
            _copy_value=False,
        )

    @sanitize(element_sanitizer)
    def _response(self, request):
        # single_values: request.options is, e.g., ["id1", "id2", "id3"], no need for complicated dicts
        if not single_values:
            # normalize the whole request.options
            for element in request.options:
                # add flavor before default checking
                if with_flavor:
                    element[with_flavor] = request.flavor or defaults.get(with_flavor)

        # checked for required arguments, set default... now run!
        iterator = RequestOptionsIterator(request.options, arguments, single_values)
        # placeholders for the named arguments; real values come from the iterator
        nones = [None] * len(arguments)

        if not progress:
            return list(function(self, iterator, *nones))

        number = len(request.options)
        # Always bind both names: the worker thread below reads
        # progress_msg, which used to be unbound for progress=True.
        progress_title = progress_msg = None
        if progress is not True:
            if isinstance(progress, (list, tuple)):
                progress_title, progress_msg = progress
            else:
                progress_title = progress
            if progress_title and '%d' in progress_title:
                progress_title = progress_title % number
        progress_obj = self.new_progress(progress_title, number)

        def _thread(self, progress_obj, iterator, nones):
            try:
                for res in function(self, iterator, *nones):
                    # no message template: report the result without a message
                    # NOTE(review): assumes progress() accepts None as message - confirm
                    res_msg = progress_msg % res if progress_msg else None
                    progress_obj.progress(res, res_msg)
            except Exception:
                # thread boundary: hand the failure to the progress object
                progress_obj.exception(sys.exc_info())
            else:
                progress_obj.finish()

        thread = Thread(target=_thread, args=[self, progress_obj, iterator, nones])
        thread.start()
        return progress_obj.initialised()

    return _response


class RequestOptionsIterator(object):
    """
    Re-iterable view on *request.options*.

    Yields every element unchanged if *single_values* is set, otherwise a
    list with the values of *names* (in that order) looked up in the
    element. Iteration restarts from the beginning on every ``iter()``.
    The object is truthy while elements are left.
    """

    def __init__(self, everything, names, single_values):
        self.everything = everything
        self.names = names
        self.single_values = single_values
        self.max = len(self.everything)
        self.current = 0

    def __bool__(self):
        return self.current < self.max

    __nonzero__ = __bool__  # Python 2

    def __iter__(self):
        # rewind, so that the object can be iterated more than once
        self.current = 0
        return self

    def __next__(self):
        if not self:
            raise StopIteration
        values = self.everything[self.current]
        self.current += 1
        if self.single_values:
            return values
        return [values[name] for name in self.names]

    next = __next__  # Python 2


def arginspect(function):
    """
    Return ``(argument_names, defaults)`` of *function*.

    The attributes ``_original_argument_names`` and
    ``_original_argument_defaults`` (set by
    :func:`copy_function_meta_data`) take precedence over the inspected
    signature, so that stacked decorators still see the original one.
    """
    # Do not use getattr(inspect, 'getfullargspec', inspect.getargspec):
    # the default is evaluated eagerly and inspect.getargspec no longer
    # exists in Python >= 3.11.
    try:
        getfullargspec = inspect.getfullargspec
    except AttributeError:  # Python 2
        getfullargspec = inspect.getargspec
    argspec = getfullargspec(function)
    if hasattr(function, '_original_argument_names'):
        arguments = function._original_argument_names
    else:
        arguments = argspec.args
    if hasattr(function, '_original_argument_defaults'):
        defaults = function._original_argument_defaults
    else:
        defaults = argspec.defaults
    return arguments, defaults


def copy_function_meta_data(original_function, new_function, copy_arg_inspect=False):
    """
    Copy ``__doc__``, ``__name__`` and ``__module__`` from
    *original_function* to *new_function*.

    With *copy_arg_inspect* the argument names and defaults of the
    original are stored on the new function as well, where
    :func:`arginspect` picks them up.
    """
    # set function attrs to allow another arginspect to get original info
    # (used in @simple_response / @log - combo)
    if copy_arg_inspect:
        arguments, defaults = arginspect(original_function)
        new_function._original_argument_names = arguments
        new_function._original_argument_defaults = defaults
    # copy __doc__, otherwise it would not show up in api and such
    new_function.__doc__ = original_function.__doc__
    # copy __name__, otherwise it would be something like "_response"
    new_function.__name__ = original_function.__name__
    # copy __module__, otherwise it would be "univention.management.console.modules.decorators"
    new_function.__module__ = original_function.__module__
def log(function=None, sensitives=None, customs=None, single_values=False):
    '''Log decorator to be used with
    :func:`simple_response`::

        @simple_response
        @log
        def my_func(self, var1, var2):
            return "%s__%s" % (var1, var2)

    The above example will write two lines into the logfile for the
    module (given that the UCR variable *umc/module/debug/level*
    is set to at least 3)::

        <date> MODULE ( INFO ) : my_func got: var1='value1', var2='value2'
        <date> MODULE ( INFO ) : my_func returned: 'value1__value2'

    The variable names are ordered by appearance and hold the values that
    are actually going to be passed to the function (i.e. after they were
    :func:`sanitize` 'd or set to their default value).
    You may specify the names of sensitive arguments that should not
    show up in log files and custom functions that can alter the
    representation of a certain variable's values (useful for non-standard
    datatypes like regular expressions - you may have used a
    :class:`~univention.management.console.modules.sanitizers.PatternSanitizer`
    )::

        @sanitize(pattern=PatternSanitizer())
        @simple_response
        @log(sensitives=['password'], customs={'pattern':lambda x: x.pattern})
        def count_ucr(self, username, password, pattern):
            return self._ucr_count(username, password, pattern)

    This results in something like::

        <date> MODULE ( INFO ) : count_ucr got: username='Administrator', password='********', pattern='.*'
        <date> MODULE ( INFO ) : count_ucr returned: 650

    The decorator also works with :func:`multi_response` (the decorated
    function has to be a generator function taking the iterator)::

        @multi_response
        @log
        def multi_my_func(self, iterator, var1, var2):
            for var1, var2 in iterator:
                yield "%s__%s" % (var1, var2)

    This results in something like::

        <date> MODULE ( INFO ) : multi_my_func got: [var1='value1', var2='value2'], [var1='value3', var2='value4']
        <date> MODULE ( INFO ) : multi_my_func returned: ['value1__value2', 'value3__value4']

    :param sensitives: argument names whose values are logged as
        ``'********'``.
    :param customs: mapping of argument name to a function producing the
        value to log. The mapping is copied, the caller's dict is left
        untouched.
    :param single_values: if true, every element delivered by the
        iterator is one value, not a sequence of values.
    '''
    if function is None:
        return lambda f: log(f, sensitives, customs, single_values)

    # Work on a private copy: adding the masks for the sensitive names
    # must not modify a dict owned by the caller.
    customs = dict(customs) if customs else {}
    for sensitive in sensitives or ():
        customs[sensitive] = lambda x: '********'

    def _log(names, args):
        if single_values:
            args = [args]
        return ['%s=%r' % (name, customs.get(name, lambda x: x)(arg)) for name, arg in zip(names, args)]

    # including self
    # (do not unpack into "_": that would shadow the translation function)
    names, _defaults = arginspect(function)
    name = function.__name__
    # multi_response yields i.e. is generator function
    if inspect.isgeneratorfunction(function):
        # remove self, iterator
        names = names[2:]

        def _response(self, iterator, *args):
            arg_reprs = []
            for element in iterator:
                arg_repr = _log(names, element)
                if arg_repr:
                    arg_reprs.append(arg_repr)
            if arg_reprs:
                MODULE.info('%s got: [%s]' % (name, '], ['.join(', '.join(arg_repr) for arg_repr in arg_reprs)))
            result = []
            for res in function(self, iterator, *args):
                result.append(res)
                yield res
            MODULE.info('%s returned: %r' % (name, result))
    else:
        # remove self
        names = names[1:]

        def _response(self, *args):
            arg_repr = _log(names, args)
            if arg_repr:
                MODULE.info('%s got: %s' % (name, ', '.join(arg_repr)))
            result = function(self, *args)
            MODULE.info('%s returned: %r' % (name, result))
            return result

    copy_function_meta_data(function, _response, copy_arg_inspect=True)
    return _response
def file_upload(function):
    '''
    Restrict the decorated function to UPLOAD commands.

    Any request whose ``command`` is not ``'UPLOAD'`` is rejected with an
    :class:`UMC_Error`. The referer check and the XSRF check are switched
    off for the resulting function.
    '''
    def _response(self, request):
        if request.command == 'UPLOAD':
            return function(self, request)
        raise UMC_Error(_('%s can only be used as UPLOAD') % (function.__name__))

    copy_function_meta_data(function, _response)
    prevent_referer_check(_response)
    prevent_xsrf_check(_response)
    return _response
class reloading_ucr(object):
    """
    Decorator that calls ``ucr.load()`` before the decorated function
    runs, unless the same object was loaded less than *timeout* seconds
    ago.
    """

    # time of the last load, keyed by id() of the UCR object;
    # defined on the class, hence shared by all decorator instances
    _last_reload = dict()

    def __init__(self, ucr, timeout=0.2):
        self._ucr = ucr
        self._timeout = timeout

    def __call__(self, func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            key = id(self._ucr)
            loaded_at = self._last_reload.get(key, 0)
            # 0 means "never loaded"
            outdated = loaded_at == 0 or time.time() - loaded_at > self._timeout
            if outdated:
                self._ucr.load()
                self._last_reload[key] = time.time()
            return func(*args, **kwargs)
        return wrapper
def require_password(function):
    """
    Decorator that calls ``self.require_password()`` before delegating to
    the decorated function with unchanged arguments.
    """
    @functools.wraps(function)
    def _password_checked(self, request, *args, **kwargs):
        # runs first; the function is only reached if this returns
        self.require_password()
        result = function(self, request, *args, **kwargs)
        return result
    return _password_checked
def allow_get_request(function=None, xsrf_check=False, referer_check=False):
    """Allows HTTP GET requests. Additionally prevents the XSRF check and the referer check.

    Usable both as ``@allow_get_request`` and as
    ``@allow_get_request(xsrf_check=..., referer_check=...)``. The function
    is marked with ``allow_get = True``; each check is only switched off
    if the corresponding parameter is false.
    """
    def _decorator(func):
        if not xsrf_check:
            prevent_xsrf_check(func)
        if not referer_check:
            prevent_referer_check(func)
        func.allow_get = True
        return func

    # without a function we are called with parameters: return the decorator
    return _decorator if function is None else _decorator(function)


def prevent_xsrf_check(function):
    """Mark *function* with ``xsrf_protection = False`` and return it."""
    function.xsrf_protection = False
    return function


def prevent_referer_check(function):
    """Mark *function* with ``referer_protection = False`` and return it."""
    function.referer_protection = False
    return function


__all__ = [
    'simple_response',
    'multi_response',
    'sanitize',
    'log',
    'sanitize_list',
    'sanitize_dict',
    'file_upload',
    'reloading_ucr',
    'require_password',
]