Source code for ucsschool.lib.models.user

#!/usr/bin/python3
# -*- coding: utf-8 -*-
#
# UCS@school python lib: models
#
# Copyright 2014-2025 Univention GmbH
#
# http://www.univention.de/
#
# All rights reserved.
#
# The source code of this program is made available
# under the terms of the GNU Affero General Public License version 3
# (GNU AGPL V3) as published by the Free Software Foundation.
#
# Binary versions of this program provided by Univention to you as
# well as other copyrighted, protected or trademarked materials like
# Logos, graphics, fonts, specific documentations and configurations,
# cryptographic keys etc. are subject to a license agreement between
# you and Univention and not subject to the GNU AGPL V3.
#
# In the case you use this program under the terms of the GNU AGPL V3,
# the program is provided in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public
# License with the Debian GNU/Linux or Univention distribution in file
# /usr/share/common-licenses/AGPL-3; if not, see
# <http://www.gnu.org/licenses/>.

import os.path
from collections.abc import Mapping
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Set, Tuple, Type  # noqa: F401

from ldap.dn import escape_dn_chars, explode_rdn
from ldap.filter import filter_format
from six import iteritems

import univention.admin.modules as udm_modules
from univention.admin import syntax
from univention.admin.filter import conjunction, parse
from univention.admin.uexceptions import noObject, valueError

from ..roles import role_exam_user, role_pupil, role_school_admin, role_staff, role_student, role_teacher
from .attributes import (
    Birthday,
    Disabled,
    Email,
    Firstname,
    Lastname,
    Password,
    SchoolClassesAttribute,
    Schools,
    UserExpirationDate,
    Username,
    WorkgroupsAttribute,
)
from .base import NoObject, RoleSupportMixin, UCSSchoolHelperAbstractClass, UnknownModel, WrongModel
from .computer import AnyComputer
from .group import BasicGroup, Group, SchoolClass, SchoolGroup, WorkGroup
from .misc import MailDomain
from .school import School
from .utils import _, create_passwd, ucr

if TYPE_CHECKING:
    from .base import LoType, SuperOrdinateType, UdmObject, UldapFilter  # noqa: F401


