Source code for univention.management.console.modules.computerroom.room_management

#!/usr/bin/python3
# -*- coding: utf-8 -*-
#
# Univention Management Console module:
#   Control computers of pupils in a room
#
# Copyright 2012-2025 Univention GmbH
#
# http://www.univention.de/
#
# All rights reserved.
#
# The source code of this program is made available
# under the terms of the GNU Affero General Public License version 3
# (GNU AGPL V3) as published by the Free Software Foundation.
#
# Binary versions of this program provided by Univention to you as
# well as other copyrighted, protected or trademarked materials like
# Logos, graphics, fonts, specific documentations and configurations,
# cryptographic keys etc. are subject to a license agreement between
# you and Univention and not subject to the GNU AGPL V3.
#
# In the case you use this program under the terms of the GNU AGPL V3,
# the program is provided in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public
# License with the Debian GNU/Linux or Univention distribution in file
# /usr/share/common-licenses/AGPL-3; if not, see
# <http://www.gnu.org/licenses/>.

import copy
import random
import re
import tempfile
import threading
import time
import traceback
import uuid
from typing import TYPE_CHECKING, Any, List, Optional, TypeVar  # noqa: F401

import ldap
from ldap.dn import explode_rdn
from ldap.filter import filter_format

from ucsschool.lib.models.base import MultipleObjectsError
from ucsschool.lib.models.group import ComputerRoom
from ucsschool.lib.models.user import User
from ucsschool.lib.school_umc_ldap_connection import LDAP_Connection
from ucsschool.veyon_client.client import VeyonClient
from ucsschool.veyon_client.models import (
    AuthenticationMethod,
    Dimension,
    Feature,
    ScreenshotFormat,
    VeyonConnectionError,
    VeyonError,
)
from univention.admin.uexceptions import noObject
from univention.lib.i18n import Translation
from univention.management.console.config import ucr
from univention.management.console.log import MODULE
from univention.management.console.modules.computerroom import wakeonlan

if TYPE_CHECKING:
    from univention.admin.uldap import access as LoType  # noqa: F401

# Generic placeholder for the value type held by a LockableAttribute (used in its type comments).
LV = TypeVar("LV")

# Translation function for the "ucs-school-umc-computerroom" message domain.
_ = Translation("ucs-school-umc-computerroom").translate

# Pattern for user strings of the form "<domain>\<username>": everything up to the last
# backslash is captured as "domain", the rest (one or more characters that are neither "(" nor
# a backslash, up to the end of the string) as "username". Compiled by UserMap.
VEYON_USER_REGEX = r"(?P<domain>.*)\\(?P<username>[^\(\\]+)$"

# File whose content is passed as "keydata" to the VeyonClient (see ComputerRoomManager.veyon_client).
VEYON_KEY_FILE = "/etc/ucsschool-veyon/key.pem"


class ComputerRoomError(Exception):
    """Error raised by the computer room management code of this module."""
class UserInfo(object):
    """
    Plain record describing one user known to a :py:class:`UserMap`.

    :param ldap_dn: DN of the user object (stored as ``dn``)
    :param username: name of the user
    :param isTeacher: whether the user is a teacher
    :param hide_screenshot: whether screenshots of this user's session are to be hidden
    """

    def __init__(self, ldap_dn, username, isTeacher=False, hide_screenshot=False):
        # type: (str, str, Optional[bool], Optional[bool]) -> None
        self.dn = ldap_dn
        self.isTeacher = isTeacher
        self.username = username
        self.hide_screenshot = hide_screenshot

    def __repr__(self):
        # type: () -> str
        return "{}(ldap_dn={!r}, username={!r}, isTeacher={!r}, hide_screenshot={!r})".format(
            type(self).__name__, self.dn, self.username, self.isTeacher, self.hide_screenshot
        )
