#!/usr/bin/python3
# -*- coding: utf-8 -*-
#
# Univention UCS@school
#
# Copyright 2016-2025 Univention GmbH
#
# https://www.univention.de/
#
# All rights reserved.
#
# The source code of this program is made available
# under the terms of the GNU Affero General Public License version 3
# (GNU AGPL V3) as published by the Free Software Foundation.
#
# Binary versions of this program provided by Univention to you as
# well as other copyrighted, protected or trademarked materials like
# Logos, graphics, fonts, specific documentations and configurations,
# cryptographic keys etc. are subject to a license agreement between
# you and Univention and not subject to the GNU AGPL V3.
#
# In the case you use this program under the terms of the GNU AGPL V3,
# the program is provided in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public
# License with the Debian GNU/Linux or Univention distribution in file
# /usr/share/common-licenses/AGPL-3; if not, see
# <http://www.gnu.org/licenses/>.
"""Configuration classes."""
import json
import logging
from typing import Any, Dict, List, Optional, Type # noqa: F401
from jsonschema import ValidationError, validate
from six import string_types
from ucsschool.lib.models.utils import ucr, ucr_username_max_length
from .exceptions import InitialisationError, ReadOnlyConfiguration
from .utils.config_pyhook import ConfigPyHook
from .utils.configuration_checks import run_configuration_checks
from .utils.import_pyhook import run_import_pyhooks
USER_IMPORT_SCHEMA_FILE = "/usr/share/ucs-school-import/schema/user_import_configuration_schema.json"
CONFIGURATION_ERROR_LOG = "/var/log/univention/ucs-school-import/import-configuration-error.log"
[docs]
def setup_configuration(conffiles, **kwargs): # type: (List[str], **str) -> ReadOnlyDict
logger = logging.getLogger(__name__)
config = Configuration(conffiles)
ConfigurationFile("kwargs (cmdline args)").validate(kwargs)
config.update(kwargs)
_set_username_maxlength(config, logger)
run_import_pyhooks(ConfigPyHook, "post_config_files_read", config, conffiles, kwargs)
config.check_mandatory_attributes(logger)
config.close()
logger.info("Finished reading configuration, starting checks...")
run_configuration_checks(config)
return config
def _set_username_maxlength(config, logger): # type: (ReadOnlyDict, logging.Logger) -> None
ucrv = ucr.get("ucsschool/username/max_length")
logger.info("UCRV ucsschool/username/max_length: %r", ucrv)
try:
default = config["username"]["max_length"]["default"]
except KeyError:
default = int(ucr_username_max_length) # convert lazy proxy object to int
config["username"].setdefault("max_length", {})["default"] = default
logger.info(
"Set value of configuration key username:max_length:default to%s value of UCR variable "
"ucsschool/username/max_length: %d.",
" default" if ucrv is None else "",
default,
)
try:
student = config["username"]["max_length"]["student"]
except KeyError:
exam_prefix = ucr.get("ucsschool/ldap/default/userprefix/exam", "exam-")
student = default - len(exam_prefix)
config["username"]["max_length"]["student"] = student
logger.info(
"Set value of configuration key username:max_length:student to username:max_length:default "
"reduced by length of the exam-prefix (%r): %d.",
exam_prefix,
student,
)
[docs]
class ConfigurationFile(object):
_schema = None # type: Optional[Dict[str, Any]]
def __init__(self, filename): # type: (str) -> None
self.filename = filename
self.logger = logging.getLogger(__name__)
err_handler = logging.FileHandler(CONFIGURATION_ERROR_LOG)
err_handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(message)s"))
self.err_logger = logging.getLogger("ConfigurationErrorLogger")
self.err_logger.setLevel(logging.INFO)
self.err_logger.addHandler(err_handler)
[docs]
def read(self): # type: () -> Dict[str, Any]
"""
:raises InitialisationError: when the configuration file could not be
read or when it contained invalid JSON
"""
self.logger.info("Reading configuration from %r...", self.filename)
try:
with open(self.filename, "rb") as fp:
return json.load(fp)
except ValueError as ve:
error = InitialisationError(
"Error in configuration file {!r}: {}.".format(self.filename, ve),
log_traceback=False,
)
self.err_logger.error("InitialisationError: {}".format(error))
except IOError as exc:
error = InitialisationError(
"Error in configuration file {!r}: {}.".format(self.filename, exc),
log_traceback=False,
)
self.err_logger.error("InitialisationError: {}".format(error))
[docs]
def write(self, conf): # type: (str) -> None
self.logger.info("Writing configuration to %r...", self.filename)
with open(self.filename, "wb") as fp:
return json.dump(conf, fp)
[docs]
def update(self, conf): # type: (**str) -> None
self.logger.info("Updating configuration in %r...", self.filename)
cur = self.read()
cur.update(conf)
with open(self.filename, "wb") as fp:
return json.dump(cur, fp)
[docs]
@classmethod
def get_schema(cls): # type: () -> Dict[str, Any]
""":raises InitialisationError: when the json schema cannot be read"""
if not cls._schema:
try:
with open(USER_IMPORT_SCHEMA_FILE, "rb") as schema_file:
cls._schema = json.load(schema_file)
except ValueError as exc:
raise InitialisationError(
"Error reading json schema {!r}: {}.".format(USER_IMPORT_SCHEMA_FILE, exc),
log_traceback=False,
)
return cls._schema
[docs]
def validate(self, cf_obj): # type: (Dict[str, Any]) -> None
"""
:raises InitialisationError: when `cf_obj` does not conform to the
json schema
"""
self.logger.debug("Validating %r...", self.filename)
try:
validate(instance=cf_obj, schema=self.get_schema())
except ValidationError as exc:
raise InitialisationError(
"Schema validation failed for configuration file {!r}: {}.".format(self.filename, exc),
log_traceback=False,
)
[docs]
class ReadOnlyDict(dict):
@classmethod
def _recursive_typed_update(cls, a, b): # type: (Dict[Any, Any], Dict[Any, Any]) -> Dict[Any, Any]
for k, v in b.items():
if isinstance(v, dict):
# recurse into nested dict
a[k] = cls._recursive_typed_update(a.get(k, {}), v)
else:
# Try to use any other type than str (when overwriting
# configuration from cmdline).
if v is None or callable(v):
a[k] = v
else:
t = type(v)
if isinstance(t, string_types) and a.get(k):
t = type(a[k])
a[k] = t(v)
return a
[docs]
def update(self, E=None, **F): # type: (Optional[Dict[Any, Any]], **Any) -> None
self._recursive_typed_update(self, E)
if F:
self._recursive_typed_update(self, F)
@staticmethod
def __closed(*args, **kwargs): # type: (*Any, **Any) -> None
raise ReadOnlyConfiguration()
[docs]
def check_mandatory_attributes(self, logger): # type: (logging.Logger) -> None
try:
mandatory_attributes = self["mandatory_attributes"]
assert isinstance(mandatory_attributes, list)
except (AssertionError, KeyError):
# will be checked in
# /usr/share/ucs-school-import/checks/defaults::test_minimal_mandatory_attributes()
pass
else:
missing_mandatory_attributes = [
attr
for attr in ("firstname", "lastname", "name", "record_uid", "school", "source_uid")
if attr not in mandatory_attributes
]
if missing_mandatory_attributes:
logger.info("Adding %r to 'mandatory_attributes'.", missing_mandatory_attributes)
mandatory_attributes.extend(missing_mandatory_attributes)
mandatory_attributes.sort()
[docs]
def close(self): # type: () -> None
self.__setitem__ = self.__delitem__ = self.update = self._recursive_typed_update = self.__closed
[docs]
class Configuration(object):
"""Singleton to the global configuration object."""
class __SingleConf:
conffiles = []
def __init__(self, filenames): # type: (List[str]) -> None
if not filenames:
raise InitialisationError("Configuration not yet loaded.")
self.config = None
for filename in filenames:
cf = ConfigurationFile(filename)
cf_obj = cf.read()
cf.validate(cf_obj)
if self.config:
self.config.update(cf_obj)
else:
self.config = ReadOnlyDict(cf_obj)
self.conffiles.append(filename)
self.config.conffiles = self.conffiles
_instance = None
def __new__(cls, filenames=None): # type: (Type[Configuration], Optional[List[str]]) -> ReadOnlyDict
if not cls._instance:
cls._instance = cls.__SingleConf(filenames)
return cls._instance.config