Source code for univention.management.console.modules.schoolexam

#!/usr/bin/python3
# -*- coding: utf-8 -*-
#
# Univention Management Console
#  Starts a new exam for a specified computer room
#
# Copyright 2013-2025 Univention GmbH
#
# http://www.univention.de/
#
# All rights reserved.
#
# The source code of this program is made available
# under the terms of the GNU Affero General Public License version 3
# (GNU AGPL V3) as published by the Free Software Foundation.
#
# Binary versions of this program provided by Univention to you as
# well as other copyrighted, protected or trademarked materials like
# Logos, graphics, fonts, specific documentations and configurations,
# cryptographic keys etc. are subject to a license agreement between
# you and Univention and not subject to the GNU AGPL V3.
#
# In the case you use this program under the terms of the GNU AGPL V3,
# the program is provided in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public
# License with the Debian GNU/Linux or Univention distribution in file
# /usr/share/common-licenses/AGPL-3; if not, see
# <http://www.gnu.org/licenses/>.

import datetime
import logging
import os
import shutil
import subprocess
import tempfile
import time
import traceback
from itertools import chain
from typing import TYPE_CHECKING, List, Optional  # noqa: F401

import ldap
from ldap.dn import escape_dn_chars
from ldap.filter import filter_format
from samba.auth_util import system_session_unix
from samba.dcerpc import security
from samba.ntacls import getntacl, setntacl
from samba.param import LoadParm
from samba.samba3 import param

import univention.debug as ud
from ucsschool.lib import internetrules
from ucsschool.lib.models.base import WrongObjectType
from ucsschool.lib.models.group import ComputerRoom, Group
from ucsschool.lib.models.user import Student, User
from ucsschool.lib.models.utils import (
    ModuleHandler,
    NotInstalled,
    UnknownPackage,
    get_package_version,
)
from ucsschool.lib.roles import (
    context_type_exam,
    create_ucsschool_role_string,
    get_role_info,
    role_exam_user,
)
from ucsschool.lib.school_umc_base import Display, SchoolBaseModule, SchoolSanitizer
from ucsschool.lib.school_umc_ldap_connection import LDAP_Connection
from ucsschool.lib.schoolldap import SchoolSearchBase
from ucsschool.lib.schoollessons import SchoolLessons
from univention.admin.uexceptions import noObject
from univention.lib.i18n import Translation
from univention.lib.misc import custom_groupname
from univention.lib.umc import Client, ConnectionError as UMCConnectionError, Forbidden, HTTPError
from univention.management.console.config import ucr
from univention.management.console.modules import UMC_Error, computerroom
from univention.management.console.modules.decorators import (
    SimpleThread,
    file_upload,
    sanitize,
    simple_response,
)
from univention.management.console.modules.distribution import compare_dn
from univention.management.console.modules.sanitizers import (
    ChoicesSanitizer,
    DictSanitizer,
    DNSanitizer,
    ListSanitizer,
    PatternSanitizer,
    StringSanitizer,
)
from univention.management.console.modules.schoolexam import util

if TYPE_CHECKING:
    from univention.admin.uldap import access as LoType  # noqa: F401

_ = Translation("ucs-school-umc-exam").translate

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
if "schoolexam" not in list(logger.handlers):
    _module_handler = ModuleHandler(udebug_facility=ud.MODULE)
    _module_handler.set_name("schoolexam")
    _formatter = logging.Formatter(fmt="%(funcName)s:%(lineno)d  %(message)s")
    _module_handler.setFormatter(_formatter)
    logger.addHandler(_module_handler)