class UserMap(dict):
    """
    Lazy cache mapping user strings to :py:class:`UserInfo` objects.

    Looking up a key that is not cached yet triggers an LDAP lookup (see
    :py:meth:`_read_user`) whose result is stored under that key.

    :param user_regex: regular expression with a ``username`` group that is
        used to extract the user name from a user string
    """

    def __init__(self, user_regex):
        # type: (str) -> None
        super(UserMap, self).__init__()
        self._user_regex = re.compile(user_regex)

    def __getitem__(self, user):
        # type: (str) -> UserInfo
        if user not in self:
            self._read_user(user)
        return dict.__getitem__(self, user)

    def validate_userstr(self, userstr):
        # type: (str) -> str
        """
        Extract the user name from `userstr`.

        :param userstr: user string, expected to match the configured regular expression
        :returns: the ``username`` group of the match, or `userstr` unchanged
            (after logging a warning) if the string does not match
        :raises AttributeError: if `userstr` is empty (or None) or the matched user name is empty
        """
        # Check for emptiness *before* matching: re.match(None) raises a TypeError,
        # but callers only handle AttributeError.
        if not userstr:
            raise AttributeError("Received empty user string: {!r}".format(userstr))
        match = self._user_regex.match(userstr)
        if not match:
            MODULE.warning("Invalid user string format: {!r}".format(userstr))
            return userstr
        username = match.groupdict()["username"]
        if not username:
            raise AttributeError("username missing: {!r}".format(userstr))
        return username

    @LDAP_Connection()
    def _read_user(self, userstr, ldap_user_read=None):
        # type: (str, Optional[LoType]) -> None
        """
        Look up the user named in `userstr` and cache a UserInfo under `userstr`.

        If no unique user object is found, a UserInfo with an empty DN and the
        user name prefixed with ``LOCAL\\`` is stored instead.

        :param userstr: user string as accepted by :py:meth:`validate_userstr`
        :param ldap_user_read: LDAP connection (expected to be supplied by the decorator)
        :raises AttributeError: if `userstr` is rejected by :py:meth:`validate_userstr`
        """
        username = self.validate_userstr(userstr)
        lo = ldap_user_read
        try:
            userobj = User.get_only_udm_obj(lo, filter_format("uid=%s", (username,)))
            if userobj is None:
                raise noObject(username)
            user = User.from_udm_obj(userobj, None, lo)  # type: User
        except (noObject, MultipleObjectsError):
            MODULE.warning(
                'Unknown user "%s". It is assumed the user is a local account, prepending LOCAL\\.'
                % username
            )
            dict.__setitem__(self, userstr, UserInfo("", "LOCAL\\{}".format(username)))
            return

        # Group names (lower-cased) whose members never get their screenshot shown.
        blacklisted_groups = {
            x.strip().lower()
            for x in ucr.get(
                "ucsschool/umc/computerroom/hide_screenshots/groups", "Domain Admins"
            ).split(",")
        }
        users_groupmemberships = {explode_rdn(x, True)[0].lower() for x in userobj["groups"]}
        MODULE.info(
            "UserMap: %s: hide screenshots for following groups: %s" % (username, blacklisted_groups)
        )
        MODULE.info(
            "UserMap: %s: user is member of following groups: %s" % (username, users_groupmemberships)
        )
        hide_screenshot = bool(blacklisted_groups & users_groupmemberships)

        # Evaluate the teacher status once; it is needed for the decision below and for the record.
        is_teacher = user.is_teacher(lo)
        if ucr.is_true("ucsschool/umc/computerroom/hide_screenshots/teachers", True) and is_teacher:
            MODULE.info("UserMap: %s: is teacher hiding screenshot" % (username,))
            hide_screenshot = True

        MODULE.info("UserMap: %s: hide_screenshot=%r" % (username, hide_screenshot))
        dict.__setitem__(
            self,
            userstr,
            UserInfo(user.dn, username, isTeacher=is_teacher, hide_screenshot=hide_screenshot),
        )
