Source code for univention.config_registry.handler

#
#  main configuration registry classes
#
# SPDX-FileCopyrightText: 2004-2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only
#
# API stability :pylint: disable-msg=R0201,W0613,R0903
# Too pedantic  :pylint: disable-msg=W0704
# Rewrite       :pylint: disable-msg=R0912

"""Univention Configuration Registry handlers."""


import errno
import os
import pickle  # noqa: S403
import random
import re
import subprocess
import sys
from collections.abc import Iterable, Mapping
from grp import getgrnam
from pwd import getpwnam
from typing import IO, Any

from univention.config_registry.misc import asciify, directory_files
from univention.debhelper import parseRfc822  # pylint: disable-msg=W0403


_OPT = Mapping[str, Any]
_UCR = Mapping[str, str]
_CHANGES = Mapping[str, tuple[str | None, str | None]]
_ARG = tuple[_UCR, _CHANGES]
_INFO = Mapping[str, list[str]]

__all__ = ['ConfigHandlers']

VARIABLE_PATTERN = re.compile('@%@([^@]+)@%@')
VARIABLE_TOKEN = re.compile('@%@')
EXECUTE_TOKEN = re.compile(b'@!@')
WARNING_PATTERN = re.compile('(UCRWARNING|BCWARNING|UCRWARNING_ASCII)=(.+)')

INFO_DIR = '/etc/univention/templates/info'
FILE_DIR = '/etc/univention/templates/files'
SCRIPT_DIR = '/etc/univention/templates/scripts'
MODULE_DIR = '/etc/univention/templates/modules'

WARNING_TEXT = '''\
Warning: This file is auto-generated and might be overwritten by
         univention-config-registry.
         Please edit the following file(s) instead:
Warnung: Diese Datei wurde automatisch generiert und kann durch
         univention-config-registry ueberschrieben werden.
         Bitte bearbeiten Sie an Stelle dessen die folgende(n) Datei(en):

'''
assert asciify(WARNING_TEXT) == WARNING_TEXT, "Only ASCII allowed in WARNING_TEXT"


def run_filter(template: str, directory: _UCR, srcfiles: Iterable[str] = set(), opts: _OPT = {}) -> bytes:
    """
    Process a template file: substitute variables.

    :param template: Text string of template.
    :param directory: UCR instance.
    :param srcfiles: File names of source template.
    :param opts: Command line options.
    :returns: The modified template with all UCR variables and sections replaced.
    """
    template = _replace_variables(template, directory, srcfiles)

    tmpl = template.encode("UTF-8")

    if opts.get('disallow-execution', False):
        return tmpl

    tmpl = _replace_exec(tmpl)

    return tmpl


def _replace_variables(template: str, directory: _UCR, srcfiles: Iterable[str]) -> str:
    while True:
        i = VARIABLE_TOKEN.finditer(template)
        try:
            start = next(i)
            end = next(i)
            name = template[start.end():end.start()]

            if name in directory:
                value = directory[name]
                if not isinstance(value, str):
                    # Python 2 with unicode value
                    value = value.encode('UTF-8')  # important! template must not be of type unicode ever (in py2), otherwise some characters are lost in the below subprocess stdinput
            else:
                match = WARNING_PATTERN.match(name)
                if match:
                    mode, prefix = match.groups()
                    value = warning_string(prefix, srcfiles=srcfiles)
                    if mode == "UCRWARNING_ASCII":
                        value = asciify(value)
                else:
                    value = ''

            if isinstance(value, list | tuple):
                value = value[0]
            template = template[:start.start()] + value + template[end.end():]
        except StopIteration:
            break

    return template


def _replace_exec(template: bytes) -> bytes:
    while True:
        i = EXECUTE_TOKEN.finditer(template)
        try:
            start = next(i)
            end = next(i)

            proc = subprocess.Popen(
                (sys.executable,),
                stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                close_fds=True)
            value = proc.communicate(b'''\
# -*- coding: utf-8 -*-
import univention.config_registry
configRegistry = univention.config_registry.ConfigRegistry()
configRegistry.load()
# for compatibility
baseConfig = configRegistry
%s
''' % template[start.end():end.start()])[0]
            template = template[:start.start()] + value + template[end.end():]

        except StopIteration:
            break

    return template


def run_script(script: str, arg: str, changes: _CHANGES) -> None:
    """
    Execute script with command line arguments using a shell and pass changes
    on STDIN.
    For each changed variable a line with the 'name of the variable', the 'old
    value', and the 'new value' are passed separated by '@%@'.

    :param script: File name of the script.
    :param arg: Execution mode, e.g. `generate` or `postinst`.
    :param changes: Dictionary of changed UCR variables, mapping UCR variable names to 2-tuple (old-value, new-value).
    """
    diff = [
        '%s@%%@%s@%%@%s\n' % (key, old, new)
        for (key, (old, new)) in changes.items()
        if old and new
    ]

    cmd = script + " " + arg
    proc = subprocess.Popen(cmd, shell=True, stdin=subprocess.PIPE, close_fds=True)
    proc.communicate(''.join(diff).encode('UTF-8'))


def run_module(modpath: str, fn: str, ucr: _UCR, changes: _CHANGES) -> None:
    """
    Load the Python module that MUST be located in :py:const:`MODULE_DIR` or any
    subdirectory.

    :param modpath: The path to the module relative to :py:const:`MODULE_DIR`.
    :param fn: Execution mode, e.g. `handler` or `preinst` or `postinst`.
    :param ucr: UCR instance.
    :param changes: Dictionary of changed UCR variables, mapping UCR variable names to 2-tuple (old-value, new-value).
    """
    # temporarily prepend MODULE_DIR to load path
    sys.path.insert(0, MODULE_DIR)
    module_name = os.path.splitext(modpath)[0]
    try:
        module = __import__(module_name.replace(os.path.sep, '.'))
        f = getattr(module, fn)
        f(ucr, changes)
    except (AttributeError, ImportError) as ex:
        print(ex, file=sys.stderr)
    del sys.path[0]


def warning_string(prefix: str = '# ', srcfiles: Iterable[str] = set()) -> str:
    """
    Generate UCR warning text.

    :param prefix: String to prepend before each line.
    :param srcfiles: File names of source template.
    :returns: A warning sting based on :py:const:`WARNING_TEXT`.
    """
    res = [
        '%s%s' % (prefix, line)
        for line in WARNING_TEXT.splitlines()
    ] + [
        '%s\t%s' % (prefix, srcfile)
        for srcfile in sorted(srcfiles)
    ] + [
        prefix,
    ]

    return "\n".join(res)


class ConfigHandler:
    """Base class of all config handlers."""

    variables: set[str] = set()

    def __call__(self, args: _ARG) -> None:
        raise NotImplementedError()


class ConfigHandlerDiverting(ConfigHandler):
    """
    File diverting config handler.

    :param to_file: Destination file name.
    """

    def __init__(self, to_file: str) -> None:
        super().__init__()
        self.to_file = os.path.join('/', to_file)
        self.user: int | None = None
        self.group: int | None = None
        self.mode: int | None = None
        self.preinst: str | None = None
        self.postinst: str | None = None

    def __hash__(self):
        """Return unique hash."""
        return hash(self.to_file)

    def __eq__(self, other):
        """Compare this to other handler."""
        if isinstance(other, type(self)):
            return self.to_file == other.to_file
        return NotImplemented

    def __ne__(self, other):
        """Compare this to other handler."""
        if isinstance(other, type(self)):
            return self.to_file != other.to_file
        return NotImplemented

    def _set_perm(self, stat: os.stat_result | None, to_file: str | None = None) -> None:
        """
        Set file permissions.

        :param stat: File status.
        :param to_file: Destination file name.
        """
        if not to_file:
            to_file = self.to_file
        elif self.to_file != to_file:
            try:
                old_stat = os.stat(self.to_file)
                os.chmod(to_file, old_stat.st_mode)
                os.chown(to_file, old_stat.st_uid, old_stat.st_gid)
            except OSError:
                pass

        if self.user or self.group or self.mode:
            if self.mode:
                os.chmod(to_file, self.mode)

            if self.user and self.group:
                os.chown(to_file, self.user, self.group)
            elif self.user:
                os.chown(to_file, self.user, 0)
            elif self.group:
                os.chown(to_file, 0, self.group)
        elif stat:
            os.chmod(to_file, stat.st_mode)

    def _call_silent(self, *cmd: str) -> int:
        """
        Call command with stdin, stdout, and stderr redirected from/to :file:`/dev/null`.

        :param cmd: List of command with arguments.
        :returns: Process exit code.
        """
        with open(os.path.devnull, 'w', encoding='utf-8') as null:
            # tell possibly wrapped dpkg-divert to really do the work
            env = dict(os.environ)
            env['DPKG_MAINTSCRIPT_PACKAGE'] = 'univention-config'
            return subprocess.call(cmd, stdin=null, stdout=null, stderr=null, env=env)

    def need_divert(self) -> bool:
        """Check if diversion is needed."""
        return False

    def install_divert(self) -> None:
        """Prepare file for diversion."""
        deb = '%s.debian' % self.to_file
        self._call_silent('dpkg-divert', '--quiet', '--rename', '--local', '--divert', deb, '--add', self.to_file)
        # Make sure a valid file still exists
        if os.path.exists(deb) and not os.path.exists(self.to_file):
            # Don't use shutil.copy2() which looses file ownership (Bug #22596)
            self._call_silent('cp', '-p', deb, self.to_file)

    def uninstall_divert(self) -> bool:
        """
        Undo diversion of file.

        :returns: `True` because the diversion was removed.
        """
        try:
            os.unlink(self.to_file)
        except OSError:
            pass
        deb = '%s.debian' % self.to_file
        self._call_silent('dpkg-divert', '--quiet', '--rename', '--local', '--divert', deb, '--remove', self.to_file)
        return True

    def _temp_file_name(self) -> str:
        dirname, basename = os.path.split(self.to_file)
        filename = '.%s__ucr__commit__%s' % (basename, random.random())
        return os.path.join(dirname, filename)


