# -*- coding: utf-8 -*-
#
# main configuration registry classes
#
# Copyright 2004-2022 Univention GmbH
#
# https://www.univention.de/
#
# All rights reserved.
#
# The source code of this program is made available
# under the terms of the GNU Affero General Public License version 3
# (GNU AGPL V3) as published by the Free Software Foundation.
#
# Binary versions of this program provided by Univention to you as
# well as other copyrighted, protected or trademarked materials like
# Logos, graphics, fonts, specific documentations and configurations,
# cryptographic keys etc. are subject to a license agreement between
# you and Univention and not subject to the GNU AGPL V3.
#
# In the case you use this program under the terms of the GNU AGPL V3,
# the program is provided in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public
# License with the Debian GNU/Linux or Univention distribution in file
# /usr/share/common-licenses/AGPL-3; if not, see
# <https://www.gnu.org/licenses/>.
"""Univention Configuration Registry backend for data storage."""
from __future__ import print_function
import sys
import os
import fcntl
import re
import errno
import time
from enum import IntEnum
from stat import S_ISREG
try:
from collections.abc import Mapping, MutableMapping # Python 3.3+
except ImportError:
from collections import Mapping, MutableMapping
import six
from univention.config_registry.handler import run_filter
if six.PY2:
from io import open
try:
from typing import overload, Any, Dict, IO, Iterator, List, ItemsView, NoReturn, Optional, Set, Tuple, Type, TypeVar, Union # noqa: F401
from types import TracebackType # noqa: F401
from typing_extension import Literal # noqa: F401
_T = TypeVar('_T', bound='ReadOnlyConfigRegistry')
_VT = TypeVar('_VT')
except ImportError: # pragma: no cover
def overload(f): # type ignore
pass
__all__ = ['StrictModeException', 'exception_occured', 'SCOPE', 'ConfigRegistry']
MYPY = False
INVALID_VALUE_CHARS = '\r\n'
[docs]class StrictModeException(Exception):
"""Attempt to store non-UTF-8 characters in strict UTF-8 mode."""
[docs]def exception_occured(out=sys.stderr):
# type: (IO) -> NoReturn
"""
Print exception message and exit.
:param out: Output stream for message.
"""
print(u'E: your request could not be fulfilled', file=out)
print(u'try `univention-config-registry --help` for more information', file=out)
sys.exit(1)
SCOPE = ['normal', 'ldap', 'schedule', 'forced', 'custom', 'default']
class Load(IntEnum):
MANUAL = 0
ONCE = 1
ALWAYS = 2
if MYPY: # pragma: no cover
_M = Mapping[str, str]
_MM = MutableMapping[str, str]
else:
_M = Mapping
_MM = MutableMapping
class BooleanConfigRegistry(object):
"""
Mixin class for boolean operations.
"""
TRUE = frozenset({'yes', 'true', '1', 'enable', 'enabled', 'on'})
FALSE = frozenset({'no', 'false', '0', 'disable', 'disabled', 'off'})
def is_true(self, key=None, default=False, value=None):
# type: (Optional[str], bool, Optional[str]) -> bool
"""
Return if the strings value of key is considered as true.
:param key: UCR variable name.
:param default: Default value to return, if UCR variable is not set.
:param value: text string to directly evaluate instead of looking up the key.
:returns: `True` when the value is one of `yes`, `true`, `1`, `enable`, `enabled`, `on`.
>>> ucr = ConfigRegistry('/dev/null')
>>> ucr['key'] = 'yes'
>>> ucr.is_true('key')
True
>>> ucr.is_true('other')
False
>>> ucr.is_true('other', True)
True
>>> ucr.is_true(value='1')
True
"""
if value is None:
value = self.get(key) # type: ignore
if value is None:
return default
return value.lower() in self.TRUE
def is_false(self, key=None, default=False, value=None):
# type: (Optional[str], bool, Optional[str]) -> bool
"""
Return if the strings value of key is considered as false.
:param key: UCR variable name.
:param default: Default value to return, if UCR variable is not set.
:param value: text string to directly evaulate instead of looking up the key.
:returns: `True` when the value is one of `no`, `false`, `0`, `disable`, `disabled`, `off`.
>>> ucr = ConfigRegistry('/dev/null')
>>> ucr['key'] = 'no'
>>> ucr.is_false('key')
True
>>> ucr.is_false('other')
False
>>> ucr.is_false('other', True)
True
>>> ucr.is_false(value='0')
True
"""
if value is None:
value = self.get(key) # type: ignore
if value is None:
return default
return value.lower() in self.FALSE
class ViewConfigRegistry(_M, BooleanConfigRegistry):
"""
Immutable view of UCR.
"""
def __init__(self, ucr):
# type: (Mapping[str, str]) -> None
self.ucr = ucr
def __getitem__(self, key):
# type: (str) -> str
return self.ucr.__getitem__(key)
def __iter__(self):
# type: () -> Iterator[str]
return self.ucr.__iter__()
def __len__(self):
# type: () -> int
return self.ucr.__len__()
class ReadOnlyConfigRegistry(_M, BooleanConfigRegistry):
"""
Merged persistent read-only value store.
This is a merged view of several sub-registries.
:param filename: File name for custom layer text database file.
"""
DEFAULTS, NORMAL, LDAP, SCHEDULE, FORCED, CUSTOM = range(6)
LAYER_PRIORITIES = (CUSTOM, FORCED, SCHEDULE, LDAP, NORMAL, DEFAULTS)
PREFIX = '/etc/univention'
BASES = {
NORMAL: 'base.conf',
LDAP: 'base-ldap.conf',
SCHEDULE: 'base-schedule.conf',
FORCED: 'base-forced.conf',
DEFAULTS: 'base-defaults.conf',
}
def __init__(self, filename=""):
# type: (str) -> None
super(ReadOnlyConfigRegistry, self).__init__()
custom = os.getenv('UNIVENTION_BASECONF') or filename
self.autoload = Load.MANUAL
self._registry = {} # type: Dict[int, _ConfigRegistry]
for reg in self.LAYER_PRIORITIES:
if reg == self.CUSTOM:
self._registry[reg] = _ConfigRegistry(custom if custom else os.devnull)
else:
self._registry[reg] = _ConfigRegistry(os.devnull if custom else os.path.join(self.PREFIX, self.BASES[reg]))
def _walk(self):
# type: () -> Iterator[Tuple[int, _ConfigRegistry]]
"""
Iterator over layers.
:returns: Iterator of 2-tuple (layers-mumber, layer)
"""
if self.autoload:
self.load(Load.MANUAL if self.autoload == Load.ONCE else self.autoload)
for reg in self.LAYER_PRIORITIES:
registry = self._registry[reg]
yield (reg, registry)
def load(self, autoload=Load.MANUAL):
# type: (_T, Load) -> _T
"""
Load registry from file.
:param autoload: Automatically reload changed files.
"""
for reg in self._registry.values():
reg.load()
self.autoload = Load.MANUAL # prevent recursion!
strict = six.PY2 and self.is_true('ucr/encoding/strict')
self.autoload = autoload
for reg in self._registry.values():
reg.strict_encoding = strict
return self
def __enter__(self):
# type: () -> ViewConfigRegistry
"""
Return immutable view despite `autoload`.
:returns: A frozen registry.
> ucr_live = ConfigRegistry().load(autoload=Load.ALWAYS)
> with ucr_live as ucr_frozen:
> for key, value in ucr_frozen.items():
> print(key, value)
"""
return ViewConfigRegistry(self._merge())
def __exit__(self, exc_type, exc_value, traceback):
# type: (Optional[Type[BaseException]], Optional[BaseException], Optional[TracebackType]) -> None
"""
Release registry view.
"""
def __getitem__(self, key): # type: ignore
# type: (str) -> Optional[str]
"""
Return registry value.
:param key: UCR variable name.
:returns: the value or `None`.
Bug #28276: ucr[key] returns None instead of raising KeyError - it would break many UCR templates!
"""
return self.get(key)
def __contains__(self, key): # type: ignore
# type: (str) -> bool
"""
Check if registry key is set.
:param key: UCR variable name.
:returns: `True` is set, `False` otherwise.
"""
return any(key in registry for _reg, registry in self._walk())
def __iter__(self):
# type: () -> Iterator[str]
"""
Iterate over all registry keys.
:returns: Iterator over all UCR variable names.
"""
merge = self._merge()
for key in merge:
yield key
def __len__(self):
# type: () -> int
"""
Return length.
:returns: Number of UCR variables set.
"""
merge = self._merge()
return len(merge)
@overload # type: ignore
def get(self, key, default, getscope): # noqa: F811 # pragma: no cover
# type: (str, _VT, Literal[True]) -> Union[Tuple[int, str], _VT]
pass
@overload
def get(self, key, default=None): # noqa: F811 # pragma: no cover
# type: (str, _VT) -> Union[str, _VT]
pass
def get(self, key, default=None, getscope=False): # noqa: F811
# type: (str, Optional[_VT], bool) -> Union[str, Tuple[int, str], _VT, None]
"""
Return registry value (including optional scope).
:param key: UCR variable name.
:param default: Default value when the UCR variable is not set.
:param getscope: `True` makes the method return the scope level in addition to the value itself.
:returns: the value or a 2-tuple (level, value) or the default.
"""
for reg, registry in self._walk():
try:
value = registry[key] # type: str
except KeyError:
continue
if reg == self.DEFAULTS:
value = self._eval_default(value)
return (reg, value) if getscope else value
return default
@overload
def get_int(self, key): # noqa: F811 # pragma: no cover
# type: (str) -> Optional[int]
pass
@overload # type: ignore
def get_int(self, key, default): # noqa: F811 # pragma: no cover
# type: (str, _VT) -> Union[int, _VT]
pass
def get_int(self, key, default=None): # noqa: F811
# type: (str, Optional[_VT]) -> Union[int, _VT, None]
"""
Return registry value as int.
:param key: UCR variable name.
:param default: Default value when the UCR variable is not set.
:returns: the registry value or the default.
"""
try:
return int(self[key]) # type: ignore
except (KeyError, TypeError, ValueError):
return default
@overload
def _merge(self): # pragma: no cover
# type: () -> Dict[str, str]
pass
@overload
def _merge(self, getscope): # noqa: F811 # pragma: no cover
# type: (Literal[True]) -> Dict[str, Tuple[int, str]]
pass
def _merge(self, getscope=False): # noqa: F811
# type: (bool) -> Union[Dict[str, str], Dict[str, Tuple[int, str]]]
"""
Merge sub registry.
:param getscope: `True` makes the method return the scope level in addition to the value itself.
:returns: A mapping from varibal ename to eiter the value (if `getscope` is False) or a 2-tuple (level, value).
"""
merge = {} # type: Dict[str, Union[str, Tuple[int, str]]]
for reg, registry in self._walk():
for key, value in registry.items():
if key not in merge:
if reg == self.DEFAULTS:
value = self._eval_default(value)
merge[key] = (reg, value) if getscope else value
return merge # type: ignore
def _eval_default(self, default):
# type: (str) -> str
"""
Recursively evaluate default value.
:param default: Default value.
:returns: Substituted value.
"""
try:
value = run_filter(default, self, opts={'disallow-execution': True})
except RuntimeError: # maximum recursion depth exceeded
value = b''
if six.PY2:
return value
return value.decode("UTF-8")
@overload
def items(self): # pragma: no cover
# type: () -> ItemsView[str, str]
pass
@overload
def items(self, getscope): # noqa: F811 # pragma: no cover
# type: (Literal[True]) -> ItemsView[str, Tuple[int, str]]
pass
def items(self, getscope=False): # noqa: F811
# type: (bool) -> Union[ItemsView[str, str], ItemsView[str, Tuple[int, str]]]
"""
Return all registry entries a 2-tuple (key, value) or (key, (scope, value)) if getscope is True.
:param getscope: `True` makes the method return the scope level in addition to the value itself.
:returns: A mapping from varibal ename to eiter the value (if `getscope` is False) or a 2-tuple (level, value).
"""
merge = self._merge(getscope=getscope)
return merge.items() # type: ignore
def __str__(self):
# type: () -> str
"""Return registry content as string."""
merge = self._merge()
return '\n'.join(['%s: %s' % (key, val) for key, val in merge.items()])
[docs]class ConfigRegistry(ReadOnlyConfigRegistry, _MM):
"""
Merged persistent value store.
This is a merged view of several sub-registries.
:param filename: File name for custom layer text database file.
:param write_registry: The UCR level used for writing.
"""
def __init__(self, filename="", write_registry=ReadOnlyConfigRegistry.NORMAL):
# type: (str, int) -> None
super(ConfigRegistry, self).__init__(filename)
custom = os.getenv('UNIVENTION_BASECONF') or filename
self.scope = self.CUSTOM if custom else write_registry
for reg in self.LAYER_PRIORITIES:
registry = self._registry[reg]
registry._create_base_conf()
@property
def _layer(self):
# type: () -> _ConfigRegistry
"""
Return selected layer.
"""
return self._registry[self.scope]
[docs] def save(self):
# type: () -> None
"""Save registry to file."""
self._layer.save()
[docs] def lock(self):
# type: () -> None
"""Lock registry file."""
self._layer.lock()
[docs] def unlock(self):
# type: () -> None
"""Un-lock registry file."""
self._layer.unlock()
def __enter__(self): # type: ignore
# type: () -> ConfigRegistry
"""
Lock Config Registry for read-modify-write cycle.
:returns: The locked registry.
> with ConfigRegistry() as ucr:
> ucr['key'] = 'value'
"""
self.lock()
self.load()
return self
def __exit__(self, exc_type, exc_value, traceback):
# type: (Optional[Type[BaseException]], Optional[BaseException], Optional[TracebackType]) -> None
"""
Unlock registry.
"""
if exc_type is None:
self.save()
self.unlock()
[docs] def clear(self):
# type: () -> None
"""
Clear all registry keys.
"""
self._layer.clear()
def __delitem__(self, key):
# type: (str) -> None
"""
Delete registry key.
:param key: UCR variable name.
"""
del self._layer[key]
def __setitem__(self, key, value):
# type: (str, str) -> None
"""
Set registry value.
:param key: UCR variable name.
:param value: UCR variable value.
"""
self._layer[key] = value
[docs] def update(self, changes): # type: ignore
# type: (Dict[str, Optional[str]]) -> Dict[str, Tuple[Optional[str], Optional[str]]]
"""
Set or unset the given config registry variables.
:param changes: dictionary of ucr-variable-name: value-or-None.
:returns: A mapping from UCR variable name to a 2-tuple (old-value, new-value)
"""
registry = self._layer
changed = {}
for key, value in changes.items():
old_value = registry.get(key, None)
if value is None:
try:
del registry[key]
except KeyError:
continue
else:
registry[key] = value
new_value = registry.get(key, value)
changed[key] = (old_value, new_value)
return changed
[docs] def setdefault(self, key, default): # type: ignore
# type: (str, str) -> str
"""
Set value for variable only when not yet set.
:param key: UCR variable name.
:param default: UCR variable value.
:returns: The old value, if the variable was not yet set, otherwise the new value.
"""
# Bug #28276: setdefault() required KeyError
value = self.get(key, default=self)
if value is self:
value = self[key] = default
return value # type: ignore
class _ConfigRegistry(dict):
"""
Persistent value store.
This is a single value store using a text file.
:param filename: File name for text database file.
"""
RE_COMMENT = re.compile(r'^[^:]*#.*$')
def __init__(self, filename):
# type: (str) -> None
dict.__init__(self)
self.file = filename
self.backup_file = self.file + '.bak'
self.lock_filename = self.file + '.lock'
# will be set by <ConfigRegistry> for each <_ConfigRegistry> - <True>
# means the backend files are valid UTF-8 and should stay that way -->
# only accept valid UTF-8
self.strict_encoding = False
self.lock_file = None # type: Optional[IO]
self.mtime = 0.0
def load(self):
# type: () -> None
"""Load sub registry from file."""
for fn in (self.file, self.backup_file):
new = {}
try:
file_stat = os.stat(fn)
if file_stat.st_mtime <= self.mtime and fn == self.file:
return
with open(fn, 'r', encoding='utf-8') as reg_file:
if reg_file.readline() == '' or reg_file.readline() == '':
continue
reg_file.seek(0)
for line in reg_file:
line = self.RE_COMMENT.sub("", line)
if line == '':
continue
if line.find(': ') == -1:
continue
key, value = line.split(': ', 1)
new[key] = value.strip()
break
except EnvironmentError:
pass
else:
return
self.mtime = file_stat.st_mtime
self.update(new)
for key in set(self.keys()) - set(new.keys()):
self.pop(key, None)
if fn != self.file:
self._save_file(self.file)
def _create_base_conf(self):
# type: () -> None
"""Create sub registry file."""
try:
reg_file = os.open(self.file, os.O_CREAT | os.O_EXCL | os.O_WRONLY, 0o644)
os.close(reg_file)
except EnvironmentError as ex:
if ex.errno == errno.EEXIST and not os.path.isdir(self.file):
return
msg = "E: could not create file '%s': %s" % (self.file, ex)
print(msg, file=sys.stderr)
exception_occured()
def _save_file(self, filename):
# type: (str) -> None
"""
Save sub registry to file.
:param filename: File name for saving.
:raises EnvironmentError: on fatal errors.
"""
temp_filename = '%s.temp' % filename
try:
try:
file_stat = os.stat(filename)
if not S_ISREG(file_stat.st_mode):
return
except EnvironmentError:
file_stat = os.stat_result((0o0644, -1, -1, -1, 0, 0, -1, -1, -1, -1))
# open temporary file for writing
self._save_to(temp_filename)
try:
os.chmod(temp_filename, file_stat.st_mode)
os.chown(temp_filename, file_stat.st_uid, file_stat.st_gid)
os.rename(temp_filename, filename)
except EnvironmentError as ex:
if ex.errno == errno.EBUSY:
with open(filename, 'w+', encoding='utf-8') as fd:
fd.write(open(temp_filename, 'r', encoding='utf-8').read())
os.unlink(temp_filename)
else:
# In this case the temp file created above in this
# function was already moved by a concurrent UCR
# operation. Dump the current state to a backup file
temp_filename = '%s.concurrent_%s' % (filename, time.time())
self._save_to(temp_filename)
except EnvironmentError as ex:
# suppress certain errors
if ex.errno != errno.EACCES:
raise
def _save_to(self, filename):
# type: (str) -> None
"""
Serialize sub registry to file.
:param filename: File name for saving.
"""
with open(filename, 'w', encoding='utf-8') as fd:
# write data to file
fd.write(u'# univention_ base.conf\n\n')
fd.write(self.__unicode__())
# flush (meta)data
fd.flush()
os.fsync(fd.fileno())
def save(self):
# type: () -> None
"""Save sub registry to file."""
for filename in (self.backup_file, self.file):
self._save_file(filename)
def lock(self):
# type: () -> None
"""Lock sub registry file."""
self.lock_file = lock = open(self.lock_filename, "a+", encoding='utf-8')
fcntl.flock(lock.fileno(), fcntl.LOCK_EX)
def unlock(self):
# type: () -> None
"""Un-lock sub registry file."""
if self.lock_file is not None:
self.lock_file.close()
self.lock_file = None
def __setitem__(self, key, value):
"""
Set value in sub registry.
:param key: UCR variable name.
:param value: UCR variable value.
"""
if self.strict_encoding:
try:
key.decode('UTF-8') # only accept valid UTF-8 encoded bytes
except UnicodeError:
raise StrictModeException('variable name is not UTF-8 encoded')
try:
value.decode('UTF-8') # only accept valid UTF-8 encoded bytes
except UnicodeError:
raise StrictModeException('value is not UTF-8 encoded')
return dict.__setitem__(self, key, value)
@staticmethod
def remove_invalid_chars(seq):
# type: (str) -> str
"""
Remove non-UTF-8 characters from value.
:param seq: Text string.
:returns: Text string with invalid characters removed.
"""
for letter in INVALID_VALUE_CHARS:
seq = seq.replace(letter, '')
return seq
def __str__(self):
# type: () -> str
"""Return sub registry content as string."""
return '\n'.join(
'%s: %s' % (key, self.remove_invalid_chars(val))
for key, val in sorted(self.items())
)
def __unicode__(self):
data = self.__str__()
if isinstance(data, bytes):
data = data.decode('UTF-8')
return data
# vim:set sw=4 ts=4 noet: