# Source code for univention.management.console.modules.printermoderation

#!/usr/bin/python3
# -*- coding: utf-8 -*-
#
# Univention Management Console module:
#   Moderating print jobs of students
#
# Copyright 2012-2025 Univention GmbH
#
# http://www.univention.de/
#
# All rights reserved.
#
# The source code of this program is made available
# under the terms of the GNU Affero General Public License version 3
# (GNU AGPL V3) as published by the Free Software Foundation.
#
# Binary versions of this program provided by Univention to you as
# well as other copyrighted, protected or trademarked materials like
# Logos, graphics, fonts, specific documentations and configurations,
# cryptographic keys etc. are subject to a license agreement between
# you and Univention and not subject to the GNU AGPL V3.
#
# In the case you use this program under the terms of the GNU AGPL V3,
# the program is provided in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public
# License with the Debian GNU/Linux or Univention distribution in file
# /usr/share/common-licenses/AGPL-3; if not, see
# <http://www.gnu.org/licenses/>.

import datetime
import glob
import os
import stat
import subprocess

import cups
import six

import univention.admin.modules as udm_modules
import univention.admin.uexceptions as udm_errors
from ucsschool.lib.models.school import School
from ucsschool.lib.models.user import User
from ucsschool.lib.school_umc_base import Display, SchoolBaseModule, SchoolSanitizer
from ucsschool.lib.school_umc_ldap_connection import LDAP_Connection
from univention.lib.i18n import Translation
from univention.management.console.config import ucr
from univention.management.console.log import MODULE
from univention.management.console.modules import UMC_Error
from univention.management.console.modules.decorators import (
    allow_get_request,
    require_password,
    sanitize,
    simple_response,
)
from univention.management.console.modules.sanitizers import LDAPSearchSanitizer, StringSanitizer

DISTRIBUTION_DATA_PATH = "/var/lib/ucs-school-umc-distribution"
DISTRIBUTION_CMD = "/usr/lib/ucs-school-umc-distribution/umc-distribution"

OWNER = "root"
WWWGROUP = "www-data"
CACHE_DIR = "/var/cache/printermoderation"
CUPSPDF_DIR = None
CUPSPDF_USERSUBDIR = None

_ = Translation("ucs-school-umc-printermoderation").translate

# read list of UDM modules
udm_modules.update()


