Source code for ucsschool.lib.consistency

#!/usr/bin/python3
# -*- coding: utf-8 -*-
#
#
# UCS@school Diagnosis Module
#
# Copyright 2020-2025 Univention GmbH
#
# http://www.univention.de/
#
# All rights reserved.
#
# The source code of this program is made available
# under the terms of the GNU Affero General Public License version 3
# (GNU AGPL V3) as published by the Free Software Foundation.
#
# Binary versions of this program provided by Univention to you as
# well as other copyrighted, protected or trademarked materials like
# Logos, graphics, fonts, specific documentations and configurations,
# cryptographic keys etc. are subject to a license agreement between
# you and Univention and not subject to the GNU AGPL V3.
#
# In the case you use this program under the terms of the GNU AGPL V3,
# the program is provided in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public
# License with the Debian GNU/Linux or Univention distribution in file
# /usr/share/common-licenses/AGPL-3; if not, see
# <http://www.gnu.org/licenses/>.
"""This module check the constistency of USC@school users, shares and groups"""
import re
import sys
from typing import Dict, List, Optional, Tuple  # noqa: F401

from ldap import INVALID_DN_SYNTAX
from ldap.dn import escape_dn_chars
from ldap.filter import escape_filter_chars, filter_format

from univention.admin.uexceptions import noObject, permissionDenied
from univention.admin.uldap import getMachineConnection
from univention.config_registry import ConfigRegistry

from .models.base import WrongObjectType
from .models.school import School
from .models.user import User
from .roles import (
    create_ucsschool_role_string,
    role_dc_slave_admin,
    role_dc_slave_edu,
    role_exam_user,
    role_memberserver_admin,
    role_memberserver_edu,
    role_school_admin,
    role_school_class,
    role_school_class_share,
    role_staff,
    role_student,
    role_teacher,
    role_workgroup,
    role_workgroup_share,
)
from .schoolldap import SchoolSearchBase


