Source code for univention.management.console.modules.schoolgroups

#!/usr/bin/python3
# -*- coding: utf-8 -*-
#
# Univention Management Console module:
#   Administration of groups
#
# Copyright 2012-2025 Univention GmbH
#
# http://www.univention.de/
#
# All rights reserved.
#
# The source code of this program is made available
# under the terms of the GNU Affero General Public License version 3
# (GNU AGPL V3) as published by the Free Software Foundation.
#
# Binary versions of this program provided by Univention to you as
# well as other copyrighted, protected or trademarked materials like
# Logos, graphics, fonts, specific documentations and configurations,
# cryptographic keys etc. are subject to a license agreement between
# you and Univention and not subject to the GNU AGPL V3.
#
# In the case you use this program under the terms of the GNU AGPL V3,
# the program is provided in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public
# License with the Debian GNU/Linux or Univention distribution in file
# /usr/share/common-licenses/AGPL-3; if not, see
# <http://www.gnu.org/licenses/>.

from ldap.dn import explode_rdn
from ldap.filter import filter_format

import univention.admin.uexceptions as udm_exceptions
from ucsschool.lib.models.attributes import ValidationError
from ucsschool.lib.models.base import WrongObjectType
from ucsschool.lib.models.group import SchoolClass, SchoolGroup, WorkGroup
from ucsschool.lib.models.share import GroupShare
from ucsschool.lib.models.user import Teacher, TeachersAndStaff, User
from ucsschool.lib.school_umc_base import Display, SchoolBaseModule, SchoolSanitizer
from ucsschool.lib.school_umc_ldap_connection import (
    MACHINE_WRITE,
    USER_READ,
    USER_WRITE,
    LDAP_Connection,
)
from univention.lib.i18n import Translation
from univention.management.console.log import MODULE
from univention.management.console.modules import UMC_Error
from univention.management.console.modules.decorators import sanitize
from univention.management.console.modules.sanitizers import (
    DictSanitizer,
    LDAPSearchSanitizer,
    ListSanitizer,
    StringSanitizer,
)

_ = Translation("ucs-school-umc-groups").translate


def only_workgroup_admin(func):
    """
    Decorator restricting a UMC command method to the "workgroup-admin" flavor.

    The wrapped callable is invoked unchanged (same positional and keyword
    arguments) when ``request.flavor`` equals "workgroup-admin". For any other
    flavor the wrapped callable is not called and ``UMC_Error("not supported")``
    is raised.

    :param func: callable taking ``(self, request, *args, **kwargs)``
    :return: the guarding wrapper around *func*
    """

    def _decorated(self, request, *args, **kwargs):
        # Happy path first: the only flavor that may use the command.
        if request.flavor == "workgroup-admin":
            return func(self, request, *args, **kwargs)
        raise UMC_Error("not supported")

    return _decorated
def get_group_class(request):
    """
    Map the flavor of a UMC request to the model class handled by that flavor.

    * "teacher" -> Teacher
    * "workgroup" and "workgroup-admin" -> WorkGroup
    * every other flavor -> SchoolClass

    :param request: UMC request; only its ``flavor`` attribute is read
    :return: one of the classes Teacher, WorkGroup or SchoolClass
    """
    flavor = request.flavor
    if flavor == "teacher":
        return Teacher
    if flavor in ("workgroup", "workgroup-admin"):
        return WorkGroup
    # Fallback for all remaining flavors.
    return SchoolClass
# TODO: remove once this is implemented in uexceptions, see Bug #30088
def get_exception_msg(e):
    """
    Build a human readable message from an exception object.

    The optional ``message`` attribute of *e* is used as the start of the
    text. Unless ``e.args`` consists of exactly that message, all elements of
    ``e.args`` are appended, separated by single spaces.

    Arguments that are not strings (numbers, nested exceptions, ...) are
    converted with ``str()`` instead of raising a TypeError, and no leading
    space is produced when the exception has no ``message`` attribute.

    :param e: exception (or any object with optional ``message``/``args``)
    :return: the combined message; the empty string if nothing is available
    """
    msg = getattr(e, "message", "")
    args = getattr(e, "args", None)
    if not args:
        return msg
    if len(args) == 1 and args[0] == msg:
        # The message is the sole argument: do not repeat it.
        return msg
    # Skip an empty message so the result does not start with a separator.
    parts = [str(msg)] if msg else []
    parts.extend(str(arg) for arg in args)
    return " ".join(parts)
