# SPDX-FileCopyrightText: 2013-2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only
"""
ALPHA VERSION
Wrapper around Univention Directory Manager CLI to simplify
creation/modification of UDM objects in python. The wrapper automatically
removes created objects during wrapper destruction.
For usage examples look at the end of this file.
WARNING:
The API currently allows only modifications to objects created by the wrapper
itself. Also the deletion of objects is currently unsupported. Also not all
UDM object types are currently supported.
WARNING2:
The API is currently under heavy development and may/will change before next UCS release!
"""
import base64
import copy
import functools
import json
import os
import random
import shlex
import subprocess
import sys
import time
from collections.abc import Iterable, Mapping, Sequence
from inspect import getfullargspec
from typing import Any
import ldap
import ldap.filter
import psutil
import univention.admin.modules
import univention.admin.objects
import univention.admin.uexceptions
import univention.admin.uldap
import univention.testing.strings as uts
import univention.testing.ucr
from univention.testing import utils
from univention.testing.ucs_samba import DRSReplicationFailed, wait_for_drs_replication
[docs]
class UCSTestUDM_Exception(Exception):
    """Base class of all errors raised by the UDM test wrapper.

    If the exception was constructed with exactly one ``dict`` argument, its
    string form (and its ``repr``) is one ``key=value`` pair per line.
    In every other case the standard :class:`Exception` text is used.
    """

    def __str__(self):
        if len(self.args) == 1:
            details = self.args[0]
            if isinstance(details, dict):
                return '\n'.join(f'{key}={value}' for key, value in details.items())
        return Exception.__str__(self)

    # Tracebacks and logs should show the same readable text.
    __repr__ = __str__
[docs]
class UCSTestUDM_MissingModulename(UCSTestUDM_Exception):
    """Raised when an operation is invoked with an empty UDM module name."""
[docs]
class UCSTestUDM_MissingDn(UCSTestUDM_Exception):
    """Raised when an operation on an existing object is invoked without a ``dn`` keyword argument."""
[docs]
class UCSTestUDM_CreateUDMObjectFailed(UCSTestUDM_Exception):
    """Raised when the UDM CLI returns a non-zero exit code while creating an object."""
[docs]
class UCSTestUDM_CreateUDMUnknownDN(UCSTestUDM_Exception):
    """Raised when the create call succeeded but its output contains no line naming the object's DN."""
[docs]
class UCSTestUDM_ModifyUDMObjectFailed(UCSTestUDM_Exception):
    """Raised when the UDM CLI returns a non-zero exit code while modifying an object."""
[docs]
class UCSTestUDM_RestoreUDMObjectFailed(UCSTestUDM_Exception):
    """Raised when the UDM CLI returns a non-zero exit code while restoring an object."""
[docs]
class UCSTestUDM_MoveUDMObjectFailed(UCSTestUDM_Exception):
    """Raised when the UDM CLI returns a non-zero exit code while moving an object."""
[docs]
class UCSTestUDM_NoModification(UCSTestUDM_Exception):
    """Raised when the UDM CLI reports ``No modification`` for a modify request."""
[docs]
class UCSTestUDM_ModifyUDMUnknownDN(UCSTestUDM_Exception):
    """Raised when the modify call succeeded but its output contains no line naming the object's DN."""
[docs]
class UCSTestUDM_RestoreUDMUnknownDN(UCSTestUDM_Exception):
    """Raised when the restore call succeeded but its output contains no line naming the object's DN."""
[docs]
class UCSTestUDM_RemoveUDMObjectFailed(UCSTestUDM_Exception):
    """Error type for a failed object removal.

    NOTE(review): presumably raised when the UDM CLI fails to remove an
    object; confirm against the code that raises it.
    """
[docs]
class UCSTestUDM_CleanupFailed(UCSTestUDM_Exception):
    """Error type for a failed cleanup.

    NOTE(review): presumably raised when objects created by the wrapper
    could not be removed again; confirm against the code that raises it.
    """
[docs]
class UCSTestUDM_CannotModifyExistingObject(UCSTestUDM_Exception):
    """Raised when an object that is not tracked as created by the wrapper is to be modified or moved."""
[docs]
class UCSTestUDM_ListUDMObjectFailed(UCSTestUDM_Exception):
    """Error type for a failed object listing.

    NOTE(review): presumably raised when the UDM CLI ``list`` action
    fails; confirm against the code that raises it.
    """
[docs]
class UCSTestUDM_GetUDMObjectFailed(UCSTestUDM_Exception):
    """Error type for a failed object lookup.

    NOTE(review): presumably raised when a single UDM object cannot be
    retrieved; confirm against the code that raises it.
    """
[docs]
class UCSTestUDM:
    """
    Wrapper around the UDM command line client for use in tests.

    Objects are created, modified, restored and moved by running the UDM
    CLI. The DNs of objects created through the wrapper are recorded per
    module in ``self._cleanup``.
    """

    # Filesystem paths of the UDM command line tools.
    PATH_UDM_CLI_SERVER = '/usr/share/univention-directory-manager-tools/univention-cli-server'
    PATH_UDM_CLI_CLIENT = '/usr/sbin/udm'
    # Executable that `_build_udm_cmdline` puts at the start of every command line.
    PATH_UDM_CLI_CLIENT_WRAPPED = '/usr/sbin/udm-test'

    # Names of the UDM modules that represent computer objects.
    COMPUTER_MODULES = (
        'computers/ubuntu',
        'computers/linux',
        'computers/windows',
        'computers/windows_domaincontroller',
        'computers/domaincontroller_master',
        'computers/domaincontroller_backup',
        'computers/domaincontroller_slave',
        'computers/memberserver',
        'computers/macos',
        'computers/ipmanagedclient')
# map identifying UDM module or rdn-attribute to samba4 rdn attribute
[docs]
def ad_object_identifying_filter(self, modulename: str, dn: str) -> dict[str, Any] | None:
    """
    Build the Samba/AD LDAP search arguments that identify the AD counterpart of a UDM object.

    The UDM module name selects the AD naming attribute and an additional
    AD-side filter. For users and computers the OpenLDAP object is first
    checked against an S4-Connector style match filter. If it does not match
    (or no longer exists), no AD search is required.

    :param str modulename: name of the UDM module (e.g. 'users/user'); must contain a '/'.
    :param str dn: OpenLDAP DN of the object; the value of its first RDN is used as the AD name.
    :returns: a dictionary with the keys ``ldap_filter`` (the escaped AD filter string) and
        ``controls`` (a list of control strings or `None`), or `None` if the object is not
        expected in AD or the module is not handled here.
    :raises ValueError: if `modulename` contains no '/'.
    """
    udm_mainmodule, udm_submodule = modulename.split('/', 1)
    objname = ldap.dn.str2dn(dn)[0][0][1]  # value of the first RDN

    attr = ''  # AD attribute holding the object name
    ad_ldap_controls = None
    con_search_filter = ''  # additional AD-side restriction
    match_filter = ''  # OpenLDAP-side filter the object must match to be relevant

    if udm_mainmodule == 'users':
        attr = 'sAMAccountName'
        con_search_filter = '(&(objectClass=user)(!(objectClass=computer))(userAccountControl:1.2.840.113556.1.4.803:=512))'
        match_filter = '(&(|(&(objectClass=posixAccount)(objectClass=krb5Principal))(objectClass=user))(!(objectClass=univentionHost)))'
    elif udm_mainmodule == 'groups':
        attr = 'sAMAccountName'
        con_search_filter = '(objectClass=group)'
    elif udm_mainmodule == 'computers':
        if udm_submodule.startswith('domaincontroller_') or udm_submodule == 'windows_domaincontroller':
            attr = 'cn'
            con_search_filter = '(&(objectClass=computer)(userAccountControl:1.2.840.113556.1.4.803:=532480))'
            match_filter = '(|(&(objectClass=univentionDomainController)(univentionService=Samba 4))(objectClass=computer)(univentionServerRole=windows_domaincontroller))'
        elif udm_submodule in ('windows', 'memberserver', 'linux', 'ubuntu', 'macos'):
            attr = 'cn'
            con_search_filter = '(&(objectClass=computer)(userAccountControl:1.2.840.113556.1.4.803:=4096))'
            match_filter = '(|(&(objectClass=univentionWindows)(!(univentionServerRole=windows_domaincontroller)))(objectClass=computer)(objectClass=univentionMemberServer)(objectClass=univentionUbuntuClient)(objectClass=univentionLinuxClient)(objectClass=univentionMacOSClient))'
    # NOTE(review): other code in this file spells the container modules
    # 'container/cn', 'container/ou' and 'container/dc'. The 'containers/...'
    # spellings below therefore look like they never match a real module name.
    # Left unchanged on purpose, because fixing the spelling would newly
    # enable AD waits for containers. Confirm the intended behaviour first.
    elif modulename == 'containers/cn':
        attr = 'cn'
        con_search_filter = '(&(|(objectClass=container)(objectClass=builtinDomain))(!(objectClass=groupPolicyContainer)))'
    elif modulename == 'container/msgpo':
        attr = 'cn'
        con_search_filter = '(&(objectClass=container)(objectClass=groupPolicyContainer))'
    elif modulename == 'containers/ou':
        attr = 'ou'
        # Must be a complete, parenthesised filter component: it is embedded
        # into '(&(<attr>=<name>)<con_search_filter>)' below.
        con_search_filter = '(objectClass=organizationalUnit)'
    elif udm_mainmodule == 'dns':
        attr = 'dc'
        ad_ldap_controls = ["search_options:1:2"]
        if udm_submodule in ('alias', 'host_record', 'ptr_record', 'srv_record', 'txt_record', 'ns_record'):
            con_search_filter = '(&(objectClass=dnsNode)(!(dNSTombstoned=TRUE)))'
        elif udm_submodule in ('forward_zone', 'reverse_zone'):
            con_search_filter = '(objectClass=dnsZone)'  # partly true, actually we map the SOA too

    if match_filter:
        try:
            res = self._primary_lo.search(base=dn, filter=match_filter, scope='base', attr=[])
        except (ldap.NO_SUCH_OBJECT, univention.admin.uexceptions.noObject):
            print(f"OpenLDAP object to check against S4-Connector match_filter doesn't exist: {dn}")
            res = None  # TODO: This happens during delete. By setting res=None here, we will not wait for DRS replication for deletes!
        except Exception as ex:
            print(f"OpenLDAP search with S4-Connector match_filter failed: {ex}")
            raise
        if not res:
            print(f"DRS wait not required, S4-Connector match_filter did not match the OpenLDAP object: {dn}")
            return None

    if attr:
        # filter_format() escapes the object name; the template itself contains no other '%'.
        filter_template = f'(&({attr}=%s){con_search_filter})'
        ad_ldap_search_args = {'ldap_filter': ldap.filter.filter_format(filter_template, (objname,)), 'controls': ad_ldap_controls}
        return ad_ldap_search_args
    return None
