# Source code for ucsschool.lib.models.validator

#!/usr/bin/python3
# -*- coding: utf-8 -*-
#
# UCS@school python lib
#
# Copyright 2021-2025 Univention GmbH
#
# http://www.univention.de/
#
# All rights reserved.
#
# The source code of this program is made available
# under the terms of the GNU Affero General Public License version 3
# (GNU AGPL V3) as published by the Free Software Foundation.
#
# Binary versions of this program provided by Univention to you as
# well as other copyrighted, protected or trademarked materials like
# Logos, graphics, fonts, specific documentations and configurations,
# cryptographic keys etc. are subject to a license agreement between
# you and Univention and not subject to the GNU AGPL V3.
#
# In the case you use this program under the terms of the GNU AGPL V3,
# the program is provided in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public
# License with the Debian GNU/Linux or Univention distribution in file
# /usr/share/common-licenses/AGPL-3; if not, see
# <http://www.gnu.org/licenses/>.

import logging
import os
import re
import traceback
import uuid
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Type, Union  # noqa: F401

import lazy_object_proxy
import ldap
from ldap.dn import escape_dn_chars, str2dn

if TYPE_CHECKING:
    from .base import UdmObject  # noqa: F401

from ucsschool.lib.models.utils import get_file_handler, ucr
from ucsschool.lib.roles import (
    InvalidUcsschoolRoleString,
    UnknownContextType,
    UnknownRole,
    create_ucsschool_role_string,
    get_role_info,
    role_computer_room,
    role_exam_user,
    role_marketplace_share,
    role_school_admin,
    role_school_class,
    role_school_class_share,
    role_staff,
    role_student,
    role_teacher,
    role_workgroup,
    role_workgroup_share,
)
from ucsschool.lib.schoolldap import SchoolSearchBase

# Path of the log file that the private data logger (see get_private_data_logger) writes to.
LOG_FILE = "/var/log/univention/ucs-school-validation.log"
# Name under which the private data logger is registered with the logging module.
VALIDATION_LOGGER = "UCSSchool-Validation"


def get_private_data_logger():
    # type: () -> Optional[logging.Logger]
    """
    Return the logger that writes full validation records to ``LOG_FILE``.

    The logger is only set up when the process runs with effective UID 0,
    because the file handler is created with ``uid=0, gid=0``.

    :return: the configured logger, or ``None`` for non-root processes
    """
    if os.geteuid() != 0:
        return None

    private_data_logger = logging.getLogger(VALIDATION_LOGGER)
    private_data_logger.setLevel("DEBUG")

    # logging.getLogger() returns the same object for the same name, so calling
    # this function repeatedly must not stack additional file handlers (that
    # would duplicate every log line and leak file descriptors).
    if not private_data_logger.handlers:
        raw_backup_count = (ucr.get("ucsschool/validation/logging/backupcount", "") or "").strip()
        try:
            backup_count = int(raw_backup_count or 60)
        except ValueError:
            # A malformed UCR value must not break object validation.
            backup_count = 60
        private_data_logger.addHandler(
            get_file_handler("DEBUG", LOG_FILE, uid=0, gid=0, backupCount=backup_count)
        )
    return private_data_logger
# Wrapped in a lazy proxy so that get_private_data_logger() is not called at import
# time but only when the logger object is first used.
private_data_logger = lazy_object_proxy.Proxy(get_private_data_logger)
def get_position_from(dn):
    # type: (str) -> Optional[str]
    """
    Return the DN of the container that holds the object with the given DN.

    The position is computed by dropping the first RDN of ``dn``.

    :param dn: distinguished name of the object
    :return: ``dn`` without its first RDN
    """
    # note: obj.position does not have a position
    rdns = ldap.dn.str2dn(dn)
    parent_rdns = rdns[1:]
    return ldap.dn.dn2str(parent_rdns)
def obj_to_dict(obj):
    # type: (UdmObject) -> Dict[str, Any]
    """
    Convert a UDM object into the plain dict form used by the validators.

    :param obj: object providing ``items()``, ``dn`` and ``options``
    :return: dict with the keys ``props``, ``dn``, ``position`` and ``options``
    """
    props = dict(obj.items())
    dn = obj.dn
    # Key order is kept stable because the dict is written to the log as a string.
    return {
        "props": props,
        "dn": dn,
        "position": get_position_from(dn),
        "options": dict.fromkeys(obj.options, True),
    }