def _filter_users(
    input_users,
    school,
    flavor,
    ldap_machine_write=None,
):
    """
    Validate DNs according to the modification type (flavor).

    Only user objects are checked. DNs for which ``User.from_dn`` raises
    WrongObjectType (e.g. computers) are kept without any check, DNs for
    which it raises ``noObject`` are logged and dropped.

    Checks applied to user objects:

    * The user belongs to the given school.
    * For flavor "workgroup-admin":
        * The user is a student, administrator, staff or teacher.
    * For flavor "class":
        * The user is a teacher.
    * For flavor "workgroup":
        * The user is a student.

    :param input_users: iterable of DNs to validate
    :param school: name of the school the users must belong to
    :param flavor: module flavor selecting the role check
    :param ldap_machine_write: LDAP connection passed to all model calls
    :return: list of accepted DNs, in input order
    :raises UMC_Error: as soon as one user object fails a check
    """

    def _display(user_obj):
        # Only evaluated when an error message is actually built.
        return Display.user(user_obj.get_udm_object(ldap_machine_write))

    accepted = []
    for dn in input_users:
        try:
            user = User.from_dn(dn, None, ldap_machine_write)
        except WrongObjectType:
            # Not a user object: pass it through unfiltered.
            accepted.append(dn)
            MODULE.info("Adding non user object %r." % dn)
            continue
        except udm_exceptions.noObject as exc:
            MODULE.error("Not adding not existing user %r to group: %r." % (dn, exc))
            continue

        if not user.schools or set(user.schools).isdisjoint({school}):
            raise UMC_Error(_("User %s does not belong to school %r.") % (_display(user), school))

        if flavor == "workgroup-admin":
            role_checks = (
                user.is_student,
                user.is_administrator,
                user.is_staff,
                user.is_teacher,
            )
            # any() stops at the first matching role, like the chained "and not".
            if not any(check(ldap_machine_write) for check in role_checks):
                raise UMC_Error(
                    _("User %s does not belong to school %r.") % (_display(user), school)
                )
        elif flavor == "class":
            if not user.is_teacher(ldap_machine_write):
                raise UMC_Error(_("User %s is not a teacher.") % (_display(user),))
        elif flavor == "workgroup":
            if not user.is_student(ldap_machine_write):
                raise UMC_Error(_("User %s is not a student.") % (_display(user),))

        accepted.append(user.dn)
    return accepted
[docs] class Instance(SchoolBaseModule):
@sanitize(
    school=SchoolSanitizer(required=True),
    pattern=LDAPSearchSanitizer(
        required=False, default="*", use_asterisks=True, add_asterisks=False
    ),
)
@LDAP_Connection()
def groups(self, request, ldap_user_read=None):
    """
    Answer the request with the groups of a school.

    Reads ``school`` and the optional ``pattern`` from ``request.options``
    and passes them, together with the container returned by
    ``SchoolGroup.get_container(school)``, to ``self._groups``. The value
    returned by ``self._groups`` is sent back via ``self.finished``.
    """
    options = request.options
    school = options["school"]
    container = SchoolGroup.get_container(school)
    pattern = options.get("pattern", None)
    found = self._groups(ldap_user_read, school, container, pattern=pattern)
    self.finished(request.id, found)
@sanitize(
    school=SchoolSanitizer(required=True),
    pattern=LDAPSearchSanitizer(required=False, default="", use_asterisks=True, add_asterisks=False),
)
@LDAP_Connection()
def users(self, request, ldap_user_read=None, ldap_position=None):
    """
    Answer the request with ``{"id": <dn>, "label": <display name>}`` entries.

    The ``group`` option is interpreted as follows:

    * missing, empty or the string "None": no group and no user type,
    * "teacher" or "student" (case-insensitive): used as user type,
    * anything else: passed on as group.

    The lookup itself is done by ``self._users``.
    """
    requested = request.options.get("group")
    group, user_type = requested, None
    if not requested or requested == "None":
        group = None
    else:
        lowered = requested.lower()
        if lowered in ("teacher", "student"):
            # The pseudo groups select a user type instead of a real group.
            group, user_type = None, lowered

    matches = self._users(
        ldap_user_read,
        request.options["school"],
        group=group,
        user_type=user_type,
        pattern=request.options["pattern"],
    )
    result = [{"id": entry.dn, "label": Display.user(entry)} for entry in matches]
    self.finished(request.id, result)
@sanitize(
    pattern=LDAPSearchSanitizer(required=True, default="", use_asterisks=True, add_asterisks=True),
    school=SchoolSanitizer(required=True),
)
@LDAP_Connection(USER_WRITE)
def query(self, request, ldap_user_write=None, ldap_position=None):
    """
    Search the objects of the current flavor and answer with their dicts.

    The model class is chosen by ``get_group_class``. For the Teacher class
    TeachersAndStaff is searched as well and ``school_prefix`` is False; for
    all other classes ``school_prefix`` is True. Each hit is converted with
    ``to_dict()`` before it is sent back.
    """
    primary = get_group_class(request)
    if primary is Teacher:
        klasses, school_prefix = [primary, TeachersAndStaff], False
    else:
        klasses, school_prefix = [primary], True

    # Collect all hits first, convert them afterwards.
    hits = [
        obj
        for klass in klasses
        for obj in klass.get_all(
            ldap_user_write,
            request.options["school"],
            filter_str=request.options["pattern"],
            easy_filter=True,
            school_prefix=school_prefix,
        )
    ]
    self.finished(request.id, [obj.to_dict() for obj in hits])
@sanitize(StringSanitizer(required=True))
@LDAP_Connection()
def get(self, request, ldap_user_read=None, ldap_position=None):
    """
    Answer the request with the dict of the first object in ``request.options``.

    ``request.options`` is an iterable of DNs; only the first one is used.
    The object is loaded with the class returned by ``get_group_class``.

    Flavor specific additions to the ``to_dict()`` result:

    * "workgroup-admin": ``create_share``, the e-mail sender lists converted
      to ``{"id", "label"}`` entries and, if an e-mail address is set,
      ``create_email`` and ``email_exists``.
    * "teacher": ``classes`` with the school classes that list the DN in
      ``uniqueMember``; the response is sent without ``members``.
    * all flavors except "teacher": ``members`` (filtered by
      ``_filter_members``) replaces ``users``.

    :raises UMC_Error: if no DN was given or the object does not exist
    """
    klass = get_group_class(request)
    # An empty option list previously left group_dn unbound (UnboundLocalError).
    group_dn = next(iter(request.options), None)
    if group_dn is None:
        raise UMC_Error("unknown object")

    try:
        group = klass.from_dn(group_dn, None, ldap_user_read)
    except udm_exceptions.noObject:
        raise UMC_Error("unknown object")

    result = group.to_dict()

    if request.flavor == "workgroup-admin":
        result["create_share"] = GroupShare.from_school_group(group).exists(ldap_user_read)
        result["allowed_email_senders_groups"] = [
            {"id": dn, "label": explode_rdn(dn, True)[0]}
            for dn in result["allowed_email_senders_groups"]
        ]
        umc_users = []
        for user_dn in result["allowed_email_senders_users"]:
            user = User.from_dn(user_dn, None, ldap_user_read)
            umc_users.append(
                {"id": user_dn, "label": Display.user(user.get_udm_object(ldap_user_read))}
            )
        result["allowed_email_senders_users"] = umc_users
        if result["email"]:
            result["create_email"] = True
            result["email_exists"] = True

    if request.flavor == "teacher":
        classes = []
        for school in group.schools:
            classes += SchoolClass.get_all(
                ldap_user_read, school, filter_str=filter_format("uniqueMember=%s", (group_dn,))
            )
        result["classes"] = [
            {"id": class_.dn, "label": class_.get_relative_name(), "school": class_.school}
            for class_ in classes
        ]
        self.finished(request.id, [result])
        return

    result["members"] = self._filter_members(
        request, group, result.pop("users", []), ldap_user_read
    )
    self.finished(request.id, [result])