# Caches behind the `_lo`, `_primary_lo` and `_ucr` properties.
# `None` means the object has not been created yet.
__lo = None
__primary_lo = None
__ucr = None
@property
def _lo(self) -> univention.admin.uldap.access:
    """LDAP connection obtained via ``utils.get_ldap_connection(admin_uldap=True)``; created on first access and then reused."""
    connection = self.__lo
    if connection is None:
        connection = utils.get_ldap_connection(admin_uldap=True)
        self.__lo = connection
    return connection
@property
def _primary_lo(self) -> univention.admin.uldap.access:
    """
    LDAP connection requested with ``primary=True``.

    If the UCR variable ``server/role`` is ``domaincontroller_master``, the
    regular connection (`_lo`) is returned. Otherwise a separate connection
    is created on first access and then reused.
    """
    running_on_primary = self._ucr.get('server/role') == 'domaincontroller_master'
    if running_on_primary:
        return self._lo
    connection = self.__primary_lo
    if connection is None:
        connection = utils.get_ldap_connection(admin_uldap=True, primary=True)
        self.__primary_lo = connection
    return connection
@property
def _ucr(self) -> univention.testing.ucr.UCSTestConfigRegistry:
    """UCR test registry; instantiated and loaded on first access, then reused."""
    if self.__ucr is not None:
        return self.__ucr
    registry = univention.testing.ucr.UCSTestConfigRegistry()
    # Store the instance before loading, so it is kept even if load() raises.
    self.__ucr = registry
    registry.load()
    return registry
@property
def LDAP_BASE(self) -> str:
return self._ucr['ldap/base']
@property
def FQHN(self) -> str:
return '%(hostname)s.%(domainname)s.' % self._ucr
@property
def UNIVENTION_CONTAINER(self) -> str:
return 'cn=univention,%(ldap/base)s' % self._ucr
@property
def UNIVENTION_TEMPORARY_CONTAINER(self) -> str:
return 'cn=temporary,cn=univention,%(ldap/base)s' % self._ucr
def __init__(self, language=None) -> None:
self._env = {}
self._cleanup = {}
self._cleanupLocks = {}
if language:
self._env['LANG'] = '%s.UTF-8' % (language.replace('-', '_'),)
@classmethod
def _build_udm_cmdline(cls, modulename: str, action: str, kwargs: dict[str, Any]) -> list[str]:
"""
Pass modulename, action (create, modify, delete) and a bunch of keyword arguments
to _build_udm_cmdline to build a command for UDM CLI.
:param str modulename: name of UDM module (e.g. 'users/user')
:param str action: An action, like 'create', 'modify', 'delete'.
:param dict kwargs: A dictionary containing properties or one of the following special keys:
:param str binddn: The LDAP simple-bind DN.
:param str bindpwd: The LDAP simple-bind password.
:param str bindpwdfile: A pathname to a file containing the LDAP simple-bind password.
:param str dn: The LDAP distinguished name to operate on.
:param str position: The LDAP distinguished name of the parent container.
:param str superordinate: The LDAP distinguished name of the logical parent.
:param str policy_reference: The LDAP distinguished name of the UDM policy to add.
:param str policy_dereference: The LDAP distinguished name of the UDM policy to remove.
:param str append_option: The name of an UDM option group to add.
:param list options: A list of UDM option group to set.
:param set str_or_list: A list or one single *name=value* property.
:param list append: A list of *name=value* properties to add.
:param list remove: A list of *name=value* properties to remove.
:param bool remove_referring: Remove other LDAP entries referred by this entry.
:param bool ignore_exists: Ignore error on creation if entry already exists.
:param bool ignore_not_exists: Ignore error on deletion if entry does not exists.
>>> UCSTestUDM._build_udm_cmdline('users/user', 'create', {'username': 'foobar'})
['/usr/sbin/udm-test', 'users/user', 'create', '--set', 'username=foobar']
"""
cmd = [cls.PATH_UDM_CLI_CLIENT_WRAPPED, modulename, action]
args = copy.deepcopy(kwargs)
for arg in ('binddn', 'bindpwd', 'bindpwdfile', 'dn', 'position', 'superordinate', 'policy_reference', 'policy_dereference', 'append_option'):
if arg not in args:
continue
value = args.pop(arg)
if not isinstance(value, list | tuple):
value = (value,)
for item in value:
cmd.extend(['--%s' % arg.replace('_', '-'), item])
if action == 'list' and 'policies' in args:
cmd.extend(['--policies=%s' % (args.pop('policies'),)])
if action == 'list' and 'filter' in args:
cmd.extend(['--filter=%s' % (args.pop('filter'),)])
if action == 'list' and args.pop('as_json', False):
cmd.append('--as-json')
if action == 'list' and 'properties' in args:
for prop in args.pop('properties'):
cmd.extend(['--properties', prop])
for option in args.pop('options', ()):
cmd.extend(['--option', option])
for key, value in args.pop('set', {}).items():
if isinstance(value, list | tuple):
for item in value:
cmd.extend(['--set', f'{key}={item}'])
else:
cmd.extend(['--set', f'{key}={value}'])
for operation in ('append', 'remove'):
for key, values in args.pop(operation, {}).items():
for value in values:
cmd.extend(['--%s' % operation, f'{key}={value}'])
if args.pop('remove_referring', True) and action == 'remove':
cmd.append('--remove_referring')
if args.pop('ignore_exists', False) and action == 'create':
cmd.append('--ignore_exists')
if args.pop('ignore_not_exists', False) and action == 'remove':
cmd.append('--ignore_not_exists')
# set all other remaining properties
for key, value in args.items():
if isinstance(value, list | tuple):
for item in value:
cmd.extend(['--append', f'{key}={item}'])
elif value:
cmd.extend(['--set', f'{key}={value}'])
return cmd
[docs]
def create_object(self, modulename: str, wait_for_replication: bool = True, check_for_drs_replication: bool = False, wait_for: bool = False, **kwargs: Any) -> str:
    r"""
    Create an LDAP object by running the UDM CLI and return its DN.

    UDM property values are passed as keyword arguments and must match the
    UDM property names exactly (case-sensitive!). A newly created object is
    recorded in ``self._cleanup``. An object reported as already existing is
    not recorded.

    :param str modulename: name of UDM module (e.g. 'users/user')
    :param bool wait_for_replication: passed on to ``self.wait_for``.
    :param bool check_for_drs_replication: delay return until Samba4 has settled.
        NOTE(review): this method never reads the parameter; confirm whether
        it should be passed on to ``self.wait_for``.
    :param bool wait_for: passed to ``self.wait_for`` as ``everything``.
    :param \*\*kwargs: UDM properties and special keys, see ``_build_udm_cmdline``.
    :returns: the DN reported by the UDM CLI.
    :raises UCSTestUDM_MissingModulename: if `modulename` is empty.
    :raises UCSTestUDM_CreateUDMObjectFailed: if the UDM CLI exits with a non-zero code.
    :raises UCSTestUDM_CreateUDMUnknownDN: if the output names no created or existing object.
    """
    if not modulename:
        raise UCSTestUDM_MissingModulename()

    cmd = self._build_udm_cmdline(modulename, 'create', kwargs)
    print(f'Creating {modulename} object with {_prettify_cmd(cmd)}')
    returncode, stdout, stderr = self._execute_udm(cmd)
    if returncode:
        raise UCSTestUDM_CreateUDMObjectFailed({'module': modulename, 'kwargs': kwargs, 'returncode': returncode, 'stdout': stdout, 'stderr': stderr})

    # Only the first line that reports the object is used.
    report = next(
        (line for line in stdout.splitlines() if line.startswith(('Object created: ', 'Object exists: '))),
        None,
    )
    if report is None:
        raise UCSTestUDM_CreateUDMUnknownDN({'module': modulename, 'kwargs': kwargs, 'stdout': stdout, 'stderr': stderr})

    dn = report.split(': ', 1)[-1]
    already_existed = report.startswith('Object exists: ')
    if not already_existed:
        # Only objects created by this call are recorded for cleanup.
        self._cleanup.setdefault(modulename, []).append(dn)

    self.wait_for(modulename, dn, wait_for_replication, everything=wait_for)
    return dn
