# Source code for univention.testing.ucsschool.user

"""
.. module:: user
    :platform: Unix

.. moduleauthor:: Ammar Najjar <najjar@univention.de>
"""

from __future__ import print_function

import contextlib
from typing import Any, Dict  # noqa: F401

import univention.testing.ucr as ucr_test
from ucsschool.lib.roles import create_ucsschool_role_string, role_staff, role_student, role_teacher
from univention.lib.umc import BadRequest
from univention.testing import utils
from univention.testing.ucsschool.importusers import Person
from univention.testing.umc import Client


class User(Person):
    """
    Contains the needed functionality for users in the UMC module schoolwizards/users.

    :param school: school name of the user
    :type school: str
    :param role: role of the user; ``'teacher_staff'`` is also accepted and is
        mapped to the UMC type ``'teachersAndStaff'``
    :type role: str ['student', 'teacher', 'staff', 'teacherAndStaff']
    :param school_classes: dictionary of school -> list of names of the class which contain the user
    :type school_classes: dict
    :param workgroups: dictionary of school -> list of names of the workgroups which contain the user
    :type workgroups: dict
    """

    def __init__(
        self,
        school,
        role,
        school_classes,
        workgroups=None,
        mode="A",
        username=None,
        firstname=None,
        lastname=None,
        password=None,
        mail=None,
        expiration_date=None,
        schools=None,
        connection=None,
        birthday=None,
    ):
        """
        Build the local model of a user and open (or reuse) a UMC connection.

        Every optional attribute only overrides the value set up by
        ``Person.__init__`` when a truthy value is passed.

        :param mode: stored as ``self.mode``
        :param schools: list of schools of the user, defaults to ``[school]``
        :param connection: existing UMC client; when omitted a test connection
            to the host in UCR ``ldap/master`` is created
        :param password: defaults to the domain admin test password
        """
        super(User, self).__init__(school, role)

        if username:
            self.username = username
            # Recompute the DN after overriding the username.
            self.dn = self.make_dn()
        if firstname:
            self.firstname = firstname
        if lastname:
            self.lastname = lastname
        if mail:
            self.mail = mail
        if expiration_date:
            self.expiration_date = expiration_date
        if school_classes:
            self.school_classes = school_classes
        self.schools = schools or [self.school]
        # The UMC module uses "teachersAndStaff" where the role is "teacher_staff".
        self.typ = "teachersAndStaff" if self.role == "teacher_staff" else self.role
        self.mode = mode
        self.birthday = birthday
        self.workgroups = workgroups or {}

        utils.wait_for_replication()
        self.ucr = ucr_test.UCSTestConfigRegistry()
        self.ucr.load()
        self.client = connection or Client.get_test_connection(self.ucr.get("ldap/master"))
        account = utils.UCSTestDomainAdminCredentials()
        self.password = password or account.bindpw

    def append_random_groups(self):
        """Intentionally a no-op for this class."""
        pass

    def __enter__(self):
        """Allow usage as a context manager; returns the user itself."""
        return self

    def __exit__(self, type, value, trace_back):
        """Revert the UCR registry on leaving the context; exceptions are not suppressed."""
        self.ucr.revert_to_original_registry()

    def create(self):
        """Creates object user"""
        flavor = "schoolwizards/users"
        param = [
            {
                "object": {
                    "school": self.school,
                    "schools": self.schools,
                    "school_classes": self.school_classes,
                    "workgroups": self.workgroups,
                    "email": self.mail,
                    "expiration_date": self.expiration_date,
                    "name": self.username,
                    "type": self.typ,
                    "firstname": self.firstname,
                    "lastname": self.lastname,
                    "password": self.password,
                },
                "options": None,
            }
        ]
        print("#### Creating user %s" % (self.username,))
        print("#### param = %s" % (param,))
        reqResult = self.client.umc_command("schoolwizards/users/add", param, flavor).result
        assert reqResult[0] is True, "Unable to create user (%r): %r" % (param, reqResult[0])
        utils.wait_for_replication()

    def get(self):
        # type: () -> Dict[str, Any]
        """
        Get user

        :returns: the first entry of the UMC ``schoolwizards/users/get`` result
        :raises AssertionError: if the result is empty or the request was
            answered with HTTP 400
        :raises BadRequest: for any other ``BadRequest`` status
        """
        flavor = "schoolwizards/users"
        param = [{"object": {"$dn$": self.dn, "school": self.school}}]
        try:
            reqResult = self.client.umc_command("schoolwizards/users/get", param, flavor).result
        except BadRequest as exc:
            if exc.status != 400:
                raise
            # A 400 is turned into a falsy result so that the assertion below reports it.
            reqResult = [""]
        assert reqResult[0], "Unable to get user (%s): %r" % (self.username, reqResult[0])
        return reqResult[0]

    def check_get(self, expected_attrs=None):
        """
        Fetch the user via UMC and compare it with the local model.

        :param expected_attrs: optional dict overriding individual expected values
        :raises AssertionError: if the UMC result differs from the expectation;
            the message lists every differing key
        """
        info = {
            "$dn$": self.dn,
            "display_name": " ".join([self.firstname, self.lastname]),
            "name": self.username,
            "firstname": self.firstname,
            "lastname": self.lastname,
            "type_name": self.type_name(),
            "school": self.school,
            "schools": set(self.schools),
            "disabled": "0",
            "birthday": None,
            "password": None,
            "type": self.typ,
            "email": self.mail,
            "expiration_date": self.expiration_date,
            "objectType": "users/user",
            "school_classes": {},
            "workgroups": self.workgroups,
            "ucsschool_roles": set(self.ucsschool_roles),
        }
        if self.is_student() or self.is_teacher() or self.is_teacher_staff():
            info["school_classes"] = self.school_classes

        if expected_attrs:
            info.update(expected_attrs)

        get_result = self.get()
        # Type_name is only used for display, Ignored
        info["type_name"] = get_result["type_name"]
        # ignore order
        get_result["schools"] = set(get_result["schools"])
        get_result["ucsschool_roles"] = set(get_result["ucsschool_roles"])

        diff = []
        if get_result != info:
            for key in set(get_result.keys()) | set(info.keys()):
                result_value = get_result.get(key)
                info_value = info.get(key)
                if result_value == info_value:
                    continue
                if key in ("school_classes", "workgroups"):
                    # Show the group lists as sets when their content really
                    # differs. If only order (or missing vs. empty) differs,
                    # keep the raw values so the message is never empty.
                    result_sets = {k: set(v) for k, v in (result_value or {}).items()}
                    info_sets = {k: set(v) for k, v in (info_value or {}).items()}
                    if result_sets != info_sets:
                        result_value, info_value = result_sets, info_sets
                diff.append("%s: Got:\n%r; expected:\n%r" % (key, result_value, info_value))
        assert get_result == info, "Failed get request for user %s:\n%s" % (
            self.username,
            "\n".join(diff),
        )

    @property
    def ucsschool_roles(self):
        """List of ucsschool role strings for every school of the user (order not guaranteed)."""
        roles = {
            "staff": [role_staff],
            "student": [role_student],
            "teacher": [role_teacher],
            "teacherAndStaff": [role_staff, role_teacher],
            "teachersAndStaff": [role_staff, role_teacher],  # TODO: fix inconsistency
        }
        ret = []
        for school in set([self.school] + self.schools):
            ret.extend(create_ucsschool_role_string(role, school) for role in roles[self.typ])
        return ret

    def type_name(self):
        """Return the display name of ``self.typ``, or ``None`` for an unknown type."""
        display_names = {
            "student": "Student",
            "teacher": "Teacher",
            "staff": "Staff",
            "teacherAndStaff": "Teacher and Staff",  # TODO: fix inconsistency
            "teachersAndStaff": "Teacher and Staff",
        }
        return display_names.get(self.typ)

    def query(self):
        """get the list of existing users in the school"""
        flavor = "schoolwizards/users"
        param = {"school": self.school, "type": "all", "filter": ""}
        return self.client.umc_command("schoolwizards/users/query", param, flavor).result

    def check_query(self, users_dn):
        """
        Assert that all given DNs are part of the query result of the school.

        :param users_dn: iterable of user DNs expected to exist
        :raises AssertionError: if at least one DN is missing in the result
        """
        q = self.query()
        k = [x["$dn$"] for x in q]
        assert set(users_dn).issubset(
            set(k)
        ), "users from query do not contain the existing users, found (%r), expected (%r)" % (
            k,
            users_dn,
        )

    def remove(self, remove_from_school=None):
        """
        Remove user

        :param remove_from_school: school to remove the user from, defaults to
            ``self.school``
        :raises AssertionError: if the UMC call reports a failure
        :raises ValueError: if ``remove_from_school`` is not in ``self.schools``
        """
        remove_from_school = remove_from_school or self.school
        print(
            "#### Removing User %r (%s) from school %r." % (self.username, self.dn, remove_from_school)
        )
        flavor = "schoolwizards/users"
        param = [
            {"object": {"remove_from_school": remove_from_school, "$dn$": self.dn}, "options": None}
        ]
        reqResult = self.client.umc_command("schoolwizards/users/remove", param, flavor).result
        assert reqResult[0], "Unable to remove user (%r): %r" % (self.username, reqResult[0])

        # Work on a copy so self.schools is only changed through update().
        schools = self.schools[:]
        schools.remove(remove_from_school)
        if not schools:
            self.set_mode_to_delete()
        else:
            self.update(school=sorted(schools)[0], schools=schools, mode="M")
            with contextlib.suppress(KeyError):
                del self.school_classes[remove_from_school]
            with contextlib.suppress(KeyError):
                del self.workgroups[remove_from_school]

    def edit(self, new_attributes):
        """
        Edit object user

        :param new_attributes: dict that may contain ``email``,
            ``expiration_date``, ``firstname``, ``lastname``, ``password``,
            ``workgroups`` and ``school_classes``; except for
            ``school_classes`` a missing or falsy value keeps the current one
        :raises AssertionError: if the UMC call does not return ``True``
        """
        flavor = "schoolwizards/users"
        # Resolve each value once so the request and the local model stay in sync.
        email = new_attributes.get("email") or self.mail
        expiration_date = new_attributes.get("expiration_date") or self.expiration_date
        firstname = new_attributes.get("firstname") or self.firstname
        lastname = new_attributes.get("lastname") or self.lastname
        password = new_attributes.get("password") or self.password
        workgroups = new_attributes.get("workgroups") or self.workgroups
        school_classes = new_attributes.get("school_classes", self.school_classes)

        object_props = {
            "school": self.school,
            "schools": self.schools,
            "email": email,
            "expiration_date": expiration_date,
            "name": self.username,
            "type": self.typ,
            "firstname": firstname,
            "lastname": lastname,
            "password": password,
            "$dn$": self.dn,
            "workgroups": workgroups,
        }
        # NOTE(review): __init__ assigns "teachersAndStaff" (with "s"), which is
        # not listed here, so such users do send school_classes - confirm intent.
        if self.typ not in ("teacher", "staff", "teacherAndStaff"):
            object_props["school_classes"] = school_classes

        param = [{"object": object_props, "options": None}]
        print("#### Editing user %s" % (self.username,))
        print("#### param = %s" % (param,))
        reqResult = self.client.umc_command("schoolwizards/users/put", param, flavor).result
        assert reqResult[0] is True, "Unable to edit user (%s) with the parameters (%r) : %r" % (
            self.username,
            param,
            reqResult[0],
        )
        self.set_mode_to_modify()
        self.school_classes = school_classes
        self.workgroups = workgroups
        self.mail = email
        self.expiration_date = expiration_date
        self.firstname = firstname
        self.lastname = lastname
        self.password = password