Source code for univention.testing.ucsschool.importou

# -*- coding: utf-8 -*-

from __future__ import print_function

import base64
import random
import subprocess

from six import string_types

import ucsschool.lib.models.utils
import univention.admin.filter
import univention.admin.modules
import univention.admin.uldap
import univention.config_registry
import univention.testing.strings as uts
import univention.testing.ucr
import univention.testing.udm
import univention.uldap
from ucsschool.lib.models.dhcp import DHCPServer
from ucsschool.lib.models.school import School
from ucsschool.lib.models.user import User
from ucsschool.lib.models.utils import add_stream_logger_to_schoollib
from ucsschool.lib.roles import create_ucsschool_role_string, role_school_admin_group
from univention.config_registry.interfaces import Interfaces
from univention.testing import utils
from univention.testing.ucsschool.ucs_test_school import UCSTestSchool

# Module-level side effects executed on import.
# NOTE(review): presumably this (re-)loads the UDM module registry so that
# univention.admin.modules.get()/lookup() used below work - confirm.
univention.admin.modules.update()
# Attach a stream logger to the ucsschool lib (helper from ucsschool.lib.models.utils).
add_stream_logger_to_schoollib()


# Values accepted as ``dc_type`` by verify_dc().
TYPE_DC_ADMINISTRATIVE = "administrative"
TYPE_DC_EDUCATIONAL = "educational"


class DCNotFound(Exception):
    """Raised by check_group_membership() when no master, backup or slave DC object was found."""
class DCMembership(Exception):
    """Raised by check_group_membership() when a DC's group membership is not the expected one."""
class DCisMemberOfGroup(Exception):
    """Error type for a DC that is member of a group.

    NOTE(review): not raised anywhere in this module; presumably used by callers - confirm.
    """
class DhcpdLDAPBase(Exception):
    """Raised when the UCR variable ``dhcpd/ldap/base`` does not hold the expected value."""
class DhcpServerLocation(Exception):
    """Raised when the number of DHCP servers found in a school OU is not exactly one."""
def create_mail_domain(ucr, udm):
    """Create a ``mail/domain`` UDM object if no hosted mail domain is configured.

    Nothing is done when the UCR variable ``mail/hosteddomains`` has a
    non-empty value. Otherwise a mail domain named after the UCR variable
    ``domainname`` is created below ``cn=domain,cn=mail,<ldap/base>``.

    :param ucr: UCR-like object supporting ``get()`` and item access
    :param udm: UDM test helper providing ``create_object()``
    :raises univention.testing.udm.UCSTestUDM_CreateUDMObjectFailed: if the
        creation fails for any reason other than the object already existing
    """
    if ucr.get("mail/hosteddomains"):
        return

    # "mail/hosteddomains" is known to be unset/empty at this point, so there
    # is no hosted domain to pick from: use the domain name of the system.
    maildomain = ucr["domainname"]
    print("\n\n*** Creating mail domain {!r}...\n".format(maildomain))
    try:
        udm.create_object(
            "mail/domain", position="cn=domain,cn=mail,{}".format(ucr["ldap/base"]), name=maildomain
        )
    except univention.testing.udm.UCSTestUDM_CreateUDMObjectFailed as exc:
        if "Object exists" not in str(exc):
            # do not hide unexpected failures: later steps rely on the mail domain
            raise
        print("\n\n*** Mail domain {!r} exists.\n".format(maildomain))
def remove_ou(ou_name):
    """Remove the school OU ``ou_name`` using ``UCSTestSchool.cleanup_ou()``."""
    school_env = UCSTestSchool()
    # UCR must be re-read first, otherwise its variables are not up-to-date
    school_env.ucr.load()
    school_env.cleanup_ou(ou_name)
def get_school_base(ou):
    """Return the DN of the school OU ``ou`` according to the current UCR settings.

    With ``ucsschool/ldap/district/enable`` set, the OU is located below a
    district OU named after the first two characters of ``ou``.
    """
    ucr = univention.config_registry.ConfigRegistry()
    ucr.load()
    if not ucr.is_true("ucsschool/ldap/district/enable"):
        return "ou=%s,%s" % (ou, ucr.get("ldap/base"))
    return "ou=%s,ou=%s,%s" % (ou, ou[0:2], ucr.get("ldap/base"))
def get_school_ou_from_dn(dn, ucr=None):
    """Return the name of the school OU contained in ``dn``.

    :param dn: DN that contains at least one ``ou=`` component
    :param ucr: optional UCR instance; a fresh one is loaded if omitted/falsy
    :returns: the last ``ou`` value of the DN, or the second to last one if
        ``ucsschool/ldap/district/enable`` is set
    """
    if not ucr:
        ucr = univention.config_registry.ConfigRegistry()
        ucr.load()

    ou_names = []
    for rdn in univention.admin.uldap.explodeDn(dn):
        if rdn.startswith("ou="):
            ou_names.append(rdn[3:])

    # in district mode the last OU is the district, the school comes before it
    position = -2 if ucr.is_true("ucsschool/ldap/district/enable") else -1
    return ou_names[position]