[docs]
def create_with_defaults(self, modulename: str, **kwargs: Any) -> tuple[str, dict]:
    """
    Create an object of any UDM module with as many properties as possible prefilled with random values.

    For every editable property that the caller did not specify, a value
    generator is looked up in this order:

    1. a module-specific override for the property,
    2. an override by property name,
    3. a generator for the name of the property's syntax class.

    Properties without a generator, or whose generator yields `None`, are
    left out. Objects referenced by a property, and a required superordinate,
    are created by calling this method recursively.

    :param str modulename: name of UDM module (e.g. 'users/user')
    :param kwargs: explicit property values and special keys. These take precedence over
        generated values. The special key ``max_recursion`` (default 1) limits how many
        levels of referenced objects are created.
    :returns: a tuple of the DN of the created object and the keyword arguments passed to
        :py:meth:`create_object`.
    """
    module = univention.admin.modules._get(modulename)
    # TODO: cache objects

    if 'position' not in kwargs:
        kwargs['position'] = (module.object.get_default_containers(self._lo) or [self.LDAP_BASE])[0]
    superordinate_props = {}
    if 'superordinate' not in kwargs and getattr(module, 'superordinate', None) not in (None, 'settings/cn'):
        superordinate_module = random.choice(module.superordinate) if isinstance(module.superordinate, list) else module.superordinate
        superordinate, superordinate_props = self.create_with_defaults(superordinate_module, position=kwargs['position'])
        kwargs['superordinate'] = kwargs['position'] = superordinate
    max_recursion = kwargs.pop('max_recursion', 1)

    def ldap_search(prop):
        # The syntax value has the form '<module>: <property>'.
        m, p = prop.syntax.value.split(': ', 1)
        if max_recursion <= 0:
            return  # random.choice(self._cleanup.get(m, [None]))
        return self.create_with_defaults(m, max_recursion=max_recursion - 1)[1][p]

    def udm_attribute(prop):
        if max_recursion <= 0:
            return  # random.choice(self._cleanup.get(prop.syntax.udm_module, [None]))
        # TODO: parse udm_filter and set values
        if prop.syntax.udm_filter:
            return
        return self.create_with_defaults(prop.syntax.udm_module, max_recursion=max_recursion - 1)[1][prop.syntax.attribute]

    def udm_objects(prop):
        m = list(reversed(prop.syntax.udm_modules))
        # Replace 'computers/computer' with three concrete computer modules.
        try:
            m.remove('computers/computer')
            m.append('computers/linux')
            m.append('computers/windows')
            m.append('computers/ubuntu')
        except ValueError:
            pass
        # Objects of these modules are never created as referenced objects.
        for mod in ('computers/domaincontroller_master', 'container/dc'):
            try:
                m.remove(mod)
            except ValueError:
                pass
        if max_recursion <= 0:
            if prop.syntax.key == 'dn':
                return  # random.choice(self._cleanup.get(m[0], [None]))  # warning: would cause circular group memberships for groups/group
            return
        # TODO: parse udm_filter and set values
        if prop.syntax.udm_filter:
            return
        _dn, _props = self.create_with_defaults(m[0], max_recursion=max_recursion - 1)
        try:
            p = prop.syntax.key % _props
        except KeyError:
            # The key refers to a property that was not passed at creation; read it from the object.
            obj = univention.admin.objects.get(univention.admin.modules.get(m[0]), None, self._lo, None, _dn)
            # obj.open()
            p = prop.syntax.key % obj.info
        if p == 'dn':
            return _dn
        return p

    def choices(syntax_name: str):
        def func(prop):
            return random.choice(getattr(univention.admin.syntax, syntax_name, prop.syntax).choices)[0]
        return func

    def complex(syntax_name: str):
        def func(prop):
            syn = getattr(univention.admin.syntax, syntax_name)
            if not all(s[1].name in syntax_classes_mapping for s in syn.subsyntaxes):
                # TODO: warning
                return

            def _quote(s):
                return s if '"' not in s else '"%s"' % (s.replace('\\', '\\\\').replace('"', '\\"'),)

            functions = [
                _func(syntax_classes_mapping[s[1].name], prop)
                for s in syn.subsyntaxes
            ]
            return ' '.join(_quote(f()) for f in functions)
        return func

    def known_mail_address():
        # Once five mail domains exist, reuse them instead of creating more.
        if len(known_mail_address.cache) >= 5:
            return f'{uts.random_name()}@{random.choice(known_mail_address.cache)}'
        email = uts.random_email()
        domain = email.rsplit('@', 1)[-1]
        self.create_with_defaults('mail/domain', name=domain)
        known_mail_address.cache.append(domain)
        return email
    known_mail_address.cache = []

    def default_container(prop):
        # Bug #53827
        class DefaultContainer(univention.admin.syntax.UDM_Objects):
            """Syntax to select a |UCS| default container from |LDAP|"""

            udm_modules = ('container/cn', 'container/ou', 'container/dc')
            regex = None
            key = '%(name)s'
            label = '%(name)s'
            simple = True

        # Swap the syntax temporarily and always restore the original one.
        s = prop.syntax
        prop.syntax = DefaultContainer()
        try:
            return udm_objects(prop)
        finally:
            prop.syntax = s

    def random_ip() -> str:
        return uts.random_ip(iter(range(11, 121)))

    # Value generators keyed by the name of the UDM syntax class.
    syntax_classes_mapping = {
        'string': uts.random_string,
        'string_numbers_letters_dots': uts.random_string,
        'string_numbers_letters_dots_spaces': uts.random_string,
        'string64': lambda: uts.random_string(64),
        'string6': lambda: uts.random_string(6),
        'HalfString': uts.random_string,
        'OneThirdString': uts.random_string,
        'TwoThirdsString': uts.random_string,
        'FiveThirdsString': uts.random_string,
        'TwoString': uts.random_string,
        'IA5string': uts.random_string,
        'integer': lambda: uts.random_int(0, 100000),
        'integerOrEmpty': lambda: uts.random_int(1, 100000),
        'uid': uts.random_username,
        'gid': uts.random_groupname,
        'userPasswd': uts.random_string,
        'passwd': uts.random_string,
        'Country': choices('Country'),
        'univentionAdminModules': choices('univentionAdminModules'),
        'SambaPrivileges': choices('SambaPrivileges'),
        'sambaGroupType': choices('sambaGroupType'),
        'adGroupType': choices('adGroupType'),
        'ipProtocol': choices('ipProtocol'),
        'Hour': choices('Hour'),
        'Minute': choices('Minute'),
        'Month': choices('Month'),
        'Weekday': choices('Weekday'),
        'Day': choices('Day'),
        'NewPortalEntryLinkTarget': choices('NewPortalEntryLinkTarget'),
        'NewPortalDefaultLinkTarget': choices('NewPortalDefaultLinkTarget'),
        'AllowDeny': choices('AllowDeny'),
        'timeSpec': choices('timeSpec'),
        'booleanNone': choices('booleanNone'),
        'netbiosNodeType': choices('netbiosNodeType'),
        'ddnsUpdates': choices('ddnsUpdates'),
        'ddnsUpdateStyle': choices('ddnsUpdateStyle'),
        'language': choices('language'),
        'AllowDenyIgnore': choices('AllowDenyIgnore'),
        'emailForwardSetting': choices('emailForwardSetting'),
        'UCSServerRole': choices('UCSServerRole'),
        'PortalFontColor': choices('PortalFontColor'),
        'PortalDefaultLinkTarget': choices('PortalDefaultLinkTarget'),
        'PortalCategory': choices('PortalCategory'),
        'AuthRestriction': choices('AuthRestriction'),
        'PortalEntryLinkTarget': choices('PortalEntryLinkTarget'),
        'optionsUsersUser': choices('optionsUsersUser'),
        'nfssync': choices('nfssync'),
        'auto_one_zero': choices('auto_one_zero'),
        'TimeZone': choices('TimeZone'),
        'MAC_Address': uts.random_mac,
        'ipAddress': random_ip,
        'IPv4_AddressRange': lambda: '%s.2 %s.254' % tuple([random_ip().rsplit('.', 1)[0]] * 2),
        'ipv4Address': random_ip,
        'absolutePath': lambda: '/' + uts.random_string(),
        'sharePath': lambda: '/' + uts.random_string(),
        'BaseFilename': lambda: '%s.%s' % (uts.random_string(), uts.random_string(3)),
        'PrinterURI': lambda: '%s %s' % (random.choice(['lpd://', 'ipp://', 'http://', 'usb:/', 'socket://', 'parallel:/', 'file:/', 'smb://']), uts.random_string()),
        'Base64Bzip2Text': lambda: base64.b64encode(__import__('bz2').compress(uts.random_string().encode())).decode('ASCII'),
        'Base64Upload': lambda: base64.b64encode(uts.random_string().encode()).decode('ASCII'),
        'Base64BaseUpload': lambda: base64.b64encode(uts.random_string().encode()).decode('ASCII'),
        'Base64Bzip2XML': lambda: base64.b64encode(__import__('bz2').compress((f'<?xml?><foo>{uts.random_string()}</foo>').encode())).decode('ASCII'),
        'emailAddress': uts.random_email,
        'emailAddressTemplate': lambda: f'<username>@{uts.random_domain_name()}',
        'emailAddressValidDomain': known_mail_address,
        'primaryEmailAddressValidDomain': known_mail_address,
        'MailHomeServer': uts.random_domain_name,
        'boolean': lambda: uts.random_int(0, 1),
        'disabled': lambda: uts.random_int(0, 1),
        'locked': lambda: uts.random_int(0, 1),
        'v4netmask': lambda: uts.random_int(1, 31),
        'netmask': lambda: uts.random_int(1, 31),
        'printerName': lambda: uts.random_string(16),
        'DHCP_HardwareAddress': lambda: f'ethernet {uts.random_mac()}',
        'hostName': uts.random_string,
        'policyName': uts.random_string,
        'LocalizedDescription': lambda: '{} {}'.format(random.choice(['de_DE', 'en_US']), uts.random_string()),
        'LocalizedDisplayName': lambda: '{} {}'.format(random.choice(['de_DE', 'en_US']), uts.random_string()),
        'LocalizedLink': lambda: '{} {}://{}/{}'.format(random.choice(['de_DE', 'en_US']), random.choice(['http', 'https']), uts.random_domain_name(), uts.random_string()),
        'reverseLookupSubnet': lambda: random_ip().rsplit('.', 1)[0],
        'dnsPTR': lambda: random_ip().rsplit('.', 1)[1],
        'dnsHostname': uts.random_domain_name,
        'dnsName': uts.random_domain_name,
        'dnsName_umlauts': uts.random_string,
        'dnsSRVName': lambda: f'ldap tcp {uts.random_string()}',
        'dnsSRVLocation': lambda: f'{uts.random_int()} {uts.random_int()} {uts.random_int()} {uts.random_domain_name()}',
        'mailinglist_name': uts.random_string,
        'mail_folder_name': uts.random_string,
        'date': uts.random_date,
        'date2': uts.random_date,
        'iso8601Date': uts.random_date,
        'TimeString': lambda: uts.random_time().rsplit(':', 1)[0],  # Bug #53829
        'phone': lambda: f'+49 421 {uts.random_int(10000, 99000)}-{uts.random_int(0, 9)}',
        'postalAddress': lambda: f'"{uts.random_string()} street 1A" "{uts.random_int(10000, 99999)}" "{uts.random_string()}"',
        'keyAndValue': lambda: f'{uts.random_string()} {uts.random_string()}',
        'SambaLogonHours': lambda: uts.random_int(0, 167),
        'DebianPackageVersion': uts.random_version,
        'UCSVersion': uts.random_ucs_version,
        'LDAP_Search': ldap_search,
        'GroupDN': udm_objects,
        'UserDN': udm_objects,
        'HostDN': udm_objects,
        'UCS_Server': udm_objects,
        'Windows_Server': udm_objects,
        'DomainController': udm_objects,
        'ServicePrint_FQDN': udm_objects,
        'DNS_ForwardZone': udm_objects,
        'DNS_ReverseZone': udm_objects,
        'NewPortalEntries': udm_objects,
        'NewPortalCategoryEntries': udm_objects,
        'NewPortalCategories': udm_objects,
        'network': udm_objects,
        'Service': udm_objects,
        'nagiosHostsEnabledDn': udm_objects,
        'nagiosServiceDn': udm_objects,
        'dhcpService': udm_objects,
        'UMC_OperationSet': udm_objects,
        'WritableShare': udm_objects,
        'PortalComputer': udm_objects,
        'PortalEntries': udm_objects,
        'Portals': udm_objects,
        'GroupDNOrEmpty': udm_objects,
        'PrinterProducerList': udm_objects,
        'UserID': udm_objects,
        'GroupID': udm_objects,
        'printerModel': lambda: '"%s" "%s"' % (random.choice(['smb', 'cupsfilters/pxlmono.ppd', 'hp-ppd/HP/HP_LaserJet_6P.ppd', 'cups-pdf/CUPS-PDF_opt.ppd']), uts.random_string()),
        'PrinterNames': udm_objects,
        'PrinterDriverList': udm_attribute,
        'Packages': udm_attribute,
        'PackagesRemove': udm_attribute,
        'KDE_Profile': udm_attribute,
        'TrueFalseUp': lambda: random.choice(['TRUE', 'FALSE']),
        'TrueFalseUpper': lambda: random.choice(['TRUE', 'FALSE', 'NONE']),
        'TrueFalse': lambda: random.choice(['true', 'false', 'none']),
        'TextArea': lambda: '\n'.join([uts.random_string()] * random.randint(2, 5)),
        'SignedInteger': uts.random_int,
        'hostOrIP': random_ip,
        'hostname_or_ipadress_or_network': lambda: random.choice([random_ip(), uts.random_name(), '%s/%s' % (random_ip(), random.randint(1, 31))]),
        'jpegPhoto': lambda: '/9j/2wBDAP%swAALCAABAAEBAREA/8QAFAABA%s//EABQQAQ%sD/2gAIAQEAAD8AN//Z' % ('/' * 86, 'A' * 20, 'A' * 20),
        'Base64UMCIcon': lambda: 'AAABAAEAAQEAAAEAIAAwAAAAFgAAACgAAAABAAAAAgAAAAEAIAAAAAAABAAAAAAAAAAAAAAAAAAAAAAAAAD/////AAAAAA==',
        'SharedFolderUserACL': complex('SharedFolderUserACL'),
        'SharedFolderGroupACL': complex('SharedFolderGroupACL'),
        'dnsMX': complex('dnsMX'),
        'dnsEntry': complex('dnsEntry'),
        'dnsEntryReverse': complex('dnsEntryReverse'),
        'dnsEntryAlias': complex('dnsEntryAlias'),
        'dhcpEntry': complex('dhcpEntry'),
        'IP_AddressRange': complex('IP_AddressRange'),
        'UCR_Variable': complex('UCR_Variable'),
        'nfsMounts': complex('nfsMounts'),
        'ActivationDateTimeTimezone': complex('ActivationDateTimeTimezone'),
        'Localesubdirname_and_GNUMessageCatalog': complex('Localesubdirname_and_GNUMessageCatalog'),
        'translationTupleShortDescription': complex('translationTupleShortDescription'),
        'translationTupleLongDescription': complex('translationTupleLongDescription'),
        'translationTupleTabName': complex('translationTupleTabName'),
        'I18N_GroupName': complex('I18N_GroupName'),
        'UMCMessageCatalogFilename_and_GNUMessageCatalog': complex('UMCMessageCatalogFilename_and_GNUMessageCatalog'),
        'LocalizedAnonymousEmpty': complex('LocalizedAnonymousEmpty'),
        'PortalLinks': complex('PortalLinks'),
        'SambaMinPwdAge': complex('SambaMinPwdAge'),
        'SambaMaxPwdAge': complex('SambaMaxPwdAge'),
        'UMC_CommandPattern': complex('UMC_CommandPattern'),
        'attributeMapping': complex('attributeMapping'),
        'UNIX_TimeInterval': complex('UNIX_TimeInterval'),
        'TimeUnits': choices('TimeUnits'),
        'adminFixedAttributes': choices('adminFixedAttributes'),
        'desktopFixedAttributes': choices('desktopFixedAttributes'),
        'dhcp_dnsFixedAttributes': choices('dhcp_dnsFixedAttributes'),
        'dhcp_dnsupdateFixedAttributes': choices('dhcp_dnsupdateFixedAttributes'),
        'dhcp_leasetimeFixedAttributes': choices('dhcp_leasetimeFixedAttributes'),
        'dhcp_netbiosFixedAttributes': choices('dhcp_netbiosFixedAttributes'),
        'dhcp_routingFixedAttributes': choices('dhcp_routingFixedAttributes'),
        'dhcp_scopeFixedAttributes': choices('dhcp_scopeFixedAttributes'),
        'dhcp_statementsFixedAttributes': choices('dhcp_statementsFixedAttributes'),
        'dvcp_bootFixedAttributes': choices('dvcp_bootFixedAttributes'),
        'maintenanceFixedAttributes': choices('maintenanceFixedAttributes'),
        'masterPackagesFixedAttributes': choices('masterPackagesFixedAttributes'),
        'memberPackagesFixedAttributes': choices('memberPackagesFixedAttributes'),
        'pwhistoryFixedAttributes': choices('pwhistoryFixedAttributes'),
        'registryFixedAttributes': choices('registryFixedAttributes'),
        'releaseFixedAttributes': choices('releaseFixedAttributes'),
        'repositorySyncFixedAttributes': choices('repositorySyncFixedAttributes'),
        'shareUserQuotaFixedAttributes': choices('shareUserQuotaFixedAttributes'),
        'slavePackagesFixedAttributes': choices('slavePackagesFixedAttributes'),
        'umcFixedAttributes': choices('umcFixedAttributes'),
        'updateFixedAttributes': choices('updateFixedAttributes'),
        'printerACLTypes': choices('printerACLTypes'),
        'cscPolicy': choices('cscPolicy'),
        'ldapFilter': lambda: '(objectClass=*)',
        'UNIX_AccessRight': lambda: oct(random.randint(0, 0o777)).replace('o', ''),
        'UNIX_AccessRight_extended': lambda: oct(random.randint(0, 0o2777)).replace('o', ''),
        # Each tuple holds the hour range of the start time (a, b) and of the
        # end time (c, d). The lower bound must not exceed the upper bound.
        'timeperiod': lambda: ','.join(
            '-'.join((uts.random_time((a, b)), uts.random_time((c, d))))
            for a, b, c, d in random.choices(((0, 2, 4, 6), (8, 10, 10, 12), (14, 16, 18, 20), (20, 21, 22, 23)), k=random.randint(1, 4))
        ),
        'listAttributes': uts.random_string,
        'ldapDn': lambda: self.LDAP_BASE,  # only relevant for settings/syntax:base
        'filesize': lambda: '%d%s%s' % (random.randint(0, 100), random.choice('gGmMkK'), random.choice('bB')),
        # 'PortalCategorySelection': uts.random_string,  # not implemented: deprecated
    }

    # Overrides keyed by property name, or by module name and then property name.
    # A value of None disables generation for that property.
    module_property_mapping = {
        'sambaRID': lambda: None,  # uts.random_int(1000, 9999),  # prevent The relative ID (SAMBA) is already in use: 5608
        'uidNumber': lambda: None,  # prevent noLock / already used
        'gidNumber': lambda: None,  # prevent noLock / already used
        'mailForwardAddress': None,  # depends on mailPrimaryAddress
        'preferredDeliveryMethod': lambda: random.choice(["any", "mhs", "physical", "telex", "teletex", "g3fax", "g4fax", "ia5", "videotex", "telephone"]),
        'shell': lambda: random.choice(['/bin/false', '/bin/bash', '/bin/sh', '/usr/sbin/nologin']),
        'shares/share': {
            'sambaCustomSettings': lambda: random.choice(['"acl xattr update mtime" yes', '"access based share enum" yes', '"follow symlinks" "yes"']),
        },
        'dns/reverse_zone': {
            'contact': syntax_classes_mapping['emailAddress'],  # Bug #53794
        },
        'dns/ptr_record': {
            'ip': None,  # prevent, that a ip is set. instead address is set, which builds the ip from address.$superordinate
        },
        'settings/extended_attribute': {
            'version': lambda: '2',  # other versions aren't detected as extended attribute!
        },
        'computers/windows': {
            'ntCompatibility': lambda: '0',  # Bug #53819
        },
        'settings/mswmifilter': {
            'description': None,  # Bug #53797
            'displayName': None,  # Bug #53797
        },
        "settings/directory": {
            "policies": default_container,
            "dns": default_container,
            "dhcp": default_container,
            "users": default_container,
            "groups": default_container,
            "computers": default_container,
            "domaincontroller": default_container,
            "networks": default_container,
            "shares": default_container,
            "printers": default_container,
            "mail": default_container,
            "license": default_container,
            "base": default_container,
        },
        'users/user': {
            'userCertificate': lambda: base64.b64encode(subprocess.check_output(
                ('openssl', 'x509', '-inform', 'pem', '-in', '/etc/univention/ssl/%s/cert.pem' % (self.FQHN.rstrip('.'),), '-outform', 'der', '-out', '-'),
            )).decode('ASCII'),  # expensive!
        },
    }

    def _func(func, prop):
        # Generators with a parameter named 'prop' receive the property description.
        if 'prop' in getfullargspec(func).args:
            func = functools.partial(func, prop)
        return func

    for name, prop in module.property_descriptions.items():
        if name in kwargs:
            continue
        if not prop.editable:  # or (is_modification and not prop.may_change)
            continue
        func = module_property_mapping.get(modulename, {}).get(name, module_property_mapping.get(name, syntax_classes_mapping.get(prop.syntax.name)))
        if not func:
            continue
        func = _func(func, prop)
        # Multi-value properties get up to four distinct values; at least one
        # if the property is required or is named 'ip' or 'range'.
        value = list({func() for i in range(random.randint(int(prop.required or name in ('ip', 'range')), 4))}) if prop.multivalue else func()
        if value is None or isinstance(value, list) and all(v is None for v in value):
            continue
        kwargs.setdefault(name, value)

    if modulename == 'shares/printer':
        # when creating a shares/printergroup recursion is prevented: circular references aren't created
        # therefore set some (invalid) values here
        kwargs.setdefault('spoolHost', 'localhost')
        kwargs.setdefault('model', 'cups-pdf/CUPS-PDF_noopt.ppd FAKE')

    if modulename in ('dhcp/subnet', 'dhcp/sharedsubnet'):
        import ipaddress
        # Cap the prefix length at 29 and align the subnet to its network address.
        kwargs['subnetmask'] = str(min(29, int(kwargs['subnetmask'])))
        iface = ipaddress.IPv4Interface('%(subnet)s/%(subnetmask)s' % kwargs)
        kwargs['subnet'] = str(iface.network.network_address)
    elif modulename in ('dhcp/pool',):
        import ipaddress
        iface = ipaddress.IPv4Interface('%(subnet)s/%(subnetmask)s' % superordinate_props)
        if kwargs.get('dynamic_bootp_clients') != 'deny':
            # Tolerate a missing key: 'failover_peer' may not have been generated.
            kwargs.pop('failover_peer', None)

    if modulename in ('dhcp/subnet', 'dhcp/sharedsubnet', 'dhcp/pool'):
        # Replace the random ranges with consecutive ranges inside the network.
        hosts = iface.network.hosts()
        next(hosts)  # skip the first host address
        ranges = []
        for _i in range(len(kwargs['range']) if isinstance(kwargs['range'], list) else 1):
            first = last = None
            try:
                first = last = next(hosts)
                for _j in range(random.randrange(20)):
                    last = next(hosts)
            except StopIteration:
                pass
            if first and first != last:
                ranges.append(f'{first} {last}')
            else:
                break
        kwargs['range'] = ranges

    return self.create_object(modulename, **kwargs), kwargs
