Source code for ucsschool.lib.models.base

#!/usr/bin/python3
# -*- coding: utf-8 -*-
#
# UCS@school python lib: models
#
# Copyright 2014-2025 Univention GmbH
#
# http://www.univention.de/
#
# All rights reserved.
#
# The source code of this program is made available
# under the terms of the GNU Affero General Public License version 3
# (GNU AGPL V3) as published by the Free Software Foundation.
#
# Binary versions of this program provided by Univention to you as
# well as other copyrighted, protected or trademarked materials like
# Logos, graphics, fonts, specific documentations and configurations,
# cryptographic keys etc. are subject to a license agreement between
# you and Univention and not subject to the GNU AGPL V3.
#
# In the case you use this program under the terms of the GNU AGPL V3,
# the program is provided in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public
# License with the Debian GNU/Linux or Univention distribution in file
# /usr/share/common-licenses/AGPL-3; if not, see
# <http://www.gnu.org/licenses/>.

from copy import deepcopy
from typing import (
    TYPE_CHECKING,
    Any,  # noqa: F401
    Dict,  # noqa: F401
    Iterable,  # noqa: F401
    List,  # noqa: F401
    Optional,  # noqa: F401
    Sequence,  # noqa: F401
    Set,  # noqa: F401
    Tuple,  # noqa: F401
    Type,  # noqa: F401
    TypeVar,
    Union,
)

import lazy_object_proxy
import ldap
from ldap.dn import escape_dn_chars, explode_rdn
from ldap.filter import escape_filter_chars
from six import add_metaclass, iteritems

import univention.admin.modules as udm_modules
import univention.admin.objects as udm_objects
import univention.admin.uldap as udm_uldap
from univention.admin.filter import conjunction, expression
from univention.admin.uexceptions import noObject

from ..pyhooks.pyhooks_loader import PyHooksLoader
from ..roles import all_roles, create_ucsschool_role_string
from ..schoolldap import SchoolSearchBase
from .attributes import CommonName, Roles, SchoolAttribute, ValidationError
from .meta import UCSSchoolHelperMetaClass
from .utils import _, ucr
from .validator import validate

if TYPE_CHECKING:
    import univention.admin.handlers.simpleLdap
    from univention.admin.uldap import access as LoType  # noqa: F401

    UdmObject = univention.admin.handlers.simpleLdap
    SuperOrdinateType = Union[str, UdmObject]

UldapFilter = Union[str, conjunction, expression]
UCSSchoolModel = TypeVar("UCSSchoolModel", bound="UCSSchoolHelperAbstractClass")
UCSSchoolHelperAbstractClassTV = TypeVar(
    "UCSSchoolHelperAbstractClassTV", bound="UCSSchoolHelperAbstractClass"
)


PYHOOKS_PATH = "/var/lib/ucs-school-lib/hooks"
PYHOOKS_BASE_CLASS = "ucsschool.lib.models.hook.Hook"
_pyhook_loader = lazy_object_proxy.Proxy(
    lambda: PyHooksLoader(PYHOOKS_PATH, PYHOOKS_BASE_CLASS)
)  # type: PyHooksLoader


