# -*- coding: utf-8 -*-
#
# main configuration registry classes
#
# Copyright 2004-2022 Univention GmbH
#
# https://www.univention.de/
#
# All rights reserved.
#
# The source code of this program is made available
# under the terms of the GNU Affero General Public License version 3
# (GNU AGPL V3) as published by the Free Software Foundation.
#
# Binary versions of this program provided by Univention to you as
# well as other copyrighted, protected or trademarked materials like
# Logos, graphics, fonts, specific documentations and configurations,
# cryptographic keys etc. are subject to a license agreement between
# you and Univention and not subject to the GNU AGPL V3.
#
# In the case you use this program under the terms of the GNU AGPL V3,
# the program is provided in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public
# License with the Debian GNU/Linux or Univention distribution in file
# /usr/share/common-licenses/AGPL-3; if not, see
# <https://www.gnu.org/licenses/>.
#
# API stability :pylint: disable-msg=R0201,W0613,R0903
# Too pedantic :pylint: disable-msg=W0704
# Rewrite :pylint: disable-msg=R0912
"""Univention Configuration Registry handlers."""
from __future__ import print_function
import sys
import os
import random
import re
import subprocess
import pickle
import errno
import six
from pwd import getpwnam
from grp import getgrnam
from univention.config_registry.misc import asciify, directory_files
from univention.debhelper import parseRfc822 # pylint: disable-msg=W0403
if six.PY2:
from io import open
try:
from typing import Any, Dict, IO, Iterable, List, Mapping, Optional, Set, Tuple, Union # noqa: F401
_OPT = Mapping[str, Any]
_UCR = Mapping[str, str]
_CHANGES = Mapping[str, Tuple[Optional[str], Optional[str]]]
_ARG = Tuple[_UCR, _CHANGES]
_INFO = Mapping[str, List[str]]
except ImportError:
pass
__all__ = ['ConfigHandlers']
VARIABLE_PATTERN = re.compile('@%@([^@]+)@%@')
VARIABLE_TOKEN = re.compile('@%@')
EXECUTE_TOKEN = re.compile(b'@!@')
WARNING_PATTERN = re.compile('(UCRWARNING|BCWARNING|UCRWARNING_ASCII)=(.+)')
INFO_DIR = '/etc/univention/templates/info'
FILE_DIR = '/etc/univention/templates/files'
SCRIPT_DIR = '/etc/univention/templates/scripts'
MODULE_DIR = '/etc/univention/templates/modules'
WARNING_TEXT = '''\
Warning: This file is auto-generated and might be overwritten by
univention-config-registry.
Please edit the following file(s) instead:
Warnung: Diese Datei wurde automatisch generiert und kann durch
univention-config-registry ueberschrieben werden.
Bitte bearbeiten Sie an Stelle dessen die folgende(n) Datei(en):
''' # noqa: E101
assert asciify(WARNING_TEXT) == WARNING_TEXT, "Only ASCII allowed in WARNING_TEXT"
def run_filter(template, directory, srcfiles=set(), opts={}):
# type: (str, _UCR, Iterable[str], _OPT) -> bytes
"""
Process a template file: substitute variables.
:param template: Text string of template.
:param directory: UCR instance.
:param srcfiles: File names of source template.
:param opts: Command line options.
:returns: The modified template with all UCR variables and sections replaced.
"""
template = _replace_variables(template, directory, srcfiles)
if six.PY3:
tmpl = template.encode('UTF-8')
else:
tmpl = template
if opts.get('disallow-execution', False):
return tmpl
tmpl = _replace_exec(tmpl)
return tmpl
def _replace_variables(template, directory, srcfiles):
# type: (str, _UCR, Iterable[str]) -> str
while True:
i = VARIABLE_TOKEN.finditer(template)
try:
start = next(i)
end = next(i)
name = template[start.end():end.start()]
if name in directory:
value = directory[name]
if not isinstance(value, str):
# Python 2 with unicode value
value = value.encode('UTF-8') # important! template must not be of type unicode ever (in py2), otherwise some characters are lost in the below subprocess stdinput
else:
match = WARNING_PATTERN.match(name)
if match:
mode, prefix = match.groups()
value = warning_string(prefix, srcfiles=srcfiles)
if mode == "UCRWARNING_ASCII":
value = asciify(value)
else:
value = ''
if isinstance(value, (list, tuple)):
value = value[0]
template = template[:start.start()] + value + template[end.end():]
except StopIteration:
break
return template
def _replace_exec(template):
# type: (bytes) -> bytes
while True:
i = EXECUTE_TOKEN.finditer(template)
try:
start = next(i)
end = next(i)
proc = subprocess.Popen(
(sys.executable,),
stdin=subprocess.PIPE, stdout=subprocess.PIPE,
close_fds=True)
value = proc.communicate(b'''\
# -*- coding: utf-8 -*-
import univention.config_registry
configRegistry = univention.config_registry.ConfigRegistry()
configRegistry.load()
# for compatibility
baseConfig = configRegistry
%s
''' % template[start.end():end.start()])[0]
template = template[:start.start()] + value + template[end.end():]
except StopIteration:
break
return template
def run_script(script, arg, changes):
# type: (str, str, _CHANGES) -> None
"""
Execute script with command line arguments using a shell and pass changes
on STDIN.
For each changed variable a line with the 'name of the variable', the 'old
value', and the 'new value' are passed separated by '@%@'.
:param script: File name of the script.
:param arg: Execution mode, e.g. `generate` or `postinst`.
:param changes: Dictionary of changed UCR variables, mapping UCR variable names to 2-tuple (old-value, new-value).
"""
diff = [
u'%s@%%@%s@%%@%s\n' % (key, old, new)
for (key, (old, new)) in changes.items()
if old and new
]
cmd = script + " " + arg
proc = subprocess.Popen(cmd, shell=True, stdin=subprocess.PIPE, close_fds=True)
proc.communicate(u''.join(diff).encode('UTF-8'))
def run_module(modpath, fn, ucr, changes):
# type: (str, str, _UCR, _CHANGES) -> None
"""
Load the Python module that MUST be located in :py:const:`MODULE_DIR` or any
subdirectory.
:param modpath: The path to the module relative to :py:const:`MODULE_DIR`.
:param fn: Execution mode, e.g. `handler` or `preinst` or `postinst`.
:param ucr: UCR instance.
:param changes: Dictionary of changed UCR variables, mapping UCR variable names to 2-tuple (old-value, new-value).
"""
# temporarily prepend MODULE_DIR to load path
sys.path.insert(0, MODULE_DIR)
module_name = os.path.splitext(modpath)[0]
try:
module = __import__(module_name.replace(os.path.sep, '.'))
f = getattr(module, fn)
f(ucr, changes)
except (AttributeError, ImportError) as ex:
print(ex, file=sys.stderr)
del sys.path[0]
def warning_string(prefix='# ', srcfiles=set()):
# type: (str, Iterable[str]) -> str
"""
Generate UCR warning text.
:param prefix: String to prepend before each line.
:param srcfiles: File names of source template.
:returns: A warning sting based on :py:const:`WARNING_TEXT`.
"""
res = [
'%s%s' % (prefix, line)
for line in WARNING_TEXT.splitlines()
] + [
'%s\t%s' % (prefix, srcfile)
for srcfile in sorted(srcfiles)
] + [
prefix
]
return "\n".join(res)
class ConfigHandler(object):
"""Base class of all config handlers."""
variables = set() # type: Set[str]
def __call__(self, args):
# type: (_ARG) -> None
raise NotImplementedError()
class ConfigHandlerDiverting(ConfigHandler):
"""
File diverting config handler.
:param to_file: Destination file name.
"""
def __init__(self, to_file):
# type: (str) -> None
super(ConfigHandlerDiverting, self).__init__()
self.to_file = os.path.join('/', to_file)
self.user = None # type: Optional[int]
self.group = None # type: Optional[int]
self.mode = None # type: Optional[int]
self.preinst = None # type: Optional[str]
self.postinst = None # type: Optional[str]
def __hash__(self):
"""Return unique hash."""
return hash(self.to_file)
def __eq__(self, other):
"""Compare this to other handler."""
if isinstance(other, type(self)):
return self.to_file == other.to_file
return NotImplemented
def __ne__(self, other):
"""Compare this to other handler."""
if isinstance(other, type(self)):
return self.to_file != other.to_file
return NotImplemented
def _set_perm(self, stat, to_file=None):
# type: (Optional[os.stat_result], Optional[str]) -> None
"""
Set file permissions.
:param stat: File status.
:param to_file: Destination file name.
"""
if not to_file:
to_file = self.to_file
elif self.to_file != to_file:
try:
old_stat = os.stat(self.to_file)
os.chmod(to_file, old_stat.st_mode)
os.chown(to_file, old_stat.st_uid, old_stat.st_gid)
except EnvironmentError:
pass
if self.user or self.group or self.mode:
if self.mode:
os.chmod(to_file, self.mode)
if self.user and self.group:
os.chown(to_file, self.user, self.group)
elif self.user:
os.chown(to_file, self.user, 0)
elif self.group:
os.chown(to_file, 0, self.group)
elif stat:
os.chmod(to_file, stat.st_mode)
def _call_silent(self, *cmd):
# type: (*str) -> int
"""
Call command with stdin, stdout, and stderr redirected from/to :file:`/dev/null`.
:param cmd: List of command with arguments.
:returns: Process exit code.
"""
with open(os.path.devnull, 'w', encoding='utf-8') as null:
# tell possibly wrapped dpkg-divert to really do the work
env = dict(os.environ)
env['DPKG_MAINTSCRIPT_PACKAGE'] = 'univention-config'
return subprocess.call(cmd, stdin=null, stdout=null, stderr=null, env=env)
def need_divert(self):
# type: () -> bool
"""Check if diversion is needed."""
return False
def install_divert(self):
# type: () -> None
"""Prepare file for diversion."""
deb = '%s.debian' % self.to_file
self._call_silent('dpkg-divert', '--quiet', '--rename', '--local', '--divert', deb, '--add', self.to_file)
# Make sure a valid file still exists
if os.path.exists(deb) and not os.path.exists(self.to_file):
# Don't use shutil.copy2() which looses file ownership (Bug #22596)
self._call_silent('cp', '-p', deb, self.to_file)
def uninstall_divert(self):
# type: () -> bool
"""
Undo diversion of file.
:returns: `True` because the diversion was removed.
"""
try:
os.unlink(self.to_file)
except EnvironmentError:
pass
deb = '%s.debian' % self.to_file
self._call_silent('dpkg-divert', '--quiet', '--rename', '--local', '--divert', deb, '--remove', self.to_file)
return True
def _temp_file_name(self):
# type: () -> str
dirname, basename = os.path.split(self.to_file)
filename = '.%s__ucr__commit__%s' % (basename, random.random())
return os.path.join(dirname, filename)
class ConfigHandlerMultifile(ConfigHandlerDiverting):
"""
Handler for multifile.
:param dummy_from_file: Source file name used to copy file permissions from.
:param to_file: Destination file name.
"""
def __init__(self, dummy_from_file, to_file):
# type: (str, str) -> None
super(ConfigHandlerMultifile, self).__init__(to_file)
self.variables = set() # type: Set[str]
self.from_files = set() # type: Set[str]
self.dummy_from_file = dummy_from_file
self.def_count = 1
def __setstate__(self, state):
"""Load state upon unpickling."""
self.__dict__.update(state)
# may raise AttributeError, which forces UCR to rebuild the cache
self.def_count # :pylint: disable-msg=W0104
def add_subfiles(self, subfiles):
# type: (List[Tuple[str, Set[str]]]) -> None
"""
Add subfiles to multifile.
:param subfiles: List of 2-tuples (file-name, set-of-variable-names).
"""
for from_file, variables in subfiles:
self.from_files.add(from_file)
self.variables |= variables
def remove_subfile(self, subfile):
# type: (str) -> None
"""
Remove subfile.
Removed diversion of set of sub-files becomes empty.
"""
self.from_files.discard(subfile)
if not self.need_divert():
self.uninstall_divert()
def __call__(self, args):
# type: (_ARG) -> None
"""Generate multfile from subfile templates."""
ucr, changed = args
print('Multifile: %s' % self.to_file)
if hasattr(self, 'preinst') and self.preinst:
run_module(self.preinst, 'preinst', ucr, changed)
if self.def_count == 0 or not self.from_files:
return
to_dir = os.path.dirname(self.to_file)
if not os.path.isdir(to_dir):
os.makedirs(to_dir, 0o755)
if os.path.isfile(self.dummy_from_file):
stat = os.stat(self.dummy_from_file) # type: Optional[os.stat_result]
elif os.path.isfile(self.to_file):
stat = os.stat(self.to_file)
else:
stat = None
tmp_to_file = self._temp_file_name()
try:
filter_opts = {} # type: Dict[str, Any]
with open(tmp_to_file, 'wb') as to_fp:
self._set_perm(stat, tmp_to_file)
for from_file in sorted(self.from_files, key=os.path.basename):
try:
with open(from_file, 'r', encoding='utf-8') as from_fp:
to_fp.write(run_filter(from_fp.read(), ucr, srcfiles=self.from_files, opts=filter_opts))
except EnvironmentError:
continue
try:
os.rename(tmp_to_file, self.to_file)
except EnvironmentError as ex:
if ex.errno == errno.EBUSY:
with open(self.to_file, 'w+', encoding='utf-8') as fd:
fd.write(open(tmp_to_file, 'r', encoding='utf-8').read())
os.unlink(tmp_to_file)
except Exception:
if os.path.exists(tmp_to_file):
os.unlink(tmp_to_file)
raise
if hasattr(self, 'postinst') and self.postinst:
run_module(self.postinst, 'postinst', ucr, changed)
script_file = os.path.join(SCRIPT_DIR, self.to_file.strip("/"))
if os.path.isfile(script_file):
run_script(script_file, 'postinst', changed)
def need_divert(self):
# type: () -> bool
"""Diversion is needed when at least one multifile and one subfile
definition exists."""
return self.def_count >= 1 and bool(self.from_files)
def install_divert(self):
# type: () -> None
"""Prepare file for diversion."""
if self.need_divert():
super(ConfigHandlerMultifile, self).install_divert()
def uninstall_divert(self):
# type: () -> bool
"""
Undo diversion of file.
:returns: `True` when the diversion is removed, `False` when the diversion is still needed.
"""
if self.need_divert():
return False
return super(ConfigHandlerMultifile, self).uninstall_divert()
class ConfigHandlerFile(ConfigHandlerDiverting):
"""
Handler for (single)file.
:param from_file: Template source file name.
:param to_file: Destination file name.
"""
def __init__(self, from_file, to_file):
# type: (str, str) -> None
super(ConfigHandlerFile, self).__init__(to_file)
self.from_file = from_file
def __call__(self, args):
# type: (_ARG) -> None
"""Generate file from template."""
ucr, changed = args
if hasattr(self, 'preinst') and self.preinst:
run_module(self.preinst, 'preinst', ucr, changed)
print('File: %s' % self.to_file)
to_dir = os.path.dirname(self.to_file)
if not os.path.isdir(to_dir):
os.makedirs(to_dir, 0o755)
try:
stat = os.stat(self.from_file)
except EnvironmentError:
print("The referenced template file does not exist", file=sys.stderr)
return None
tmp_to_file = self._temp_file_name()
try:
filter_opts = {} # type: Dict[str, Any]
with open(self.from_file, 'r', encoding='utf-8') as from_fp, open(tmp_to_file, 'wb') as to_fp:
self._set_perm(stat, tmp_to_file)
to_fp.write(run_filter(from_fp.read(), ucr, srcfiles=[self.from_file], opts=filter_opts))
try:
os.rename(tmp_to_file, self.to_file)
except EnvironmentError as ex:
if ex.errno == errno.EBUSY:
with open(self.to_file, 'w+', encoding='utf-8') as fd:
fd.write(open(tmp_to_file, 'r', encoding='utf-8').read())
os.unlink(tmp_to_file)
except Exception:
if os.path.exists(tmp_to_file):
os.unlink(tmp_to_file)
raise
if hasattr(self, 'postinst') and self.postinst:
run_module(self.postinst, 'postinst', ucr, changed)
script_file = self.from_file.replace(FILE_DIR, SCRIPT_DIR)
if os.path.isfile(script_file):
run_script(script_file, 'postinst', changed)
def need_divert(self):
# type: () -> bool
"""For simple files the diversion is always needed."""
return True
class ConfigHandlerScript(ConfigHandler):
"""
Handler for UCR scripts.
:param script: Script file name.
"""
def __init__(self, script):
# type: (str) -> None
super(ConfigHandlerScript, self).__init__()
self.script = script
def __hash__(self):
"""Return unique hash."""
return hash(self.script)
def __eq__(self, other):
"""Compare this to other handler."""
if isinstance(other, type(self)):
return self.script == other.script
return NotImplemented
def __ne__(self, other):
"""Compare this to other handler."""
if isinstance(other, type(self)):
return self.script != other.script
return NotImplemented
def __call__(self, args):
# type: (_ARG) -> None
"""Call external programm after change."""
_ucr, changed = args
print('Script: %s' % self.script)
if os.path.isfile(self.script):
run_script(self.script, 'generate', changed)
class ConfigHandlerModule(ConfigHandler):
"""
Handler for UCR Python module.
:param module: Module file name.
"""
def __init__(self, module):
# type: (str) -> None
super(ConfigHandlerModule, self).__init__()
self.module = module
def __hash__(self):
"""Return unique hash."""
return hash(self.module)
def __eq__(self, other):
"""Compare this to other handler."""
if isinstance(other, type(self)):
return self.module == other.module
return NotImplemented
def __ne__(self, other):
"""Compare this to other handler."""
if isinstance(other, type(self)):
return self.module != other.module
return NotImplemented
def __call__(self, args):
# type: (_ARG) -> None
"""Call Python module after change."""
ucr, changed = args
print('Module: %s' % self.module)
run_module(self.module, 'handler', ucr, changed)
def grep_variables(text):
# type: (str) -> Set[str]
"""
Search UCR template text for used variables.
:returns: Set of all variables inside `@%@` delimiters.
"""
return set(VARIABLE_PATTERN.findall(text))
[docs]class ConfigHandlers:
"""Manage handlers for configuration variables."""
CACHE_FILE = '/var/cache/univention-config/cache'
# 0: without version
# 1: with version header
# 2: switch to handlers mapping to set, drop file, add multifile.def_count
# 3: split config_registry into sub modules
VERSION = 3
VERSION_MIN = 3
VERSION_MAX = 3
VERSION_TEXT = 'univention-config cache, version'
VERSION_NOTICE = '%s %s\n' % (VERSION_TEXT, VERSION)
VERSION_RE = re.compile('^%s (?P<version>[0-9]+)$' % VERSION_TEXT)
_handlers = {} # type: Dict[str, Set[ConfigHandler]] # variable -> set(handlers)
_multifiles = {} # type: Dict[str, ConfigHandlerMultifile] # multifile -> handler
_subfiles = {} # type: Dict[str, List[Tuple[str, Set[str]]]] # multifile -> [(subfile, variables)] // pending
def __init__(self):
# type: () -> None
pass
@staticmethod
def _get_cache_version(cache_file):
# type: (IO) -> int
"""
Read cached `.info` data.
:param cache_file: Opened cache file.
:returns: Version.
"""
line = cache_file.readline() # IOError is propagated
match = ConfigHandlers.VERSION_RE.match(line)
if match:
version = int(match.group('version'))
# "Old style" cache (version 0) doesn't contain version notice
else:
cache_file.seek(0)
version = 0
return version
@classmethod
def _parse_rfc822_file(cls, fname):
# type: (str) -> List[Dict[str, List[str]]]
"""
Parse :rfc:`822` file.
:param fname: Path to :rfc:`822` file.
:returns: A list of dictionaries.
"""
with open(fname, 'r', encoding='utf-8') as fd:
return parseRfc822(fd.read())
[docs] def load(self):
# type: () -> None
"""Load cached `.info` data or force update."""
try:
with open(ConfigHandlers.CACHE_FILE, 'rb') as cache_file:
version = self._get_cache_version(cache_file)
chv = ConfigHandlers
if not chv.VERSION_MIN <= version <= chv.VERSION_MAX:
raise TypeError("Invalid cache file version.")
pickler = pickle.Unpickler(cache_file)
self._handlers = pickler.load()
if version <= 1:
# version <= 1: _handlers[multifile] -> [handlers]
# version >= 2: _handlers[multifile] -> set([handlers])
self._handlers = {k: set(v) for k, v in self._handlers.items()}
# version <= 1: _files UNUSED
pickler.load()
self._subfiles = pickler.load()
self._multifiles = pickler.load()
except (Exception, pickle.UnpicklingError):
self.update()
[docs] def get_handler(self, entry):
# type: (_INFO) -> Optional[ConfigHandler]
"""
Parse entry and return Handler instance.
:param entry: `.info` file entry dictionary.
:returns: An instance of `None`.
"""
try:
typ = entry['Type'][0]
handler = getattr(self, '_get_handler_%s' % typ)
except (LookupError, AttributeError):
return None
else:
return handler(entry)
def _parse_common_file_handler(self, handler, entry):
# type: (ConfigHandlerDiverting, _INFO) -> None
"""
Parse common file and multifile entries.
:param handler: Handler instance.
:param entry: `.info` file entry dictionary.
"""
try:
handler.preinst = entry['Preinst'][0]
except LookupError:
pass
try:
handler.postinst = entry['Postinst'][0]
except LookupError:
pass
handler.variables |= set(entry.get('Variables', set()))
try:
user = entry['User'][0]
except LookupError:
pass
else:
try:
handler.user = getpwnam(user).pw_uid
except LookupError:
print(('W: failed to convert the username %s to the uid' % (user,)), file=sys.stderr)
try:
group = entry['Group'][0]
except LookupError:
pass
else:
try:
handler.group = getgrnam(group).gr_gid
except LookupError:
print(('W: failed to convert the groupname %s to the gid' % (group,)), file=sys.stderr)
try:
mode = entry['Mode'][0]
except LookupError:
pass
else:
try:
handler.mode = int(mode, 8)
except ValueError:
print('W: failed to convert mode %s' % (mode,), file=sys.stderr)
def _get_handler_file(self, entry):
# type: (_INFO) -> Optional[ConfigHandlerFile]
"""
Parse file entry and return Handler instance.
:param entry: `.info` file entry dictionary.
:returns: An instance of `None`.
"""
try:
name = entry['File'][0]
except LookupError:
return None
from_path = os.path.join(FILE_DIR, name)
handler = ConfigHandlerFile(from_path, name)
if os.path.exists(from_path):
with open(from_path, 'r', encoding='utf-8') as fd:
handler.variables = grep_variables(fd.read())
self._parse_common_file_handler(handler, entry)
return handler
def _get_handler_script(self, entry):
# type: (_INFO) -> Optional[ConfigHandlerScript]
"""
Parse script entry and return Handler instance.
:param entry: `.info` file entry dictionary.
:returns: An instance of `None`.
"""
try:
script = entry['Script'][0]
variables = entry['Variables']
except LookupError:
return None
handler = ConfigHandlerScript(os.path.join(SCRIPT_DIR, script))
handler.variables = set(variables)
return handler
def _get_handler_module(self, entry):
# type: (_INFO) -> Optional[ConfigHandlerModule]
"""
Parse module entry and return Handler instance.
:param entry: `.info` file entry dictionary.
:returns: An instance of `None`.
"""
try:
module = entry['Module'][0]
variables = entry['Variables']
except LookupError:
return None
handler = ConfigHandlerModule(os.path.splitext(module)[0])
handler.variables = set(variables)
return handler
def _get_handler_multifile(self, entry):
# type: (_INFO) -> Optional[ConfigHandlerMultifile]
"""
Parse multifile entry and return Handler instance.
:param entry: `.info` file entry dictionary.
:returns: An instance of `None`.
"""
try:
mfile = entry['Multifile'][0]
except LookupError:
return None
try:
handler = self._multifiles[mfile]
handler.def_count += 1
except KeyError:
from_path = os.path.join(FILE_DIR, mfile)
handler = ConfigHandlerMultifile(from_path, mfile)
self._parse_common_file_handler(handler, entry)
# Add pending subfiles from earlier entries
self._multifiles[mfile] = handler
try:
file_vars = self._subfiles.pop(mfile)
handler.add_subfiles(file_vars)
except KeyError:
pass
return handler
def _get_handler_subfile(self, entry):
# type: (Dict[str, List[str]]) -> Optional[ConfigHandlerMultifile]
"""
Parse subfile entry and return Handler instance.
:param entry: `.info` file entry dictionary.
:returns: An instance of `None`.
"""
try:
mfile = entry['Multifile'][0]
subfile = entry['Subfile'][0]
except LookupError:
return None
variables = set(entry.get('Variables', set()))
name = os.path.join(FILE_DIR, subfile)
try:
with open(name, 'r', encoding='utf-8') as temp_file:
variables |= grep_variables(temp_file.read())
except EnvironmentError:
print("Failed to process Subfile %s" % (name,), file=sys.stderr)
return None
qentry = (name, variables)
# if multifile handler does not yet exists, queue subfiles for later
try:
handler = self._multifiles[mfile]
handler.add_subfiles([qentry])
except KeyError:
pending = self._subfiles.setdefault(mfile, [])
pending.append(qentry)
return None
return handler
[docs] def update(self):
# type: () -> Set[ConfigHandler]
"""
Parse all `.info` files to build list of handlers.
:returns: Set of all handlers.
"""
self._handlers.clear()
self._multifiles.clear()
self._subfiles.clear()
handlers = set() # type: Set[ConfigHandler]
for info in directory_files(INFO_DIR):
if not info.endswith('.info'):
continue
for section in self._parse_rfc822_file(info):
handler = self.get_handler(section)
if handler:
handlers.add(handler)
for handler in handlers:
for variable in handler.variables:
v2h = self._handlers.setdefault(variable, set())
v2h.add(handler)
self._save_cache()
return handlers
[docs] def update_divert(self, handlers):
# type: (Iterable[ConfigHandler]) -> None
"""
Synchronize diversions with handlers.
:param handlers: List of handlers.
"""
wanted = {h.to_file: h for h in handlers if isinstance(h, ConfigHandlerDiverting) and h.need_divert()}
to_remove = set() # type: Set[str]
# Scan for diversions done by UCR
with open('/var/lib/dpkg/diversions', 'r', encoding='utf-8') as div_file:
# from \n to \n package \n
try:
while True:
path_from = next(div_file).rstrip()
path_to = next(div_file).rstrip()
diversion = next(div_file).rstrip()
if path_from + '.debian' != path_to:
continue
if ':' != diversion: # local diversion
continue
assert path_from not in to_remove # no duplicates
try:
handler = wanted.pop(path_from)
except KeyError:
to_remove.add(path_from)
except StopIteration:
pass
# Remove existing diversion not wanted
for path in to_remove:
tmp_handler = ConfigHandlerDiverting(path)
tmp_handler.uninstall_divert()
# Install missing diversions still wanted
for path, handler in wanted.items():
handler.install_divert()
def _save_cache(self):
# type: () -> None
"""Write cache file."""
try:
with open(ConfigHandlers.CACHE_FILE, 'wb') as cache_file:
cache_file.write(ConfigHandlers.VERSION_NOTICE.encode('utf-8'))
pickler = pickle.Pickler(cache_file)
pickler.dump(self._handlers)
pickler.dump(self._subfiles)
pickler.dump(self._multifiles)
except IOError as ex:
if ex.errno != errno.EACCES:
raise
[docs] def register(self, package, ucr):
# type: (str, _UCR) -> Set[ConfigHandler]
"""
Register new info file for package.
:param package: Name of the package to register.
:param ucr: UCR instance.
:returns: Set of (new) handlers.
"""
handlers = set() # type: Set[ConfigHandler]
fname = os.path.join(INFO_DIR, '%s.info' % package)
for section in self._parse_rfc822_file(fname):
handler = self.get_handler(section)
if handler:
handlers.add(handler)
for handler in handlers:
if isinstance(handler, ConfigHandlerDiverting):
handler.install_divert()
values = {} # type: Dict[str, Tuple[None, Optional[str]]]
for variable in handler.variables:
v2h = self._handlers.setdefault(variable, set())
v2h.add(handler)
values[variable] = (None, ucr[variable])
try:
_re = re.compile(variable)
except re.error:
continue
values.update((key, (None, val)) for key, val in ucr.items() if _re.match(key))
handler((ucr, values))
self._save_cache()
return handlers
[docs] def unregister(self, package, ucr):
# type: (str, _UCR) -> Set[ConfigHandler]
"""
Un-register info file for package.
:param package: Name of the package to un-register.
:param ucr: UCR instance.
:returns: Set of (then obsolete) handlers.
"""
obsolete_handlers = set() # type: Set[ConfigHandler]
mf_handlers = set() # type: Set[ConfigHandler] # Remaining Multifile handlers
fname = os.path.join(INFO_DIR, '%s.info' % package)
for section in self._parse_rfc822_file(fname):
try:
typ = section['Type'][0]
except LookupError:
continue
if typ == 'file':
handler = self.get_handler(section)
elif typ == 'subfile':
mfile = section['Multifile'][0]
sfile = section['Subfile'][0]
try:
handler = self._multifiles[mfile]
except KeyError:
continue # skip SubFile w/o MultiFile
name = os.path.join(FILE_DIR, sfile)
handler.remove_subfile(name)
mf_handlers.add(handler)
elif typ == 'multifile':
mfile = section['Multifile'][0]
handler = self._multifiles[mfile]
handler.def_count -= 1
mf_handlers.add(handler)
else:
continue
if not handler: # Bug #17913
print(("Skipping internal error: no handler for %r in %s" % (section, package)), file=sys.stderr)
continue
if isinstance(handler, ConfigHandlerDiverting) and handler.uninstall_divert():
obsolete_handlers.add(handler)
for handler in mf_handlers - obsolete_handlers:
self.call_handler(ucr, handler)
try:
# remove cache file to force rebuild of cache
os.unlink(ConfigHandlers.CACHE_FILE)
except EnvironmentError:
pass
return obsolete_handlers
def __call__(self, variables, arg):
# type: (Iterable[str], _ARG) -> None
"""
Call handlers registered for changes in variables.
:param variables: Changed UCR variable names.
:param arg: 2-tuple(UCR-instance, changed) where changed is a dictionary mapping ucs-variable-names to values.
"""
if not variables:
return
pending_handlers = set() # type: Set[ConfigHandler]
for reg_var, handlers in self._handlers.items():
try:
_re = re.compile(reg_var)
except re.error as ex:
print('Failed to compile regular expression %s: %s' % (reg_var, ex), file=sys.stderr)
continue
for variable in variables:
if _re.match(variable):
pending_handlers |= handlers
for handler in pending_handlers:
handler(arg)
[docs] def commit(self, ucr, filelist=[]):
# type: (_UCR, Iterable[str]) -> None
"""
Call handlers to (re-)generate files.
:param ucr: UCR instance.
:param filelist: List of files to re-generate. By default *all* files will be re-generated and all modules and scripts will we re-invoked!
"""
_filelist = []
for fname in filelist:
fname = os.path.expanduser(fname)
fname = os.path.expandvars(fname)
fname = os.path.abspath(fname)
_filelist.append(fname)
# find handlers
pending_handlers = set()
for fname in directory_files(INFO_DIR):
if not fname.endswith('.info'):
continue
for section in self._parse_rfc822_file(fname):
if not section.get('Type'):
continue
handler = None
if _filelist:
files = section.get('File') or section.get('Multifile') or ()
for filename in files:
if not os.path.isabs(filename):
filename = '/%s' % filename
if filename in _filelist:
handler = self.get_handler(section)
break
else:
continue
else:
handler = self.get_handler(section)
if handler:
pending_handlers.add(handler)
# print missing files
for fname in set(_filelist) - {h.to_file for h in pending_handlers if isinstance(h, ConfigHandlerDiverting)}:
print('Warning: The file %r is not registered as an UCR template.' % (fname,), file=sys.stderr)
# call handlers
for handler in pending_handlers:
self.call_handler(ucr, handler)
[docs] def call_handler(self, ucr, handler):
# type: (_UCR, ConfigHandler) -> None
"""
Call handler passing current configuration variables.
:param ucr: UCR instance.
:param handler: The handler to call.
"""
values = {} # type: Dict[str, Tuple[Optional[str], Optional[str]]]
for variable in handler.variables:
if variable in self._handlers.keys():
if ".*" in variable:
for i in range(4):
var = variable.replace(".*", "%s" % i)
val = ucr.get(var)
values[var] = (val, val)
else:
val = ucr.get(variable)
values[variable] = (val, val)
handler((ucr, values))
# vim:set sw=4 ts=4 noet: