Source code for univention.management.console.modules.udm

#!/usr/bin/python3
#
# Univention Management Console
#  module: manages UDM modules
#
# SPDX-FileCopyrightText: 2011-2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only

import copy
import functools
import inspect
import locale
import os
import re
import shutil
import tempfile
from urllib.error import HTTPError, URLError
from urllib.parse import quote, urlencode
from urllib.request import Request

from ldap import INVALID_CREDENTIALS, LDAPError

import univention.admin.authorization as udm_auth
import univention.admin.modules as udm_modules
import univention.admin.objects as udm_objects
import univention.admin.syntax as udm_syntax
import univention.admin.uexceptions as udm_errors
import univention.admin.uldap as udm_uldap
import univention.directory.reports as udr
from univention.config_registry import handler_set
from univention.lib.i18n import Translation
from univention.management.console.config import ucr
from univention.management.console.error import Forbidden
from univention.management.console.ldap import get_admin_connection as get_ldap_admin_connection, get_user_connection
from univention.management.console.log import MODULE
from univention.management.console.message import Request as UMCRequest
from univention.management.console.modules import Base, UMC_Error
from univention.management.console.modules.decorators import (
    SimpleThread, allow_get_request, file_upload, multi_response, prevent_xsrf_check, sanitize, simple_response,
    threaded,
)
from univention.management.console.modules.mixins import ProgressMixin
from univention.management.console.modules.sanitizers import (
    BooleanSanitizer, ChoicesSanitizer, DictSanitizer, DNSanitizer, EmailSanitizer, ListSanitizer, Sanitizer,
    SearchSanitizer, StringSanitizer,
)

from .tools import LicenseError, LicenseImport, check_license, dump_license, install_opener, urlopen
from .udm_ldap import (
    LDAP_AuthenticationFailed, NoIpLeft, ObjectDoesNotExist, SuperordinateDoesNotExist, UDM_Error, UDM_Module,
    UserWithoutDN, _get_syntax, calculate_bind_hash, container_modules, get_bind_hash, get_module, get_obj_module,
    info_syntax_choices, ldap_dn2path, list_objects, read_syntax_choices, search_syntax_choices_by_key,
    set_bind_function, set_bind_hash,
)


USE_ASTERISKS = ucr.is_true('directory/manager/web/allow_wildcard_search', True)
ADD_ASTERISKS = USE_ASTERISKS and ucr.is_true('directory/manager/web/auto_substring_search', True)

_ = Translation('univention-management-console-module-udm').translate


