#!/usr/bin/python3
# -*- coding: utf-8 -*-
#
# Univention Management Console module:
# Control computers of pupils in a room
#
# Copyright 2012-2025 Univention GmbH
#
# http://www.univention.de/
#
# All rights reserved.
#
# The source code of this program is made available
# under the terms of the GNU Affero General Public License version 3
# (GNU AGPL V3) as published by the Free Software Foundation.
#
# Binary versions of this program provided by Univention to you as
# well as other copyrighted, protected or trademarked materials like
# Logos, graphics, fonts, specific documentations and configurations,
# cryptographic keys etc. are subject to a license agreement between
# you and Univention and not subject to the GNU AGPL V3.
#
# In the case you use this program under the terms of the GNU AGPL V3,
# the program is provided in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public
# License with the Debian GNU/Linux or Univention distribution in file
# /usr/share/common-licenses/AGPL-3; if not, see
# <http://www.gnu.org/licenses/>.
import datetime
import fcntl
import importlib
import inspect
import itertools
import os
import signal
import subprocess
import time
import traceback
from ipaddress import ip_address
from random import Random
from shlex import quote
import ldap
import psutil
from ldap.filter import filter_format
from six.moves.urllib_parse import urlsplit
import univention.admin.uexceptions as udm_exceptions
from ucsschool.lib import internetrules
try:
from .room_management import ComputerRoomError, ComputerRoomManager
except ImportError:
pass
from ucsschool.lib.models.group import ComputerRoom
from ucsschool.lib.models.school import School
from ucsschool.lib.models.user import User
from ucsschool.lib.roles import create_ucsschool_role_string, role_computer_room_backend_veyon
from ucsschool.lib.school_umc_base import Display, SchoolBaseModule, SchoolSanitizer
from ucsschool.lib.school_umc_ldap_connection import LDAP_Connection
from ucsschool.lib.schoollessons import SchoolLessons
from ucsschool.lib.smbstatus import SMB_Status
from ucsschool.veyon_client.models import VeyonConnectionError
from univention.admin.syntax import gid
from univention.admin.uldap import getMachineConnection
from univention.config_registry import handler_set, handler_unset
from univention.config_registry.frontend import ucr_update
from univention.lib import atjobs
from univention.lib.i18n import Translation
from univention.management.console.config import ucr
from univention.management.console.log import MODULE
from univention.management.console.modules import UMC_Error
from univention.management.console.modules.decorators import allow_get_request, sanitize, simple_response
from univention.management.console.modules.sanitizers import (
BooleanSanitizer,
ChoicesSanitizer,
DNSanitizer,
ListSanitizer,
Sanitizer,
StringSanitizer,
)
# Translation function bound to this module's gettext domain.
_ = Translation("ucs-school-umc-computerroom").translate
# Directory in which the per-room info/lock files are stored (see _getRoomFile).
ROOMDIR = "/var/cache/ucs-school-umc-computerroom"
# Directory containing the placeholder images that are served instead of a real screenshot.
_SCREENSHOT_DIR = "/usr/share/univention-management-console-frontend/js/dijit/themes/umc/icons/scalable"
# The placeholder file names are themselves passed through the translation function.
FN_SCREENSHOT_DENIED = os.path.join(_SCREENSHOT_DIR, _("screenshot_denied.svg"))
FN_SCREENSHOT_NOTREADY = os.path.join(_SCREENSHOT_DIR, _("screenshot_notready.svg"))
[docs]
def compare_dn(a, b):
    """
    Compare two DN strings case-insensitively.

    :param a: first DN (may be None or empty)
    :param b: second DN (may be None or empty)
    :returns: True if both values are non-empty and equal when lower-cased, otherwise False
    :rtype: bool
    """
    # bool() guarantees a real boolean; the bare `and` chain would leak None or "" to callers
    return bool(a and b and a.lower() == b.lower())
def _getRoomFile(roomDN):
    """Return the path of the room file that belongs to the given computer room DN."""
    sanitizer = ComputerRoomDNSanitizer(required=True, _return_room_name=True)
    # with _return_room_name the sanitizer hands back the room name instead of the DN
    name = sanitizer.sanitize("roomDN", {"roomDN": roomDN})
    return os.path.join(ROOMDIR, name)
def _isUmcProcess(pid):
    """
    Check whether the given PID belongs to a running computerroom UMC module process.

    :param int pid: process ID to inspect
    :returns: True if the process exists and its command line identifies it as the
        computerroom UMC module, otherwise False
    :rtype: bool
    """
    if not psutil.pid_exists(pid):
        return False  # process is not running anymore
    try:
        cmdline = psutil.Process(pid).cmdline()
    except psutil.Error:
        # the process vanished (or became inaccessible) after the existence check
        return False
    # check if the process is the computerroom UMC module
    return "computerroom" in cmdline and any(
        "univention-management-console-module" in line for line in cmdline
    )
def _readRoomInfo(roomDN):
    """
    Read the room file of the given room and return its properties as a dict.

    The "pid" entry is only kept (as an int) when it refers to a running
    computerroom UMC module process.
    """
    path = _getRoomFile(roomDN)
    info = {}
    if os.path.exists(path):
        try:
            with open(path) as stream:
                content = stream.readlines()
                # every line holding a '=' is a key-value pair; other lines are ignored
                parsed = {}
                for raw in content:
                    if "=" in raw:
                        key, value = raw.strip().split("=", 1)
                        parsed[key] = value
                info = parsed
        except (OSError, IOError, ValueError) as exc:
            MODULE.warn("Failed to read file %s: %s" % (path, exc))

    # the PID needs special treatment: convert it and verify the owning process
    if "pid" in info:
        raw_pid = info.pop("pid")
        try:
            pid = int(raw_pid)
            if _isUmcProcess(pid):
                info["pid"] = pid
        except (ValueError, OverflowError):
            pass  # malformed PID: leave it out of the result
    return info
