Source code for univention.management.console.modules.ucr

#!/usr/bin/python3
#
# Univention Management Console
#  module: manages Univention Config Registry variables
#
# SPDX-FileCopyrightText: 2006-2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only

import re
from io import StringIO
from re import Pattern
from typing import Any

import univention.info_tools as uit
from univention.config_registry import ConfigRegistry, handler_set, handler_unset, validate_key
from univention.config_registry_info import ConfigRegistryInfo, Variable
from univention.lib.i18n import Translation
from univention.management.console.base import Base, UMC_Error
from univention.management.console.config import ucr
from univention.management.console.modules.decorators import sanitize, simple_response
from univention.management.console.modules.sanitizers import (
    ChoicesSanitizer, DictSanitizer, PatternSanitizer, StringSanitizer,
)


_ = Translation('univention-management-console-module-ucr').translate


ONLINE_BASE = 'repository/online'
COMPONENT_BASE = f'{ONLINE_BASE}/component'
DEPRECATED_VARS = ['prefix', 'username', 'password', 'unmaintained', 'port']
DEPRECATED_GEN = [f'{ONLINE_BASE}/{dep}' for dep in DEPRECATED_VARS]
RE_KEY = re.compile(f'{COMPONENT_BASE}/([^/]+)/({"|".join(DEPRECATED_VARS)})')


[docs] class UCRKeySanitizer(StringSanitizer): def _sanitize(self, value: str, name: str, further_arguments: list[Any]) -> str | None: """ sanitizing UCR keys :param value: Value to be sanitized :param name: name of the key :param further_arguments: List of further arguments :return: sanitized value or None if an Error is raised """ value = super()._sanitize(value, name, further_arguments) b = StringIO() if not validate_key(value, b): error_message = b.getvalue() pre_error_message = _('A valid UCR variable name must contain at least one character and can only contain letters, numerals, "/", ".", ":", "_" and "-".') self.raise_validation_error(f'{pre_error_message} {error_message}') return return value
[docs] class Instance(Base):
[docs] def init(self): # set the language in order to return the correctly localized labels/descriptions uit.set_language(self.locale.language)
def __create_variable_info(self, options: dict) -> None: """ creating variable infos :param options: List of options """ all_info = ConfigRegistryInfo(registered_only=False) info = ConfigRegistryInfo(install_mode=True) info.read_customized() var = Variable() # description for line in options['descriptions']: text = line['text'] if not text: continue if 'lang' in line: var[f'description[{line["lang"]}]'] = text else: var['description'] = text # categories if options['categories']: var['categories'] = ','.join(options['categories']) # type var['type'] = options['type'] # are there any modifications? old_value = all_info.get_variable(options['key']) if old_value != var: # save info.add_variable(options['key'], var) info.write_customized()
[docs] def is_readonly(self, key: str) -> bool: ucrinfo_system = ConfigRegistryInfo(registered_only=False, load_customized=False) var = ucrinfo_system.get_variable(key) if var: return var.get('readonly') in ('yes', '1', 'true') return False
[docs] @sanitize(DictSanitizer({ 'object': DictSanitizer({ 'key': UCRKeySanitizer(required=True), 'value': StringSanitizer(default=''), }), })) def add(self, request) -> None: # does the same as put ucr.load() already_set = set(ucr.keys()) & {v['object']['key'] for v in request.options} if already_set: raise UMC_Error(_('The UCR variable %s is already set.') % ('", "'.join(already_set))) self.put(request)
[docs] @sanitize(DictSanitizer({ 'object': DictSanitizer({ 'key': UCRKeySanitizer(required=True), 'value': StringSanitizer(default=''), }), })) def put(self, request) -> None: for _var in request.options: var = _var['object'] value = var['value'] or '' key = var['key'] if self.is_readonly(key): raise UMC_Error(_('The UCR variable %s is read-only and can not be changed!') % (key,)) arg = [f'{key}={value}'] opts = {} handler_set(arg, opts) if 'exit_code' in opts and opts['exit_code'] != 0: if 'type_errors' in opts and len(opts['type_errors']) > 0: key, value = opts['type_errors'][0] raise UMC_Error(_('The value %s is not valid for the UCR variable %s!') % (value, key)) if 'type_def_error' in opts and len(opts['type_def_errors']) > 0: type_, key, value = opts['type_def_errors'][0] raise UMC_Error(_('Invalid UCR type definition for type %r of %r, value %r not set') % (type_, key, value)) # handle descriptions, type, and categories if 'descriptions' in var or 'type' in var or 'categories' in var: self.__create_variable_info(var) self.finished(request.id, True)
[docs] def remove(self, request) -> None: variables = [x for x in [x.get('object') for x in request.options] if x is not None] for var in variables: if self.is_readonly(var): raise UMC_Error(_('The UCR variable %s is read-only and can not be removed!') % (var,)) handler_unset(variables) self.finished(request.id, True)
[docs] def get(self, request) -> None: ucrReg = ConfigRegistry() ucrReg.load() ucrInfo = ConfigRegistryInfo(registered_only=False) # iterate over all requested variables results = [] for key in request.options: info = ucrInfo.get_variable(str(key)) value = ucrReg.get(str(key)) if not info and (value or value == ''): # only the value available results.append({'key': key, 'value': value}) elif info: # info (categories etc.) available info['value'] = value info['key'] = key results.append(info.normalize()) else: # variable not available, request failed raise UMC_Error(_('The UCR variable %(key)s could not be found') % {'key': key}) self.finished(request.id, results)
[docs] def categories(self, request) -> None: ucrInfo = ConfigRegistryInfo(registered_only=False) categories = [] for id, obj in ucrInfo.categories.items(): name = obj['name'] if ucrInfo.get_variables(id): categories.append({ 'id': id, 'label': name, }) self.finished(request.id, categories)
[docs] @sanitize(pattern=PatternSanitizer(default='.*'), key=ChoicesSanitizer(['all', 'key', 'value', 'description'], required=True)) @simple_response def query(self, pattern: str, key: str, category: list[str] | None = None) -> dict: """ Returns a dictionary of configuration registry variables found by searching for the (wildcard) expression defined by the HTTP request. Additionally a list of configuration registry categories can be defined. The dictionary returned is compatible with the Dojo data store format. """ variables = [] if category == 'all': # load _all_ config registry variables base_info = ConfigRegistryInfo(registered_only=False) else: # load _all registered_ config registry variables base_info = ConfigRegistryInfo() if category in ('all', 'all-registered'): category = None def _hidden(name: str, reg: Pattern) -> bool: if name in DEPRECATED_GEN: return True return bool(reg.fullmatch(name)) def _match_value(name, var): return var.value and pattern.match(var.value) def _match_key(name, var): return pattern.match(name) def _match_description(name, var): descr = var.get('description') return descr and pattern.match(descr) def _match_all(name, var): return _match_value(name, var) or _match_description(name, var) or _match_key(name, var) func = locals().get(f'_match_{key}') for name, var in base_info.get_variables(category).items(): if func(name, var) and not _hidden(name, RE_KEY): variables.append({ 'key': name, 'value': var.value, 'description': var.get('description', None), }) return variables