[docs]
def modify_object(self, modulename: str, wait_for_replication: bool = True, check_for_drs_replication: bool = False, wait_for: bool = False, **kwargs: Any) -> str:
    """
    Modify an LDAP object by running the UDM CLI and return its (possibly new) DN.

    UDM property values are passed as keyword arguments and must match the
    UDM property names exactly (case-sensitive!). Only objects recorded in
    ``self._cleanup`` for `modulename` can be modified. If the DN changes, the
    cleanup record is updated to the new DN.

    :param str modulename: name of UDM module (e.g. 'users/user')
    :param bool wait_for_replication: passed on to ``self.wait_for``.
    :param bool check_for_drs_replication: not read by this method.
    :param bool wait_for: passed to ``self.wait_for`` as ``everything``.
    :param kwargs: must contain ``dn``; further keys see ``_build_udm_cmdline``.
    :returns: the DN reported by the UDM CLI.
    :raises UCSTestUDM_MissingModulename: if `modulename` is empty.
    :raises UCSTestUDM_MissingDn: if no ``dn`` was given.
    :raises UCSTestUDM_CannotModifyExistingObject: if the object is not recorded in ``self._cleanup``.
    :raises UCSTestUDM_ModifyUDMObjectFailed: if the UDM CLI exits with a non-zero code.
    :raises UCSTestUDM_NoModification: if the UDM CLI reports that nothing was modified.
    :raises UCSTestUDM_ModifyUDMUnknownDN: if the output names no modified object.
    """
    if not modulename:
        raise UCSTestUDM_MissingModulename()
    original_dn = kwargs.get('dn')
    if not original_dn:
        raise UCSTestUDM_MissingDn()
    if original_dn not in self._cleanup.get(modulename, set()):
        raise UCSTestUDM_CannotModifyExistingObject(original_dn)

    cmd = self._build_udm_cmdline(modulename, 'modify', kwargs)
    print(f'Modifying {modulename} object with {_prettify_cmd(cmd)}')
    returncode, stdout, stderr = self._execute_udm(cmd)
    if returncode:
        raise UCSTestUDM_ModifyUDMObjectFailed({'module': modulename, 'kwargs': kwargs, 'returncode': returncode, 'stdout': stdout, 'stderr': stderr})

    modified_prefix = 'Object modified: '
    dn = original_dn
    for line in stdout.splitlines():
        if line.startswith('No modification: '):
            raise UCSTestUDM_NoModification({'module': modulename, 'kwargs': kwargs, 'stdout': stdout, 'stderr': stderr})
        if not line.startswith(modified_prefix):
            continue
        dn = line.split(modified_prefix, 1)[-1]
        if dn != original_dn:
            print('modrdn detected: %r ==> %r' % (original_dn, dn))
            if original_dn in self._cleanup.get(modulename, []):
                # Track the new DN instead of the old one.
                tracked = self._cleanup.setdefault(modulename, [])
                tracked.append(dn)
                tracked.remove(original_dn)
        break
    else:
        raise UCSTestUDM_ModifyUDMUnknownDN({'module': modulename, 'kwargs': kwargs, 'stdout': stdout, 'stderr': stderr})

    self.wait_for(modulename, dn, wait_for_replication, everything=wait_for)
    return dn
[docs]
def restore_object(self, modulename: str, wait_for_replication: bool = True, check_for_drs_replication: bool = False, wait_for: bool = False, **kwargs: Any) -> str:
    """
    Run the UDM CLI `restore` action for an object.

    :param str modulename: name of UDM module (e.g. 'users/user')
    :param bool wait_for_replication: passed on to ``self.wait_for()``
    :param bool check_for_drs_replication: accepted, but not evaluated by this method
    :param bool wait_for: passed on to ``self.wait_for()`` as `everything`
    :param kwargs: arguments for the UDM call; `dn` is mandatory
    :return: the DN printed by UDM after "Object restored: "
    :raises UCSTestUDM_MissingModulename: if `modulename` is empty
    :raises UCSTestUDM_MissingDn: if no `dn` was given
    :raises UCSTestUDM_RestoreUDMObjectFailed: if the UDM call exits non-zero
    :raises UCSTestUDM_RestoreUDMUnknownDN: if the output contains no "Object restored: " line
    """
    if not modulename:
        raise UCSTestUDM_MissingModulename()
    if not kwargs.get('dn'):
        raise UCSTestUDM_MissingDn()

    cmd = self._build_udm_cmdline(modulename, 'restore', kwargs)
    print(f'Restoring {modulename} object {_prettify_cmd(cmd)}')
    returncode, stdout, stderr = self._execute_udm(cmd)
    if returncode:
        raise UCSTestUDM_RestoreUDMObjectFailed({'module': modulename, 'kwargs': kwargs, 'returncode': returncode, 'stdout': stdout, 'stderr': stderr})

    marker = 'Object restored: '
    restored_dn = next(
        (line.split(marker, 1)[-1] for line in stdout.splitlines() if line.startswith(marker)),
        None,
    )
    if restored_dn is None:
        raise UCSTestUDM_RestoreUDMUnknownDN({'module': modulename, 'kwargs': kwargs, 'stdout': stdout, 'stderr': stderr})

    self.wait_for(modulename, restored_dn, wait_for_replication, everything=wait_for)
    return restored_dn