class LockableAttribute(object):
    """
    Value holder that tracks changes and optionally guards access with a lock.

    Values are deep-copied on the way in and out, so callers never share
    mutable state with the attribute.

    :param initial_value: value the attribute starts with
    :param locking: if true, every access is serialized through a `threading.Lock`
    """

    # Seconds to wait for the lock before giving up. The previous code called
    # ``acquire(3000)``, which passes 3000 as the *blocking* flag and therefore
    # waited forever, so the error branch in lock() could never be reached.
    # NOTE(review): 3 seconds assumes the 3000 was meant as milliseconds - confirm.
    LOCK_TIMEOUT = 3.0

    def __init__(self, initial_value=None, locking=True):
        # type: (Optional[LV], Optional[bool]) -> None
        self._lock = threading.Lock() if locking else None
        self._old = initial_value
        self._has_changed = False
        self._current = copy.deepcopy(initial_value)

    def lock(self):
        # type: () -> None
        """
        Acquire the lock (no-op when locking is disabled).

        :raises ComputerRoomError: if the lock could not be acquired within LOCK_TIMEOUT seconds
        """
        if self._lock is None:
            return
        if not self._lock.acquire(timeout=self.LOCK_TIMEOUT):
            raise ComputerRoomError("Could not lock attribute")

    def unlock(self):
        # type: () -> None
        """Release the lock (no-op when locking is disabled)."""
        if self._lock is None:
            return
        self._lock.release()

    @property
    def current(self):
        # type: () -> LV
        """Deep copy of the current value."""
        self.lock()
        try:
            return copy.deepcopy(self._current)
        finally:
            self.unlock()

    @property
    def old(self):
        # type: () -> LV
        """Deep copy of the previous value."""
        self.lock()
        try:
            return copy.deepcopy(self._old)
        finally:
            self.unlock()

    @property
    def isInitialized(self):
        # type: () -> bool
        """True if the current value is not None."""
        self.lock()
        try:
            return self._current is not None
        finally:
            self.unlock()

    @property
    def hasChanged(self):
        # type: () -> bool
        """
        Report whether the value changed since the last query.

        Reading this property has side effects: the change flag is cleared and
        the old value is replaced by a copy of the current one.
        """
        self.lock()
        try:
            snapshot = copy.deepcopy(self._current)
            changed = self._has_changed
            self._has_changed = False
            self._old = snapshot
            return changed
        finally:
            self.unlock()

    def reset(self, inital_value=None):
        # type: (Optional[LV]) -> None
        """
        Set both the old and the current value to copies of `inital_value`.

        The change flag is left untouched.
        """
        self.lock()
        try:
            # Copy first, assign afterwards: a failing copy leaves the state untouched.
            old_copy = copy.deepcopy(inital_value)
            current_copy = copy.deepcopy(inital_value)
            self._old = old_copy
            self._current = current_copy
        finally:
            self.unlock()

    def set(self, value, force=False):
        # type: (LV, Optional[bool]) -> None
        """
        Store a copy of `value` as the current value.

        :param value: new value; nothing happens if it equals the current one, unless `force` is set
        :param force: store the value (and shift current to old) even if it is unchanged;
            the change flag is only raised for a real change
        """
        self.lock()
        try:
            differs = value != self._current
            if differs or force:
                # Copy first, assign afterwards: a failing copy leaves the state untouched.
                old_copy = copy.deepcopy(self._current)
                current_copy = copy.deepcopy(value)
                if differs:
                    self._has_changed = True
                self._old = old_copy
                self._current = current_copy
        finally:
            self.unlock()