def _updateRoomInfo(roomDN, **kwargs):
    """
    Rewrite the room file, changing only the properties passed as keyword arguments.

    A property explicitly passed as None is removed; properties that are not
    passed keep the value currently stored in the room file.
    """
    current = _readRoomInfo(roomDN)
    keys = ("user", "exam", "examDescription", "examEndTime", "atjobID")
    merged = {key: kwargs[key] if key in kwargs else current.get(key) for key in keys}
    _writeRoomInfo(roomDN, **merged)
def _writeRoomInfo(roomDN, user=None, exam=None, examDescription=None, examEndTime=None, atjobID=None):
    """
    Write the room file for the given room and thereby lock the room.

    The file consists of ``key=value`` lines. Properties that are None are not
    written. The PID of the current process is always stored.

    :param roomDN: DN of the computer room
    :param user: DN of the user owning the room session
    :param exam: name of the running exam
    :param examDescription: description of the running exam
    :param examEndTime: end time of the running exam
    :param atjobID: ID of the at job that resets the room settings
    """
    info = {
        "room": roomDN,
        "user": user,
        "exam": exam,
        "examDescription": examDescription,
        "examEndTime": examEndTime,
        "atjobID": atjobID,
        "pid": os.getpid(),
    }
    MODULE.info('Writing info file for room "%s": %s' % (roomDN, info))
    room_file = _getRoomFile(roomDN)
    try:
        # "a" creates the file when missing but, unlike "w", does not truncate
        # it before the exclusive lock has been acquired
        with open(room_file, "a") as fd:
            fcntl.lockf(fd, fcntl.LOCK_EX)
            try:
                fd.truncate(0)
                for key, val in info.items():
                    if val is None:
                        continue
                    # the format is line based: a line break inside a value would
                    # otherwise be read back as an additional key-value pair
                    val = str(val).replace("\r\n", " ").replace("\r", " ").replace("\n", " ")
                    fd.write("%s=%s\n" % (key, val))
                # hand the buffered data to the OS while the lock is still held
                fd.flush()
            finally:
                # make sure that the file is unlocked
                fcntl.lockf(fd, fcntl.LOCK_UN)
    except EnvironmentError:
        MODULE.warn("Failed to write file: %s" % room_file)
def _getRoomOwner(roomDN):
    """
    Return the user DN stored in the room file.

    None is returned when the room info carries no verified "pid" entry or no user.
    """
    info = _readRoomInfo(roomDN)
    return info.get("user") if "pid" in info else None
def _freeRoom(roomDN, userDN):
    """
    Remove the room file if the room is locked by the given user.

    :param roomDN: DN of the computer room
    :param userDN: DN of the user who wants to release the room
    """
    roomFile = _getRoomFile(roomDN)
    # read the owner once so the logged value is the one that is actually compared
    owner = _getRoomOwner(roomDN)
    MODULE.warn("lockDN: %s, userDN: %s" % (owner, userDN))
    # plain equality keeps the previous behaviour (including None == None);
    # compare_dn additionally accepts DNs that differ only in case
    if owner == userDN or compare_dn(owner, userDN):
        try:
            os.unlink(roomFile)
        except (OSError, IOError):
            MODULE.warn("Failed to remove room lock file: %s" % roomFile)
[docs]
def check_room_access(func):
    """
    Decorator that blocks access to a room session run by another user.

    Before the wrapped method is executed, ``self._checkRoomAccess(request)`` is
    called; whatever it raises propagates to the caller.

    :param func: method taking ``(self, request, ...)``
    :returns: the wrapping function, carrying the name and docstring of ``func``
    """
    import functools

    @functools.wraps(func)
    def _decorated(self, request, *args, **kwargs):
        self._checkRoomAccess(request)
        return func(self, request, *args, **kwargs)

    return _decorated
[docs]
def reset_room_settings(room, hosts):
    """
    Reset the UCR settings of a room.

    The room specific variables are unset and the given hosts are removed from
    the space separated host lists; a list that becomes empty is unset as well.

    :param room: room name used in the UCR variable names
    :param hosts: iterable of host addresses to remove from the host lists
    """
    room_templates = (
        "samba/printmode/room/{}",
        "samba/sharemode/room/{}",
        "proxy/filter/room/{}/rule",
    )
    host_list_templates = (
        "samba/printmode/hosts/none",
        "cups/printmode/hosts/none",
        "samba/othershares/hosts/deny",
        "samba/share/Marktplatz/hosts/deny",
        "proxy/filter/room/{}/ip",
    )
    # None as value means "unset this variable"
    update_vars = {}
    for template in room_templates:
        update_vars[template.format(room)] = None
    ucr.load()
    hosts = set(hosts)
    for template in host_list_templates:
        variable = template.format(room)
        current = ucr.get(variable)
        old = set(current.split(" ")) if current else set()
        remaining = old - hosts
        update_vars[variable] = " ".join(remaining) if remaining else None
    ucr_update(ucr, update_vars)
[docs]
class IPAddressSanitizer(Sanitizer):
    """Sanitizer turning the given value into an address object of the ``ipaddress`` module."""

    def _sanitize(self, value, name, further_fields):
        try:
            address = ip_address(value)
        except ValueError as exc:
            # report the parser's message as validation error
            self.raise_validation_error("%s" % (exc,))
        else:
            return address