[docs] class User(RoleSupportMixin, UCSSchoolHelperAbstractClass): name = Username(_("Username"), aka=["Username", "Benutzername"]) # type: str schools = Schools(_("Schools")) # type: List[str] firstname = Firstname( _("First name"), aka=["First name", "Vorname"], required=True, unlikely_to_change=True, ) # type: str lastname = Lastname( _("Last name"), aka=["Last name", "Nachname"], required=True, unlikely_to_change=True, ) # type: str birthday = Birthday( _("Birthday"), aka=["Birthday", "Geburtstag"], unlikely_to_change=True ) # type: str expiration_date = UserExpirationDate( _("Expiration date"), aka=["Expiration date", "Ablaufdatum"] ) # type: str email = Email(_("Email"), aka=["Email", "E-Mail"], unlikely_to_change=True) # type: str password = Password(_("Password"), aka=["Password", "Passwort"]) # type: Optional[str] disabled = Disabled(_("Disabled"), aka=["Disabled", "Gesperrt"]) # type: bool school_classes = SchoolClassesAttribute( _("Class"), aka=["Class", "Klasse"] ) # type: Dict[str, List[str]] workgroups = WorkgroupsAttribute( _("WorkGroup"), aka=["WorkGroup", "Workgroup"] ) # type: Dict[str, List[str]] type_name = None # type: str type_filter = ( "(|(objectClass=ucsschoolTeacher)(objectClass=ucsschoolStaff)(objectClass=ucsschoolStudent))" ) _profile_path_cache = {} # type: Dict[str, str] _samba_home_path_cache = {} # type: Dict[str, str] # _samba_home_path_cache is invalidated in School.invalidate_cache() roles = [] # type: List[str] default_roles = [] # type: List[str] default_options = () # type: Tuple[str] def __init__(self, *args, **kwargs): super(User, self).__init__(*args, **kwargs) self.check_password_policies = False if self.school_classes is None: self.school_classes = {} # set a dict for Staff if self.school and not self.schools: self.schools.append(self.school)
[docs] @classmethod def shall_create_mail_domain(cls): # type: () -> bool return ucr.is_true("ucsschool/import/generate/mail/domain")
[docs] def get_roleshare_home_subdir(self): # type: () -> str from ucsschool.lib.roleshares import roleshare_home_subdir return roleshare_home_subdir(self.school, self.roles, ucr)
[docs] def get_samba_home_drive(self): # type: () -> str return ucr.get("ucsschool/import/set/homedrive")
[docs] def get_samba_netlogon_script_path(self): # type: () -> str return ucr.get("ucsschool/import/set/netlogon/script/path")
[docs] def get_samba_home_path(self, lo): # type: (LoType) -> str school = School.cache(self.school) # if defined then use UCR value ucr_variable = ucr.get("ucsschool/import/set/sambahome") if ucr_variable is not None: samba_home_path = r"\\%s" % ucr_variable.strip("\\") elif ucr.is_true("ucsschool/singlemaster", False): # in single server environments the Primary Directory Node is always the fileserver samba_home_path = r"\\%s" % ucr.get("hostname") # if there's a cached result then use it elif school.dn not in self._samba_home_path_cache: samba_home_path = None # get windows home server from OU object school = self.get_school_obj(lo) home_share_file_server = school.home_share_file_server if home_share_file_server: samba_home_path = r"\\%s" % self.get_name_from_dn(home_share_file_server) self._samba_home_path_cache[school.dn] = samba_home_path else: samba_home_path = self._samba_home_path_cache[school.dn] if samba_home_path is not None: return r"%s\%s" % (samba_home_path, self.name)
[docs] def get_profile_path(self, lo): # type: (LoType) -> str ucr_variable = ucr.get("ucsschool/import/set/serverprofile/path") if ucr_variable is not None: return ucr_variable school = School.cache(self.school) if school.dn not in self._profile_path_cache: profile_path = r"%s\%%USERNAME%%\windows-profiles\default" for computer in AnyComputer.get_all( lo, self.school, "univentionService=Windows Profile Server" ): profile_path = profile_path % (r"\\%s" % computer.name) break else: profile_path = profile_path % "%LOGONSERVER%" self._profile_path_cache[school.dn] = profile_path return self._profile_path_cache[school.dn]
[docs] def is_student(self, lo): # type: (LoType) -> bool return self.__check_object_class(lo, "ucsschoolStudent", self._legacy_is_student)
[docs] def is_exam_student(self, lo): # type: (LoType) -> bool return self.__check_object_class(lo, "ucsschoolExam", self._legacy_is_exam_student)
[docs] def is_teacher(self, lo): # type: (LoType) -> bool return self.__check_object_class(lo, "ucsschoolTeacher", self._legacy_is_teacher)
[docs] def is_staff(self, lo): # type: (LoType) -> bool return self.__check_object_class(lo, "ucsschoolStaff", self._legacy_is_staff)
[docs] def is_administrator(self, lo): # type: (LoType) -> bool return self.__check_object_class(lo, "ucsschoolAdministrator", self._legacy_is_admininstrator)
@classmethod def _legacy_is_student(cls, school, dn): # type: (str, str) -> bool cls.logger.warning("Using deprecated method is_student()") return dn.lower().endswith(cls.get_search_base(school).students.lower()) @classmethod def _legacy_is_exam_student(cls, school, dn): # type: (str, str) -> bool cls.logger.warning("Using deprecated method is_exam_student()") return dn.lower().endswith(cls.get_search_base(school).examUsers.lower()) @classmethod def _legacy_is_teacher(cls, school, dn): # type: (str, str) -> bool cls.logger.warning("Using deprecated method is_teacher()") search_base = cls.get_search_base(school) return ( dn.lower().endswith(search_base.teachers.lower()) or dn.lower().endswith(search_base.teachersAndStaff.lower()) or dn.lower().endswith(search_base.admins.lower()) ) @classmethod def _legacy_is_staff(cls, school, dn): # type: (str, str) -> bool cls.logger.warning("Using deprecated method is_staff()") search_base = cls.get_search_base(school) return dn.lower().endswith(search_base.staff.lower()) or dn.lower().endswith( search_base.teachersAndStaff.lower() ) @classmethod def _legacy_is_admininstrator(cls, school, dn): # type: (str, str) -> bool cls.logger.warning("Using deprecated method is_admininstrator()") return dn.lower().endswith(cls.get_search_base(school).admins.lower()) def __check_object_class(self, lo, object_class, fallback): # type: (LoType, str, Callable[[str, str], bool]) -> bool obj = self.get_udm_object(lo) if not obj: raise noObject("Could not read %r" % (self.dn,)) if "ucsschoolSchool" in obj.oldattr: return object_class.encode("UTF-8") in obj.oldattr.get("objectClass", []) return fallback(self.school, self.dn)
[docs] @classmethod def get_class_for_udm_obj(cls, udm_obj, school): # type: (UdmObject, str) -> Type["User"] ocs = {x.decode("UTF-8") for x in udm_obj.oldattr.get("objectClass", [])} if ocs >= {"ucsschoolTeacher", "ucsschoolStaff"}: return TeachersAndStaff if ocs >= {"ucsschoolExam", "ucsschoolStudent"}: return ExamStudent if "ucsschoolTeacher" in ocs: return Teacher if "ucsschoolStaff" in ocs: return Staff if "ucsschoolStudent" in ocs: return Student if "ucsschoolAdministrator" in ocs: return SchoolAdmin # legacy DN based checks if cls._legacy_is_student(school, udm_obj.dn): return Student if cls._legacy_is_teacher(school, udm_obj.dn): if cls._legacy_is_staff(school, udm_obj.dn): return TeachersAndStaff return Teacher if cls._legacy_is_staff(school, udm_obj.dn): return Staff if cls._legacy_is_exam_student(school, udm_obj.dn): return ExamStudent return User
[docs] @classmethod def from_udm_obj(cls, udm_obj, school, lo): # type: (UdmObject, str, LoType) -> "User" obj = super(User, cls).from_udm_obj(udm_obj, school, lo) obj.password = None obj.school_classes = cls.get_school_classes(udm_obj, obj) obj.workgroups = cls.get_workgroups(udm_obj, obj) return obj
[docs] def create( self, lo, validate=True, check_password_policies=False ): # type: (LoType, Optional[bool], Optional[bool]) -> bool self.check_password_policies = check_password_policies return super(User, self).create(lo=lo, validate=validate)
[docs] def do_create(self, udm_obj, lo): # type: (UdmObject, LoType) -> None if not self.schools: self.schools = [self.school] self.set_default_options(udm_obj) self.create_mail_domain(lo) password_created = False if not self.password: self.logger.debug("No password given. Generating random one") self.password = create_passwd(dn=self.dn) password_created = True udm_obj["primaryGroup"] = self.primary_group_dn(lo) udm_obj["groups"] = self.groups_used(lo) subdir = self.get_roleshare_home_subdir() udm_obj["unixhome"] = "/home/" + os.path.join(subdir, self.name) if password_created or not self.check_password_policies: udm_obj["overridePWHistory"] = "1" udm_obj["overridePWLength"] = "1" else: udm_obj["overridePWHistory"] = "0" udm_obj["overridePWLength"] = "0" if self.disabled is None: udm_obj["disabled"] = "0" if "mailbox" in udm_obj: udm_obj["mailbox"] = "/var/spool/%s/" % self.name samba_home = self.get_samba_home_path(lo) if samba_home: udm_obj["sambahome"] = samba_home profile_path = self.get_profile_path(lo) if profile_path: udm_obj["profilepath"] = profile_path home_drive = self.get_samba_home_drive() if home_drive is not None: udm_obj["homedrive"] = home_drive script_path = self.get_samba_netlogon_script_path() if script_path is not None: udm_obj["scriptpath"] = script_path success = super(User, self).do_create(udm_obj, lo) if password_created: # dont' show password in post_hooks # (it has already been saved to LDAP in super().do_create() above) self.password = "" # nosec return success
[docs] def modify( self, lo, validate=True, move_if_necessary=None, check_password_policies=False ): # type: (LoType, Optional[bool], Optional[bool], Optional[bool]) -> bool self.check_password_policies = check_password_policies return super(User, self).modify(lo=lo, validate=validate, move_if_necessary=move_if_necessary)
[docs] def do_modify(self, udm_obj, lo): # type: (UdmObject, LoType) -> None self.create_mail_domain(lo) self.password = self.password or None removed_schools = set(udm_obj["school"]) - set(self.schools) if removed_schools: # change self.schools back, so schools can be removed by remove_from_school() self.schools = udm_obj["school"] for removed_school in removed_schools: self.logger.info("Removing %r from school %r...", self, removed_school) if not self.remove_from_school(removed_school, lo): self.logger.error("Error removing %r from school %r.", self, removed_school) return # remove SchoolClasses or WorkGroups the user is not part of anymore # ignore all others (global groups and $OU-groups) mandatory_groups = self.groups_used(lo) for group_dn in [dn for dn in udm_obj["groups"] if dn not in mandatory_groups]: try: school_class = SchoolClass.from_dn(group_dn, None, lo) classes = self.school_classes.get(school_class.school, []) if school_class.name not in classes and school_class.get_relative_name() not in classes: self.logger.debug("Removing %r from SchoolClass %r.", self, group_dn) udm_obj["groups"].remove(group_dn) # it's not a class but could be a workgroup except noObject: try: workgroup = WorkGroup.from_dn(group_dn, None, lo) workgroups = self.workgroups.get(workgroup.school, []) if ( workgroup.name not in workgroups and workgroup.get_relative_name() not in workgroups ): self.logger.debug("Removing %r from WorkGroup %r.", self, group_dn) udm_obj["groups"].remove(group_dn) except noObject: continue # make sure user is in all mandatory groups and school classes current_groups = {grp_dn.lower() for grp_dn in udm_obj["groups"]} groups_to_add = [dn for dn in mandatory_groups if dn.lower() not in current_groups] # [dn for dn in mandatory_groups if dn.lower() not in current_groups] if groups_to_add: self.logger.debug("Adding %r to groups %r.", self, groups_to_add) udm_obj["groups"].extend(groups_to_add) if self.check_password_policies: udm_obj["overridePWHistory"] = "0" udm_obj["overridePWLength"] = "0" else: udm_obj["overridePWHistory"] = "1" udm_obj["overridePWLength"] = "1" return super(User, self).do_modify(udm_obj, lo)
[docs] def do_school_change(self, udm_obj, lo, old_school): # type: (UdmObject, LoType, str) -> None super(User, self).do_school_change(udm_obj, lo, old_school) school = self.school self.logger.info("User is part of the following groups: %r", udm_obj["groups"]) self.remove_from_groups_of_school(old_school, lo) self._udm_obj_searched = False self.school_classes.pop(old_school, None) self.workgroups.pop(old_school, None) udm_obj = self.get_udm_object(lo) udm_obj["primaryGroup"] = self.primary_group_dn(lo) groups = set(udm_obj["groups"]) at_least_groups = set(self.groups_used(lo)) if (groups | at_least_groups) != groups: udm_obj["groups"] = list(groups | at_least_groups) subdir = self.get_roleshare_home_subdir() udm_obj["unixhome"] = "/home/" + os.path.join(subdir, self.name) samba_home = self.get_samba_home_path(lo) if samba_home: udm_obj["sambahome"] = samba_home profile_path = self.get_profile_path(lo) if profile_path: udm_obj["profilepath"] = profile_path home_drive = self.get_samba_home_drive() if home_drive is not None: udm_obj["homedrive"] = home_drive script_path = self.get_samba_netlogon_script_path() if script_path is not None: udm_obj["scriptpath"] = script_path if udm_obj["departmentNumber"] == [old_school]: udm_obj["departmentNumber"] = [school] if school not in udm_obj["school"]: udm_obj["school"].append(school) if old_school in udm_obj["school"]: udm_obj["school"].remove(old_school) udm_obj.modify(ignore_license=True)
def _alter_udm_obj(self, udm_obj): # type: (UdmObject) -> None if self.email is not None: udm_obj["e-mail"] = self.email udm_obj["departmentNumber"] = [self.school] return super(User, self)._alter_udm_obj(udm_obj)
[docs] def get_mail_domain(self): # type: () -> MailDomain if self.email: domain_name = self.email.split("@")[-1] return MailDomain.cache(domain_name)
[docs] def create_mail_domain(self, lo): # type: (LoType) -> None mail_domain = self.get_mail_domain() if mail_domain is not None and not mail_domain.exists(lo): if self.shall_create_mail_domain(): mail_domain.create(lo) else: self.logger.warning("Not allowed to create %r.", mail_domain)
[docs] def set_default_options(self, udm_obj): # type: (UdmObject) -> None for option in self.get_default_options(): if option not in udm_obj.options: udm_obj.options.append(option)
[docs] @classmethod def get_default_options(cls): # type: () -> Set[str] options = set() # u-s-import uses multiple inheritance, we have to cover all parents for kls in cls.__bases__: # type: "User" try: options.update(kls.get_default_options()) except AttributeError: pass options.update(cls.default_options) return options
[docs] def get_specific_groups(self, lo): # type: (LoType) -> List[str] groups = self.get_domain_users_groups() for school_class in self.get_school_class_objs(): groups.append(self.get_class_dn(school_class.name, school_class.school, lo)) for workgroup in self.get_workgroup_objs(): groups.append(self.get_workgroup_dn(workgroup.name, workgroup.school, lo)) return groups
[docs] def validate( self, lo, validate_unlikely_changes=False, check_name=True ): # type: (LoType, Optional[bool]) -> None super(User, self).validate( lo, validate_unlikely_changes=validate_unlikely_changes, check_name=check_name ) try: udm_obj = self.get_udm_object(lo) except UnknownModel: udm_obj = None except WrongModel as exc: udm_obj = None self.add_error( "name", _( "It is not supported to change the role of a user. %(old_role)s %(name)s cannot " "become a %(new_role)s." ) % { "old_role": exc.model.type_name, "name": self.name, "new_role": self.type_name, }, ) if udm_obj: original_class = self.get_class_for_udm_obj(udm_obj, self.school) if original_class is not self.__class__: self.add_error( "name", _( "It is not supported to change the role of a user. %(old_role)s %(name)s cannot" " become a %(new_role)s." ) % { "old_role": original_class.type_name, "name": self.name, "new_role": self.type_name, }, ) if self.email: if self.get_first_udm_obj( lo, filter_format("&(!(uid=%s))(mailPrimaryAddress=%s)", (self.name, self.email)), ): self.add_error( "email", _( "The email address is already taken by another user. Please change the email " "address." ), ) # mail_domain = self.get_mail_domain(lo) # if not mail_domain.exists(lo) and not self.shall_create_mail_domain(): # self.add_error( # 'email', # _('The mail domain is unknown. Please change the email address or create the mail \ # domain "%s" using the Univention Directory Manager.') % mail_domain.name) if not isinstance(self.school_classes, Mapping): self.add_error( "school_classes", _("Type of 'school_classes' is {type!r}, but must be dictionary.").format( type=type(self.school_classes) ), ) # verify user is (or will be) in all schools of its school_classes for school, _classes in iteritems(self.school_classes): if school.lower() not in (s.lower() for s in self.schools + [self.school]): self.add_error( "school_classes", _( "School {school!r} in 'school_classes' is missing in the users 'school(s)' " "attribute." ).format(school=school), ) # check syntax of all class names for school, classes in iteritems(self.school_classes): for class_name in classes: try: syntax.gid.parse(class_name) except valueError as exc: self.add_error("school_classes", str(exc)) if not isinstance(self.workgroups, Mapping): self.add_error( "workgroups", _("Type of 'workgroups' is {type!r}, but must be dictionary.").format( type=type(self.workgroups) ), ) # verify user is (or will be) in all schools of its work groups for school, _workgroups in iteritems(self.workgroups): if school.lower() not in (s.lower() for s in self.schools + [self.school]): self.add_error( "workgroups", _( "School {school!r} in 'workgroups' is missing in the users 'school(s)' " "attributes." ).format(school=school), ) # check syntax of all work group names for school, workgroups in iteritems(self.workgroups): for work_group_name in workgroups: try: syntax.gid.parse(work_group_name) except valueError as exc: self.add_error("workgroups", str(exc))
[docs] def remove_from_school(self, school, lo): # type: (str, LoType) -> bool if not self.exists(lo): self.logger.warning("User does not exists, not going to remove.") return False try: (self.schools or [school]).remove(school) except ValueError: self.logger.warning("User is not part of school %r. Not removing.", school) return False if not self.schools: self.logger.warning("User %r not part of any school, removing it.", self) return self.remove(lo) if self.school == school: if not self.change_school(self.schools[0], lo): return False else: self.remove_from_groups_of_school(school, lo) self.school_classes.pop(school, None) self.workgroups.pop(school, None) return True
[docs] def remove_from_groups_of_school(self, school, lo): # type: (str, LoType) -> None for cls in (SchoolClass, WorkGroup, SchoolGroup): for group in cls.get_all(lo, school, filter_format("uniqueMember=%s", (self.dn,))): try: group.users.remove(self.dn) except ValueError: pass else: self.logger.info( "Removing %r from group %r of school %r.", self.dn, group.dn, school, ) group.modify(lo) if self.is_administrator(lo): # Bug 54368 # remove_from_groups_of_school() doesn't remove school admins from admins-OU group admin_group_dns = self.get_school_admin_groups([school]) for dn in admin_group_dns: try: admin_group = BasicGroup.from_dn(dn, school, lo) except NoObject: continue try: admin_group.users.remove(self.dn) except ValueError: pass else: self.logger.info( "Removing %r from group %r of school %r.", self.dn, admin_group.dn, school, ) admin_group.modify(lo)
[docs] def get_group_dn(self, group_name, school): # type: (str, str) -> str return Group.cache(group_name, school).dn
[docs] def get_class_dn(self, class_name, school, lo): # type: (str, str, LoType) -> str # Bug #32337: check if the class exists without OU prefix # if it does not exist the class name with OU prefix is used school_class = SchoolClass.cache(class_name, school) if school_class.get_relative_name() == school_class.name: if not school_class.exists(lo): class_name = "%s-%s" % (school, class_name) school_class = SchoolClass.cache(class_name, school) return school_class.dn
[docs] def get_workgroup_dn(self, workgroup_name, school, lo): # type: (str, str, LoType) -> str school_workgroup = WorkGroup.cache(workgroup_name, school) if school_workgroup.get_relative_name() == school_workgroup.name: wg = WorkGroup.cache(workgroup_name, school) if not wg.exists(lo): workgroup_name = "%s-%s" % (school, workgroup_name) school_workgroup = WorkGroup.cache(workgroup_name, school) return school_workgroup.dn
[docs] def primary_group_dn(self, lo): # type: (LoType) -> str dn = self.get_group_dn("Domain Users %s" % self.school, self.school) return self.get_or_create_group_udm_object(dn, lo).dn
[docs] def get_domain_users_groups(self, schools=None): # type: (Optional[List[str]]) -> List[str] return [ self.get_group_dn("Domain Users %s" % school, school) for school in (schools or self.schools) ]
[docs] def get_students_groups(self, schools=None): # type: (Optional[List[str]]) -> List[str] prefix = ucr.get("ucsschool/ldap/default/groupprefix/pupils", "schueler-") return [ self.get_group_dn("%s%s" % (prefix, school), school) for school in (schools or self.schools) ]
[docs] def get_teachers_groups(self, schools=None): # type: (Optional[List[str]]) -> List[str] prefix = ucr.get("ucsschool/ldap/default/groupprefix/teachers", "lehrer-") return [ self.get_group_dn("%s%s" % (prefix, school), school) for school in (schools or self.schools) ]
[docs] def get_staff_groups(self, schools=None): # type: (Optional[List[str]]) -> List[str] prefix = ucr.get("ucsschool/ldap/default/groupprefix/staff", "mitarbeiter-") return [ self.get_group_dn("%s%s" % (prefix, school), school) for school in (schools or self.schools) ]
[docs] def get_school_admin_groups(self, schools=None): # type: (Optional[List[str]]) -> List[str] prefix = self.get_search_base("DEMOSCHOOL").group_prefix_admins ldap_base = ucr.get("ldap/base") return [ "cn=%s%s,cn=ouadmins,cn=groups,%s" % (escape_dn_chars(prefix), escape_dn_chars(school.lower()), ldap_base) for school in (schools or self.schools) ]
[docs] def groups_used(self, lo): # type: (LoType) -> List[str] group_dns = self.get_specific_groups(lo) for group_dn in group_dns: self.get_or_create_group_udm_object(group_dn, lo) return group_dns
[docs] @classmethod def get_or_create_group_udm_object(cls, group_dn, lo, fresh=False): # type: (str, LoType, Optional[bool]) -> Group """ In the case of work groups, this function assumes that they already exists. :raises RuntimeError: if a work group does not exist. """ name = cls.get_name_from_dn(group_dn) school = cls.get_school_from_dn(group_dn) if school is None and name.startswith(cls.get_search_base(school).group_prefix_admins): # Should only happen for ouadmin groups group = BasicGroup.from_dn(group_dn, None, lo) elif Group.is_school_class(school, group_dn): group = SchoolClass.cache(name, school) elif Group.is_school_workgroup(school, group_dn): group = WorkGroup.cache(name, school) if group.exists(lo): return group # this should not happen raise RuntimeError("Work group '%s' does not exist, please create it first." % group_dn) else: group = Group.cache(name, school) if fresh: group._udm_obj_searched = False group.create(lo) return group
[docs] def is_active(self): # type: () -> bool return self.disabled != "1"
[docs] def to_dict(self): # type: () -> Dict[str, Any] ret = super(User, self).to_dict() display_name = [] if self.firstname: display_name.append(self.firstname) if self.lastname: display_name.append(self.lastname) ret["display_name"] = " ".join(display_name) school_classes = {} for school_class in self.get_school_class_objs(): school_classes.setdefault(school_class.school, []).append(school_class.name) ret["school_classes"] = school_classes workgroups = {} for workgroup in self.get_workgroup_objs(): workgroups.setdefault(workgroup.school, []).append(workgroup.name) ret["workgroups"] = workgroups ret["type_name"] = self.type_name ret["type"] = self.__class__.__name__ ret["type"] = ret["type"][0].lower() + ret["type"][1:] return ret
[docs] def get_school_class_objs(self): # type: () -> List[SchoolClass] ret = [] for school, classes in iteritems(self.school_classes): for school_class in classes: ret.append(SchoolClass.cache(school_class, school)) return ret
[docs] def get_workgroup_objs(self): # type: () -> List[WorkGroup] ret = [] for school, workgroups in iteritems(self.workgroups): for workgroup in workgroups: ret.append(WorkGroup.cache(workgroup, school)) return ret
[docs] @classmethod def get_school_classes(cls, udm_obj, obj): # type: (UdmObject, "User") -> Dict[str, List[str]] school_classes = {} for group in udm_obj["groups"]: for school in obj.schools: if Group.is_school_class(school, group): school_class_name = cls.get_name_from_dn(group) school_classes.setdefault(school, []).append(school_class_name) return school_classes
[docs] @classmethod def get_workgroups(cls, udm_obj, obj): # type: (UdmObject, "User") -> Dict[str, List[str]] workgroups = {} for group in udm_obj["groups"]: for school in obj.schools: if Group.is_school_workgroup(school, group): workgroup_name = cls.get_name_from_dn(group) workgroups.setdefault(school, []).append(workgroup_name) return workgroups
[docs] @classmethod def get_container(cls, school): # type: (str) -> str return cls.get_search_base(school).users
[docs] @classmethod def lookup(cls, lo, school, filter_s="", superordinate=None): # type: (LoType, str, Optional[UldapFilter], Optional[SuperOrdinateType]) -> List[UdmObject] filter_object_type = conjunction( "&", [ parse(cls.type_filter), parse(filter_format("ucsschoolSchool=%s", [school])), ], ) if filter_s: filter_object_type = conjunction("&", [filter_object_type, parse(filter_s)]) objects = udm_modules.lookup( cls._meta.udm_module, None, lo, filter="{}".format(filter_object_type), scope="sub", superordinate=superordinate, ) objects.extend( obj for obj in super(User, cls).lookup(lo, school, filter_s, superordinate=superordinate) if not any(obj.dn == x.dn for x in objects) ) return objects
[docs] class Meta: udm_module = "users/user" name_is_unique = True allow_school_change = False
[docs] class Student(User): type_name = _("Student") type_filter = "(&(objectClass=ucsschoolStudent)(!(objectClass=ucsschoolExam)))" roles = [role_pupil] default_options = ("ucsschoolStudent",) default_roles = [role_student]
[docs] def do_school_change(self, udm_obj, lo, old_school): # type: (UdmObject, LoType, str) -> None try: exam_user = ExamStudent.from_student_dn(lo, old_school, self.old_dn) except noObject as exc: self.logger.info("No exam user for %r found: %s", self.old_dn, exc) else: self.logger.info("Removing exam user %r", exam_user.dn) exam_user.remove(lo) super(Student, self).do_school_change(udm_obj, lo, old_school)
[docs] @classmethod def get_container(cls, school): # type: (str) -> UdmObject return cls.get_search_base(school).students
[docs] @classmethod def get_exam_container(cls, school): # type: (str) -> str return cls.get_search_base(school).examUsers
[docs] def get_specific_groups(self, lo): # type: (LoType) -> List[str] groups = super(Student, self).get_specific_groups(lo) groups.extend(self.get_students_groups()) return groups
[docs] class Teacher(User): type_name = _("Teacher") type_filter = "(&(objectClass=ucsschoolTeacher)(!(objectClass=ucsschoolStaff)))" roles = [role_teacher] default_roles = [role_teacher] default_options = ("ucsschoolTeacher",)
[docs] @classmethod def get_container(cls, school): # type: (str) -> str return cls.get_search_base(school).teachers
[docs] def get_specific_groups(self, lo): # type: (LoType) -> List[str] groups = super(Teacher, self).get_specific_groups(lo) groups.extend(self.get_teachers_groups()) return groups
[docs] class SchoolAdmin(User): type_name = _("School Administrator") type_filter = "(objectClass=ucsschoolAdministrator)" roles = [role_school_admin] default_roles = [role_school_admin] default_options = ("ucsschoolAdministrator",)
[docs] @classmethod def get_container(cls, school): # type: (str) -> str return cls.get_search_base(school).admins
[docs] def get_specific_groups(self, lo): # type: (LoType) -> List[str] groups = super(SchoolAdmin, self).get_specific_groups(lo) groups.extend(self.get_school_admin_groups()) return groups
[docs] class Staff(User): school_classes = None type_name = _("Staff") roles = [role_staff] default_roles = [role_staff] type_filter = "(&(!(objectClass=ucsschoolTeacher))(objectClass=ucsschoolStaff))" default_options = ("ucsschoolStaff",)
[docs] @classmethod def get_container(cls, school): # type: (str) -> str return cls.get_search_base(school).staff
[docs] def get_samba_home_path(self, lo): # type: (LoType) -> None """Do not set sambaHomePath for staff users.""" return
[docs] def get_samba_home_drive(self): # type: () -> None """Do not set sambaHomeDrive for staff users.""" return
[docs] def get_samba_netlogon_script_path(self): # type: () -> None """Do not set sambaLogonScript for staff users.""" return
[docs] def get_profile_path(self, lo): # type: (LoType) -> None """Do not set sambaProfilePath for staff users.""" return
[docs] def get_school_class_objs(self): # type: () -> List[SchoolClass] return []
[docs] @classmethod def get_school_classes(cls, udm_obj, obj): # type: (UdmObject, "Staff") -> Dict[str, List[str]] return {}
[docs] def get_specific_groups(self, lo): # type: (LoType) -> List[str] groups = super(Staff, self).get_specific_groups(lo) groups.extend(self.get_staff_groups()) return groups
[docs] class TeachersAndStaff(Teacher): type_name = _("Teacher and Staff") type_filter = "(&(objectClass=ucsschoolStaff)(objectClass=ucsschoolTeacher))" roles = [role_teacher, role_staff] default_roles = [role_teacher, role_staff] default_options = ("ucsschoolStaff",)
[docs] @classmethod def get_container(cls, school): # type: (str) -> str return cls.get_search_base(school).teachersAndStaff
[docs] def get_specific_groups(self, lo): # type: (LoType) -> List[str] groups = super(TeachersAndStaff, self).get_specific_groups(lo) groups.extend(self.get_staff_groups()) return groups
[docs] class ExamStudent(Student): type_name = _("Exam student") type_filter = "(&(objectClass=ucsschoolStudent)(objectClass=ucsschoolExam))" default_roles = [role_exam_user] default_options = ("ucsschoolExam",)
[docs] @classmethod def get_container(cls, school): # type: (str) -> str return cls.get_search_base(school).examUsers
[docs] @classmethod def from_student_dn(cls, lo, school, dn): # type: (LoType, str, str) -> "ExamStudent" examUserPrefix = ucr.get("ucsschool/ldap/default/userprefix/exam", "exam-") dn = "uid=%s%s,%s" % ( escape_dn_chars(examUserPrefix), escape_dn_chars(explode_rdn(dn, True)[0]), cls.get_container(school), ) return cls.from_dn(dn, school, lo)