class ComputerRoomManager(dict):
    """
    Dictionary of the computers of the currently selected room, keyed by computer name.

    School, room name, room DN and the Veyon backend flag are stored on the
    class, so they are shared by all instances.
    """

    SCHOOL = None
    ROOM = None
    ROOM_DN = None
    VEYON_BACKEND = True

    def __init__(self):
        dict.__init__(self)
        self._user_map = UserMap(VEYON_USER_REGEX)
        self._veyon_client = None  # type: Optional[VeyonClient]
        self.screenshot_dimension = self.get_screenshot_dimension()

    @staticmethod
    def _bounded_dimension(raw, label, lower=240, upper=8000):
        """Parse one screenshot dimension; return None if it is no integer or out of bounds."""
        try:
            value = int(raw)
        except ValueError:
            return None
        if lower <= value <= upper:
            return value
        MODULE.warning(
            "Set {} of screenshot is not within bounds "
            "{} and {}, falling back to native resolution.".format(label, lower, upper)
        )
        return None

    @staticmethod
    def get_screenshot_dimension():
        """
        Read the screenshot dimension from UCR.

        The variable ``ucsschool/umc/computerroom/screenshot_dimension`` is
        expected to look like ``<width>x<height>``.

        :returns: None if the variable is unset or malformed, otherwise a
            `Dimension` whose width/height are None where the configured
            value is missing or outside of 240..8000
        """
        ucr_value = ucr.get("ucsschool/umc/computerroom/screenshot_dimension")
        if not ucr_value:
            return None
        ucr_dim = re.match(r"(\d*)x(\d*)", ucr_value)
        if not ucr_dim:
            MODULE.warning(
                "UCR variable 'ucsschool/umc/computerroom/screenshot_dimension' has "
                "been set to a value with an incorrect format: {}".format(ucr_value)
            )
            return None
        width = ComputerRoomManager._bounded_dimension(ucr_dim.group(1), "width")
        height = ComputerRoomManager._bounded_dimension(ucr_dim.group(2), "height")
        return Dimension(width, height)

    @property
    def room(self):
        return ComputerRoomManager.ROOM

    @room.setter
    def room(self, value):
        self._clear()
        self._set(value)

    @property
    def roomDN(self):
        return ComputerRoomManager.ROOM_DN

    @property
    def school(self):
        return ComputerRoomManager.SCHOOL

    @school.setter
    def school(self, value):
        self._clear()
        ComputerRoomManager.SCHOOL = value

    @property
    def users(self):
        """Return a list of valid domain users who are logged into the computers"""
        valid_users = []
        for computer in self.values():
            # Read the user once: the computer's update thread may change it at any time.
            current_user = computer.user.current
            if current_user and computer.connected:
                valid_users.append(self._user_map[current_user].username)
        return valid_users

    @property
    def veyon_backend(self):
        # type: () -> bool
        return ComputerRoomManager.VEYON_BACKEND

    @property
    def veyon_client(self):
        # type: () -> VeyonClient
        """The VeyonClient of this manager, created on first access using the key in VEYON_KEY_FILE."""
        if not self._veyon_client:
            with open(VEYON_KEY_FILE) as fp:
                key_data = fp.read().strip()
            self._veyon_client = VeyonClient(
                "http://localhost:11080/api/v1",
                credentials={"keyname": "teacher", "keydata": key_data},
                auth_method=AuthenticationMethod.AUTH_KEYS,
            )
        return self._veyon_client

    def ipAddresses(self, students_only=True):
        """
        Return the IP addresses of the computers in the room.

        :param students_only: skip computers whose ``isTeacher`` is true
        """
        return [x.ipAddress for x in self.values() if not (students_only and x.isTeacher)]

    def _clear(self):
        """Stop and close all computers and forget the current room (no-op if no room is set)."""
        if ComputerRoomManager.ROOM:
            for computer in self.values():
                computer.stop()
                computer.close()
            self.clear()
            ComputerRoomManager.ROOM = None
            ComputerRoomManager.ROOM_DN = None
            ComputerRoomManager.VEYON_BACKEND = True

    @LDAP_Connection()
    def _set(self, room, ldap_user_read=None):
        """
        Load `room` and start one VeyonComputer thread per computer in it.

        :param room: DN or name of the computer room
        :param ldap_user_read: LDAP connection (expected to be supplied by the decorator)
        :raises ComputerRoomError: if the room is unknown, ambiguous or contains no computers
        """
        lo = ldap_user_read
        room_dn = room
        try:  # room DN
            ldap.dn.str2dn(room)
        except ldap.DECODING_ERROR:  # room name
            room_dn = None  # got a name instead of a DN

        try:
            if room_dn:
                computerroom = ComputerRoom.from_dn(room, ComputerRoomManager.SCHOOL, lo)
            else:
                computerroom = ComputerRoom.get_only_udm_obj(
                    lo, filter_format("cn=%s-%s", (ComputerRoomManager.SCHOOL, room))
                )
                if computerroom is None:
                    raise noObject(computerroom)
                computerroom = ComputerRoom.from_udm_obj(computerroom, ComputerRoomManager.SCHOOL, lo)
        except noObject:
            raise ComputerRoomError("Unknown computer room")
        except MultipleObjectsError as exc:
            raise ComputerRoomError(
                "Did not find exactly 1 group for the room (count: %d)" % len(exc.objs)
            )

        ComputerRoomManager.ROOM = computerroom.get_relative_name()
        ComputerRoomManager.ROOM_DN = computerroom.dn
        ComputerRoomManager.VEYON_BACKEND = computerroom.veyon_backend
        computers = list(computerroom.get_computers(lo))
        if not computers:
            raise ComputerRoomError("There are no computers in the selected room.")

        MODULE.info("Computerroom {!r} will be initialized with Computers.".format(self.ROOM))
        # Start with an empty user cache for the new room.
        self._user_map = UserMap(VEYON_USER_REGEX)
        for computer in computers:
            try:
                comp = VeyonComputer(
                    computer.get_udm_object(lo),
                    self.veyon_client,
                    self._user_map,
                    self.screenshot_dimension,
                )
                comp.start()
                self[comp.name] = comp
            except ComputerRoomError as exc:
                MODULE.warn("Computer could not be added: {}".format(exc))

    @property
    def isDemoActive(self):
        """True if any computer reports a running demo server or demo client."""
        return any(comp.demoServer or comp.demoClient for comp in self.values())

    @property
    def demoServer(self):
        """The first computer reporting a running demo server, or None."""
        return next((comp for comp in self.values() if comp.demoServer), None)

    @property
    def demoClients(self):
        """All computers reporting a running demo client."""
        return [comp for comp in self.values() if comp.demoClient]

    def startDemo(self, demo_server, fullscreen=True):
        """
        Start a demo (presentation) from `demo_server` to all other connected computers.

        A running demo is stopped first. Computers without a logged-in user or
        with a teacher logged in always get the windowed client.

        :param demo_server: name (dictionary key) of the computer to present from
        :param fullscreen: whether the remaining clients show the demo in full screen
        :returns: False if the teachers could not be determined, otherwise None
        :raises AttributeError: if `demo_server` is not a computer of this room
        """
        if self.isDemoActive:
            self.stopDemo()

        server = self.get(demo_server)
        if server is None:
            raise AttributeError("unknown system %s" % demo_server)

        # start demo server
        clients = [comp for comp in self.values() if comp.name != demo_server and comp.connected]
        MODULE.info("Demo server is %s" % (demo_server,))
        MODULE.info("Demo clients: %s" % ", ".join(x.name for x in clients))
        MODULE.info("Demo client users: %s" % ", ".join(str(x.user.current) for x in clients))
        try:
            teachers = []
            for client in clients:
                # Read the user once: the computer's update thread may change it at any time.
                current_user = client.user.current
                if not current_user or self._user_map[str(current_user)].isTeacher:
                    teachers.append(client.name)
        except AttributeError as exc:
            MODULE.error("Could not determine the list of teachers: %s" % (exc,))
            return False

        MODULE.info("Demo clients (teachers): %s" % ", ".join(teachers))
        demo_access_token = str(uuid.uuid4())
        server.startDemoServer(token=demo_access_token)
        for client in clients:
            client.startDemoClient(
                server=server,
                token=demo_access_token,
                full_screen=False if client.name in teachers else fullscreen,
            )

    def stopDemo(self):
        """Stop the demo server (if any) and the demo client on every computer."""
        # Evaluate the property only once: it reflects state that background
        # threads update, so a second lookup could already return None.
        server = self.demoServer
        if server is not None:
            server.stopDemoServer()
        # This is necessary since Veyon has a considerable delay with exposing its demo client status.
        # So we just end the demo client on all computers.
        for client in self.values():
            client.stopDemoClient()