[docs]
def move_object(self, modulename: str, wait_for_replication: bool = True, check_for_drs_replication: bool = False, wait_for: bool = False, **kwargs: Any) -> str:
    """
    Move an object that is tracked for cleanup to another position via the UDM CLI.

    The new DN is computed locally from the first RDN of `dn` and the
    `position` keyword argument; the cleanup bookkeeping is updated accordingly.

    :param str modulename: name of UDM module (e.g. 'users/user')
    :param bool wait_for_replication: passed on to ``self.wait_for()``
    :param bool check_for_drs_replication: accepted, but not evaluated by this method
    :param bool wait_for: passed on to ``self.wait_for()`` as `everything`
    :param kwargs: arguments for the UDM call; `dn` is mandatory, `position` is the target
    :return: the computed DN of the moved object
    :raises UCSTestUDM_MissingModulename: if `modulename` is empty
    :raises UCSTestUDM_MissingDn: if no `dn` was given
    :raises UCSTestUDM_CannotModifyExistingObject: if `dn` is not tracked for cleanup
    :raises UCSTestUDM_MoveUDMObjectFailed: if the UDM call exits non-zero
    :raises UCSTestUDM_ModifyUDMUnknownDN: if the output contains no "Object modified: " line
    """
    if not modulename:
        raise UCSTestUDM_MissingModulename()
    dn = kwargs.get('dn')
    if not dn:
        raise UCSTestUDM_MissingDn()
    if dn not in self._cleanup.get(modulename, set()):
        raise UCSTestUDM_CannotModifyExistingObject(dn)

    cmd = self._build_udm_cmdline(modulename, 'move', kwargs)
    print(f'Moving {modulename} object {_prettify_cmd(cmd)}')
    returncode, stdout, stderr = self._execute_udm(cmd)
    if returncode:
        raise UCSTestUDM_MoveUDMObjectFailed({'module': modulename, 'kwargs': kwargs, 'returncode': returncode, 'stdout': stdout, 'stderr': stderr})

    confirmed = any(line.startswith('Object modified: ') for line in stdout.splitlines())
    if not confirmed:
        raise UCSTestUDM_ModifyUDMUnknownDN({'module': modulename, 'kwargs': kwargs, 'stdout': stdout, 'stderr': stderr})

    # Replace the old DN by the new one in the cleanup bookkeeping.
    self._cleanup.get(modulename, []).remove(dn)
    rdn = ldap.dn.str2dn(dn)[0:1]
    target = ldap.dn.str2dn(kwargs.get('position', ''))
    new_dn = ldap.dn.dn2str(rdn + target)
    self._cleanup.setdefault(modulename, []).append(new_dn)

    # NOTE(review): the wait is done with the old DN, not `new_dn` - confirm this is intended.
    self.wait_for(modulename, dn, wait_for_replication, everything=wait_for)
    return new_dn
[docs]
def remove_object(self, modulename: str, wait_for_replication: bool = True, wait_for: bool = False, **kwargs: Any) -> None:
    """
    Remove an object that is tracked for cleanup via the UDM CLI.

    After a successful removal the DN is dropped from the cleanup bookkeeping.

    :param str modulename: name of UDM module (e.g. 'users/user')
    :param bool wait_for_replication: passed on to ``self.wait_for()``
    :param bool wait_for: passed on to ``self.wait_for()`` as `everything`
    :param kwargs: arguments for the UDM call; `dn` is mandatory
    :raises UCSTestUDM_MissingModulename: if `modulename` is empty
    :raises UCSTestUDM_MissingDn: if no `dn` was given
    :raises UCSTestUDM_CannotModifyExistingObject: if `dn` is not tracked for cleanup
    :raises UCSTestUDM_RemoveUDMObjectFailed: if the UDM call exits non-zero
    """
    if not modulename:
        raise UCSTestUDM_MissingModulename()
    dn = kwargs.get('dn')
    if not dn:
        raise UCSTestUDM_MissingDn()
    if dn not in self._cleanup.get(modulename, set()):
        raise UCSTestUDM_CannotModifyExistingObject(dn)

    cmd = self._build_udm_cmdline(modulename, 'remove', kwargs)
    print(f'Removing {modulename} object {_prettify_cmd(cmd)}')
    returncode, stdout, stderr = self._execute_udm(cmd)
    if returncode:
        failure = {'module': modulename, 'kwargs': kwargs, 'returncode': returncode, 'stdout': stdout, 'stderr': stderr}
        raise UCSTestUDM_RemoveUDMObjectFailed(failure)

    tracked = self._cleanup.get(modulename, [])
    if dn in tracked:
        self._cleanup[modulename].remove(dn)

    self.wait_for(modulename, dn, wait_for_replication, everything=wait_for)
[docs]
def wait_for(self, modulename: str, dn: str, wait_for_replication: bool = True, wait_for_drs_replication: bool = False, wait_for_s4connector: bool = False, everything: bool = False) -> None:
    """
    Build the list of replication conditions for an object and hand it to ``utils.wait_for()``.

    :param str modulename: name of UDM module (e.g. 'users/user')
    :param str dn: DN of the object; used to look up the AD search arguments
    :param bool wait_for_replication: add a LISTENER condition
    :param wait_for_drs_replication: request a DRS condition; a string value is
        used as given, any other true value is replaced by the AD search
        arguments of the object (or dropped if there are none)
    :param bool wait_for_s4connector: add a S4C_FROM_UCS condition if AD search
        arguments exist and the UCR variable `samba4/ldap/base` is set
    :param bool everything: turns on both the DRS and the S4 connector wait
    :return: whatever ``utils.wait_for()`` returns
    """
    if everything:
        wait_for_drs_replication = True
        wait_for_s4connector = True

    # the order of the conditions is important
    conditions: list[tuple[utils.ReplicationType, bool]] = []
    if wait_for_replication:
        conditions.append((utils.ReplicationType.LISTENER, wait_for_replication))

    ad_ldap_search_args = self.ad_object_identifying_filter(modulename, dn)

    if wait_for_drs_replication and not isinstance(wait_for_drs_replication, str):
        # a plain "true" request only makes sense if the object can be identified in AD
        drs_replication = ad_ldap_search_args or False
    else:
        drs_replication = wait_for_drs_replication

    if wait_for_s4connector and ad_ldap_search_args and self._ucr.get('samba4/ldap/base'):
        conditions.append((utils.ReplicationType.S4C_FROM_UCS, ad_ldap_search_args))

    if drs_replication:
        if not wait_for_replication:
            conditions.append((utils.ReplicationType.LISTENER, wait_for_replication))
        drs_roles = ('domaincontroller_backup', 'domaincontroller_slave', 'memberserver')
        if self._ucr.get('server/role') in drs_roles:
            conditions.append((utils.ReplicationType.DRS, drs_replication))

    return utils.wait_for(conditions, verbose=False)
[docs]
def create_user(self, wait_for_replication: bool = True, check_for_drs_replication: bool = True, wait_for: bool = True, **kwargs: Any) -> tuple[str, str]:  # :pylint: disable-msg=W0613
    """
    Create a `users/user` object via UDM CLI.

    Values for UDM properties can be passed via keyword arguments only and
    have to exactly match UDM property names (case-sensitive!). Properties
    that are not given fall back to these defaults:

    :param str position: 'cn=users,$ldap_base'
    :param str password: 'univention'
    :param str username: <random user name>
    :param str lastname: <random string>
    :param str firstname: <random string>
    :return: (dn, username)
    """
    defaults = (
        ('position', 'cn=users,%s' % self.LDAP_BASE),
        ('password', 'univention'),
        ('username', uts.random_username()),
        ('lastname', uts.random_name()),
        ('firstname', uts.random_name()),
    )
    properties = self._set_module_default_attr(kwargs, defaults)
    dn = self.create_object('users/user', wait_for_replication, check_for_drs_replication, wait_for=wait_for, **properties)
    return (dn, properties['username'])
[docs]
def create_ldap_user(self, wait_for_replication: bool = True, check_for_drs_replication: bool = False, **kwargs: Any) -> tuple[str, str]:  # :pylint: disable-msg=W0613
    """
    Create a `users/ldap` object via UDM CLI.

    Properties that are not given as keyword arguments fall back to defaults:
    position 'cn=users,$ldap_base', password 'univention' and random values
    for `username`, `lastname` and `name`.

    :return: (dn, username)
    """
    # check_for_drs_replication=False -> ldap users are not replicated to s4
    defaults = (
        ('position', 'cn=users,%s' % self.LDAP_BASE),
        ('password', 'univention'),
        ('username', uts.random_username()),
        ('lastname', uts.random_name()),
        ('name', uts.random_name()),
    )
    properties = self._set_module_default_attr(kwargs, defaults)
    dn = self.create_object('users/ldap', wait_for_replication, check_for_drs_replication, **properties)
    return (dn, properties['username'])
