Source code for univention.management.console.modules.distribution

#!/usr/bin/python3
# -*- coding: utf-8 -*-
#
# Univention Management Console module:
#
#
# Copyright 2012-2025 Univention GmbH
#
# http://www.univention.de/
#
# All rights reserved.
#
# The source code of this program is made available
# under the terms of the GNU Affero General Public License version 3
# (GNU AGPL V3) as published by the Free Software Foundation.
#
# Binary versions of this program provided by Univention to you as
# well as other copyrighted, protected or trademarked materials like
# Logos, graphics, fonts, specific documentations and configurations,
# cryptographic keys etc. are subject to a license agreement between
# you and Univention and not subject to the GNU AGPL V3.
#
# In the case you use this program under the terms of the GNU AGPL V3,
# the program is provided in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public
# License with the Debian GNU/Linux or Univention distribution in file
# /usr/share/common-licenses/AGPL-3; if not, see
# <http://www.gnu.org/licenses/>.

import os
import shutil
import sys
import tempfile
from datetime import datetime, timedelta

from six import reraise as raise_

import univention.admin.uexceptions as udm_exceptions
from ucsschool.lib.models.user import User
from ucsschool.lib.school_umc_base import Display, SchoolBaseModule
from ucsschool.lib.school_umc_ldap_connection import LDAP_Connection
from univention.lib.i18n import Translation
from univention.management.console.log import MODULE
from univention.management.console.modules import UMC_Error
from univention.management.console.modules.decorators import (
    file_upload,
    sanitize,
    simple_response,
)
from univention.management.console.modules.distribution import util
from univention.management.console.modules.sanitizers import (
    ChoicesSanitizer,
    DictSanitizer,
    ListSanitizer,
    PatternSanitizer,
    StringSanitizer,
)

_ = Translation("ucs-school-umc-distribution").translate