def create_ou_cli(
    ou, dc=None, dc_administrative=None, sharefileserver=None, ou_displayname=None, alter_dhcpd_base=None
):
    """Create a school OU by running the ``create_ou`` script of ucs-school-import.

    ``dc`` and ``dc_administrative`` are passed as positional arguments if
    set. ``alter_dhcpd_base`` is translated to ``auto`` (``None``), ``true``
    or ``false``.

    :raises subprocess.CalledProcessError: if the script exits non-zero
    """
    cmd_block = ["/usr/share/ucs-school-import/scripts/create_ou", "--verbose", ou]
    cmd_block.extend(positional for positional in (dc, dc_administrative) if positional)
    if ou_displayname:
        cmd_block.append("--displayName=%s" % ou_displayname)
    if sharefileserver:
        cmd_block.append("--sharefileserver=%s" % sharefileserver)

    if alter_dhcpd_base is None:
        mode = "auto"
    else:
        mode = "true" if alter_dhcpd_base else "false"
    cmd_block.append("--alter-dhcpd-base=%s" % mode)

    print("cmd_block: %r" % cmd_block)
    subprocess.check_call(cmd_block)
def create_ou_python_api(
    ou, dc, dc_administrative, sharefileserver, ou_displayname, alter_dhcpd_base=None
):
    """Create a school OU through the ucsschool lib ``School`` model.

    :param ou: name of the school OU
    :param dc: name of the educational DC (passed as ``dc_name``)
    :param dc_administrative: name of the administrative DC, skipped if falsy
    :param sharefileserver: used as class and home share file server, skipped if falsy
    :param ou_displayname: display name of the school, skipped if falsy
    :param alter_dhcpd_base: only passed to ``School`` if it is not ``None``
    """
    kwargs = {"name": ou, "dc_name": dc}
    if dc_administrative:
        kwargs["dc_name_administrative"] = dc_administrative
    if sharefileserver:
        kwargs["class_share_file_server"] = sharefileserver
        kwargs["home_share_file_server"] = sharefileserver
    if ou_displayname:
        kwargs["display_name"] = ou_displayname
    if alter_dhcpd_base is not None:
        # The key has to be the keyword name; using the value itself as key
        # (True/False) makes School(**kwargs) fail with a TypeError.
        kwargs["alter_dhcpd_base"] = alter_dhcpd_base

    # invalidate caches and reload UCR
    ucsschool.lib.models.utils.ucr.load()
    ucsschool.lib.models.utils._pw_length_cache.clear()
    # UCSSchoolHelperAbstractClass._search_base_cache.clear()
    User._profile_path_cache.clear()
    User._samba_home_path_cache.clear()

    lo = univention.admin.uldap.getAdminConnection()[0]
    # TODO FIXME has to be fixed in ucs-school-lib - should be done automatically:
    School.init_udm_module(lo)
    School(**kwargs).create(lo)
def move_domaincontroller_to_ou_cli(dc_name, ou):
    """Run the ``move_domaincontroller_to_ou`` script for ``dc_name`` and ``ou``.

    :raises subprocess.CalledProcessError: if the script exits non-zero
    """
    script = "/usr/share/ucs-school-import/scripts/move_domaincontroller_to_ou"
    cmd_block = [script, "--ou", ou, "--dcname", dc_name]
    print("cmd_block: %r" % cmd_block)
    subprocess.check_call(cmd_block)
def get_ou_base(ou, district_enable):
    """Return the DN of the school OU ``ou`` below the LDAP base.

    :param ou: name of the school OU
    :param district_enable: if true, the OU is placed below a district OU
        named after the first two characters of ``ou``
    """
    ucr = univention.testing.ucr.UCSTestConfigRegistry()
    ucr.load()
    base_dn = ucr.get("ldap/base")
    if not district_enable:
        return "ou=%s,%s" % (ou, base_dn)
    return "ou=%s,ou=%s,%s" % (ou, ou[0:2], base_dn)