[docs]
class PeriodSanitizer(StringSanitizer):
    """Sanitizer converting a ``HH:MM`` string into a ``datetime.time`` object."""

    def _sanitize(self, value, name, further_fields):
        """
        Parse ``value`` as ``HH:MM``; an empty value is treated as ``00:00``.

        :returns: the parsed time of day
        :rtype: datetime.time
        """
        try:
            return datetime.datetime.strptime(value or "00:00", "%H:%M").time()
        except (TypeError, ValueError) as exc:
            # TypeError: a non-string value was passed (strptime only accepts strings)
            self.raise_validation_error("Failed to read end time: %s" % (exc,))
[docs]
class ComputerSanitizer(StringSanitizer):
    """Sanitizer replacing a computer name by the matching object of the current computer room."""

    # module instance whose ``_computerroom`` is used for the lookup (set in Instance.init)
    instance = None

    def _sanitize(self, value, name, further_args):
        value = super(ComputerSanitizer, self)._sanitize(value, name, further_args)
        computer = self.instance._computerroom.get(value)
        if computer:
            return computer
        MODULE.error("Requested computer was not found in computerroom: %s." % value)
        message = _(
            "Unknown computer: %s\n"
            "Please contact your system administrator to resolve this problem."
        )
        raise UMC_Error(message % value)
[docs]
class ComputerRoomDNSanitizer(DNSanitizer):
    """
    Sanitizer for computer room DNs.

    The value of the first RDN is validated as a group name and checked for
    path traversal. With ``_return_room_name=True`` the room name is returned
    instead of the DN.
    """

    def __init__(self, *args, **kwargs):
        # don't want to modify request.options, so cannot use "may_change_value"
        self._return_room_name = kwargs.pop("_return_room_name", False)
        super(ComputerRoomDNSanitizer, self).__init__(*args, **kwargs)

    def _sanitize(self, value, name, further_args):
        """
        Validate the room DN.

        :returns: the DN, or the room name if ``_return_room_name`` is set
        :raises UMC_Error: if the DN or the room name contained in it is invalid
        """
        value = super(ComputerRoomDNSanitizer, self)._sanitize(value, name, further_args)
        try:
            room_name = ldap.dn.str2dn(value)[0][0][1]
        except (IndexError, KeyError, ldap.DECODING_ERROR):
            # IndexError: the DN has no RDN at all (e.g. an empty string)
            raise UMC_Error(_("Invalid room DN: %s") % (value,))
        try:
            gid.parse(room_name)
        except udm_exceptions.valueError:
            raise UMC_Error(_("Invalid room DN: %s") % (value,))
        if os.path.basename(room_name) != room_name:
            # Check for path traversal
            raise UMC_Error(_("Invalid room DN: %s") % (value,))
        if not self._return_room_name:
            return value
        if not room_name:
            raise UMC_Error(_("Invalid room DN: %s") % value)
        return room_name
[docs]
class Plugin(object):
    """Base class of the computer room plugins picked up by ``Instance._load_plugins``."""

    # gettext domain used to create the plugin's translation function
    gettext_domain = "ucs-school-umc-computerroom"

    def __init__(self, computerroom, manager):
        self.computerroom, self.manager = computerroom, manager
        translation = Translation(self.gettext_domain)
        self._ = translation.translate

    @property
    def name(self):
        """Name of the plugin, i.e. the name of its class."""
        cls = type(self)
        return cls.__name__
[docs]
class Instance(SchoolBaseModule):
ATJOB_KEY = "UMC-computerroom"
[docs]
def prepare(self, request):
    """Run the base class preparation and remember the DN of the requesting user."""
    super(Instance, self).prepare(request)
    # kept for destroy(), which passes this DN to _freeRoom
    self.__init_user_dn = request.user_dn
[docs]
def init(self):
    """Initialise the module state: room manager, lessons, rule end time and plugins."""
    SchoolBaseModule.init(self)
    # the computer sanitizer looks up computers through this module instance
    ComputerSanitizer.instance = self
    self._computerroom = ComputerRoomManager()
    rng = Random()
    rng.seed()
    self._random = rng
    self._lessons = SchoolLessons()
    self._ruleEndAt = None
    self._load_plugins()
def _load_plugins(self):
    """
    Import every ``*.py`` file of the ``plugins`` directory next to this file and
    instantiate the ``Plugin`` subclasses found in it.

    The instances are stored in ``self._plugins``, keyed by plugin name. Modules
    that cannot be imported and plugins that fail to instantiate are logged and
    skipped.
    """
    self._plugins = {}
    plugin_dir = os.path.join(os.path.dirname(__file__), "plugins")
    for filename in os.listdir(plugin_dir):
        if not filename.endswith(".py"):
            continue
        try:
            module = importlib.import_module(
                "univention.management.console.modules.computerroom.plugins.%s" % (filename[:-3],)
            )
        except ImportError:
            MODULE.error(traceback.format_exc())
            # nothing was imported: skip the file instead of inspecting its name string
            continue
        for name, plugin in inspect.getmembers(module, inspect.isclass):
            MODULE.info("Loading plugin %r from module %r" % (plugin, module))
            if name.startswith("_") or plugin is Plugin or not issubclass(plugin, Plugin):
                continue
            try:
                loaded = plugin(self, self._computerroom)
            except Exception:
                MODULE.error(traceback.format_exc())
            else:
                self._plugins[loaded.name] = loaded