def obj_to_dict_conversion(obj):
    # type: (Union[UdmObject, Dict[str, Any]]) -> Dict[str, Any]
    """
    Return ``obj`` in dict form.

    A dict is passed through unchanged (the very same object is returned);
    anything else is converted with :func:`obj_to_dict`.
    """
    return obj if isinstance(obj, dict) else obj_to_dict(obj)
def is_student_role(role):
    # type: (str) -> bool
    """Return ``True`` if ``role`` is the student role or the exam user role."""
    return role == role_student or role == role_exam_user
class SchoolValidator(object):
    """
    Base class of all validators.

    Subclasses configure the checks through the class attributes below and
    may override :meth:`expected_roles`.
    """

    # Compiled pattern that the object's position has to match.
    position_regex = None
    # Names of the properties that must be present and non-empty.
    attributes = []
    # Role names from which the expected role strings are built.
    roles = []

    @classmethod
    def validate(cls, obj):
        # type: (Dict[str, Any]) -> List[Optional[str]]
        """
        Run the role, attribute and position checks on ``obj``.

        :param obj: object in dict form (keys ``props`` and ``position`` are read)
        :return: one entry per check: an error message, or ``None`` if the check passed
        """
        actual_roles = obj["props"].get("ucsschoolRole", [])
        return [
            cls.required_roles(actual_roles, cls.expected_roles(obj)),
            cls.required_attributes(obj["props"]),
            cls.position(obj["position"]),
        ]

    @classmethod
    def required_attributes(cls, props):
        # type: (Dict[str, Any]) -> Optional[str]
        """Return an error message if any property from ``cls.attributes`` is missing or empty."""
        missing_attributes = []
        for attr in cls.attributes:
            if not props.get(attr, ""):
                missing_attributes.append(attr)
        if not missing_attributes:
            return None
        return "is missing required attributes: {!r}.".format(missing_attributes)

    @classmethod
    def position(cls, position):
        # type: (str) -> Optional[str]
        """Return an error message if ``position`` does not match ``cls.position_regex``."""
        if cls.position_regex.match(position):
            return None
        return "has wrong position in ldap."

    @classmethod
    def required_roles(cls, roles, expected_roles):
        # type: (List[str], List[str]) -> Optional[str]
        """
        Return an error message listing every expected role that is absent.

        The comparison is case-insensitive.
        """
        present = {r.lower() for r in roles}
        missing_roles = [role for role in expected_roles if role.lower() not in present]
        if not missing_roles:
            return None
        return "is missing roles {!r}".format(missing_roles)

    @classmethod
    def roles_at_school(cls, schools):
        # type: (List[str]) -> List[str]
        """Get all roles for all schools which the object is expected to have."""
        return [
            create_ucsschool_role_string(role, school) for role in cls.roles for school in schools
        ]

    @classmethod
    def expected_roles(cls, obj):
        # type: (Dict[str, Any]) -> List[str]
        """Return the role strings ``obj`` must have. The base class expects none."""
        return []

    @classmethod
    def get_search_base(cls, school):
        # type: (str) -> SchoolSearchBase
        """Return the search base of ``school``."""
        # Imported inside the method, as in the original code (not at module level).
        from .base import UCSSchoolHelperAbstractClass

        return UCSSchoolHelperAbstractClass.get_search_base(school)