def create_and_verify_ou(
    ucr,
    ou,
    dc,
    sharefileserver,
    dc_administrative=None,
    ou_displayname=None,
    singlemaster=False,
    noneducational_create_objects=False,
    district_enable=False,
    default_dcs=None,
    dhcp_dns_clearou=False,
    do_cleanup=True,
    unset_dhcpd_base=True,
    alter_dhcpd_base_option=None,
    use_cli_api=True,
    use_python_api=False,
):
    """Set the UCR variables for a test run, create a school OU and verify it.

    Exactly one of ``use_cli_api`` and ``use_python_api`` has to be true.
    If ``dc`` names an already existing domain controller, it is moved into
    the OU after the OU has been created. With ``do_cleanup`` the OU is
    removed again after a successful verification.
    """
    assert use_cli_api != use_python_api

    def ucr_bool(flag):
        return "true" if flag else "false"

    separator = "******************************************************"
    summary = (
        ("**** ou=%s", ou),
        ("**** ou_displayname=%r", ou_displayname),
        ("**** dc=%s", dc),
        ("**** dc_administrative=%s", dc_administrative),
        ("**** sharefileserver=%s", sharefileserver),
        ("**** singlemaster=%s", singlemaster),
        ("**** noneducational_create_objects=%s", noneducational_create_objects),
        ("**** district_enable=%s", district_enable),
        ("**** default_dcs=%s", default_dcs),
        ("**** dhcp_dns_clearou=%s", dhcp_dns_clearou),
    )
    print(separator)
    print("**** create_and_verify_ou test run")
    for template, value in summary:
        print(template % value)
    print(separator)

    ucr.load()
    lo = univention.uldap.getMachineConnection()

    # set UCR
    ucr_settings = [
        "ucsschool/singlemaster=%s" % ucr_bool(singlemaster),
        "ucsschool/ldap/noneducational/create/objects=%s" % ucr_bool(noneducational_create_objects),
        "ucsschool/ldap/district/enable=%s" % ucr_bool(district_enable),
        "ucsschool/ldap/default/dcs=%s" % default_dcs,
        "ucsschool/import/generate/policy/dhcp/dns/clearou=%s" % ucr_bool(dhcp_dns_clearou),
    ]
    univention.config_registry.handler_set(ucr_settings)
    if unset_dhcpd_base:
        univention.config_registry.handler_unset(["dhcpd/ldap/base"])
    ucr.load()

    base_dn = ucr.get("ldap/base")

    # does dc exist? An already existing DC has to be moved into the new OU.
    move_dc_after_create_ou = False
    if singlemaster:
        dc_name = ucr.get("hostname")
    elif dc:
        dc_name = dc
        existing_dc = lo.search(
            filter="(&(objectClass=univentionDomainController)(cn=%s))" % dc, base=base_dn, attr=["cn"]
        )
        move_dc_after_create_ou = bool(existing_dc)
    else:
        dc_name = "dc%s" % ou

    creation_args = (ou, dc, dc_administrative, sharefileserver, ou_displayname, alter_dhcpd_base_option)
    if use_cli_api:
        create_ou_cli(*creation_args)
    if use_python_api:
        create_ou_python_api(*creation_args)

    if move_dc_after_create_ou:
        move_domaincontroller_to_ou_cli(dc_name, ou)

    verify_ou(ou, dc, ucr, sharefileserver, dc_administrative, must_exist=True)

    if do_cleanup:
        remove_ou(ou)
