Source code for univention.testing.ucsschool.workgroup

"""
.. module:: Workgroup
    :platform: Unix

.. moduleauthor:: Ammar Najjar <najjar@univention.de>
"""
from __future__ import print_function

import univention.testing.strings as uts
import univention.testing.ucr as ucr_test
import univention.uldap as uu
from ucsschool.lib.roles import create_ucsschool_role_string, role_workgroup
from univention.lib.umc import HTTPError
from univention.testing import utils
from univention.testing.ucsschool.ucs_test_school import UCSTestSchool
from univention.testing.umc import Client


class Workgroup(object):
    """
    Test helper that manages a workgroup in an already created OU via UMC.

    Every value that is not passed explicitly is generated or looked up:
    ``name`` and ``description`` default to random strings, ``ucr`` to a
    ``UCSTestConfigRegistry``, ``ulConnection`` to a machine LDAP connection
    and ``connection`` to the UMC test connection. Only the OU (``school``)
    has to be provided.

    The object can be used as a context manager; leaving the ``with`` block
    reverts the UCR registry to its original state.

    :param school: name of the ou
    :type school: str
    :param connection: UMC client used for all ``schoolgroups/*`` commands
    :type connection: UMC connection object
    :param ulConnection: LDAP connection used to read the current members
    :type ulConnection: univention.uldap access object
    :param ucr: config registry; ``load()`` is always called on it
    :type ucr: UCR object
    :param name: name of the workgroup to be created later
    :type name: str
    :param description: description of the workgroup to be created later
    :type description: str
    :param members: list of dns of members
    :type members: [str=memberdn]
    :param create_share: value sent as ``create_share`` when creating
    :type create_share: bool
    :param create_email: value sent as ``create_email`` in UMC requests
    :type create_email: bool
    :param email: value sent as ``email`` in UMC requests
    :type email: str
    :param allowed_email_senders_groups: value sent as ``allowed_email_senders_groups``
    :type allowed_email_senders_groups: list
    :param allowed_email_senders_users: value sent as ``allowed_email_senders_users``
    :type allowed_email_senders_users: list
    """

    # UMC flavor passed with every schoolgroups command issued by this helper.
    _UMC_FLAVOR = "workgroup-admin"

    def __init__(
        self,
        school,
        connection=None,
        ulConnection=None,
        ucr=None,
        name=None,
        description=None,
        members=None,
        create_share=True,
        create_email=False,
        email="",
        allowed_email_senders_groups=None,
        allowed_email_senders_users=None,
    ):
        self.school = school
        self.name = name or uts.random_string()
        self.create_share = create_share
        self.email = email
        self.create_email = create_email
        # Fresh lists per instance; never share a mutable default.
        self.allowed_email_senders_groups = allowed_email_senders_groups or []
        self.allowed_email_senders_users = allowed_email_senders_users or []
        self.description = description or uts.random_string()
        self.members = members or []
        self.ucr = ucr or ucr_test.UCSTestConfigRegistry()
        self.ucr.load()
        self.ulConnection = ulConnection or uu.getMachineConnection(ldap_master=False)
        self.client = connection or Client.get_test_connection()

    def __enter__(self):
        return self

    def __exit__(self, type, value, trace_back):
        # Returns None, so an exception raised inside the with-block propagates.
        self.ucr.revert_to_original_registry()

    def create(self, expect_creation_fails_due_to_duplicated_name=False):
        """
        Creates object workgroup.

        If the UMC request succeeds although a duplicate-name failure was
        expected, ``utils.fail`` is called. If the request raises an
        ``HTTPError`` whose message contains one of the known
        "already exists / already in use" texts and such a failure was
        expected, the error is printed and swallowed. Any other
        ``HTTPError`` is printed and re-raised.

        :param expect_creation_fails_due_to_duplicated_name: if user allow
            duplicate names no exception is raised, no group is created either
        :type expect_creation_fails_due_to_duplicated_name: bool
        :raises HTTPError: if the UMC request fails for an unexpected reason
        """
        try:
            createResult = self._create()
            if createResult and expect_creation_fails_due_to_duplicated_name:
                utils.fail(
                    "Workgroup %s already exists, though a new workgroup is created with a the same name"
                    % self.name
                )
            utils.wait_for_replication()
        except HTTPError as exc:
            group_fullname = "%s-%s" % (self.school, self.name)
            # English and German variants, with and without the u'' repr prefix.
            exception_strings = [
                "The groupname is already in use as groupname or as username",
                "Der Gruppenname wird bereits als Gruppenname oder als Benutzername verwendet",
                "Die Arbeitsgruppe '%s' existiert bereits!" % group_fullname,
                "The workgroup '%s' already exists!" % group_fullname,
                "Die Arbeitsgruppe u'%s' existiert bereits!" % group_fullname,
                "The workgroup u'%s' already exists!" % group_fullname,
            ]
            # The message is only inspected when a duplicate failure is expected.
            is_expected_duplicate = expect_creation_fails_due_to_duplicated_name and any(
                entry in str(exc.message) for entry in exception_strings
            )
            if is_expected_duplicate:
                print("Fail : %s" % (exc))
            else:
                print("Exception: '%s' '%s' '%r'" % (str(exc), type(exc), exc))
                raise

    def _create(self):
        """
        Send the ``schoolgroups/add`` UMC command for this workgroup.

        :returns: the (truthy) result of the UMC command
        :raises AssertionError: if the command returns a falsy result
        """
        print("Creating workgroup %s in school %s" % (self.name, self.school))
        param = [
            {
                "object": {
                    "name": self.name,
                    "school": self.school,
                    "members": self.members,
                    "description": self.description,
                    "create_share": self.create_share,
                    "create_email": self.create_email,
                    "allowed_email_senders_users": self.allowed_email_senders_users,
                    "allowed_email_senders_groups": self.allowed_email_senders_groups,
                    "email": self.email,
                }
            }
        ]
        requestResult = self.client.umc_command("schoolgroups/add", param, self._UMC_FLAVOR).result
        assert requestResult, "Unable to add workgroup (%r)" % (param,)
        return requestResult

    def remove(self, options=None):
        """
        Removing a Workgroup from ldap via the ``schoolgroups/remove`` UMC command.

        :param options: passed through as ``options`` of the UMC request
        :raises AssertionError: if the command returns a falsy result
        """
        print("Removing group %s from ldap" % (self.name))
        removingParam = [{"object": [self.dn()], "options": options}]
        requestResult = self.client.umc_command(
            "schoolgroups/remove", removingParam, self._UMC_FLAVOR
        ).result
        assert requestResult, "Group %s failed to be removed" % self.name
        utils.wait_for_replication()

    def _current_member_dns(self):
        """Return the sorted, UTF-8 decoded ``uniqueMember`` values read from LDAP."""
        return sorted(
            x.decode("UTF-8") for x in self.ulConnection.getAttr(self.dn(), "uniqueMember")
        )

    def addMembers(self, memberListdn, options=None):
        """
        Add members to workgroup.

        The current members are read from LDAP, the new ones are appended
        (already present dns are skipped with a message) and the resulting
        list is written with :meth:`set_members`.

        :param memberListdn: list of the new members
        :type memberListdn: list
        :param options: forwarded to :meth:`set_members`
        :type options: None
        """
        print("Adding members %r to group %s" % (memberListdn, self.name))
        currentMembers = self._current_member_dns()
        for member in memberListdn:
            if member not in currentMembers:
                currentMembers.append(member)
            else:
                print("member", member, "already exist in the group")
        # Forward options; previously they were accepted but silently dropped.
        self.set_members(currentMembers, options=options)

    def removeMembers(self, memberListdn, options=None):
        """
        Remove members from workgroup.

        The current members are read from LDAP, the given dns are removed
        (unknown dns are ignored) and the resulting list is written with
        :meth:`set_members`.

        :param memberListdn: list of the removed members
        :type memberListdn: list
        :param options: forwarded to :meth:`set_members`
        :type options: None
        """
        print("Removing members %r from group %s" % (memberListdn, self.name))
        currentMembers = self._current_member_dns()
        for member in memberListdn:
            if member in currentMembers:
                currentMembers.remove(member)
        # Forward options; previously they were accepted but silently dropped.
        self.set_members(currentMembers, options=options)

    def _put_object(self, members):
        """Build the ``object`` dict of a ``schoolgroups/put`` request from the current state."""
        return {
            "$dn$": self.dn(),
            "school": self.school,
            "create_email": self.create_email,
            "email": self.email,
            "allowed_email_senders_users": self.allowed_email_senders_users,
            "allowed_email_senders_groups": self.allowed_email_senders_groups,
            "name": self.name,
            "description": self.description,
            "members": members,
        }

    def deactivate_email(self):
        """
        Deactivates the email address for the workgroup via UMC.

        ``create_email`` is set to False before the request is sent; after a
        successful request the stored email address and the allowed sender
        lists are cleared.

        :raises AssertionError: if the command returns a falsy result
        """
        print("Deactivating email for the workgroup {}".format(self.dn()))
        self.create_email = False
        creationParam = [{"object": self._put_object(self.members)}]
        requestResult = self.client.umc_command(
            "schoolgroups/put", creationParam, self._UMC_FLAVOR
        ).result
        assert requestResult, "Email address failed to be deactivated"
        self.email = ""
        self.allowed_email_senders_groups = []
        self.allowed_email_senders_users = []
        utils.wait_for_replication()

    def set_members(self, new_members, options=None):
        """
        Set members for workgroup.

        :param new_members: list of the new members
        :type new_members: list
        :param options: passed through as ``options`` of the UMC request
        :raises AssertionError: if the command returns a falsy result
        """
        print("Setting members %r from group %s" % (new_members, self.name))
        creationParam = [{"object": self._put_object(new_members), "options": options}]
        requestResult = self.client.umc_command(
            "schoolgroups/put", creationParam, self._UMC_FLAVOR
        ).result
        assert requestResult, "Members %s failed to be set" % new_members
        # Only remember the new member list once the server accepted it.
        self.members = new_members
        utils.wait_for_replication()

    def verify_ldap_attributes(self):
        """checking group attributes in ldap"""
        print("Checking the attributes for group %s in ldap" % (self.name,))
        # Take the first RDN of each member dn and drop its first four
        # characters -- presumably the "uid=" prefix; verify against callers.
        members = [member.split(",")[0][4:] for member in self.members or []]
        expected_attr = {
            "memberUid": members,
            "description": [self.description],
            "ucsschoolRole": [create_ucsschool_role_string(role_workgroup, self.school)],
            # The mail related attributes are expected to be empty without create_email.
            "mailPrimaryAddress": [self.email] if self.create_email else [],
            "univentionAllowedEmailUsers": self.allowed_email_senders_users
            if self.create_email
            else [],
            "univentionAllowedEmailGroups": self.allowed_email_senders_groups
            if self.create_email
            else [],
        }
        utils.verify_ldap_object(self.dn(), expected_attr=expected_attr)

    def verify_exists(self, group_should_exist, share_should_exist):
        """
        check for group and file share objects existence in ldap

        :param group_should_exist: whether the group object is expected in LDAP
        :type group_should_exist: bool
        :param share_should_exist: whether the share object is expected in LDAP
        :type share_should_exist: bool
        """
        print("Checking if group %s and its share object exist in ldap" % (self.name,))
        groupdn = self.dn()
        utils.verify_ldap_object(groupdn, should_exist=group_should_exist)
        ucsschool = UCSTestSchool()
        sharedn = "cn=%s-%s,cn=shares,%s" % (
            self.school,
            self.name,
            ucsschool.get_ou_base_dn(self.school),
        )
        utils.verify_ldap_object(sharedn, should_exist=share_should_exist)

    def dn(self):
        """Return the workgroup dn as computed by ``UCSTestSchool.get_workinggroup_dn``."""
        ucsschool = UCSTestSchool()
        groupdn = ucsschool.get_workinggroup_dn(self.school, self.name)
        return groupdn