[docs] def sanitize_func(sanitizer_func): from univention.management.console.modules.decorators import copy_function_meta_data, sanitize def _decorated(function): def _response(self, request): sanitizer_parameters = sanitizer_func(self, request) if isinstance(sanitizer_parameters, dict): sanitizer = sanitize(**sanitizer_parameters) else: # if isinstance(sanitizer_parameters, (list, tuple)): sanitizer = sanitize(*sanitizer_parameters) return sanitizer(function)(self, request) copy_function_meta_data(function, _response) return _response return _decorated
[docs] def module_from_request(func): def _decorated(self, request, *a, **kw): request.options['module'] = self._get_module_by_request(request) return func(self, request, *a, **kw) return _decorated
[docs] def bundled(func): def _decoarated(self, request): bundled = isinstance(request.options, list | tuple) if not bundled: ret = func(self, request) else: options = request.options ret = [func(self, request) for request.options in options] self.finished(request.id, ret) return _decoarated
[docs] class ObjectPropertySanitizer(StringSanitizer): def __init__(self, **kwargs): """ A LDAP attribute name. must at least be 1 character long. This sanitizer prevents LDAP search filter injections in the attribute name. TODO: in theory we should only allow existing attributes for the request object(/object type) """ args = { "minimum": 1, "regex_pattern": r'^[\w\d\-;]+$', } args.update(kwargs) StringSanitizer.__init__(self, **args)
[docs] class PropertySearchSanitizer(SearchSanitizer): def _sanitize(self, value, name, further_arguments): object_type = further_arguments.get('objectType') property_ = further_arguments.get('objectProperty') add_asterisks, use_asterisks = self.add_asterisks, self.use_asterisks if object_type and property_ and UDM_Module(object_type).module: prop = UDM_Module(object_type).module.property_descriptions.get(property_) # If the property is represented as a Checkbox in the frontend then # we get True/False as search value. # We need to make sure that the sanitizer rewrites this to the # correct thruthy/falsy string of the syntax class and not add asterisks. if prop and issubclass(prop.syntax if inspect.isclass(prop.syntax) else type(prop.syntax), udm_syntax.IStates | udm_syntax.boolean): self.use_asterisks = False self.add_asterisks = False value = prop.syntax.sanitize_property_search_value(value) try: return super()._sanitize(value, name, further_arguments) finally: self.add_asterisks, self.use_asterisks = add_asterisks, use_asterisks
[docs] class UDMModuleMeta(type): def __new__(mcs, name, bases, attrs): for attr_name, attr_value in list(attrs.items()): if callable(attr_value) and not attr_name.startswith('_'): attrs[attr_name] = mcs.wrap_method(attr_value) return super().__new__(mcs, name, bases, attrs)
[docs] @staticmethod def wrap_method(method): @functools.wraps(method) def update_bind_function(self, *args, **kwargs): if len(args) >= 1 and args[0] is not None and isinstance(args[0], UMCRequest): request = args[0] def bind_user_connection(lo): request.bind_user_connection(lo) self.require_license(lo) new_bind_hash = calculate_bind_hash(request) if new_bind_hash != get_bind_hash(): MODULE.debug("Bind hash has changed. Setting new bind function") set_bind_hash(new_bind_hash) set_bind_function(bind_user_connection) return method(self, *args, **kwargs) return update_bind_function
[docs] class Instance(Base, ProgressMixin, metaclass=UDMModuleMeta): def __init__(self): Base.__init__(self) self.reports_cfg = None self.modules_with_childs = [] self.__license_checks = set() install_opener(ucr)
[docs] def prepare(self, request): super().prepare(request) if not request.user_dn: raise UserWithoutDN(request.username) MODULE.info('Initializing module as user %r', request.user_dn) def bind_user_connection(lo): request.bind_user_connection(lo) lo = udm_auth.Authorization.inject_ldap_connection(lo) if request.federated_account: lo.federated_account = True if request.roles is not None: lo.actor_roles = request.roles self.require_license(lo) set_bind_function(bind_user_connection) set_bind_hash(calculate_bind_hash(request)) if ucr.is_true("directory/manager/web/delegative-administration/enabled"): udm_auth.Authorization.enable(lambda: get_ldap_admin_connection()[0]) # noqa: PLW0108 # read user settings and initial UDR self.reports_cfg = udr.Config() self.modules_with_childs = container_modules()
[docs] def set_locale(self, _locale): super().set_locale(_locale) locale.setlocale(locale.LC_TIME, _locale)
[docs] def error_handling(self, etype, exc, etraceback): super().error_handling(etype, exc, etraceback) if isinstance(exc, udm_errors.authFail | INVALID_CREDENTIALS): MODULE.warning('Authentication failed: %s', exc) raise LDAP_AuthenticationFailed() if isinstance(exc, udm_errors.permissionDenied) or isinstance(exc, UDM_Error) and isinstance(exc.exc, udm_errors.permissionDenied): raise Forbidden(str(exc)) if isinstance(exc, udm_errors.base | LDAPError): MODULE.error('Error:', exc_info=(etype, exc, etraceback))
[docs] def require_license(self, lo): if id(lo) in self.__license_checks: return self.__license_checks.add(id(lo)) try: import univention.admin.license # noqa: F401 except ImportError: return # GPL Version try: check_license(lo, True) except LicenseError: lo.allow_modify = False lo.requireLicense() lo.authz_connection.requireLicense()
[docs] def get_ldap_connection(self): try: lo, _po = get_user_connection(bind=self.bind_user_connection, write=True, bindhash=calculate_bind_hash(self._current_request)) except (LDAPError, udm_errors.ldapError): lo, _po = get_user_connection(bind=self.bind_user_connection, write=True, bindhash=calculate_bind_hash(self._current_request)) lo = udm_auth.Authorization.inject_ldap_connection(lo) if self._current_request.federated_account: lo.federated_account = True if self._current_request.roles is not None: lo.actor_roles = self._current_request.roles return lo, udm_uldap.position(lo.base)
[docs] def get_module(self, flavor, ldap_dn): return get_module(flavor, ldap_dn, self.get_ldap_connection()[0])
[docs] def get_obj_module(self, flavor, ldap_dn): return get_obj_module(flavor, ldap_dn, self.get_ldap_connection()[0])
def _get_module_by_request(self, request, object_type=None): """ Tries to determine the UDM module to use. If no specific object type is given the request option 'objectType' is used. In case none if this leads to a valid object type the request flavor is chosen. Failing all this will raise in UMC_Error exception. On success a UMC_Module object is returned. """ if object_type is None: object_type = request.options.get('objectType') module_name = object_type if not module_name or module_name == 'all': module_name = request.flavor if not module_name or module_name == 'navigation': raise UMC_Error(_('No flavor or valid UDM module name specified')) return UDM_Module(module_name)
[docs] def license(self, request): message = None try: check_license(self.get_ldap_connection()[0]) except LicenseError as exc: message = str(exc) self.finished(request.id, {'message': message})
[docs] def license_info(self, request): self.require_license(self.get_ldap_connection()[0]) license_data = {} try: import univention.admin.license as udm_license except ImportError: license_data['licenseVersion'] = 'gpl' else: license_data['licenseVersion'] = udm_license._license.version if udm_license._license.version == '1': for item in ('licenses', 'real'): license_data[item] = {} for lic_type in ('CLIENT', 'ACCOUNT', 'DESKTOP', 'GROUPWARE'): count = getattr(udm_license._license, item)[udm_license._license.version][getattr(udm_license.License, lic_type)] if isinstance(count, str): try: count = int(count) except ValueError: count = None license_data[item][lic_type.lower()] = count if 'UGS' in udm_license._license.types: udm_license._license.types = [x for x in udm_license._license.types if x != 'UGS'] elif udm_license._license.version == '2': for item in ('licenses', 'real'): license_data[item] = {} for lic_type in ('SERVERS', 'USERS', 'MANAGEDCLIENTS', 'CORPORATECLIENTS'): count = getattr(udm_license._license, item)[udm_license._license.version][getattr(udm_license.License, lic_type)] if isinstance(count, str): try: count = int(count) except ValueError: count = None license_data[item][lic_type.lower()] = count license_data['keyID'] = udm_license._license.licenseKeyID license_data['support'] = udm_license._license.licenseSupport license_data['premiumSupport'] = udm_license._license.licensePremiumSupport license_data['licenseTypes'] = udm_license._license.types license_data['oemProductTypes'] = udm_license._license.oemProductTypes license_data['endDate'] = udm_license._license.endDate license_data['baseDN'] = udm_license._license.licenseBase free_license = '' if license_data['baseDN'] == 'Free for personal use edition': free_license = 'ffpu' if license_data['baseDN'] == 'UCS Core Edition': free_license = 'core' if free_license: license_data['baseDN'] = ucr.get('ldap/base', '') license_data['freeLicense'] = free_license license_data['sysAccountsFound'] = udm_license._license.sysAccountsFound self.finished(request.id, license_data)
[docs] @prevent_xsrf_check def license_import(self, request): filename = None if isinstance(request.options, list | tuple) and request.options: # file upload file_upload(lambda s, r: None)(self, request) # protect against hacking attempts! filename = request.options[0]['tmpfile'] else: sanitize(license=StringSanitizer(required=True))(lambda self, request: None)(self, request) lic = request.options['license'] # Replace non-breaking space with a normal space # https://forge.univention.org/bugzilla/show_bug.cgi?id=30098 lic = lic.replace('\xa0', " ") lic_file = tempfile.NamedTemporaryFile(delete=False) lic_file.write(lic.encode('UTF-8')) lic_file.close() filename = lic_file.name def _error(msg=None): self.finished(request.id, [{ 'success': False, 'message': msg, }]) try: with open(filename, 'rb') as fd: # check license and write it to LDAP importer = LicenseImport(fd) importer.check(ucr.get('ldap/base', '')) importer.write(self.get_ldap_connection()[0]) except (ValueError, AttributeError, LDAPError) as exc: MODULE.error('License import failed (malformed LDIF): %r', exc) # AttributeError: missing univentionLicenseBaseDN # ValueError raised by ldif.LDIFParser when e.g. dn is duplicated # LDAPError e.g. LDIF contained non existing attributes if isinstance(exc, LDAPError) and len(exc.args) and isinstance(exc.args[0], dict) and exc.args[0].get('info'): _error(_('LDAP error: %s.') % exc.args[0].get('info')) else: _error(_('License import failed: malformed LDIF.')) return except LicenseError as exc: MODULE.error('LicenseImport check failed: %r', exc) _error(str(exc)) return finally: os.unlink(filename) self.finished(request.id, [{'success': True}])
[docs] @multi_response(progress=[_('Moving %d object(s)'), _('%($dn$)s moved')]) def move(self, iterator, object, options): for object, options in iterator: if 'container' not in options: yield {'$dn$': object, 'success': False, 'details': _('The destination is missing')} continue module = self.get_module(None, object) if not module: yield {'$dn$': object, 'success': False, 'details': _('Could not identify the given LDAP object')} elif 'move' not in module.operations: yield {'$dn$': object, 'success': False, 'details': _('This object can not be moved')} else: try: module.move(object, options['container']) yield {'$dn$': object, 'success': True} except UDM_Error as e: yield {'$dn$': object, 'success': False, 'details': str(e)}
[docs] @sanitize(DictSanitizer({ "object": DictSanitizer({}, required=True), "options": DictSanitizer({ "objectType": StringSanitizer(required=True), }, required=True), }, required=True)) @threaded def add(self, request): """ Creates LDAP objects. requests.options = [ { 'options' : {}, 'object' : {} }, ... ] return: [ { '$dn$' : <LDAP DN>, 'success' : (True|False), 'details' : <message> }, ... ] """ result = [] for obj in request.options: options = obj.get('options', {}) properties = obj.get('object', {}) module = self._get_module_by_request(request, object_type=options.get('objectType')) if '$labelObjectType$' in properties: del properties['$labelObjectType$'] try: dn = module.create(properties, container=options.get('container'), superordinate=options.get('superordinate')) result.append({'$dn$': dn, 'success': True}) except UDM_Error as e: result.append({'$dn$': e.dn, 'success': False, 'details': str(e)}) return result
[docs] @sanitize(DictSanitizer({ "object": DictSanitizer({ '$dn$': StringSanitizer(required=True), }, required=True), }), required=True) @threaded def put(self, request): """ Modifies the given list of LDAP objects. requests.options = [ { 'options' : {}, 'object' : {} }, ... ] return: [ { '$dn$' : <LDAP DN>, 'success' : (True|False), 'details' : <message> }, ... ] """ result = [] for obj in request.options: properties = obj.get('object') or {} ldap_dn = properties['$dn$'] module = self.get_module(request.flavor, ldap_dn) if module is None: if len(request.options) == 1: raise ObjectDoesNotExist(ldap_dn) result.append({'$dn$': ldap_dn, 'success': False, 'details': _('LDAP object does not exist.')}) continue MODULE.info('Modifying LDAP object %s', ldap_dn) if '$labelObjectType$' in properties: del properties['$labelObjectType$'] try: module.modify(properties) result.append({'$dn$': ldap_dn, 'success': True}) except UDM_Error as exc: result.append({'$dn$': ldap_dn, 'success': False, 'details': str(exc)}) return result
[docs] @threaded def remove(self, request): """ Removes the given list of LDAP objects. requests.options = [ { 'object' : <LDAP DN>, 'options' { 'cleanup' : (True|False), 'recursive' : (True|False) } }, ... ] return: [ { '$dn$' : <LDAP DN>, 'success' : (True|False), 'details' : <message> }, ... ] """ result = [] for item in request.options: ldap_dn = item.get('object') options = item.get('options', {}) module = self.get_module(request.flavor, ldap_dn) if module is None: result.append({'$dn$': ldap_dn, 'success': False, 'details': _('LDAP object could not be identified')}) continue try: module.remove(ldap_dn, options.get('cleanup', False), options.get('recursive', False)) result.append({'$dn$': ldap_dn, 'success': True}) except UDM_Error as e: result.append({'$dn$': ldap_dn, 'success': False, 'details': str(e)}) return result
[docs] @simple_response def meta_info(self, objectType): module = UDM_Module(objectType) if module: return { 'help_link': module.help_link, 'help_text': module.help_text, 'columns': module.columns, 'has_tree': module.has_tree, 'ldap_base': module.ldap_base, }
[docs] @threaded def get(self, request): """ Retrieves the given list of LDAP objects. Password property will be removed. requests.options = [ <LDAP DN>, ... ] return: [ { '$dn$' : <LDAP DN>, <object properties> }, ... ] """ MODULE.info('Starting thread for udm/get request') return self._get(request)
[docs] @threaded def copy(self, request): return self._get(request, copy=True)
def _get(self, request, copy=False): def _remove_uncopyable_properties(obj): if not copy: return for name, p in obj.descriptions.items(): if not p.copyable: obj.info.pop(name, None) result = [] for ldap_dn in request.options: if request.flavor == 'users/self': ldap_dn = request.user_dn obj, module = self.get_obj_module(request.flavor, ldap_dn) if module is None: raise ObjectDoesNotExist(ldap_dn) else: if obj: _remove_uncopyable_properties(obj) obj.set_defaults = True obj.set_default_values() _remove_uncopyable_properties(obj) empty_props_with_default_set = {} for key in obj.info.keys(): if obj.hasChanged(key): empty_props_with_default_set[key] = { 'default_value': obj.info[key], 'prevent_umc_default_popup': obj.descriptions[key].prevent_umc_default_popup, } # show all lazy loading properties in UMC lazy_loading_props = {key: obj.descriptions[key] for key in obj.descriptions if (obj.has_property(key)) and obj.descriptions[key].lazy_loading_fn} for prop in lazy_loading_props.values(): prop.lazy_load(obj) obj.authz.filter_object_properties(obj) props = obj.info props['$empty_props_with_default_set$'] = empty_props_with_default_set for passwd in module.password_properties: if passwd in props: del props[passwd] if not copy: props['$dn$'] = obj.dn props['$options$'] = {} for opt in module.get_options(udm_object=obj): props['$options$'][opt['id']] = opt['value'] props['$policies$'] = {} for policy in obj.policies: pol_mod = self.get_module(None, policy) if pol_mod and pol_mod.name: props['$policies$'].setdefault(pol_mod.name, []).append(policy) props['$labelObjectType$'] = module.title props['$flags$'] = [x.decode('UTF-8') for x in obj.oldattr.get('univentionObjectFlag', [])] props['$operations$'] = module.operations props['$references$'] = module.get_policy_references(ldap_dn) result.append(props) else: MODULE.process('The LDAP object for the LDAP DN %s could not be found', ldap_dn) return result
[docs] @sanitize( objectPropertyValue=PropertySearchSanitizer( add_asterisks=ADD_ASTERISKS, use_asterisks=USE_ASTERISKS, further_arguments=['objectType', 'objectProperty'], ), objectProperty=ObjectPropertySanitizer(required=True), fields=ListSanitizer(), ) @threaded def query(self, request): """ Searches for LDAP objects and returns a few properties of the found objects requests.options = {} 'objectType' -- the object type to search for (default: if not given the flavor is used) 'objectProperty' -- the object property that should be scanned 'objectPropertyValue' -- the filter that should be found in the property 'fields' -- the properties which should be returned 'container' -- the base container where the search should be started (default: LDAP base) 'superordinate' -- the superordinate object for the search (default: None) 'scope' -- the search scope (default: sub) return: [ { '$dn$' : <LDAP DN>, 'objectType' : <UDM module name>, 'path' : <location of object> }, ... ] """ ucr.load() module = self._get_module_by_request(request) superordinate = request.options.get('superordinate') if superordinate == 'None': superordinate = None elif superordinate is not None: MODULE.info('Query defines a superordinate %s', superordinate) _superordinate, mod = self.get_obj_module(request.flavor, superordinate) if mod is not None: MODULE.info('Found UDM module %r for superordinate %s', mod.name, superordinate) superordinate = _superordinate if not request.options.get('container'): request.options['container'] = superordinate.dn else: raise SuperordinateDoesNotExist(superordinate) # overwrite base, blocklists are always in its module defined base if module.name == 'blocklists/list': request.options['container'] = module.ldap_base container = request.options.get('container') containers = module.get_default_containers() if container == 'default' else [container] # Reduce list of containers to avoid duplicate results if len(containers) > 1: containers = [ c1 for c1 in containers if not any(c2 for c2 in containers if c1.endswith(c2) and c2 != c1) ] objectProperty = request.options['objectProperty'] objectPropertyValue = request.options['objectPropertyValue'] scope = request.options.get('scope', 'sub') hidden = request.options.get('hidden') fields = (set(request.options.get('fields', []) or []) | {objectProperty}) - {'name', 'None'} result = [] for container in containers: result.extend(module.search(container, objectProperty, objectPropertyValue, superordinate, scope=scope, hidden=hidden, allow_asterisks=USE_ASTERISKS) or []) entries = [] object_type = request.options.get('objectType', request.flavor) for obj in result: if obj is None: continue module = self.get_module(object_type, obj.dn) if module is None: # This happens when concurrent a object is removed between the module.search() and self.get_module() call MODULE.warning('LDAP object does not exists %s (flavor: %s). The object is ignored.', obj.dn, request.flavor) continue entry = { '$dn$': obj.dn, '$childs$': module.childs, '$flags$': [x.decode('UTF-8') for x in obj.oldattr.get('univentionObjectFlag', [])], '$operations$': module.operations, 'objectType': module.name, 'labelObjectType': module.subtitle, 'name': module.obj_description(obj), 'path': ldap_dn2path(obj.dn, include_rdn=False, ldap_base=module.ldap_base), } if '$value$' in fields: entry['$value$'] = [module.property_description(obj, column['name']) for column in module.columns] for field in fields - set(module.password_properties) - set(entry.keys()): entry[field] = module.property_description(obj, field) entries.append(entry) return entries
[docs] def reports_query(self, request): """Returns a list of reports for the given object type""" # i18n: translation for univention-directory-reports _('PDF Document') ldap_connection = self.get_ldap_connection()[0] report_types = self.reports_cfg.get_report_names(request.flavor) result = [ {'id': name, 'label': _(name)} for name in sorted(report_types) if ldap_connection.authz.is_report_create_allowed(ldap_connection, request.flavor, name, raise_exception=False) ] self.finished(request.id, result)
[docs] def sanitize_reports_create(self, request): choices = self.reports_cfg.get_report_names(request.flavor) return { "report": ChoicesSanitizer(choices=choices, required=True), "objects": ListSanitizer(DNSanitizer(minimum=1), required=True, min_elements=1), }
[docs] @sanitize_func(sanitize_reports_create) @threaded def reports_create(self, request): """Creates a report for the given LDAP DNs and returns the URL to access the file""" ldap_connection = self.get_ldap_connection()[0] ldap_connection.authz.is_report_create_allowed(ldap_connection, request.flavor, request.options['report']) report = udr.Report(ldap_connection) try: report_file = report.create(request.flavor, request.options['report'], request.options['objects']) except udr.ReportError as exc: raise UMC_Error(str(exc)) path = '/usr/share/univention-management-console-module-udm/' filename = os.path.join(path, os.path.basename(report_file)) shutil.move(report_file, path) os.chmod(filename, 0o600) url = '/univention/command/udm/reports/get?report=%s' % (quote(os.path.basename(report_file)),) return {'URL': url}
[docs] @allow_get_request @sanitize(report=StringSanitizer(required=True)) def reports_get(self, request): report = request.options['report'] path = '/usr/share/univention-management-console-module-udm/' filename = os.path.join(path, os.path.basename(report)) try: with open(filename, 'rb') as fd: self.finished(request.id, fd.read(), mimetype='text/csv' if report.endswith('.csv') else 'application/pdf') except OSError: raise UMC_Error(_('The report does not exists. Please create a new one.'), status=404)
[docs] def values(self, request): """ Returns the default search pattern/value for the given object property requests.options = {} 'objectProperty' -- the object property that should be scanned return: <value> """ module = self._get_module_by_request(request) property_name = request.options.get('objectProperty') if property_name == 'None': result = None else: result = module.get_default_values(property_name) self.finished(request.id, result)
[docs] @sanitize( networkDN=StringSanitizer(required=True), increaseCounter=BooleanSanitizer(default=False), ) def network(self, request): """ Returns the next IP configuration based on the given network object requests.options = {} 'networkDN' -- the LDAP DN of the network object 'increaseCounter' -- if given and set to True, network object counter for IP addresses is increased return: {} """ module = UDM_Module('networks/network') obj = module.get(request.options['networkDN']) if not obj: raise ObjectDoesNotExist(request.options['networkDN']) try: obj.refreshNextIp() except udm_errors.nextFreeIp: raise NoIpLeft(request.options['networkDN']) result = {'ip': obj['nextIp'], 'dnsEntryZoneForward': obj['dnsEntryZoneForward'], 'dhcpEntryZone': obj['dhcpEntryZone'], 'dnsEntryZoneReverse': obj['dnsEntryZoneReverse']} self.finished(request.id, result) if request.options['increaseCounter']: # increase the next free IP address obj.stepIp() obj.modify()
[docs] @module_from_request @simple_response() def containers(self, module): """ Returns the list of default containers for the given object type. Therefore the Python module and the default object in the LDAP directory are searched. requests.options = {} 'objectType' -- The UDM module name return: [ { 'id' : <LDAP DN of container>, 'label' : <name> }, ... ] """ containers = [{'id': x, 'label': ldap_dn2path(x, ldap_base=module.ldap_base)} for x in module.get_default_containers()] return sorted(containers, key=lambda x: x['label'].lower())
[docs] @module_from_request @simple_response def templates(self, module): """ Returns the list of template objects for the given object type. requests.options = {} 'objectType' -- The UDM module name return: [ { 'id' : <LDAP DN of container or None>, 'label' : <name> }, ... ] """ result = [] if module.template: template = UDM_Module(module.template) objects = template.search(ucr.get('ldap/base')) for obj in objects: obj.open() result.append({'id': obj.dn, 'label': template.obj_description(obj)}) return result
[docs] def types(self, request): """ Returns the list of object types matching the given flavor or container. requests.options = {} 'superordinate' -- if available only types for the given superordinate are returned (not for the navigation) 'container' -- if available only types suitable for the given container are returned (only for the navigation) return: [ { 'id' : <LDAP DN of container or None>, 'label' : <name> }, ... ] """ superordinate = request.options.get('superordinate') if request.flavor != 'navigation': module = UDM_Module(request.flavor) if superordinate: module = self.get_module(request.flavor, superordinate) or module self.finished(request.id, module.child_modules) return container = request.options.get('container') or superordinate if not container: # no container is specified, return all existing object types MODULE.info('no container specified, returning all object types') self.finished(request.id, [{'id': name, 'label': getattr(mod, 'short_description', name)} for name, mod in udm_modules.modules.items()]) return if container == 'None': # if 'None' is given, use the LDAP base container = ucr.get('ldap/base') MODULE.info("no container == 'None', set LDAP base as container") # create a list of modules that can be created # ... all container types except container/dc allowed_modules = {m for m in udm_modules.containers if udm_modules.name(m) != 'container/dc'} # the container may be a superordinate or have one as its parent # (or grandparent, ....) superordinate = udm_modules.find_superordinate(container, None, self.get_ldap_connection()[0]) if superordinate: # there is a superordinate... add its subtypes to the list of allowed modules MODULE.info('container has a superordinate: %s', superordinate) allowed_modules.update(udm_modules.subordinates(superordinate)) else: # add all types that do not have a superordinate MODULE.info('container has no superordinate') allowed_modules.update(mod for mod in udm_modules.modules.values() if not udm_modules.superordinates(mod)) # make sure that the object type can be created allowed_modules = [mod for mod in allowed_modules if udm_modules.supports(mod, 'add')] MODULE.info('all modules that are allowed: %s', [udm_modules.name(mod) for mod in allowed_modules]) # return the final list of object types self.finished(request.id, [{'id': udm_modules.name(_module), 'label': getattr(_module, 'short_description', udm_modules.name(_module))} for _module in allowed_modules])
[docs] @bundled @sanitize(objectType=StringSanitizer()) # objectDN=StringSanitizer(allow_none=True), def layout(self, request): """ Returns the layout information for the given object type. requests.options = {} 'objectType' -- The UDM module name. If not available the flavor is used return: <layout data structure (see UDM Python modules)> """ module = self._get_module_by_request(request) module.load(force_reload=True) # reload for instant extended attributes if request.flavor == 'users/self': object_dn = None else: object_dn = request.options.get('objectDN') return module.get_layout(object_dn)
[docs] @bundled @sanitize( objectType=StringSanitizer(), objectDn=StringSanitizer(), searchable=BooleanSanitizer(default=False), ) def properties(self, request): """ Returns the properties of the given object type. requests.options = {} 'searchable' -- If given only properties that might be used for search filters are returned return: [ {}, ... ] """ module = self._get_module_by_request(request) module.load(force_reload=True) # reload for instant extended attributes object_dn = request.options.get('objectDN') properties = module.get_properties(object_dn) if request.options.get('searchable', False): properties = [prop for prop in properties if prop.get('searchable', False)] return properties
[docs] @module_from_request @simple_response def options(self, module): """ Returns the options specified for the given object type requests.options = {} 'objectType' -- The UDM module name. If not available the flavor is used return: [ {}, ... ] """ return module.options
[docs] @bundled @sanitize( objectType=StringSanitizer(), ) def policies(self, request): """Returns a list of policy types that apply to the given object type""" module = self._get_module_by_request(request) return module.policies
[docs] @threaded def validate(self, request): """ Validates the correctness of values for properties of the given object type. Therefore the syntax definition of the properties is used. requests.options = {} 'objectType' -- The UDM module name. If not available the flavor is used return: [ { 'property' : <name>, 'valid' : (True|False), 'details' : <message> }, ... ] """ module = self._get_module_by_request(request) result = [] for property_name, value in request.options.get('properties').items(): # ignore special properties named like $.*$, e.g. $options$ if property_name.startswith('$') and property_name.endswith('$'): continue property_obj = module.get_property(property_name) if property_obj is None: raise UMC_Error(_('Property %s not found') % property_name) # check each element if 'value' is a list if isinstance(value, tuple | list) and property_obj.multivalue: subResults = [] subDetails = [] for ival in value: try: property_obj.syntax.parse(ival) subResults.append(True) subDetails.append('') except (udm_errors.valueInvalidSyntax, udm_errors.valueError, TypeError) as e: subResults.append(False) subDetails.append(str(e)) result.append({'property': property_name, 'valid': subResults, 'details': subDetails}) # otherwise we have a single value else: try: property_obj.syntax.parse(value) result.append({'property': property_name, 'valid': True}) except (udm_errors.valueInvalidSyntax, udm_errors.valueError) as e: result.append({'property': property_name, 'valid': False, 'details': str(e)}) return result
[docs] @sanitize( syntax=StringSanitizer(required=True), key=SearchSanitizer(use_asterisks=False), ) @simple_response def syntax_choices_key(self, syntax, key): """ If size limit is reached search only for the current value (so that the selected value is valid). Bug #26556: git:ce2b2842b7c6728047c4d4e1cd2d7d399c401e4a """ # FIXME: this is a blocking method. At least execute in a thread! # FIXME: remove and replace with a elegant mechanism. lo, po = self.get_ldap_connection() syntax = _get_syntax(syntax) if syntax is None: return return search_syntax_choices_by_key(syntax, key, lo, po)
[docs] @sanitize(syntax=StringSanitizer(required=True)) @simple_response def syntax_choices_info(self, syntax): """ Fetch meta information about syntax choices. By doing a search query the number of results is returned. Check if the size limit would be reached. If reached, ComboBoxes add a entry with a search bar. Only used by UDM_Objects (and UDM_Attributes). """ # FIXME: this is a blocking method. At least execute in a thread! # FIXME: remove, replace with pagination of syntax choices. Or do it directly in udm/properties lo, po = self.get_ldap_connection() syntax = _get_syntax(syntax) if syntax is None: return return info_syntax_choices(syntax, ldap_connection=lo, ldap_position=po)
[docs] @sanitize( objectPropertyValue=SearchSanitizer(), objectProperty=ObjectPropertySanitizer(), syntax=StringSanitizer(required=True), ) @threaded def syntax_choices(self, request): """ Dynamically determine valid values for a given syntax class requests.options = {} 'syntax' -- The UDM syntax class return: [ { 'id' : <name>, 'label' : <text> }, ... ] """ syntax = _get_syntax(request.options['syntax']) if syntax is None: return options = request.options options.pop('allow_asterisks', None) # internal option options['dependencies'] = {} dependency_name = options.get('$depends$') if options.get(dependency_name): options['dependencies'] = {dependency_name: options.pop(dependency_name)} ldap_connection, ldap_position = self.get_ldap_connection() return read_syntax_choices(syntax, options, ldap_connection=ldap_connection, ldap_position=ldap_position)
[docs] @sanitize( container=StringSanitizer(default='', allow_none=True), ) @threaded def move_container_query(self, request): scope = 'one' modules = self.modules_with_childs container = request.options['container'] if not container: scope = 'base' return self._container_query(request, container, modules, scope)
[docs] @sanitize( container=StringSanitizer(allow_none=True), ) @threaded def nav_container_query(self, request): """ Returns a list of LDAP containers located under the given LDAP base (option 'container'). If no base container is specified the LDAP base object is returned. """ ldap_base = ucr['ldap/base'] container = request.options['container'] modules = self.modules_with_childs scope = 'one' if not container: # get the tree root == the ldap base scope = 'base' elif request.flavor != 'navigation' and container and ldap_base.lower() == container.lower(): # this is the tree root of DNS / DHCP, show all zones / services scope = 'sub' modules = [request.flavor] return self._container_query(request, container, modules, scope)
def _container_query(self, request, container, modules, scope): """Get a list of containers or child objects of the specified container.""" if not container: container = ucr['ldap/base'] defaults = {} if request.flavor != 'navigation': defaults['$operations$'] = ['search'] # disallow edit module = UDM_Module(request.flavor) if request.flavor in ('dns/dns', 'dhcp/dhcp', 'blocklists/all'): defaults.update({ 'label': module.title, 'icon': 'udm-%s' % (request.flavor.replace('/', '-'),), }) return [dict({ 'id': container, 'label': ldap_dn2path(container, ldap_base=module.ldap_base), 'icon': 'udm-container-dc', 'path': ldap_dn2path(container, ldap_base=module.ldap_base), 'objectType': 'container/dc', '$operations$': UDM_Module('container/dc').operations, '$flags$': [], '$childs$': True, '$isSuperordinate$': False, }, **defaults)] result = [] for xmodule in modules: xmodule = UDM_Module(xmodule) superordinate = udm_objects.get_superordinate(xmodule.module, None, self.get_ldap_connection()[0], container) try: for item in xmodule.search(container, scope=scope, superordinate=superordinate): module = UDM_Module(item.module) result.append({ 'id': item.dn, 'label': module.obj_description(item), 'icon': 'udm-%s' % (module.name.replace('/', '-')), 'path': ldap_dn2path(item.dn, ldap_base=xmodule.ldap_base), 'objectType': module.name, '$operations$': module.operations, '$flags$': [x.decode('UTF-8') for x in item.oldattr.get('univentionObjectFlag', [])], '$childs$': module.childs, '$isSuperordinate$': udm_modules.isSuperordinate(module.module), }) except UDM_Error as exc: raise UMC_Error(str(exc)) return result
[docs] @sanitize( container=StringSanitizer(required=True), ) def nav_object_query(self, request): """ Returns a list of objects in a LDAP container (scope: one) requests.options = {} 'container' -- the base container where the search should be started (default: LDAP base) 'objectType' -- the object type that should be displayed (optional) 'objectProperty' -- the object property that should be scanned (optional) 'objectPropertyValue' -- the filter that should b found in the property (optional) return: [ { '$dn$' : <LDAP DN>, 'objectType' : <UDM module name>, 'path' : <location of object> }, ... ] """ object_type = request.options.get('objectType', '') ldap_connection, ldap_position = self.get_ldap_connection() if object_type not in ('None', '$containers$'): # we need to search for a specific objectType, then we should call the standard query # we also need to get the correct superordinate superordinate = udm_objects.get_superordinate(object_type, None, ldap_connection, request.options['container']) if superordinate and superordinate.module == 'settings/cn': # false positive detected superordinate; Bug #32843 superordinate = None if superordinate: superordinate = superordinate.dn request.options['superordinate'] = superordinate request.options['scope'] = 'one' self.query(request) return def _thread(container): entries = [] for module, obj in list_objects(container, object_type=object_type, ldap_connection=ldap_connection, ldap_position=ldap_position): if obj is None: continue if object_type != '$containers$' and module.childs: continue if object_type == '$containers$' and not module.childs: continue entries.append({ '$dn$': obj.dn, '$childs$': module.childs, 'objectType': module.name, 'labelObjectType': module.subtitle, 'name': udm_objects.description(obj), 'path': ldap_dn2path(obj.dn, include_rdn=False, ldap_base=module.ldap_base), '$flags$': [x.decode('UTF-8') for x in obj.oldattr.get('univentionObjectFlag', [])], '$operations$': module.operations, }) return entries thread = SimpleThread('NavObjectQuery', _thread, lambda r, t: self.thread_finished_callback(r, t, request)) thread.run(request.options['container'])
[docs] @sanitize(DictSanitizer({ "objectType": StringSanitizer(required=True), "policies": ListSanitizer(), "policyType": StringSanitizer(required=True), "objectDN": Sanitizer(default=None), "container": Sanitizer(default=None), # objectDN=StringSanitizer(default=None, allow_none=True), # container=StringSanitizer(default=None, allow_none=True) })) @threaded def object_policies(self, request): """ Returns a virtual policy object containing the values that the given object or container inherits """ object_dn = None container_dn = None obj = None def _get_object(_dn, _module): """Get existing UDM object and corresponding module. Verify user input.""" if _module is None or _module.module is None: raise UMC_Error('The given object type is not valid') _obj = _module.get(_dn) if _obj is None or (_dn and not _obj.exists()): raise ObjectDoesNotExist(_dn) return _obj def _get_object_parts(_options): """Get object related information and corresponding UDM object/module. Verify user input.""" _object_type = _options['objectType'] _object_dn = _options['objectDN'] _container_dn = _options['container'] if (object_dn, container_dn) == (_object_dn, _container_dn): # nothing has changed w.r.t. last entry -> return last values return (object_dn, container_dn, obj) _obj = None _module = None if _object_dn: # editing an exiting UDM object -> use the object itself _module = UDM_Module(_object_type) _obj = _get_object(_object_dn, _module) elif _container_dn: # editing a new (i.e. non existing) object -> use the parent container _module = self.get_module(None, _container_dn) _obj = _get_object(_container_dn, _module) return (_object_dn, _container_dn, _obj) ret = [] for ioptions in request.options: object_dn, container_dn, obj = _get_object_parts(ioptions) policy_dns = ioptions.get('policies', []) policy_module = UDM_Module(ioptions['policyType']) policy_obj = _get_object(policy_dns[0] if policy_dns else None, policy_module) if obj is None: ret.append({}) continue policy_obj.clone(obj) # There are 2x2x2 (=8) cases that may occur (c.f., Bug #31916): # (1) # [edit] editing existing UDM object # -> the existing UDM object itself is loaded # [new] virtually edit non-existing UDM object (when a new object is being created) # -> the parent container UDM object is loaded # (2) # [w/pol] UDM object has assigned policies in LDAP directory # [w/o_pol] UDM object has no policies assigned in LDAP directory # (3) # [inherit] user request to (virtually) change the policy to 'inherited' # [set_pol] user request to (virtually) assign a particular policy faked_policy_reference = None if object_dn and not policy_dns: # case: [edit; w/pol; inherit] # -> current policy is (virtually) overwritten with 'None' faked_policy_reference = [None] elif not object_dn and policy_dns: # cases: # * [new; w/pol; inherit] # * [new; w/pol; set_pol] # -> old + temporary policy are both (virtually) set at the parent container faked_policy_reference = obj.policies + policy_dns else: # cases: # * [new; w/o_pol; inherit] # * [new; w/o_pol; set_pol] # * [edit; w/pol; set_pol] # * [edit; w/o_pol; inherit] # * [edit; w/o_pol; set_pol] faked_policy_reference = policy_dns policy_obj.policy_result(faked_policy_reference) infos = copy.copy(policy_obj.polinfo_more) for key in infos.keys(): if key in policy_obj.polinfo: if isinstance(infos[key], tuple | list): continue infos[key]['value'] = policy_obj.polinfo[key] ret.append(infos) return ret
[docs] @threaded def object_options(self, request): """ Returns the options known by the given objectType. If an LDAP DN is passed the current values for the options of this object are returned, otherwise the default values for the options are returned. """ object_type = request.options.get('objectType') if not object_type: raise UMC_Error('The object type is missing') object_dn = request.options.get('objectDN') module = UDM_Module(object_type) if module.module is None: raise UMC_Error('The given object type is not valid') return module.get_option(object_dn)
[docs] @sanitize(email=EmailSanitizer(required=True)) @simple_response def request_new_license(self, email): license = dump_license() if license is None: raise UMC_Error(_('Cannot parse License from LDAP')) data = {} data['email'] = email data['licence'] = license data = urlencode(data).encode('ASCII') url = 'https://license.univention.de/keyid/conversion/submit' request = Request(url, data=data, headers={'User-agent': 'UMC/AppCenter'}) # noqa: S310 self._request_license(request) # creating a new ucr variable to prevent duplicated registration (Bug #35711) handler_set(['ucs/web/license/requested=true']) return True
def _request_license(self, request): try: urlopen(request) except (OSError, HTTPError, URLError) as exc: strerror = '' if hasattr(exc, 'read'): # try to parse an html error body = exc.read().decode('UTF-8', 'replace') match = re.search('<span id="details">(?P<details>.*?)</span>', body, flags=re.DOTALL) if match: strerror = match.group(1).replace('\n', '') if not strerror: if hasattr(exc, 'getcode') and exc.getcode() >= 400: strerror = _('This seems to be a problem with the license server. Please try again later.') while hasattr(exc, 'reason'): exc = exc.reason if hasattr(exc, 'errno'): version = ucr.get('version/version') errno = exc.errno strerror += getattr(exc, 'strerror', '') or '' if errno == 1: # gaierror(1, something like 'SSL Unknown protocol') link_to_doc = _('https://docs.software-univention.de/manual-%s.html#ip-config:Web_proxy_for_caching_and_policy_management__virus_scan') % version strerror += '. ' + _('This may be a problem with the proxy of your system. You may find help at %s.') % link_to_doc if errno == -2: # gaierror(-2, 'Name or service not known') link_to_doc = _('https://docs.software-univention.de/manual-%s.html#networks:dns') % version strerror += '. ' + _('This is probably due to the DNS settings of your server. You may find help at %s.') % link_to_doc if not strerror.strip(): strerror = str(exc) raise UMC_Error(_('An error occurred while contacting the license server: %s') % (strerror,), status=500)