class ConfigHandlerMultifile(ConfigHandlerDiverting):
    """
    Handler for multifile.

    :param dummy_from_file: Source file name used to copy file permissions from.
    :param to_file: Destination file name.
    """

    def __init__(self, dummy_from_file: str, to_file: str) -> None:
        super().__init__(to_file)
        self.variables: set[str] = set()
        self.from_files: set[str] = set()
        self.dummy_from_file = dummy_from_file
        self.def_count = 1

    def __setstate__(self, state):
        """Load state upon unpickling."""
        self.__dict__.update(state)
        # may raise AttributeError, which forces UCR to rebuild the cache
        self.def_count  # :pylint: disable-msg=W0104  # noqa: B018

    def add_subfiles(self, subfiles: list[tuple[str, set[str]]]) -> None:
        """
        Add subfiles to multifile.

        :param subfiles: List of 2-tuples (file-name, set-of-variable-names).
        """
        for from_file, variables in subfiles:
            self.from_files.add(from_file)
            self.variables |= variables

    def remove_subfile(self, subfile: str) -> None:
        """
        Remove subfile.

        Removed diversion of set of sub-files becomes empty.
        """
        self.from_files.discard(subfile)
        if not self.need_divert():
            self.uninstall_divert()

    def __call__(self, args: _ARG) -> None:
        """Generate multfile from subfile templates."""
        ucr, changed = args
        print('Multifile: %s' % self.to_file)

        if hasattr(self, 'preinst') and self.preinst:
            run_module(self.preinst, 'preinst', ucr, changed)

        if self.def_count == 0 or not self.from_files:
            return

        to_dir = os.path.dirname(self.to_file)
        if not os.path.isdir(to_dir):
            os.makedirs(to_dir, 0o755)

        if os.path.isfile(self.dummy_from_file):
            stat: os.stat_result | None = os.stat(self.dummy_from_file)
        elif os.path.isfile(self.to_file):
            stat = os.stat(self.to_file)
        else:
            stat = None

        tmp_to_file = self._temp_file_name()
        try:
            filter_opts: dict[str, Any] = {}

            with open(tmp_to_file, 'wb') as to_fp:
                self._set_perm(stat, tmp_to_file)

                for from_file in sorted(self.from_files, key=os.path.basename):
                    try:
                        with open(from_file, encoding='utf-8') as from_fp:
                            to_fp.write(run_filter(from_fp.read(), ucr, srcfiles=self.from_files, opts=filter_opts))
                    except OSError:
                        continue

            try:
                os.rename(tmp_to_file, self.to_file)
            except OSError as ex:
                if ex.errno == errno.EBUSY:
                    with open(self.to_file, 'w+', encoding='utf-8') as fd:
                        fd.write(open(tmp_to_file, encoding='utf-8').read())
                    os.unlink(tmp_to_file)
        except Exception:
            if os.path.exists(tmp_to_file):
                os.unlink(tmp_to_file)
            raise

        if hasattr(self, 'postinst') and self.postinst:
            run_module(self.postinst, 'postinst', ucr, changed)

        script_file = os.path.join(SCRIPT_DIR, self.to_file.strip("/"))
        if os.path.isfile(script_file):
            run_script(script_file, 'postinst', changed)

    def need_divert(self) -> bool:
        """
        Diversion is needed when at least one multifile and one subfile
        definition exists.
        """
        return self.def_count >= 1 and bool(self.from_files)

    def install_divert(self) -> None:
        """Prepare file for diversion."""
        if self.need_divert():
            super().install_divert()

    def uninstall_divert(self) -> bool:
        """
        Undo diversion of file.

        :returns: `True` when the diversion is removed, `False` when the diversion is still needed.
        """
        if self.need_divert():
            return False
        return super().uninstall_divert()


class ConfigHandlerFile(ConfigHandlerDiverting):
    """
    Handler for (single)file.

    :param from_file: Template source file name.
    :param to_file: Destination file name.
    """

    def __init__(self, from_file: str, to_file: str) -> None:
        super().__init__(to_file)
        self.from_file = from_file

    def __call__(self, args: _ARG) -> None:
        """Generate file from template."""
        ucr, changed = args

        if hasattr(self, 'preinst') and self.preinst:
            run_module(self.preinst, 'preinst', ucr, changed)

        print('File: %s' % self.to_file)

        to_dir = os.path.dirname(self.to_file)
        if not os.path.isdir(to_dir):
            os.makedirs(to_dir, 0o755)

        try:
            stat = os.stat(self.from_file)
        except OSError:
            print("The referenced template file does not exist", file=sys.stderr)
            return

        tmp_to_file = self._temp_file_name()
        try:
            filter_opts: dict[str, Any] = {}

            with open(self.from_file, encoding='utf-8') as from_fp, open(tmp_to_file, 'wb') as to_fp:
                self._set_perm(stat, tmp_to_file)

                to_fp.write(run_filter(from_fp.read(), ucr, srcfiles=[self.from_file], opts=filter_opts))

            try:
                os.rename(tmp_to_file, self.to_file)
            except OSError as ex:
                if ex.errno == errno.EBUSY:
                    with open(self.to_file, 'w+', encoding='utf-8') as fd:
                        fd.write(open(tmp_to_file, encoding='utf-8').read())
                    os.unlink(tmp_to_file)
        except Exception:
            if os.path.exists(tmp_to_file):
                os.unlink(tmp_to_file)
            raise

        if hasattr(self, 'postinst') and self.postinst:
            run_module(self.postinst, 'postinst', ucr, changed)

        script_file = self.from_file.replace(FILE_DIR, SCRIPT_DIR)
        if os.path.isfile(script_file):
            run_script(script_file, 'postinst', changed)

    def need_divert(self) -> bool:
        """For simple files the diversion is always needed."""
        return True


class ConfigHandlerScript(ConfigHandler):
    """
    Handler for UCR scripts.

    :param script: Script file name.
    """

    def __init__(self, script: str) -> None:
        super().__init__()
        self.script = script

    def __hash__(self):
        """Return unique hash."""
        return hash(self.script)

    def __eq__(self, other):
        """Compare this to other handler."""
        if isinstance(other, type(self)):
            return self.script == other.script
        return NotImplemented

    def __ne__(self, other):
        """Compare this to other handler."""
        if isinstance(other, type(self)):
            return self.script != other.script
        return NotImplemented

    def __call__(self, args: _ARG) -> None:
        """Call external programm after change."""
        _ucr, changed = args
        print('Script: %s' % self.script)
        if os.path.isfile(self.script):
            run_script(self.script, 'generate', changed)


class ConfigHandlerModule(ConfigHandler):
    """
    Handler for UCR Python module.

    :param module: Module file name.
    """

    def __init__(self, module: str) -> None:
        super().__init__()
        self.module = module

    def __hash__(self):
        """Return unique hash."""
        return hash(self.module)

    def __eq__(self, other):
        """Compare this to other handler."""
        if isinstance(other, type(self)):
            return self.module == other.module
        return NotImplemented

    def __ne__(self, other):
        """Compare this to other handler."""
        if isinstance(other, type(self)):
            return self.module != other.module
        return NotImplemented

    def __call__(self, args: _ARG) -> None:
        """Call Python module after change."""
        ucr, changed = args
        print('Module: %s' % self.module)
        run_module(self.module, 'handler', ucr, changed)


def grep_variables(text: str) -> set[str]:
    """
    Search UCR template text for used variables.

    :returns: Set of all variables inside `@%@` delimiters.
    """
    return set(VARIABLE_PATTERN.findall(text))