[docs] def compare_dn(a, b): return a and b and a.lower() == b.lower()
[docs] class Instance(SchoolBaseModule): def __init__(self): SchoolBaseModule.__init__(self) self._tmpDir = None
[docs] def init(self): SchoolBaseModule.init(self) # initiate paths for data distribution util.initPaths()
[docs] def destroy(self): self._cleanTmpDir()
def _cleanTmpDir(self): # clean up the temporary upload directory if self._tmpDir: MODULE.info("Clean up temporary directory: %s" % self._tmpDir) shutil.rmtree(self._tmpDir, ignore_errors=True) self._tmpDir = None
[docs] @file_upload @sanitize( DictSanitizer( { "filename": StringSanitizer(required=True), "tmpfile": StringSanitizer(required=True), }, required=True, ) ) def upload(self, request): # create a temporary upload directory, if it does not already exist if not self._tmpDir: self._tmpDir = tempfile.mkdtemp(prefix="ucsschool-distribution-upload-") MODULE.info("Created temporary directory: %s" % self._tmpDir) for file in request.options: filename = file["filename"] if "\\" in filename: # Bug 46709/46710: filename seems to be a UNC / windows path MODULE.info("Filename seems to contain Windows path name or UNC - fixing filename") filename = filename.rsplit("\\", 1)[-1] or filename.replace("\\", "_").lstrip("_") destPath = os.path.join(self._tmpDir, filename) MODULE.info("Received file %r, saving it to %r" % (file["tmpfile"], destPath)) shutil.move(file["tmpfile"], destPath) self.finished(request.id, None)
[docs] @sanitize( filenames=ListSanitizer(min_elements=1), # project=StringSanitizer(allow_none=True) ) @simple_response def checkfiles(self, project, filenames): """ Checks whether the given filename has already been uploaded: request.options: { 'filenames': [ '...', ... ], project: '...' } returns: { 'filename': '...', 'sessionDuplicate': True|False, 'projectDuplicate': True|False, 'distributed': True|False } """ # load project if project: project = util.Project.load(project) result = [] for ifile in filenames: # check whether file has already been upload in this session iresult = { "sessionDuplicate": False, "projectDuplicate": False, "distributed": False, } iresult["filename"] = ifile iresult["sessionDuplicate"] = self._tmpDir is not None and os.path.exists( os.path.join(self._tmpDir, ifile) ) # check whether the file exists in the specified project and whether # it has already been distributed if project: iresult["projectDuplicate"] = ifile in project.files iresult["distributed"] = ifile in project.files and not os.path.exists( os.path.join(project.cachedir, ifile) ) result.append(iresult) return result
[docs] @sanitize( pattern=PatternSanitizer(required=False, default=".*"), filter=ChoicesSanitizer(["all", "private"], default="private"), ) @simple_response(with_request=True) def query(self, request, pattern, filter): result = [ { # only show necessary information "description": i.description, "name": i.name, "sender": i.sender.username, "recipients": len(i.recipients), "files": len(i.files), "isDistributed": i.isDistributed, } for i in util.Project.list() if (pattern.match(i.name) or pattern.match(i.description)) and (filter == "all" or compare_dn(i.sender.dn, request.user_dn)) ] return result
@LDAP_Connection() def _get_sender(self, request, ldap_user_read=None, ldap_position=None): """Return a User instance of the currently logged in user.""" try: user = User.from_dn(request.user_dn, None, ldap_user_read) obj = user.get_udm_object(ldap_user_read) return util.User(obj.info, dn=obj.dn) except udm_exceptions.base as exc: raise UMC_Error(_("Failed to load user information: %s") % exc)
[docs] @sanitize(DictSanitizer({"object": DictSanitizer({}, required=True)}, required=True)) def put(self, request): """Modify an existing project""" result = [self._save(request, entry["object"], True) for entry in request.options] self.finished(request.id, result)
[docs] @sanitize(DictSanitizer({"object": DictSanitizer({}, required=True)}, required=True)) def add(self, request): """Add a new project""" result = [self._save(request, entry["object"], False) for entry in request.options] self.finished(request.id, result)
@LDAP_Connection() def _save(self, request, iprops, doUpdate=True, ldap_user_read=None, ldap_position=None): # try to open the UDM user object of the current user sender = self._get_sender(request) try: # remove keys that may not be set from outside for k in ("atJobNumCollect", "atJobNumDistribute"): iprops.pop(k, None) # load the project or create a new one project = None orgProject = None if doUpdate: # try to load the given project orgProject = util.Project.load(iprops.get("name", "")) if not orgProject: raise UMC_Error(_("The specified project does not exist: %s") % iprops["name"]) # create a new project with the updated values project = util.Project(orgProject.dict) project.update(iprops) else: # create a new project project = util.Project(iprops) # make sure that the project owner himself is modifying the project if doUpdate and not compare_dn(project.sender.dn, request.user_dn): raise UMC_Error(_("The project can only be modified by the owner himself")) # handle time settings for distribution/collection of project files for jsuffix, jprop, jname in ( ("distribute", "starttime", _("Project distribution")), ("collect", "deadline", _("Project collection")), ): if "%sType" % jsuffix in iprops: # check the distribution/collection type: manual/automat jtype = (iprops["%sType" % jsuffix]).lower() if jtype == "automatic": try: # try to parse the given time parameters strtime = "%s %s" % ( iprops["%sDate" % jsuffix], iprops["%sTime" % jsuffix], ) jdate = datetime.strptime(strtime, "%Y-%m-%d %H:%M") setattr(project, jprop, jdate) except ValueError: raise UMC_Error(_("Could not set date for: %s") % jname) # make sure the execution time lies sufficiently in the future if getattr(project, jprop) - datetime.now() < timedelta(minutes=1): raise UMC_Error( _("The specified time needs to lie in the future for: %s") % jname ) else: # manual distribution/collection setattr(project, jprop, None) if project.starttime and project.deadline: # make sure distributing happens before collecting if project.deadline - project.starttime < timedelta(minutes=3): raise UMC_Error( _( "Distributing the data needs to happen sufficiently long enough before " "collecting them" ) ) if "recipients" in iprops: # lookup the users in LDAP and save them to the project project.recipients = [ util.openRecipients(idn, ldap_user_read) for idn in iprops.get("recipients", []) ] project.recipients = [x for x in project.recipients if x] MODULE.info("recipients: %s" % (project.recipients,)) if not doUpdate: # set the sender (i.e., owner) of the project project.sender = sender # initiate project and validate its values try: project.validate() except ValueError as exc: raise UMC_Error(str(exc)) # make sure that there is no other project with the same directory name # if we add new projects if not doUpdate and project.isNameInUse(): MODULE.error("The project name is already in use: %s" % (project.name)) raise UMC_Error( _( 'The specified project directory name "%s" is already in use by a different ' "project." ) % (project.name) ) # try to save project to disk project.save() # move new files into project directory if self._tmpDir: for ifile in project.files: isrc = os.path.join(self._tmpDir, ifile) itarget = os.path.join(project.cachedir, ifile) if os.path.exists(isrc): # mv file to cachedir shutil.move(isrc, itarget) os.chown(itarget, 0, 0) # remove files that have been marked for removal if doUpdate: for ifile in set(orgProject.files) - set(project.files): itarget = os.path.join(project.cachedir, ifile) try: os.remove(itarget) except OSError: pass # re-distribute the project in case it has already been distributed if doUpdate and project.isDistributed: usersFailed = [] project.distribute(usersFailed) if usersFailed: # not all files could be distributed MODULE.info("Failed processing the following users: %s" % usersFailed) usersStr = ", ".join([Display.user(i) for i in usersFailed]) raise UMC_Error( _("The project could not distributed to the following users: %s") % usersStr ) except (IOError, OSError, UMC_Error): # TODO: catch only UMC_Error etype, exc, etraceback = sys.exc_info() # data not valid... create error info MODULE.info('data for project "%s" is not valid: %s' % (iprops.get("name"), exc)) if not doUpdate: # remove eventually created project file and cache dir for ipath in (project.projectfile, project.cachedir): if os.path.basename(ipath) not in os.listdir(util.DISTRIBUTION_DATA_PATH): # no file / directory has been created yet continue try: MODULE.info("cleaning up... removing: %s" % ipath) shutil.rmtree(ipath) except (IOError, OSError): pass raise_(UMC_Error, exc, etraceback) self._cleanTmpDir() return {"success": True, "name": iprops.get("name")}
[docs] @sanitize(StringSanitizer(required=True)) @LDAP_Connection() def get(self, request, ldap_user_read=None, ldap_position=None): """ Returns the objects for the given IDs requests.options = [ <ID>, ... ] return: [ { ... }, ... ] """ # try to load all given projects result = [] # list of all project properties (dicts) or None if project is not valid for iproject in [util.Project.load(iid) for iid in request.options]: # make sure that project could be loaded if not iproject: result.append(None) continue # make sure that only the project owner himself (or an admin) is able # to see the content of a project if request.flavor == "teacher" and not compare_dn(iproject.sender.dn, request.user_dn): raise UMC_Error( _( "Project details are only visible to the project owner himself or an " "administrator." ), status=403, ) # prepare date and time properties for distribution/collection of project files props = iproject.dict for jjob, jsuffix in ( (iproject.atJobDistribute, "distribute"), (iproject.atJobCollect, "collect"), ): MODULE.info("check job: %s" % jsuffix) if not jjob: # no job is registered -> manual job distribution/collection MODULE.info("no existing job -> manual execution") props["%sType" % jsuffix] = "manual" continue # job is registered -> prepare date and time fields MODULE.info( "job nr #%d scheduled for %s -> automatic execution" % (jjob.nr, jjob.execTime) ) props["%sType" % jsuffix] = "automatic" props["%sDate" % jsuffix] = datetime.strftime(jjob.execTime, "%Y-%m-%d") props["%sTime" % jsuffix] = datetime.strftime(jjob.execTime, "%H:%M") # adjust sender / recipients properties props["sender"] = props["sender"].username recipients = [] for recip in props["recipients"]: recipients.append( { "id": recip.dn, "label": (recip.type == util.TYPE_USER and Display.user(recip.dict)) or recip.name, } ) props["recipients"] = recipients # append final dict to result list MODULE.info("final project dict: %s" % props) result.append(props) self.finished(request.id, result)
[docs] @sanitize(StringSanitizer(required=True)) def distribute(self, request): # update the sender information of the selected projects result = [] for iid in request.options: MODULE.info("Distribute project: %s" % iid) try: # make sure that project could be loaded iproject = util.Project.load(iid) if not iproject: raise IOError(_('Project "%s" could not be loaded') % iid) # make sure that only the project owner himself (or an admin) is able # to distribute a project if request.flavor == "teacher" and not compare_dn(iproject.sender.dn, request.user_dn): raise ValueError( _("Only the owner himself or an administrator may distribute a project.") ) # project was loaded successfully... try to distribute it usersFailed = [] iproject.distribute(usersFailed) # raise an error in case distribution failed for some users if usersFailed: MODULE.info("Failed processing the following users: %s" % usersFailed) usersStr = ", ".join([Display.user(i) for i in usersFailed]) raise IOError( _("The project could not distributed to the following users: %s") % usersStr ) # save result result.append({"name": iid, "success": True}) except (ValueError, IOError) as exc: result.append({"name": iid, "success": False, "details": str(exc)}) # return the results self.finished(request.id, result)
[docs] @sanitize(StringSanitizer(required=True)) def collect(self, request): # try to open the UDM user object of the current user sender = self._get_sender(request) # update the sender information of the selected projects result = [] for iid in request.options: MODULE.info("Collect project: %s" % iid) try: # make sure that project could be loaded iproject = util.Project.load(iid) if not iproject: raise IOError(_('Project "%s" could not be loaded') % iid) # replace the projects sender with the current logged in user iproject.sender = sender # project was loaded successfully... try to distribute it dirsFailed = [] iproject.collect(dirsFailed) # raise an error in case distribution failed for some users if dirsFailed: dirsStr = ", ".join(dirsFailed) MODULE.info("Failed collecting the following dirs: %s" % dirsStr) raise IOError( _("The following user directories could not been collected: %s") % dirsStr ) # save result result.append({"name": iid, "success": True}) except (ValueError, IOError) as exc: result.append({"name": iid, "success": False, "details": str(exc)}) # return the results self.finished(request.id, result)
[docs] @sanitize(StringSanitizer(required=True)) def adopt(self, request): # try to open the UDM user object of the current user sender = self._get_sender(request) # update the sender information of the selected projects result = [] for iid in request.options: try: # make sure that project could be loaded iproject = util.Project.load(iid) if not iproject: raise IOError(_('Project "%s" could not be loaded') % iid) # project was loaded successfully iproject.sender = sender iproject.save() except (ValueError, IOError) as exc: result.append({"name": iid, "success": False, "details": str(exc)}) # return the results self.finished(request.id, result)
[docs] @sanitize(DictSanitizer({"object": StringSanitizer(required=True)}, required=True)) def remove(self, request): """Removes the specified projects""" for iproject in [util.Project.load(ientry.get("object")) for ientry in request.options]: if not iproject: continue # make sure that only the project owner himself (or an admin) is able # to see the content of a project if request.flavor == "teacher" and not compare_dn(iproject.sender.dn, request.user_dn): raise UMC_Error( _("Only the owner himself or an administrator may delete a project."), status=403, ) # purge the project iproject.purge() self.finished(request.id, None)