class UserValidator(SchoolValidator):
    """
    Common checks for all user types.

    Subclasses set the ``is_*`` flags, ``roles`` and ``position_regex`` and
    implement :meth:`role_groups`.
    """

    is_student = False
    is_exam_user = False
    is_teacher = False
    is_staff = False
    is_school_admin = False
    attributes = [
        "username",
        "ucsschoolRole",
        "school",
        "firstname",
        "lastname",
    ]

    @classmethod
    def validate(cls, obj):
        # type: (Dict[str, Any]) -> List[Optional[str]]
        """
        Run the base checks plus the group, school and role checks for users.

        :param obj: user in dict form
        :return: error messages; entries are ``None`` for checks that passed
        """
        errors = super(UserValidator, cls).validate(obj)
        schools = obj["props"].get("school", [])
        groups = obj["props"].get("groups", [])
        errors.append(cls.validate_required_groups(groups, cls.expected_groups(obj)))
        errors.extend(cls.validate_group_membership(groups))
        roles = []
        for role_str in obj["props"].get("ucsschoolRole", []):
            try:
                roles.append(get_role_info(role_str))
            except (InvalidUcsschoolRoleString, UnknownRole, UnknownContextType) as exc:
                # A role string that cannot be parsed is reported as a validation error.
                errors.append(str(exc))
        errors.append(cls.validate_part_of_school(roles, schools))
        errors.append(cls.validate_student_roles(roles))
        return errors

    @classmethod
    def expected_roles(cls, obj):
        # type: (Dict[str, Any]) -> List[str]
        """Return the role strings built from ``cls.roles`` for every school of the user."""
        schools = obj["props"].get("school", [])
        return cls.roles_at_school(schools)

    @classmethod
    def expected_groups(cls, obj):
        # type: (Dict[str, Any]) -> List[str]
        """Collect expected groups of user. Overwrite for special cases in subclasses."""
        schools = obj["props"].get("school", [])
        expected_groups = []
        expected_groups.extend(cls.domain_users_group(schools))
        expected_groups.extend(cls.role_groups(schools))
        return expected_groups

    @classmethod
    def validate_required_groups(cls, groups, expected_groups):
        # type: (List[str], List[Any]) -> Optional[str]
        """
        Object should be in all groups/ containers.
        E.g.: Students must have at least one group in
        `cn=klassen,cn=schueler,cn=groups,ou=ou`, which is true
        if the string ends with the classes position.
        For groups like `cn=schueler-ou` endswith is the same as equal.
        """
        groups = [g.lower() for g in groups]
        missing_groups = [
            exp_group
            for exp_group in expected_groups
            if not any(g.endswith(exp_group.lower()) for g in groups)
        ]
        if missing_groups:
            return "is missing groups at positions {!r}".format(missing_groups)
        return None

    @classmethod
    def validate_part_of_school(cls, roles, schools):
        # type: (List[Tuple[str, str, str]], List[str]) -> Optional[str]
        """Users should not have roles with schools which they don't have."""
        schools = [s.lower() for s in schools]
        missing_schools = {s for r, c, s in roles if c == "school" and s.lower() not in schools}
        if missing_schools:
            # Sorted so that the message is deterministic (set order is arbitrary).
            return "is not part of schools: {!r}.".format(sorted(missing_schools))
        return None

    @classmethod
    def validate_student_roles(cls, roles):
        # type: (List[Tuple[str, str, str]]) -> Optional[str]
        """
        Students should not have teacher, staff or school_admin role,
        and non-students should not have the student or exam user role.

        The message lists the roles that are forbidden for the validated user type.
        """
        if cls.is_student:
            forbidden_roles = [role_teacher, role_staff, role_school_admin]
        else:
            forbidden_roles = [role_student, role_exam_user]
        if any(r in forbidden_roles for r, _c, _s in roles):
            return "must not have these roles: {!r}.".format(forbidden_roles)
        return None

    @classmethod
    def domain_users_group(cls, schools):
        # type: (List[str]) -> List[str]
        """Users should be inside the `Domain Users OU` of their schools."""
        return [
            "cn=Domain Users {0},cn=groups,ou={0},{1}".format(escape_dn_chars(school), ucr["ldap/base"])
            for school in schools
        ]

    @classmethod
    def role_groups(cls, schools):
        # type: (List[str]) -> List[str]
        """
        Users with `cls.role` should be in the corresponding group at
        each school they are part of. Implemented in subclasses.
        """
        return []

    @classmethod
    def validate_group_membership(cls, groups):
        # type: (List[str]) -> List[str]
        """Validate group membership, e.g. students should not be in teachers group."""
        # The patterns do not depend on the group, so they are fetched once.
        student_group_regex = SchoolSearchBase.get_is_student_group_regex()
        teachers_group_regex = SchoolSearchBase.get_is_teachers_group_regex()
        staff_group_regex = SchoolSearchBase.get_is_staff_group_regex()
        admins_group_regex = SchoolSearchBase.get_is_admins_group_regex()
        return [
            "Disallowed member of group {}".format(dn)
            for dn in groups
            if (
                (student_group_regex.match(dn) and not cls.is_student)
                or (teachers_group_regex.match(dn) and not cls.is_teacher)
                or (staff_group_regex.match(dn) and not cls.is_staff)
                or (admins_group_regex.match(dn) and cls.is_student)
            )
        ]
