# -*- coding: utf-8 -*-
#
# Copyright 2013-2022 Univention GmbH
#
# https://www.univention.de/
#
# All rights reserved.
#
# The source code of this program is made available
# under the terms of the GNU Affero General Public License version 3
# (GNU AGPL V3) as published by the Free Software Foundation.
#
# Binary versions of this program provided by Univention to you as
# well as other copyrighted, protected or trademarked materials like
# Logos, graphics, fonts, specific documentations and configurations,
# cryptographic keys etc. are subject to a license agreement between
# you and Univention and not subject to the GNU AGPL V3.
#
# In the case you use this program under the terms of the GNU AGPL V3,
# the program is provided in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public
# License with the Debian GNU/Linux or Univention distribution in file
# /usr/share/common-licenses/AGPL-3; if not, see
# <https://www.gnu.org/licenses/>.
"""
ALPHA VERSION
Wrapper around Univention Directory Manager CLI to simplify
creation/modification of UDM objects in python. The wrapper automatically
removed created objects during wrapper destruction.
For usage examples look at the end of this file.
WARNING:
The API currently allows only modifications to objects created by the wrapper
itself. Also the deletion of objects is currently unsupported. Also not all
UDM object types are currently supported.
WARNING2:
The API is currently under heavy development and may/will change before next UCS release!
"""
from __future__ import print_function
import base64
import copy
import functools
import os
import pipes
import random
import subprocess
import sys
import time
from typing import Any, Dict, Iterable, List, Mapping, Optional, Sequence, Text, Tuple, Union # noqa: F401
import ldap
import ldap.filter
import psutil
import six
import univention.admin.modules
import univention.admin.objects
import univention.admin.uldap
import univention.testing.strings as uts
import univention.testing.ucr
import univention.testing.utils as utils
from univention.testing.ucs_samba import wait_for_drs_replication, DRSReplicationFailed
try:
from inspect import getfullargspec as getargspec
except ImportError:
from inspect import getargspec # python 2
[docs]class UCSTestUDM_Exception(Exception):
def __str__(self):
if self.args and len(self.args) == 1 and isinstance(self.args[0], dict):
return '\n'.join('%s=%s' % (key, value) for key, value in self.args[0].items())
else:
return Exception.__str__(self)
__repr__ = __str__
[docs]class UCSTestUDM_MissingModulename(UCSTestUDM_Exception):
pass
[docs]class UCSTestUDM_MissingDn(UCSTestUDM_Exception):
pass
[docs]class UCSTestUDM_CreateUDMObjectFailed(UCSTestUDM_Exception):
pass
[docs]class UCSTestUDM_CreateUDMUnknownDN(UCSTestUDM_Exception):
pass
[docs]class UCSTestUDM_ModifyUDMObjectFailed(UCSTestUDM_Exception):
pass
[docs]class UCSTestUDM_MoveUDMObjectFailed(UCSTestUDM_Exception):
pass
[docs]class UCSTestUDM_NoModification(UCSTestUDM_Exception):
pass
[docs]class UCSTestUDM_ModifyUDMUnknownDN(UCSTestUDM_Exception):
pass
[docs]class UCSTestUDM_RemoveUDMObjectFailed(UCSTestUDM_Exception):
pass
[docs]class UCSTestUDM_CleanupFailed(UCSTestUDM_Exception):
pass
[docs]class UCSTestUDM_CannotModifyExistingObject(UCSTestUDM_Exception):
pass
[docs]class UCSTestUDM_ListUDMObjectFailed(UCSTestUDM_Exception):
pass
[docs]class UCSTestUDM(object):
PATH_UDM_CLI_SERVER = '/usr/share/univention-directory-manager-tools/univention-cli-server'
PATH_UDM_CLI_CLIENT = '/usr/sbin/udm'
PATH_UDM_CLI_CLIENT_WRAPPED = '/usr/sbin/udm-test'
COMPUTER_MODULES = (
'computers/ubuntu',
'computers/linux',
'computers/windows',
'computers/windows_domaincontroller',
'computers/domaincontroller_master',
'computers/domaincontroller_backup',
'computers/domaincontroller_slave',
'computers/memberserver',
'computers/macos',
'computers/ipmanagedclient')
# map identifying UDM module or rdn-attribute to samba4 rdn attribute
[docs] def ad_object_identifying_filter(self, modulename, dn):
# type: (str, str) -> Optional[Dict[str, str]]
udm_mainmodule, udm_submodule = modulename.split('/', 1)
objname = ldap.dn.str2dn(dn)[0][0][1]
attr = ''
ad_ldap_controls = None
con_search_filter = ''
match_filter = ''
if udm_mainmodule == 'users':
attr = 'sAMAccountName'
con_search_filter = '(&(objectClass=user)(!(objectClass=computer))(userAccountControl:1.2.840.113556.1.4.803:=512))'
match_filter = '(&(|(&(objectClass=posixAccount)(objectClass=krb5Principal))(objectClass=user))(!(objectClass=univentionHost)))'
elif udm_mainmodule == 'groups':
attr = 'sAMAccountName'
con_search_filter = '(objectClass=group)'
elif udm_mainmodule == 'computers':
if udm_submodule.startswith('domaincontroller_') or udm_submodule == 'windows_domaincontroller':
attr = 'cn'
con_search_filter = '(&(objectClass=computer)(userAccountControl:1.2.840.113556.1.4.803:=532480))'
match_filter = '(|(&(objectClass=univentionDomainController)(univentionService=Samba 4))(objectClass=computer)(univentionServerRole=windows_domaincontroller))'
elif udm_submodule in ('windows', 'memberserver', 'linux', 'ubuntu', 'macos'):
attr = 'cn'
con_search_filter = '(&(objectClass=computer)(userAccountControl:1.2.840.113556.1.4.803:=4096))'
match_filter = '(|(&(objectClass=univentionWindows)(!(univentionServerRole=windows_domaincontroller)))(objectClass=computer)(objectClass=univentionMemberServer)(objectClass=univentionUbuntuClient)(objectClass=univentionLinuxClient)(objectClass=univentionMacOSClient))'
elif modulename == 'containers/cn':
attr = 'cn'
con_search_filter = '(&(|(objectClass=container)(objectClass=builtinDomain))(!(objectClass=groupPolicyContainer)))'
elif modulename == 'container/msgpo':
attr = 'cn'
con_search_filter = '(&(objectClass=container)(objectClass=groupPolicyContainer))'
elif modulename == 'containers/ou':
attr = 'ou'
con_search_filter = 'objectClass=organizationalUnit'
elif udm_mainmodule == 'dns':
attr = 'dc'
ad_ldap_controls = ["search_options:1:2"]
if udm_submodule in ('alias', 'host_record', 'ptr_record', 'srv_record', 'txt_record', 'ns_record', 'host_record'):
con_search_filter = '(&(objectClass=dnsNode)(!(dNSTombstoned=TRUE)))'
elif udm_submodule in ('forward_zone', 'reverse_zone'):
con_search_filter = '(objectClass=dnsZone)' # partly true, actually we map the SOA too
if match_filter:
try:
res = self._lo.search(base=dn, filter=match_filter, scope='base', attr=[])
except ldap.NO_SUCH_OBJECT:
print("OpenLDAP object to check against S4-Connector match_filter doesn't exist: %s" % (dn, ))
res = None # TODO: This happens during delete. By setting res=None here, we will not wait for DRS replication for deletes!
except Exception as ex:
print("OpenLDAP search with S4-Connector match_filter failed: %s" % (ex, ))
raise
if not res:
print("DRS wait not required, S4-Connector match_filter did not match the OpenLDAP object: %s" % (dn,))
return
if attr:
filter_template = '(&(%s=%%s)%s)' % (attr, con_search_filter)
ad_ldap_search_args = {'ldap_filter': ldap.filter.filter_format(filter_template, (objname,)), 'controls': ad_ldap_controls}
return ad_ldap_search_args
__lo = None
__ucr = None
@property
def _lo(self):
# type: () -> univention.admin.uldap.access
if self.__lo is None:
self.__lo = utils.get_ldap_connection()
return self.__lo
@property
def _ucr(self):
# type: () -> univention.testing.ucr.UCSTestConfigRegistry
if self.__ucr is None:
self.__ucr = univention.testing.ucr.UCSTestConfigRegistry()
self.__ucr.load()
return self.__ucr
@property
def LDAP_BASE(self):
# type: () -> str
return self._ucr['ldap/base']
@property
def FQHN(self):
# type: () -> str
return '%(hostname)s.%(domainname)s.' % self._ucr
@property
def UNIVENTION_CONTAINER(self):
# type: () -> str
return 'cn=univention,%(ldap/base)s' % self._ucr
@property
def UNIVENTION_TEMPORARY_CONTAINER(self):
# type: () -> str
return 'cn=temporary,cn=univention,%(ldap/base)s' % self._ucr
def __init__(self):
# type: () -> None
self._cleanup = {}
self._cleanupLocks = {}
@classmethod
def _build_udm_cmdline(cls, modulename, action, kwargs):
# type: (str, str, Dict[str, Any]) -> List[str]
"""
Pass modulename, action (create, modify, delete) and a bunch of keyword arguments
to _build_udm_cmdline to build a command for UDM CLI.
:param str modulename: name of UDM module (e.g. 'users/user')
:param str action: An action, like 'create', 'modify', 'delete'.
:param dict kwargs: A dictionary containing properties or one of the following special keys:
:param str binddn: The LDAP simple-bind DN.
:param str bindpwd: The LDAP simple-bind password.
:param str bindpwdfile: A pathname to a file containing the LDAP simple-bind password.
:param str dn: The LDAP distinguished name to operate on.
:param str position: The LDAP distinguished name of the parent container.
:param str superordinate: The LDAP distinguished name of the logical parent.
:param str policy_reference: The LDAP distinguished name of the UDM policy to add.
:param str policy_dereference: The LDAP distinguished name of the UDM policy to remove.
:param str append_option: The name of an UDM option group to add.
:param list options: A list of UDM option group to set.
:param set str_or_list: A list or one single *name=value* property.
:param list append: A list of *name=value* properties to add.
:param list remove: A list of *name=value* properties to remove.
:param bool remove_referring: Remove other LDAP entries referred by this entry.
:param bool ignore_exists: Ignore error on creation if entry already exists.
:param bool ignore_not_exists: Ignore error on deletion if entry does not exists.
>>> UCSTestUDM._build_udm_cmdline('users/user', 'create', {'username': 'foobar'})
['/usr/sbin/udm-test', 'users/user', 'create', '--set', 'username=foobar']
"""
cmd = [cls.PATH_UDM_CLI_CLIENT_WRAPPED, modulename, action]
args = copy.deepcopy(kwargs)
for arg in ('binddn', 'bindpwd', 'bindpwdfile', 'dn', 'position', 'superordinate', 'policy_reference', 'policy_dereference', 'append_option'):
if arg not in args:
continue
value = args.pop(arg)
if not isinstance(value, (list, tuple)):
value = (value,)
for item in value:
cmd.extend(['--%s' % arg.replace('_', '-'), item])
if action == 'list' and 'policies' in args:
cmd.extend(['--policies=%s' % (args.pop('policies'),)])
if action == 'list' and 'filter' in args:
cmd.extend(['--filter=%s' % (args.pop('filter'),)])
for option in args.pop('options', ()):
cmd.extend(['--option', option])
for key, value in args.pop('set', {}).items():
if isinstance(value, (list, tuple)):
for item in value:
cmd.extend(['--set', '%s=%s' % (key, item)])
else:
cmd.extend(['--set', '%s=%s' % (key, value)])
for operation in ('append', 'remove'):
for key, values in args.pop(operation, {}).items():
for value in values:
cmd.extend(['--%s' % operation, '%s=%s' % (key, value)])
if args.pop('remove_referring', True) and action == 'remove':
cmd.append('--remove_referring')
if args.pop('ignore_exists', False) and action == 'create':
cmd.append('--ignore_exists')
if args.pop('ignore_not_exists', False) and action == 'remove':
cmd.append('--ignore_not_exists')
# set all other remaining properties
for key, value in args.items():
if isinstance(value, (list, tuple)):
for item in value:
cmd.extend(['--append', '%s=%s' % (key, item)])
elif value:
cmd.extend(['--set', '%s=%s' % (key, value)])
return cmd
[docs] def create_object(self, modulename, wait_for_replication=True, check_for_drs_replication=False, wait_for=False, **kwargs):
# type: (str, bool, bool, bool, **Any) -> str
r"""
Creates a LDAP object via UDM. Values for UDM properties can be passed via keyword arguments
only and have to exactly match UDM property names (case-sensitive!).
:param str modulename: name of UDM module (e.g. 'users/user')
:param bool wait_for_replication: delay return until Listener has settled.
:param bool check_for_drs_replication: delay return until Samab4 has settled.
:param \*\*kwargs:
"""
if not modulename:
raise UCSTestUDM_MissingModulename()
dn = None
cmd = self._build_udm_cmdline(modulename, 'create', kwargs)
print('Creating %s object with %s' % (modulename, _prettify_cmd(cmd)))
child = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=False)
(stdout, stderr) = child.communicate()
if six.PY3:
stdout, stderr = stdout.decode('utf-8', 'replace'), stderr.decode('utf-8', 'replace')
if child.returncode:
raise UCSTestUDM_CreateUDMObjectFailed({'module': modulename, 'kwargs': kwargs, 'returncode': child.returncode, 'stdout': stdout, 'stderr': stderr})
# find DN of freshly created object and add it to cleanup list
for line in stdout.splitlines(): # :pylint: disable-msg=E1103
if line.startswith('Object created: ') or line.startswith('Object exists: '):
dn = line.split(': ', 1)[-1]
if not line.startswith('Object exists: '):
self._cleanup.setdefault(modulename, []).append(dn)
break
else:
raise UCSTestUDM_CreateUDMUnknownDN({'module': modulename, 'kwargs': kwargs, 'stdout': stdout, 'stderr': stderr})
self.wait_for(modulename, dn, wait_for_replication, everything=wait_for)
return dn
[docs] def create_with_defaults(self, modulename, **kwargs):
# type: (str, **Any) -> Tuple[str, dict]
"""Create any object with as maximum as possible prefilled random default values"""
module = univention.admin.modules.get_module(modulename)
# TODO: cache objects
if 'position' not in kwargs and not modulename.startswith('settings/portal'):
kwargs['position'] = (module.object.get_default_containers(self._lo) or [self.LDAP_BASE])[0]
superordinate_props = {}
if 'superordinate' not in kwargs and getattr(module, 'superordinate', None) not in (None, 'settings/cn'):
superordinate_module = random.choice(module.superordinate) if isinstance(module.superordinate, list) else module.superordinate
superordinate, superordinate_props = self.create_with_defaults(superordinate_module, position=kwargs['position'])
kwargs['superordinate'] = kwargs['position'] = superordinate
max_recursion = kwargs.pop('max_recursion', 1)
def ldap_search(prop):
m, p = prop.syntax.value.split(': ', 1)
if max_recursion <= 0:
return # random.choice(self._cleanup.get(m, [None]))
return self.create_with_defaults(m, max_recursion=max_recursion - 1)[1][p]
def udm_attribute(prop):
if max_recursion <= 0:
return # random.choice(self._cleanup.get(prop.syntax.udm_module, [None]))
# TODO: parse udm_filter and set values
if prop.syntax.udm_filter:
return
return self.create_with_defaults(prop.syntax.udm_module, max_recursion=max_recursion - 1)[1][prop.syntax.attribute]
def udm_objects(prop):
m = list(reversed(prop.syntax.udm_modules))
try:
m.remove('computers/computer')
m.append('computers/linux')
m.append('computers/windows')
m.append('computers/ubuntu')
except ValueError:
pass
for mod in ('computers/domaincontroller_master', 'container/dc'):
try:
m.remove(mod)
except ValueError:
pass
if max_recursion <= 0:
if prop.syntax.key == 'dn':
return # random.choice(self._cleanup.get(m[0], [None])) # warning: would cause circular group memberships for groups/group
return
# TODO: parse udm_filter and set values
if prop.syntax.udm_filter:
return
_dn, _props = self.create_with_defaults(m[0], max_recursion=max_recursion - 1)
try:
p = prop.syntax.key % _props
except KeyError:
obj = univention.admin.objects.get(univention.admin.modules.get(m[0]), None, self._lo, None, _dn)
# obj.open()
p = prop.syntax.key % obj.info
if p == 'dn':
return _dn
return p
def choices(syntax_name):
def func(prop):
return random.choice(getattr(univention.admin.syntax, syntax_name, prop.syntax).choices)[0]
return func
def complex(syntax_name):
def func(prop):
syn = getattr(univention.admin.syntax, syntax_name)
if not all(s[1].name in syntax_classes_mapping for s in syn.subsyntaxes):
# TODO: warning
return
def _quote(s):
return s if '"' not in s else '"%s"' % (s.replace('\\', '\\\\').replace('"', '\\"'),)
functions = [
_func(syntax_classes_mapping[s[1].name], prop)
for s in syn.subsyntaxes
]
return ' '.join(_quote(f()) for f in functions)
return func
def known_mail_address():
if len(known_mail_address.cache) >= 5:
return '%s@%s' % (uts.random_name(), random.choice(known_mail_address.cache))
email = uts.random_email()
domain = email.rsplit('@', 1)[-1]
self.create_with_defaults('mail/domain', name=domain)
known_mail_address.cache.append(domain)
return email
known_mail_address.cache = []
def default_container(prop):
# Bug #53827
class DefaultContainer(univention.admin.syntax.UDM_Objects):
"""
Syntax to select a |UCS| default container from |LDAP|
"""
udm_modules = ('container/cn', 'container/ou', 'container/dc')
regex = None
key = '%(name)s'
label = '%(name)s'
simple = True
s = prop.syntax
prop.syntax = DefaultContainer()
try:
return udm_objects(prop)
finally:
prop.syntax = s
def random_ip():
return uts.random_ip(iter(range(11, 121)))
syntax_classes_mapping = {
'string': uts.random_string,
'string_numbers_letters_dots': uts.random_string,
'string_numbers_letters_dots_spaces': uts.random_string,
'string64': lambda: uts.random_string(64),
'string6': lambda: uts.random_string(6),
'HalfString': uts.random_string,
'OneThirdString': uts.random_string,
'TwoThirdsString': uts.random_string,
'FiveThirdsString': uts.random_string,
'TwoString': uts.random_string,
'IA5string': uts.random_string,
'integer': lambda: uts.random_int(0, 100000),
'integerOrEmpty': lambda: uts.random_int(1, 100000),
'uid': uts.random_username,
'gid': uts.random_groupname,
'userPasswd': uts.random_string,
'passwd': uts.random_string,
'Country': choices('Country'),
'univentionAdminModules': choices('univentionAdminModules'),
'SambaPrivileges': choices('SambaPrivileges'),
'sambaGroupType': choices('sambaGroupType'),
'adGroupType': choices('adGroupType'),
'ipProtocol': choices('ipProtocol'),
'Hour': choices('Hour'),
'Minute': choices('Minute'),
'Month': choices('Month'),
'Weekday': choices('Weekday'),
'Day': choices('Day'),
'NewPortalEntryLinkTarget': choices('NewPortalEntryLinkTarget'),
'NewPortalDefaultLinkTarget': choices('NewPortalDefaultLinkTarget'),
'AllowDeny': choices('AllowDeny'),
'timeSpec': choices('timeSpec'),
'booleanNone': choices('booleanNone'),
'netbiosNodeType': choices('netbiosNodeType'),
'ddnsUpdates': choices('ddnsUpdates'),
'ddnsUpdateStyle': choices('ddnsUpdateStyle'),
'language': choices('language'),
'AllowDenyIgnore': choices('AllowDenyIgnore'),
'emailForwardSetting': choices('emailForwardSetting'),
'UCSServerRole': choices('UCSServerRole'),
'PortalFontColor': choices('PortalFontColor'),
'PortalDefaultLinkTarget': choices('PortalDefaultLinkTarget'),
'PortalCategory': choices('PortalCategory'),
'AuthRestriction': choices('AuthRestriction'),
'PortalEntryLinkTarget': choices('PortalEntryLinkTarget'),
'optionsUsersUser': choices('optionsUsersUser'),
'nfssync': choices('nfssync'),
'auto_one_zero': choices('auto_one_zero'),
'TimeZone': choices('TimeZone'),
'MAC_Address': uts.random_mac,
'ipAddress': random_ip,
'IPv4_AddressRange': lambda: '%s.2 %s.254' % tuple([random_ip().rsplit('.', 1)[0]] * 2),
'ipv4Address': random_ip,
'absolutePath': lambda: '/' + uts.random_string(),
'sharePath': lambda: '/' + uts.random_string(),
'BaseFilename': lambda: '%s.%s' % (uts.random_string(), uts.random_string(3)),
'PrinterURI': lambda: '%s %s' % (random.choice(['lpd://', 'ipp://', 'http://', 'usb:/', 'socket://', 'parallel:/', 'file:/', 'smb://']), uts.random_string()),
'Base64Bzip2Text': lambda: base64.b64encode(__import__('bz2').compress(uts.random_string().encode())).decode('ASCII'),
'Base64Upload': lambda: base64.b64encode(uts.random_string().encode()).decode('ASCII'),
'Base64BaseUpload': lambda: base64.b64encode(uts.random_string().encode()).decode('ASCII'),
'Base64Bzip2XML': lambda: base64.b64encode(__import__('bz2').compress(('<?xml?><foo>%s</foo>' % (uts.random_string(),)).encode())).decode('ASCII'),
'emailAddress': uts.random_email,
'emailAddressTemplate': lambda: '<username>@%s' % (uts.random_domain_name(),),
'emailAddressValidDomain': known_mail_address,
'primaryEmailAddressValidDomain': known_mail_address,
'MailHomeServer': uts.random_domain_name,
'boolean': lambda: uts.random_int(0, 1),
'disabled': lambda: uts.random_int(0, 1),
'locked': lambda: uts.random_int(0, 1),
'v4netmask': lambda: uts.random_int(1, 31),
'netmask': lambda: uts.random_int(1, 31),
'printerName': lambda: uts.random_string(16),
'DHCP_HardwareAddress': lambda: 'ethernet %s' % (uts.random_mac(),),
'hostName': uts.random_string,
'policyName': uts.random_string,
'LocalizedDescription': lambda: '%s %s' % (random.choice(['de_DE', 'en_US']), uts.random_string()),
'LocalizedDisplayName': lambda: '%s %s' % (random.choice(['de_DE', 'en_US']), uts.random_string()),
'LocalizedLink': lambda: '%s %s://%s/%s' % (random.choice(['de_DE', 'en_US']), random.choice(['http', 'https']), uts.random_domain_name(), uts.random_string()),
'reverseLookupSubnet': lambda: random_ip().rsplit('.', 1)[0],
'dnsPTR': lambda: random_ip().rsplit('.', 1)[1],
'dnsHostname': uts.random_domain_name,
'dnsName': uts.random_domain_name,
'dnsName_umlauts': uts.random_string,
'dnsSRVName': lambda: 'ldap tcp %s' % (uts.random_string(),),
'dnsSRVLocation': lambda: '%s %s %s %s' % (uts.random_int(), uts.random_int(), uts.random_int(), uts.random_domain_name()),
'mailinglist_name': uts.random_string,
'mail_folder_name': uts.random_string,
'date': uts.random_date,
'date2': uts.random_date,
'iso8601Date': uts.random_date,
'TimeString': lambda: uts.random_time().rsplit(':', 1)[0], # Bug #53829
'phone': lambda: '+49 421 %s-%s' % (uts.random_int(10000, 99000), uts.random_int(0, 9)),
'postalAddress': lambda: '"%s street 1A" "%s" "%s"' % (uts.random_string(), uts.random_int(10000, 99999), uts.random_string()),
'keyAndValue': lambda: '%s %s' % (uts.random_string(), uts.random_string()),
'SambaLogonHours': lambda: uts.random_int(0, 167),
'DebianPackageVersion': uts.random_version,
'UCSVersion': uts.random_ucs_version,
'LDAP_Search': ldap_search,
'GroupDN': udm_objects,
'UserDN': udm_objects,
'HostDN': udm_objects,
'UCS_Server': udm_objects,
'Windows_Server': udm_objects,
'DomainController': udm_objects,
'ServicePrint_FQDN': udm_objects,
'DNS_ForwardZone': udm_objects,
'DNS_ReverseZone': udm_objects,
'NewPortalEntries': udm_objects,
'NewPortalCategoryEntries': udm_objects,
'NewPortalCategories': udm_objects,
'network': udm_objects,
'Service': udm_objects,
'nagiosHostsEnabledDn': udm_objects,
'nagiosServiceDn': udm_objects,
'dhcpService': udm_objects,
'UMC_OperationSet': udm_objects,
'WritableShare': udm_objects,
'PortalComputer': udm_objects,
'PortalEntries': udm_objects,
'Portals': udm_objects,
'GroupDNOrEmpty': udm_objects,
'PrinterProducerList': udm_objects,
'UserID': udm_objects,
'GroupID': udm_objects,
'printerModel': lambda: '"%s" "%s"' % (random.choice(['smb', 'cupsfilters/pxlmono.ppd', 'hp-ppd/HP/HP_LaserJet_6P.ppd', 'cups-pdf/CUPS-PDF_opt.ppd']), uts.random_string()),
'PrinterNames': udm_objects,
'PrinterDriverList': udm_attribute,
'Packages': udm_attribute,
'PackagesRemove': udm_attribute,
'KDE_Profile': udm_attribute,
'TrueFalseUp': lambda: random.choice(['TRUE', 'FALSE']),
'TrueFalseUpper': lambda: random.choice(['TRUE', 'FALSE', 'NONE']),
'TrueFalse': lambda: random.choice(['true', 'false', 'none']),
'TextArea': lambda: '\n'.join([uts.random_string()] * random.randint(2, 5)),
'SignedInteger': uts.random_int,
'hostOrIP': random_ip,
'hostname_or_ipadress_or_network': lambda: random.choice([random_ip(), uts.random_name(), '%s/%s' % (random_ip(), random.randint(1, 31))]),
'jpegPhoto': lambda: '/9j/2wBDAP%swAALCAABAAEBAREA/8QAFAABA%s//EABQQAQ%sD/2gAIAQEAAD8AN//Z' % ('/' * 86, 'A' * 20, 'A' * 20),
'Base64UMCIcon': lambda: 'AAABAAEAAQEAAAEAIAAwAAAAFgAAACgAAAABAAAAAgAAAAEAIAAAAAAABAAAAAAAAAAAAAAAAAAAAAAAAAD/////AAAAAA==',
'SharedFolderUserACL': complex('SharedFolderUserACL'),
'SharedFolderGroupACL': complex('SharedFolderGroupACL'),
'dnsMX': complex('dnsMX'),
'dnsEntry': complex('dnsEntry'),
'dnsEntryReverse': complex('dnsEntryReverse'),
'dnsEntryAlias': complex('dnsEntryAlias'),
'dhcpEntry': complex('dhcpEntry'),
'IP_AddressRange': complex('IP_AddressRange'),
'UCR_Variable': complex('UCR_Variable'),
'nfsMounts': complex('nfsMounts'),
'ActivationDateTimeTimezone': complex('ActivationDateTimeTimezone'),
'Localesubdirname_and_GNUMessageCatalog': complex('Localesubdirname_and_GNUMessageCatalog'),
'translationTupleShortDescription': complex('translationTupleShortDescription'),
'translationTupleLongDescription': complex('translationTupleLongDescription'),
'translationTupleTabName': complex('translationTupleTabName'),
'I18N_GroupName': complex('I18N_GroupName'),
'UMCMessageCatalogFilename_and_GNUMessageCatalog': complex('UMCMessageCatalogFilename_and_GNUMessageCatalog'),
'LocalizedAnonymousEmpty': complex('LocalizedAnonymousEmpty'),
'PortalLinks': complex('PortalLinks'),
'SambaMinPwdAge': complex('SambaMinPwdAge'),
'SambaMaxPwdAge': complex('SambaMaxPwdAge'),
'UMC_CommandPattern': complex('UMC_CommandPattern'),
'attributeMapping': complex('attributeMapping'),
'UNIX_TimeInterval': complex('UNIX_TimeInterval'),
'TimeUnits': choices('TimeUnits'),
'adminFixedAttributes': choices('adminFixedAttributes'),
'desktopFixedAttributes': choices('desktopFixedAttributes'),
'dhcp_dnsFixedAttributes': choices('dhcp_dnsFixedAttributes'),
'dhcp_dnsupdateFixedAttributes': choices('dhcp_dnsupdateFixedAttributes'),
'dhcp_leasetimeFixedAttributes': choices('dhcp_leasetimeFixedAttributes'),
'dhcp_netbiosFixedAttributes': choices('dhcp_netbiosFixedAttributes'),
'dhcp_routingFixedAttributes': choices('dhcp_routingFixedAttributes'),
'dhcp_scopeFixedAttributes': choices('dhcp_scopeFixedAttributes'),
'dhcp_statementsFixedAttributes': choices('dhcp_statementsFixedAttributes'),
'dvcp_bootFixedAttributes': choices('dvcp_bootFixedAttributes'),
'maintenanceFixedAttributes': choices('maintenanceFixedAttributes'),
'masterPackagesFixedAttributes': choices('masterPackagesFixedAttributes'),
'memberPackagesFixedAttributes': choices('memberPackagesFixedAttributes'),
'pwhistoryFixedAttributes': choices('pwhistoryFixedAttributes'),
'registryFixedAttributes': choices('registryFixedAttributes'),
'releaseFixedAttributes': choices('releaseFixedAttributes'),
'repositorySyncFixedAttributes': choices('repositorySyncFixedAttributes'),
'shareUserQuotaFixedAttributes': choices('shareUserQuotaFixedAttributes'),
'slavePackagesFixedAttributes': choices('slavePackagesFixedAttributes'),
'umcFixedAttributes': choices('umcFixedAttributes'),
'updateFixedAttributes': choices('updateFixedAttributes'),
'printerACLTypes': choices('printerACLTypes'),
'cscPolicy': choices('cscPolicy'),
'ldapFilter': lambda: '(objectClass=*)',
'UNIX_AccessRight': lambda: oct(random.randint(0, 0o777)).replace('o', ''),
'UNIX_AccessRight_extended': lambda: oct(random.randint(0, 0o2777)).replace('o', ''),
'timeperiod': lambda: ','.join(
'-'.join((uts.random_time((a, b)), uts.random_time((c, d))))
for a, b, c, d in random.choices(((0, 2, 4, 6), (8, 10, 10, 12), (24, 16, 18, 20), (20, 21, 22, 23)), k=random.randint(1, 4))
),
'listAttributes': uts.random_string,
'ldapDn': lambda: self.LDAP_BASE, # only relevant for settings/syntax:base
'filesize': lambda: '%d%s%s' % (random.randint(0, 100), random.choice('gGmMkK'), random.choice('bB')),
# 'PortalCategorySelection': uts.random_string, kein bock... deprecated
}
module_property_mapping = {
'sambaRID': lambda: None, # uts.random_int(1000, 9999), # prevent The relative ID (SAMBA) is already in use: 5608
'uidNumber': lambda: None, # prevent noLock / already used
'gidNumber': lambda: None, # prevent noLock / already used
'mailForwardAddress': None, # depends on mailPrimaryAddress
'preferredDeliveryMethod': lambda: random.choice(["any", "mhs", "physical", "telex", "teletex", "g3fax", "g4fax", "ia5", "videotex", "telephone"]),
'shell': lambda: random.choice(['/bin/false', '/bin/bash', '/bin/sh', '/usr/sbin/nologin']),
'shares/share': {
'sambaCustomSettings': lambda: random.choice(['"acl xattr update mtime" yes', '"access based share enum" yes', '"follow symlinks" "yes"']),
},
'dns/reverse_zone': {
'contact': syntax_classes_mapping['emailAddress'], # Bug #53794
},
'dns/ptr_record': {
'ip': None, # prevent, that a ip is set. instead address is set, which builds the ip from address.$superordinate
},
'settings/extended_attribute': {
'version': lambda: '2', # other versions aren't detected as extended attribute!
},
'computers/windows': {
'ntCompatibility': lambda: '0', # Bug #53819
},
'settings/mswmifilter': {
'description': None, # Bug #53797
'displayName': None, # Bug #53797
},
"settings/directory": {
"policies": default_container,
"dns": default_container,
"dhcp": default_container,
"users": default_container,
"groups": default_container,
"computers": default_container,
"domaincontroller": default_container,
"networks": default_container,
"shares": default_container,
"printers": default_container,
"mail": default_container,
"license": default_container,
"base": default_container,
},
'users/user': {
'userCertificate': lambda: base64.b64encode(subprocess.check_output(
('openssl', 'x509', '-inform', 'pem', '-in', '/etc/univention/ssl/%s/cert.pem' % (self.FQHN.rstrip('.'),), '-outform', 'der', '-out', '-')
)).decode('ASCII'), # expensive!
},
}
def _func(func, prop):
if 'prop' in getargspec(func).args:
func = functools.partial(func, prop)
return func
for name, prop in module.property_descriptions.items():
if name in kwargs:
continue
if not prop.editable: # or (is_modification and not prop.may_change)
continue
func = module_property_mapping.get(modulename, {}).get(name, module_property_mapping.get(name, syntax_classes_mapping.get(prop.syntax.name)))
if not func:
continue
func = _func(func, prop)
value = list(set(func() for i in range(random.randint(int(prop.required or name in ('ip', 'range')), 4)))) if prop.multivalue else func()
if value is None or isinstance(value, list) and all(v is None for v in value):
continue
kwargs.setdefault(name, value)
if modulename == 'shares/printer':
# when creating a shares/printergroup recursion is prevented: circular references aren't created
# therefore set some (invalid) values here
kwargs.setdefault('spoolHost', 'localhost')
kwargs.setdefault('model', 'cups-pdf/CUPS-PDF_noopt.ppd FAKE')
if modulename in ('dhcp/subnet', 'dhcp/sharedsubnet'):
import ipaddress
kwargs['subnetmask'] = str(min(29, int(kwargs['subnetmask'])))
iface = ipaddress.IPv4Interface(u'%(subnet)s/%(subnetmask)s' % kwargs)
kwargs['subnet'] = str(iface.network.network_address)
elif modulename in ('dhcp/pool',):
import ipaddress
iface = ipaddress.IPv4Interface(u'%(subnet)s/%(subnetmask)s' % superordinate_props)
if kwargs.get('dynamic_bootp_clients') != 'deny':
kwargs.pop('failover_peer')
if modulename in ('dhcp/subnet', 'dhcp/sharedsubnet', 'dhcp/pool'):
hosts = iface.network.hosts()
next(hosts)
ranges = []
for i in range(len(kwargs['range']) if isinstance(kwargs['range'], list) else 1):
first = last = None
try:
first = last = next(hosts)
for i in range(random.randrange(20)):
last = next(hosts)
except StopIteration:
pass
if first and first != last:
ranges.append('%s %s' % (first, last))
else:
break
kwargs['range'] = ranges
return self.create_object(modulename, **kwargs), kwargs
[docs] def modify_object(self, modulename, wait_for_replication=True, check_for_drs_replication=False, wait_for=False, **kwargs):
# type: (str, bool, bool, bool, **Any) -> str
"""
Modifies a LDAP object via UDM. Values for UDM properties can be passed via keyword arguments
only and have to exactly match UDM property names (case-sensitive!).
Please note: the object has to be created by create_object otherwise this call will raise an exception!
:param str modulename: name of UDM module (e.g. 'users/user')
"""
if not modulename:
raise UCSTestUDM_MissingModulename()
dn = kwargs.get('dn')
if not dn:
raise UCSTestUDM_MissingDn()
if dn not in self._cleanup.get(modulename, set()):
raise UCSTestUDM_CannotModifyExistingObject(dn)
cmd = self._build_udm_cmdline(modulename, 'modify', kwargs)
print('Modifying %s object with %s' % (modulename, _prettify_cmd(cmd)))
child = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=False)
(stdout, stderr) = child.communicate()
if six.PY3:
stdout, stderr = stdout.decode('utf-8', 'replace'), stderr.decode('utf-8', 'replace')
if child.returncode:
raise UCSTestUDM_ModifyUDMObjectFailed({'module': modulename, 'kwargs': kwargs, 'returncode': child.returncode, 'stdout': stdout, 'stderr': stderr})
for line in stdout.splitlines(): # :pylint: disable-msg=E1103
if line.startswith('Object modified: '):
dn = line.split('Object modified: ', 1)[-1]
if dn != kwargs.get('dn'):
print('modrdn detected: %r ==> %r' % (kwargs.get('dn'), dn))
if kwargs.get('dn') in self._cleanup.get(modulename, []):
self._cleanup.setdefault(modulename, []).append(dn)
self._cleanup[modulename].remove(kwargs.get('dn'))
break
elif line.startswith('No modification: '):
raise UCSTestUDM_NoModification({'module': modulename, 'kwargs': kwargs, 'stdout': stdout, 'stderr': stderr})
else:
raise UCSTestUDM_ModifyUDMUnknownDN({'module': modulename, 'kwargs': kwargs, 'stdout': stdout, 'stderr': stderr})
self.wait_for(modulename, dn, wait_for_replication, everything=wait_for)
return dn
[docs] def move_object(self, modulename, wait_for_replication=True, check_for_drs_replication=False, wait_for=False, **kwargs):
# type: (str, bool, bool, bool, **Any) -> str
if not modulename:
raise UCSTestUDM_MissingModulename()
dn = kwargs.get('dn')
if not dn:
raise UCSTestUDM_MissingDn()
if dn not in self._cleanup.get(modulename, set()):
raise UCSTestUDM_CannotModifyExistingObject(dn)
cmd = self._build_udm_cmdline(modulename, 'move', kwargs)
print('Moving %s object %s' % (modulename, _prettify_cmd(cmd)))
child = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=False)
(stdout, stderr) = child.communicate()
if six.PY3:
stdout, stderr = stdout.decode('utf-8', 'replace'), stderr.decode('utf-8', 'replace')
if child.returncode:
raise UCSTestUDM_MoveUDMObjectFailed({'module': modulename, 'kwargs': kwargs, 'returncode': child.returncode, 'stdout': stdout, 'stderr': stderr})
for line in stdout.splitlines(): # :pylint: disable-msg=E1103
if line.startswith('Object modified: '):
self._cleanup.get(modulename, []).remove(dn)
new_dn = ldap.dn.dn2str(ldap.dn.str2dn(dn)[0:1] + ldap.dn.str2dn(kwargs.get('position', '')))
self._cleanup.setdefault(modulename, []).append(new_dn)
break
else:
raise UCSTestUDM_ModifyUDMUnknownDN({'module': modulename, 'kwargs': kwargs, 'stdout': stdout, 'stderr': stderr})
self.wait_for(modulename, dn, wait_for_replication, everything=wait_for)
return new_dn
[docs] def remove_object(self, modulename, wait_for_replication=True, wait_for=False, **kwargs):
# type: (str, bool, bool, **Any) -> None
if not modulename:
raise UCSTestUDM_MissingModulename()
dn = kwargs.get('dn')
if not dn:
raise UCSTestUDM_MissingDn()
if dn not in self._cleanup.get(modulename, set()):
raise UCSTestUDM_CannotModifyExistingObject(dn)
cmd = self._build_udm_cmdline(modulename, 'remove', kwargs)
print('Removing %s object %s' % (modulename, _prettify_cmd(cmd)))
child = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=False)
(stdout, stderr) = child.communicate()
if six.PY3:
stdout, stderr = stdout.decode('utf-8', 'replace'), stderr.decode('utf-8', 'replace')
if child.returncode:
raise UCSTestUDM_RemoveUDMObjectFailed({'module': modulename, 'kwargs': kwargs, 'returncode': child.returncode, 'stdout': stdout, 'stderr': stderr})
if dn in self._cleanup.get(modulename, []):
self._cleanup[modulename].remove(dn)
self.wait_for(modulename, dn, wait_for_replication, everything=wait_for)
[docs] def wait_for(self, modulename, dn, wait_for_replication=True, wait_for_drs_replication=False, wait_for_s4connector=False, everything=False):
# type: (str, str, bool, bool, bool, bool) -> None
# the order of the conditions is imporant
conditions = []
if wait_for_replication:
conditions.append((utils.ReplicationType.LISTENER, wait_for_replication))
if everything:
wait_for_drs_replication = True
wait_for_s4connector = True
drs_replication = wait_for_drs_replication
ad_ldap_search_args = self.ad_object_identifying_filter(modulename, dn)
if wait_for_drs_replication and not isinstance(wait_for_drs_replication, six.string_types):
drs_replication = False
if ad_ldap_search_args:
drs_replication = ad_ldap_search_args
if wait_for_s4connector and ad_ldap_search_args:
if self._ucr.get('samba4/ldap/base'):
conditions.append((utils.ReplicationType.S4C_FROM_UCS, ad_ldap_search_args))
if drs_replication:
if not wait_for_replication:
conditions.append((utils.ReplicationType.LISTENER, wait_for_replication))
if self._ucr.get('server/role') in ('domaincontroller_backup', 'domaincontroller_slave', 'memberserver'):
conditions.append((utils.ReplicationType.DRS, drs_replication))
return utils.wait_for(conditions, verbose=False)
[docs] def create_user(self, wait_for_replication=True, check_for_drs_replication=True, wait_for=True, **kwargs): # :pylint: disable-msg=W0613
# type: (bool, bool, bool, **Any) -> Tuple[str, str]
"""
Creates a user via UDM CLI. Values for UDM properties can be passed via keyword arguments only and
have to exactly match UDM property names (case-sensitive!). Some properties have default values:
:param str position: 'cn=users,$ldap_base'
:param str password: 'univention'
:param str firstname: 'Foo Bar'
:param str lastname: <random string>
:param str username: <random string> If username is missing, a random user name will be used.
:return: (dn, username)
"""
attr = self._set_module_default_attr(kwargs, (
('position', 'cn=users,%s' % self.LDAP_BASE),
('password', 'univention'),
('username', uts.random_username()),
('lastname', uts.random_name()),
('firstname', uts.random_name())
))
return (self.create_object('users/user', wait_for_replication, check_for_drs_replication, wait_for=wait_for, **attr), attr['username'])
[docs] def create_ldap_user(self, wait_for_replication=True, check_for_drs_replication=False, **kwargs): # :pylint: disable-msg=W0613
# type: (bool, bool, **Any) -> Tuple[str, str]
# check_for_drs_replication=False -> ldap users are not replicated to s4
attr = self._set_module_default_attr(kwargs, (
('position', 'cn=users,%s' % self.LDAP_BASE),
('password', 'univention'),
('username', uts.random_username()),
('lastname', uts.random_name()),
('name', uts.random_name())
))
return (self.create_object('users/ldap', wait_for_replication, check_for_drs_replication, **attr), attr['username'])
[docs] def remove_user(self, username, wait_for_replication=True):
# type: (str, bool) -> None
"""Removes a user object from the ldap given it's username."""
kwargs = {
'dn': 'uid=%s,cn=users,%s' % (username, self.LDAP_BASE)
}
self.remove_object('users/user', wait_for_replication, **kwargs)
[docs] def create_group(self, wait_for_replication=True, check_for_drs_replication=True, **kwargs): # :pylint: disable-msg=W0613
# type: (bool, bool, **Any) -> Tuple[str, str]
"""
Creates a group via UDM CLI. Values for UDM properties can be passed via keyword arguments only and
have to exactly match UDM property names (case-sensitive!). Some properties have default values:
:param str position: `cn=users,$ldap_base`
:param str name: <random value>
:return: (dn, groupname)
If "groupname" is missing, a random group name will be used.
"""
attr = self._set_module_default_attr(kwargs, (
('position', 'cn=groups,%s' % self.LDAP_BASE),
('name', uts.random_groupname())
))
return (self.create_object('groups/group', wait_for_replication, check_for_drs_replication, **attr), attr['name'])
def _set_module_default_attr(self, attributes, defaults):
"""
Returns the given attributes, extended by every property given in defaults if not yet set.
:param tuple defaults: should be a tupel containing tupels like "('username', <default_value>)".
"""
attr = copy.deepcopy(attributes)
for prop, value in defaults:
attr.setdefault(prop, value)
return attr
[docs] def addCleanupLock(self, lockType, lockValue):
self._cleanupLocks.setdefault(lockType, []).append(lockValue)
def _wait_for_drs_removal(self, modulename, dn, verbose=True):
# type: (str, str, bool) -> None
ad_ldap_search_args = self.ad_object_identifying_filter(modulename, dn)
if ad_ldap_search_args:
wait_for_drs_replication(should_exist=False, verbose=verbose, timeout=20, **ad_ldap_search_args)
[docs] def list_objects(self, modulename, **kwargs):
# type: (str, **Any) -> List[Tuple[str, Dict[str, Any]]]
cmd = self._build_udm_cmdline(modulename, 'list', kwargs)
print('Listing %s objects %s' % (modulename, _prettify_cmd(cmd)))
child = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=False)
(stdout, stderr) = child.communicate()
if six.PY3:
stdout, stderr = stdout.decode('utf-8', 'replace'), stderr.decode('utf-8', 'replace')
if child.returncode:
raise UCSTestUDM_ListUDMObjectFailed(child.returncode, stdout, stderr)
objects = [] # type: List[Tuple[str, Dict[str, Any]]]
dn = None
attrs = {} # type: Dict[str, Any]
pattr = None
pvalue = None
pdn = None
current_policy_type = None
for line in stdout.splitlines():
if line.startswith('DN: '):
if dn:
objects.append((dn, attrs))
dn = None
attrs = {}
dn = line[3:].strip()
elif not line.strip():
continue
elif line.startswith(' ') and ':' in line: # list --policies=1
name, value = line.split(':', 1)
if name.strip() == 'Policy':
pvalue = pattr = None
pdn = value
elif name.strip() == 'Attribute':
pattr = value
elif name.strip() == 'Value':
pvalue = value
attrs.setdefault(current_policy_type, {}).setdefault(pdn.strip(), {}).setdefault(pattr.strip(), []).append(pvalue.strip())
elif line.startswith(' ') and '=' in line: # list --policies=2
name, value = line.split('=', 1)
attrs.setdefault(current_policy_type, {}).setdefault(name.strip(), []).append(value.strip().strip('"'))
elif any(x in line for x in ('Policy-based Settings', 'Subnet-based Settings', 'Merged Settings')):
current_policy_type = line.split(':')[0].strip()
elif line.startswith(' ') and ':' in line:
name, value = line.split(':', 1)
attrs.setdefault(name.strip(), []).append(value.strip())
if dn:
objects.append((dn, attrs))
return objects
[docs] def cleanup(self):
# type: () -> None
"""
Automatically removes LDAP objects via UDM CLI that have been created before.
"""
if not self._cleanup and not self._cleanupLocks:
return
failedObjects = {}
print('Performing UCSTestUDM cleanup...')
objects = []
removed = []
for modulename, objs in self._cleanup.items():
objects.extend((modulename, dn) for dn in objs)
for modulename, dn in sorted(objects, key=lambda x: len(x[1]), reverse=True):
cmd = ['/usr/sbin/udm-test', modulename, 'remove', '--dn', dn, '--remove_referring']
print('removing DN:', dn)
child = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=False)
(stdout, stderr) = child.communicate()
if six.PY3:
stdout, stderr = stdout.decode('utf-8', 'replace'), stderr.decode('utf-8', 'replace')
if child.returncode or 'Object removed:' not in stdout:
failedObjects.setdefault(modulename, []).append(dn)
else:
removed.append((modulename, dn))
# simply iterate over the remaining objects again, removing them might just have failed for chronology reasons
# (e.g groups can not be removed while there are still objects using it as primary group)
for modulename, objects in failedObjects.items():
for dn in objects:
cmd = ['/usr/sbin/udm-test', modulename, 'remove', '--dn', dn, '--remove_referring']
child = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=False)
(stdout, stderr) = child.communicate()
if six.PY3:
stdout, stderr = stdout.decode('utf-8', 'replace'), stderr.decode('utf-8', 'replace')
if child.returncode or 'Object removed:' not in stdout:
print('Warning: Failed to remove %r object %r' % (modulename, dn), file=sys.stderr)
print('stdout=%r %r %r' % (stdout, stderr, self._lo.get(dn)), file=sys.stderr)
else:
removed.append((modulename, dn))
self._cleanup = {}
for lock_type, values in self._cleanupLocks.items():
for value in values:
lockDN = 'cn=%s,cn=%s,%s' % (value, lock_type, self.UNIVENTION_TEMPORARY_CONTAINER)
try:
self._lo.delete(lockDN)
except ldap.NO_SUCH_OBJECT:
pass
except Exception as ex:
print('Failed to remove locking object "%s" during cleanup: %r' % (lockDN, ex))
self._cleanupLocks = {}
print('Cleanup: wait for replication and drs removal')
utils.wait_for_replication(verbose=False)
for module, dn in removed:
try:
self._wait_for_drs_removal(module, dn, verbose=True)
except DRSReplicationFailed as exc:
print('Cleanup: DRS replication failed:', exc)
self.stop_cli_server()
print('UCSTestUDM cleanup done')
[docs] def stop_cli_server(self):
# type: () -> None
""" restart UDM CLI server """
print('trying to restart UDM CLI server')
procs = []
for proc in psutil.process_iter():
try:
cmdline = proc.cmdline()
if len(cmdline) >= 2 and cmdline[0].startswith('/usr/bin/python') and cmdline[1] == self.PATH_UDM_CLI_SERVER:
procs.append(proc)
except psutil.NoSuchProcess:
pass
for signal in (15, 9):
for proc in procs:
try:
print('sending signal %s to process %s (%r)' % (signal, proc.pid, proc.cmdline(),))
os.kill(proc.pid, signal)
except (psutil.NoSuchProcess, EnvironmentError):
print('process already terminated')
procs.remove(proc)
if signal == 15:
time.sleep(1)
[docs] def verify_udm_object(self, *args, **kwargs):
return verify_udm_object(*args, **kwargs)
[docs] def verify_ldap_object(self, *args, **kwargs):
return utils.verify_ldap_object(*args, **kwargs)
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
if exc_type:
print('Cleanup after exception: %s %s' % (exc_type, exc_value))
self.cleanup()
[docs]class UDM(UCSTestUDM):
"""UDM interface using the REST API"""
PATH_UDM_CLI_CLIENT_WRAPPED = '/usr/sbin/udm-test-rest'
[docs] def stop_cli_server(self):
# type: () -> None
super(UDM, self).stop_cli_server()
subprocess.call(['systemctl', 'reload', 'univention-directory-manager-rest.service'])
[docs]def verify_udm_object(module, dn, expected_properties):
# type: (Any, str, Optional[Mapping[str, Union[bytes, Text, Tuple[str, ...], List[str]]]]) -> None
"""
Verify an object exists with the given `dn` in the given UDM `module` with
some properties. Setting `expected_properties` to `None` requires the
object to not exist.
:param dict expected_properties: is a dictionary of (property,value) pairs.
:raises AssertionError: in case of a mismatch.
"""
lo = utils.get_ldap_connection(admin_uldap=True)
try:
position = univention.admin.uldap.position(lo.base)
udm_module = univention.admin.modules.get(module)
if not udm_module:
univention.admin.modules.update()
udm_module = univention.admin.modules.get(module)
udm_object = univention.admin.objects.get(udm_module, None, lo, position, dn)
udm_object.open()
except univention.admin.uexceptions.noObject:
if expected_properties is None:
return
raise
if expected_properties is None:
raise AssertionError("UDM object {} should not exist".format(dn))
difference = {}
for (key, value) in expected_properties.items():
udm_value = udm_object.info.get(key, [])
if udm_value is None:
udm_value = []
if isinstance(udm_value, (bytes, six.string_types)):
udm_value = {udm_value}
if not isinstance(value, (tuple, list)):
value = {value}
value = {_to_unicode(v).lower() for v in value}
udm_value = {_to_unicode(v).lower() for v in udm_value}
if udm_value != value:
try:
value = {_normalize_dn(dn) for dn in value}
udm_value = {_normalize_dn(dn) for dn in udm_value}
except ldap.DECODING_ERROR:
pass
if udm_value != value:
difference[key] = (udm_value, value)
assert not difference, '\n'.join('{}: {} != expected {}'.format(key, udm_value, value) for key, (udm_value, value) in difference.items())
def _prettify_cmd(cmd):
# type: (Iterable[str]) -> str
cmd = ' '.join(pipes.quote(x) for x in cmd)
if set(cmd) & {'\x00', '\n'}:
cmd = repr(cmd)
return cmd
def _to_unicode(string):
# type: (Union[bytes, Text]) -> Text
if isinstance(string, bytes):
return string.decode('utf-8')
return string
def _normalize_dn(dn):
# type: (str) -> str
r"""
Normalize a given dn. This removes some escaping of special chars in the
DNs. Note: The CON-LDAP returns DNs with escaping chars, OpenLDAP does not.
>>> _normalize_dn(r"cn=peter\#,cn=groups")
'cn=peter#,cn=groups'
"""
return ldap.dn.dn2str(ldap.dn.str2dn(dn))
if __name__ == '__main__':
import doctest
print(doctest.testmod())
if __name__ == '__disabled__':
ucr = univention.testing.ucr.UCSTestConfigRegistry()
ucr.load()
with UCSTestUDM() as udm:
# create user
dnUser, _username = udm.create_user()
# stop CLI daemon
udm.stop_cli_server()
# create group
_dnGroup, _groupname = udm.create_group()
# modify user from above
udm.modify_object('users/user', dn=dnUser, description='Foo Bar')
# test with malformed arguments
try:
_dnUser, _username = udm.create_user(username='')
except UCSTestUDM_CreateUDMObjectFailed:
print('Caught anticipated exception UCSTestUDM_CreateUDMObjectFailed - SUCCESS')
# try to modify object not created by create_udm_object()
try:
udm.modify_object('users/user', dn='uid=Administrator,cn=users,%s' % ucr.get('ldap/base'), description='Foo Bar')
except UCSTestUDM_CannotModifyExistingObject:
print('Caught anticipated exception UCSTestUDM_CannotModifyExistingObject - SUCCESS')