[docs] class NoObject(noObject): def __init__(self, dn=None, type=None, *args): # type: (str, Type[UCSSchoolModel], *Any) -> None self.dn = dn self.type = type if not args: args = ("Could not find object of type {!r} with DN {!r}.".format(self.type, self.dn),) super(NoObject, self).__init__(*args)
[docs] class UnknownModel(NoObject): def __init__(self, dn, cls): # type: (str, Type[UCSSchoolModel]) -> None self.dn = dn self.wrong_model = cls super(UnknownModel, self).__init__("No python class: %r is not a %s" % (dn, cls.__name__))
[docs] class WrongModel(NoObject): def __init__(self, dn, model, wrong_model): # type: (str, Type[UCSSchoolModel], Type[UCSSchoolModel]) -> None self.dn = dn self.model = model self.wrong_model = wrong_model super(WrongModel, self).__init__( "Wrong python class: %r is not a %r but a %r" % (dn, wrong_model.__name__, model.__name__) )
[docs] class WrongObjectType(NoObject): def __init__(self, dn, cls): # type: (str, Type[UCSSchoolModel]) -> None self.dn = dn self.wrong_model = cls super(WrongObjectType, self).__init__("Wrong objectClass: %r is not a %r." % (dn, cls.__name__))
[docs] class MultipleObjectsError(Exception): def __init__(self, objs, *args, **kwargs): # type: (Sequence[UCSSchoolModel], *Any, **Any) -> None super(MultipleObjectsError, self).__init__(*args, **kwargs) self.objs = objs
[docs] @add_metaclass(UCSSchoolHelperMetaClass) class UCSSchoolHelperAbstractClass(object): """ Base class of all UCS@school models. Hides UDM. Attributes used for a class are defined like this:: class MyModel(UCSSchoolHelperAbstractClass): my_attribute = Attribute('Label', required=True, udm_name='myAttr') From there on ``my_attribute=value`` may be passed to :py:meth:``__init__()``, ``my_model.my_attribute`` can be accessed and the value will be saved as ``obj['myAttr']`` in UDM when saving this instance. If an attribute of a base class is not wanted, it can be overridden:: class MyModel(UCSSchoolHelperAbstractClass): school = None Meta information about the class are defined like this:: class MyModel(UCSSchoolHelperAbstractClass): class Meta: udm_module = 'my/model' The meta information is then accessible in ``cls._meta``. Important functions: :py:meth:``__init__(**kwargs)``: kwargs should be the defined attributes :py:meth:``create(lo)`` lo is an LDAP connection, specifically univention.admin.access. creates a new object. Returns False is the object already exists. And True after the creation :py:meth:``modify(lo)`` modifies an existing object. Returns False if the object does not exist and True after the modification (regardless whether something actually changed or not) :py:meth:``remove(lo)`` deletes the object. Returns False if the object does not exist and True after the deletion. :py:meth:``get_all(lo, school, filter_str, easy_filter=False)`` classmethod; retrieves all objects found for this school. filter can be a string that is used to narrow down a search. Each property of the class' udm_module that is include_in_default_search is queried for that string. Example:: User.get_all(lo, 'school', filter_str='name', easy_filter=True) will search in ``cn=users,ou=school,$base`` for users/user UDM objects with ``|(username=*name*)(firstname=*name*)(...)`` and return User objects (not UDM objects) With ``easy_filter=False`` (default) it will use this very ``filter_str`` :py:meth:``get_container(school)`` a classmethod that points to the container where new instances are created and existing ones are searched. :py:meth:``dn`` property, current distinguishable name of the instance. Calculated on the fly, it changes if instance.name or instance.school changes. ``instance.old_dn`` will be set to the original dn when the instance was created :py:meth:``get_udm_object(lo)`` searches UDM for an entry that corresponds to ``self``. Normally uses the old_dn or dn. If ``cls._meta.name_is_unique`` then any object with ``self.name`` will match :py:meth:``exists(lo)`` whether this object can be found in UDM. :py:meth:``from_udm_obj(udm_obj, school, lo)`` classmethod; maps the info of ``udm_obj`` into a new instance (and sets ``school``) :py:meth:``from_dn(dn, school, lo)`` finds dn in LDAP and uses ``from_udm_obj`` :py:meth:``get_first_udm_obj(lo, filter_str)`` returns the first found object of type ``cls._meta.udm_module`` that matches an arbitrary ``filter_str`` More features: Validation: There are some auto checks built in: Attributes of the model that have a UDM syntax attached are validated against this syntax. Attributes that are required must be present. Attributes that are unlikely_to_change give a warning (not error) if the object already exists with other values. If the Meta information states that name_is_unique, the complete LDAP is searched for the instance's name before continuing. :py:meth:``validate()`` can be further customized. ``%(module)s`` is ``'windows'`` for ``cls._meta.udm_module == 'computers/windows'`` by default and can be explicitely set with:: class Meta: hook_path = 'computer' """ _cache = {} # type: Dict[Tuple[str, Tuple[str, str]], UCSSchoolModel] _machine_connection = None # type: LoType _search_base_cache = {} # type: Dict[str, SchoolSearchBase] _initialized_udm_modules = [] # type: List[str] hook_sep_char = "\t" name = CommonName(_("Name"), aka=["Name"]) # type: str school = SchoolAttribute(_("School"), aka=["School"]) # type: str
[docs] @classmethod def cache(cls, *args, **kwargs): # type: (*Any, **Any) -> UCSSchoolModel """ Initializes a new instance and caches it for subsequent calls. Useful when using School.cache(school_name) a lot in different functions, in loops, etc. """ # TODO: rewrite function to have optional positional 'name' and 'school' arguments args = list(args) if args: kwargs["name"] = args.pop(0) if args: kwargs["school"] = args.pop(0) key = [cls.__name__] + [ (k, kwargs[k]) for k in sorted(kwargs) ] # TODO: rewrite: sorted(kwargs.items()) key = tuple(key) if key not in cls._cache: obj = cls(**kwargs) cls._cache[key] = obj return cls._cache[key]
[docs] @classmethod def invalidate_all_caches(cls): # type: () -> None from ucsschool.lib.models.network import Network from ucsschool.lib.models.user import User from ucsschool.lib.models.utils import _pw_length_cache cls._cache.clear() # cls._search_base_cache.clear() # useless to clear _pw_length_cache.clear() Network._netmask_cache.clear() User._profile_path_cache.clear() User._samba_home_path_cache.clear()
[docs] @classmethod def invalidate_cache(cls): # type: () -> None keys_to_remove = [key for key in cls._cache.keys() if key[0] == cls.__name__] for key in keys_to_remove: del cls._cache[key]
[docs] @classmethod def supports_school(cls): # type: () -> bool return "school" in cls._attributes
[docs] @classmethod def supports_schools(cls): # type: () -> bool return "schools" in cls._attributes
def __init__(self, name=None, school=None, **kwargs): # type: (Optional[str], Optional[str], **Any) -> None """ Initializes a new instance with kwargs. Not every kwarg is accepted, though: The name must be defined as a attribute at class level (or by a base class). All attributes are initialized at least with None Sets self.old_dn to self.dn, i.e. the name in __init__ will determine the old_dn, changing it after __init__ will result in trying to move the object! """ self._udm_obj_searched = False self._udm_obj = None kwargs["name"] = name kwargs["school"] = school for key, attr in self._attributes.items(): default = attr.value_default if callable(default): default = default() setattr(self, key, kwargs.get(key, default)) self.__position = None self.old_dn = None self.old_dn = self.dn # type: str self.errors = {} # type: Dict[str, List[str]] self.warnings = {} # type: Dict[str, List[str]] self._in_hook = False # if a hook is currently running
[docs] @classmethod def get_machine_connection(cls): # type: () -> LoType """get a cached ldap connection to the Primary Directory Node using this host's credentials""" if not cls._machine_connection: cls._machine_connection = udm_uldap.getMachineConnection()[0] return cls._machine_connection
@property def position(self): # type: () -> Optional[str] if self.__position is None: return self.get_own_container() return self.__position @position.setter def position(self, position): # type: (str) -> None if self.position != position: # allow dynamic school changes until creation self.__position = position @property def dn(self): # type: () -> str """ Generates a DN where the lib would assume this instance to be. Changing name or school of self will most likely change the outcome of self.dn as well """ if self.name and self.position: name = self._meta.ldap_map_function(self.name) return "%s=%s,%s" % (self._meta.ldap_name_part, escape_dn_chars(name), self.position) return self.old_dn
[docs] def set_attributes(self, **kwargs): # type: (**Any) -> None """ A function to set the attributes of a UCS@school object in one function call. Only attributes that exist in self._attributes are set. The rest of the kwargs are simply ignored. :param kwargs: The attributes to set. """ existing_attributes = self._attributes.keys() for key, value in kwargs.items(): if key in existing_attributes: setattr(self, key, value) else: self.logger.debug( "Setting attribute '{!r}' on {!r} does not work, since it doesn't exist.".format( key, self ) )
[docs] def set_dn(self, dn): # type: (str) -> None """ Does not really set dn, as this is generated on-the-fly. Instead, sets old_dn in case it was missed in the beginning or after create/modify/remove/move Also resets cached udm_obj as it may point to somewhere else """ self._udm_obj_searched = False self.position = ldap.dn.dn2str(ldap.dn.str2dn(dn)[1:]) self.old_dn = dn
[docs] def validate( self, lo, validate_unlikely_changes=False, check_name=True ): # type: (LoType, Optional[bool], Optional[bool]) -> None from ucsschool.lib.models.school import School self.errors.clear() self.warnings.clear() for name, attr in iteritems(self._attributes): value = getattr(self, name) try: attr.validate(value) except ValueError as e: self.add_error(name, str(e)) if self._meta.name_is_unique and not self._meta.allow_school_change and check_name: if self.exists_outside_school(lo): self.add_error( "name", _( "The name is already used somewhere outside the school. It may not be taken " "twice and has to be changed." ), ) if self.supports_school() and self.school: if not School.cache(self.school).exists(lo): self.add_error( "school", _('The school "%s" does not exist. Please choose an existing one or create it.') % self.school, ) self.validate_roles(lo) if validate_unlikely_changes: if self.exists(lo): udm_obj = self.get_udm_object(lo) try: original_self = self.from_udm_obj(udm_obj, self.school, lo) except (UnknownModel, WrongModel): pass else: for name, attr in iteritems(self._attributes): if attr.unlikely_to_change: new_value = getattr(self, name) old_value = getattr(original_self, name) if new_value and old_value: if new_value != old_value: self.add_warning( name, _("The value changed from %(old)s. This seems unlikely.") % {"old": old_value}, )
[docs] def validate_roles(self, lo): # type: (LoType) -> None pass
[docs] def add_warning(self, attribute, warning_message): # type: (str, str) -> None warnings = self.warnings.setdefault(attribute, []) if warning_message not in warnings: warnings.append(warning_message)
[docs] def add_error(self, attribute, error_message): # type: (str, str) -> None errors = self.errors.setdefault(attribute, []) if error_message not in errors: errors.append(error_message)
[docs] def exists(self, lo): # type: (LoType) -> bool return self.get_udm_object(lo) is not None
[docs] def exists_outside_school(self, lo): # type: (LoType) -> bool if not self.supports_school(): return False from ucsschool.lib.models.school import School udm_obj = self.get_udm_object(lo) if udm_obj is None: return False return not udm_obj.dn.lower().endswith(School.cache(self.school).dn.lower())
def _call_pyhooks(self, hook_time, func_name, lo): # type: (str, str, LoType) -> None """ Run Python based hooks (`*.py` files in `/usr/share/u-s-i/pyhooks` containing a subclass of :py:class:`ucsschool.lib.models.hook.Hook`). :param str hook_time: `pre` or `post` :param str func_name: `create`, `modify`, `move` or `remove` :param univention.admin.uldap.access lo: LDAP connection object :return: None :rtype: None """ state = self._in_hook self._in_hook = True all_hooks = _pyhook_loader.get_hook_objects(lo) meth_name = "{}_{}".format(hook_time, func_name) try: for func in all_hooks.get(meth_name, []): if issubclass(self.__class__, func.__self__.__class__.model): self.logger.debug( "Running %s hook %s.%s for %s...", meth_name, func.__self__.__class__.__name__, func.__func__.__name__, self, ) func(self) finally: self._in_hook = state
[docs] def call_hooks(self, hook_time, func_name, lo): # type: (str, str, LoType) -> None """ Calls Python based hooks (*.py files in /usr/share/u-s-i/pyhooks). In the case of `post`, this method is only called, if the corresponding function (`create()`, `modify()`, `move()` or `remove()`) returned `True`. :param str hook_time: `pre` or `post` :param str func_name: `create`, `modify`, `move` or `remove` :param univention.admin.uldap.access lo: LDAP connection object """ lo_name = lo.__class__.__name__ if lo_name == "access": lo_name = "lo" self.logger.debug( "Starting %s.call_hooks(%r, %r, %s(%r)) for %r.", self.__class__.__name__, hook_time, func_name, lo_name, lo.binddn, self, ) self._call_pyhooks(hook_time, func_name, lo)
[docs] @classmethod def hook_init(cls, hook): # type: (PYHOOKS_BASE_CLASS) -> None """ Overwrite this method to add individual initialization code to all hooks of a ucsschool.lib.model class. (See `.SchoolClass.hook_init()` for an example.) :param hook: instance of a subclass of :py:class:`ucsschool.lib.model.hook.Hook` :return: None :rtype: None """ pass
def _alter_udm_obj(self, udm_obj): # type: (UdmObject) -> None for name, attr in iteritems(self._attributes): if attr.udm_name: value = getattr(self, name) if (value is not None or attr.map_if_none) and attr.map_to_udm: udm_obj[attr.udm_name] = value # TODO: move g[s]et_default_options() from User here to update udm_obj.options
[docs] def create(self, lo, validate=True): # type: (LoType, Optional[bool]) -> bool """ Creates a new UDM instance. Calls pre-hooks. If the object already exists, returns False. If the object does not yet exist, creates it, returns True and calls post-hooks. """ if self._in_hook: # prevent recursion self.logger.warning( "Running create() from within a hook, skipping hook execution. Please use " "create_without_hooks() from within hooks." ) else: self.call_hooks("pre", "create", lo) success = self.create_without_hooks(lo, validate) if success and not self._in_hook: self.call_hooks("post", "create", lo) return success
[docs] def create_without_hooks(self, lo, validate): # type: (LoType, bool) -> bool if self.exists(lo): return False self.logger.info("Creating %r", self) self.create_without_hooks_roles(lo) if validate: self.validate(lo) if self.errors: raise ValidationError(self.errors.copy()) pos = udm_uldap.position(ucr.get("ldap/base")) container = self.position if not container: self.logger.error("%r cannot determine a container. Unable to create!", self) return False try: pos.setDn(container) udm_obj = udm_modules.get(self._meta.udm_module).object( None, lo, pos, superordinate=self.get_superordinate(lo) ) udm_obj.open() # here is the real logic self.do_create(udm_obj, lo) # get it fresh from the database (needed for udm_obj._exists ...) self.set_dn(self.dn) self.logger.info("%r successfully created", self) return True finally: self.invalidate_cache()
[docs] def create_without_hooks_roles(self, lo): # type: (LoType) -> bool """ Run by py:meth:`create_without_hooks()` before py:meth:`validate()` (and thus before py:meth:`do_create()`). """ pass
[docs] def do_create(self, udm_obj, lo): # type: (UdmObject, LoType) -> None """ Actual udm_obj manipulation. Override this if you want to further change values of udm_obj, e.g. def do_create(self, udm_obj, lo): udm_obj['used_in_ucs_school'] = '1' super(MyModel, self).do_create(udm_obj, lo) """ self._alter_udm_obj(udm_obj) udm_obj.create()
[docs] def modify(self, lo, validate=True, move_if_necessary=None): # type: (LoType, Optional[bool], Optional[bool]) -> bool """ Modifies an existing UDM instance. Calls pre-hooks. If the object does not exist, returns False. If the object exists, modifies it, returns True and calls post-hooks. """ self.call_hooks("pre", "modify", lo) success = self.modify_without_hooks(lo, validate, move_if_necessary) if success: self.call_hooks("post", "modify", lo) return success
[docs] def modify_without_hooks(self, lo, validate=True, move_if_necessary=None): # type: (LoType, Optional[bool], Optional[bool]) -> bool self.logger.info("Modifying %r", self) if move_if_necessary is None: move_if_necessary = self._meta.allow_school_change self.update_ucsschool_roles(lo) if validate: self.validate(lo, validate_unlikely_changes=True) if self.errors: raise ValidationError(self.errors.copy()) udm_obj = self.get_udm_object(lo) if not udm_obj: self.logger.info("%s does not exist!", self.old_dn) return False try: old_attrs = deepcopy(udm_obj.info) self.modify_without_hooks_roles(udm_obj) self.do_modify(udm_obj, lo) # get it fresh from the database self.set_dn(self.dn) udm_obj = self.get_udm_object(lo) same = old_attrs == udm_obj.info if move_if_necessary: if udm_obj.dn != self.dn: if self.move_without_hooks(lo, udm_obj, force=True): same = False if same: self.logger.info("%r not modified. Nothing changed", self) else: self.logger.info("%r successfully modified", self) # return not same return True finally: self.invalidate_cache()
[docs] def modify_without_hooks_roles(self, udm_obj): # type: (UdmObject) -> bool """Run by py:meth:`modify_without_hooks()` before py:meth:`do_modify()`.""" pass
[docs] def update_ucsschool_roles(self, lo): # type: (LoType) -> None """Run by py:meth:`modify_without_hooks()` before py:meth:`validate()`.""" pass
[docs] def do_modify(self, udm_obj, lo): # type: (UdmObject, LoType) -> None """ Actual udm_obj manipulation. Override this if you want to further change values of udm_obj, e.g. def do_modify(self, udm_obj, lo): udm_obj['used_in_ucs_school'] = '1' super(MyModel, self).do_modify(udm_obj, lo) """ self._alter_udm_obj(udm_obj) udm_obj.modify(ignore_license=1)
[docs] def move(self, lo, udm_obj=None, force=False): # type: (LoType, Optional[UdmObject], Optional[bool]) -> bool if self._in_hook: # prevent recursion self.logger.warning( "Running move() from within a hook, skipping hook execution. Please use " "move_without_hooks() from within hooks." ) else: self.call_hooks("pre", "move", lo) success = self.move_without_hooks(lo, udm_obj, force) if success and not self._in_hook: self.call_hooks("post", "move", lo) return success
[docs] def move_without_hooks(self, lo, udm_obj, force=False): # type: (LoType, Optional[UdmObject], Optional[bool]) -> bool if udm_obj is None: udm_obj = self.get_udm_object(lo) if udm_obj is None: self.logger.warning("No UDM object found to move from (%r)", self) return False if self.supports_school() and self.get_school_obj(lo) is None: self.logger.warning("%r wants to move itself to a not existing school", self) return False self.logger.info("Moving %r to %r", udm_obj.dn, self) if udm_obj.dn == self.dn: self.logger.warning("%r wants to move to its own DN!", self) return False if force or self._meta.allow_school_change: try: self.do_move(udm_obj, lo) finally: self.invalidate_cache() self.set_dn(self.dn) else: self.logger.warning("Would like to move %s to %r. But it is not allowed!", udm_obj.dn, self) return False return True
[docs] def do_move(self, udm_obj, lo): # type: (UdmObject, LoType) -> None old_school, new_school = self.get_school_from_dn(self.old_dn), self.get_school_from_dn(self.dn) udm_obj.move(self.dn, ignore_license=1) if self.supports_school() and old_school and old_school != new_school: self.do_school_change(udm_obj, lo, old_school) self.do_move_roles(udm_obj, lo, old_school, new_school)
[docs] def do_move_roles(self, udm_obj, lo, old_school, new_school): # type: (UdmObject, LoType, str, str) -> None self.update_ucsschool_roles(lo)
[docs] def change_school(self, school, lo): # type: (str, LoType) -> bool if self.school in self.schools: self.schools.remove(self.school) if school not in self.schools: self.schools.append(school) self.school = school self.position = self.get_own_container() return self.move(lo, force=True)
[docs] def do_school_change(self, udm_obj, lo, old_school): # type: (UdmObject, LoType, str) -> None self.logger.info("Going to move %r from school %r to %r", self.old_dn, old_school, self.school)
[docs] def remove(self, lo): # type: (LoType) -> bool """ Removes an existing UDM instance. Calls pre-hooks. If the object does not exist, returns False. If the object exists, removes it, returns True and calls post-hooks. """ if self._in_hook: # prevent recursion self.logger.warning( "Running remove() from within a hook, skipping hook execution. Please use " "remove_without_hooks() from within hooks." ) else: self.call_hooks("pre", "remove", lo) success = self.remove_without_hooks(lo) if success and not self._in_hook: self.call_hooks("post", "remove", lo) return success
[docs] def remove_without_hooks(self, lo): # type: (LoType) -> bool self.logger.info("Deleting %r", self) udm_obj = self.get_udm_object(lo) if udm_obj: try: udm_obj.remove(remove_childs=True) udm_objects.performCleanup(udm_obj) self.set_dn(None) self.logger.info("%r successfully removed", self) return True finally: self.invalidate_cache() self.logger.info("%r does not exist!", self) return False
[docs] @classmethod def get_name_from_dn(cls, dn): # type: (str) -> str if dn: try: name = explode_rdn(dn, True)[0] except ldap.DECODING_ERROR: name = "" name = name.encode("utf-8") return cls._meta.ldap_unmap_function([name])
[docs] @classmethod def get_school_from_dn(cls, dn): # type: (str) -> str return SchoolSearchBase.getOU(dn)
[docs] @classmethod def find_field_label_from_name(cls, field): # type: (str) -> str for name, attr in cls._attributes.items(): if name == field: return attr.label
[docs] def get_error_msg(self): # type: () -> str return self.create_validation_msg(iteritems(self.errors))
[docs] def get_warning_msg(self): # type: () -> str return self.create_validation_msg(iteritems(self.warnings))
[docs] def create_validation_msg(self, items): # type: (Iterable[Tuple[str, List[str]]]) -> str validation_msg = "" for key, msg in items: label = self.find_field_label_from_name(key) msg_str = "" for error in msg: msg_str += error if not error.endswith(("!", ".")): msg_str += "." msg_str += " " validation_msg += "%s: %s" % (label, msg_str) return validation_msg[:-1]
[docs] def get_udm_object(self, lo): # type: (LoType) -> UdmObject """ Returns the UDM object that corresponds to self. If self._meta.name_is_unique it searches for any UDM object with self.name. If not (which is the default) it searches for self.old_dn or self.dn Returns None if no object was found. Caches the result, even None If you want to re-search, you need to explicitely set self._udm_obj_searched = False """ self.init_udm_module(lo) if self._udm_obj_searched is False or (self._udm_obj and self._udm_obj.lo.binddn != lo.binddn): dn = self.old_dn or self.dn superordinate = self.get_superordinate(lo) if dn is None: self.logger.error("Getting %s UDM object: No DN!", self.__class__.__name__) return None if self._meta.name_is_unique: if self.name is None: self.logger.error('Getting %s UDM object: Empty "name"!', self.__class__.__name__) return None udm_name = self._attributes["name"].udm_name name = self.get_name_from_dn(dn) filter_str = "%s=%s" % (udm_name, escape_filter_chars(name)) self._udm_obj = self.get_first_udm_obj(lo, filter_str, superordinate) else: self.logger.debug("Getting %s UDM object by dn: %s", self.__class__.__name__, dn) try: self._udm_obj = udm_modules.lookup( self._meta.udm_module, None, lo, scope="base", base=dn, superordinate=superordinate, )[0] except (noObject, IndexError): self._udm_obj = None else: self._udm_obj.open() validate(self._udm_obj, self.logger) self._udm_obj_searched = True return self._udm_obj
[docs] def get_school_obj(self, lo): # type: (LoType) -> "School" from ucsschool.lib.models.school import School if not self.supports_school(): return None school = School.cache(self.school) try: return School.from_dn(school.dn, None, lo) except noObject: self.logger.warning("%r does not exist!", school) return None
[docs] def get_superordinate(self, lo): # type: (LoType) -> UdmObject return None
[docs] def get_own_container(self): # type: () -> Optional[str] if self.supports_school() and not self.school: return None return self.get_container(self.school)
[docs] @classmethod def get_container(cls, school): # type: (str) -> str """raises NotImplementedError by default. Needs to be overridden!""" raise NotImplementedError("%s.get_container()" % (cls.__name__,))
[docs] @classmethod def get_search_base(cls, school_name): # type: (str) -> SchoolSearchBase from ucsschool.lib.models.school import School if school_name not in cls._search_base_cache: school = School(name=school_name) cls._search_base_cache[school_name] = SchoolSearchBase([school.name], dn=school.dn) return cls._search_base_cache[school_name]
[docs] @classmethod def init_udm_module(cls, lo): # type: (LoType) -> None if cls._meta.udm_module in cls._initialized_udm_modules: return pos = udm_uldap.position(lo.base) udm_modules.init(lo, pos, udm_modules.get(cls._meta.udm_module)) cls._initialized_udm_modules.append(cls._meta.udm_module)
[docs] @classmethod def get_all( cls, lo, school, filter_str=None, easy_filter=False, superordinate=None, school_prefix=False ): # type: (LoType, str, Optional[str], Optional[bool], Optional[SuperOrdinateType], Optional[bool]) -> List[UCSSchoolModel] # noqa: E501 """ Returns a list of all objects that can be found in cls.get_container() with the correct udm_module If filter_str is given, all udm properties with include_in_default_search are queried for that string (so that it should be the value) """ cls.init_udm_module(lo) complete_filter = cls._meta.udm_filter if school_prefix: filter_str = school + "-" + filter_str if complete_filter and not complete_filter.startswith("("): complete_filter = "({})".format(complete_filter) if easy_filter: filter_from_filter_str = cls.build_easy_filter(filter_str) else: filter_from_filter_str = filter_str if filter_from_filter_str and not filter_from_filter_str.startswith("("): filter_from_filter_str = "({})".format(filter_from_filter_str) if filter_from_filter_str: if complete_filter: complete_filter = conjunction("&", [complete_filter, filter_from_filter_str]) else: complete_filter = filter_from_filter_str complete_filter = str(complete_filter) cls.logger.debug("Getting all %s of %s with filter %r", cls.__name__, school, complete_filter) ret = [] for udm_obj in cls.lookup(lo, school, complete_filter, superordinate=superordinate): try: ret.append(cls.from_udm_obj(udm_obj, school, lo)) except NoObject: continue return ret
[docs] @classmethod def lookup(cls, lo, school, filter_s="", superordinate=None): # type: (LoType, str, Optional[UldapFilter], Optional[SuperOrdinateType]) -> List[UdmObject] try: return udm_modules.lookup( cls._meta.udm_module, None, lo, filter=filter_s, base=cls.get_container(school), scope="sub", superordinate=superordinate, ) except noObject as exc: cls.logger.warning( "Error while getting all %s of %s (probably %r does not exist): %s", cls.__name__, school, cls.get_container(school), exc, ) return []
@classmethod def _attrs_for_easy_filter(cls): # type: () -> List[str] ret = [] module = udm_modules.get(cls._meta.udm_module) for key, prop in iteritems(module.property_descriptions): if prop.include_in_default_search: ret.append(key) return ret
[docs] @classmethod def build_easy_filter(cls, filter_str): # type: (str) -> Optional[conjunction] def escape_filter_chars_exc_asterisk(value): # type: (str) -> str value = ldap.filter.escape_filter_chars(value) value = value.replace(r"\2a", "*") return value if filter_str: filter_str = escape_filter_chars_exc_asterisk(filter_str) expressions = [] for key in cls._attrs_for_easy_filter(): expressions.append(expression(key, filter_str)) if expressions: return conjunction("|", expressions)
[docs] @classmethod def from_udm_obj(cls, udm_obj, school, lo): # type: (UdmObject, str, LoType) -> UCSSchoolModel """ Creates a new instance with attributes of the udm_obj. Uses get_class_for_udm_obj() """ # Design fault. school is part of the DN or the ucsschoolSchool attribute. cls.init_udm_module(lo) klass = cls.get_class_for_udm_obj(udm_obj, school) if klass is None: cls.logger.warning( "UDM object %r does not correspond to a Python class in the UCS school lib.", udm_obj.dn ) raise UnknownModel(udm_obj.dn, cls) if klass is not cls: cls.logger.debug( "UDM object %s is not %s, but actually %s", udm_obj.dn, cls.__name__, klass.__name__ ) if not issubclass(klass, cls): # security! # ExamStudent must not be converted into Teacher/Student/etc., # SchoolClass must not be converted into ComputerRoom # while Group must be converted into ComputerRoom, etc. and User must be converted into # Student, etc. raise WrongModel(udm_obj.dn, klass, cls) return klass.from_udm_obj(udm_obj, school, lo) udm_obj.open() validate(udm_obj, cls.logger) attrs = { "school": cls.get_school_from_dn(udm_obj.dn) or school } # TODO: is this adjustment okay? if cls.supports_schools(): attrs["schools"] = udm_obj["school"] for name, attr in iteritems(cls._attributes): if attr.udm_name: udm_value = udm_obj[attr.udm_name] if udm_value == "": udm_value = None attrs[name] = udm_value obj = cls(**deepcopy(attrs)) obj.set_dn(udm_obj.dn) obj._udm_obj_searched = True obj._udm_obj = udm_obj return obj
[docs] @classmethod def get_class_for_udm_obj(cls, udm_obj, school): # type: (UdmObject, str) -> Type[UCSSchoolModel] """ Returns cls by default. Can be overridden for base classes: class User(UCSSchoolHelperAbstractClass): @classmethod def get_class_for_udm_obj(cls, udm_obj, school) if something: return SpecialUser return cls class SpecialUser(User): pass Now, User.get_all() will return a list of User and SpecialUser objects If this function returns None for a udm_obj, that obj will not yield a new instance in get_all() and from_udm_obj() will return None for that udm_obj """ return cls
def __repr__(self): # type: () -> str def srepr(x): return repr(x).lstrip("u") dn = self.dn dn = "%s, old_dn=%s" % (srepr(dn), srepr(self.old_dn)) if dn != self.old_dn else srepr(dn) if self.supports_school(): return "%s(name=%s, school=%s, dn=%s)" % ( self.__class__.__name__, srepr(self.name), srepr(self.school), dn, ) else: return "%s(name=%s, dn=%s)" % (self.__class__.__name__, srepr(self.name), srepr(dn)) def __lt__(self, other): # type: (UCSSchoolModel) -> bool return self.name < other.name
[docs] @classmethod def from_dn(cls, dn, school, lo, superordinate=None): # type: (str, str, LoType, Optional[SuperOrdinateType]) -> UCSSchoolModel """ Returns a new instance based on the UDM object found at dn raises noObject if the udm_module does not match the dn or dn is not found """ cls.init_udm_module(lo) if school is None and cls.supports_school(): school = cls.get_school_from_dn(dn) if school is None: cls.logger.warning("Unable to guess school from %r", dn) try: cls.logger.debug("Looking up %s with dn %r", cls.__name__, dn) udm_obj = udm_modules.lookup( cls._meta.udm_module, None, lo, filter=cls._meta.udm_filter, base=dn, scope="base", superordinate=superordinate, )[0] except IndexError: # happens when cls._meta.udm_module does not "match" the dn raise WrongObjectType(dn, cls) return cls.from_udm_obj(udm_obj, school, lo)
[docs] @classmethod def get_only_udm_obj(cls, lo, filter_str, superordinate=None, base=None): # type: (LoType, str, Optional[str], Optional[SuperOrdinateType]) -> UdmObject """ Returns the one UDM object of class cls._meta.udm_module that matches a given filter. If more than one is found, a MultipleObjectsError is raised If none is found, None is returned """ cls.init_udm_module(lo) if cls._meta.udm_filter: filter_str = "(&(%s)(%s))" % (cls._meta.udm_filter, filter_str) cls.logger.debug("Getting %s UDM object by filter: %s", cls.__name__, filter_str) objs = udm_modules.lookup( cls._meta.udm_module, None, lo, scope="sub", base=base or ucr.get("ldap/base"), filter=str(filter_str), superordinate=superordinate, ) if len(objs) == 0: return None if len(objs) > 1: raise MultipleObjectsError(objs=objs) obj = objs[0] obj.open() validate(obj, cls.logger) return obj
[docs] @classmethod def get_first_udm_obj(cls, lo, filter_str, superordinate=None): # type: (LoType, str, Optional[Union[str, UdmObject]]) -> UdmObject """ Returns the first UDM object of class cls._meta.udm_module that matches a given filter """ try: return cls.get_only_udm_obj(lo, filter_str, superordinate) except MultipleObjectsError as exc: obj = exc.objs[0] obj.open() validate(obj, cls.logger) return obj
[docs] @classmethod def find_udm_superordinate(cls, dn, lo): # type: (str, LoType) -> Optional[SuperOrdinateType] module = udm_modules.get(cls._meta.udm_module) return udm_objects.get_superordinate(module, None, lo, dn)
[docs] def to_dict(self): # type: () -> Dict[str, Any] """ Returns a dictionary somewhat representing this instance. This dictionary is usually used when sending the instance to a browser as JSON. By default the attributes are present as well as the dn and the udm_module. """ ret = {"$dn$": self.dn, "objectType": self._meta.udm_module} for name, attr in iteritems(self._attributes): if not attr.internal: ret[name] = getattr(self, name) return ret
def __deepcopy__(self, memo): # type: (Dict[int]) -> UCSSchoolModel id_self = id(self) if not memo.get(id_self): memo[id_self] = self.__class__(**self.to_dict()) return memo[id_self] def _map_func_name_to_code(self, func_name): # type: (str) -> str if func_name == "create": # noqa: SIM116 return "A" elif func_name == "modify": # noqa: SIM116 return "M" elif func_name == "remove": return "D" elif func_name == "move": return "MV"
[docs] class RoleSupportMixin(object): """ Attribute and methods to handle the `ucsschool_roles` / `ucsschoolRoles` attribute. """ ucsschool_roles = Roles(_("Roles"), aka=["Roles"]) # type: List[str] default_roles = [] # type: List[str] _school_in_name = False _school_in_name_prefix = False
[docs] def get_schools(self): # type: () -> Set[str] return set(getattr(self, "schools", []) + [self.school])
[docs] def get_schools_from_udm_obj(self, udm_obj): # type: (UdmObject) -> List[str] if self._school_in_name: return [udm_obj.info["name"]] elif self._school_in_name_prefix: try: return [udm_obj.info["name"].split("-", 1)[0]] except KeyError: return [] else: try: return udm_obj.info["school"] except KeyError as exc: self.logger.exception( "KeyError in RoleSupportMixin.get_schools_from_udm_obj(%r): %s", udm_obj, exc ) raise
@property def roles_as_dicts(self): # type: () -> List[Dict[str, str]] """Get :py:attr:`self.ucsschool_roles` as a dict.""" res = [] for role in self.ucsschool_roles: m = Roles.syntax.regex.match(role) if m: res.append(m.groupdict()) return res @roles_as_dicts.setter def roles_as_dicts(self, roles): # type: (Iterable[Dict[str, str]]) -> None """ Take dict from :py:attr:`roles_as_dicts` and write to :py:attr:`self.ucsschool_roles`. """ self.ucsschool_roles = ["{role}:{context_type}:{context}".format(**role) for role in roles]
[docs] def do_move_roles(self, udm_obj, lo, old_school, new_school): # type: (UdmObject, LoType, str, str) -> None old_roles = list(self.ucsschool_roles) # remove all roles of old school school_roles = [ role for role in self.roles_as_dicts if role["context"] != old_school and role["context_type"] == "school" ] # do not apply faulty roles with context_type = school school_roles = [role for role in school_roles if role in all_roles] non_school_roles = [ role for role in self.roles_as_dicts if role["context"] != old_school and role["context_type"] != "school" ] if all(role["context"] != new_school for role in school_roles): # add only role(s) of current Python class in new school school_roles.extend( [ {"context": new_school, "context_type": "school", "role": role} for role in self.default_roles ] ) self.roles_as_dicts = school_roles + non_school_roles if old_roles != self.ucsschool_roles: self.logger.info("Updating roles: %r -> %r...", old_roles, self.ucsschool_roles) # cannot use do_modify() here, as it would delete the old object lo.modify( self.dn, [ ( "ucsschoolRole", [x.encode("UTF-8") for x in old_roles], [role.encode("utf-8") for role in self.ucsschool_roles], ) ], )
[docs] def validate_roles(self, lo): # type: (LoType) -> None # for now different roles in different schools are not supported schools = self.get_schools() for role in self.roles_as_dicts: if role["context_type"] != "school" or self._meta.udm_module.startswith( "computers/domaincontroller_" ): # check only context_type == 'school' for now # DCs have roles for all schools they handle, but have no 'schools' attribute continue if role["context"] != "-" and role["context"] not in schools: self.add_error( "ucsschool_roles", _( "Context {role}:{context_type}:{context} is not allowed for {dn}. Object is not " "in that school." ).format(dn=self.dn, **role), )
[docs] def create_without_hooks_roles(self, lo): # type: (LoType) -> None """ Run by py:meth:`create_without_hooks()` before py:meth:`validate()` (and thus before py:meth:`do_create()`). """ roles = self.roles_as_dicts if self.default_roles and not any(role["context"] for role in roles if role["context"] != "-"): schools = self.get_schools() self.ucsschool_roles += [ create_ucsschool_role_string(role, school) for role in self.default_roles for school in schools ]
[docs] def update_ucsschool_roles(self, lo): # type: (LoType) -> None """ Run by py:meth:`modify_without_hooks()` before py:meth:`validate()`. Add :py:attr:`ucsschool_roles` entries of `context_type=school` to object, if it got new/additional school(s) and object has no role(s) in those yet. Delete :py:attr:`ucsschool_roles` entries of `context_type=school` of object, if it was removed from school(s). """ roles = self.roles_as_dicts old_schools = {role["context"] for role in roles if role["context"] != "-"} cur_schools = set(self.get_schools()) new_schools = cur_schools - old_schools removed_schools = old_schools - cur_schools for new_school in new_schools: # only add role(s) if object has no roles in new school if any(role["context"] == new_school for role in roles): continue # add only role(s) of current Python class in new school roles.extend( {"context": new_school, "context_type": "school", "role": role} for role in self.default_roles ) for role in deepcopy(roles): if role["context_type"] == "school" and role["context"] in removed_schools: roles.remove(role) if new_schools or removed_schools: self.roles_as_dicts = roles