def verify_ou(ou, dc, ucr, sharefileserver, dc_administrative, must_exist):
    """Verify the LDAP objects that belong to the school OU ``ou``.

    Checks the OU object itself, its containers, the default groups and
    their UMC policies, the domain controller objects, the DC group
    memberships and the DHCP objects.

    :param ou: name of the school OU
    :param dc: name of the educational DC or a false value
    :param ucr: UCR instance, it is reloaded first
    :param sharefileserver: name of the share file server or a false value
    :param dc_administrative: name of the administrative DC or a false value
    :param must_exist: whether the OU specific objects are expected to exist
    """
    print("*** Verifying OU (%s) ... " % ou)
    ucr.load()

    lo = univention.uldap.getMachineConnection()
    base_dn = ucr.get("ldap/base")

    cn_pupils = ucr.get("ucsschool/ldap/default/container/pupils", "schueler")
    cn_teachers = ucr.get("ucsschool/ldap/default/container/teachers", "lehrer")
    cn_teachers_staff = ucr.get(
        "ucsschool/ldap/default/container/teachers-and-staff", "lehrer und mitarbeiter"
    )
    cn_admins = ucr.get("ucsschool/ldap/default/container/admins", "admins")
    cn_staff = ucr.get("ucsschool/ldap/default/container/staff", "mitarbeiter")

    singlemaster = ucr.is_true("ucsschool/singlemaster")
    noneducational_create_objects = ucr.is_true("ucsschool/ldap/noneducational/create/objects")
    district_enable = ucr.is_true("ucsschool/ldap/district/enable")
    dhcp_dns_clearou = ucr.is_true("ucsschool/import/generate/policy/dhcp/dns/clearou")

    ou_base = get_ou_base(ou, district_enable)

    # does dc exist? Determine name and DN of the DC responsible for the OU.
    if singlemaster:
        dc_dn = ucr.get("ldap/hostdn")
        dc_name = ucr.get("hostname")
    else:
        dc_name = dc if dc else "dc%s" % ou
        dc_dn = "cn=%s,cn=dc,cn=server,cn=computers,%s" % (dc_name, ou_base)

    # the DC is the share file server unless an existing one was requested
    sharefileserver_dn = dc_dn
    if sharefileserver:
        found_dns = lo.searchDn(
            filter="(&(objectClass=univentionDomainController)(cn=%s))" % sharefileserver,
            base=base_dn,
        )
        if found_dns:
            sharefileserver_dn = found_dns[0]

    utils.verify_ldap_object(
        ou_base,
        expected_attr={
            "ou": [ou],
            "ucsschoolClassShareFileServer": [sharefileserver_dn],
            "ucsschoolHomeShareFileServer": [sharefileserver_dn],
        },
        should_exist=must_exist,
    )

    # (expected cn, DN) of the containers and groups of the OU, checked in this order
    container_checks = [
        ("printers", "cn=printers,%s" % ou_base),
        ("users", "cn=users,%s" % ou_base),
        (cn_pupils, "cn=%s,cn=users,%s" % (cn_pupils, ou_base)),
        (cn_teachers, "cn=%s,cn=users,%s" % (cn_teachers, ou_base)),
        (cn_admins, "cn=%s,cn=users,%s" % (cn_admins, ou_base)),
        # NOTE(review): same check as the entry before
        (cn_admins, "cn=%s,cn=users,%s" % (cn_admins, ou_base)),
        ("computers", "cn=computers,%s" % ou_base),
        ("server", "cn=server,cn=computers,%s" % ou_base),
        ("dc", "cn=dc,cn=server,cn=computers,%s" % ou_base),
        ("networks", "cn=networks,%s" % ou_base),
        ("groups", "cn=groups,%s" % ou_base),
        (cn_pupils, "cn=%s,cn=groups,%s" % (cn_pupils, ou_base)),
        (cn_teachers, "cn=%s,cn=groups,%s" % (cn_teachers, ou_base)),
        ("klassen", "cn=klassen,cn=%s,cn=groups,%s" % (cn_pupils, ou_base)),
        ("raeume", "cn=raeume,cn=groups,%s" % ou_base),
        ("dhcp", "cn=dhcp,%s" % ou_base),
        ("policies", "cn=policies,%s" % ou_base),
        ("shares", "cn=shares,%s" % ou_base),
        ("klassen", "cn=klassen,cn=shares,%s" % ou_base),
        # NOTE(review): the DC container is checked a second time here
        ("dc", "cn=dc,cn=server,cn=computers,%s" % ou_base),
    ]
    for expected_cn, container_dn in container_checks:
        utils.verify_ldap_object(
            container_dn, expected_attr={"cn": [expected_cn]}, should_exist=must_exist
        )

    # staff objects must never exist if non-educational objects are disabled
    staff_should_exist = must_exist if noneducational_create_objects else False
    for staff_dn in (
        "cn=%s,cn=users,%s" % (cn_staff, ou_base),
        "cn=%s,cn=users,%s" % (cn_teachers_staff, ou_base),
        "cn=%s,cn=groups,%s" % (cn_staff, ou_base),
    ):
        utils.verify_ldap_object(staff_dn, should_exist=staff_should_exist)

    # The absence of these groups is not checked in the opposite case: that
    # would fail because cleanup_ou does not remove them.
    if noneducational_create_objects:
        for admin_net_group in (
            "cn=DC-Verwaltungsnetz,cn=ucsschool,cn=groups,%s" % base_dn,
            "cn=Member-Verwaltungsnetz,cn=ucsschool,cn=groups,%s" % base_dn,
            "cn=OU%s-DC-Verwaltungsnetz,cn=ucsschool,cn=groups,%s" % (ou, base_dn),
            "cn=OU%s-Member-Verwaltungsnetz,cn=ucsschool,cn=groups,%s" % (ou, base_dn),
        ):
            utils.verify_ldap_object(admin_net_group, should_exist=True)

    if not singlemaster:
        verify_dc(ou, dc_name, TYPE_DC_EDUCATIONAL, base_dn, must_exist)

    if dc_administrative:
        verify_dc(ou, dc_administrative, TYPE_DC_ADMINISTRATIVE, base_dn, must_exist)

    grp_prefix_pupils = ucr.get("ucsschool/ldap/default/groupprefix/pupils", "schueler-")
    grp_prefix_teachers = ucr.get("ucsschool/ldap/default/groupprefix/teachers", "lehrer-")
    grp_prefix_admins = ucr.get("ucsschool/ldap/default/groupprefix/admins", "admins-")
    grp_prefix_staff = ucr.get("ucsschool/ldap/default/groupprefix/staff", "mitarbeiter-")
    grp_policy_pupils = ucr.get(
        "ucsschool/ldap/default/policy/umc/pupils",
        "cn=ucsschool-umc-pupils-default,cn=UMC,cn=policies,%s" % base_dn,
    )
    grp_policy_teachers = ucr.get(
        "ucsschool/ldap/default/policy/umc/teachers",
        "cn=ucsschool-umc-teachers-default,cn=UMC,cn=policies,%s" % base_dn,
    )
    grp_policy_admins = ucr.get(
        "ucsschool/ldap/default/policy/umc/admins",
        "cn=ucsschool-umc-admins-default,cn=UMC,cn=policies,%s" % base_dn,
    )
    grp_policy_staff = ucr.get(
        "ucsschool/ldap/default/policy/umc/staff",
        "cn=ucsschool-umc-staff-default,cn=UMC,cn=policies,%s" % base_dn,
    )

    # the school admin group lives outside of the OU and is always expected
    utils.verify_ldap_object(
        "cn=%s%s,cn=ouadmins,cn=groups,%s" % (grp_prefix_admins, ou, base_dn),
        expected_attr={
            "univentionPolicyReference": [grp_policy_admins],
            "ucsschoolRole": [create_ucsschool_role_string(role_school_admin_group, ou)],
        },
        should_exist=True,
    )

    role_groups = [
        (grp_prefix_pupils, grp_policy_pupils),
        (grp_prefix_teachers, grp_policy_teachers),
    ]
    if noneducational_create_objects:
        role_groups.append((grp_prefix_staff, grp_policy_staff))
    for group_prefix, policy_dn in role_groups:
        utils.verify_ldap_object(
            "cn=%s%s,cn=groups,%s" % (group_prefix, ou, ou_base),
            expected_attr={"univentionPolicyReference": [policy_dn]},
            should_exist=must_exist,
        )

    check_group_membership(ou, dc_name, base_dn, lo, must_exist)
    check_dhcp(ou_base, dc_name, singlemaster, dhcp_dns_clearou, ucr, must_exist)
def check_group_membership(ou, dc_name, base_dn, lo, must_exist):
    """Check the membership of the DC ``dc_name`` in the "Edukativnetz" DC groups.

    A Replica Directory Node has to be member of at least one of the two
    groups, a Primary or Backup Directory Node must not be member of any.
    Nothing is checked if ``must_exist`` is false.

    :raises DCNotFound: if no DC object with the name ``dc_name`` exists
    :raises DCMembership: if the membership is not the expected one
    """
    module_names = (
        "computers/domaincontroller_master",
        "computers/domaincontroller_backup",
        "computers/domaincontroller_slave",
    )
    udm_modules = [univention.admin.modules.get(module_name) for module_name in module_names]
    # a new filter expression is built for every lookup, as in the single lookups before
    masterobjs, backupobjs, slaveobjs = [
        univention.admin.modules.lookup(
            udm_module,
            None,
            lo,
            scope="sub",
            superordinate=None,
            base=base_dn,
            filter=univention.admin.filter.expression("cn", dc_name),
        )
        for udm_module in udm_modules
    ]

    dcgroups = [
        "cn=OU%s-DC-Edukativnetz,cn=ucsschool,cn=groups,%s" % (ou, base_dn),
        "cn=DC-Edukativnetz,cn=ucsschool,cn=groups,%s" % (base_dn),
    ]

    if not must_exist:
        return

    if masterobjs:
        dcobject, is_master_or_backup = masterobjs[0], True
    elif backupobjs:
        dcobject, is_master_or_backup = backupobjs[0], True
    elif slaveobjs:
        dcobject, is_master_or_backup = slaveobjs[0], False
    else:
        raise DCNotFound()

    dcobject.open()
    member_of = [group_dn.lower() for group_dn in dcobject.get("groups")]
    membership = any(dcgroup.lower() in member_of for dcgroup in dcgroups)

    # master/backup must not be member, a slave has to be member
    if is_master_or_backup == membership:
        raise DCMembership()
def check_dhcp(ou_base, dc_name, singlemaster, dhcp_dns_clearou, ucr, must_exist):
    """Verify the DHCP service, the DHCP server and the "dhcp-dns-clear" policy of an OU.

    :param ou_base: DN of the school OU
    :param dc_name: name of the DC that is expected as DHCP server
    :param singlemaster: whether the system is a single server setup
    :param dhcp_dns_clearou: whether the "dhcp-dns-clear" policy is expected
    :param ucr: UCR instance, it is reloaded first
    :param must_exist: whether the objects are expected to exist
    :raises DhcpdLDAPBase: in a single server setup, if ``dhcpd/ldap/base``
        is unset or contains no ``ou=``
    """
    ucr.load()

    if singlemaster:
        # in singleserver setup only the first OU sets dhcpd/ldap/base and all following OUs
        # should leave the UCR variable untouched: use the UCR value and check if the
        # DHCP service exists there
        dhcp_dn = ucr.get("dhcpd/ldap/base")
        if not dhcp_dn or "ou=" not in dhcp_dn:
            raise DhcpdLDAPBase('dhcpd/ldap/base=%r contains no "ou="' % (dhcp_dn,))
    else:
        # in multiserver setups all dhcp settings have to be checked
        dhcp_dn = "cn=dhcp,%s" % (ou_base)

    print("LDAP base of dhcpd = %r" % dhcp_dn)

    service_dn = "cn=%s,%s" % (get_school_ou_from_dn(dhcp_dn, ucr), dhcp_dn)
    server_dn = "cn=%s,%s" % (dc_name, service_dn)
    if must_exist:
        wpad_option = 'wpad "http://%s.%s/proxy.pac"' % (dc_name, ucr.get("domainname"))
        utils.verify_ldap_object(
            service_dn, expected_attr={"dhcpOption": [wpad_option]}, should_exist=True
        )
        utils.verify_ldap_object(server_dn, should_exist=True)

    clear_policy_dn = "cn=dhcp-dns-clear,cn=policies,%s" % ou_base
    if not dhcp_dns_clearou:
        utils.verify_ldap_object(clear_policy_dn, should_exist=False)
        return

    utils.verify_ldap_object(
        clear_policy_dn,
        expected_attr={"emptyAttributes": ["univentionDhcpDomainNameServers"]},
        should_exist=must_exist,
    )
    try:
        utils.verify_ldap_object(
            ou_base,
            expected_attr={"univentionPolicyReference": [clear_policy_dn]},
            should_exist=must_exist,
            retry_count=0,
        )
    except utils.LDAPObjectUnexpectedValue:
        # ignore other policies
        pass