@staticmethod
def _filter_members(request, group, users, ldap_user_read=None):
    """
    Filter out group members that should not be shown in the current flavor.

    A member DN is dropped when

    * ``User.from_dn`` raises ``noObject`` for it (logged),
    * the user does not belong to ``group.school``,
    * flavor "class": the user is not a teacher,
    * flavor "workgroup": the user is not a student,
    * flavor "workgroup-admin": the user is neither student, administrator,
      staff nor teacher.

    :return: list of ``{"id": <dn>, "label": <display name>}`` dicts
    """

    def _hidden_by_flavor(user):
        flavor = request.flavor
        if flavor == "class":
            return not user.is_teacher(ldap_user_read)  # only display teachers
        if flavor == "workgroup":
            return not user.is_student(ldap_user_read)  # only display students
        if flavor == "workgroup-admin":
            role_checks = (
                user.is_student,
                user.is_administrator,
                user.is_staff,
                user.is_teacher,
            )
            # only display school users
            return not any(check(ldap_user_read) for check in role_checks)
        return False

    visible = []
    for member_dn in users:
        try:
            user = User.from_dn(member_dn, None, ldap_user_read)
        except udm_exceptions.noObject:
            MODULE.process(
                "Could not open (foreign) user %r: no permissions/does not exists/not a user"
                % (member_dn,)
            )
            continue
        if not user.schools or set(user.schools).isdisjoint({group.school}):
            continue
        if _hidden_by_flavor(user):
            continue
        visible.append({"id": user.dn, "label": Display.user(user.get_udm_object(ldap_user_read))})
    return visible
@sanitize(DictSanitizer({"object": DictSanitizer({}, required=True)}))
@LDAP_Connection(USER_READ, MACHINE_WRITE)
def put(self, request, ldap_machine_write=None, ldap_user_read=None, ldap_position=None):
    """
    Modify the group described by the first entry of ``request.options``.

    requests.options = [ { object : ..., options : ... }, ... ]

    For the flavor "teacher" the call is delegated to
    ``add_teacher_to_classes`` with the first entry's ``object`` as options.

    For all other flavors the member list of the group is rebuilt from

    * the existing members that the current flavor must not touch, and
    * the submitted ``members`` after validation by ``_filter_users``,

    minus the members that were visible in this flavor but are no longer
    part of the submitted list. E-mail attributes are set from the submitted
    object when ``create_email`` is true and cleared otherwise.

    The result of ``modify()`` is sent back via ``self.finished``.

    :raises UMC_Error: if no object was given, the group does not exist,
        a member fails validation or the modification fails
    """
    if request.flavor == "teacher":
        request.options = request.options[0]["object"]
        return self.add_teacher_to_classes(request)

    klass = get_group_class(request)
    # Only the first entry is processed. An empty option list previously left
    # group_from_umc unbound (UnboundLocalError).
    first_entry = next(iter(request.options), None)
    if first_entry is None:
        raise UMC_Error("unknown group object")
    group_from_umc = first_entry["object"]
    group_from_umc_dn = group_from_umc["$dn$"]

    try:
        group_from_ldap = klass.from_dn(group_from_umc_dn, None, ldap_machine_write)
    except udm_exceptions.noObject:
        raise UMC_Error("unknown group object")

    old_members = self._filter_members(
        request, group_from_ldap, group_from_ldap.users, ldap_user_read
    )
    removed_members = {o["id"] for o in old_members} - set(group_from_umc["members"])

    MODULE.info(
        'Modifying group "%s" with members: %s' % (group_from_ldap.dn, group_from_ldap.users)
    )
    MODULE.info("New members: %s" % group_from_umc["members"])
    MODULE.info("Removed members: %s" % (removed_members,))

    if request.flavor == "workgroup-admin":
        # do not allow groups to be renamed in order to avoid conflicts with shares
        # grp.name = '%(school)s-%(name)s' % group
        if "description" in group_from_umc:
            group_from_ldap.description = group_from_umc["description"]

    # Workgroup admin view → update teachers, admins, students, (staff)
    # Class view → update only the group's teachers (keep all non teachers)
    # Workgroup teacher view → update only the group's students

    users = []
    # keep specific users from the group
    for userdn in group_from_ldap.users:
        try:
            user = User.from_dn(userdn, None, ldap_machine_write)
        except udm_exceptions.noObject:
            # no permissions/is not a user/does not exists → keep the old value
            users.append(userdn)
            continue
        if not user.schools or not set(user.schools) & {group_from_ldap.school}:
            # members of other schools are never touched by this module
            users.append(userdn)
            continue
        if (
            (request.flavor == "class" and not user.is_teacher(ldap_machine_write))
            or (request.flavor == "workgroup" and not user.is_student(ldap_machine_write))
            or request.flavor == "workgroup-admin"
        ):
            users.append(userdn)

    users += _filter_users(
        group_from_umc["members"],
        group_from_ldap.school,
        request.flavor,
        ldap_machine_write,
    )
    # Dropping removed_members last also removes them from the kept users.
    group_from_ldap.users = list(set(users) - removed_members)

    if group_from_umc.get("create_email", False):
        group_from_ldap.email = group_from_umc["email"]
        group_from_ldap.allowed_email_senders_groups = group_from_umc["allowed_email_senders_groups"]
        group_from_ldap.allowed_email_senders_users = group_from_umc["allowed_email_senders_users"]
    else:
        group_from_ldap.email = ""
        group_from_ldap.allowed_email_senders_groups = []
        group_from_ldap.allowed_email_senders_users = []

    try:
        success = group_from_ldap.modify(ldap_machine_write)
        MODULE.info("Modified, group has now members: %s" % (group_from_ldap.users,))
    except udm_exceptions.base as exc:
        MODULE.process(
            'An error occurred while modifying "%s": %s'
            % (group_from_umc["$dn$"], get_exception_msg(exc))
        )
        raise UMC_Error(_("Failed to modify group (%s).") % get_exception_msg(exc))

    self.finished(request.id, success)
