Source code for univention.config_registry.frontend

# -*- coding: utf-8 -*-
#
"""Univention Configuration Registry command line implementation."""
#  main configuration registry classes
#
# Copyright 2004-2022 Univention GmbH
#
# https://www.univention.de/
#
# All rights reserved.
#
# The source code of this program is made available
# under the terms of the GNU Affero General Public License version 3
# (GNU AGPL V3) as published by the Free Software Foundation.
#
# Binary versions of this program provided by Univention to you as
# well as other copyrighted, protected or trademarked materials like
# Logos, graphics, fonts, specific documentations and configurations,
# cryptographic keys etc. are subject to a license agreement between
# you and Univention and not subject to the GNU AGPL V3.
#
# In the case you use this program under the terms of the GNU AGPL V3,
# the program is provided in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public
# License with the Debian GNU/Linux or Univention distribution in file
# /usr/share/common-licenses/AGPL-3; if not, see
# <https://www.gnu.org/licenses/>.
#
# API stability :pylint: disable-msg=W0613
# Rewrite       :pylint: disable-msg=R0912,R0914,R0915
from __future__ import print_function

import os
import sys
import re
import time

import six

import univention.config_registry_info as cri
from univention.config_registry.backend import exception_occured, SCOPE, ConfigRegistry
from univention.config_registry.handler import run_filter, ConfigHandlers
from univention.config_registry.misc import validate_key, escape_value
from univention.config_registry.filters import filter_shell, filter_keys_only, filter_sort
if six.PY2:
	from io import open
try:
	from typing import Any, Callable, Dict, IO, Iterator, List, NoReturn, Optional, Tuple  # noqa: F401
except ImportError:  # pragma: no cover
	pass

__all__ = [
	'REPLOG_FILE',
	'UnknownKeyException',
	'handler_set',
	'handler_unset',
	'handler_dump',
	'handler_update',
	'handler_commit',
	'handler_register',
	'handler_unregister',
	'handler_filter',
	'handler_search',
	'handler_get',
	'handler_info',
	'handler_version',
	'handler_help',
	'main',
]

REPLOG_FILE = '/var/log/univention/config-registry.replog'

_SHOW_EMPTY, _SHOW_DESCRIPTION, _SHOW_SCOPE, _SHOW_CATEGORIES, _SHOW_DEFAULT = (1 << _ for _ in range(5))