class StudentValidator(UserValidator):
    """Validator for students."""

    position_regex = SchoolSearchBase.get_students_pos_regex()
    is_student = True
    roles = [role_student]

    @classmethod
    def expected_groups(cls, obj):
        # type: (Dict[str, Any]) -> List[str]
        """Students have at least one class at every school."""
        schools = obj["props"].get("school", [])
        common_groups = super(StudentValidator, cls).expected_groups(obj)
        class_containers = [cls.get_search_base(school).classes for school in schools]
        return common_groups + class_containers

    @classmethod
    def role_groups(cls, schools):
        # type: (List[str]) -> List[str]
        """Return the students group of every given school."""
        student_groups = []
        for school in schools:
            student_groups.append(cls.get_search_base(school).students_group)
        return student_groups
class TeacherValidator(UserValidator):
    """Validator for teachers."""

    position_regex = SchoolSearchBase.get_teachers_pos_regex()
    is_teacher = True
    roles = [role_teacher]

    @classmethod
    def role_groups(cls, schools):
        # type: (List[str]) -> List[str]
        """Return the teachers group of every given school."""
        teacher_groups = []
        for school in schools:
            teacher_groups.append(cls.get_search_base(school).teachers_group)
        return teacher_groups
class ExamStudentValidator(StudentValidator):
    """Validator for exam users. Adds the exam-context role check to the student checks."""

    position_regex = SchoolSearchBase.get_exam_users_pos_regex()
    is_exam_user = True
    is_student = True
    roles = [role_exam_user]

    @classmethod
    def validate(cls, obj):
        # type: (Dict[str, Any]) -> List[Optional[str]]
        """
        Run all user checks and additionally require a role with context ``exam``.

        :param obj: exam user in dict form
        :return: error messages; entries are ``None`` for checks that passed
        """
        errors = super(ExamStudentValidator, cls).validate(obj)
        roles = []
        for role_str in obj["props"].get("ucsschoolRole", []):
            try:
                roles.append(get_role_info(role_str))
            except (InvalidUcsschoolRoleString, UnknownRole, UnknownContextType):
                # UserValidator.validate() (called through super() above) has already
                # reported this role string; reporting it again would duplicate the error.
                continue
        errors.append(cls.validate_exam_contexts(roles))
        return errors

    @classmethod
    def validate_exam_contexts(cls, roles):
        # type: (List[Tuple[str, str, str]]) -> Optional[str]
        """
        ExamUsers should have a role with context `exam`,
        e.g exam_user:exam:demo-exam-DEMOSCHOOL.
        """
        has_exam_role = any(c == "exam" and r == role_exam_user for r, c, _s in roles)
        if not has_exam_role:
            return "is missing role with context exam."
        return None

    @classmethod
    def role_groups(cls, schools):
        # type: (List[str]) -> List[str]
        """
        ExamUsers should be inside a corresponding group in each of their schools.
        SchoolSearchBase.examGroup has no school.lower()
        """
        return [
            "cn={},cn=ucsschool,cn=groups,{}".format(
                escape_dn_chars(SchoolSearchBase._examGroupNameTemplate % {"ou": school.lower()}),
                ucr["ldap/base"],
            )
            for school in schools
        ]
class StaffValidator(UserValidator):
    """Validator for staff users."""

    position_regex = SchoolSearchBase.get_staff_pos_regex()
    is_staff = True
    roles = [role_staff]

    @classmethod
    def role_groups(cls, schools):
        # type: (List[str]) -> List[str]
        """Return the staff group of every given school."""
        staff_groups = []
        for school in schools:
            staff_groups.append(cls.get_search_base(school).staff_group)
        return staff_groups
