# coding: utf-8
from __future__ import print_function
import subprocess
from typing import Mapping, Text, Tuple, Union # noqa: F401
import ldap
import univention.config_registry
import univention.testing.strings as tstrings
from univention.testing.udm import verify_udm_object
configRegistry = univention.config_registry.ConfigRegistry()
configRegistry.load()
UTF8_CHARSET = tstrings.STR_UMLAUT + u"КирилицаКириллицаĆirilicaЋирилица" + u"普通話普通话"
# the CON sync can't # handle them (see bug #44373)
SPECIAL_CHARSET = "".join(set(tstrings.STR_SPECIAL_CHARACTER) - set('\\#"?'))
# We exclude '$' as it has special meaning . A '.' (dot) may not be the last
# character in a samAccountName, so we forbid it as well.
FORBIDDEN_SAMACCOUNTNAME = "\\/[]:;|=,+*?<>@ " + '$.'
SPECIAL_CHARSET_USERNAME = "".join(set(SPECIAL_CHARSET) - set(FORBIDDEN_SAMACCOUNTNAME))
[docs]def random_string(length=10, alpha=False, numeric=False, charset=u"", encoding='utf-8'):
# type: (int, bool, bool, str, Text) -> str
return tstrings.random_string(length, alpha, numeric, charset, encoding)
[docs]def random_bytestring(length=10, alpha=False, numeric=False, charset=u""):
# type: (int, bool, bool, Text) -> bytes
string = random_string(length, alpha, numeric, charset)
if not isinstance(string, bytes):
return string.encode('utf-8')
return string
[docs]def normalize_dn(dn):
# type: (str) -> str
r"""
Normalize a given dn. This removes some escaping of special chars in the
DNs. Note: The CON-LDAP returns DNs with escaping chars, OpenLDAP does not.
>>> normalize_dn(r"cn=peter\#,cn=groups")
'cn=peter#,cn=groups'
"""
return ldap.dn.dn2str(ldap.dn.str2dn(dn))
[docs]def to_unicode(string):
# type: (Union[bytes, Text]) -> Text
if isinstance(string, bytes):
return string.decode('utf-8')
return string
[docs]def restart_univention_cli_server():
# type: () -> None
print("Restarting Univention-CLI-Server")
subprocess.call(["pkill", "-f", "univention-cli-server"])
[docs]class TestUser(object):
def __init__(self, user, rename={}, container=None, selection=None):
selection = selection or ("username", "firstname", "lastname")
self.basic = {k: v for (k, v) in user.items() if k in selection}
self.user = user
self.rename = dict(self.basic)
self.rename.update(rename)
self.container = container
[docs] @classmethod
def to_unicode(cls, dictionary):
return {k: to_unicode(v) for k, v in dictionary.items()}
def __repr__(self):
args = (self.user, self.rename, self.container)
return "{}({})".format(self.__class__.__name__, ", ".join(repr(a) for a in args))
[docs]class NormalUser(TestUser):
def __init__(self, selection=None):
super(NormalUser, self).__init__(
user={
"username": tstrings.random_username().encode('UTF-8'),
"firstname": tstrings.random_name().encode('UTF-8'),
"lastname": tstrings.random_name().encode('UTF-8'),
"description": random_bytestring(alpha=True, numeric=True),
"street": random_bytestring(alpha=True, numeric=True),
"city": random_bytestring(alpha=True, numeric=True),
"postcode": random_bytestring(numeric=True),
"profilepath": random_bytestring(alpha=True, numeric=True),
"scriptpath": random_bytestring(alpha=True, numeric=True),
"phone": random_bytestring(numeric=True),
"homeTelephoneNumber": random_bytestring(numeric=True),
"mobileTelephoneNumber": random_bytestring(numeric=True),
"pagerTelephoneNumber": random_bytestring(numeric=True),
"sambaUserWorkstations": random_bytestring(numeric=True),
},
rename={"username": tstrings.random_username().encode('UTF-8')},
container=tstrings.random_name(),
selection=selection,
)
[docs]class Utf8User(TestUser):
def __init__(self, selection=None):
super(Utf8User, self).__init__(
user={
"username": random_bytestring(charset=UTF8_CHARSET),
"firstname": random_bytestring(charset=UTF8_CHARSET),
"lastname": random_bytestring(charset=UTF8_CHARSET),
"description": random_bytestring(charset=UTF8_CHARSET),
"street": random_bytestring(charset=UTF8_CHARSET),
"city": random_bytestring(charset=UTF8_CHARSET),
"postcode": random_bytestring(numeric=True),
"profilepath": random_bytestring(charset=UTF8_CHARSET),
"scriptpath": random_bytestring(charset=UTF8_CHARSET),
"phone": random_bytestring(numeric=True),
"homeTelephoneNumber": random_bytestring(numeric=True),
"mobileTelephoneNumber": random_bytestring(numeric=True),
"pagerTelephoneNumber": random_bytestring(numeric=True),
"sambaUserWorkstations": random_bytestring(numeric=True),
},
rename={"username": random_bytestring(charset=UTF8_CHARSET)},
container=random_string(charset=UTF8_CHARSET),
selection=selection,
)
[docs]class SpecialUser(TestUser):
def __init__(self, selection=None):
super(SpecialUser, self).__init__(
user={
"username": random_bytestring(charset=SPECIAL_CHARSET_USERNAME),
"firstname": tstrings.random_name_special_characters().encode('UTF-8'),
"lastname": tstrings.random_name_special_characters().encode('UTF-8'),
"description": random_bytestring(charset=SPECIAL_CHARSET),
"street": random_bytestring(charset=SPECIAL_CHARSET),
"city": random_bytestring(charset=SPECIAL_CHARSET),
"postcode": random_bytestring(numeric=True),
"profilepath": random_bytestring(charset=SPECIAL_CHARSET),
"scriptpath": random_bytestring(charset=SPECIAL_CHARSET),
"phone": random_bytestring(numeric=True),
"homeTelephoneNumber": random_bytestring(numeric=True),
"mobileTelephoneNumber": random_bytestring(numeric=True),
"pagerTelephoneNumber": random_bytestring(numeric=True),
"sambaUserWorkstations": random_bytestring(numeric=True),
},
rename={"username": random_bytestring(charset=SPECIAL_CHARSET_USERNAME)},
container=random_string(charset=SPECIAL_CHARSET),
selection=selection,
)
[docs]class TestGroup(object):
def __init__(self, group, rename={}, container=None):
self.group = group
self.rename = dict(self.group)
self.rename.update(rename)
self.container = container
[docs] @classmethod
def to_unicode(cls, dictionary):
return {k: to_unicode(v) for k, v in dictionary.items()}
def __repr__(self):
args = (self.group, self.rename, self.container)
return "{}({})".format(self.__class__.__name__, ", ".join(repr(a) for a in args))
[docs]class NormalGroup(TestGroup):
def __init__(self):
super(NormalGroup, self).__init__(
group={
"name": tstrings.random_groupname().encode('UTF-8'),
"description": random_bytestring(alpha=True, numeric=True),
},
rename={"name": tstrings.random_groupname().encode('UTF-8')},
container=tstrings.random_name(),
)
[docs]class Utf8Group(TestGroup):
def __init__(self):
super(Utf8Group, self).__init__(
group={
"name": random_bytestring(charset=UTF8_CHARSET),
"description": random_bytestring(charset=UTF8_CHARSET),
},
rename={"name": random_bytestring(charset=UTF8_CHARSET)},
container=random_string(charset=UTF8_CHARSET),
)
[docs]class SpecialGroup(TestGroup):
def __init__(self):
super(SpecialGroup, self).__init__(
group={
"name": random_bytestring(charset=SPECIAL_CHARSET_USERNAME),
"description": random_bytestring(charset=SPECIAL_CHARSET),
},
rename={"name": random_bytestring(charset=SPECIAL_CHARSET_USERNAME)},
container=random_string(charset=SPECIAL_CHARSET),
)
[docs]def map_udm_user_to_con(user):
"""
Map a UDM user given as a dictionary of `property`:`values` mappings to a
dictionary of `attributes`:`values` mappings as required by the CON-LDAP.
Note: This expects the properties from the UDM users/user module and not
OpenLDAP-attributes!.
"""
mapping = {
"username": "sAMAccountName",
"firstname": "givenName",
"lastname": "sn",
"description": "description",
"street": "streetAddress",
"city": "l",
"postcode": "postalCode",
"profilepath": "profilePath",
"scriptpath": "scriptPath",
"phone": "telephoneNumber",
"homeTelephoneNumber": "homePhone",
"mobileTelephoneNumber": "mobile",
"pagerTelephoneNumber": "pager",
"sambaUserWorkstations": "userWorkstations"}
# return {mapping[key]: value for (key, value) in user.items() if key in mapping}
return {mapping[key]: ([value] if not isinstance(value, (list, tuple)) else value) for (key, value) in user.items() if key in mapping}
[docs]def map_udm_group_to_con(group):
"""
Map a UDM group given as a dictionary of `property`:`values` mappings to a
dictionary of `attributes`:`values` mappings as required by the CON-LDAP.
Note: This expects the properties from the UDM groups/group module and not
OpenLDAP-attributes!.
"""
mapping = {"name": "sAMAccountName", "description": "description"}
# return {mapping[key]: value for (key, value) in group.items() if key in mapping}
return {mapping[key]: ([value] if not isinstance(value, (list, tuple)) else value) for (key, value) in group.items() if key in mapping}
[docs]def create_udm_user(udm, con, user, wait_for_sync):
print("\nCreating UDM user {}\n".format(user.basic))
(udm_user_dn, username) = udm.create_user(**user.to_unicode(user.basic))
con_user_dn = ldap.dn.dn2str([
[("CN", to_unicode(username), ldap.AVA_STRING)],
[("CN", "users", ldap.AVA_STRING)]] + ldap.dn.str2dn(con.adldapbase))
wait_for_sync()
con.verify_object(con_user_dn, map_udm_user_to_con(user.basic))
return (udm_user_dn, con_user_dn)
[docs]def delete_udm_user(udm, con, udm_user_dn, con_user_dn, wait_for_sync):
print("\nDeleting UDM user\n")
udm.remove_object('users/user', dn=udm_user_dn)
wait_for_sync()
con.verify_object(con_user_dn, None)
[docs]def create_con_user(con, udm_user, wait_for_sync):
basic_con_user = map_udm_user_to_con(udm_user.basic)
print("\nCreating CON user {}\n".format(basic_con_user))
username = udm_user.basic.get("username")
con_user_dn = con.createuser(username, **basic_con_user)
udm_user_dn = ldap.dn.dn2str([
[("uid", to_unicode(username), ldap.AVA_STRING)],
[("CN", "users", ldap.AVA_STRING)]] + ldap.dn.str2dn(configRegistry.get('ldap/base')))
wait_for_sync()
verify_udm_object("users/user", udm_user_dn, udm_user.basic)
return (basic_con_user, con_user_dn, udm_user_dn)
[docs]def delete_con_user(con, con_user_dn, udm_user_dn, wait_for_sync):
print("\nDeleting CON user\n")
con.delete(con_user_dn)
wait_for_sync()
verify_udm_object("users/user", udm_user_dn, None)
[docs]def create_udm_group(udm, con, group, wait_for_sync):
print("\nCreating UDM group {}\n".format(group))
(udm_group_dn, groupname) = udm.create_group(**group.to_unicode(group.group))
con_group_dn = ldap.dn.dn2str([
[("CN", to_unicode(groupname), ldap.AVA_STRING)],
[("CN", "groups", ldap.AVA_STRING)]] + ldap.dn.str2dn(con.adldapbase))
wait_for_sync()
con.verify_object(con_group_dn, map_udm_group_to_con(group.group))
return (udm_group_dn, con_group_dn)
[docs]def delete_udm_group(udm, con, udm_group_dn, con_group_dn, wait_for_sync):
print("\nDeleting UDM group\n")
udm.remove_object('groups/group', dn=udm_group_dn)
wait_for_sync()
con.verify_object(con_group_dn, None)
[docs]def create_con_group(con, udm_group, wait_for_sync):
con_group = map_udm_group_to_con(udm_group.group)
print("\nCreating CON group {}\n".format(con_group))
groupname = to_unicode(udm_group.group.get("name"))
con_group_dn = con.group_create(groupname, **con_group)
udm_group_dn = ldap.dn.dn2str([
[("cn", groupname, ldap.AVA_STRING)],
[("CN", "groups", ldap.AVA_STRING)]] + ldap.dn.str2dn(configRegistry.get('ldap/base')))
wait_for_sync()
verify_udm_object("groups/group", udm_group_dn, udm_group.group)
return (con_group, con_group_dn, udm_group_dn)
[docs]def delete_con_group(con, con_group_dn, udm_group_dn, wait_for_sync):
print("\nDeleting CON group\n")
con.delete(con_group_dn)
wait_for_sync()
verify_udm_object("groups/group", udm_group_dn, None)