[docs]class UnknownKeyException(Exception): """Query for unknown key: no info file nor set.""" def __init__(self, value): Exception.__init__(self, value) def __str__(self): return 'W: Unknown key: "%s"' % self.args
def replog(ucr, var, old_value, value=None): # type: (ConfigRegistry, str, Optional[str], Optional[str]) -> None """ This function writes a new entry to replication logfile if this feature has been enabled. :param ucr: UCR instance. :param var: UCR variable name. :param old_value: Old UCR variable value. :param value: New UCR variable value. `None` is now unset. """ if ucr.is_true('ucr/replog/enabled', False): if value is not None: method = 'set' varvalue = "%s=%s" % (var, escape_value(value)) else: method = 'unset' varvalue = "'%s'" % var scope_arg = { ConfigRegistry.LDAP: '--ldap-policy ', ConfigRegistry.FORCED: '--force ', ConfigRegistry.SCHEDULE: '--schedule ', }.get(ucr.scope, '') if old_value is None: old_value = "[Previously undefined]" log = u'%s: %s %s%s old:%s\n' % (time.strftime("%Y-%m-%d %H:%M:%S"), method, scope_arg, varvalue, old_value) try: if not os.path.isfile(REPLOG_FILE): os.close(os.open(REPLOG_FILE, os.O_CREAT, 0o640)) with open(REPLOG_FILE, "a+", encoding='utf-8') as logfile: logfile.write(log) except EnvironmentError as ex: print("E: exception occurred while writing to replication log: %s" % (ex,), file=sys.stderr) exception_occured()
[docs]def handler_set(args, opts={}, quiet=False): # type: (List[str], Dict[str, Any], bool) -> None """ Set config registry variables in args. Args is an array of strings 'key=value' or 'key?value'. :param args: Command line arguments. :param opts: Command line options. :param quiet: Hide output. """ ucr = _ucr_from_opts(opts) with ucr: changes = {} # type: Dict[str, Optional[str]] for arg in args: sep_set = arg.find('=') # set sep_def = arg.find('?') # set if not already set if sep_set == -1 and sep_def == -1: print("W: Missing value for config registry variable '%s'" % (arg,), file=sys.stderr) continue else: if sep_set > 0 and sep_def == -1: sep = sep_set elif sep_def > 0 and sep_set == -1: sep = sep_def else: sep = min(sep_set, sep_def) key = arg[0:sep] value = arg[sep + 1:] key_exists = key in ucr._layer do_set_value = (not key_exists or sep == sep_set) if do_set_value and validate_key(key): if not quiet: if key_exists: print('Setting %s' % key) else: print('Create %s' % key) changes[key] = value else: if do_set_value: opts['exit_code'] = 1 if not quiet: if key_exists: print('Not updating %s' % key) else: print('Not setting %s' % key) changed = ucr.update(changes) _run_changed(ucr, changed, "" if quiet else 'W: %s is overridden by scope "%s"')
[docs]def handler_unset(args, opts={}): # type: (List[str], Dict[str, Any]) -> None """ Unset config registry variables in args. :param args: Command line arguments. :param opts: Command line options. """ ucr = _ucr_from_opts(opts) with ucr: changes = {} # type: Dict[str, Optional[str]] for arg in args: if arg in ucr._layer: print('Unsetting %s' % arg) changes[arg] = None else: msg = "W: The config registry variable '%s' does not exist" print(msg % (arg,), file=sys.stderr) changed = ucr.update(changes) _run_changed(ucr, changed, 'W: %s is still set in scope "%s"')
def ucr_update(ucr, changes): # type: (ConfigRegistry, Dict[str, Optional[str]]) -> None """ Set or unset the given config registry variables. :param ucr: UCR instance. :param changes: Changed UCR variables. """ with ucr: changed = ucr.update(changes) _run_changed(ucr, changed) def _run_changed(ucr, changed, msg=""): # type: (ConfigRegistry, Dict[str, Tuple[Optional[str], Optional[str]]], str) -> None """ Run handlers for changed UCR variables. :param ucr: UCR instance. :param changed: Mapping from UCR variable name to 2-tuple (old-value, new-value). :param msg: Message to be printed when change is shadowed by higher layer. Must contain 2 `%s` placeholders for `key` and `scope-name`. """ visible = {} # type: Dict[str, Tuple[Optional[str], Optional[str]]] for key, (old_value, new_value) in changed.items(): replog(ucr, key, old_value, new_value) reg = ucr.scope while old_value is None and reg > 0: reg -= 1 old_value = ucr._registry[reg].get(key) reg, new_value = ucr.get(key, (0, None), getscope=True) if reg > ucr.scope: if msg: print(msg % (key, SCOPE[reg]), file=sys.stderr) else: visible[key] = (old_value, new_value) handlers = ConfigHandlers() handlers.load() handlers(list(visible), (ucr, visible)) def _ucr_from_opts(opts): # type: (Dict[str, Any]) -> ConfigRegistry """ Create :py:class:`ConfigRegistry` instance according to requested layer. :param opts: Command line options. :returns: A new UCR instance. """ if opts.get('ldap-policy', False): scope = ConfigRegistry.LDAP elif opts.get('force', False): scope = ConfigRegistry.FORCED elif opts.get('schedule', False): scope = ConfigRegistry.SCHEDULE else: scope = ConfigRegistry.NORMAL ucr = ConfigRegistry(write_registry=scope) return ucr
[docs]def handler_dump(args, opts={}): # type: (List[str], Dict[str, Any]) -> Iterator[str] """ Dump all variables. :param args: Command line arguments. :param opts: Command line options. """ ucr = ConfigRegistry() ucr.load() for line in str(ucr).split('\n'): yield line
[docs]def handler_update(args, opts={}): # type: (List[str], Dict[str, Any]) -> None """ Update handlers. :param args: Command line arguments. :param opts: Command line options. """ handlers = ConfigHandlers() cur = handlers.update() handlers.update_divert(cur) ucr = ConfigRegistry() _register_variable_default_values(ucr)
[docs]def handler_commit(args, opts={}): # type: (List[str], Dict[str, Any]) -> None """ Commit all registered templated files. :param args: Command line arguments. :param opts: Command line options. """ ucr = ConfigRegistry() ucr.load() handlers = ConfigHandlers() handlers.load() handlers.commit(ucr, args)
[docs]def handler_register(args, opts={}): # type: (List[str], Dict[str, Any]) -> None """ Register new `.info` file. :param args: Command line arguments. :param opts: Command line options. """ ucr = ConfigRegistry() ucr.load() handlers = ConfigHandlers() handlers.update() # cache must be current # Bug #21263: by forcing an update here, the new .info file is already # incorporated. Calling register for multifiles will increment the # def_count a second time, which is not nice, but uncritical, since the # diversion is (re-)done when >= 1. _register_variable_default_values(ucr) handlers.register(args[0], ucr)
# handlers.commit((ucr, {}))
[docs]def handler_unregister(args, opts={}): # type: (List[str], Dict[str, Any]) -> None """ Unregister old `.info` file. :param args: Command line arguments. :param opts: Command line options. """ ucr = ConfigRegistry() ucr.load() handlers = ConfigHandlers() cur = handlers.update() # cache must be current obsolete = handlers.unregister(args[0], ucr) handlers.update_divert(cur - obsolete) _register_variable_default_values(ucr)
[docs]def handler_filter(args, opts={}): # type: (List[str], Dict[str, Any]) -> None """Run filter on STDIN to STDOUT.""" ucr = ConfigRegistry() ucr.load() stdout = sys.stdout if six.PY2 else sys.stdout.buffer # type: IO[bytes] # type: ignore stdout.write(run_filter(sys.stdin.read(), ucr, opts=opts))
[docs]def handler_get(args, opts={}): # type: (List[str], Dict[str, Any]) -> Iterator[str] """ Return config registry variable. :param args: Command line arguments. :param opts: Command line options. """ ucr = ConfigRegistry() ucr.load() key = args[0] value = ucr.get(key) if value is None: return elif OPT_FILTERS['shell'][2]: yield '%s: %s' % (key, value) else: yield value
def variable_info_string(key, value, variable_info, scope=None, details=_SHOW_DESCRIPTION): # type: (str, Optional[str], Optional[cri.Variable], Optional[int], int) -> str """ Format UCR variable key, value, description, scope, categories and default value. :param key: UCR variable name. :param value: UCR variable value. :param variable_info: Description object. :param scope: UCS layer. :param details: bit-field for detail-level. :returns: formatted string """ if value is None and not variable_info: raise UnknownKeyException(key) elif value in (None, '') and not _SHOW_EMPTY & details: return '' elif value is None: # if not shell filter option is set if not OPT_FILTERS['shell'][2]: value_string = '<empty>' else: value_string = '' else: value_string = '%s' % value if scope is None or not 0 <= scope < len(SCOPE) or not _SHOW_SCOPE & details or OPT_FILTERS['shell'][2]: # Do not display scope in shell export key_value = '%s: %s' % (key, value_string) else: key_value = '%s (%s): %s' % (key, SCOPE[scope], value_string) info = [key_value] if variable_info and _SHOW_DESCRIPTION & details: # info.append(' ' + variable_info.get('description', # 'no description available')) # <https://forge.univention.org/bugzilla/show_bug.cgi?id=15556> # Workaround: description = variable_info.get('description') if not description or not description.strip(): description = 'no description available' info.append(' ' + description) if variable_info and _SHOW_CATEGORIES & details: info.append(' Categories: ' + variable_info.get('categories', 'none')) if variable_info and _SHOW_DEFAULT & details: info.append(' Default: ' + variable_info.get('default', '(not set)')) if (_SHOW_CATEGORIES | _SHOW_DESCRIPTION) & details: info.append('') return '\n'.join(info)
[docs]def handler_info(args, opts={}): # type: (List[str], Dict[str, Any]) -> Iterator[str] """ Print variable info. :param args: Command line arguments. :param opts: Command line options. """ ucr = ConfigRegistry() ucr.load() info = _get_config_registry_info() for arg in args: try: yield variable_info_string( arg, ucr.get(arg, None), info.get_variable(arg), details=_SHOW_EMPTY | _SHOW_DESCRIPTION | _SHOW_CATEGORIES | _SHOW_DEFAULT) except UnknownKeyException as ex: print(ex, file=sys.stderr)
[docs]def handler_version(args, opts={}): # type: (List[str], Dict[str, Any]) -> NoReturn """ Print version info. :param args: Command line arguments. :param opts: Command line options. """ print('univention-config-registry @%@package_version@%@') sys.exit(0)
[docs]def handler_help(args, opts={}, out=sys.stdout): # type: (List[str], Dict[str, Any], IO) -> None """ Print config registry command line usage. :param args: Command line arguments. :param opts: Command line options. """ print(u''' univention-config-registry: base configuration for UCS copyright (c) 2001-2022 Univention GmbH, Germany Syntax: univention-config-registry [options] <action> [options] [parameters] Options: -h | --help | -?: print this usage message and exit program --version | -v: print version information and exit program --shell (valid actions: dump, search): convert key/value pair into shell compatible format, e.g. `version/version: 1.0` => `version_version="1.0"` --keys-only (valid actions: dump, search): print only the keys Actions: set [--force|--schedule|--ldap-policy] <key>=<value> [... <key>=<value>]: set one or more keys to specified values; if a key is non-existent in the configuration registry it will be created get <key>: retrieve the value of the specified key from the configuration database unset [--force|--schedule|--ldap-policy] <key> [... <key>]: remove one or more keys (and its associated values) from configuration database dump: display all key/value pairs which are stored in the configuration database search [--key|--value|--all] [--category <category>] [--brief|-verbose] \\ [--non-empty] [... <regex>]: displays all key/value pairs and their descriptions that match at least one of the given regular expressions --key: only search the keys (default) --value: only search the values --all: search keys, values and descriptions --category: limit search to variables of <category> --brief: don't print descriptions (default controlled via ucr/output/brief) --verbose: also print category for each variable --non-empty: only search in non-empty variables no <regex> given: display all variables info <key> [... <key>]: display verbose information for the specified variable(s) shell [key]: convert key/value pair into shell compatible format, e.g. `version/version: 1.0` => `version_version="1.0"` (deprecated: use --shell dump instead) commit [file1 ...]: rebuild configuration file from univention template; if no file is specified ALL configuration files are rebuilt filter [file]: evaluate a template file, expects python inline code in UTF-8 or US-ASCII Description: univention-config-registry is a tool to handle the basic configuration for Univention Corporate Server (UCS) ''', file=out) # noqa: E101 sys.exit(0) # noqa: E101
def missing_parameter(action): # type: (str) -> NoReturn """Print missing parameter error.""" print('E: too few arguments for command [%s]' % (action,), file=sys.stderr) print('try `univention-config-registry --help` for more information', file=sys.stderr) sys.exit(1) def _get_config_registry_info(): # type: () -> cri.ConfigRegistryInfo cri.set_language('en') return cri.ConfigRegistryInfo(install_mode=False) def _register_variable_default_values(ucr): # type: (ConfigRegistry) -> None """Create base-default.conf layer containig all default values""" info = _get_config_registry_info() _ucr = ConfigRegistry(write_registry=ConfigRegistry.DEFAULTS) _ucr.load() defaults = {} # type: Dict[str, Optional[str]] default_variables = info.get_variables() for key, variable in default_variables.items(): value = variable.get('default') if value: defaults[key] = value for key in _ucr: if key not in default_variables: defaults[key] = None changed = {key: (old, new) for key, (old, new) in _ucr.update(defaults).items() if old != new} _ucr.save() ucr.load() _run_changed(ucr, changed, 'I: %s will be set in scope "%s"') HANDLERS = { 'set': (handler_set, 1), 'unset': (handler_unset, 1), 'dump': (handler_dump, 0), 'update': (handler_update, 0), 'commit': (handler_commit, 0), 'register': (handler_register, 1), 'unregister': (handler_unregister, 1), 'filter': (handler_filter, 0), 'search': (handler_search, 0), 'get': (handler_get, 1), 'info': (handler_info, 1), } # type: Dict[str, Tuple[Callable[[List[str], Dict[str, Any]], Optional[Iterator[str]]], int]] # action options: each of these options perform an action OPT_ACTIONS = { # name: [function, state, (alias list)] 'help': [handler_help, False, ('-h', '-?')], 'version': [handler_version, False, ('-v',)], 'debug': [lambda args: None, False, ()], } # type: Dict[str, List] # filter options: these options define filter for the output OPT_FILTERS = { # name: [prio, function, state, (valid actions)] 'keys-only': [0, filter_keys_only, False, ('dump', 'search')], 'sort': [10, filter_sort, False, ('dump', 'search', 'info')], 'shell': [99, filter_shell, False, ('dump', 'search', 'get')], } # type: Dict[str, List] BOOL, STRING = range(2) OPT_COMMANDS = { 'set': { 'force': [BOOL, False], 'ldap-policy': [BOOL, False], 'schedule': [BOOL, False], }, 'unset': { 'force': [BOOL, False], 'ldap-policy': [BOOL, False], 'schedule': [BOOL, False], }, 'search': { 'key': [BOOL, False], 'value': [BOOL, False], 'all': [BOOL, False], 'brief': [BOOL, False], 'category': [STRING, None], 'non-empty': [BOOL, False], 'verbose': [BOOL, False], }, 'filter': { 'encode-utf8': [BOOL, False], 'disallow-execution': [BOOL, False], } } # type: Dict[str, Dict[str, List]]
[docs]def main(args): # type: (List[str]) -> int """Run config registry.""" try: # close your eyes ... if not args: args.append('--help') # search for options in command line arguments while args and args[0].startswith('-'): arg = args.pop(0) # is action option? for key, opt in OPT_ACTIONS.items(): if arg[2:] == key or arg in opt[2]: opt[1] = True break else: # not an action option; is a filter option? try: OPT_FILTERS[arg[2:]][2] = True except LookupError: print('E: unknown option %s' % (arg,), file=sys.stderr) sys.exit(1) # is action already defined by global option? for name, (func, state, _aliases) in OPT_ACTIONS.items(): if state: func(args) # find action try: action = args.pop(0) except IndexError: print('E: missing action, see --help', file=sys.stderr) sys.exit(1) # COMPAT: the 'shell' command is now an option and equivalent to # --shell search if action == 'shell': action = 'search' # activate shell option OPT_FILTERS['shell'][2] = True # switch to old, brief output OPT_COMMANDS['search']['brief'][1] = True tmp = [] if not args: tmp.append('') else: for arg in args: if not arg.startswith('--'): tmp.append('^%s$' % arg) else: tmp.append(arg) args = tmp # set 'sort' option by default for dump and search if action in ['dump', 'search', 'info']: OPT_FILTERS['sort'][2] = True # set brief option when generating shell output if OPT_FILTERS['shell'][2]: OPT_COMMANDS['search']['brief'][1] = True # if a filter option is set: verify that a valid command is given for name, (_prio, func, state, actions) in OPT_FILTERS.items(): if state: if action not in actions: print('E: invalid option --%s for command %s' % (name, action), file=sys.stderr) sys.exit(1) # check command options cmd_opts = OPT_COMMANDS.get(action, {}) while args and args[0].startswith('--'): arg = args.pop(0) if action in ('set', 'unset') and arg == '--forced': arg = '--force' try: cmd_opt_tuple = cmd_opts[arg[2:]] except LookupError: print('E: invalid option %s for command %s' % (arg, action), file=sys.stderr) sys.exit(1) else: if cmd_opt_tuple[0] == BOOL: cmd_opt_tuple[1] = True else: # STRING try: cmd_opt_tuple[1] = args.pop(0) except IndexError: msg = 'E: option %s for command %s expects an argument' print(msg % (arg, action), file=sys.stderr) sys.exit(1) # Drop type cmd_opts = {key: value for key, (typ, value) in cmd_opts.items()} # action! try: handler_func, min_args = HANDLERS[action] except LookupError: print('E: unknown action "%s", see --help' % (action,), file=sys.stderr) sys.exit(1) else: # enough arguments? if len(args) < min_args: missing_parameter(action) # if any filter option is set cmd_opts['exit_code'] = 0 result = handler_func(args, cmd_opts) if result is None: return cmd_opts['exit_code'] results = result # let the filter options do their job for (_prio, filter_func, state, _actions) in sorted(OPT_FILTERS.values()): if not state: continue results = filter_func(args, results) for line in results: print(line) except (EnvironmentError, TypeError): if OPT_ACTIONS['debug'][1]: raise exception_occured() return cmd_opts['exit_code']
# vim:set sw=4 ts=4 noet: