#!/usr/bin/python3
# -*- coding: utf-8 -*-
#
# UCS@school python lib
#
# Copyright 2021-2025 Univention GmbH
#
# http://www.univention.de/
#
# All rights reserved.
#
# The source code of this program is made available
# under the terms of the GNU Affero General Public License version 3
# (GNU AGPL V3) as published by the Free Software Foundation.
#
# Binary versions of this program provided by Univention to you as
# well as other copyrighted, protected or trademarked materials like
# Logos, graphics, fonts, specific documentations and configurations,
# cryptographic keys etc. are subject to a license agreement between
# you and Univention and not subject to the GNU AGPL V3.
#
# In the case you use this program under the terms of the GNU AGPL V3,
# the program is provided in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public
# License with the Debian GNU/Linux or Univention distribution in file
# /usr/share/common-licenses/AGPL-3; if not, see
# <http://www.gnu.org/licenses/>.
import logging
import os
import re
import traceback
import uuid
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Type, Union # noqa: F401
import lazy_object_proxy
import ldap
from ldap.dn import escape_dn_chars, str2dn
if TYPE_CHECKING:
from .base import UdmObject # noqa: F401
from ucsschool.lib.models.utils import get_file_handler, ucr
from ucsschool.lib.roles import (
InvalidUcsschoolRoleString,
UnknownContextType,
UnknownRole,
create_ucsschool_role_string,
get_role_info,
role_computer_room,
role_exam_user,
role_marketplace_share,
role_school_admin,
role_school_class,
role_school_class_share,
role_staff,
role_student,
role_teacher,
role_workgroup,
role_workgroup_share,
)
from ucsschool.lib.schoolldap import SchoolSearchBase
LOG_FILE = "/var/log/univention/ucs-school-validation.log"
VALIDATION_LOGGER = "UCSSchool-Validation"
[docs]
def get_private_data_logger():
private_data_logger = None
if os.geteuid() == 0:
private_data_logger = logging.getLogger(VALIDATION_LOGGER)
private_data_logger.setLevel("DEBUG")
backup_count = int(ucr.get("ucsschool/validation/logging/backupcount", "").strip() or 60)
private_data_logger.addHandler(
get_file_handler("DEBUG", LOG_FILE, uid=0, gid=0, backupCount=backup_count)
)
return private_data_logger
private_data_logger = lazy_object_proxy.Proxy(get_private_data_logger)
[docs]
def get_position_from(dn): # type: (str) -> Optional[str]
# note: obj.position does not have a position
return ldap.dn.dn2str(ldap.dn.str2dn(dn)[1:])
[docs]
def obj_to_dict(obj): # type: (UdmObject) -> Dict[str, Any]
dict_obj = {}
dict_obj["props"] = dict(obj.items())
dict_obj["dn"] = obj.dn
dict_obj["position"] = get_position_from(dict_obj["dn"])
dict_obj["options"] = dict.fromkeys(obj.options, True)
return dict_obj
[docs]
def obj_to_dict_conversion(obj):
# type: (Union[UdmObject, Dict[str, Any]]) -> Dict[str, Any]
if isinstance(obj, dict):
dict_obj = obj
else:
dict_obj = obj_to_dict(obj)
return dict_obj
[docs]
def is_student_role(role): # type: (str) -> bool
return role in (role_student, role_exam_user)
[docs]
class SchoolValidator(object):
position_regex = None
attributes = []
roles = []
[docs]
@classmethod
def validate(cls, obj): # type: (Dict[str, Any]) -> List[str]
errors = []
roles = obj["props"].get("ucsschoolRole", [])
errors.append(cls.required_roles(roles, cls.expected_roles(obj)))
errors.append(cls.required_attributes(obj["props"]))
errors.append(cls.position(obj["position"]))
return errors
[docs]
@classmethod
def required_attributes(cls, props): # type: (Dict[str, Any]) -> Optional[str]
missing_attributes = [attr for attr in cls.attributes if not props.get(attr, "")]
if missing_attributes:
return "is missing required attributes: {!r}.".format(missing_attributes)
[docs]
@classmethod
def position(cls, position): # type: (str) -> Optional[str]
if not cls.position_regex.match(position):
return "has wrong position in ldap."
[docs]
@classmethod
def required_roles(cls, roles, expected_roles): # type: (List[str], List[str]) -> Optional[str]
roles = [r.lower() for r in roles]
missing_roles = [role for role in expected_roles if role.lower() not in roles]
if missing_roles:
return "is missing roles {!r}".format(missing_roles)
[docs]
@classmethod
def roles_at_school(cls, schools): # type: (List[str]) -> List[str]
"""Get all roles for all schools which the object is expected to have."""
expected_roles = []
for role in cls.roles:
for school in schools:
expected_roles.append(create_ucsschool_role_string(role, school))
return expected_roles
[docs]
@classmethod
def expected_roles(cls, obj): # type: (Dict[str, Any]) -> List[str]
return []
[docs]
@classmethod
def get_search_base(cls, school): # type: (str) -> SchoolSearchBase
from .base import UCSSchoolHelperAbstractClass
return UCSSchoolHelperAbstractClass.get_search_base(school)
[docs]
class UserValidator(SchoolValidator):
is_student = False
is_exam_user = False
is_teacher = False
is_staff = False
is_school_admin = False
attributes = [
"username",
"ucsschoolRole",
"school",
"firstname",
"lastname",
]
[docs]
@classmethod
def validate(cls, obj): # type: (Dict[str, Any]) -> List[str]
errors = super(UserValidator, cls).validate(obj)
schools = obj["props"].get("school", [])
groups = obj["props"].get("groups", [])
errors.append(cls.validate_required_groups(groups, cls.expected_groups(obj)))
errors.extend(cls.validate_group_membership(groups))
roles = []
for role_str in obj["props"].get("ucsschoolRole", []):
try:
roles.append(get_role_info(role_str))
except (InvalidUcsschoolRoleString, UnknownRole, UnknownContextType) as exc:
errors.append(str(exc))
errors.append(cls.validate_part_of_school(roles, schools))
errors.append(cls.validate_student_roles(roles))
return errors
[docs]
@classmethod
def expected_roles(cls, obj): # type: (Dict[str, Any]) -> List[str]
schools = obj["props"].get("school", [])
return cls.roles_at_school(schools)
[docs]
@classmethod
def expected_groups(cls, obj): # type: (Dict[str, Any]) -> List[str]
"""Collect expected groups of user. Overwrite for special cases in subclasses."""
expected_groups = []
schools = obj["props"].get("school", [])
expected_groups.extend(cls.domain_users_group(schools))
expected_groups.extend(cls.role_groups(schools))
return expected_groups
[docs]
@classmethod
def validate_required_groups(cls, groups, expected_groups):
# type: (List[str], List[Any]) -> Optional[str]
"""
Object should be in all groups/ containers.
E.g.: Students must have at least one group in `cn=klassen,cn=schueler,cn=groups,ou=ou`,
which is true if the string ends with the classes position.
For groups like `cn=schueler-ou` endwith is the same as equal.
"""
groups = [g.lower() for g in groups]
missing_groups = [
exp_group
for exp_group in expected_groups
if not any(g.endswith(exp_group.lower()) for g in groups)
]
if missing_groups:
return "is missing groups at positions {!r}".format(missing_groups)
[docs]
@classmethod
def validate_part_of_school(cls, roles, schools):
# type: (List[Tuple[str]], List[str]) -> Optional[str]
"""Users should not have roles with schools which they don't have."""
schools = [s.lower() for s in schools]
missing_schools = {s for r, c, s in roles if c == "school" and s.lower() not in schools}
if missing_schools:
return "is not part of schools: {!r}.".format(list(missing_schools))
[docs]
@classmethod
def validate_student_roles(cls, roles): # type: (List[Tuple[str]]) -> Optional[str]
"""Students should not have teacher, staff or school_admin role."""
not_allowed_for_students = [role_teacher, role_staff, role_school_admin]
for r, _c, _s in roles:
if (cls.is_student and r in not_allowed_for_students) or (
not cls.is_student and r in [role_student, role_exam_user]
):
return "must not have these roles: {!r}.".format(not_allowed_for_students)
[docs]
@classmethod
def domain_users_group(cls, schools): # type: (List[str]) -> List[str]
"""Users should be inside the `Domain Users OU` of their schools."""
return [
"cn=Domain Users {0},cn=groups,ou={0},{1}".format(escape_dn_chars(school), ucr["ldap/base"])
for school in schools
]
[docs]
@classmethod
def role_groups(cls, schools): # type: (List[str]) -> List[str]
"""
Users with `cls.role` should be in the corresponding group at
each school they are part of.
Implemented in subclasses.
"""
return []
[docs]
@classmethod
def validate_group_membership(cls, groups): # type: (List[str]) -> List[str]
"""Validate group membership, e.g. students should not be in teachers group."""
return [
"Disallowed member of group {}".format(dn)
for dn in groups
if (
(SchoolSearchBase.get_is_student_group_regex().match(dn) and not cls.is_student)
or (SchoolSearchBase.get_is_teachers_group_regex().match(dn) and not cls.is_teacher)
or (SchoolSearchBase.get_is_staff_group_regex().match(dn) and not cls.is_staff)
or (SchoolSearchBase.get_is_admins_group_regex().match(dn) and cls.is_student)
)
]
[docs]
class StudentValidator(UserValidator):
position_regex = SchoolSearchBase.get_students_pos_regex()
is_student = True
roles = [role_student]
[docs]
@classmethod
def expected_groups(cls, obj): # type: (Dict[str, Any]) -> List[str]
"""Students have at least one class at every school."""
schools = obj["props"].get("school", [])
expected_groups = super(StudentValidator, cls).expected_groups(obj)
expected_groups.extend([cls.get_search_base(school).classes for school in schools])
return expected_groups
[docs]
@classmethod
def role_groups(cls, schools): # type: (List[str]) -> List[str]
return [cls.get_search_base(school).students_group for school in schools]
[docs]
class TeacherValidator(UserValidator):
position_regex = SchoolSearchBase.get_teachers_pos_regex()
is_teacher = True
roles = [role_teacher]
[docs]
@classmethod
def role_groups(cls, schools): # type: (List[str]) -> List[str]
return [cls.get_search_base(school).teachers_group for school in schools]
[docs]
class ExamStudentValidator(StudentValidator):
position_regex = SchoolSearchBase.get_exam_users_pos_regex()
is_exam_user = True
is_student = True
roles = [role_exam_user]
[docs]
@classmethod
def validate(cls, obj): # type: (Dict[str, Any]) -> List[str]
errors = super(ExamStudentValidator, cls).validate(obj)
roles = []
for role_str in obj["props"].get("ucsschoolRole", []):
try:
roles.append(get_role_info(role_str))
except (InvalidUcsschoolRoleString, UnknownRole, UnknownContextType) as exc:
errors.append(str(exc))
errors.append(cls.validate_exam_contexts(roles))
return errors
[docs]
@classmethod
def validate_exam_contexts(cls, roles): # type: (List[Tuple[str]]) -> str
"""
ExamUsers should have a role with context `exam`,
e.g exam_user:exam:demo-exam-DEMOSCHOOL.
"""
exam_roles = [r for r, c, s in roles if c == "exam" and r == role_exam_user]
if not exam_roles:
return "is missing role with context exam."
[docs]
@classmethod
def role_groups(cls, schools): # type: (List[str]) -> List[str]
"""
ExamUsers should be inside a corresponding group in each of their schools.
SchoolSearchBase.examGroup has no school.lower()
"""
return [
"cn={},cn=ucsschool,cn=groups,{}".format(
escape_dn_chars(SchoolSearchBase._examGroupNameTemplate % {"ou": school.lower()}),
ucr["ldap/base"],
)
for school in schools
]
[docs]
class StaffValidator(UserValidator):
position_regex = SchoolSearchBase.get_staff_pos_regex()
is_staff = True
roles = [role_staff]
[docs]
@classmethod
def role_groups(cls, schools): # type: (List[str]) -> List[str]
return [cls.get_search_base(school).staff_group for school in schools]
[docs]
class TeachersAndStaffValidator(UserValidator):
position_regex = SchoolSearchBase.get_teachers_and_staff_pos_regex()
is_teacher = True
is_staff = True
roles = [role_teacher, role_staff]
[docs]
@classmethod
def role_groups(cls, schools): # type: (List[str]) -> List[str]
"""
TeachersAndStaff Users should be inside teachers and staff groups in
all of their schools.
"""
expected_groups = []
for school in schools:
expected_groups.append(cls.get_search_base(school).teachers_group)
expected_groups.append(cls.get_search_base(school).staff_group)
return expected_groups
[docs]
class SchoolAdminValidator(UserValidator):
position_regex = SchoolSearchBase.get_admins_pos_regex()
is_school_admin = True
roles = [role_school_admin]
[docs]
@classmethod
def role_groups(cls, schools): # type: (List[str]) -> List[str]
return [cls.get_search_base(school).admins_group for school in schools]
[docs]
class GroupAndShareValidator(SchoolValidator):
attributes = [
"name",
"ucsschoolRole",
]
@staticmethod
def _extract_ou(dn): # type: (str) -> Optional[str]
"""
Groups and shares do not have the property school,
so it is extracted of the dn.
"""
try:
return next(val for x in str2dn(dn) for attr, val, z in x if attr.lower() == "ou")
except StopIteration:
pass
[docs]
@classmethod
def validate(cls, obj): # type: (Dict[str, Any]) -> List[str]
school = GroupAndShareValidator._extract_ou(obj["dn"])
errors = super(GroupAndShareValidator, cls).validate(obj)
errors.append(cls.school_prefix(obj["props"]["name"], school))
return errors
[docs]
@classmethod
def expected_roles(cls, obj): # type: (Dict[str, Any]) -> List[str]
school = GroupAndShareValidator._extract_ou(obj["dn"])
return cls.roles_at_school([school])
[docs]
@classmethod
def school_prefix(cls, name, school): # type: (str, str) -> Optional[str]
"""Groups and Shares should have a school prefix in their name, like `DEMOSCHOOL-Democlass`"""
if role_marketplace_share not in cls.roles:
if school and name and not name.startswith("{}-".format(school)):
return "has an incorrect school prefix for school {}.".format(school)
[docs]
class SchoolClassValidator(GroupAndShareValidator):
roles = [role_school_class]
position_regex = SchoolSearchBase.get_schoolclass_pos_regex()
[docs]
class WorkGroupValidator(GroupAndShareValidator):
roles = [role_workgroup]
position_regex = SchoolSearchBase.get_workgroup_pos_regex()
[docs]
class ComputerroomValidator(GroupAndShareValidator):
roles = [role_computer_room]
position_regex = SchoolSearchBase.get_computerroom_pos_regex()
[docs]
class WorkGroupShareValidator(GroupAndShareValidator):
roles = [role_workgroup_share]
position_regex = SchoolSearchBase.get_workgroup_share_pos_regex()
[docs]
class ClassShareValidator(GroupAndShareValidator):
roles = [role_school_class_share]
position_regex = SchoolSearchBase.get_school_class_share_pos_regex()
[docs]
class MarketplaceShareValidator(GroupAndShareValidator):
roles = [role_marketplace_share]
position_regex = SchoolSearchBase.get_workgroup_share_pos_regex()
dn_regex = re.compile(
r"cn=Marktplatz,cn=shares,ou=[^,]+?,{}".format(ucr["ldap/base"]),
flags=re.IGNORECASE,
)
[docs]
def get_class(obj): # type: (Dict[str, Any]) -> Optional[Type[SchoolValidator]]
options = obj["options"]
position = obj["position"]
if options.get("ucsschoolExam", False):
return ExamStudentValidator
if options.get("ucsschoolTeacher", False) and options.get("ucsschoolStaff", False):
return TeachersAndStaffValidator
if options.get("ucsschoolStudent", False):
return StudentValidator
if options.get("ucsschoolTeacher", False):
return TeacherValidator
if options.get("ucsschoolStaff", False):
return StaffValidator
if options.get("ucsschoolAdministrator", False):
return SchoolAdminValidator
if SchoolClassValidator.position_regex.match(position):
return SchoolClassValidator
if WorkGroupValidator.position_regex.match(position):
return WorkGroupValidator
if ComputerroomValidator.position_regex.match(position):
return ComputerroomValidator
if ClassShareValidator.position_regex.match(position):
return ClassShareValidator
if MarketplaceShareValidator.dn_regex.match(obj["dn"]):
# note: MarketplaceShares have the same position as WorkgroupShares,
# but are unique for ous.
return MarketplaceShareValidator
if WorkGroupShareValidator.position_regex.match(position):
return WorkGroupShareValidator
[docs]
def validate(obj, logger=None): # type: (Dict[str, Any], logging.Logger) -> None
"""
Objects are validated as dicts and errors are logged to
the passed logger. Sensitive data is only logged to /var/log/univention/ucs-school-validation.log
"""
dict_obj = obj_to_dict_conversion(obj)
validation_class = get_class(dict_obj)
if validation_class:
options = dict_obj["options"]
errors = validation_class.validate(dict_obj)
errors = [err for err in errors if err is not None]
if errors:
validation_uuid = str(uuid.uuid4())
errors_str = "{} UCS@school Object {} with options {} has validation errors: {}".format(
validation_uuid,
dict_obj.get("dn", ""),
"{!r}".format(options),
", ".join(errors),
)
varname = "ucsschool/validation/logging/enabled"
if ucr.is_true(varname, True) or ucr.get(varname) in (
"",
None,
): # tests: 00_validation_log_enabled
if logger:
logger.warning(errors_str)
if private_data_logger:
stack_trace = " ".join(traceback.format_stack()[:-2]).replace("\n", " ")
private_data_logger.warning("\t".join([errors_str, str(dict_obj), stack_trace]))