class TeachersAndStaffValidator(UserValidator):
    """Validator for users that are teacher and staff at the same time."""

    position_regex = SchoolSearchBase.get_teachers_and_staff_pos_regex()
    is_teacher = True
    is_staff = True
    roles = [role_teacher, role_staff]

    @classmethod
    def role_groups(cls, schools):
        # type: (List[str]) -> List[str]
        """
        TeachersAndStaff Users should be inside teachers and staff groups in
        all of their schools.
        """
        # Per school: first the teachers group, then the staff group.
        return [
            getattr(cls.get_search_base(school), group_attr)
            for school in schools
            for group_attr in ("teachers_group", "staff_group")
        ]
class SchoolAdminValidator(UserValidator):
    """Validator for school administrators."""

    position_regex = SchoolSearchBase.get_admins_pos_regex()
    is_school_admin = True
    roles = [role_school_admin]

    @classmethod
    def role_groups(cls, schools):
        # type: (List[str]) -> List[str]
        """Return the admins group of every given school."""
        admin_groups = []
        for school in schools:
            admin_groups.append(cls.get_search_base(school).admins_group)
        return admin_groups
class GroupAndShareValidator(SchoolValidator):
    """Common checks for groups and shares, whose school is derived from their DN."""

    attributes = [
        "name",
        "ucsschoolRole",
    ]

    @staticmethod
    def _extract_ou(dn):
        # type: (str) -> Optional[str]
        """
        Groups and shares do not have the property school,
        so it is extracted of the dn.

        :return: value of the first ``ou`` RDN attribute in ``dn``, or ``None`` if there is none
        """
        return next(
            (val for rdn in str2dn(dn) for attr, val, _flags in rdn if attr.lower() == "ou"),
            None,
        )

    @classmethod
    def validate(cls, obj):
        # type: (Dict[str, Any]) -> List[Optional[str]]
        """
        Run the base checks plus the school prefix check on the object's name.

        :param obj: group or share in dict form
        :return: error messages; entries are ``None`` for checks that passed
        """
        school = GroupAndShareValidator._extract_ou(obj["dn"])
        errors = super(GroupAndShareValidator, cls).validate(obj)
        # A missing name is reported by required_attributes(); it must not raise a
        # KeyError here. school_prefix() skips the check for an empty name.
        errors.append(cls.school_prefix(obj["props"].get("name"), school))
        return errors

    @classmethod
    def expected_roles(cls, obj):
        # type: (Dict[str, Any]) -> List[str]
        """Return the role strings built from ``cls.roles`` for the school found in the DN."""
        school = GroupAndShareValidator._extract_ou(obj["dn"])
        if not school:
            # Without an OU in the DN there is no school to build role strings for.
            return []
        return cls.roles_at_school([school])

    @classmethod
    def school_prefix(cls, name, school):
        # type: (Optional[str], Optional[str]) -> Optional[str]
        """Groups and Shares should have a school prefix in their name, like `DEMOSCHOOL-Democlass`"""
        # Validators with the marketplace share role are exempt from the prefix check.
        if role_marketplace_share in cls.roles:
            return None
        if school and name and not name.startswith("{}-".format(school)):
            return "has an incorrect school prefix for school {}.".format(school)
        return None
class SchoolClassValidator(GroupAndShareValidator):
    """Validator chosen by get_class() for objects at a school class position."""

    position_regex = SchoolSearchBase.get_schoolclass_pos_regex()
    roles = [role_school_class]
class WorkGroupValidator(GroupAndShareValidator):
    """Validator chosen by get_class() for objects at a workgroup position."""

    position_regex = SchoolSearchBase.get_workgroup_pos_regex()
    roles = [role_workgroup]
class ComputerroomValidator(GroupAndShareValidator):
    """Validator chosen by get_class() for objects at a computer room position."""

    position_regex = SchoolSearchBase.get_computerroom_pos_regex()
    roles = [role_computer_room]
class WorkGroupShareValidator(GroupAndShareValidator):
    """Validator chosen by get_class() for objects at a workgroup share position."""

    position_regex = SchoolSearchBase.get_workgroup_share_pos_regex()
    roles = [role_workgroup_share]
