# Copyright 2013-2022 Univention GmbH
#
# https://www.univention.de/
#
# All rights reserved.
#
# The source code of this program is made available
# under the terms of the GNU Affero General Public License version 3
# (GNU AGPL V3) as published by the Free Software Foundation.
#
# Binary versions of this program provided by Univention to you as
# well as other copyrighted, protected or trademarked materials like
# Logos, graphics, fonts, specific documentations and configurations,
# cryptographic keys etc. are subject to a license agreement between
# you and Univention and not subject to the GNU AGPL V3.
#
# In the case you use this program under the terms of the GNU AGPL V3,
# the program is provided in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public
# License with the Debian GNU/Linux or Univention distribution in file
# /usr/share/common-licenses/AGPL-3; if not, see
# <https://www.gnu.org/licenses/>.
"""
Common functions used by tests.
"""
from __future__ import print_function
import functools
import inspect
import os
import socket
import subprocess
import sys
import time
import traceback
from itertools import chain
from enum import IntEnum
from types import TracebackType # noqa: F401
from typing import IO, Any, Callable, Dict, Iterable, List, NoReturn, Optional, Sequence, Text, Tuple, Type, TypeVar, Union # noqa: F401
import ldap
import six
import univention.uldap as uldap
from univention.config_registry import ConfigRegistry
try:
from univention.admin.uldap import access
except ImportError:
access = None
S4CONNECTOR_INIT_SCRIPT = '/etc/init.d/univention-s4-connector'
FIREWALL_INIT_SCRIPT = '/etc/init.d/univention-firewall'
SLAPD_INIT_SCRIPT = '/etc/init.d/slapd'
UCR = ConfigRegistry()
_T = TypeVar("_T")
[docs]class LDAPError(Exception):
pass
[docs]class LDAPReplicationFailed(LDAPError):
pass
[docs]class LDAPObjectNotFound(LDAPError):
pass
[docs]class LDAPUnexpectedObjectFound(LDAPError):
pass
[docs]class LDAPObjectValueMissing(LDAPError):
pass
[docs]class LDAPObjectUnexpectedValue(LDAPError):
pass
[docs]class UCSTestDomainAdminCredentials(object):
"""
This class fetches the username, the LDAP bind DN and the password
for a domain admin user account from UCR. The account may be used for testing.
>>> dummy_ucr = {'ldap/base': 'dc=example,dc=com', 'tests/domainadmin/pwdfile': '/dev/null'}
>>> account = UCSTestDomainAdminCredentials(ucr=dummy_ucr)
>>> account.username
'Administrator'
>>> account.binddn
'uid=Administrator,cn=users,dc=example,dc=com'
>>> account.bindpw
''
"""
def __init__(self, ucr=None):
# type: (Optional[ConfigRegistry]) -> None
if ucr is None:
ucr = UCR
ucr.load()
self.binddn = ucr.get('tests/domainadmin/account', 'uid=Administrator,cn=users,%(ldap/base)s' % ucr)
self.pwdfile = ucr.get('tests/domainadmin/pwdfile')
if self.pwdfile:
with open(self.pwdfile, 'r') as f:
self.bindpw = f.read().strip('\n\r')
else:
self.bindpw = ucr.get('tests/domainadmin/pwd', 'univention')
if self.binddn:
self.username = uldap.explodeDn(self.binddn, 1)[0] # type: Optional[Text]
else:
self.username = None
[docs]def get_ldap_connection(admin_uldap=False, primary=False):
# type: (bool, bool) -> access
ucr = UCR
ucr.load()
if primary:
port = int(ucr.get('ldap/master/port', 7389))
ldap_servers = [ucr['ldap/master']]
else:
port = int(ucr.get('ldap/server/port', 7389))
ldap_servers = []
if ucr['ldap/server/name']:
ldap_servers.append(ucr['ldap/server/name'])
if ucr['ldap/servers/addition']:
ldap_servers.extend(ucr['ldap/server/addition'].split())
creds = UCSTestDomainAdminCredentials()
for ldap_server in ldap_servers:
try:
lo = uldap.access(host=ldap_server, port=port, base=ucr['ldap/base'], binddn=creds.binddn, bindpw=creds.bindpw, start_tls=2, decode_ignorelist=[], follow_referral=True)
if admin_uldap:
lo = access(lo=lo)
return lo
except ldap.SERVER_DOWN:
pass
raise ldap.SERVER_DOWN()
[docs]def retry_on_error(func, exceptions=(Exception,), retry_count=20, delay=10):
# type: (Callable[..., _T], Tuple[Type[Exception], ...], int, float) -> _T
"""
This function calls the given function `func`.
If one of the specified `exceptions` is caught, `func` is called again until
the retry count is reached or any unspecified exception is caught. Between
two calls of `func` retry_on_error waits for `delay` seconds.
:param func: function to be called
:param exceptions: tuple of exception classes, that cause a rerun of `func`
:param retry_count: retry the execution of `func` max `retry_count` times
:param delay: waiting time in seconds between two calls of `func`
:returns: return value of `func`
"""
for i in range(retry_count + 1):
try:
return func()
except exceptions:
exc_info = sys.exc_info()
if i != retry_count:
print('Exception occurred: %s (%s). Retrying in %.2f seconds (retry %d/%d).\n' % (exc_info[0], exc_info[1], delay, i, retry_count))
time.sleep(delay)
else:
print('Exception occurred: %s (%s). This was the last retry (retry %d/%d).\n' % (exc_info[0], exc_info[1], i, retry_count))
else:
six.reraise(*exc_info)
[docs]def verify_ldap_object(
baseDn, # type: str
expected_attr=None, # type: Optional[Dict[str, str]]
strict=True, # type: bool
should_exist=True, # type: bool
retry_count=20, # type: int
delay=10, # type: float
primary=False, # type: bool
pre_check=None, # type: Optional[Callable[..., None]]
pre_check_kwargs=None, # type: Optional[Dict[str, Any]]
not_expected_attr=None, # type: Optional[Dict[str, str]]
): # type: (...) -> None
"""
Verify [non]existence and attributes of LDAP object.
:param str baseDn: DN of object to check
:param dict expected_attr: attributes and their values that the LDAP object is expected to have
:param bool strict: value lists of multi-value attributes must be complete
:param bool should_exist: whether the object is expected to exist
:param int retry_count: how often to retry the verification if it fails before raising an exception
:param float delay: waiting time in seconds between retries on verification failures
:param bool primary: whether to connect to the primary (DC master) instead of local LDAP (to be
exact: ucr[ldap/server/name], ucr['ldap/server/addition'])
:param pre_check: function to execute before starting verification. Value should be a function object
like `utils.wait_for_replication`.
:param dict pre_check_kwargs: dict with kwargs to pass to `pre_check()` call
:param dict not_expected_attr: attributes and their values that the LDAP object is NOT expected to have
:return: None
:raises LDAPObjectNotFound: when no object was found at `baseDn`
:raises LDAPUnexpectedObjectFound: when an object was found at `baseDn`, but `should_exist=False`
:raises LDAPObjectValueMissing: when a value listed in `expected_attr` is missing in the LDAP object
:raises LDAPObjectUnexpectedValue: if `strict=True` and a multi-value attribute of the LDAP object
has more values than were listed in `expected_attr` or an `not_expected_attr` was found
:raises TypeError: if the value of `pre_check` is not a function object
"""
ucr = UCR
ucr.load()
retry_count = int(ucr.get("tests/verify_ldap_object/retry_count", retry_count))
delay = int(ucr.get("tests/verify_ldap_object/delay", delay))
if pre_check:
if not inspect.isfunction(pre_check) or inspect.ismethod(pre_check):
raise TypeError("Value of argument 'pre_check' is not a function: {!r}".format(pre_check))
pre_check(**(pre_check_kwargs or {}))
return retry_on_error(
functools.partial(__verify_ldap_object, baseDn, expected_attr, strict, should_exist, primary, not_expected_attr),
(LDAPUnexpectedObjectFound, LDAPObjectNotFound, LDAPObjectValueMissing, LDAPObjectUnexpectedValue),
retry_count,
delay)
def __verify_ldap_object(baseDn, expected_attr=None, strict=True, should_exist=True, primary=False, not_expected_attr=None):
# type: (str, Optional[Dict[str, str]], bool, bool, bool) -> None
if expected_attr is None:
expected_attr = {}
if not_expected_attr is None:
not_expected_attr = {}
try:
dn, attr = get_ldap_connection(primary=primary).search(
filter='(objectClass=*)',
base=baseDn,
scope=ldap.SCOPE_BASE,
attr=set(chain(expected_attr.keys(), not_expected_attr.keys()))
)[0]
except (ldap.NO_SUCH_OBJECT, IndexError):
if should_exist:
raise LDAPObjectNotFound('DN: %s' % baseDn)
return
if not should_exist:
raise LDAPUnexpectedObjectFound('DN: %s' % baseDn)
values_missing = {}
unexpected_values = {}
for attribute, expected_values_ in expected_attr.items():
found_values = set(attr.get(attribute, []))
expected_values = {x if isinstance(x, bytes) else x.encode('UTF-8') for x in expected_values_}
difference = expected_values - found_values
if difference:
values_missing[attribute] = difference
if strict:
difference = found_values - expected_values
if difference:
unexpected_values[attribute] = difference
for attribute, not_expected_values_ in not_expected_attr.items():
if strict and attribute in expected_attr.keys():
continue
found_values = set(attr.get(attribute, []))
not_expected_values = {x if isinstance(x, bytes) else x.encode('UTF-8') for x in not_expected_values_}
intersection = found_values.intersection(not_expected_values)
if intersection:
unexpected_values[attribute] = intersection
mixed = dict((key, (values_missing.get(key), unexpected_values.get(key))) for key in list(values_missing) + list(unexpected_values))
msg = u'DN: %s\n%s\n' % (
baseDn,
u'\n'.join(
u"%s: %r, %s %s" % (
attribute,
attr.get(attribute),
('missing: %r;' % u"', ".join(x.decode('UTF-8', 'replace') for x in difference_missing)) if difference_missing else '',
('unexpected: %r' % u"', ".join(x.decode('UTF-8', 'replace') for x in difference_unexpected)) if difference_unexpected else '',
) for attribute, (difference_missing, difference_unexpected) in mixed.items())
)
if values_missing:
raise LDAPObjectValueMissing(msg)
if unexpected_values:
raise LDAPObjectUnexpectedValue(msg)
[docs]def s4connector_present():
# type: () -> bool
ucr = ConfigRegistry()
ucr.load()
if ucr.is_true('directory/manager/samba3/legacy', False):
return False
if ucr.is_false('directory/manager/samba3/legacy', False):
return True
for dn, attr in get_ldap_connection().search(
filter='(&(|(objectClass=univentionDomainController)(objectClass=univentionMemberServer))(univentionService=S4 Connector))',
attr=['aRecord']
):
if 'aRecord' in attr:
return True
return False
[docs]def stop_s4connector():
# type: () -> None
if package_installed('univention-s4-connector'):
subprocess.call((S4CONNECTOR_INIT_SCRIPT, 'stop'))
[docs]def start_s4connector():
# type: () -> None
if package_installed('univention-s4-connector'):
subprocess.call((S4CONNECTOR_INIT_SCRIPT, 'start'))
[docs]def restart_s4connector():
# type: () -> None
stop_s4connector()
start_s4connector()
[docs]def stop_slapd():
# type: () -> None
subprocess.call((SLAPD_INIT_SCRIPT, 'stop'))
[docs]def start_slapd():
# type: () -> None
subprocess.call((SLAPD_INIT_SCRIPT, 'start'))
[docs]def restart_slapd():
# type: () -> None
subprocess.call((SLAPD_INIT_SCRIPT, 'restart'))
[docs]def stop_listener():
# type: () -> None
subprocess.call(('systemctl', 'stop', 'univention-directory-listener'))
[docs]def start_listener():
# type: () -> None
subprocess.call(('systemctl', 'start', 'univention-directory-listener'))
[docs]def restart_listener():
# type: () -> None
subprocess.call(('systemctl', 'restart', 'univention-directory-listener'))
[docs]def restart_firewall():
# type: () -> None
subprocess.call((FIREWALL_INIT_SCRIPT, 'restart'))
[docs]class AutomaticListenerRestart(object):
"""
Automatically restart Univention Directory Listener when leaving the "with" block::
with AutomaticListenerRestart() as alr:
with ucr_test.UCSTestConfigRegistry() as ucr:
# set some ucr variables, that influence the Univention Directory Listener
univention.config_registry.handler_set(['foo/bar=ding/dong'])
""" # noqa: E101
def __enter__(self):
# type: () -> AutomaticListenerRestart
return self
def __exit__(self, exc_type, exc_value, traceback):
# type: (Optional[Type[BaseException]], Optional[BaseException], Optional[TracebackType]) -> None
restart_listener()
[docs]class AutoCallCommand(object):
"""
Automatically call the given commands when entering/leaving the "with" block.
The keyword arguments enter_cmd and exit_cmd are optional::
with AutoCallCommand(
enter_cmd=['/etc/init.d/dovecot', 'reload'],
exit_cmd=['/etc/init.d/dovecot', 'restart']) as acc:
with ucr_test.UCSTestConfigRegistry() as ucr:
# set some ucr variables, that influence the Univention Directory Listener
univention.config_registry.handler_set(['foo/bar=ding/dong'])
In case some filedescriptors for stdout/stderr have to be passed to the executed
command, they may be passed as kwarg::
with AutoCallCommand(
enter_cmd=['/etc/init.d/dovecot', 'reload'],
exit_cmd=['/etc/init.d/dovecot', 'restart'],
stderr=open('/dev/zero', 'w')) as acc:
pass
""" # noqa: E101
def __init__(self, enter_cmd=None, exit_cmd=None, stdout=None, stderr=None):
# type: (Optional[Sequence[str]], Optional[Sequence[str]], Optional[IO[str]], Optional[IO[str]]) -> None
self.enter_cmd = None
if type(enter_cmd) in (list, tuple):
self.enter_cmd = enter_cmd
self.exit_cmd = None
if type(exit_cmd) in (list, tuple):
self.exit_cmd = exit_cmd
self.pipe_stdout = stdout
self.pipe_stderr = stderr
def __enter__(self):
# type: () -> AutoCallCommand
if self.enter_cmd:
subprocess.call(self.enter_cmd, stdout=self.pipe_stdout, stderr=self.pipe_stderr)
return self
def __exit__(self, exc_type, exc_value, traceback):
# type: (Optional[Type[BaseException]], Optional[BaseException], Optional[TracebackType]) -> None
if self.exit_cmd:
subprocess.call(self.exit_cmd, stdout=self.pipe_stdout, stderr=self.pipe_stderr)
[docs]class FollowLogfile(object):
"""
Prints the contents of the listed files on exit of the with block if
an exception occurred.
Set always=True to also print them without exception.
You may wish to make the server flush its logs before existing the
with block. Use AutoCallCommand inside the block for that::
cmd = ('doveadm', 'log', 'reopen')
with FollowLogfile(logfiles=['/var/log/syslog', '/var/log/mail.log']):
with utils.AutoCallCommand(enter_cmd=cmd, exit_cmd=cmd):
pass
with FollowLogfile(logfiles=['/var/log/syslog'], always=True):
with utils.AutoCallCommand(enter_cmd=cmd, exit_cmd=cmd):
pass
""" # noqa: E101
def __init__(self, logfiles, always=False):
# type: (Iterable[str], bool) -> None
"""
:param logfiles: list of absolute filenames to read from
:param always: bool, if True: print logfile change also if no error occurred (default=False)
"""
assert isinstance(always, bool)
self.always = always
self.logfile_pos = dict.fromkeys(logfiles, 0) # type: Dict[str, int]
def __enter__(self):
# type: () -> FollowLogfile
self.logfile_pos.update((logfile, os.path.getsize(logfile)) for logfile in self.logfile_pos)
return self
def __exit__(self, exc_type, exc_value, traceback):
# type: (Optional[Type[BaseException]], Optional[BaseException], Optional[TracebackType]) -> None
if self.always or exc_type:
for logfile, pos in self.logfile_pos.items():
with open(logfile, "r") as log:
log.seek(pos, 0)
print(logfile.center(79, "="))
sys.stdout.writelines(log)
print("=" * 79)
[docs]class ReplicationType(IntEnum):
LISTENER = 1
POSTRUN = 2
S4C_FROM_UCS = 3
S4C_TO_UCS = 4
DRS = 5
[docs]def wait_for_replication_from_master_openldap_to_local_samba(replication_postrun=False, ldap_filter=None, verbose=True):
# type: (bool, Optional[str], bool) -> None
"""Wait for all kind of replications"""
# the order matters!
conditions = [(ReplicationType.LISTENER, 'postrun' if replication_postrun else True)] # type: List[Tuple[ReplicationType, Any]]
ucr = UCR
ucr.load()
if ucr.get('samba4/ldap/base'):
conditions.append((ReplicationType.S4C_FROM_UCS, ldap_filter))
if ucr.get('server/role') in ('domaincontroller_backup', 'domaincontroller_slave'):
conditions.append((ReplicationType.DRS, ldap_filter))
wait_for(conditions, verbose=True)
[docs]def wait_for_replication_from_local_samba_to_local_openldap(replication_postrun=False, ldap_filter=None, verbose=True):
# type: (bool, Optional[str], bool) -> None
"""Wait for all kind of replications"""
conditions = []
# the order matters!
ucr = UCR
ucr.load()
if ucr.get('server/role') in ('domaincontroller_backup', 'domaincontroller_slave'):
conditions.append((ReplicationType.DRS, ldap_filter))
if ucr.get('samba4/ldap/base'):
conditions.append((ReplicationType.S4C_FROM_UCS, ldap_filter))
if replication_postrun:
conditions.append((ReplicationType.LISTENER, 'postrun'))
else:
conditions.append((ReplicationType.LISTENER, None))
wait_for(conditions, verbose=True)
[docs]def wait_for(conditions=None, verbose=True):
# type: (Optional[List[Tuple[ReplicationType, Any]]], bool) -> None
"""Wait for all kind of replications"""
for replicationtype, detail in conditions or []:
if replicationtype == ReplicationType.LISTENER:
if detail == 'postrun':
wait_for_listener_replication_and_postrun(verbose)
else:
wait_for_listener_replication(verbose)
elif replicationtype == ReplicationType.S4C_FROM_UCS:
wait_for_s4connector_replication(verbose)
if detail:
# TODO: search in Samba/AD with filter=detail
pass
elif replicationtype == ReplicationType.S4C_TO_UCS:
wait_for_s4connector_replication(verbose)
if detail:
# TODO: search in OpenLDAP with filter=detail
pass
elif replicationtype == ReplicationType.DRS:
if not isinstance(detail, dict):
detail = {'ldap_filter': detail}
wait_for_drs_replication(verbose=verbose, **detail)
[docs]def wait_for_drs_replication(*args, **kwargs):
# type: (*Any, **Any) -> None
from univention.testing.ucs_samba import wait_for_drs_replication
return wait_for_drs_replication(*args, **kwargs)
[docs]def wait_for_listener_replication(verbose=True):
# type: (bool) -> None
sys.stdout.flush()
time.sleep(1) # Give the notifier some time to increase its transaction id
if verbose:
print('Waiting for replication...')
for _ in range(300):
# The "-c 1" option ensures listener and notifier id are equal.
# Otherwise the check is successful as long as the listener id changed since the last check.
cmd = ('/usr/lib/nagios/plugins/check_univention_replication', '-c', '1')
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
stdout, _stderr = proc.communicate()
if proc.returncode == 0:
if verbose:
print('Done: replication complete.')
return
print('.', end=' ')
time.sleep(1)
print('Error: replication incomplete.')
raise LDAPReplicationFailed()
[docs]def get_lid():
# type: () -> int
"""
get_lid() returns the last processed notifier ID of univention-directory-listener.
"""
with open("/var/lib/univention-directory-listener/notifier_id", "r") as notifier_id:
return int(notifier_id.readline())
[docs]def wait_for_listener_replication_and_postrun(verbose=True):
# type: (bool) -> None
# Postrun function in listener modules are called after 15 seconds without any events
wait_for_listener_replication(verbose=verbose)
if verbose:
print("Waiting for postrun...")
lid = get_lid()
seconds_since_last_change = 0
for _ in range(300):
time.sleep(1)
print('.', end=' ')
if lid == get_lid():
seconds_since_last_change += 1
else:
seconds_since_last_change = 0
if seconds_since_last_change > 12:
# Less than 15 sec because a postrun function can potentially make ldap changes,
# which would result in a loop here.
time.sleep(10) # Give the postrun function some time
if verbose:
print("Postrun should have run")
return
lid = get_lid()
print("Postrun was probably never called in the last 300 seconds")
raise LDAPReplicationFailed
[docs]def wait_for_s4connector_replication(verbose=True):
# type: (bool) -> None
if verbose:
print('Waiting for connector replication')
import univention.testing.ucs_samba
try:
univention.testing.ucs_samba.wait_for_s4connector(17)
except OSError as exc: # nagios not installed
if verbose:
print('Nagios not installed: %s' % (exc,), file=sys.stderr)
time.sleep(16)
except univention.testing.ucs_samba.WaitForS4ConnectorTimeout:
if verbose:
print('Warning: S4 Connector replication was not finished after 17 seconds', file=sys.stderr)
# backwards compatibility
wait_for_replication = wait_for_listener_replication
wait_for_replication_and_postrun = wait_for_listener_replication_and_postrun
wait_for_connector_replication = wait_for_s4connector_replication
[docs]def package_installed(package):
# type: (str) -> bool
sys.stdout.flush()
with open('/dev/null', 'w') as null:
return (subprocess.call("dpkg-query -W -f '${Status}' %s | grep -q ^install" % package, stderr=null, shell=True) == 0)
[docs]def fail(log_message=None, returncode=1):
# type: (Optional[str], int) -> NoReturn
print('### FAIL ###')
if log_message:
print('%s\n### ###' % log_message)
if sys.exc_info()[-1]:
print(traceback.format_exc(), file=sys.stderr)
sys.exit(returncode)
[docs]def uppercase_in_ldap_base():
# type: () -> bool
ucr = ConfigRegistry()
ucr.load()
return not ucr.get('ldap/base').islower()
[docs]def is_udp_port_open(port, ip=None):
# type: (int, Optional[str]) -> bool
if ip is None:
ip = '127.0.0.1'
try:
udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
udp_sock.connect((ip, int(port)))
os.write(udp_sock.fileno(), b'X')
os.write(udp_sock.fileno(), b'X')
os.write(udp_sock.fileno(), b'X')
return True
except OSError as ex:
print('is_udp_port_open({0}) failed: {1}'.format(port, ex))
return False
[docs]def is_port_open(port, hosts=None, timeout=60):
# type: (int, Optional[Iterable[str]], float) -> bool
'''
check if port is open, if host == None check
hostname and 127.0.0.1
:param int port: TCP port number
:param hosts: list of hostnames or localhost if hosts is None.
:type hosts: list[str] or None
:return: True if at least on host is reachable, False otherwise.
:rtype: boolean
'''
if hosts is None:
hosts = (socket.gethostname(), '127.0.0.1', '::1')
for host in hosts:
address = (host, int(port))
try:
connection = socket.create_connection(address, timeout)
connection.close()
return True
except EnvironmentError as ex:
print('is_port_open({0}) failed: {1}'.format(port, ex))
return False
if __name__ == '__main__':
import doctest
doctest.testmod()
# vim: set fileencoding=utf-8 ft=python sw=4 ts=4 :