class VeyonComputer(threading.Thread):
    """
    Background thread tracking the state of one computer via the Veyon client.

    :param computer: object describing the computer; its ``info`` mapping
        (keys ``ip``, ``mac``, ``name``, ``description``) and ``module`` attribute are read
    :param veyon_client: client used for all communication with the computer
    :param user_map: cache used to resolve the logged-in user
    :param screenshot_dimension: preferred screenshot size or None
    """

    def __init__(self, computer, veyon_client, user_map, screenshot_dimension):
        # type: (Any, VeyonClient, UserMap, Optional[Dimension]) -> None
        super(VeyonComputer, self).__init__()
        self._computer = computer  # type: Any
        self._veyon_client = veyon_client  # type: VeyonClient
        self._user_map = user_map
        self._ip_addresses = self._computer.info.get("ip", [])  # type: List[str]
        self._reachable_ip = None
        self._username = LockableAttribute()
        self._update_successful = LockableAttribute(initial_value=True)
        self._state = LockableAttribute(initial_value="disconnected")
        self._teacher = LockableAttribute(initial_value=False)
        self._screen_lock = LockableAttribute(initial_value=None)
        self._input_lock = LockableAttribute(initial_value=None)
        self._demo_server = LockableAttribute(initial_value=None)
        self._demo_client = LockableAttribute(initial_value=None)
        self._idle_timeout = 30.0
        self._timer = None
        self.connected = False
        self._update_interval = None
        self._update_paused = False
        self._last_use_of_info = time.monotonic()
        self.should_run = True
        self.screenshot_dimension = screenshot_dimension

    def run(self):
        """Poll the computer until :py:meth:`stop` is called."""
        while self.should_run:
            if not self.update_paused():
                self._check_connection()
                if self.connected:
                    self.update()
            # The random part spreads the polls of the individual threads.
            time.sleep(self.update_interval + random.uniform(0, 1))  # nosec

    def _find_reachable_ip_address(self):
        """
        Determine the IP address to use for this computer.

        Without the UCR switch ``ucsschool/umc/computerroom/ping-client-ip-addresses``
        the first configured address is used. Otherwise the first address that
        answers a ping is used, falling back to the first configured one.

        :returns: the address, or "" if none is configured or the Veyon proxy is unreachable
        """
        if self._reachable_ip:
            return self._reachable_ip
        if not ucr.is_true("ucsschool/umc/computerroom/ping-client-ip-addresses", False):
            self._reachable_ip = self._ip_addresses[0] if self._ip_addresses else ""
            return self._reachable_ip
        if not self._ip_addresses:
            return ""
        try:
            self._reachable_ip = self._first_available_ip()
        except VeyonConnectionError as exc:
            msg = "Could not connect to veyon proxy {!r}".format(exc)
            # Only warn if the previous update worked, to avoid flooding the log.
            if self._update_successful.current:
                MODULE.warn(msg)
            else:
                MODULE.debug(msg)
            return ""
        return self._reachable_ip or self._ip_addresses[0]

    def _check_connection(self):
        """Update ``connected`` by pinging the computer; reset the state if it is not connected."""
        try:
            if not self._reachable_ip:
                self._reachable_ip = self._find_reachable_ip_address()
            if not self._reachable_ip:
                MODULE.info("No reachable IP address found for {}".format(self.name))
                return
            try:
                self.connected = self._veyon_client.ping(self.ipAddress)
            except VeyonConnectionError:
                self.connected = False
        except Exception:
            # Keep the polling thread alive whatever happens.
            self.connected = False
            MODULE.warning(
                "Error when checking connection for {}: {}".format(self.name, traceback.format_exc())
            )
        if not self.connected:
            MODULE.info("No connection for {} with IP {}".format(self.name, self.ipAddress))
            self.reset_state()

    def update_paused(self):
        """
        Tell whether polling is paused because the information was not used recently.

        The state transition is logged at info level, the steady state at debug level.

        :returns: True if the last use (``hasChanged``/``dict``) is longer ago than the idle timeout
        """
        idle = time.monotonic() > self._last_use_of_info + self._idle_timeout
        if idle:
            msg = (
                "{}: not updating information as VeyonComputer thread "
                "has not been used for more than {} seconds."
            ).format(self.name, self._idle_timeout)
        else:
            msg = "{}: updating information as module is active.".format(self.name)
        if self._update_paused == idle:
            MODULE.debug(msg)
        else:
            MODULE.info(msg)
            self._update_paused = idle
        return idle

    @property
    def update_interval(self):
        """Polling interval from UCR ``ucsschool/umc/computerroom/update-interval`` (fallback: 1)."""
        if not self._update_interval:
            try:
                self._update_interval = int(ucr.get("ucsschool/umc/computerroom/update-interval", 1))
            except ValueError:
                MODULE.warning("ucsschool/umc/computerroom/update-interval is not a valid integer")
                self._update_interval = 1
            if self._update_interval <= 0:
                MODULE.warning("ucsschool/umc/computerroom/update-interval should be positive")
                self._update_interval = 1
        return self._update_interval

    @property
    def name(self):
        # type: () -> Optional[str]
        return self._computer.info.get("name", None)

    @property
    def user(self):
        return self._username

    @property
    def state(self):
        return self._state

    @property
    def teacher(self):
        return self._teacher

    @property
    def isTeacher(self):
        """True if the logged-in user is known to be a teacher; False without a user."""
        username = self._username.current
        if not username:
            # Without this guard str(None) would be looked up as a user named "None".
            return False
        try:
            return self._user_map[str(username)].isTeacher
        except AttributeError:
            return False

    @property
    def description(self):
        # type: () -> Optional[str]
        return self._computer.info.get("description", None)

    @property
    def configuration_ok(self):
        """False (with a warning) if the computer has no IP address configured."""
        if not self._ip_addresses:
            MODULE.warn("Computer {} is missing an IP.".format(self.name))
            return False
        return True

    @property
    def ipAddress(self):
        return self._reachable_ip

    @property
    def macAddress(self):
        """The first configured MAC address or "" if there is none."""
        return (self._computer.info.get("mac") or [""])[0]

    @property
    def objectType(self):
        return self._computer.module

    @property
    def hasChanged(self):
        """
        True if any tracked attribute changed; also marks the information as recently used.

        Note that any() stops at the first changed attribute, so the change
        flags of the attributes after it stay pending until a later call.
        """
        self._last_use_of_info = time.monotonic()
        return any(
            state.hasChanged
            for state in (
                self.state,
                self.user,
                self.teacher,
                self._screen_lock,
                self._input_lock,
                self._demo_client,
                self._demo_server,
            )
        )

    def screenshot(self, size=None):
        """
        Fetch a JPEG screenshot and store it in a temporary file.

        :param size: optional key ("3", "6" or "9") limiting the width to 640, 480 or
            320 pixels; the height is then left to the backend
        :returns: the still open temporary file (created with ``delete=False``, so
            the caller has to remove it), or None if no screenshot is available
        """
        if not self.connected:
            MODULE.warn("{} not connected - skipping screenshot".format(self.name))
            return None
        width = getattr(self.screenshot_dimension, "width", None)
        height = getattr(self.screenshot_dimension, "height", None)
        size_to_width = {"3": 640, "6": 480, "9": 320}
        if size in size_to_width:
            width = min(size_to_width[size], width) if width else size_to_width[size]
            height = None
        dimension = Dimension(width, height)
        image = None
        try:
            image = self._veyon_client.get_screenshot(
                host=self.ipAddress,
                screenshot_format=ScreenshotFormat.JPEG,
                dimension=dimension,
            )
        except VeyonError:
            pass  # might just be a non reachable IP. TODO: Catch errors other than 404
        if not image:
            MODULE.warn("{}: no screenshot available yet".format(self.name))
            return None
        tmp_file = tempfile.NamedTemporaryFile(delete=False)
        tmp_file.write(image)
        # The file object is buffered: flush so that readers opening the file
        # by name see the complete image.
        tmp_file.flush()
        return tmp_file

    @property
    def hide_screenshot(self):
        """True if screenshots of the logged-in user are to be hidden; False without a user."""
        username = self._username.current
        if not username:
            # Without this guard str(None) would be looked up as a user named "None".
            return False
        try:
            return self._user_map[str(username)].hide_screenshot
        except AttributeError:
            return False

    @property
    def flagsDict(self):
        return {
            "ScreenLock": self._screen_lock.current,
            "InputLock": self._input_lock.current,
            "DemoServer": self._demo_server.current,
            "DemoClient": self._demo_client.current,
        }

    @property
    def dict(self):
        """Summary of the computer as a dictionary; also marks the information as recently used."""
        self._last_use_of_info = time.monotonic()
        result = {
            "id": self.name,
            "name": self.name,
            "teacher": self.isTeacher,
            "connection": self.state.current,
            "description": self.description,
            "ip": self.ipAddress,
            "mac": self.macAddress,
            "objectType": self.objectType,
            "configurationOK": self.configuration_ok,
        }
        result.update(self.flagsDict)
        # Read the user once: the update thread may change it between two reads.
        current_user = self.user.current
        if current_user:
            try:
                result["user"] = self._user_map[current_user].username
            except AttributeError:
                result["user"] = "LOCAL\\{}".format(current_user)
        else:
            result["user"] = current_user
        return result

    @property
    def screenLock(self):
        return self._screen_lock.current

    @property
    def inputLock(self):
        return self._input_lock.current

    @property
    def demoServer(self):
        return self._demo_server.current

    @property
    def demoClient(self):
        return self._demo_client.current

    def _first_available_ip(self):
        # type: () -> Optional[str]
        """Return the first configured IP address that answers a ping, or None."""
        for ip_address in self._ip_addresses:
            try:
                reachable = self._veyon_client.ping(host=ip_address)
            except VeyonError:
                reachable = False
            if reachable:
                return ip_address
        return None

    def _fetch_feature_status(self, feature):
        # type: (Feature) -> Optional[bool]
        """Return the status of `feature` on this computer, or None if it could not be fetched."""
        try:
            return self._veyon_client.get_feature_status(feature, host=self.ipAddress)
        except VeyonError as exc:
            # might just be a non reachable IP. TODO: Catch errors other than 404:
            MODULE.error("Fetching feature status failed: {}".format(exc))
            return None

    def update(self):
        """
        Fetch user and feature states from the computer and store them.

        On any failure the state is reset to "disconnected"; no exception leaves this method.
        """
        MODULE.info("{}: updating information.".format(self.name))
        try:
            veyon_user = None
            input_lock = None
            screen_lock = None
            demo_server = None
            demo_client = None
            try:
                veyon_user = self._veyon_client.get_user_info(host=self.ipAddress)
                input_lock = self._fetch_feature_status(Feature.INPUT_DEVICE_LOCK)
                screen_lock = self._fetch_feature_status(Feature.SCREEN_LOCK)
                demo_server = self._fetch_feature_status(Feature.DEMO_SERVER)
                # A list (not a generator) so that both states are always fetched.
                demo_client = any(
                    [
                        self._fetch_feature_status(Feature.DEMO_CLIENT_FULLSCREEN),
                        self._fetch_feature_status(Feature.DEMO_CLIENT_WINDOWED),
                    ]
                )
            except VeyonError as exc:
                MODULE.warn("Veyon error on {}: {}".format(self.name, exc))
                # InvalidConnection (2) from WebAPI
                # remove current session as it is invalid
                # (e.g. webapi has been restarted)
                if exc.code == 2:
                    self._veyon_client.remove_session(host=self.ipAddress)
            if veyon_user is None:
                MODULE.warn("{}: Updating information was not successful.".format(self.name))
                self.reset_state()
                return
            self.state.set("connected")
            self.user.set(veyon_user.login)
            self._input_lock.set(input_lock)
            self._screen_lock.set(screen_lock)
            self.teacher.set(self.isTeacher)
            self._demo_server.set(demo_server)
            self._demo_client.set(demo_client)
            self._update_successful.set(True)
        except VeyonConnectionError as exc:
            msg = "Error updating information for {}: {!r}".format(self.name, exc)
            if self._update_successful.current:
                # Only log if previous attempt was successful
                self._update_successful.set(False)
                MODULE.warning(msg)
            else:
                MODULE.info(msg)
            self.reset_state()
        except Exception:
            self._update_successful.set(False)
            MODULE.warning(
                "Error updating information for {}: {}".format(self.name, traceback.format_exc())
            )
            self.reset_state()

    def stop(self):
        """Ask the polling loop to end after its current iteration."""
        self.should_run = False

    def reset_state(self):
        """Mark the computer as disconnected and clear user, teacher and feature states."""
        self.state.set("disconnected")
        self.user.set(None)
        self.teacher.set(False)
        self._input_lock.set(None)
        self._screen_lock.set(None)
        self._demo_client.set(None)
        self._demo_server.set(None)

    def open(self):
        pass  # Nothing to do for the VeyonComputer

    def close(self):
        """Remove the Veyon session of this computer; connection errors are only logged."""
        try:
            self._veyon_client.remove_session(self.ipAddress)
        except VeyonConnectionError as exc:
            MODULE.warning("Could not remove veyon session for {}: {}".format(self.name, exc))

    def _set_feature(self, feature, active=True):
        # type: (Feature, bool) -> None
        """Switch `feature` on or off if the computer is connected; Veyon errors are only logged."""
        try:
            if self.connected:
                self._veyon_client.set_feature(feature, host=self.ipAddress, active=active)
            else:
                MODULE.debug(
                    "Not setting feature {} as computer {} is offline.".format(repr(feature), self.name)
                )
        except VeyonError:
            MODULE.warning(
                "{} could not be reached - skipped setting feature {}".format(self.name, feature)
            )
        except VeyonConnectionError:
            MODULE.warning(
                "Error connecting with the veyon proxy - skipped setting feature {} for {}".format(
                    feature, self.name
                )
            )

    def lockScreen(self, lock):
        # type: (bool) -> None
        self._set_feature(Feature.SCREEN_LOCK, lock)

    def lockInput(self, lock):
        # type: (bool) -> None
        self._set_feature(Feature.INPUT_DEVICE_LOCK, lock)

    def startDemoServer(self, token):
        # type: (str) -> None
        """Start the demo server on this computer; Veyon errors are propagated to the caller."""
        # NOTE(review): the demo access token is written to the log - confirm this is acceptable.
        MODULE.process("Starting demo server on %s with token: %s" % (self.ipAddress, token))
        self._veyon_client.set_feature(
            Feature.DEMO_SERVER, host=self.ipAddress, active=True, arguments={"demoAccessToken": token}
        )

    def stopDemoServer(self):
        self._set_feature(Feature.DEMO_SERVER, active=False)

    def startDemoClient(self, server, token, full_screen=False):
        """
        Start a demo client on this computer; Veyon errors are propagated to the caller.

        :param server: the computer running the demo server (its ``ipAddress`` is used)
        :param token: access token of the demo server
        :param full_screen: full screen instead of windowed client
        """
        # NOTE(review): the demo access token is written to the log - confirm this is acceptable.
        MODULE.process(
            "Starting demo client on %s with token %s, server %s and fullscreen: %s"
            % (self.ipAddress, token, server.ipAddress, full_screen)
        )
        arguments = {"demoAccessToken": token, "demoServerHost": server.ipAddress}
        feature = Feature.DEMO_CLIENT_FULLSCREEN if full_screen else Feature.DEMO_CLIENT_WINDOWED
        self._veyon_client.set_feature(feature, host=self.ipAddress, active=True, arguments=arguments)

    def stopDemoClient(self):
        """Stop both the full screen and the windowed demo client."""
        self._set_feature(Feature.DEMO_CLIENT_FULLSCREEN, active=False)
        self._set_feature(Feature.DEMO_CLIENT_WINDOWED, active=False)

    def powerOff(self):
        self._set_feature(Feature.POWER_DOWN)

    @staticmethod
    def _ucr_words(key):
        """Return the whitespace separated words of the UCR variable `key` (empty list if unset)."""
        return ucr.get(key, "").split()

    def powerOn(self):
        """Send a wake-on-LAN packet to the first MAC address; logs an error if there is none."""
        if not self.macAddress:
            MODULE.error("%s: no MAC address set - skipping powerOn" % (self.ipAddress,))
            return
        blacklisted_interfaces = self._ucr_words(
            "ucsschool/umc/computerroom/wakeonlan/blacklisted/interfaces"
        )
        blacklisted_interface_prefixes = self._ucr_words(
            "ucsschool/umc/computerroom/wakeonlan/blacklisted/interface_prefixes"
        )
        target_broadcast_ips = self._ucr_words("ucsschool/umc/computerroom/wakeonlan/target_nets")
        wakeonlan.send_wol_packet(
            self.macAddress,
            blacklisted_interfaces=blacklisted_interfaces,
            blacklisted_interface_prefixes=blacklisted_interface_prefixes,
            # Fall back to the limited broadcast address if no target nets are configured.
            target_broadcast_ips=target_broadcast_ips or ["255.255.255.255"],
        )

    def restart(self):
        self._set_feature(Feature.REBOOT)

    def logOut(self):
        self._set_feature(Feature.USER_LOGOFF)