[docs] class UserCheck(object): def __init__(self): ucr = ConfigRegistry() ucr.load() ldap_base = ucr.get("ldap/base") self.lo, _ = getMachineConnection() admins_prefix = ucr.get("ucsschool/ldap/default/groupprefix/admins", "admins-") teachers_prefix = ucr.get("ucsschool/ldap/default/groupprefix/teachers", "lehrer-") staff_prefix = ucr.get("ucsschool/ldap/default/groupprefix/staff", "mitarbeiter-") students_prefix = ucr.get("ucsschool/ldap/default/groupprefix/pupils", "schueler-") self.teachers_regex = re.compile( r"cn={}(?P<ou>[^,]+?),cn=groups,ou=(?P=ou),{}".format(teachers_prefix, ldap_base), flags=re.IGNORECASE, ) self.staff_regex = re.compile( r"cn={}(?P<ou>[^,]+?),cn=groups,ou=(?P=ou),{}".format(staff_prefix, ldap_base), flags=re.IGNORECASE, ) self.students_regex = re.compile( r"cn={}(?P<ou>[^,]+?),cn=groups,ou=(?P=ou),{}".format(students_prefix, ldap_base), flags=re.IGNORECASE, ) self.ucsschool_obj_classes = { "ucsschoolTeacher": role_teacher, "ucsschoolStaff": role_staff, "ucsschoolStudent": role_student, "ucsschoolAdministrator": role_school_admin, "ucsschoolExam": role_exam_user, } self.domain_users_ou = {} self.students_ou = {} self.teachers_ou = {} self.staff_ou = {} self.admins_ou = {} self.all_schools = [ou.name for ou in School.get_all(self.lo)] for ou in self.all_schools: self.domain_users_ou[ou] = "cn=Domain Users {0},cn=groups,ou={0},{1}".format(ou, ldap_base) self.students_ou[ou] = "cn={}{},cn=groups,ou={},{}".format( students_prefix, ou.lower(), ou, ldap_base ) self.teachers_ou[ou] = "cn={}{},cn=groups,ou={},{}".format( teachers_prefix, ou.lower(), ou, ldap_base ) self.staff_ou[ou] = "cn={}{},cn=groups,ou={},{}".format( staff_prefix, ou.lower(), ou, ldap_base ) self.admins_ou[ou] = "cn={}{},cn=ouadmins,cn=groups,{}".format( admins_prefix, ou.lower(), ldap_base )
[docs] def check_allowed_membership(self, group_dn, students=False, teachers=False, staff=False): # type: (str, Optional[bool], Optional[bool], Optional[bool]) -> List[str] """ This function is used to check if a group of a user matches the users UCS@school role(s). The caller specifies the group dn and the user roles which are allowed by setting them to 'True'. Example: 'group_dn' is expected to be a teachers group, i.e. 'teachers' is set to True by the caller. If the group turns out to be a students group (where teachers are disallowed) and 'students' is False, it is an error. A warning will be appended to a list which will be returned. """ errors = [] if self.students_regex.match(group_dn) and not students: errors.append("Disallowed member of students group {}.".format(group_dn)) if self.teachers_regex.match(group_dn) and not teachers: errors.append("Disallowed member of teachers group {}.".format(group_dn)) if self.staff_regex.match(group_dn) and not staff: errors.append("Disallowed member of staff group {}.".format(group_dn)) return errors
[docs] def get_users_from_ldap( self, school, users ): # type: (str, List[str]) -> Tuple[str, Dict[str, List[bytes]]] ldap_user_list = [] if users: for user_dn in users: try: ldap_user_list.append(self.lo.search(base=user_dn)[0]) except noObject: print("User with DN {} does not exist.".format(user_dn)) sys.exit() except INVALID_DN_SYNTAX: print("DN {} has invalid syntax.".format(user_dn)) sys.exit(1) if school: users_from_school_list = self.lo.search( filter=filter_format( "(&(univentionObjectType=users/user)(ucsschoolSchool=%s))", (school,) ) ) # all users from this school gets added to the list (no duplicates) for user in users_from_school_list: if user not in ldap_user_list: ldap_user_list.append(user) if not users and not school: ldap_user_list = self.lo.search( filter="(&(univentionObjectType=users/user)(objectClass=ucsschoolType))" ) return ldap_user_list
[docs] def check_user(self, dn, attrs): # type: (str, Dict[str, List[bytes]]) -> List[str] issues = [] try: user_obj = User.from_dn(dn, None, self.lo) except WrongObjectType as exc: issues.append("Expected a user object, but is not: {}".format(exc)) return issues except permissionDenied as exc: issues.append("Could not access this user {}".format(exc)) return issues # check if objectClass is correctly set user_obj_classes = [x.decode("UTF-8") for x in attrs.get("objectClass", [])] if not any(cls in user_obj_classes for cls in self.ucsschool_obj_classes): issues.append("User has no UCS@school Object Class set.") # check if UCS@school role is correctly set for each school dependent of the objectClass user_roles = [] for cls in user_obj_classes: try: user_roles.append(self.ucsschool_obj_classes[cls]) except KeyError: continue # exam users are an exception. They are objectClass 'ucsschoolStudent', # but only require to have the role 'exam_user:school:<userschool>' if self.ucsschool_obj_classes["ucsschoolExam"] in user_roles: try: user_roles.remove(self.ucsschool_obj_classes["ucsschoolStudent"]) except ValueError: pass # ucsschool_roles are validated case-insensitive ucsschool_roles = {r.lower() for r in user_obj.ucsschool_roles} for role in user_roles: for school in user_obj.schools: ucsschool_role_string = create_ucsschool_role_string(role, school) if ucsschool_role_string.lower() not in ucsschool_roles: issues.append("User does not have UCS@school Role {}".format(ucsschool_role_string)) # check appropriate group memberships (case-insensitive) users_group_dns = [_dn.lower() for _dn in self.lo.searchDn(filter="uniqueMember={}".format(dn))] # further checks require all of user_obj.schools to exist for school in user_obj.schools: if school not in self.all_schools: issues.append( "User is member of school {}, which does not exist anymore. " "Further tests on this user cannot be performed.".format(school) ) # abort further user checks here return issues for school in user_obj.schools: if self.domain_users_ou[school].lower() not in users_group_dns: issues.append("Not member of group {}".format(self.domain_users_ou[school])) # check students if user_obj.is_student(self.lo): for school in user_obj.schools: if self.students_ou[school].lower() not in users_group_dns: issues.append("Not member of group {}".format(self.students_ou[school])) for group_dn in users_group_dns: issues += self.check_allowed_membership(group_dn, students=True) # check admins if user_obj.is_administrator(self.lo): for ou in user_obj.schools: if self.admins_ou[ou].lower() not in users_group_dns: issues.append("Not member of group {}".format(self.admins_ou[ou])) for group_dn in users_group_dns: if self.students_regex.match(group_dn): issues.append("Admin should not be in a students group {}".format(group_dn)) # check teachers and staff if user_obj.is_teacher(self.lo) and user_obj.is_staff(self.lo): for ou in user_obj.schools: if self.teachers_ou[ou].lower() not in users_group_dns: issues.append("Not member of group {}".format(self.teachers_ou[ou])) if self.staff_ou[ou].lower() not in users_group_dns: issues.append("Not member of group {}".format(self.staff_ou[ou])) for group_dn in users_group_dns: issues += self.check_allowed_membership(group_dn, teachers=True, staff=True) # check teachers elif user_obj.is_teacher(self.lo): for ou in user_obj.schools: if self.teachers_ou[ou].lower() not in users_group_dns: issues.append("Not member of group {}".format(self.teachers_ou[ou])) for group_dn in users_group_dns: issues += self.check_allowed_membership(group_dn, teachers=True) # check staff elif user_obj.is_staff(self.lo): for ou in user_obj.schools: if self.staff_ou[ou].lower() not in users_group_dns: issues.append("Not member of group {}".format(self.staff_ou[ou])) for group_dn in users_group_dns: issues += self.check_allowed_membership(group_dn, staff=True) # Check if student is in a class group. if not user_obj.school_classes and user_obj.is_student(self.lo): issues.append("Is not a member of any school class.") # Users should also be member of the corresponding school for ou in user_obj.school_classes: if ou.encode("UTF-8") not in attrs["ucsschoolSchool"]: issues.append( "Is member of class {} but school property is not correspondingly set.".format( user_obj.school_classes[ou][0] ) ) return issues
[docs] def check_mandatory_groups_exist(school=None): # type: (str) -> Dict[str, List[str]] ucr = ConfigRegistry() ucr.load() ldap_base = ucr.get("ldap/base") problematic_objects = {} lo, _ = getMachineConnection() mandatory_global_groups = [ "cn=DC-Edukativnetz,cn=ucsschool,cn=groups,{}".format(ldap_base), "cn=DC-Verwaltungsnetz,cn=ucsschool,cn=groups,{}".format(ldap_base), "cn=Member-Edukativnetz,cn=ucsschool,cn=groups,{}".format(ldap_base), "cn=Member-Verwaltungsnetz,cn=ucsschool,cn=groups,{}".format(ldap_base), ] global_groups_issues = [] for mandatory_global_group in mandatory_global_groups: try: lo.searchDn(base=mandatory_global_group) except noObject: global_groups_issues.append( "Mandatory group {} does not exist.".format(mandatory_global_group) ) if global_groups_issues: problematic_objects["Global Groups"] = global_groups_issues if school: all_schools = [school] else: all_schools = [ou.name for ou in School.get_all(lo)] for ou in all_schools: search_base = SchoolSearchBase([ou]) issues = [] mandatory_groups = [ "cn=Domain Users {0},cn=groups,ou={0},{1}".format(escape_dn_chars(ou), ldap_base), "cn=OU{}-DC-Edukativnetz,cn=ucsschool,cn=groups,{}".format(escape_dn_chars(ou), ldap_base), "cn=OU{}-DC-Verwaltungsnetz,cn=ucsschool,cn=groups,{}".format( escape_dn_chars(ou), ldap_base ), "cn=OU{}-Member-Edukativnetz,cn=ucsschool,cn=groups,{}".format( escape_dn_chars(ou), ldap_base ), "cn=OU{}-Member-Verwaltungsnetz,cn=ucsschool,cn=groups,{}".format( escape_dn_chars(ou), ldap_base ), "cn=OU{}-Klassenarbeit,cn=ucsschool,cn=groups,{}".format(escape_dn_chars(ou), ldap_base), search_base.admins_group, ] for mandatory_group in mandatory_groups: try: lo.searchDn(base=mandatory_group) except noObject: issues.append("Mandatory group {} does not exist.".format(mandatory_group)) if issues: problematic_objects[search_base.schoolDN] = issues return problematic_objects
[docs] def check_containers(school=None): # type: (Optional[str]) -> Dict[str, List[str]] problematic_objects = {} lo, _ = getMachineConnection() if school: all_schools = [school] else: all_schools = [ou.name for ou in School.get_all(lo)] for ou in all_schools: search_base = SchoolSearchBase([ou]) issues = [] mandatory_containers = [ search_base.computers, search_base.examUsers, search_base.groups, search_base.rooms, search_base.students, search_base.classes, search_base.shares, search_base.classShares, search_base.users, search_base.dhcp, search_base.networks, search_base.policies, search_base.printers, ] for mandatory_container in mandatory_containers: try: lo.searchDn(base=mandatory_container) except noObject: issues.append("Mandatory container {} does not exist.".format(mandatory_container)) if issues: problematic_objects[search_base.schoolDN] = issues return problematic_objects
[docs] def check_shares(school=None): # type: (Optional[str]) -> Dict[str, List[str]] ucr = ConfigRegistry() ucr.load() problematic_objects = {} lo, _ = getMachineConnection() if school: all_schools = [school] school_filter = school allow_wildcards = False else: all_schools = [ou.name for ou in School.get_all(lo)] school_filter = "*" allow_wildcards = True if ucr.is_true("ucsschool/import/generate/share/marktplatz", True): for ou in all_schools: search_base = SchoolSearchBase([ou]) marktplatz_share = "cn=Marktplatz,cn=shares,{}".format(search_base.schoolDN) try: lo.search(base=marktplatz_share) except noObject: problematic_objects.setdefault(marktplatz_share, []).append( "The 'Marktplatz' share of school %r does not exist." % (ou,) ) def maybe_allow_wildcards(filter_string): if allow_wildcards: filter_string = filter_string.replace(escape_filter_chars("*"), "*") return filter_string # check if there is a school class for each class share classes = [] role_classes_string = create_ucsschool_role_string(role_school_class, school_filter) for _dn, attrs in lo.search( filter=maybe_allow_wildcards(filter_format("(ucsschoolRole=%s)", [role_classes_string])) ): classes.append(attrs["cn"][0].decode("UTF-8")) role_class_share_string = create_ucsschool_role_string(role_school_class_share, school_filter) cls_shares = lo.search( filter=maybe_allow_wildcards(filter_format("(ucsschoolRole=%s)", [role_class_share_string])) ) for dn, attrs in cls_shares: if attrs["cn"][0].decode("UTF-8") not in classes: problematic_objects.setdefault(dn, []).append( "Corresponding class {} is missing.".format(attrs["cn"][0].decode("UTF-8")) ) # check if there is a work group for each work group share work_groups = [] role_workgroup_string = create_ucsschool_role_string(role_workgroup, school_filter) for dn, attrs in lo.search( filter=maybe_allow_wildcards(filter_format("(ucsschoolRole=%s)", [role_workgroup_string])) ): work_groups.append(attrs["cn"][0].decode("UTF-8")) role_workgroup_share_string = create_ucsschool_role_string(role_workgroup_share, school_filter) wg_share = lo.search( filter=maybe_allow_wildcards(filter_format("(ucsschoolRole=%s)", [role_workgroup_share_string])) ) for dn, attrs in wg_share: if attrs["cn"][0].decode("UTF-8") not in work_groups: problematic_objects.setdefault(dn, []).append( "Corresponding work group {} is missing.".format(attrs["cn"][0].decode("UTF-8")) ) return problematic_objects
[docs] def check_server_group_membership(school=None): # type: (Optional[str]) -> Dict[str, List[str]] def server_in_group_errors(lo, role, members, group_dn): problematic_objects = {} for dn, _attrs in lo.search(filter=filter_format("(ucsschoolRole=%s)", [role])): if dn not in members: problematic_objects.setdefault(dn, []).append( "is not a member of group {}".format(group_dn) ) return problematic_objects ucr = ConfigRegistry() ucr.load() ldap_base = ucr.get("ldap/base") problematic_objects = {} lo, _ = getMachineConnection() if school: all_schools = [school] else: all_schools = [ou.name for ou in School.get_all(lo)] dn_dc_edu_global = "cn=DC-Edukativnetz,cn=ucsschool,cn=groups,{}".format(ldap_base) dn_dc_admin_global = "cn=DC-Verwaltungsnetz,cn=ucsschool,cn=groups,{}".format(ldap_base) dn_member_edu_global = "cn=Member-Edukativnetz,cn=ucsschool,cn=groups,{}".format(ldap_base) dn_member_admin_global = "cn=Member-Verwaltungsnetz,cn=ucsschool,cn=groups,{}".format(ldap_base) global_groups = [dn_dc_edu_global, dn_dc_admin_global, dn_member_edu_global, dn_member_admin_global] members = {} for dn in global_groups: try: members[dn] = [x.decode("UTF-8") for x in lo.search(base=dn)[0][1]["uniqueMember"]] except KeyError: members[dn] = [] continue except noObject: problematic_objects.setdefault(dn, []).append( "Memberships of group {} could not be checked. It does not exist.".format(dn) ) continue for ou in all_schools: dn_dc_edu = "cn=OU{}-DC-Edukativnetz,cn=ucsschool,cn=groups,{}".format( escape_dn_chars(ou), ldap_base ) dn_dc_admin = "cn=OU{}-DC-Verwaltungsnetz,cn=ucsschool,cn=groups,{}".format( escape_dn_chars(ou), ldap_base ) dn_member_edu = "cn=OU{}-Member-Edukativnetz,cn=ucsschool,cn=groups,{}".format( escape_dn_chars(ou), ldap_base ) dn_member_admin = "cn=OU{}-Member-Verwaltungsnetz,cn=ucsschool,cn=groups,{}".format( escape_dn_chars(ou), ldap_base ) role_dc_edu_str = create_ucsschool_role_string(role_dc_slave_edu, ou) role_dc_admin_str = create_ucsschool_role_string(role_dc_slave_admin, ou) role_member_edu_str = create_ucsschool_role_string(role_memberserver_edu, ou) role_member_admin_str = create_ucsschool_role_string(role_memberserver_admin, ou) checks = [ (role_dc_edu_str, dn_dc_edu, dn_dc_edu_global), (role_dc_admin_str, dn_dc_admin, dn_dc_admin_global), (role_member_edu_str, dn_member_edu, dn_member_edu_global), (role_member_admin_str, dn_member_admin, dn_member_admin_global), ] for role, group_dn, global_group in checks: try: members[group_dn] = [ x.decode("UTF-8") for x in lo.search(base=group_dn)[0][1]["uniqueMember"] ] except KeyError: members[group_dn] = [] except noObject: problematic_objects.setdefault(dn, []).append( "Memberships of group {} could not be checked. It does not exist".format(dn) ) continue # When a KeyError occurs here, we still want to continue checking the rest # A KeyError is only expected if the corresponding group does not exist try: problematic_objects.update(server_in_group_errors(lo, role, members[group_dn], group_dn)) except KeyError: # occurs if group_dn did not exist pass try: problematic_objects.update( server_in_group_errors(lo, role, members[global_group], global_group) ) except KeyError: # occurs if global_group did not exist continue return problematic_objects
[docs] def check_all(school=None, user_dn=None): # type: (Optional[str], Optional[str]) -> Dict[str, Dict[str, List[str]]] user_check = UserCheck() users_from_ldap = user_check.get_users_from_ldap(school, user_dn) user_problematic_objects = {} # type: Dict[str, List[str]] for dn, attrs in users_from_ldap: user_issues = user_check.check_user(dn, attrs) if user_issues: user_problematic_objects[dn] = user_issues group_problematic_objects = check_mandatory_groups_exist(school) container_problematic_objects = check_containers(school) share_problematic_objects = check_shares(school) server_group_problematic_objects = check_server_group_membership(school) all_issues = { "users": user_problematic_objects, "groups": group_problematic_objects, "shares": share_problematic_objects, "containers": container_problematic_objects, "server_groups": server_group_problematic_objects, } return all_issues