class Instance(SchoolBaseModule):
    """UMC module for moderating print jobs of students.

    Print jobs are PDF files found below ``CUPSPDF_DIR/<username>/CUPSPDF_USERSUBDIR``.
    Both path parts are derived in :meth:`init` from the UCR variable
    ``cups/cups-pdf/directory``. The module lists the jobs, lets them be
    downloaded or deleted and forwards them to a real printer via CUPS.
    """

    def init(self):
        """Initialise module state and make sure the spool directory exists.

        Splits the configured cups-pdf directory at the ``%U`` placeholder into
        the module globals ``CUPSPDF_DIR`` (part before the user name) and
        ``CUPSPDF_USERSUBDIR`` (part after it), creates ``CUPSPDF_DIR`` if it
        is missing and sets ``self.fqdn`` and the password callback flag.
        """
        global CUPSPDF_DIR, CUPSPDF_USERSUBDIR
        SchoolBaseModule.init(self)

        spool_template = os.path.normpath(ucr.get("cups/cups-pdf/directory", "/var/spool/cups-pdf/%U"))
        # partition() instead of split(): a value without "%U" must not abort
        # the module initialisation with a ValueError on unpacking.
        CUPSPDF_DIR, placeholder, CUPSPDF_USERSUBDIR = spool_template.partition("%U")
        if not placeholder:
            MODULE.warn("cups/cups-pdf/directory does not contain %%U: %s" % (spool_template,))

        # create directory if it does not exist
        try:
            if not os.path.exists(CUPSPDF_DIR):
                # Create the directory that was actually tested (and that the
                # error message names), not the distribution data path.
                os.makedirs(CUPSPDF_DIR, 0o755)
        except (OSError, IOError) as exc:
            MODULE.error("error occured while creating %s: %s" % (CUPSPDF_DIR, exc))

        self.fqdn = "%s.%s" % (ucr.get("hostname"), ucr.get("domainname"))
        self.pw_callback_bad_password = False

    def _get_path(self, username, printjob):
        """Return the file system path of a print job of the given user.

        Slashes are stripped from both arguments. An empty ``printjob`` yields
        the directory holding the jobs of the user.

        :param username: name of the user's spool directory
        :param printjob: file name of the print job (may be empty)
        :returns: the joined (not resolved) path
        :raises UMC_Error: if the resolved path is not located inside ``CUPSPDF_DIR``
        """
        printjob = printjob.replace("/", "")
        username = username.replace("/", "")
        # A sub directory starting with "/" would make os.path.join() discard
        # the spool directory and the user name, so strip the leading separator.
        subdir = CUPSPDF_USERSUBDIR.lstrip(os.sep)
        path = os.path.join(CUPSPDF_DIR, username, subdir, printjob)

        spool_root = os.path.realpath(CUPSPDF_DIR)
        resolved = os.path.realpath(path)
        # Compare against the root *with* a trailing separator: a plain prefix
        # test would also accept siblings like "<root>-other" or "<root>.conf".
        inside_root = resolved == spool_root or resolved.startswith(os.path.join(spool_root, ""))
        if not inside_root:
            raise UMC_Error(_("Invalid file"))
        return path

    def _get_all_username_variants(self, username):
        """
        Checks for print job directories for the given username regardless
        of the case of the directory name.

        :param username: user name to look for (slashes are stripped)
        :returns: list of directory names directly below ``CUPSPDF_DIR`` that
            equal ``username`` when compared case-insensitively; empty if the
            spool directory cannot be listed
        """
        wanted = username.replace("/", "").lower()
        # os.walk() yields nothing for a missing/unreadable directory; the
        # default keeps next() from raising StopIteration in that case.
        all_user_dirs = next(os.walk(CUPSPDF_DIR), (None, [], []))[1]
        return [dirname for dirname in all_user_dirs if dirname.lower() == wanted]

    @sanitize(school=SchoolSanitizer(required=True))
    @LDAP_Connection()
    def printers(self, request, ldap_user_read=None):
        """
        List all available printers except PDF printers

        requests.options = {}
        'school' -- school whose printer container is searched

        return: [{'id': <spool host>://<printer name>, 'label': <display name>}, ...]
        """
        try:
            printers = udm_modules.lookup(
                "shares/printer",
                None,
                ldap_user_read,
                scope="sub",
                base=School.get_search_base(request.options["school"]).printers,
            )
        except udm_errors.noObject:
            printers = []

        result = []
        for prt in printers:
            # ignore PDF printers
            uri = prt.info.get("uri", [])
            if uri and uri[0].startswith("cups-pdf"):
                continue
            name = prt.info["name"]
            spool_hosts = prt.info["spoolHost"]
            # always prefer myself
            spool_host = self.fqdn if self.fqdn in spool_hosts else spool_hosts[0]
            result.append({"id": "%s://%s" % (spool_host, name), "label": name})

        self.finished(request.id, result)

    @sanitize(
        **{
            "school": SchoolSanitizer(required=True),
            "class": StringSanitizer(required=True),
            "pattern": LDAPSearchSanitizer(
                required=True, default="", use_asterisks=True, add_asterisks=False
            ),
        }
    )
    @LDAP_Connection()
    def query(self, request, ldap_user_read=None, ldap_position=None):
        """Searches for print jobs

        requests.options = {}
        'school' -- school to search students in
        'class' -- group to restrict the search to; None or the string "None" means no restriction
        'pattern' -- search pattern handed to the user lookup

        The requesting user is added to the list of students, so that her own
        print jobs are listed as well.

        return: [<Printjob.json()>, ...]
        """
        klass = request.options.get("class")
        if klass in (None, "None"):
            klass = None

        # NOTE(review): _users() is not defined in this file; presumably it is
        # provided by SchoolBaseModule and returns UDM user objects.
        students = self._users(
            ldap_user_read,
            request.options["school"],
            group=klass,
            user_type="student",
            pattern=request.options.get("pattern", ""),
        )
        try:
            students.append(
                User.from_dn(request.user_dn, None, ldap_user_read).get_udm_object(ldap_user_read)
            )
        except udm_errors.noObject:
            MODULE.warn(
                "Could not get user object of teacher %r. Is it a UCS@school user?" % (request.user_dn,)
            )

        printjoblist = []
        for student in students:
            # The spool directory may differ in case from the LDAP user name,
            # so every matching directory is searched.
            for dirname in self._get_all_username_variants(student.info["username"]):
                user_path = self._get_path(dirname, "")
                printjoblist.extend(
                    Printjob(student, dirname, document).json()
                    for document in glob.glob(os.path.join(user_path, "*.pdf"))
                    if os.path.isfile(document)
                )

        self.finished(request.id, printjoblist)

    @allow_get_request
    @sanitize(username=StringSanitizer(required=True), printjob=StringSanitizer(required=True))
    def download(self, request):
        """
        Download a print job

        requests.options = {}
        'username' -- owner of the print job
        'printjob' -- relative filename of the print job

        return: <PDF document>

        Raises UMC_Error if the path is invalid or is not an existing file.

        NOTE(review): no check is visible here that the requesting user is
        allowed to read the jobs of 'username' -- confirm that this is
        enforced elsewhere.
        """
        path = self._get_path(request.options["username"], request.options["printjob"])
        # isfile() instead of exists(): a directory (e.g. for an empty
        # 'printjob') would pass exists() and then make open() fail.
        if not os.path.isfile(path):
            raise UMC_Error(_("Invalid file"))

        with open(path, "rb") as fd:
            self.finished(request.id, fd.read(), mimetype="application/pdf")

    @sanitize(username=StringSanitizer(required=True), printjob=StringSanitizer(required=True))
    @simple_response
    def delete(self, username, printjob):
        """
        Delete a print job

        requests.options = {}
        'username' -- owner of the print job
        'printjob' -- relative filename of the print job

        return: False if removing the file failed, True otherwise (also if
        the file does not exist)
        """
        path = self._get_path(username, printjob)
        if not os.path.exists(path):
            return True

        MODULE.info("Deleting print job %r" % (path,))
        try:
            os.unlink(path)
        except OSError as exc:
            MODULE.error("Error deleting print job: %s" % (exc,))
            return False
        return True

    def pw_callback(self, prompt):
        """Password callback registered with CUPS in :meth:`printit`.

        Returns ``self.password`` on the first call after the flag was reset
        and ``None`` on every further call, so that a rejected password is
        not offered again.

        :param prompt: prompt passed by CUPS (unused)
        """
        if self.pw_callback_bad_password:
            return None
        self.pw_callback_bad_password = True
        # NOTE(review): self.password is not set in this file; presumably it
        # is made available by the base class / @require_password.
        return self.password

    @require_password
    @sanitize(
        username=StringSanitizer(required=True),
        printjob=StringSanitizer(required=True),
        printer=StringSanitizer(required=True),
    )
    @simple_response(with_request=True)
    def printit(self, request, username, printjob, printer):
        """
        Print a given document on the given printer

        requests.options = {}
        'username' -- owner of the print job
        'printjob' -- relative filename of the print job
        'printer' -- the printer to use (<hostname>://<printer>)

        The file is removed after it has been handed over to CUPS.

        return: True

        Raises UMC_Error if the printer URI is malformed, the file does not
        exist, the print server cannot be reached or CUPS rejects the job.
        """
        path = self._get_path(username, printjob)

        try:
            spoolhost, printer = printer.split("://", 1)
        except ValueError:
            raise UMC_Error(_("Invalid printer URI"))

        if not os.path.exists(path):
            raise UMC_Error(
                _("File %r could not be printed as it does not exist (anymore).") % (printjob,)
            )

        MODULE.process("Printing: %s" % path)
        # allow pw_callback() to hand out the password once for this request
        self.pw_callback_bad_password = False
        try:
            cups.setUser(request.username)
            cups.setEncryption(cups.HTTP_ENCRYPT_ALWAYS)
            cups.setPasswordCB(self.pw_callback)
            cups.setServer(spoolhost)
            conn = cups.Connection(spoolhost)
            conn.printFile(printer, path, Printjob.filename2label(printjob), {})
        except RuntimeError:
            raise UMC_Error(
                _("Failed to connect to print server %(printserver)s.") % {"printserver": spoolhost}
            )
        except cups.IPPError as exc:
            (errno, description) = exc.args
            IPP_AUTHENTICATION_CANCELED = 4096
            # replace the raw IPP description by a translated one where known
            description = {
                cups.IPP_NOT_AUTHORIZED: _("No permission to print"),
                IPP_AUTHENTICATION_CANCELED: _("Wrong password"),
            }.get(errno, description)
            raise UMC_Error(
                _("Failed to print on %(printer)s: %(stderr)s (error %(errno)d).")
                % {"printer": printer, "stderr": description, "errno": errno}
            )

        # delete file
        MODULE.info("Deleting print job %r" % (path,))
        os.remove(path)

        return True