[docs] class ConfigHandlers: """Manage handlers for configuration variables.""" CACHE_FILE = '/var/cache/univention-config/cache' # 0: without version # 1: with version header # 2: switch to handlers mapping to set, drop file, add multifile.def_count # 3: split config_registry into sub modules VERSION = 3 VERSION_MIN = 3 VERSION_MAX = 3 VERSION_TEXT = 'univention-config cache, version' VERSION_NOTICE = '%s %s\n' % (VERSION_TEXT, VERSION) VERSION_RE = re.compile('^%s (?P<version>[0-9]+)$' % VERSION_TEXT) _handlers: dict[str, set[ConfigHandler]] = {} # variable -> set(handlers) _multifiles: dict[str, ConfigHandlerMultifile] = {} # multifile -> handler _subfiles: dict[str, list[tuple[str, set[str]]]] = {} # multifile -> [(subfile, variables)] // pending def __init__(self) -> None: pass @staticmethod def _get_cache_version(cache_file: IO) -> int: """ Read cached `.info` data. :param cache_file: Opened cache file. :returns: Version. """ line = cache_file.readline() # IOError is propagated match = ConfigHandlers.VERSION_RE.match(line) if match: version = int(match.group('version')) # "Old style" cache (version 0) doesn't contain version notice else: cache_file.seek(0) version = 0 return version @classmethod def _parse_rfc822_file(cls, fname: str) -> list[dict[str, list[str]]]: """ Parse :rfc:`822` file. :param fname: Path to :rfc:`822` file. :returns: A list of dictionaries. """ with open(fname, encoding='utf-8') as fd: return parseRfc822(fd.read())
[docs] def load(self) -> None: """Load cached `.info` data or force update.""" try: with open(ConfigHandlers.CACHE_FILE, 'rb') as cache_file: version = self._get_cache_version(cache_file) chv = ConfigHandlers if not chv.VERSION_MIN <= version <= chv.VERSION_MAX: raise TypeError("Invalid cache file version.") pickler = pickle.Unpickler(cache_file) self._handlers = pickler.load() if version <= 1: # version <= 1: _handlers[multifile] -> [handlers] # version >= 2: _handlers[multifile] -> set([handlers]) self._handlers = {k: set(v) for k, v in self._handlers.items()} # version <= 1: _files UNUSED pickler.load() self._subfiles = pickler.load() self._multifiles = pickler.load() except (Exception, pickle.UnpicklingError): self.update()
[docs] def get_handler(self, entry: _INFO) -> ConfigHandler | None: """ Parse entry and return Handler instance. :param entry: `.info` file entry dictionary. :returns: An instance of `None`. """ try: typ = entry['Type'][0] handler = getattr(self, '_get_handler_%s' % typ) except (LookupError, AttributeError): return None else: return handler(entry)
def _parse_common_file_handler(self, handler: ConfigHandlerDiverting, entry: _INFO) -> None: """ Parse common file and multifile entries. :param handler: Handler instance. :param entry: `.info` file entry dictionary. """ try: handler.preinst = entry['Preinst'][0] except LookupError: pass try: handler.postinst = entry['Postinst'][0] except LookupError: pass handler.variables |= set(entry.get('Variables', set())) try: user = entry['User'][0] except LookupError: pass else: try: handler.user = getpwnam(user).pw_uid except LookupError: print(('W: failed to convert the username %s to the uid' % (user,)), file=sys.stderr) try: group = entry['Group'][0] except LookupError: pass else: try: handler.group = getgrnam(group).gr_gid except LookupError: print(('W: failed to convert the groupname %s to the gid' % (group,)), file=sys.stderr) try: mode = entry['Mode'][0] except LookupError: pass else: try: handler.mode = int(mode, 8) except ValueError: print('W: failed to convert mode %s' % (mode,), file=sys.stderr) def _get_handler_file(self, entry: _INFO) -> ConfigHandlerFile | None: """ Parse file entry and return Handler instance. :param entry: `.info` file entry dictionary. :returns: An instance of `None`. """ try: name = entry['File'][0] except LookupError: return None from_path = os.path.join(FILE_DIR, name) handler = ConfigHandlerFile(from_path, name) if os.path.exists(from_path): with open(from_path, encoding='utf-8') as fd: handler.variables = grep_variables(fd.read()) self._parse_common_file_handler(handler, entry) return handler def _get_handler_script(self, entry: _INFO) -> ConfigHandlerScript | None: """ Parse script entry and return Handler instance. :param entry: `.info` file entry dictionary. :returns: An instance of `None`. """ try: script = entry['Script'][0] variables = entry['Variables'] except LookupError: return None handler = ConfigHandlerScript(os.path.join(SCRIPT_DIR, script)) handler.variables = set(variables) return handler def _get_handler_module(self, entry: _INFO) -> ConfigHandlerModule | None: """ Parse module entry and return Handler instance. :param entry: `.info` file entry dictionary. :returns: An instance of `None`. """ try: module = entry['Module'][0] variables = entry['Variables'] except LookupError: return None handler = ConfigHandlerModule(os.path.splitext(module)[0]) handler.variables = set(variables) return handler def _get_handler_multifile(self, entry: _INFO) -> ConfigHandlerMultifile | None: """ Parse multifile entry and return Handler instance. :param entry: `.info` file entry dictionary. :returns: An instance of `None`. """ try: mfile = entry['Multifile'][0] except LookupError: return None try: handler = self._multifiles[mfile] handler.def_count += 1 except KeyError: from_path = os.path.join(FILE_DIR, mfile) handler = ConfigHandlerMultifile(from_path, mfile) self._parse_common_file_handler(handler, entry) # Add pending subfiles from earlier entries self._multifiles[mfile] = handler try: file_vars = self._subfiles.pop(mfile) handler.add_subfiles(file_vars) except KeyError: pass return handler def _get_handler_subfile(self, entry: dict[str, list[str]]) -> ConfigHandlerMultifile | None: """ Parse subfile entry and return Handler instance. :param entry: `.info` file entry dictionary. :returns: An instance of `None`. """ try: mfile = entry['Multifile'][0] subfile = entry['Subfile'][0] except LookupError: return None variables = set(entry.get('Variables', set())) name = os.path.join(FILE_DIR, subfile) try: with open(name, encoding='utf-8') as temp_file: variables |= grep_variables(temp_file.read()) except OSError: print("Failed to process Subfile %s" % (name,), file=sys.stderr) return None qentry = (name, variables) # if multifile handler does not yet exists, queue subfiles for later try: handler = self._multifiles[mfile] handler.add_subfiles([qentry]) except KeyError: pending = self._subfiles.setdefault(mfile, []) pending.append(qentry) return None return handler
[docs] def update(self) -> set[ConfigHandler]: """ Parse all `.info` files to build list of handlers. :returns: Set of all handlers. """ self._handlers.clear() self._multifiles.clear() self._subfiles.clear() handlers: set[ConfigHandler] = set() for info in directory_files(INFO_DIR): if not info.endswith('.info'): continue for section in self._parse_rfc822_file(info): handler = self.get_handler(section) if handler: handlers.add(handler) for handler in handlers: for variable in handler.variables: v2h = self._handlers.setdefault(variable, set()) v2h.add(handler) self._save_cache() return handlers
[docs] def update_divert(self, handlers: Iterable[ConfigHandler]) -> None: """ Synchronize diversions with handlers. :param handlers: List of handlers. """ wanted = {h.to_file: h for h in handlers if isinstance(h, ConfigHandlerDiverting) and h.need_divert()} to_remove: set[str] = set() # Scan for diversions done by UCR with open('/var/lib/dpkg/diversions', encoding='utf-8') as div_file: # from \n to \n package \n try: while True: path_from = next(div_file).rstrip() path_to = next(div_file).rstrip() diversion = next(div_file).rstrip() if path_from + '.debian' != path_to: continue if diversion != ':': # local diversion continue assert path_from not in to_remove # no duplicates try: handler = wanted.pop(path_from) except KeyError: to_remove.add(path_from) except StopIteration: pass # Remove existing diversion not wanted for path in to_remove: tmp_handler = ConfigHandlerDiverting(path) tmp_handler.uninstall_divert() # Install missing diversions still wanted for path, handler in wanted.items(): handler.install_divert()
def _save_cache(self) -> None: """Write cache file.""" try: with open(ConfigHandlers.CACHE_FILE, 'wb') as cache_file: cache_file.write(ConfigHandlers.VERSION_NOTICE.encode('utf-8')) pickler = pickle.Pickler(cache_file) pickler.dump(self._handlers) pickler.dump(self._subfiles) pickler.dump(self._multifiles) except OSError as ex: if ex.errno != errno.EACCES: raise
[docs] def register(self, package: str, ucr: _UCR) -> set[ConfigHandler]: """ Register new info file for package. :param package: Name of the package to register. :param ucr: UCR instance. :returns: Set of (new) handlers. """ handlers: set[ConfigHandler] = set() fname = os.path.join(INFO_DIR, '%s.info' % package) for section in self._parse_rfc822_file(fname): handler = self.get_handler(section) if handler: handlers.add(handler) for handler in handlers: if isinstance(handler, ConfigHandlerDiverting): handler.install_divert() values: dict[str, tuple[None, str | None]] = {} for variable in handler.variables: v2h = self._handlers.setdefault(variable, set()) v2h.add(handler) values[variable] = (None, ucr[variable]) try: _re = re.compile(variable) except re.error: continue values.update((key, (None, val)) for key, val in ucr.items() if _re.match(key)) handler((ucr, values)) self._save_cache() return handlers
[docs] def unregister(self, package: str, ucr: _UCR) -> set[ConfigHandler]: """ Un-register info file for package. :param package: Name of the package to un-register. :param ucr: UCR instance. :returns: Set of (then obsolete) handlers. """ obsolete_handlers: set[ConfigHandler] = set() mf_handlers: set[ConfigHandler] = set() # Remaining Multifile handlers fname = os.path.join(INFO_DIR, '%s.info' % package) for section in self._parse_rfc822_file(fname): try: typ = section['Type'][0] except LookupError: continue if typ == 'file': handler = self.get_handler(section) elif typ == 'subfile': mfile = section['Multifile'][0] sfile = section['Subfile'][0] try: handler = self._multifiles[mfile] except KeyError: continue # skip SubFile w/o MultiFile name = os.path.join(FILE_DIR, sfile) handler.remove_subfile(name) mf_handlers.add(handler) elif typ == 'multifile': mfile = section['Multifile'][0] handler = self._multifiles[mfile] handler.def_count -= 1 mf_handlers.add(handler) else: continue if not handler: # Bug #17913 print(("Skipping internal error: no handler for %r in %s" % (section, package)), file=sys.stderr) continue if isinstance(handler, ConfigHandlerDiverting) and handler.uninstall_divert(): obsolete_handlers.add(handler) for handler in mf_handlers - obsolete_handlers: self.call_handler(ucr, handler) try: # remove cache file to force rebuild of cache os.unlink(ConfigHandlers.CACHE_FILE) except OSError: pass return obsolete_handlers
def __call__(self, variables: Iterable[str], arg: _ARG) -> None: """ Call handlers registered for changes in variables. :param variables: Changed UCR variable names. :param arg: 2-tuple(UCR-instance, changed) where changed is a dictionary mapping ucs-variable-names to values. """ if not variables: return pending_handlers: set[ConfigHandler] = set() for reg_var, handlers in self._handlers.items(): try: _re = re.compile(reg_var) except re.error as ex: print('Failed to compile regular expression %s: %s' % (reg_var, ex), file=sys.stderr) continue for variable in variables: if _re.match(variable): pending_handlers |= handlers for handler in pending_handlers: handler(arg)
[docs] def commit(self, ucr: _UCR, filelist: Iterable[str] = []) -> None: """ Call handlers to (re-)generate files. :param ucr: UCR instance. :param filelist: List of files to re-generate. By default *all* files will be re-generated and all modules and scripts will we re-invoked! """ _filelist = [] for fname in filelist: fname = os.path.expanduser(fname) fname = os.path.expandvars(fname) fname = os.path.abspath(fname) _filelist.append(fname) # find handlers pending_handlers = set() for fname in directory_files(INFO_DIR): if not fname.endswith('.info'): continue for section in self._parse_rfc822_file(fname): if not section.get('Type'): continue handler = None if _filelist: files = section.get('File') or section.get('Multifile') or () for filename in files: if not os.path.isabs(filename): filename = '/%s' % filename if filename in _filelist: handler = self.get_handler(section) break else: continue else: handler = self.get_handler(section) if handler: pending_handlers.add(handler) # print missing files for fname in set(_filelist) - {h.to_file for h in pending_handlers if isinstance(h, ConfigHandlerDiverting)}: print('Warning: The file %r is not registered as an UCR template.' % (fname,), file=sys.stderr) # call handlers for handler in pending_handlers: self.call_handler(ucr, handler)
[docs] def call_handler(self, ucr: _UCR, handler: ConfigHandler) -> None: """ Call handler passing current configuration variables. :param ucr: UCR instance. :param handler: The handler to call. """ values: dict[str, tuple[str | None, str | None]] = {} for variable in handler.variables: if variable in self._handlers.keys(): if ".*" in variable: for i in range(4): var = variable.replace(".*", "%s" % i) val = ucr.get(var) values[var] = (val, val) else: val = ucr.get(variable) values[variable] = (val, val) handler((ucr, values))