class ClassShareValidator(GroupAndShareValidator):
    """Validator chosen by get_class() for objects at a school class share position."""

    position_regex = SchoolSearchBase.get_school_class_share_pos_regex()
    roles = [role_school_class_share]
class MarketplaceShareValidator(GroupAndShareValidator):
    """
    Validator for marketplace shares.

    These shares use the same position pattern as workgroup shares, so
    get_class() identifies them by ``dn_regex`` instead.
    """

    roles = [role_marketplace_share]
    position_regex = SchoolSearchBase.get_workgroup_share_pos_regex()
    # The LDAP base is literal text, not a pattern: escape it so that regex
    # metacharacters in it (e.g. ".") cannot match arbitrary characters.
    # str() keeps the previous behaviour of formatting whatever value UCR returns.
    dn_regex = re.compile(
        r"cn=Marktplatz,cn=shares,ou=[^,]+?,{}".format(re.escape(str(ucr["ldap/base"]))),
        flags=re.IGNORECASE,
    )
def get_class(obj):
    # type: (Dict[str, Any]) -> Optional[Type[SchoolValidator]]
    """
    Pick the validator class for an object in dict form.

    Users are recognised by their UDM options, groups and shares by their
    position (marketplace shares by their DN). The first match wins.

    :param obj: dict with the keys ``options``, ``position`` and ``dn``
    :return: the validator class, or ``None`` if the object is of no known type
    """
    options = obj["options"]
    position = obj["position"]

    # Order matters: exam users first, and the teacher+staff combination
    # before the single teacher / staff options.
    validators_by_options = (
        (("ucsschoolExam",), ExamStudentValidator),
        (("ucsschoolTeacher", "ucsschoolStaff"), TeachersAndStaffValidator),
        (("ucsschoolStudent",), StudentValidator),
        (("ucsschoolTeacher",), TeacherValidator),
        (("ucsschoolStaff",), StaffValidator),
        (("ucsschoolAdministrator",), SchoolAdminValidator),
    )
    for required_options, validator in validators_by_options:
        if all(options.get(option, False) for option in required_options):
            return validator

    validators_by_position = (
        SchoolClassValidator,
        WorkGroupValidator,
        ComputerroomValidator,
        ClassShareValidator,
    )
    for validator in validators_by_position:
        if validator.position_regex.match(position):
            return validator

    # note: MarketplaceShares have the same position as WorkgroupShares,
    # but are unique for ous.
    if MarketplaceShareValidator.dn_regex.match(obj["dn"]):
        return MarketplaceShareValidator
    if WorkGroupShareValidator.position_regex.match(position):
        return WorkGroupShareValidator
    return None
def validate(obj, logger=None):
    # type: (Union[UdmObject, Dict[str, Any]], Optional[logging.Logger]) -> None
    """
    Objects are validated as dicts and errors are logged to the passed logger.
    Sensitive data is only logged to /var/log/univention/ucs-school-validation.log

    :param obj: object to validate, either as UDM object or already in dict form
    :param logger: optional logger that receives the error summary
    """
    dict_obj = obj_to_dict_conversion(obj)
    validation_class = get_class(dict_obj)
    if not validation_class:
        return

    options = dict_obj["options"]
    # Checks that passed are reported as None by the validators.
    errors = [err for err in validation_class.validate(dict_obj) if err is not None]
    if not errors:
        return

    # The UUID is the first field of the message in both logs.
    validation_uuid = str(uuid.uuid4())
    errors_str = "{} UCS@school Object {} with options {} has validation errors: {}".format(
        validation_uuid,
        dict_obj.get("dn", ""),
        "{!r}".format(options),
        ", ".join(errors),
    )

    varname = "ucsschool/validation/logging/enabled"
    # Logging is on unless the variable is explicitly set to a false value.
    # tests: 00_validation_log_enabled
    logging_enabled = ucr.is_true(varname, True) or ucr.get(varname) in ("", None)
    if not logging_enabled:
        return

    if logger:
        logger.warning(errors_str)
    if private_data_logger:
        # Drop the last two frames and flatten the stack onto a single line.
        stack_frames = traceback.format_stack()[:-2]
        stack_trace = " ".join(stack_frames).replace("\n", " ")
        private_data_logger.warning("\t".join([errors_str, str(dict_obj), stack_trace]))