[docs]
def destroy(self):
    """
    Clean up when the UMC module exits.

    The room lock file is removed unless an exam is registered for the room.
    Afterwards every computer object is told to stop and the method waits until
    none of them is alive anymore.
    """
    MODULE.info("Cleaning up")
    manager = self._computerroom
    if manager.room:
        info = _readRoomInfo(manager.roomDN)
        MODULE.info("room info: %s" % (info,))
        # the lock file has to survive during exam mode
        if info and not info.get("exam"):
            MODULE.info("Removing lock file for room %s (%s)" % (manager.room, manager.roomDN))
            _freeRoom(manager.roomDN, self.__init_user_dn)

    for computer in manager.values():
        computer.should_run = False

    while True:
        if not any(computer.is_alive() for computer in manager.values()):
            break
        time.sleep(0.1)
    MODULE.info("All threads dead!")
[docs]
def lessons(self, request):
    """
    Respond with the names of the school lessons.

    Lessons beginning before the current lesson (or, outside of a lesson, before
    the previous one) are left out.
    """
    reference = self._lessons.current
    if reference is None:
        reference = self._lessons.previous
    candidates = self._lessons.lessons
    if reference:
        candidates = [lesson for lesson in candidates if lesson.begin >= reference.begin]
    names = [lesson.name for lesson in candidates]
    self.finished(request.id, names)
[docs]
def internetrules(self, request):
    """Respond with the names of all available internet rules."""
    names = [rule.name for rule in internetrules.list()]
    self.finished(request.id, names)
def _room_acquire(self, request, roomDN, ldap_user_read):
    """
    Acquire the specified computer room for the requesting user.

    :returns: tuple ``(success, message, info)``; ``message`` is one of "OK",
        "UNKNOWN_ROOM", "EMPTY_ROOM" or "ALREADY_LOCKED" and ``info`` holds the
        room info on success, otherwise an empty dict
    :raises UMC_Error: if selecting the room fails with a Veyon connection error
    """
    user_message = _(
        "Computers in the computerroom can currently not be remote controlled, "
        "because the associated service is not available. "
        "The system will try to restart this service, please try again "
        "in a few moments. "
        "If the problem persists, please contact the system administrator."
    )
    log_message = "Connection to Veyon WebAPI Server (UCS@school Veyon Proxy) failed."
    manager = self._computerroom

    # a failing connection test is only logged here
    try:
        manager.veyon_client.test_connection()
    except VeyonConnectionError:
        MODULE.warning(log_message)

    success, message = True, "OK"
    # match the corresponding school OU
    try:
        room = ComputerRoom.from_dn(roomDN, None, ldap_user_read)
        school = room.school
    except udm_exceptions.noObject:
        success, message = False, "UNKNOWN_ROOM"
    else:
        # set room and school
        if manager.school != school:
            manager.school = school
        if manager.room != roomDN:
            try:
                manager.room = roomDN
            except ComputerRoomError:
                success, message = False, "EMPTY_ROOM"
            except VeyonConnectionError:
                MODULE.error(log_message)
                MODULE.error(traceback.format_exc())
                raise UMC_Error(message=user_message, traceback=traceback.format_exc())

    # update the room info file
    if success:
        _updateRoomInfo(roomDN, user=request.user_dn)
        if not compare_dn(_getRoomOwner(roomDN), request.user_dn):
            success, message = False, "ALREADY_LOCKED"

    info = _readRoomInfo(roomDN) if success else {}
    return success, message, info
[docs]
@sanitize(room=ComputerRoomDNSanitizer(required=True))
@LDAP_Connection()
def room_acquire(self, request, ldap_user_read=None):
    """Acquire the room given in the request and respond with the outcome and the room info."""
    success, message, info = self._room_acquire(request, request.options["room"], ldap_user_read)
    # only these properties of the room info are passed on to the client
    exposed = ("exam", "examDescription", "examEndTime", "room", "user")
    response = {
        "success": success,
        "message": message,
        "info": {key: info.get(key) for key in exposed},
    }
    self.finished(request.id, response)
[docs]
@sanitize(school=SchoolSanitizer(required=True))
@LDAP_Connection()
def rooms(self, request, ldap_user_read=None):
    """
    Respond with the computer rooms of the requested school that carry the
    Veyon backend role, including lock and exam information of each room.
    """
    role = create_ucsschool_role_string(role_computer_room_backend_veyon, "-")
    try:
        all_rooms = ComputerRoom.get_all(
            ldap_user_read,
            request.options["school"],
            filter_str="(ucsschoolRole={})".format(role),
        )
    except udm_exceptions.noObject:
        all_rooms = []

    result = []
    for room in all_rooms:
        room_info = _readRoomInfo(room.dn)
        owner = room_info.get("user")
        # locked: owned by somebody else while a session process or an exam is registered
        locked = (
            owner
            and not compare_dn(owner, request.user_dn)
            and ("pid" in room_info or "exam" in room_info)
        )
        if locked:
            try:
                # open the corresponding UDM object to get a displayable user name
                udm_user = User.from_dn(owner, None, ldap_user_read).get_udm_object(ldap_user_read)
                owner = Display.user(udm_user)
            except udm_exceptions.base as exc:
                MODULE.warn("Cannot open LDAP information for user %r: %s" % (owner, exc))
        entry = {
            "id": room.dn,
            "label": room.get_relative_name(),
            "user": owner,
            "locked": locked,
            "exam": room_info.get("exam"),
            "examDescription": room_info.get("examDescription"),
            "examEndTime": room_info.get("examEndTime"),
        }
        result.append(entry)
    self.finished(request.id, result)