def verify_dc(ou, dc_name, dc_type, base_dn=None, must_exist=True):
    """Verify a school DC object and its memberships in the ucsschool DC groups.

    Arguments:
    ou: name of the school OU
    dc_name: name of the domaincontroller (cn)
    dc_type: type of the domaincontroller ('educational' or 'administrative')
    base_dn: LDAP base, read from UCR if not given
    must_exist: whether the DC object is expected to exist; the group
    memberships are only checked if it is true
    """
    assert dc_type in (TYPE_DC_ADMINISTRATIVE, TYPE_DC_EDUCATIONAL)

    ucr = univention.testing.ucr.UCSTestConfigRegistry()
    ucr.load()
    if not base_dn:
        base_dn = ucr.get("ldap/base")
    ou_base = get_ou_base(ou, ucr.is_true("ucsschool/ldap/district/enable", False))
    dc_dn = "cn=%s,cn=dc,cn=server,cn=computers,%s" % (dc_name, ou_base)

    def school_group(group_name):
        return "cn=%s,cn=ucsschool,cn=groups,%s" % (group_name, base_dn)

    ou_lower = ou.lower()
    administrative_dc_groups = [
        school_group("OU%s-DC-Verwaltungsnetz" % ou_lower),
        school_group("DC-Verwaltungsnetz"),
    ]
    administrative_member_groups = [
        school_group("Member-Verwaltungsnetz"),
        school_group("OU%s-Member-Verwaltungsnetz" % ou),
    ]
    educational_dc_groups = [
        school_group("OU%s-DC-Edukativnetz" % ou_lower),
        school_group("DC-Edukativnetz"),
    ]
    educational_member_groups = [
        school_group("Member-Edukativnetz"),
        school_group("OU%s-Member-Edukativnetz" % ou),
    ]

    # list of (un-)desired group memberships ==> [(IS_MEMBER, GROUP_DN), ...]
    if dc_type == TYPE_DC_ADMINISTRATIVE:
        member_of = administrative_dc_groups
        not_member_of = administrative_member_groups + educational_dc_groups + educational_member_groups
    else:
        member_of = educational_dc_groups
        not_member_of = list(educational_member_groups)
        if ucr.is_true("ucsschool/ldap/noneducational/create/objects", must_exist):
            not_member_of += administrative_dc_groups + administrative_member_groups
    group_dn_list = [(True, grpdn) for grpdn in member_of]
    group_dn_list += [(False, grpdn) for grpdn in not_member_of]

    utils.verify_ldap_object(dc_dn, should_exist=must_exist)

    if not must_exist:
        return
    for expected_membership, grpdn in group_dn_list:
        utils.verify_ldap_object(
            grpdn,
            expected_attr={"uniqueMember": [dc_dn]} if expected_membership else None,
            not_expected_attr=None if expected_membership else {"uniqueMember": [dc_dn]},
            strict=False,
            should_exist=True,
            retry_count=0,
        )
def parametrization_id_base64_decode(val):
    """Decode ``val`` if it looks like a base64 encoded string.

    Only strings that end with ``=`` (ignoring surrounding whitespace) are
    decoded. Non-ASCII bytes of the decoded data are replaced.

    :returns: the decoded text, or ``None`` if ``val`` is not such a string
        or cannot be decoded
    """
    looks_encoded = isinstance(val, string_types) and val.strip().endswith("=")
    if not looks_encoded:
        return None
    try:
        decoded = base64.b64decode(val.encode("ASCII"))
        return decoded.decode("ascii", errors="replace")
    except ValueError:
        return None
def generate_import_ou_basics_test_data(use_cli_api=True, use_python_api=False):
    """Generate the parameter tuples for the OU import tests.

    Yields one tuple per valid combination of: administrative DC, educational
    DC, singlemaster, non-educational objects, district mode, default DCs,
    dhcp-dns-clear policy and share file server. The display name of the OU
    is yielded base64 encoded.
    """
    # the whitespace in the character set increases the chance to get several words
    charset = uts.STR_ALPHANUMDOTDASH + uts.STR_ALPHA.upper() + '()[]/,;:_#"+*@<>~ßöäüÖÄÜ$%&! '

    for dc_administrative in (None, "generate"):
        for dc in (None, "generate"):
            for singlemaster in (True, False):
                for noneducational_create_object in (True, False):
                    for district_enable in (True, False):
                        for default_dcs in (None, "edukativ", uts.random_string()):
                            for dhcp_dns_clearou in (True, False):
                                for sharefileserver in (None, "generate"):
                                    # Invalid combinations:
                                    # - a singlemaster with a generated educational DC
                                    # - an administrative DC without an educational DC
                                    # - an administrative DC without administrative
                                    #   objects in LDAP
                                    invalid = (singlemaster and dc == "generate") or (
                                        dc_administrative
                                        and not (dc and noneducational_create_object)
                                    )
                                    if invalid:
                                        continue

                                    ou_name = uts.random_name()
                                    ou_displayname = uts.random_string(
                                        length=random.randint(1, 50), charset=charset
                                    )
                                    # pytest doesn't like non-ascii chars in
                                    # parametrization args
                                    encoded_displayname = base64.b64encode(
                                        ou_displayname.encode("UTF-8")
                                    ).decode("ASCII")
                                    dc_name = uts.random_name() if dc else None
                                    dc_administrative_name = (
                                        uts.random_name() if dc_administrative else None
                                    )
                                    yield (
                                        ou_name,
                                        encoded_displayname,
                                        dc_name,
                                        dc_administrative_name,
                                        bool(sharefileserver),
                                        singlemaster,
                                        noneducational_create_object,
                                        district_enable,
                                        default_dcs,
                                        dhcp_dns_clearou,
                                        use_cli_api,
                                        use_python_api,
                                    )