class Printjob(object):
    """A single print job (PDF file) of a user.

    On construction the file is stat'ed and its PDF metadata is read via
    ``pdfinfo`` (see :meth:`readPDF`).
    """

    # Parsed pdfinfo output, keyed by normalized file name and shared by all
    # instances. Entries are never removed or refreshed.
    pdf_cache = {}

    def __init__(self, owner, username, fullfilename):
        """
        :param owner: user object the job belongs to (passed to ``Display.user()`` in :meth:`json`)
        :param username: username w.r.t. case sensitivity (name of the spool directory)
        :param fullfilename: path of the PDF file
        :raises OSError: if the file cannot be stat'ed
        """
        self.owner = owner
        self.username = username
        self.fullfilename = os.path.normpath(fullfilename)
        self.filename = os.path.basename(self.fullfilename)
        self.tmpfilename = None
        stats = os.stat(self.fullfilename)
        self.ctime = datetime.datetime.fromtimestamp(stats[stat.ST_CTIME])
        self.name = Printjob.filename2label(self.filename)
        self.readPDF()

    @staticmethod
    def filename2label(filename):
        """Derive a display label from the file name of a print job.

        Everything up to and including the first ``-`` is dropped, as is a
        trailing ``.pdf``.

        :param filename: base name of the file
        :returns: the label
        """
        name = filename.partition("-")[2] if "-" in filename else filename
        if name.endswith(".pdf"):
            name = name[: -len(".pdf")]
        return name

    def json(self):
        """Return the print job as a dictionary of plain values.

        ``date`` is the tuple (year, month, day, hour, minute, second) of the
        file's ctime, ``pages`` the page count reported by pdfinfo or None.
        """
        created = self.ctime
        return {
            "id": self.fullfilename,
            "username": self.username,
            "user": Display.user(self.owner),
            "printjob": self.name,
            "filename": self.filename,
            "date": (
                created.year,
                created.month,
                created.day,
                created.hour,
                created.minute,
                created.second,
            ),
            "pages": self.metadata.get("pages"),
        }

    def readPDF(self):
        """Fill ``self.metadata`` with the ``key: value`` pairs printed by pdfinfo.

        Keys are lower-cased and stripped. Results are taken from and stored
        in ``Printjob.pdf_cache``. If pdfinfo cannot be started the metadata
        stays empty; results of a failed run are not cached, so that a file
        which was unreadable at first (e.g. still incomplete) is examined
        again the next time.
        """
        self.metadata = {}

        if self.fullfilename in Printjob.pdf_cache:
            # a cache hit is the normal case and not an error
            MODULE.info("PDF file was cached: %s" % self.fullfilename)
            self.metadata = Printjob.pdf_cache[self.fullfilename]
            return

        try:
            pdfinfo = subprocess.Popen(  # nosec
                ["/usr/bin/pdfinfo", self.fullfilename],
                env={"LANG": "C"},
                stdout=subprocess.PIPE,
            )
        except OSError as exc:
            # A missing/non-executable pdfinfo must not make the print job
            # unusable; it is merely listed without metadata.
            MODULE.error("Could not execute pdfinfo for %s: %s" % (self.fullfilename, exc))
            return

        stdout = pdfinfo.communicate()[0]
        for line in stdout.decode("UTF-8", "replace").splitlines():
            if not line:
                continue
            key, separator, value = line.partition(":")
            if not separator:
                MODULE.error("Could not parse line: %s" % line)
                continue
            self.metadata[key.strip().lower()] = value.strip()

        if pdfinfo.returncode == 0:
            Printjob.pdf_cache[self.fullfilename] = self.metadata