[docs]
def remove_user(self, username: str, wait_for_replication: bool = True) -> None:
    """
    Remove a `users/user` object, addressed by its username.

    The DN is built as ``uid=<username>,cn=users,<LDAP_BASE>``; the removal
    itself is done by ``self.remove_object()``.
    """
    user_dn = f'uid={username},cn=users,{self.LDAP_BASE}'
    self.remove_object('users/user', wait_for_replication, dn=user_dn)
[docs]
def create_group(self, wait_for_replication: bool = True, check_for_drs_replication: bool = True, **kwargs: Any) -> tuple[str, str]:  # :pylint: disable-msg=W0613
    """
    Create a `groups/group` object via UDM CLI.

    Values for UDM properties can be passed via keyword arguments only and
    have to exactly match UDM property names (case-sensitive!). Properties
    that are not given fall back to these defaults:

    :param str position: `cn=groups,$ldap_base`
    :param str name: <random group name>
    :return: (dn, name)
    """
    defaults = (
        ('position', 'cn=groups,%s' % self.LDAP_BASE),
        ('name', uts.random_groupname()),
    )
    properties = self._set_module_default_attr(kwargs, defaults)
    dn = self.create_object('groups/group', wait_for_replication, check_for_drs_replication, **properties)
    return (dn, properties['name'])
def _set_module_default_attr(self, attributes, defaults):
    """
    Return a deep copy of `attributes` in which every property from `defaults`
    that is not present yet has been added.

    :param dict attributes: the explicitly given properties; left untouched
    :param tuple defaults: a tuple of pairs like "('username', <default_value>)"
    """
    merged = copy.deepcopy(attributes)
    for prop, fallback in defaults:
        if prop not in merged:
            merged[prop] = fallback
    return merged
[docs]
def addCleanupLock(self, lockType, lockValue):
    """Remember `lockValue` under `lockType` in ``self._cleanupLocks`` so that ``cleanup()`` deletes that lock object."""
    registered = self._cleanupLocks
    if lockType not in registered:
        registered[lockType] = []
    registered[lockType].append(lockValue)
def _wait_for_drs_removal(self, modulename: str, dn: str, verbose: bool = True) -> None:
    """
    Wait until the object is gone on the DRS side.

    Nothing happens if ``self.ad_object_identifying_filter()`` yields no
    search arguments for the object; otherwise ``wait_for_drs_replication()``
    is called with `should_exist=False` and a timeout of 20.
    """
    search_args = self.ad_object_identifying_filter(modulename, dn)
    if not search_args:
        return
    wait_for_drs_replication(should_exist=False, verbose=verbose, timeout=20, **search_args)
[docs]
def get_object(self, modulename: str, dn: str, **kwargs: Any) -> dict[str, Any]:
    """
    Return the parsed properties of exactly one object.

    The object is looked up with ``self.list_objects()`` using `dn` as
    `position`.

    :raises UCSTestUDM_GetUDMObjectFailed: unless exactly one object was listed
    """
    kwargs['position'] = dn
    matches = self.list_objects(modulename, **kwargs)
    if not matches or len(matches) != 1:
        raise UCSTestUDM_GetUDMObjectFailed(matches)
    return matches[0][1]
[docs]
def list_objects(self, modulename: str, **kwargs: Any) -> list[tuple[str, dict[str, Any]]]:
    """
    List objects via the UDM CLI and parse the textual output.

    The parser tells lines apart by their indentation. It assumes the UDM CLI
    prints object properties and the policy section headers with two leading
    blanks and the policy details below a section header with four.

    :param str modulename: name of UDM module (e.g. 'users/user')
    :param kwargs: arguments for the UDM `list` call
    :return: a list of ``(dn, attributes)`` tuples in output order. Properties
        are stored as ``{name: [values]}``; policy details are stored below the
        name of their section ('Policy-based Settings', ...).
    :raises UCSTestUDM_ListUDMObjectFailed: if the UDM call exits non-zero
    """
    cmd = self._build_udm_cmdline(modulename, 'list', kwargs)
    print(f'Listing {modulename} objects {_prettify_cmd(cmd)}')
    returncode, stdout, stderr = self._execute_udm(cmd)
    if returncode:
        raise UCSTestUDM_ListUDMObjectFailed(returncode, stdout, stderr)

    objects: list[tuple[str, dict[str, Any]]] = []
    dn = None
    attrs: dict[str, Any] = {}
    pattr = None
    pvalue = None
    pdn = None
    current_policy_type = None
    for line in stdout.splitlines():
        if line.startswith('DN: '):
            # a new object starts: store the one collected so far
            if dn:
                objects.append((dn, attrs))
                attrs = {}
            dn = line[3:].strip()
        elif not line.strip():
            continue
        # The four-blank checks must come before the two-blank check, and the
        # section header check in between: headers carry two blanks and a colon
        # and would otherwise be taken for a property.
        elif line.startswith('    ') and ':' in line:  # list --policies=1
            name, value = line.split(':', 1)
            name = name.strip()
            if name == 'Policy':
                pvalue = pattr = None
                pdn = value
            elif name == 'Attribute':
                pattr = value
            elif name == 'Value':
                pvalue = value
                attrs.setdefault(current_policy_type, {}).setdefault(pdn.strip(), {}).setdefault(pattr.strip(), []).append(pvalue.strip())
        elif line.startswith('    ') and '=' in line:  # list --policies=2
            name, value = line.split('=', 1)
            attrs.setdefault(current_policy_type, {}).setdefault(name.strip(), []).append(value.strip().strip('"'))
        elif any(x in line for x in ('Policy-based Settings', 'Subnet-based Settings', 'Merged Settings')):
            current_policy_type = line.split(':')[0].strip()
        elif line.startswith('  ') and ':' in line:  # regular property
            name, value = line.split(':', 1)
            attrs.setdefault(name.strip(), []).append(value.strip())
    if dn:
        objects.append((dn, attrs))
    return objects
[docs]
def cleanup(self) -> None:
    """
    Remove everything that is tracked in ``self._cleanup`` and ``self._cleanupLocks``.

    Objects are removed via '/usr/sbin/udm-test', longest DN first. Objects
    whose removal failed get a second attempt afterwards. Then the lock
    objects are deleted, replication (and DRS removal for every removed
    object) is awaited and the CLI server is stopped. Returns immediately if
    nothing is tracked.
    """
    if not (self._cleanup or self._cleanupLocks):
        return

    def remove_via_udm(modulename, dn):
        """Run the UDM remove call; return (succeeded, stdout, stderr)."""
        returncode, stdout, stderr = self._execute_udm(['/usr/sbin/udm-test', modulename, 'remove', '--dn', dn, '--remove_referring'])
        succeeded = not returncode and 'Object removed:' in stdout
        return succeeded, stdout, stderr

    print('Performing UCSTestUDM cleanup...')
    pending = [(modulename, dn) for modulename, dns in self._cleanup.items() for dn in dns]
    pending.sort(key=lambda entry: len(entry[1]), reverse=True)

    removed = []
    second_try = {}
    for modulename, dn in pending:
        print('removing DN:', dn)
        succeeded, _stdout, _stderr = remove_via_udm(modulename, dn)
        if succeeded:
            removed.append((modulename, dn))
        else:
            second_try.setdefault(modulename, []).append(dn)

    # simply iterate over the remaining objects again, removing them might just have failed for chronology reasons
    # (e.g groups can not be removed while there are still objects using it as primary group)
    for modulename, dns in second_try.items():
        for dn in dns:
            succeeded, stdout, stderr = remove_via_udm(modulename, dn)
            if succeeded:
                removed.append((modulename, dn))
                continue
            print(f'Warning: Failed to remove {modulename!r} object {dn!r}', file=sys.stderr)
            print(f'stdout={stdout!r} {stderr!r} {self._lo.get(dn)!r}', file=sys.stderr)
    self._cleanup = {}

    for lock_type, lock_values in self._cleanupLocks.items():
        for lock_value in lock_values:
            lockDN = f'cn={lock_value},cn={lock_type},{self.UNIVENTION_TEMPORARY_CONTAINER}'
            try:
                self._lo.delete(lockDN)
            except (ldap.NO_SUCH_OBJECT, univention.admin.uexceptions.noObject):
                # the lock is already gone - nothing to do
                pass
            except Exception as ex:
                print(f'Failed to remove locking object "{lockDN}" during cleanup: {ex!r}')
    self._cleanupLocks = {}

    print('Cleanup: wait for replication and drs removal')
    utils.wait_for_replication(verbose=False)
    for modulename, dn in removed:
        try:
            self._wait_for_drs_removal(modulename, dn, verbose=True)
        except DRSReplicationFailed as exc:
            print('Cleanup: DRS replication failed:', exc)

    self.stop_cli_server()
    print('UCSTestUDM cleanup done')