def import_ou_with_existing_dc(use_cli_api=True, use_python_api=False):
    """Create an OU for an already existing DHCP server and check the DHCP objects.

    A DHCP service with a server and two subnets is created first. After the
    OU has been created, the subnet of the default network is expected below
    the DHCP service of the OU, the other subnet is not, and the DHCP server
    is expected to have been moved to the DHCP service of the OU.
    """
    with univention.testing.ucr.UCSTestConfigRegistry() as ucr:
        with univention.testing.udm.UCSTestUDM() as udm:
            create_mail_domain(ucr, udm)

            dc_name = uts.random_name()
            dhcp_service_name = uts.random_name()
            dhcp_service = udm.create_object("dhcp/service", service=dhcp_service_name)
            dhcp_server = udm.create_object("dhcp/server", server=dc_name, superordinate=dhcp_service)

            dhcp_subnet1 = udm.create_object(
                "dhcp/subnet", superordinate=dhcp_service, subnet="10.20.30.0", subnetmask="24"
            )
            default_ip = Interfaces().get_default_ip_address()
            dhcp_subnet2 = udm.create_object(
                "dhcp/subnet",
                superordinate=dhcp_service,
                subnet=str(default_ip.network.network_address),
                subnetmask=str(default_ip.network.prefixlen),
            )

            ou_name = uts.random_name()

            try:
                create_and_verify_ou(
                    ucr,
                    ou=ou_name,
                    ou_displayname=None,
                    dc=dc_name,
                    dc_administrative=None,
                    sharefileserver=None,
                    singlemaster=False,
                    noneducational_create_objects=True,
                    district_enable=False,
                    default_dcs=None,
                    dhcp_dns_clearou=False,
                    do_cleanup=False,
                    use_cli_api=use_cli_api,
                    use_python_api=use_python_api,
                )

                for original_subnet in (dhcp_subnet1, dhcp_subnet2):
                    utils.verify_ldap_object(original_subnet, should_exist=True)

                ou_base = get_ou_base(ou=ou_name, district_enable=False)
                new_dhcp_service_dn = "cn=%s,cn=dhcp,%s" % (ou_name, ou_base)

                # dhcp subnet2 should be copied
                new_dhcp_subnet2_dn = "cn=%s,%s" % (
                    default_ip.network.network_address,
                    new_dhcp_service_dn,
                )
                utils.verify_ldap_object(new_dhcp_subnet2_dn, should_exist=True)

                # dhcp subnet1 should not be copied
                new_dhcp_subnet1_dn = "cn=10.20.30.0,%s" % (new_dhcp_service_dn)
                utils.verify_ldap_object(new_dhcp_subnet1_dn, should_exist=False)

                # dhcp server has been moved
                utils.verify_ldap_object("cn=%s,%s" % (dc_name, new_dhcp_service_dn), should_exist=True)
                utils.verify_ldap_object(dhcp_server, should_exist=False)
            finally:
                remove_ou(ou_name)

        # NOTE(review): presumably runs after the UDM context has been left - confirm
        utils.wait_for_replication()
def import_3_ou_in_a_row(use_cli_api=True, use_python_api=False):
    """Create and verify 3 OUs in a row.

    This is done for every combination of singlemaster and district mode.
    ``dhcpd/ldap/base`` is unset once before each series of three OUs. The
    OUs of a series are removed afterwards, also if the series failed.
    """
    with univention.testing.ucr.UCSTestConfigRegistry() as ucr, univention.testing.udm.UCSTestUDM() as udm:  # noqa: E501
        create_mail_domain(ucr, udm)

        for singlemaster in (True, False):
            for district_enable in (False, True):
                cleanup_ou_list = []
                try:
                    # reset DHCPD ldap search base which is also to be tested
                    univention.config_registry.handler_unset(["dhcpd/ldap/base"])
                    ucr.load()

                    for number in (1, 2, 3):
                        ou_name = uts.random_name()
                        print("\n*** Creating OU #%d (ou_name=%s) ***\n" % (number, ou_name))
                        cleanup_ou_list.append(ou_name)
                        # only multi server setups get a (random) educational DC
                        dc_name = None if singlemaster else uts.random_name()
                        create_and_verify_ou(
                            ucr,
                            ou=ou_name,
                            ou_displayname=ou_name,
                            dc=dc_name,
                            dc_administrative=None,
                            sharefileserver=None,
                            singlemaster=singlemaster,
                            noneducational_create_objects=False,
                            district_enable=district_enable,
                            default_dcs=None,
                            dhcp_dns_clearou=True,
                            unset_dhcpd_base=False,
                            do_cleanup=False,
                            use_cli_api=use_cli_api,
                            use_python_api=use_python_api,
                        )
                finally:
                    for created_ou in cleanup_ou_list:
                        remove_ou(created_ou)