[docs]
@sanitize(
    ipaddress=ListSanitizer(
        required=True, sanitizer=IPAddressSanitizer(), min_elements=1, max_elements=10
    )
)
@LDAP_Connection()
def guess_room(self, request, ldap_user_read=None):
    """
    Respond with the first school and room containing a computer that has one
    of the given IP addresses, or with None values if there is no match.
    """
    addresses = request.options["ipaddress"]
    computers = ldap_user_read.searchDn(self._get_host_filter(addresses))
    if computers:
        room_filter = self._get_room_filter(computers)
        for school_obj in School.get_all(ldap_user_read):
            school = school_obj.name
            for room in ComputerRoom.get_all(ldap_user_read, school, room_filter):
                # the first matching room wins
                self.finished(request.id, {"school": school, "room": room.dn})
                return
    self.finished(request.id, {"school": None, "room": None})
def _get_room_filter(self, computers):
    """Build an LDAP OR-filter matching objects that have one of the given DNs as uniqueMember."""
    clauses = [filter_format("uniqueMember=%s", (dn,)) for dn in computers]
    return "(|(%s))" % ")(".join(clauses)
def _get_host_filter(self, ipaddresses):
    """Build an LDAP OR-filter matching the aRecord/aAAARecord of the given IP address objects."""
    # filter template per IP version
    templates = {4: "aRecord=%s", 6: "aAAARecord=%s"}
    clauses = []
    for address in ipaddresses:
        clauses.append(filter_format(templates[address.version], (address.exploded,)))
    return "(|(%s))" % ")(".join(clauses)
def _checkRoomAccess(self, request):
    """
    Raise an UMC_Error if the selected room is owned by a user other than the
    requesting one. Nothing is checked while no room is selected.
    """
    if self._computerroom.room:
        # make sure that we run the current room session
        owner = _getRoomOwner(self._computerroom.roomDN)
        if owner and not compare_dn(owner, request.user_dn):
            raise UMC_Error(_("A different user is already running a computer room session."))
def _get_computers(self, only_changed=False):
result = []
for computer in self._computerroom.values():
if only_changed and not computer.hasChanged:
continue
result.append(computer.dict)
return result
[docs]
@LDAP_Connection()
def query(self, request, ldap_user_read=None):
    """
    Respond with all computers of the selected room, sorted by their id.

    :raises UMC_Error: if no school or room has been selected
    """
    manager = self._computerroom
    if not (manager.school and manager.room):
        raise UMC_Error(_("no room selected"))

    if request.options.get("reload", False):
        # re-assigning the room is intentional: it goes through the manager's room setter again
        manager.room = manager.room

    result = sorted(self._get_computers(), key=lambda computer: computer["id"])
    MODULE.info("computerroom.query: result: %s" % (result,))
    self.finished(request.id, result)
[docs]
@LDAP_Connection()
def update(self, request, ldap_user_read=None):
    """
    Respond with an update for the selected room: the computers that have
    changed since the last call, the room info, the lock state and, if a rule
    end time is set, the minutes until the settings end.

    :raises UMC_Error: if no school or room has been selected
    """
    manager = self._computerroom
    if not (manager.school and manager.room):
        raise UMC_Error(_("no room selected"))

    changed = self._get_computers(only_changed=True)
    info = _readRoomInfo(manager.roomDN)
    locked = info.get("user", request.user_dn) != request.user_dn
    result = {
        "computers": changed,
        "room_info": info,
        "locked": locked,
        "user": request.user_dn,
    }

    if locked and "pid" in info:
        # somebody else acquired the room, the room is locked
        owner = info["user"]
        result["user"] = owner
        try:
            # open the corresponding UDM object to get a displayable user name
            udm_user = User.from_dn(owner, None, ldap_user_read).get_udm_object(ldap_user_read)
            result["user"] = Display.user(udm_user)
        except udm_exceptions.base as exc:
            # could not open the LDAP object, show the DN
            MODULE.warn("Cannot open LDAP information for user %r: %s" % (result["user"], exc))

    # settings info
    if self._ruleEndAt is not None:
        remaining = self._positiveTimeDiff()
        if remaining is not None:
            result["settingEndsIn"] = remaining.seconds // 60

    MODULE.info("Update: result: %s" % (result,))
    self.finished(request.id, result)
def _positiveTimeDiff(self):
now = datetime.datetime.now()
end = datetime.datetime.now()
end = end.replace(hour=self._ruleEndAt.hour, minute=self._ruleEndAt.minute)
if now > end:
return None
return end - now
[docs]
@check_room_access
@sanitize(
    computer=ComputerSanitizer(required=True),
    device=ChoicesSanitizer(["screen", "input"], required=True),
    lock=BooleanSanitizer(required=True),
)
@simple_response
def lock(self, computer, device, lock):
    """Lock or unlock (depending on ``lock``) the screen or the input of the given computer."""
    MODULE.warn("Locking device %s" % (device,))
    # the sanitizer only lets "screen" and "input" through
    action = computer.lockScreen if device == "screen" else computer.lockInput
    action(lock)
