Source code for univention.testing.connector_common

# SPDX-FileCopyrightText: 2024-2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only


import subprocess

import ldap

import univention.config_registry
import univention.testing.strings as tstrings
from univention.testing.udm import verify_udm_object


configRegistry = univention.config_registry.ConfigRegistry()
configRegistry.load()

UTF8_CHARSET = tstrings.STR_UMLAUT + "КирилицаКириллицаĆirilicaЋирилица" + "普通話普通话"
# the CON sync can't # handle them (see bug #44373)
SPECIAL_CHARSET = "".join(set(tstrings.STR_SPECIAL_CHARACTER) - set('\\#"?'))
# We exclude '$' as it has special meaning . A '.' (dot) may not be the last
# character in a samAccountName, so we forbid it as well.
FORBIDDEN_SAMACCOUNTNAME = "\\/[]:;|=,+*?<>@ $."
SPECIAL_CHARSET_USERNAME = "".join(set(SPECIAL_CHARSET) - set(FORBIDDEN_SAMACCOUNTNAME))


[docs] def random_string(length: int = 10, alpha: bool = False, numeric: bool = False, charset: str = "", encoding: str = 'utf-8') -> str: return tstrings.random_string(length, alpha, numeric, charset, encoding)
[docs] def random_bytestring(length: int = 10, alpha: bool = False, numeric: bool = False, charset: str = "") -> bytes: string = random_string(length, alpha, numeric, charset) if not isinstance(string, bytes): return string.encode('utf-8') return string
[docs] def normalize_dn(dn: str) -> str: r""" Normalize a given dn. This removes some escaping of special chars in the DNs. Note: The CON-LDAP returns DNs with escaping chars, OpenLDAP does not. >>> normalize_dn(r"cn=peter\#,cn=groups") 'cn=peter#,cn=groups' """ return ldap.dn.dn2str(ldap.dn.str2dn(dn))
[docs] def to_unicode(string: bytes | str) -> str: if isinstance(string, bytes): return string.decode('utf-8') return string
[docs] def restart_univention_cli_server() -> None: print("Restarting Univention-CLI-Server") subprocess.call(["pkill", "-f", "univention-cli-server"])
[docs] class TestUser: def __init__(self, user, rename={}, container=None, selection=None): selection = selection or ("username", "firstname", "lastname") self.basic = {k: v for (k, v) in user.items() if k in selection} self.user = user self.rename = dict(self.basic) self.rename.update(rename) self.container = container
[docs] @classmethod def to_unicode(cls, dictionary): return {k: to_unicode(v) for k, v in dictionary.items()}
def __repr__(self): args = (self.user, self.rename, self.container) return "{}({})".format(self.__class__.__name__, ", ".join(repr(a) for a in args))
[docs] class NormalUser(TestUser): def __init__(self, selection=None): super().__init__( user={ "username": tstrings.random_username().encode('UTF-8'), "firstname": tstrings.random_name().encode('UTF-8'), "lastname": tstrings.random_name().encode('UTF-8'), "description": random_bytestring(alpha=True, numeric=True), "street": random_bytestring(alpha=True, numeric=True), "city": random_bytestring(alpha=True, numeric=True), "postcode": random_bytestring(numeric=True), "profilepath": random_bytestring(alpha=True, numeric=True), "scriptpath": random_bytestring(alpha=True, numeric=True), "phone": random_bytestring(numeric=True), "homeTelephoneNumber": random_bytestring(numeric=True), "mobileTelephoneNumber": random_bytestring(numeric=True), "pagerTelephoneNumber": random_bytestring(numeric=True), "sambaUserWorkstations": random_bytestring(numeric=True), }, rename={"username": tstrings.random_username().encode('UTF-8')}, container=tstrings.random_name(), selection=selection, )
[docs] class Utf8User(TestUser): def __init__(self, selection=None): super().__init__( user={ "username": random_bytestring(charset=UTF8_CHARSET), "firstname": random_bytestring(charset=UTF8_CHARSET), "lastname": random_bytestring(charset=UTF8_CHARSET), "description": random_bytestring(charset=UTF8_CHARSET), "street": random_bytestring(charset=UTF8_CHARSET), "city": random_bytestring(charset=UTF8_CHARSET), "postcode": random_bytestring(numeric=True), "profilepath": random_bytestring(charset=UTF8_CHARSET), "scriptpath": random_bytestring(charset=UTF8_CHARSET), "phone": random_bytestring(numeric=True), "homeTelephoneNumber": random_bytestring(numeric=True), "mobileTelephoneNumber": random_bytestring(numeric=True), "pagerTelephoneNumber": random_bytestring(numeric=True), "sambaUserWorkstations": random_bytestring(numeric=True), }, rename={"username": random_bytestring(charset=UTF8_CHARSET)}, container=random_string(charset=UTF8_CHARSET), selection=selection, )
[docs] class SpecialUser(TestUser): def __init__(self, selection=None): super().__init__( user={ "username": random_bytestring(charset=SPECIAL_CHARSET_USERNAME), "firstname": tstrings.random_name_special_characters().encode('UTF-8'), "lastname": tstrings.random_name_special_characters().encode('UTF-8'), "description": random_bytestring(charset=SPECIAL_CHARSET), "street": random_bytestring(charset=SPECIAL_CHARSET), "city": random_bytestring(charset=SPECIAL_CHARSET), "postcode": random_bytestring(numeric=True), "profilepath": random_bytestring(charset=SPECIAL_CHARSET), "scriptpath": random_bytestring(charset=SPECIAL_CHARSET), "phone": random_bytestring(numeric=True), "homeTelephoneNumber": random_bytestring(numeric=True), "mobileTelephoneNumber": random_bytestring(numeric=True), "pagerTelephoneNumber": random_bytestring(numeric=True), "sambaUserWorkstations": random_bytestring(numeric=True), }, rename={"username": random_bytestring(charset=SPECIAL_CHARSET_USERNAME)}, container=random_string(charset=SPECIAL_CHARSET), selection=selection, )
[docs] class TestGroup: def __init__(self, group, rename={}, container=None): self.group = group self.rename = dict(self.group) self.rename.update(rename) self.container = container
[docs] @classmethod def to_unicode(cls, dictionary): return {k: to_unicode(v) for k, v in dictionary.items()}
def __repr__(self): args = (self.group, self.rename, self.container) return "{}({})".format(self.__class__.__name__, ", ".join(repr(a) for a in args))
[docs] class NormalGroup(TestGroup): def __init__(self): super().__init__( group={ "name": tstrings.random_groupname().encode('UTF-8'), "description": random_bytestring(alpha=True, numeric=True), }, rename={"name": tstrings.random_groupname().encode('UTF-8')}, container=tstrings.random_name(), )
[docs] class Utf8Group(TestGroup): def __init__(self): super().__init__( group={ "name": random_bytestring(charset=UTF8_CHARSET), "description": random_bytestring(charset=UTF8_CHARSET), }, rename={"name": random_bytestring(charset=UTF8_CHARSET)}, container=random_string(charset=UTF8_CHARSET), )
[docs] class SpecialGroup(TestGroup): def __init__(self): super().__init__( group={ "name": random_bytestring(charset=SPECIAL_CHARSET_USERNAME), "description": random_bytestring(charset=SPECIAL_CHARSET), }, rename={"name": random_bytestring(charset=SPECIAL_CHARSET_USERNAME)}, container=random_string(charset=SPECIAL_CHARSET), )
[docs] class TestObject: def __init__(self, obj, rename={}, container=None): self.obj = obj self.rename = dict(self.obj) self.rename.update(rename) self.container = container
[docs] @classmethod def to_unicode(cls, dictionary): return {k: to_unicode(v) for k, v in dictionary.items()}
def __repr__(self): args = (self.obj, self.rename, self.container) return "{}({})".format(self.__class__.__name__, ", ".join(repr(a) for a in args))
[docs] class NormalWindows(TestObject): def __init__(self): super().__init__( obj={ "name": tstrings.random_groupname().encode('UTF-8'), "description": random_bytestring(alpha=True, numeric=True), # "inventoryNumber": random_bytestring(alpha=False, numeric=True), "operatingSystem": b"Windows", "operatingSystemVersion": b"11", }, rename={"name": tstrings.random_groupname().encode('UTF-8')}, container=tstrings.random_name(), )
[docs] class NormalContainer(TestObject): def __init__(self): super().__init__( obj={ "name": tstrings.random_groupname().encode('UTF-8'), "description": random_bytestring(alpha=True, numeric=True), }, rename={"name": tstrings.random_groupname().encode('UTF-8')}, container=tstrings.random_name(), )
[docs] class NormalOU(TestObject): def __init__(self): super().__init__( obj={ "name": tstrings.random_groupname().encode('UTF-8'), "description": random_bytestring(alpha=True, numeric=True), }, rename={"name": tstrings.random_groupname().encode('UTF-8')}, container=tstrings.random_name(), )
[docs] def map_udm_user_to_con(user): """ Map a UDM user given as a dictionary of `property`:`values` mappings to a dictionary of `attributes`:`values` mappings as required by the CON-LDAP. Note: This expects the properties from the UDM users/user module and not OpenLDAP-attributes!. """ mapping = { "username": "sAMAccountName", "firstname": "givenName", "lastname": "sn", "description": "description", "street": "streetAddress", "city": "l", "postcode": "postalCode", "profilepath": "profilePath", "scriptpath": "scriptPath", "phone": "telephoneNumber", "homeTelephoneNumber": "homePhone", "mobileTelephoneNumber": "mobile", "pagerTelephoneNumber": "pager", "sambaUserWorkstations": "userWorkstations"} # return {mapping[key]: value for (key, value) in user.items() if key in mapping} return {mapping[key]: ([value] if not isinstance(value, list | tuple) else value) for (key, value) in user.items() if key in mapping}
[docs] def map_udm_group_to_con(group): """ Map a UDM group given as a dictionary of `property`:`values` mappings to a dictionary of `attributes`:`values` mappings as required by the CON-LDAP. Note: This expects the properties from the UDM groups/group module and not OpenLDAP-attributes!. """ mapping = {"name": "sAMAccountName", "description": "description"} # return {mapping[key]: value for (key, value) in group.items() if key in mapping} return {mapping[key]: ([value] if not isinstance(value, list | tuple) else value) for (key, value) in group.items() if key in mapping}
[docs] def map_udm_windows_to_con(windows): """ Map a UDM computers/windows given as a dictionary of `property`:`values` mappings to a dictionary of `attributes`:`values` mappings as required by the CON-LDAP. Note: This expects the properties from the UDM computers/windows module and not OpenLDAP-attributes!. """ mapping = { "name": "sAMAccountName", "description": "description", "operatingSystem": "operatingSystem", "operatingSystemVersion": "operatingSystemVersion", } return {mapping[key]: ([value] if not isinstance(value, list | tuple) else value) for (key, value) in windows.items() if key in mapping}
[docs] def map_udm_container_to_con(container): """ Map a UDM container/* given as a dictionary of `property`:`values` mappings to a dictionary of `attributes`:`values` mappings as required by the CON-LDAP. Note: This expects the properties from the UDM container/* module and not OpenLDAP-attributes!. """ mapping = { "name": "cn", "description": "description", } return {mapping[key]: ([value] if not isinstance(value, list | tuple) else value) for (key, value) in container.items() if key in mapping}
[docs] def map_udm_ou_to_con(container): """ Map a UDM container/* given as a dictionary of `property`:`values` mappings to a dictionary of `attributes`:`values` mappings as required by the CON-LDAP. Note: This expects the properties from the UDM container/* module and not OpenLDAP-attributes!. """ mapping = { "name": "ou", "description": "description", } return {mapping[key]: ([value] if not isinstance(value, list | tuple) else value) for (key, value) in container.items() if key in mapping}
[docs] def create_udm_user(udm, con, user, wait_for_sync, verify=True): print(f"\nCreating UDM user {user.basic}\n") (udm_user_dn, username) = udm.create_user(**user.to_unicode(user.basic)) con_user_dn = ldap.dn.dn2str([[("CN", to_unicode(username), ldap.AVA_STRING)], [("CN", "users", ldap.AVA_STRING)], *ldap.dn.str2dn(con.adldapbase)]) wait_for_sync() if verify: con.verify_object(con_user_dn, map_udm_user_to_con(user.basic)) return (udm_user_dn, con_user_dn)
[docs] def delete_udm_user(udm, con, udm_user_dn, con_user_dn, wait_for_sync): print("\nDeleting UDM user\n") udm.remove_object('users/user', dn=udm_user_dn) wait_for_sync() con.verify_object(con_user_dn, None)
[docs] def create_con_user(con, udm_user, wait_for_sync, basic_con_user=None): if not basic_con_user: basic_con_user = map_udm_user_to_con(udm_user.basic) print(f"\nCreating CON user {basic_con_user}\n") username = udm_user.basic.get("username") con_user_dn = con.createuser(username, **basic_con_user) udm_user_dn = ldap.dn.dn2str([[("uid", to_unicode(username), ldap.AVA_STRING)], [("CN", "users", ldap.AVA_STRING)], *ldap.dn.str2dn(configRegistry.get("ldap/base"))]) wait_for_sync() verify_udm_object("users/user", udm_user_dn, udm_user.basic) return (basic_con_user, con_user_dn, udm_user_dn)
[docs] def delete_con_user(con, con_user_dn, udm_user_dn, wait_for_sync): print("\nDeleting CON user\n") con.delete(con_user_dn) wait_for_sync() verify_udm_object("users/user", udm_user_dn, None)
[docs] def create_udm_group(udm, con, group, wait_for_sync): print(f"\nCreating UDM group {group}\n") (udm_group_dn, groupname) = udm.create_group(**group.to_unicode(group.group)) con_group_dn = ldap.dn.dn2str([[("CN", to_unicode(groupname), ldap.AVA_STRING)], [("CN", "groups", ldap.AVA_STRING)], *ldap.dn.str2dn(con.adldapbase)]) wait_for_sync() con.verify_object(con_group_dn, map_udm_group_to_con(group.group)) return (udm_group_dn, con_group_dn)
[docs] def delete_udm_group(udm, con, udm_group_dn, con_group_dn, wait_for_sync): print("\nDeleting UDM group\n") udm.remove_object('groups/group', dn=udm_group_dn) wait_for_sync() con.verify_object(con_group_dn, None)
[docs] def create_con_group(con, udm_group, wait_for_sync): con_group = map_udm_group_to_con(udm_group.group) print(f"\nCreating CON group {con_group}\n") groupname = to_unicode(udm_group.group.get("name")) con_group_dn = con.group_create(groupname, **con_group) udm_group_dn = ldap.dn.dn2str([[("cn", groupname, ldap.AVA_STRING)], [("CN", "groups", ldap.AVA_STRING)], *ldap.dn.str2dn(configRegistry.get("ldap/base"))]) wait_for_sync() verify_udm_object("groups/group", udm_group_dn, udm_group.group) return (con_group, con_group_dn, udm_group_dn)
[docs] def delete_con_group(con, con_group_dn, udm_group_dn, wait_for_sync): print("\nDeleting CON group\n") con.delete(con_group_dn) wait_for_sync() verify_udm_object("groups/group", udm_group_dn, None)