@sanitize(DictSanitizer({"object": DictSanitizer({}, required=True)}))
@only_workgroup_admin
@LDAP_Connection(USER_READ, USER_WRITE)
def add(self, request, ldap_user_write=None, ldap_user_read=None, ldap_position=None):
    """
    Create a workgroup from the first entry of ``request.options``.

    The group name is built as ``<school>-<name>``. The submitted members
    are validated by ``_filter_users``. E-mail attributes are only set when
    ``create_email`` is true. The result of ``create()`` is sent back via
    ``self.finished``.

    :raises UMC_Error: if no object was given, validation fails, the
        workgroup already exists or UDM reports an error
    """
    # Only the first entry is processed. An empty option list previously left
    # group unbound (UnboundLocalError).
    first_entry = next(iter(request.options), None)
    if first_entry is None:
        raise UMC_Error("missing group object")
    group = first_entry["object"]

    try:
        grp = {
            "school": group["school"],
            "name": "%(school)s-%(name)s" % group,
            "description": group["description"],
            "users": _filter_users(
                group["members"],
                str(group["school"]),
                request.flavor,
                ldap_user_write,
            ),
            "create_share": group.get("create_share", True),
        }
        if group.get("create_email", False):
            grp["email"] = group.get("email", "")
            grp["allowed_email_senders_groups"] = group.get("allowed_email_senders_groups", [])
            grp["allowed_email_senders_users"] = group.get("allowed_email_senders_users", [])

        grp = WorkGroup(**grp)

        try:
            success = grp.create(ldap_user_write)
        except ValidationError as exc:
            # Translate the template first and insert the details afterwards;
            # formatting inside _() makes the catalog lookup always miss.
            raise UMC_Error(
                _("One or more errors during validation of the group occured:\n{}").format(exc)
            )
        if not success and grp.exists(ldap_user_read):
            raise UMC_Error(_("The workgroup %r already exists!") % grp.name)
    except udm_exceptions.base as exc:
        MODULE.process(
            'An error occurred while creating the group "%s": %s'
            % (group["name"], get_exception_msg(exc))
        )
        raise UMC_Error(_("Failed to create group (%s).") % get_exception_msg(exc))

    self.finished(request.id, success)