[docs]
@allow_get_request
@check_room_access
@sanitize(computer=ComputerSanitizer(required=True))
def screenshot(self, request):
    """
    Respond with a JPEG screenshot of the given computer, or with a premade SVG
    image when screenshots are hidden or no screenshot is ready yet.

    :raises UMC_Error: if the image file cannot be read
    """
    computer = request.options["computer"]
    tmpfile = computer.screenshot(size=request.options.get("size", None))
    if computer.hide_screenshot:
        mimetype = "image/svg+xml"
        filename = FN_SCREENSHOT_DENIED
    elif tmpfile is None:
        mimetype = "image/svg+xml"
        filename = FN_SCREENSHOT_NOTREADY
    else:
        mimetype = "image/jpeg"
        filename = tmpfile.name
    MODULE.info("screenshot(%s): hide screenshot = %r" % (computer.name, computer.hide_screenshot))

    # stays None if the file cannot be read
    response = None
    try:
        with open(filename, "rb") as fd:
            response = fd.read()
    except EnvironmentError as exc:
        MODULE.error("Unable to load screenshot file %r: %s" % (filename, exc))

    # the temporary screenshot file is removed regardless of whether it was read
    try:
        if tmpfile:
            os.unlink(tmpfile.name)
    except EnvironmentError as exc:
        MODULE.error("Unable to remove temporary screenshot file %r: %s" % (tmpfile.name, exc))

    if response is None:
        # report a proper error instead of failing on an unbound variable below
        raise UMC_Error(_("Unable to load screenshot."))
    self.finished(request.id, response, mimetype=mimetype)
def _read_rules_end_at(self):
    """
    Determine when the settings of the current room end.

    The at job referenced in the room file is used; if the room file is missing,
    the at jobs are searched for the one tagged with the current room.

    :returns: execution time of the at job if it is not in the past, otherwise None
    """
    room_dn = self._computerroom.roomDN
    if os.path.exists(_getRoomFile(room_dn)):
        atjob_id = _readRoomInfo(room_dn).get("atjobID")
        if atjob_id is None:
            return None
        job = atjobs.load(atjob_id, extended=True)
        if job is not None and job.execTime >= datetime.datetime.now():
            return job.execTime
        return None

    # Fallback in case the roomInfo file was deleted
    MODULE.warn("No room file {}".format(room_dn))
    for job in atjobs.list(extended=True):
        if job.comments.get(Instance.ATJOB_KEY, False) == self._computerroom.room:
            # only the first job tagged for this room is taken into account
            if job.execTime >= datetime.datetime.now():
                return job.execTime
            return None
    return None
[docs]
@simple_response(with_request=True)
def settings_get(self, request):
    """
    Return the current settings of the selected room as a dict with the keys
    internetRule, customRule, shareMode, printMode and period.

    :raises UMC_Error: if no school or room has been selected
    """
    manager = self._computerroom
    if not (manager.school and manager.room):
        raise UMC_Error(_("no room selected"))

    ucr.load()
    rule = ucr.get("proxy/filter/room/%s/rule" % manager.room, "none")
    if rule == request.username:
        # a rule named like the user is the user's custom rule
        rule = "custom"
    shareMode = ucr.get("samba/sharemode/room/%s" % manager.room, "all")

    # load custom rule:
    key_prefix = "proxy/filter/setting-user/%s/domain/whitelisted/" % request.username
    custom_rules = [ucr[key] for key in ucr if key.startswith(key_prefix)]

    printMode = ucr.get("samba/printmode/room/%s" % manager.room, "default")

    # find end of lesson
    current = self._lessons.current
    if current is not None:
        period = current.end
    elif self._lessons.next:  # between two lessons
        period = self._lessons.next.end
    else:  # school is out ... 1 hour should be good (FIXME: configurable?)
        period = (datetime.datetime.now() + datetime.timedelta(hours=1)).time()

    restricted = rule != "none" or shareMode != "all" or printMode != "default"
    self._ruleEndAt = self._read_rules_end_at() if restricted else None

    if self._ruleEndAt:
        end_time = self._ruleEndAt.time()
        for lesson in self._lessons.lessons:
            if end_time == lesson.begin:
                period = lesson
                break

    return {
        "internetRule": rule,
        "customRule": "\n".join(custom_rules),
        "shareMode": shareMode,
        "printMode": printMode,
        "period": str(period),
    }
[docs]
@check_room_access
@simple_response(with_request=True)
def finish_exam(self, request):
    """Finish the exam in the current room: reset the room settings and drop the exam info."""
    defaults = {"printMode": "default", "internetRule": "none", "shareMode": "all", "customRule": ""}
    self._settings_set(request, **defaults)
    # None removes the respective entry from the room file
    cleared = dict.fromkeys(("exam", "examDescription", "examEndTime"))
    _updateRoomInfo(self._computerroom.roomDN, **cleared)
@check_room_access
def _start_exam(self, request, room, exam, examDescription, examEndTime):
    """
    Register an exam in the room file of the current room.

    :raises UMC_Error: if an exam is already registered for ``room``
    """
    # NOTE(review): the check reads the info of `room`, the update below writes to the
    # currently selected room - presumably both are the same room; confirm with callers
    if _readRoomInfo(room).get("exam"):
        raise UMC_Error(
            _("In this room an exam is currently already written. Please select another room.")
        )
    exam_info = {"exam": exam, "examDescription": examDescription, "examEndTime": examEndTime}
    _updateRoomInfo(self._computerroom.roomDN, **exam_info)
[docs]
@sanitize(
    room=ComputerRoomDNSanitizer(required=True),
    exam=StringSanitizer(required=True),
    examDescription=StringSanitizer(required=True),
    examEndTime=StringSanitizer(required=True),
)
@simple_response(with_request=True)
def start_exam(self, request, room, exam, examDescription, examEndTime):
    """UMC command for starting an exam; hands the sanitized values to ``_start_exam``."""
    self._start_exam(
        request,
        room=room,
        exam=exam,
        examDescription=examDescription,
        examEndTime=examEndTime,
    )
[docs]
@sanitize(
    printMode=ChoicesSanitizer(["none", "default"], required=True),
    internetRule=StringSanitizer(required=True),
    shareMode=ChoicesSanitizer(["home", "all"], required=True),
    period=PeriodSanitizer(default="00:00", required=False),
    customRule=StringSanitizer(allow_none=True, required=False),
)
@simple_response(with_request=True)
def settings_set(self, request, printMode, internetRule, shareMode, period=None, customRule=None):
    """UMC command for changing the room settings; hands the sanitized values to ``_settings_set``."""
    outcome = self._settings_set(
        request,
        printMode=printMode,
        internetRule=internetRule,
        shareMode=shareMode,
        period=period,
        customRule=customRule,
    )
    return outcome
@check_room_access
def _settings_set(self, request, printMode, internetRule, shareMode, period=None, customRule=None):
    """
    Defines settings for a room

    Existing at jobs of the room are removed and the room's UCR settings are
    reset. Unless all settings are the defaults, the new print mode, share mode
    and internet rule are written to UCR and - outside of exam mode - an at job
    is scheduled that deactivates the rules at the time given by ``period``.
    Finally the SMB connections are reset and cups is reloaded.

    :param request: the UMC request; ``request.username`` names the custom rule
    :param printMode: "none" or "default"
    :param internetRule: "none", "custom" or the name of an internet rule
    :param shareMode: "home" or "all"
    :param period: object with ``hour`` and ``minute`` attributes giving the end time
    :param customRule: newline separated domains/URLs for the custom whitelist
    :raises UMC_Error: if no school or room has been selected
    """
    if not self._computerroom.school or not self._computerroom.room:
        raise UMC_Error(_("no room selected"))
    MODULE.debug("set settings for {}".format(self._computerroom.roomDN))
    # find AT jobs for the room and remove them
    jobs = atjobs.list(extended=True)
    for job in jobs:
        if job.comments.get(Instance.ATJOB_KEY, False) == self._computerroom.room:
            job.rm()
    lo, po = getMachineConnection()
    # flat list of the IP addresses of all computers in the room except teacher computers
    hosts = list(
        itertools.chain.from_iterable(
            [
                c.ip_address
                for c in ComputerRoom.from_dn(self._computerroom.roomDN, None, lo).get_computers(lo)
                if not c.teacher_computer
            ]
        )
    )
    # start from a clean state: drop the room's previous settings and the stored at job ID
    reset_room_settings(self._computerroom.room, hosts)
    _updateRoomInfo(self._computerroom.roomDN, atjobID=None)
    roomInfo = _readRoomInfo(self._computerroom.roomDN)
    in_exam_mode = roomInfo.get("exam")
    # reset to defaults. No atjob is necessary.
    if internetRule == "none" and shareMode == "all" and printMode == "default":
        self._ruleEndAt = None
        self.reset_smb_connections()
        self.reload_cups()
        return
    # collect new settings
    vset = {}  # variables to set to a fixed value
    vappend = {}  # variables whose host list is extended by the given hosts
    vunset_now = []  # variables to unset before the new values are written
    # print mode
    if printMode == "none":
        vappend["samba/printmode/hosts/%s" % printMode] = hosts
        vappend["cups/printmode/hosts/%s" % printMode] = hosts
        vset["samba/printmode/room/%s" % self._computerroom.room] = printMode
    else:
        vunset_now.append("samba/printmode/room/%s" % self._computerroom.room)
    # share mode
    if shareMode == "home":
        vset["samba/sharemode/room/%s" % self._computerroom.room] = shareMode
        vappend["samba/othershares/hosts/deny"] = hosts
        vappend["samba/share/Marktplatz/hosts/deny"] = hosts
    else:
        vunset_now.append("samba/sharemode/room/%s" % self._computerroom.room)
    # internet rule
    if internetRule != "none":
        vappend["proxy/filter/room/%s/ip" % self._computerroom.room] = hosts
        if internetRule == "custom":
            # remove old rules
            i = 1
            while True:
                var = "proxy/filter/setting-user/%s/domain/whitelisted/%d" % (request.username, i)
                if var in ucr:
                    vunset_now.append(var)
                    i += 1
                else:
                    break
            # the custom rule is stored under the name of the requesting user
            vset["proxy/filter/room/%s/rule" % self._computerroom.room] = request.username
            vset["proxy/filter/setting-user/%s/filtertype" % request.username] = "whitelist-block"
            i = 1
            for domain in (customRule or "").split("\n"):
                MODULE.info("Setting whitelist entry for domain %s" % domain)
                if not domain:
                    continue
                parsed = urlsplit(domain)
                MODULE.info("Setting whitelist entry for domain %s" % str(parsed))
                # use the network location if the entry parses as a URL with one, otherwise its path
                if parsed.netloc:
                    vset[
                        "proxy/filter/setting-user/%s/domain/whitelisted/%d" % (request.username, i)
                    ] = parsed.netloc
                    i += 1
                elif parsed.path:
                    vset[
                        "proxy/filter/setting-user/%s/domain/whitelisted/%d" % (request.username, i)
                    ] = parsed.path
                    i += 1
        else:
            vset["proxy/filter/room/%s/rule" % self._computerroom.room] = internetRule
    else:
        vunset_now.append("proxy/filter/room/%s/ip" % self._computerroom.room)
        vunset_now.append("proxy/filter/room/%s/rule" % self._computerroom.room)
    # write configuration
    # remove old values
    handler_unset(vunset_now)
    # append values
    ucr.load()
    MODULE.info("Merging UCR variables")
    for key, value in vappend.items():
        if ucr.get(key):
            old = set(ucr[key].split(" "))
            MODULE.info("Old value: %s" % old)
        else:
            old = set()
            MODULE.info("Old value empty")
        new = set(value)
        MODULE.info("New value: %s" % new)
        new = old.union(new)
        MODULE.info("Merged value of %s: %s" % (key, new))
        if new:
            vset[key] = " ".join(new)
    # Workaround for bug 30450:
    # if samba/printmode/hosts/none is not set but samba/printmode/hosts/all then all other hosts
    # are unable to print on samba shares. Solution: set empty value for .../none if no host is on
    # deny list.
    varname = "samba/printmode/hosts/none"
    if varname not in vset:
        ucr.load()
        if not ucr.get(varname):
            vset[varname] = '""'
    else:
        # remove empty items ('""') in list
        vset[varname] = " ".join(x for x in vset[varname].split(" ") if x != '""')
    # set values
    ucr_vars = sorted("%s=%s" % x for x in vset.items())
    MODULE.info("Writing room rules: %s" % "\n".join(ucr_vars))
    handler_set(ucr_vars)
    # create at job to remove settings
    cmd = "/usr/share/ucs-school-umc-computerroom/ucs-school-deactivate-rules --room %s" % (
        quote(self._computerroom.roomDN)
    )
    MODULE.info("command for reinitialization is: %s" % (cmd,))
    # in exam mode no at job is scheduled here
    if not in_exam_mode:
        starttime = datetime.datetime.now()
        MODULE.info("Now: %s" % starttime)
        MODULE.info("Endtime: %s" % period)
        starttime = starttime.replace(
            hour=period.hour, minute=period.minute, second=0, microsecond=0
        )
        while starttime < datetime.datetime.now():  # prevent problems due to intra-day limit
            starttime += datetime.timedelta(days=1)
        # AT job for the normal case
        MODULE.info("Remove settings at %s" % (starttime,))
        atjob_id = atjobs.add(cmd, starttime, {Instance.ATJOB_KEY: self._computerroom.room}).nr
        _updateRoomInfo(self._computerroom.roomDN, atjobID=atjob_id)
        self._ruleEndAt = starttime
    self.reset_smb_connections()
    self.reload_cups()