[docs]
def stop_cli_server(self) -> None:
    """
    Terminate all running UDM CLI server processes.

    A process counts as CLI server if its first argument starts with
    '/usr/bin/python' and its second argument equals
    ``self.PATH_UDM_CLI_SERVER``. The processes get signal 15 first and,
    one second later, signal 9.
    """
    print('trying to restart UDM CLI server')

    def is_cli_server(cmdline):
        return len(cmdline) >= 2 and cmdline[0].startswith('/usr/bin/python') and cmdline[1] == self.PATH_UDM_CLI_SERVER

    servers = []
    for candidate in psutil.process_iter():
        try:
            if is_cli_server(candidate.cmdline()):
                servers.append(candidate)
        except psutil.NoSuchProcess:
            # process vanished while we were looking at it
            continue

    for signum in (15, 9):
        # iterate over a copy: terminated processes are dropped from the list
        for server in list(servers):
            try:
                print(f'sending signal {signum} to process {server.pid} ({server.cmdline()!r})')
                os.kill(server.pid, signum)
            except (psutil.NoSuchProcess, OSError):
                print('process already terminated')
                servers.remove(server)
        if signum == 15:
            # grace period before signal 9 is sent
            time.sleep(1)
[docs]
def verify_udm_object(self, *args, **kwargs):
    """Convenience method: pass all arguments to the module level function ``verify_udm_object()`` and return its result."""
    result = verify_udm_object(*args, **kwargs)
    return result
[docs]
def verify_ldap_object(self, *args, **kwargs):
    """Convenience method: pass all arguments to ``utils.verify_ldap_object()`` and return its result."""
    result = utils.verify_ldap_object(*args, **kwargs)
    return result
def __enter__(self):
    """Context manager entry: the instance itself is bound by the `with` statement."""
    return self
def __exit__(self, exc_type, exc_value, traceback):
    """Context manager exit: report a pending exception and run ``self.cleanup()`` in any case."""
    if exc_type:
        message = f'Cleanup after exception: {exc_type} {exc_value}'
        print(message)
    self.cleanup()
def _execute_udm(self, cmd):
    """
    Run `cmd` (without a shell) in the environment ``self._env`` and wait for it.

    :param cmd: the command line as a list of arguments
    :return: (returncode, stdout, stderr); both streams are decoded as UTF-8,
        undecodable bytes are replaced
    """
    finished = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=False, env=self._env)
    out = finished.stdout.decode('utf-8', 'replace')
    err = finished.stderr.decode('utf-8', 'replace')
    return finished.returncode, out, err
[docs]
class UDM(UCSTestUDM):
    """UDM interface using the REST API"""

    # command line client used instead of the one of the base class
    PATH_UDM_CLI_CLIENT_WRAPPED = '/usr/sbin/udm-test-rest'

    @staticmethod
    def _store_json_section(objects: dict[str, dict[str, Any]], dn: str, section: str) -> None:
        """Split a collected "Name: <json>" text and store the decoded JSON under the lower-cased name for `dn`."""
        name, payload = section.split(': ', 1)
        objects.setdefault(dn, {})[name.lower()] = json.loads(payload)

    def list_objects_json(self, modulename: str, **kwargs: Any) -> dict[str, dict[str, Any]]:
        """
        List objects with `as_json` enabled and parse the output.

        "URL: " and "Object-Type: " lines are stored as strings under the keys
        `url` and `object_type`. "Options: ", "Properties: " and "Policies: "
        start a JSON section which may span several lines; it is decoded and
        stored under `options`, `properties` respectively `policies`.

        :param str modulename: name of UDM module (e.g. 'users/user')
        :param kwargs: arguments for the UDM `list` call
        :return: a dictionary mapping each DN to its parsed data
        :raises UCSTestUDM_ListUDMObjectFailed: if the UDM call exits non-zero
        """
        kwargs['as_json'] = True
        cmd = self._build_udm_cmdline(modulename, 'list', kwargs)
        print(f'Listing {modulename} objects {_prettify_cmd(cmd)}')
        returncode, stdout, stderr = self._execute_udm(cmd)
        if returncode:
            raise UCSTestUDM_ListUDMObjectFailed(returncode, stdout, stderr)

        objects: dict[str, dict[str, Any]] = {}
        dn = None
        current = None  # text of the JSON section that is being collected
        for line in stdout.splitlines():
            if line.startswith('DN: '):
                # A pending section belongs to the previous object: store it
                # before switching the DN, otherwise it would end up at the
                # next object.
                if current:
                    if dn:
                        self._store_json_section(objects, dn, current)
                    current = None
                dn = line[3:].strip()
            elif line.startswith(('URL: ', 'Object-Type: ')):
                key, val = line.split(': ', 1)
                objects.setdefault(dn, {})[key.lower().replace('-', '_')] = val
            elif line.startswith(('Options: ', 'Properties: ', 'Policies: ')):
                # a new section starts: the previous one is complete
                if current:
                    self._store_json_section(objects, dn, current)
                current = line
            elif current:
                # continuation line of a multi line JSON document
                current += line
        if dn and current:
            self._store_json_section(objects, dn, current)
        return objects

    def stop_cli_server(self) -> None:
        """Stop the CLI server processes like the base class and reload the UDM REST service afterwards."""
        super().stop_cli_server()
        subprocess.call(['systemctl', 'reload', 'univention-directory-manager-rest.service'])
[docs]
def verify_udm_object(module: Any, dn: str, expected_properties: Mapping[str, bytes | str | Sequence[str]] | None) -> None:
    """
    Verify an object exists with the given `dn` in the given UDM `module` with
    some properties. Setting `expected_properties` to `None` requires the
    object to not exist.

    Values are compared as sets of lower-cased strings. If they differ, a
    second comparison is done after normalizing the values as DNs (as far as
    they can be parsed as DNs).

    :param module: name of the UDM module
    :param str dn: DN of the object
    :param dict expected_properties: is a dictionary of (property,value) pairs.
    :raises AssertionError: in case of a mismatch.
    :raises univention.admin.uexceptions.noObject: if the object is expected but does not exist.
    """
    lo = utils.get_ldap_connection(admin_uldap=True)
    try:
        position = univention.admin.uldap.position(lo.base)
        udm_module = univention.admin.modules.get(module)
        if not udm_module:
            # the module is not known yet: reload the module list and retry
            univention.admin.modules.update()
            udm_module = univention.admin.modules.get(module)
        if not udm_module.initialized:
            univention.admin.modules.init(lo, position, udm_module)
        udm_object = univention.admin.objects.get(udm_module, None, lo, position, dn)
        udm_object.open()
    except univention.admin.uexceptions.noObject:
        if expected_properties is None:
            return
        raise

    if expected_properties is None:
        raise AssertionError(f"UDM object {dn} should not exist")

    difference = {}
    for key, value in expected_properties.items():
        udm_value = udm_object.info.get(key, [])
        if udm_value is None:
            udm_value = []
        elif isinstance(udm_value, bytes | str):
            udm_value = {udm_value}
        if not isinstance(value, tuple | list):
            value = {value}

        value = {_to_unicode(item).lower() for item in value}
        udm_value = {_to_unicode(item).lower() for item in udm_value}
        if udm_value == value:
            continue

        # second chance: the values may be DNs which only differ in escaping
        try:
            value = {_normalize_dn(item) for item in value}
            udm_value = {_normalize_dn(item) for item in udm_value}
        except ldap.DECODING_ERROR:
            pass
        if udm_value != value:
            difference[key] = (udm_value, value)

    assert not difference, '\n'.join(f'{key}: {udm_value} != expected {value}' for key, (udm_value, value) in difference.items())
def _prettify_cmd(cmd: Iterable[str]) -> str:
    """
    Return `cmd` as one printable string.

    Every argument is shell-quoted and the arguments are joined by blanks. If
    the result contains a NUL or a newline character, its `repr()` is
    returned instead.
    """
    joined = ' '.join(map(shlex.quote, cmd))
    if '\x00' in joined or '\n' in joined:
        return repr(joined)
    return joined
def _to_unicode(string: bytes | str) -> str:
if isinstance(string, bytes):
return string.decode('utf-8')
return string
def _normalize_dn(dn: str) -> str:
    r"""
    Normalize a given dn by parsing it and converting it back to a string.
    This removes some escaping of special chars in the DNs.
    Note: The CON-LDAP returns DNs with escaping chars, OpenLDAP does not.

    >>> _normalize_dn(r"cn=peter\#,cn=groups")
    'cn=peter#,cn=groups'
    """
    parsed = ldap.dn.str2dn(dn)
    return ldap.dn.dn2str(parsed)
if __name__ == '__main__':
    # run the doctests of this module
    import doctest
    print(doctest.testmod())
elif __name__ == '__disabled__':
    # Usage examples. This branch is never executed, it only documents how the wrapper is meant to be used.
    ucr = univention.testing.ucr.UCSTestConfigRegistry()
    ucr.load()
    administrator_dn = 'uid=Administrator,cn=users,%s' % ucr.get('ldap/base')

    with UCSTestUDM() as udm:
        # create user
        dnUser, _username = udm.create_user()

        # stop CLI daemon
        udm.stop_cli_server()

        # create group
        _dnGroup, _groupname = udm.create_group()

        # modify user from above
        udm.modify_object('users/user', dn=dnUser, description='Foo Bar')

        # test with malformed arguments
        try:
            _dnUser, _username = udm.create_user(username='')
        except UCSTestUDM_CreateUDMObjectFailed:
            print('Caught anticipated exception UCSTestUDM_CreateUDMObjectFailed - SUCCESS')

        # try to modify an object which was not created by this wrapper
        try:
            udm.modify_object('users/user', dn=administrator_dn, description='Foo Bar')
        except UCSTestUDM_CannotModifyExistingObject:
            print('Caught anticipated exception UCSTestUDM_CannotModifyExistingObject - SUCCESS')