@sanitize(DictSanitizer({"object": ListSanitizer(min_elements=1)}))
@only_workgroup_admin
@LDAP_Connection(USER_READ, USER_WRITE)
def remove(self, request, ldap_user_write=None, ldap_user_read=None, ldap_position=None):
    """
    Delete the workgroups whose DNs are listed in the first option's ``object``.

    Every DN is processed; failures are collected instead of aborting. A
    group without a school is reported as an error and is not removed.

    Exactly one response is sent: ``[{"success": True}]`` if every group was
    removed, otherwise ``[{"success": False, "message": ...}]`` with all
    error messages joined by newlines.
    """
    errors = []
    for group_dn in request.options[0]["object"]:
        group = WorkGroup.from_dn(group_dn, None, ldap_user_write)
        if not group.school:
            errors.append("Group must within the scope of a school OU: %s" % group_dn)
            # Do not delete a group that was just rejected.
            continue
        try:
            group.remove(ldap_user_write)
        except udm_exceptions.base as exc:
            errors.append(get_exception_msg(exc))
            MODULE.error('Could not remove group "%s": %s' % (group.dn, exc))

    if errors:
        self.finished(request.id, [{"success": False, "message": "\n".join(errors)}])
        # Without this return a second, contradicting response was sent.
        return

    self.finished(request.id, [{"success": True}])
@sanitize(
    **{
        "$dn$": StringSanitizer(required=True),
        "classes": ListSanitizer(StringSanitizer(required=True), required=True),
    }
)
@LDAP_Connection(USER_READ, MACHINE_WRITE)
def add_teacher_to_classes(
    self, request, ldap_machine_write=None, ldap_user_read=None, ldap_position=None
):
    """
    Make the submitted class list the class memberships of a teacher.

    ``request.options["$dn$"]`` is the teacher, ``request.options["classes"]``
    the DNs of the classes the teacher should be a member of afterwards. The
    current memberships are the school classes of the teacher's schools that
    list the teacher in ``uniqueMember``. The teacher is added to classes
    missing from the current set and removed from classes missing from the
    submitted set.

    Responds with True if no class failed to load or modify, else False.

    :raises UMC_Error: if the DN does not exist or is not a teacher
    """
    teacher_dn = request.options["$dn$"]
    wanted = set(request.options["classes"])

    try:
        teacher = Teacher.from_dn(teacher_dn, None, ldap_machine_write)
        if not teacher.is_teacher(ldap_machine_write):
            # Handled like a missing object by the except clause below.
            raise udm_exceptions.noObject()
    except udm_exceptions.noObject:
        raise UMC_Error("The user is not a teacher.")

    current = {
        school_class.dn
        for school in teacher.schools
        for school_class in SchoolClass.get_all(
            ldap_user_read, school, filter_format("uniqueMember=%s", (teacher.dn,))
        )
    }

    classes_to_remove = current - wanted
    classes_to_add = wanted - current

    failed = []
    for classdn in classes_to_add | classes_to_remove:
        try:
            class_ = SchoolClass.from_dn(classdn, None, ldap_user_read)
        except udm_exceptions.noObject as exc:
            failed.append(classdn)
            MODULE.error("Could not load class %s: %s" % (classdn, exc))
            continue

        members = class_.users
        if classdn in classes_to_add and teacher.dn not in members:
            MODULE.info("Adding teacher %s to class %s" % (teacher.dn, classdn))
            members.append(teacher.dn)
        elif classdn in classes_to_remove and teacher.dn in members:
            MODULE.info("Removing teacher %s from class %s" % (teacher.dn, classdn))
            members.remove(teacher.dn)

        try:
            modified = class_.modify(ldap_machine_write)
        except udm_exceptions.base as exc:
            MODULE.error(
                "Could not add teacher %s to class %s: %s"
                % (teacher.dn, classdn, get_exception_msg(exc))
            )
            failed.append(classdn)
        else:
            if not modified:
                failed.append(classdn)
                MODULE.error(
                    "Could not add teacher %s to class %s (udm modify failed)"
                    % (teacher.dn, classdn)
                )

    self.finished(request.id, not any(failed))