[docs]
def reload_cups(self):
    """Reload the cups service if it is enabled; a failing reload is logged as an error."""
    # subprocess.DEVNULL discards the output without leaving an open file object behind
    not_enabled = subprocess.call(  # nosec
        ["/bin/systemctl", "is-enabled", "--quiet", "cups.service"],
        stdout=subprocess.DEVNULL,
        stderr=subprocess.STDOUT,
    )
    if not not_enabled:
        MODULE.info("Reloading cups")
        if subprocess.call(["/bin/systemctl", "reload", "cups.service"]) != 0:  # nosec
            MODULE.error("Failed to reload cups! Printer settings not applied.")
[docs]
def reset_smb_connections(self):
    """Send SIGTERM to the SMB processes belonging to the users of the current computer room."""
    smbstatus = SMB_Status()
    veyon_users = [user.lower() for user in self._computerroom.users if user]
    MODULE.info("veyon users: %s" % ", ".join(veyon_users))
    for process in smbstatus:
        MODULE.info("SMB process: %s" % str(process))
        if not (process.username and process.username.lower() in veyon_users):
            continue
        MODULE.info("Kill SMB process %s" % process.pid)
        try:
            os.kill(int(process.pid), signal.SIGTERM)
        except ProcessLookupError:
            # Bug 56904
            # The smb process may (rarely) have stopped between the `smbstatus`
            # call and the os.kill, which results in a ProcessLookupError
            MODULE.info(
                "Tried to kill SMB process with pid %s, but process does not exist anymore."
                % (process.pid)
            )
        except Exception:
            MODULE.error("Unable to kill SMB process %s." % (process.pid))
            MODULE.error(traceback.format_exc())
[docs]
@sanitize(server=StringSanitizer(required=True))
@check_room_access
def demo_start(self, request):
    """Start the presentation mode with the given server and respond with True."""
    server = request.options["server"]
    self._computerroom.startDemo(server, True)
    self.finished(request.id, True)
[docs]
@check_room_access
def demo_stop(self, request):
    """Stop the presentation mode and respond with True."""
    manager = self._computerroom
    manager.stopDemo()
    self.finished(request.id, True)
[docs]
@sanitize(
    state=ChoicesSanitizer(["poweroff", "poweron", "restart"]),
    computer=ComputerSanitizer(required=True),
)
@check_room_access
@simple_response
def computer_state(self, computer, state):
    """Power off, power on or restart the given computer; always returns True."""
    # name of the computer method per requested state; any other state does nothing
    method_names = {"poweroff": "powerOff", "poweron": "powerOn", "restart": "restart"}
    method_name = method_names.get(state)
    if method_name is not None:
        getattr(computer, method_name)()
    return True
[docs]
@check_room_access
@sanitize(computer=ComputerSanitizer(required=True))
@simple_response
def user_logout(self, computer):
    """Call ``logOut`` on the given computer; always returns True."""
    computer.logOut()
    done = True
    return done
[docs]
@simple_response
def plugins_load(self):
    """Return the button definitions of all loaded plugins under the key "buttons"."""
    return {"buttons": [plugin.button() for plugin in self._plugins.values()]}
[docs]
@check_room_access
@sanitize(plugin=StringSanitizer(required=True), computer=StringSanitizer(required=True))
def plugins_execute(self, request):
    """
    Execute the named plugin for the given computer and respond with its result.

    :raises UMC_Error: if no plugin with that name is loaded
    """
    options = request.options
    plugin = self._plugins.get(options["plugin"])
    if not plugin:
        raise UMC_Error(_("Plugin not found."))
    self.finished(request.id, plugin.execute(options["computer"]))