def import_ou_alter_dhcpd_base_flag(use_cli_api=True, use_python_api=False):
    """Test the effect of the alter-dhcpd-base option on the UCR variable dhcpd/ldap/base.

    Steps, all in a singlemaster setup:

    1. ``dhcpd/ldap/base`` is unset and an OU is created with the option set
       to ``False``; a ``DhcpdLDAPBase`` error of the verification is accepted.
    2. Two OUs are created with the option set to ``True`` and ``None``;
       ``dhcpd/ldap/base`` has to contain the OU created with ``True`` and
       exactly one DHCP server has to be found in that OU.
    3. ``dhcpd/ldap/base`` is unset again and an OU is created with the option
       set to ``None``; ``dhcpd/ldap/base`` has to contain this last OU and
       exactly one DHCP server has to be found in it.

    All created OUs are removed at the end, also in case of an error.

    :raises DhcpdLDAPBase: if ``dhcpd/ldap/base`` does not contain the expected OU
    :raises DhcpServerLocation: if not exactly one DHCP server is found in the expected OU
    """
    with univention.testing.ucr.UCSTestConfigRegistry() as ucr, univention.testing.udm.UCSTestUDM() as udm:  # noqa: E501
        create_mail_domain(ucr, udm)
        cleanup_ou_list = []
        try:
            # start without a DHCPD LDAP search base
            univention.config_registry.handler_unset(["dhcpd/ldap/base"])
            ucr.load()
            ou_name = uts.random_name()
            print("\n*** Creating OU #1 (ou_name=%s) ***\n" % (ou_name))
            cleanup_ou_list.append(ou_name)
            try:
                create_and_verify_ou(
                    ucr,
                    ou=ou_name,
                    ou_displayname=ou_name,
                    dc=None,
                    dc_administrative=None,
                    sharefileserver=None,
                    singlemaster=True,
                    noneducational_create_objects=False,
                    district_enable=False,
                    default_dcs=None,
                    dhcp_dns_clearou=True,
                    unset_dhcpd_base=False,
                    do_cleanup=False,
                    use_cli_api=use_cli_api,
                    use_python_api=use_python_api,
                    alter_dhcpd_base_option=False,
                )
            except DhcpdLDAPBase:
                # check_dhcp() raises this error in a singlemaster setup if
                # dhcpd/ldap/base is unset
                print("dhcpd/ldap/base unset as expected!")
            for i in (True, None):
                ou_name = uts.random_name()
                print("\n*** Creating OU with alter-dhcpd-base: %s (ou_name=%s) ***\n" % (i, ou_name))
                cleanup_ou_list.append(ou_name)
                create_and_verify_ou(
                    ucr,
                    ou=ou_name,
                    ou_displayname=ou_name,
                    dc=None,
                    dc_administrative=None,
                    sharefileserver=None,
                    singlemaster=True,
                    noneducational_create_objects=False,
                    district_enable=False,
                    default_dcs=None,
                    dhcp_dns_clearou=True,
                    unset_dhcpd_base=False,
                    do_cleanup=False,
                    use_cli_api=use_cli_api,
                    use_python_api=use_python_api,
                    alter_dhcpd_base_option=i,
                )
                # NOTE(review): the checks are assumed to run after each OU
                # creation of this loop - confirm the nesting against the
                # original file.
                # cleanup_ou_list[1] is the OU that was created with the option set to True
                if "ou=%s" % cleanup_ou_list[1] not in ucr.get("dhcpd/ldap/base"):
                    raise DhcpdLDAPBase(
                        "Wrong dhcpd/ldap/base value set!: %s" % ucr.get("dhcpd/ldap/base")
                    )
                lo = univention.admin.uldap.getAdminConnection()[0]
                if len(DHCPServer.get_all(lo, cleanup_ou_list[1])) != 1:
                    raise DhcpServerLocation(
                        "No DHCP Server found in ou %s, but was expected." % cleanup_ou_list[1]
                    )
            # unset the search base again before creating the last OU with the option set to None
            univention.config_registry.handler_unset(["dhcpd/ldap/base"])
            ucr.load()
            ou_name = uts.random_name()
            print("\n*** Creating OU with alter-dhcpd-base: None (ou_name=%s) ***\n" % (ou_name))
            cleanup_ou_list.append(ou_name)
            create_and_verify_ou(
                ucr,
                ou=ou_name,
                ou_displayname=ou_name,
                dc=None,
                dc_administrative=None,
                sharefileserver=None,
                singlemaster=True,
                noneducational_create_objects=False,
                district_enable=False,
                default_dcs=None,
                dhcp_dns_clearou=True,
                unset_dhcpd_base=False,
                do_cleanup=False,
                use_cli_api=use_cli_api,
                use_python_api=use_python_api,
                alter_dhcpd_base_option=None,
            )
            # cleanup_ou_list[-1] is the OU that has just been created
            if "ou=%s" % cleanup_ou_list[-1] not in ucr.get("dhcpd/ldap/base"):
                raise DhcpdLDAPBase(
                    "Wrong dhcpd/ldap/base value set! Expected to find ou: %s; found: %s"
                    % (cleanup_ou_list[-1], ucr.get("dhcpd/ldap/base"))
                )
            if len(DHCPServer.get_all(lo, cleanup_ou_list[-1])) != 1:
                raise DhcpServerLocation(
                    "No DHCP Server found in ou %s, but was expected." % cleanup_ou_list[-1]
                )
        finally:
            for ou_name in cleanup_ou_list:
                remove_ou(ou_name)