Source code for univention.testing.connector_common

# coding: utf-8

from __future__ import print_function

import subprocess
from typing import Mapping, Text, Tuple, Union  # noqa: F401

import ldap

import univention.config_registry
import univention.testing.strings as tstrings
from univention.testing.udm import verify_udm_object

configRegistry = univention.config_registry.ConfigRegistry()
configRegistry.load()

UTF8_CHARSET = tstrings.STR_UMLAUT + u"КирилицаКириллицаĆirilicaЋирилица" + u"普通話普通话"
# the CON sync can't # handle them (see bug #44373)
SPECIAL_CHARSET = "".join(set(tstrings.STR_SPECIAL_CHARACTER) - set('\\#"?'))
# We exclude '$' as it has special meaning . A '.' (dot) may not be the last
# character in a samAccountName, so we forbid it as well.
FORBIDDEN_SAMACCOUNTNAME = "\\/[]:;|=,+*?<>@ " + '$.'
SPECIAL_CHARSET_USERNAME = "".join(set(SPECIAL_CHARSET) - set(FORBIDDEN_SAMACCOUNTNAME))


[docs]def random_string(length=10, alpha=False, numeric=False, charset=u"", encoding='utf-8'): # type: (int, bool, bool, str, Text) -> str return tstrings.random_string(length, alpha, numeric, charset, encoding)
[docs]def random_bytestring(length=10, alpha=False, numeric=False, charset=u""): # type: (int, bool, bool, Text) -> bytes string = random_string(length, alpha, numeric, charset) if not isinstance(string, bytes): return string.encode('utf-8') return string
[docs]def normalize_dn(dn): # type: (str) -> str r""" Normalize a given dn. This removes some escaping of special chars in the DNs. Note: The CON-LDAP returns DNs with escaping chars, OpenLDAP does not. >>> normalize_dn(r"cn=peter\#,cn=groups") 'cn=peter#,cn=groups' """ return ldap.dn.dn2str(ldap.dn.str2dn(dn))
[docs]def to_unicode(string): # type: (Union[bytes, Text]) -> Text if isinstance(string, bytes): return string.decode('utf-8') return string
[docs]def restart_univention_cli_server(): # type: () -> None print("Restarting Univention-CLI-Server") subprocess.call(["pkill", "-f", "univention-cli-server"])
[docs]class TestUser(object): def __init__(self, user, rename={}, container=None, selection=None): selection = selection or ("username", "firstname", "lastname") self.basic = {k: v for (k, v) in user.items() if k in selection} self.user = user self.rename = dict(self.basic) self.rename.update(rename) self.container = container
[docs] @classmethod def to_unicode(cls, dictionary): return {k: to_unicode(v) for k, v in dictionary.items()}
def __repr__(self): args = (self.user, self.rename, self.container) return "{}({})".format(self.__class__.__name__, ", ".join(repr(a) for a in args))
[docs]class NormalUser(TestUser): def __init__(self, selection=None): super(NormalUser, self).__init__( user={ "username": tstrings.random_username().encode('UTF-8'), "firstname": tstrings.random_name().encode('UTF-8'), "lastname": tstrings.random_name().encode('UTF-8'), "description": random_bytestring(alpha=True, numeric=True), "street": random_bytestring(alpha=True, numeric=True), "city": random_bytestring(alpha=True, numeric=True), "postcode": random_bytestring(numeric=True), "profilepath": random_bytestring(alpha=True, numeric=True), "scriptpath": random_bytestring(alpha=True, numeric=True), "phone": random_bytestring(numeric=True), "homeTelephoneNumber": random_bytestring(numeric=True), "mobileTelephoneNumber": random_bytestring(numeric=True), "pagerTelephoneNumber": random_bytestring(numeric=True), "sambaUserWorkstations": random_bytestring(numeric=True), }, rename={"username": tstrings.random_username().encode('UTF-8')}, container=tstrings.random_name(), selection=selection, )
[docs]class Utf8User(TestUser): def __init__(self, selection=None): super(Utf8User, self).__init__( user={ "username": random_bytestring(charset=UTF8_CHARSET), "firstname": random_bytestring(charset=UTF8_CHARSET), "lastname": random_bytestring(charset=UTF8_CHARSET), "description": random_bytestring(charset=UTF8_CHARSET), "street": random_bytestring(charset=UTF8_CHARSET), "city": random_bytestring(charset=UTF8_CHARSET), "postcode": random_bytestring(numeric=True), "profilepath": random_bytestring(charset=UTF8_CHARSET), "scriptpath": random_bytestring(charset=UTF8_CHARSET), "phone": random_bytestring(numeric=True), "homeTelephoneNumber": random_bytestring(numeric=True), "mobileTelephoneNumber": random_bytestring(numeric=True), "pagerTelephoneNumber": random_bytestring(numeric=True), "sambaUserWorkstations": random_bytestring(numeric=True), }, rename={"username": random_bytestring(charset=UTF8_CHARSET)}, container=random_string(charset=UTF8_CHARSET), selection=selection, )
[docs]class SpecialUser(TestUser): def __init__(self, selection=None): super(SpecialUser, self).__init__( user={ "username": random_bytestring(charset=SPECIAL_CHARSET_USERNAME), "firstname": tstrings.random_name_special_characters().encode('UTF-8'), "lastname": tstrings.random_name_special_characters().encode('UTF-8'), "description": random_bytestring(charset=SPECIAL_CHARSET), "street": random_bytestring(charset=SPECIAL_CHARSET), "city": random_bytestring(charset=SPECIAL_CHARSET), "postcode": random_bytestring(numeric=True), "profilepath": random_bytestring(charset=SPECIAL_CHARSET), "scriptpath": random_bytestring(charset=SPECIAL_CHARSET), "phone": random_bytestring(numeric=True), "homeTelephoneNumber": random_bytestring(numeric=True), "mobileTelephoneNumber": random_bytestring(numeric=True), "pagerTelephoneNumber": random_bytestring(numeric=True), "sambaUserWorkstations": random_bytestring(numeric=True), }, rename={"username": random_bytestring(charset=SPECIAL_CHARSET_USERNAME)}, container=random_string(charset=SPECIAL_CHARSET), selection=selection, )
[docs]class TestGroup(object): def __init__(self, group, rename={}, container=None): self.group = group self.rename = dict(self.group) self.rename.update(rename) self.container = container
[docs] @classmethod def to_unicode(cls, dictionary): return {k: to_unicode(v) for k, v in dictionary.items()}
def __repr__(self): args = (self.group, self.rename, self.container) return "{}({})".format(self.__class__.__name__, ", ".join(repr(a) for a in args))
[docs]class NormalGroup(TestGroup): def __init__(self): super(NormalGroup, self).__init__( group={ "name": tstrings.random_groupname().encode('UTF-8'), "description": random_bytestring(alpha=True, numeric=True), }, rename={"name": tstrings.random_groupname().encode('UTF-8')}, container=tstrings.random_name(), )
[docs]class Utf8Group(TestGroup): def __init__(self): super(Utf8Group, self).__init__( group={ "name": random_bytestring(charset=UTF8_CHARSET), "description": random_bytestring(charset=UTF8_CHARSET), }, rename={"name": random_bytestring(charset=UTF8_CHARSET)}, container=random_string(charset=UTF8_CHARSET), )
[docs]class SpecialGroup(TestGroup): def __init__(self): super(SpecialGroup, self).__init__( group={ "name": random_bytestring(charset=SPECIAL_CHARSET_USERNAME), "description": random_bytestring(charset=SPECIAL_CHARSET), }, rename={"name": random_bytestring(charset=SPECIAL_CHARSET_USERNAME)}, container=random_string(charset=SPECIAL_CHARSET), )
[docs]def map_udm_user_to_con(user): """ Map a UDM user given as a dictionary of `property`:`values` mappings to a dictionary of `attributes`:`values` mappings as required by the CON-LDAP. Note: This expects the properties from the UDM users/user module and not OpenLDAP-attributes!. """ mapping = { "username": "sAMAccountName", "firstname": "givenName", "lastname": "sn", "description": "description", "street": "streetAddress", "city": "l", "postcode": "postalCode", "profilepath": "profilePath", "scriptpath": "scriptPath", "phone": "telephoneNumber", "homeTelephoneNumber": "homePhone", "mobileTelephoneNumber": "mobile", "pagerTelephoneNumber": "pager", "sambaUserWorkstations": "userWorkstations"} # return {mapping[key]: value for (key, value) in user.items() if key in mapping} return {mapping[key]: ([value] if not isinstance(value, (list, tuple)) else value) for (key, value) in user.items() if key in mapping}
[docs]def map_udm_group_to_con(group): """ Map a UDM group given as a dictionary of `property`:`values` mappings to a dictionary of `attributes`:`values` mappings as required by the CON-LDAP. Note: This expects the properties from the UDM groups/group module and not OpenLDAP-attributes!. """ mapping = {"name": "sAMAccountName", "description": "description"} # return {mapping[key]: value for (key, value) in group.items() if key in mapping} return {mapping[key]: ([value] if not isinstance(value, (list, tuple)) else value) for (key, value) in group.items() if key in mapping}
[docs]def create_udm_user(udm, con, user, wait_for_sync): print("\nCreating UDM user {}\n".format(user.basic)) (udm_user_dn, username) = udm.create_user(**user.to_unicode(user.basic)) con_user_dn = ldap.dn.dn2str([ [("CN", to_unicode(username), ldap.AVA_STRING)], [("CN", "users", ldap.AVA_STRING)]] + ldap.dn.str2dn(con.adldapbase)) wait_for_sync() con.verify_object(con_user_dn, map_udm_user_to_con(user.basic)) return (udm_user_dn, con_user_dn)
[docs]def delete_udm_user(udm, con, udm_user_dn, con_user_dn, wait_for_sync): print("\nDeleting UDM user\n") udm.remove_object('users/user', dn=udm_user_dn) wait_for_sync() con.verify_object(con_user_dn, None)
[docs]def create_con_user(con, udm_user, wait_for_sync): basic_con_user = map_udm_user_to_con(udm_user.basic) print("\nCreating CON user {}\n".format(basic_con_user)) username = udm_user.basic.get("username") con_user_dn = con.createuser(username, **basic_con_user) udm_user_dn = ldap.dn.dn2str([ [("uid", to_unicode(username), ldap.AVA_STRING)], [("CN", "users", ldap.AVA_STRING)]] + ldap.dn.str2dn(configRegistry.get('ldap/base'))) wait_for_sync() verify_udm_object("users/user", udm_user_dn, udm_user.basic) return (basic_con_user, con_user_dn, udm_user_dn)
[docs]def delete_con_user(con, con_user_dn, udm_user_dn, wait_for_sync): print("\nDeleting CON user\n") con.delete(con_user_dn) wait_for_sync() verify_udm_object("users/user", udm_user_dn, None)
[docs]def create_udm_group(udm, con, group, wait_for_sync): print("\nCreating UDM group {}\n".format(group)) (udm_group_dn, groupname) = udm.create_group(**group.to_unicode(group.group)) con_group_dn = ldap.dn.dn2str([ [("CN", to_unicode(groupname), ldap.AVA_STRING)], [("CN", "groups", ldap.AVA_STRING)]] + ldap.dn.str2dn(con.adldapbase)) wait_for_sync() con.verify_object(con_group_dn, map_udm_group_to_con(group.group)) return (udm_group_dn, con_group_dn)
[docs]def delete_udm_group(udm, con, udm_group_dn, con_group_dn, wait_for_sync): print("\nDeleting UDM group\n") udm.remove_object('groups/group', dn=udm_group_dn) wait_for_sync() con.verify_object(con_group_dn, None)
[docs]def create_con_group(con, udm_group, wait_for_sync): con_group = map_udm_group_to_con(udm_group.group) print("\nCreating CON group {}\n".format(con_group)) groupname = to_unicode(udm_group.group.get("name")) con_group_dn = con.group_create(groupname, **con_group) udm_group_dn = ldap.dn.dn2str([ [("cn", groupname, ldap.AVA_STRING)], [("CN", "groups", ldap.AVA_STRING)]] + ldap.dn.str2dn(configRegistry.get('ldap/base'))) wait_for_sync() verify_udm_object("groups/group", udm_group_dn, udm_group.group) return (con_group, con_group_dn, udm_group_dn)
[docs]def delete_con_group(con, con_group_dn, udm_group_dn, wait_for_sync): print("\nDeleting CON group\n") con.delete(con_group_dn) wait_for_sync() verify_udm_object("groups/group", udm_group_dn, None)