[docs] def load_smb_default_file() -> Optional[LoadParm]: """ Try to load the default samba shares.conf and retry if this fails. Bug 57367 :raises: UMC_Error """ lp = LoadParm() attempts = 5 for i in range(attempts): try: lp.load_default() return lp except RuntimeError: logger.warning( "Failed to load samba config. " "Please check /etc/samba/shares.conf and linked file. " "Retrying in a couple of seconds." ) time.sleep(2) else: logger.error(f"Failed to load the samba config {attempts} times.") raise UMC_Error( _( "Please contact an administrator.\n" "An error occurred while loading one of the samba share configuration files " "(see configuration file /etc/samba/shares.conf and configuration files" " below /etc/samba/shares.conf.d/). " "Known causes for this are wrong file permissions and typos in the configuration files." ) )
[docs] class Instance(SchoolBaseModule): def __init__(self): SchoolBaseModule.__init__(self) self._log_package_version("ucs-school-umc-exam") self._tmpDir = None self._progress_state = util.Progress(logger=logger) # TODO: replace with mixins.Progress self._lessons = SchoolLessons() self.lp = None
[docs] def init(self): SchoolBaseModule.init(self) # initiate paths for data distribution util.distribution.initPaths() self.lp = load_smb_default_file()
[docs] def destroy(self): # clean temporary data self._cleanTmpDir()
@staticmethod def _log_package_version(package_name): # type: (str) -> None try: logger.info( "Package %r installed in version %r.", package_name, get_package_version(package_name) ) except (NotInstalled, UnknownPackage) as exc: logger.error("Error retrieving package verion: %s", exc) def _cleanTmpDir(self): # copied from distribution module # clean up the temporary upload directory if self._tmpDir: logger.info("Clean up temporary directory: %s", self._tmpDir) shutil.rmtree(self._tmpDir, ignore_errors=True) self._tmpDir = None def _get_computerroom_module(self, request): room_module = computerroom.Instance() room_module.prepare(request) return room_module
[docs] def deny_owner_change_permissions(self, filename: str) -> None: """ A user gets full control over her permissions by default. The SDDL string of the home dir is changed to - forbid exam-users to change the permissions and owner - overrides standard behavior, i.e. full control, with 'edit' for owners. """ # s3conf needs to be loaded for direct_db_access=False in getntacl, otherwise you get an error s3conf = param.get_context() s3conf.load(self.lp.configfile) dacl = getntacl(self.lp, filename, system_session_unix(), direct_db_access=False) owner_sid = dacl.owner_sid # deny user change of permissions and owner new_deny_ace = security.ace() new_deny_ace.trustee = owner_sid new_deny_ace.type = security.SEC_ACE_TYPE_ACCESS_DENIED new_deny_ace.flags = ( security.SEC_ACE_FLAG_OBJECT_INHERIT | security.SEC_ACE_FLAG_CONTAINER_INHERIT ) new_deny_ace.access_mask = security.SEC_STD_WRITE_OWNER | security.SEC_STD_WRITE_DAC if new_deny_ace not in dacl.dacl.aces: dacl.dacl_add(new_deny_ace, 0) # Deny acl should come first for trustee in [security.dom_sid(security.SID_OWNER_RIGHTS), owner_sid]: new_allow_ace = security.ace() new_allow_ace.trustee = trustee new_allow_ace.flags = ( security.SEC_ACE_FLAG_OBJECT_INHERIT | security.SEC_ACE_FLAG_CONTAINER_INHERIT ) new_allow_ace.type = security.SEC_ACE_TYPE_ACCESS_ALLOWED new_allow_ace.access_mask = ( security.SEC_RIGHTS_FILE_WRITE | security.SEC_RIGHTS_FILE_READ | security.SEC_RIGHTS_FILE_EXECUTE | security.SEC_STD_DELETE ) if new_allow_ace not in dacl.dacl.aces: dacl.dacl_add(new_allow_ace) dacl.type = ( security.SEC_DESC_DACL_PRESENT | security.SEC_DESC_DACL_AUTO_INHERITED | security.SEC_DESC_DACL_PROTECTED | security.SEC_DESC_SELF_RELATIVE ) logger.debug("set nt acls {} on {}".format(dacl.as_sddl(), filename)) setntacl(self.lp, filename, dacl.as_sddl(), owner_sid, system_session_unix())
[docs] def set_nt_acls_on_exam_folders(self, exam_users: List[User]) -> None: """ Sets NT ACLs for exam users home dirs :param exam_users: """ logger.info("users=%r", [u.username for u in exam_users]) for exam_user in exam_users: folder = exam_user.unixhome for root, _sub, files in os.walk(folder): self.deny_owner_change_permissions(filename=root) for f in files: self.deny_owner_change_permissions(filename=str(os.path.join(root, f)))
[docs] @staticmethod def set_datadir_immutable_flag(users, project, flag=True): """ Sets or unsets the immutable bit on the recipients datadir depending on the flag :param project: The project to calculate the project directory :param users: The users to (un)set the immutable bit for :param flag: True to set the flag, False to unset """ logger.info("users=%r project=%r flag=%r", [u.username for u in users], project.name, flag) modifier = "+i" if flag else "-i" for user in users: # make datadir immutable datadir = os.path.dirname(project.user_projectdir(user).rstrip("/")) if os.path.exists(datadir): try: subprocess.check_call(["/usr/bin/chattr", modifier, datadir]) # nosec except subprocess.CalledProcessError: logger.error("Could not set the immutable bit on %r", datadir)
[docs] @file_upload @sanitize( DictSanitizer( {"filename": StringSanitizer(required=True), "tmpfile": StringSanitizer(required=True)}, required=True, ) ) def upload(self, request): # copied from distribution module # create a temporary upload directory, if it does not already exist logger.info("request.options=%r", request.options) if not self._tmpDir: self._tmpDir = tempfile.mkdtemp(prefix="ucsschool-exam-upload-") logger.info("upload() Created temporary directory: %r", self._tmpDir) for file in request.options: filename = file["filename"] if "\\" in filename: # filename seems to be a UNC / windows path filename = filename.rsplit("\\", 1)[-1] or filename.replace("\\", "_").lstrip("_") logger.info( "Filename seems to contain Windows path name or UNC - " "fixing filename: %r as %r", file["filename"], filename, ) destPath = os.path.join(self._tmpDir, filename) logger.info("upload() Received file %r, saving it to %r", file["tmpfile"], destPath) shutil.move(file["tmpfile"], destPath) self.finished(request.id, None)
[docs] @simple_response def internetrules(self): # copied from computerroom module """Returns a list of available internet rules""" return [x.name for x in internetrules.list()]
[docs] @simple_response def lesson_end(self): current = self._lessons.current if current is not None: return current.end.strftime("%H:%M") return (datetime.datetime.now() + datetime.timedelta(minutes=45)).strftime("%H:%M")
[docs] @simple_response def progress(self): return self._progress_state.poll()
@LDAP_Connection() def _user_can_modify(self, user, exam, ldap_user_read=None): """ Checks whether the given user is allowed to modify the given exam or not. Domain Admin: Can always modify School Admin: Can modify if exam owner is in own school Else: if owner is caller :param user: The user school object :param exam: The exam to be modified :return: True if user can modify else False """ logger.info("user=%r exam=%r", user, exam) if user.dn == exam.sender.dn: return True sender_user = User.from_dn(exam.sender.dn, None, ldap_user_read) if ( user.is_administrator(ldap_user_read) and len(set(sender_user.schools).intersection(user.schools)) != 0 ): return True admin_group_dn = "cn=%s,cn=groups,%s" % ( escape_dn_chars(custom_groupname("Domain Admins", ucr)), ucr["ldap/base"], ) return admin_group_dn in user.get_udm_object(ldap_user_read)["groups"] @LDAP_Connection() def _save_exam(self, request, update=False, ldap_user_read=None): """ Creates or updates an exam with the information given in the request object :param request: The request containing all information about the exam :param update: If True it is expected that an exam with the same name already exists and will be updated :return: univention.management.console.modules.distribution.util.Project :raises: UMC_Error """ logger.info("request.options=%r update=%r", request.options, update) # create a User object for the teacher sender = util.distribution.openRecipients(request.user_dn, ldap_user_read) recipients = [ util.distribution.openRecipients(i_dn, ldap_user_read) for i_dn in request.options.get("recipients", []) ] recipients = [recipient for recipient in recipients if recipient] new_values = { "name": request.options["directory"], "description": request.options["name"], "files": request.options.get("files"), "sender": sender, "room": request.options["room"], "recipients": recipients, "deadline": request.options["examEndTime"], } if not sender: raise UMC_Error(_('Could not authenticate user "%s"!') % request.user_dn) project = util.distribution.Project.load(request.options.get("name", "")) logger.info("loaded project=%r", project) orig_files = [] if update: if not project: raise UMC_Error( _("The specified exam does not exist: %s") % request.options.get("name", "") ) # make sure that the project owner himself is modifying the project if not compare_dn(project.sender.dn, request.user_dn): raise UMC_Error(_("The exam can only be modified by the owner himself.")) if project.isDistributed: raise UMC_Error(_("The exam was already started and can not be modified anymore!")) orig_files = project.files logger.info("updating project=%r with new_values=%r", project, new_values) project.update(new_values) else: if project: raise UMC_Error( _( 'An exam with the name "%s" already exists. Please choose a different name ' "for the exam." ) % new_values["name"] ) project = util.distribution.Project(new_values) logger.info("project=%r", project) try: project.validate() except ValueError as exc: raise UMC_Error(str(exc)) project.save() # copy files into project directory if self._tmpDir: for i_file in project.files: i_src = os.path.join(self._tmpDir, i_file) i_target = os.path.join(project.cachedir, i_file) if os.path.exists(i_src): # copy file to cachedir shutil.move(i_src, i_target) os.chown(i_target, 0, 0) if update: for i_file in set(orig_files) - set(project.files): i_target = os.path.join(project.cachedir, i_file) try: os.remove(i_target) except OSError: pass return project @LDAP_Connection() def _delete_exam(self, name, ldap_user_read=None): """ Deletes an exam project file including the uploaded data if the exam was not started yet and the caller is authorized to do so. :param name: Name of the exam to delete :return: True if exam was deleted, else False """ logger.info("name=%r", name) exam = util.distribution.Project.load(name) logger.info("loaded exam=%r", exam.dict) if not exam: return False if exam.isDistributed: return False if not self._user_can_modify(User.from_dn(ldap_user_read.whoami(), None, ldap_user_read), exam): return False logger.info("purge exam=%r", exam.dict) exam.purge() return True
[docs] @sanitize(StringSanitizer(required=True)) def get(self, request): logger.info("request.options=%r", request.options) result = [] for project in [util.distribution.Project.load(iid) for iid in request.options]: if not project: continue logger.info("loaded project=%r", project) # .dict) # make sure that only the project owner himself (or an admin) is able # to see the content of a project if not compare_dn(project.sender.dn, request.user_dn): raise UMC_Error( _("Exam details are only visible to the exam owner himself."), status=403 ) props = project.dict props["sender"] = props["sender"].username recipients = [] for recip in props["recipients"]: recipients.append( { "id": recip.dn, "label": (recip.type == util.distribution.TYPE_USER and Display.user(recip.dict)) or recip.name, } ) props["recipients"] = recipients props["examEndTime"] = props["deadline"] result.append(props) self.finished(request.id, result)
[docs] @sanitize( name=StringSanitizer(required=True), room=StringSanitizer(required=True), school=SchoolSanitizer(required=True), directory=StringSanitizer(required=True), shareMode=StringSanitizer(required=True), internetRule=StringSanitizer(required=True), customRule=StringSanitizer(), examEndTime=StringSanitizer(required=True), recipients=ListSanitizer(StringSanitizer(minimum=1), required=True), files=ListSanitizer(StringSanitizer()), ) def add(self, request): self._save_exam(request) self.finished(request.id, True)
[docs] @sanitize(exams=ListSanitizer(StringSanitizer(minimum=1), required=True)) def delete(self, request): result = [] for exam in request.options["exams"]: result.append(self._delete_exam(exam)) self.finished(request.id, result)
[docs] @sanitize( name=StringSanitizer(required=True), room=StringSanitizer(required=True), school=SchoolSanitizer(required=True), directory=StringSanitizer(required=True), shareMode=StringSanitizer(required=True), internetRule=StringSanitizer(required=True), customRule=StringSanitizer(), examEndTime=StringSanitizer(required=True), recipients=ListSanitizer(StringSanitizer(minimum=1), required=True), files=ListSanitizer(StringSanitizer()), ) def put(self, request): self._save_exam(request, update=True) self.finished(request.id, True)
[docs] @sanitize( name=StringSanitizer(required=True), room=StringSanitizer(required=True), school=SchoolSanitizer(required=True), directory=StringSanitizer(required=True), shareMode=StringSanitizer(required=True), internetRule=StringSanitizer(required=True), customRule=StringSanitizer(), examEndTime=StringSanitizer(required=True), recipients=ListSanitizer(StringSanitizer(minimum=1), required=True), files=ListSanitizer(StringSanitizer()), ) @LDAP_Connection() def start_exam(self, request, ldap_user_read=None, ldap_position=None): logger.info("request.options=%r", request.options) # reset the current progress state # steps: # 5 -> for preparing exam room # 25 -> for cloning users # 25 -> for each replicated users + copy of the profile directory # 20 -> distribution of exam files # 10 -> setting room properties progress = self._progress_state progress.reset(85) progress.component(_("Initializing")) # create that holds a reference to project, otherwise _thread() cannot # set the project variable in the scope of start_exam: my = type("", (), {"project": None})() # create a User object for the teacher # perform this LDAP operation outside the thread, to avoid tracebacks # in case of an LDAP timeout sender = util.distribution.openRecipients(request.user_dn, ldap_user_read) if not sender: raise UMC_Error(_('Could not authenticate user "%s"!') % request.user_dn) def _thread(): project = util.distribution.Project.load(request.options.get("name", "")) logger.info("loaded project=%r", project) directory = request.options["directory"] if project: my.project = self._save_exam(request, update=True, ldap_user_read=ldap_user_read) else: my.project = self._save_exam(request, update=False, ldap_user_read=ldap_user_read) logger.info("after saving exam: my.project=%r", my.project) # open a new connection to the Primary Directory Node UMC try: master = ucr["ldap/master"] client = Client(master) client.authenticate_with_machine_account() except (UMCConnectionError, HTTPError) as exc: logger.error("start_exam() Could not connect to UMC on %s: %s", master, exc) raise UMC_Error( _("Could not connect to Primary Directory Node %s.") % ucr.get("ldap/master") ) # mark the computer room for exam mode progress.component(_("Preparing the computer room for exam mode...")) client.umc_command( # noqa: B018 "schoolexam-master/set-computerroom-exammode", {"school": request.options["school"], "roomdn": request.options["room"]}, ).result # FIXME: no error handling progress.add_steps(5) # read all recipients and fetch all user objects users = [] for idn in request.options["recipients"]: ientry = util.distribution.openRecipients(idn, ldap_user_read) if not ientry: continue # recipients can in theory be users or groups members = [] if isinstance(ientry, util.distribution.User): members = [ientry] elif isinstance(ientry, util.distribution.Group): members = ientry.members for entry in members: # ignore all users except students user = User.from_dn(entry.dn, None, ldap_user_read) if user.is_student(ldap_user_read) and not user.is_exam_student(ldap_user_read): users.append(entry) # start to create exam user accounts progress.component(_("Preparing exam accounts")) percentPerUser = 25.0 / (1 + len(users)) examUsers = set() student_dns = set() usersReplicated = set() for num, iuser in enumerate(users, start=1): logger.info( "start_exam() Requesting exam user %02d/%02d to be created: %r", num, len(users), iuser.dn, ) progress.info( "(%02d/%02d) %s, %s (%s)" % (num, len(users), iuser.lastname, iuser.firstname, iuser.username) ) try: ires = client.umc_command( "schoolexam-master/create-exam-user", { "school": request.options["school"], "userdn": iuser.dn, "room": request.options["room"], "exam": request.options["name"], }, ).result if not ires: # occurs if disabled user gets ignored continue examuser_dn = ires.get("examuserdn") examUsers.add(examuser_dn) student_dns.add(iuser.dn) logger.info("start_exam() Exam user has been created: %r", examuser_dn) except (UMCConnectionError, HTTPError) as exc: logger.warning( "start_exam() Could not create exam user account for %r: %s", iuser.dn, exc ) # indicate the the user has been processed progress.add_steps(percentPerUser) logger.info( "start_exam() Sending DNs to add to group to Primary Directory Node: %r", student_dns ) client.umc_command( "schoolexam-master/add-exam-users-to-groups", {"users": list(student_dns), "school": request.options["school"]}, ) progress.add_steps(percentPerUser) # wait for the replication of all users to be finished progress.component(_("Preparing user home directories")) recipients = [] # list of User objects for all exam users openAttempts = 30 * 60 # wait max. 30 minutes for replication while (len(examUsers) > len(usersReplicated)) and (openAttempts > 0): openAttempts -= 1 logger.info( "start_exam() waiting for replication to be finished, %d user objects missing", len(examUsers) - len(usersReplicated), ) for idn in examUsers - usersReplicated: try: ldap_user_read.get(idn, required=True) except ldap.NO_SUCH_OBJECT: continue # not replicated yet iuser = util.distribution.openRecipients(idn, ldap_user_read) if not iuser: continue # not a users/user object logger.info("user has been replicated: %r", idn) # Bug #52307: # Creating two exams quickly in succession leads to the # second exam mode using the same UIDs as the first. # -> clear user name cache to force Samba to get the # new UID from ldap. logger.info("Clear user name cache...") cmd = ["/usr/sbin/sss_cache", "-U"] if subprocess.call(cmd): # nosec logger.error("Clearing user name cache failed: %s", " ".join(cmd)) else: logger.info("Clearing user name cache finished successfully.") # store User object in list of final recipients recipients.append(iuser) # mark the user as replicated usersReplicated.add(idn) progress.info( f"({len(usersReplicated):02d}/{len(examUsers):02d}) {iuser.lastname}, " f"{iuser.firstname} ({iuser.username})" ) progress.add_steps(percentPerUser) # wait a second time.sleep(1) progress.add_steps(percentPerUser) if openAttempts <= 0: logger.error( "replication timeout - %d user objects missing: %r ", (len(examUsers) - len(usersReplicated)), (examUsers - usersReplicated), ) raise UMC_Error(_("Replication timeout: could not create all exam users")) # update the final list of recipients my.project.recipients = recipients my.project.save() # update local NSS group cache progress.info("Updating local nss group cache...") if ucr.is_true("nss/group/cachefile", True): cmd = ["/usr/lib/univention-pam/ldap-group-to-file.py"] if ucr.is_true("nss/group/cachefile/check_member", False): cmd.append("--check_member") logger.info("Updating local nss group cache...") if subprocess.call(cmd): # nosec logger.error("Updating local nss group cache failed: %s", " ".join(cmd)) else: logger.info("Update of local nss group cache finished successfully.") # update permissions of PDFprinter queues progress.component(_("Updating printer queue permissions")) progress.info("") for recipient in recipients: print_queue = f"/var/spool/cups-pdf/{recipient.username}" if os.path.exists(print_queue): try: logger.debug(f"Updating owner of {print_queue}") shutil.chown(print_queue, user=recipient.username) except Exception: logger.error("Cannot update permisions of {print_queue}: {exc}") # distribute exam files progress.component(_("Distributing exam files")) progress.info("") exam_students = [ s for s in my.project.getRecipients() if User.from_dn(s.dn, None, ldap_user_read).is_exam_student(ldap_user_read) ] Instance.set_datadir_immutable_flag(exam_students, my.project, False) my.project.distribute() self.set_nt_acls_on_exam_folders(exam_students) Instance.set_datadir_immutable_flag(exam_students, my.project, True) progress.add_steps(20) # prepare room settings via lib... # first step: acquire room # second step: adjust room settings progress.component(_("Prepare room settings")) room = request.options["room"] logger.info("Acquire room: %s", room) room_module = self._get_computerroom_module(request) room_module._room_acquire(request, request.options["room"], ldap_user_read) progress.add_steps(1) logger.info( "Adjust room settings:\n%s", "\n".join(f" {k}={v}" for k, v in request.options.items()), ) room_module._start_exam( request, room, directory, request.options["name"], request.options.get("examEndTime") ) progress.add_steps(4) room_module._settings_set( request, "default", request.options["internetRule"], request.options["shareMode"], customRule=request.options.get("customRule"), ) # wait for samba-replication progress.add_steps(5) def _finished(thread, result, request): logger.info("result=%r", result) # mark the progress state as finished progress.info(_("finished...")) progress.finish() # finish the request at the end in order to force the module to keep # running until all actions have been completed success = not isinstance(result, BaseException) try: if my.project: my.project.starttime = datetime.datetime.now() my.project.save() except Exception: logger.exception("Could not save new project starttime.") if success: response = {"success": True} # remove uploaded files from cache self._cleanTmpDir() else: msg = str(result) response = result if not isinstance(result, UMC_Error): msg = "".join(traceback.format_exception(*thread.exc_info)) # FIXME progress.error(msg) try: # in case a distribution project has already be written to disk, purge it if my.project: logger.info("purge my.project=%r", my.project) my.project.purge() except Exception: logger.exception("Could not purge project.") self.thread_finished_callback(thread, response, request) thread = SimpleThread("start_exam", _thread, lambda t, r: _finished(t, r, request)) thread.run()
[docs] @sanitize(exam=StringSanitizer(required=True)) @simple_response def collect_exam(self, exam): logger.info("exam=%r", exam) project = util.distribution.Project.load(exam) if not project: raise UMC_Error(_("No files have been distributed")) logger.info("loaded project=%r", project) project.collect() return True
[docs] @sanitize(room=DNSanitizer(required=True)) @LDAP_Connection() def validate_room(self, request, ldap_user_read=None, ldap_position=None): error = None dn = request.options["room"] room = ComputerRoom.from_dn(dn, None, ldap_user_read) if not room.hosts: # FIXME: raise UMC_Error() error = ( _( 'Room "%s" does not contain any computers. Empty rooms may not be used to start an ' "exam." ) % room.get_relative_name() ) self.finished(request.id, error)
[docs] @sanitize(room=StringSanitizer(required=True), exam=StringSanitizer(required=True)) @LDAP_Connection() def finish_exam(self, request, ldap_user_read=None): logger.info("request.options=%r", request.options) # reset the current progress state # steps: # 10 -> collecting exam files # 5 -> for preparing exam room # 25 -> for cloning users progress = self._progress_state progress.reset(40) progress.component(_("Initializing")) # try to open project file project = util.distribution.Project.load(request.options.get("exam")) logger.info("loaded project=%r", project) if not project: # the project file does not exist... ignore problem logger.warning( "The project file for exam %s does not exist. Ignoring and finishing exam mode.", request.options.get("exam"), ) def _thread(): # perform all actions inside a thread... # collect files progress.component(_("Collecting exam files...")) if project: project.collect() # remove immutable bit from folders progress.add_steps(10) # open a new connection to the Primary Directory Node UMC master = ucr["ldap/master"] try: client = Client(master) client.authenticate_with_machine_account() except (UMCConnectionError, HTTPError) as exc: logger.error("Could not connect to UMC on %s: %s", master, exc) raise UMC_Error(_("Could not connect to Primary Directory Node %s.") % (master,)) school = SchoolSearchBase.getOU(request.options["room"]) # unset exam mode for the given computer room progress.component(_("Configuring the computer room...")) client.umc_command( # noqa: B018 "schoolexam-master/unset-computerroom-exammode", {"roomdn": request.options["room"], "school": school}, ).result progress.add_steps(5) # delete exam users accounts if project: # get a list of user accounts in parallel exams exam_role_str = create_ucsschool_role_string( role_exam_user, f"{project.name}-{school}", context_type_exam ) recipients = ldap_user_read.search( filter_format( "(&(ucsschoolRole=%s)(univentionObjectType=users/user))", (exam_role_str,) ), attr=["ucsschoolRole", "uid"], ) # This is needed for backwards compatibility with any Primary Directory Node # that is not updated to use roles for exam membership yet. exam_roles_exist = any( True for user in recipients if len( [ role for role in user[1]["ucsschoolRole"] if get_role_info(role.decode("UTF-8"))[1] == context_type_exam ] ) > 0 ) if exam_roles_exist and len(recipients) != len(project.recipients): logger.warning( "Found %d recipients through exam role, but %d through the project.", len(recipients), len(project.recipients), ) parallel_users_local = { iuser.dn: iproject.description for iproject in util.distribution.Project.list(only_distributed=True) if iproject.name != project.name for iuser in iproject.recipients } logger.info("parallel_users_local=%r", parallel_users_local) progress.component(_("Removing exam accounts")) percentPerUser = 25.0 / (1 + len(project.recipients)) # Bug #51199: # The following block speeds up the removal of several exam users by reducing the number # of LDAP group changes. This is especially relevant if the exam users are included in # many large groups (e.g. in several schools with many students). Each group change is # very time consuming for large groups. # Therefore, the group changes are first aggregated for several exam users and executed # as one LDAP modification per group. Only after that the Exam users are actually # deleted. users_to_reduce = [] for recipient_dn, recipient_attrs in recipients: exam_roles = [ role for role in recipient_attrs["ucsschoolRole"] if get_role_info(role.decode("UTF-8"))[1] == context_type_exam ] if len(exam_roles) == 1: users_to_reduce.append(recipient_dn) if users_to_reduce: logger.info( "Removing non-primary groups of %d users (of %d total).", len(users_to_reduce), len(recipients), ) umc_cmd = "schoolexam-master/remove-users-from-non-primary-groups" try: client.umc_command( # noqa: B018 umc_cmd, {"userdns": users_to_reduce, "exam": request.options["exam"]} ).result except Forbidden as exc: # Primary Directory Node has old package. No problem, as users will still be # deleted in the next step, just slower. logger.warning( "Forbidden (HTTP %r): Primary Directory Node doesn't know UMC command %r. " "Skipping non-primary-groups-removal optimization step.", exc.code, umc_cmd, ) else: logger.info("No users to remove non-primary groups found.") logger.info("Deleting %d recipients...", len(project.recipients)) for num, iuser in enumerate(project.recipients, start=1): progress.info( f"({num:02d}/{len(project.recipients):02d}) {iuser.lastname}, {iuser.firstname} " f"({iuser.username})" ) try: if exam_roles_exist or iuser.dn not in parallel_users_local: # remove LDAP user entry client.umc_command( # noqa: B018 "schoolexam-master/remove-exam-user", {"userdn": iuser.dn, "school": school, "exam": request.options["exam"]}, ).result if iuser.dn not in parallel_users_local: Instance.set_datadir_immutable_flag([iuser], project, False) # remove first the home directory, if enabled if ucr.is_true("ucsschool/exam/user/homedir/autoremove", False): shutil.rmtree(iuser.unixhome, ignore_errors=True) logger.info("Exam user has been removed: %r", iuser.dn) except (UMCConnectionError, HTTPError) as e: logger.warning("Could not remove exam user account %r: %s", iuser.dn, e) # indicate the user has been processed progress.add_steps(percentPerUser) logger.info("Finished removing exam accounts.") progress.add_steps(percentPerUser) def _finished(thread, result): # mark the progress state as finished logger.info("result=%r", result) progress.info(_("finished...")) # running until all actions have been completed if isinstance(result, BaseException): logger.error("Exception during exam_finish: %s", result) progress.error(_("An unexpected error occurred during the preparation: %s") % result) response = {"success": False} else: response = {"success": True} if project: logger.info("purge project=%r", project) try: project.purge() except Exception: logger.exception("Could not purge project.") # remove uploaded files from cache self._cleanTmpDir() progress.finish() self.thread_finished_callback(thread, response, request) thread = SimpleThread("start_exam", _thread, _finished) thread.run()
[docs] @sanitize( pattern=PatternSanitizer(required=False, default=".*"), filter=ChoicesSanitizer(["all", "private"], default="private"), ) @LDAP_Connection() def query(self, request, ldap_user_read=None): """ Get all exams (both running and planned). :param _sre.SRE_Pattern pattern: pattern that the result lists project names is matched against, defaults to `.*` (compiled by decorator). :param str filter: filter result list by project creator ("sender"). Must be either `all` or `private`, defaults to `private`. :return: list of projects :rtype: list(dict) """ pattern = request.options["pattern"] filter = request.options["filter"] user = User.from_dn(ldap_user_read.whoami(), None, ldap_user_read) result = [ { "name": project.name, "sender": project.sender.username, # teacher / admins "recipientsGroups": [ g.name for g in project.recipients if g.type == util.distribution.TYPE_GROUP ], "recipientsStudents": self._get_project_students(project, ldap_user_read), "starttime": project.starttime.strftime("%Y-%m-%d %H:%M") if project.starttime else "", "files": len(project.files) if project.files else 0, "isDistributed": project.isDistributed, "callerCanModify": self._user_can_modify(user, project), "room": ComputerRoom.get_name_from_dn(project.room) if project.room else "", } for project in util.distribution.Project.list() if pattern.match(project.name) and (filter == "all" or compare_dn(project.sender.dn, request.user_dn)) ] self.finished(request.id, result) # cannot use @simple_response with @LDAP_Connection :/
def _get_project_students(self, project, lo): temp_students = [s for s in project.recipients if s.type == util.distribution.TYPE_USER] temp_students += list( chain.from_iterable( g.members for g in project.recipients if g.type == util.distribution.TYPE_GROUP ) ) project_students = [] for student in {(s.username, s.dn) for s in temp_students}: try: user_obj = User.from_dn(student[1], None, lo) except noObject: logger.warning( "DN %r is stored as part of project %r but does not exist.", student[1], project.name ) continue if ( not project.isDistributed and user_obj.is_student(lo) and not user_obj.is_exam_student(lo) ): project_students.append(student[0]) elif project.isDistributed and user_obj.is_exam_student(lo): project_students.append(student[0]) return project_students
[docs] @sanitize(groups=ListSanitizer(DNSanitizer(minimum=1), required=True, min_elements=1)) @LDAP_Connection() def groups2students(self, request, ldap_user_read=None): """ Get members of passed groups. Only students are returned. request.options must contain a key `groups` with a list of DNs (only ucsschool.lib WorkGroup and SchoolClass are supported). The UMC call will return a list of dicts:: [{'dn': …, 'firstname': …, 'lastname': …, 'school_classes': …}, …] """ students = {} for group_dn in request.options["groups"]: try: group_obj = Group.from_dn(group_dn, None, ldap_user_read) except (WrongObjectType, noObject) as exc: logger.error( "DN %r does not exist or is not a work group or school class: %s", group_dn, exc ) raise UMC_Error(_("Error loading group DN {!r}.").format(group_dn)) school_class_name = group_obj.name[len(group_obj.school) + 1 :] for user_dn in group_obj.users: try: user_obj = User.from_dn(user_dn, None, ldap_user_read) except (WrongObjectType, noObject) as exc: logger.warning( "Ignoring DN %r - it does not exist or is not a school user: %s", user_dn, exc ) continue if user_obj.is_student(ldap_user_read) and not user_obj.is_exam_student(ldap_user_read): if user_dn in students: students[user_dn]["school_classes"].append(school_class_name) else: students[user_dn] = { "dn": user_dn, "firstname": user_obj.firstname, "lastname": user_obj.lastname, "school_classes": [school_class_name], } # Validate students # Bug #57319 students_with_validation_errors = [] for student_vals in students.values(): logger.info("Validating student {}".format(student_vals["dn"])) student_obj = Student.from_dn(student_vals["dn"], None, ldap_user_read) student_obj.validate(ldap_user_read) if student_obj.errors: logger.error( "Student {} has validation errors: \n{}".format(student_obj.dn, student_obj.errors) ) students_with_validation_errors.append(student_obj) if students_with_validation_errors: formatted_errors = "" for student in students_with_validation_errors: formatted_errors += "{}\n".format(student.dn) for error_key, error_descriptions in student.errors.items(): for error_description in error_descriptions: formatted_errors += "{}: {}\n".format(error_key, error_description) error_msg = "".join( [ _("The following students have validation errors:\n\n"), formatted_errors, _( "\nThe student data must be corrected by an " "Administrator before the students can be added to the exam." ), ] ) raise UMC_Error(error_msg) res = sorted(students.values(), key=lambda x: x["dn"]) self.finished(request.id, res) # cannot use @simple_response with @LDAP_Connection :/