Source code for univention.testing.ucsschool.importusers_cli_v2

# -*- coding: utf-8 -*-

import copy
import csv
import datetime
import json
import os
import pprint
import random
import shutil
import subprocess
import sys
import tempfile
import traceback
from collections.abc import Mapping

from ldap.dn import escape_dn_chars
from ldap.filter import escape_filter_chars, filter_format
from six import iteritems, string_types

import univention.testing.strings as uts
import univention.testing.ucr
import univention.testing.ucsschool.ucs_test_school as utu
import univention.testing.udm
from univention.admin.uexceptions import ldapError, noObject
from univention.testing import utils
from univention.testing.ucs_samba import wait_for_drs_replication
from univention.testing.ucsschool.importusers import get_mail_domain
from univention.testing.ucsschool.ucs_test_school import get_ucsschool_logger

try:
    from univention.testing.ucs_samba import DRSReplicationFailed
except ImportError:
    DRSReplicationFailed = Exception

from typing import Set  # noqa: F401


class ImportException(Exception):
    """Error raised by the import test helpers in this module."""
class TestFailed(Exception):
    """Exception carrying a failure message (``msg``) and an accompanying stack (``stack``)."""

    def __init__(self, msg, stack):
        # Both values are only stored; they are exposed as plain attributes.
        self.msg, self.stack = msg, stack
def reset_notifier_restart_burst_limit():
    """
    Reset the systemd start counter of the Univention Directory Notifier.

    Too many notifier restarts lead to
    "univention-directory-notifier.service: Failed with result 'start-limit-hit'."
    errors. ``systemctl reset-failed`` resets the burst limit counter, which
    also counts successful service starts (despite the option's name).
    """
    unit = "univention-directory-notifier.service"
    subprocess.call(("systemctl", "reset-failed", unit))
class ConfigDict(dict):
    """``dict`` that can set nested entries addressed by a colon-separated key."""

    def update_entry(self, key, value):
        """
        Set or merge a (possibly nested) configuration entry.

        update_entry('foo:bar:baz', 'my value')
        update_entry('foo:bar:ding', False)

        :param str key: path to the entry; nesting levels are separated by ``:``.
            Missing intermediate levels are created as empty dicts.
        :param value: value to store. The strings ``"true"`` / ``"false"``
            (any case) are converted to the corresponding ``bool``.
        :return: None

        If both the existing entry and ``value`` are mappings, ``value`` is
        merged into the existing entry. In every other case the entry is
        replaced by ``value``.
        """
        if isinstance(value, str):
            lowered = value.lower()
            if lowered == "false":
                value = False
            elif lowered == "true":
                value = True

        # str.split() always returns at least one element, so ``leaf`` is always set.
        *parents, leaf = key.split(":")
        node = self
        for part in parents:
            node = node.setdefault(part, {})

        # Merging is only possible if the new value is a mapping as well;
        # dict.update() with a scalar (bool, str, ...) would raise.
        if leaf in node and isinstance(node[leaf], Mapping) and isinstance(value, Mapping):
            node[leaf].update(value)
        else:
            node[leaf] = value
class PyHooks(object):
    """
    Helper that installs a generated ``UserPyHook`` module for a test and
    removes it (and its temporary directory) again in :py:meth:`cleanup`.
    """

    def __init__(self, hook_basedir=None):
        """
        :param str hook_basedir: directory the hook module is written to;
            defaults to ``/usr/share/ucs-school-import/pyhooks``
        """
        if not hook_basedir:
            hook_basedir = "/usr/share/ucs-school-import/pyhooks"
        self.hook_basedir = hook_basedir
        # The generated hook writes its touch files into this directory.
        self.tmpdir = tempfile.mkdtemp(prefix="pyhook.", dir="/tmp")
        self.cleanup_files = set()
        self.log = get_ucsschool_logger()

    def create_hooks(self):
        """
        Write a hook module with a random name into ``self.hook_basedir``.

        The generated hook appends ``,<when>-<action>`` to the users ``street``
        property and writes a touch file ``<when>-<action>`` into
        ``self.tmpdir`` for every hook invocation. The created file is
        registered in ``self.cleanup_files``.
        """
        hook_name = "%s.py" % (uts.random_name(),)
        hook_source = """from ucsschool.importer.utils.user_pyhook import UserPyHook
import os

class MyHook(UserPyHook):
    priority = {
        "pre_create": 1,
        "post_create": 1,
        "pre_modify": 1,
        "post_modify": 1,
        "pre_move": 1,
        "post_move": 1,
        "pre_remove": 1,
        "post_remove": 1
    }

    def pre_create(self, user):
        self.logger.info("Running a pre_create hook for %%s.", user)
        self.run(user, 'create', 'pre')

    def post_create(self, user):
        self.logger.info("Running a post_create hook for %%s.", user)
        self.run(user, 'create', 'post')

    def pre_modify(self, user):
        self.logger.info("Running a pre_modify hook for %%s.", user)
        self.run(user, 'modify', 'pre')

    def post_modify(self, user):
        self.logger.info("Running a post_modify hook for %%s.", user)
        self.run(user, 'modify', 'post')

    def pre_move(self, user):
        self.logger.info("Running a pre_move hook for %%s.", user)
        self.run(user, 'move', 'pre')

    def post_move(self, user):
        self.logger.info("Running a post_move hook for %%s.", user)
        self.run(user, 'move', 'post')

    def pre_remove(self, user):
        self.logger.info("Running a pre_remove hook for %%s.", user)
        self.run(user, 'remove', 'pre')

    def post_remove(self, user):
        self.logger.info("Running a post_remove hook for %%s.", user)
        self.run(user, 'remove', 'post')

    def run(self, user, action, when):
        self.logger.info("***** Running {} {} hook for user {}.".format(when, action, user))
        # udm_properties[k] is only filled from LDAP, if k was in the input
        # don't try to get_udm_object() on a user not {anymore, yet} in ldap
        if not user.udm_properties.get('street') and not ((action == 'create' and when == 'pre') \
            or (action == 'remove' and when == 'post')):
            obj = user.get_udm_object(self.lo)
            user.udm_properties['street'] = obj.info.get('street', '')
        user.udm_properties['street'] = user.udm_properties.get('street', '') + ',{}-{}'.format(
            when, action)
        if when == 'post' and action != 'remove':
            user.modify(self.lo)
        fn_touchfile = os.path.join(%(tmpdir)r, '%%s-%%s' %% (when, action))
        open(fn_touchfile, 'w').write('EXECUTED\\n')
""" % {
            "tmpdir": self.tmpdir
        }
        hook_path = os.path.join(self.hook_basedir, hook_name)
        self.cleanup_files.add(hook_path)
        with open(hook_path, "w") as hook_file:
            hook_file.write(hook_source)
        self.log.info("Created hook %r", hook_path)

    def cleanup(self):
        """
        Remove ``self.tmpdir`` and all files registered in ``self.cleanup_files``.

        Files that cannot be removed are logged as a warning and dropped from
        ``self.cleanup_files`` nevertheless.
        """
        shutil.rmtree(self.tmpdir, ignore_errors=True)
        # Iterate over a snapshot: the set is modified inside the loop.
        for path in tuple(self.cleanup_files):
            try:
                os.remove(path)
                self.log.debug("Removed %r.", path)
            except (IOError, OSError):
                self.log.warning("Failed to remove %r" % (path,))
            if path.endswith(".py"):
                compiled = "%sc" % (path,)
                try:
                    os.remove(compiled)  # also remove .pyc files
                    self.log.debug("Removed %sc.", path)
                except (IOError, OSError):
                    pass
            self.cleanup_files.remove(path)
class ImportTestbase(object):
    """
    Base class for user import tests.

    Subclasses implement :py:meth:`test`; :py:meth:`run` sets up the school
    environment and the OUs, executes the test and always calls
    :py:meth:`cleanup` afterwards.
    """

    ou_A = utu.Bunch(name=None, dn=None)  # will be initialized in create_ous()
    ou_B = utu.Bunch(name=None, dn=None)  # set ou_B to None if a second OU is not needed
    ou_C = utu.Bunch(name=None, dn=None)  # set ou_C to None if a third OU is not needed
    use_ou_cache = True  # if True: use cached OUs, if false create fresh OUs
    all_roles = ("staff", "student", "teacher", "teacher_and_staff")
    # strptime()/strftime() formats used by the pugre_timestamp_*() classmethods.
    # Defined on the base class so the classmethods work for every subclass.
    ldap_date_format = "%Y%m%d%H%M%SZ"
    udm_date_format = "%Y-%m-%d"

    def __init__(self):
        self.ucr = univention.testing.ucr.UCSTestConfigRegistry()
        self.ucr.load()
        self.log = get_ucsschool_logger()
        self.lo = None  # will be initialized in run()
        self.ldap_status = None  # type: Set[str]
        # will be initialized in run():
        self.schoolenv = None  # type: univention.testing.ucsschool.UCSTestSchool
        self.udm = None  # type: univention.testing.udm.UCSTestUDM  # will be initialized in run()
        self.maildomain = get_mail_domain()

    def cleanup(self):
        """Clean up the UDM test helper (if it was set up) and revert UCR changes."""
        self.log.info("Performing ImportTestbase cleanup...")
        # run() calls cleanup() from a ``finally`` clause: if the setup failed
        # before setup_testenv() ran, self.udm is still None. Don't mask the
        # original exception with an AttributeError.
        if self.udm is not None:
            self.udm.cleanup()
        self.log.info("Reverting UCR...")
        self.ucr.revert_to_original_registry()
        self.log.info("ImportTestbase cleanup done")

    def save_ldap_status(self):
        """Store the current LDAP status in ``self.ldap_status`` for a later diff."""
        self.log.debug("Saving LDAP status...")
        self.ldap_status = utu.UCSTestSchool.get_ldap_status(self.lo)
        self.log.debug("LDAP status saved.")

    def diff_ldap_status(self):
        """
        Compare the current LDAP status with the one stored by
        :py:meth:`save_ldap_status`.

        :return: result of ``UCSTestSchool.diff_ldap_status()``; its ``new``
            and ``removed`` attributes are logged
        """
        self.log.debug("Reading LDAP status to check differences...")
        res = utu.UCSTestSchool.diff_ldap_status(self.lo, self.ldap_status)
        self.log.debug("New objects: {!r}".format(res.new))
        self.log.debug("Removed objects: {!r}".format(res.removed))
        return res

    @classmethod
    def pugre_timestamp_ldap2udm(cls, ldap_val):
        """Convert '20090101000000Z' to '2009-01-01'. Ignores timezones."""
        if not ldap_val:
            return ""
        ldap_date = datetime.datetime.strptime(ldap_val, cls.ldap_date_format)
        return ldap_date.strftime(cls.udm_date_format)

    @classmethod
    def pugre_timestamp_udm2ldap(cls, udm_val):
        """Convert '2009-01-01' to '20090101000000Z'. Ignores timezones."""
        if not udm_val:
            return ""
        udm_date = datetime.datetime.strptime(udm_val, cls.udm_date_format)
        return udm_date.strftime(cls.ldap_date_format)

    def check_new_and_removed_users(self, exp_new, exp_removed):
        """
        Fail the test if the number of user objects (DN starts with ``uid=``)
        created / removed since :py:meth:`save_ldap_status` differs from the
        expectation.

        :param int exp_new: expected number of new users
        :param int exp_removed: expected number of removed users
        """
        ldap_diff = self.diff_ldap_status()
        new_users = [x for x in ldap_diff.new if x.startswith("uid=")]
        if len(new_users) != exp_new:
            self.log.error(
                "Invalid number of new users (expected %d, found %d)! Found new objects: %r",
                exp_new,
                len(new_users),
                new_users,
            )
            self.fail("Stopping because of invalid number of new users.")
        removed_users = [x for x in ldap_diff.removed if x.startswith("uid=")]
        if len(removed_users) != exp_removed:
            self.log.error(
                "Invalid number of removed users (expected %d, found %d)! Removed objects: %r",
                exp_removed,
                len(removed_users),
                removed_users,
            )
            self.fail("Stopping because of invalid number of removed users.")

    def fail(self, msg, returncode=1):
        """Log the current call stack, then hand over to ``utils.fail(msg, returncode)``."""
        self.log.error("\n%s\n%s%s", "=" * 79, "".join(traceback.format_stack()), "=" * 79)
        utils.fail(msg, returncode)

    def create_ous(self, schoolenv):
        """
        Create one OU for each of ``ou_A``, ``ou_B``, ``ou_C`` that is not
        ``None`` and store name and DN in the respective object.

        :param schoolenv: object providing ``create_multiple_ous()``
        """
        self.log.info("Creating OUs...")
        ous = [ou for ou in [self.ou_A, self.ou_B, self.ou_C] if ou is not None]
        res = schoolenv.create_multiple_ous(
            len(ous), name_edudc=self.ucr.get("hostname"), use_cache=self.use_ou_cache
        )
        for ou, (name, dn) in zip(ous, res):
            ou.name, ou.dn = name, dn
        self.log.info("Created OUs: %r.", [_ou.name for _ou in ous])

    def setup_testenv(self, schoolenv):
        """
        Store the school environment and its UDM helper, make sure the mail
        domain exists and open an LDAP connection (``self.lo``).

        :param schoolenv: object providing ``udm`` and ``open_ldap_connection()``
        """
        self.schoolenv = schoolenv
        self.udm = schoolenv.udm
        # Compare whole domain names, not substrings of the UCR value.
        # NOTE(review): assumes mail/hosteddomains is whitespace separated - confirm.
        hosted_domains = self.ucr.get("mail/hosteddomains", "").split()
        if self.maildomain not in hosted_domains:
            self.log.info("\n\n*** Creating mail domain %r...\n", self.maildomain)
            self.udm.create_object(
                "mail/domain",
                position="cn=domain,cn=mail,{}".format(self.ucr["ldap/base"]),
                name=self.maildomain,
                ignore_exists=True,
            )
        has_admin_credentials = self.ucr["server/role"] in (
            "domaincontroller_master",
            "domaincontroller_backup",
        )
        self.lo = schoolenv.open_ldap_connection(admin=has_admin_credentials)

    def run(self):
        """Set up the environment, run :py:meth:`test` and always clean up."""
        try:
            with utu.UCSTestSchool() as schoolenv:
                self.setup_testenv(schoolenv)
                self.create_ous(schoolenv)
                self.test()
                self.log.info("Test was successful.\n\n")
        finally:
            self.cleanup()

    def test(self):
        """The actual test. Must be implemented by subclasses."""
        raise NotImplementedError()

    def wait_for_drs_replication_of_membership(
        self, group_dn, member_uid, is_member=True, try_resync=True, **kwargs
    ):
        """
        wait_for_drs_replication() of a user to become a member of a group.

        :param group_dn: str: DN of a group
        :param member_uid: str: username
        :param is_member: bool: whether the user should be a member or not
        :param try_resync: bool: if waiting for drs replication didn't succeed,
            run "/usr/share/univention-s4-connector/resync_object_from_ucs.py
            <group_dn>" and wait again
        :param kwargs: dict: will be passed to wait_for_drs_replication() with
            a modified 'ldap_filter'
        :return: None | <ldb result>
        """
        if not utils.package_installed("univention-samba4"):
            self.log.info(
                "wait_for_drs_replication_of_membership(): skip, univention-samba4 not installed."
            )
            return None

        # A missing, empty or None 'ldap_filter' must not end up in the filter.
        user_filter = kwargs.get("ldap_filter") or ""
        if user_filter and not user_filter.startswith("("):
            user_filter = "({})".format(user_filter)
        if is_member:
            member_filter = filter_format("(memberOf=%s)", (group_dn,))
        else:
            member_filter = filter_format("(!(memberOf=%s))", (group_dn,))
        # Work on a copy: the callers kwargs are reused unmodified for the
        # retry below, so the filter isn't wrapped a second time.
        drs_kwargs = dict(kwargs)
        drs_kwargs["ldap_filter"] = "(&(cn={}){}{})".format(
            escape_filter_chars(member_uid), member_filter, user_filter
        )
        try:
            res = wait_for_drs_replication(**drs_kwargs)
        except DRSReplicationFailed as exc:
            self.log.error("DRSReplicationFailed: %s", exc)
            res = None
        if not res:
            self.log.warning("No result from wait_for_drs_replication().")
            if try_resync:
                cmd = ["/usr/share/univention-s4-connector/resync_object_from_ucs.py", group_dn]
                self.log.info("Running subprocess.call(%r)...", cmd)
                subprocess.call(cmd)
                self.log.info(
                    "Waiting again. Executing: wait_for_drs_replication_of_membership(group_dn=%r, "
                    "member_uid=%r, is_member=%r, try_resync=False, **kwargs=%r)...",
                    group_dn,
                    member_uid,
                    is_member,
                    kwargs,
                )
                # recursion once with try_resync=False
                res = self.wait_for_drs_replication_of_membership(
                    group_dn=group_dn,
                    member_uid=member_uid,
                    is_member=is_member,
                    try_resync=False,
                    **kwargs
                )
        return res
class CLI_Import_v2_Tester(ImportTestbase):
    """
    Base class for tests of the command line user import
    (``ucs-school-user-import``): creates configuration and CSV files in a
    temporary directory and runs the import script.
    """

    ldap_date_format = "%Y%m%d%H%M%SZ"
    udm_date_format = "%Y-%m-%d"

    def __init__(self):
        super(CLI_Import_v2_Tester, self).__init__()
        # All generated config / CSV files are created here; removed in cleanup().
        self.tmpdir = tempfile.mkdtemp(prefix="34_import-users_via_cli_v2.", dir="/tmp/")
        self.hook_fn_set = set()
        self.default_config = ConfigDict(
            {
                "factory": "ucsschool.importer.default_user_import_factory.DefaultUserImportFactory",
                "classes": {},
                "input": {"type": "csv", "filename": "import.csv"},
                "csv": {
                    "mapping": {
                        "OUs": "schools",
                        "Vor": "firstname",
                        "Nach": "lastname",
                        "Gruppen": "school_classes",
                        "E-Mail": "email",
                        "Beschreibung": "description",
                    }
                },
                "maildomain": self.maildomain,
                "scheme": {
                    "email": "<:umlauts><firstname>[0].<lastname>@<maildomain>",
                    "record_uid": "<firstname>;<lastname>;<email>",
                    "username": {"default": "<:umlauts><firstname>[0].<lastname>[COUNTER2]"},
                },
                "source_uid": "sourceDB",
                "user_role": "student",
                "tolerate_errors": 0,
                "verbose": True,
            }
        )

    def cleanup(self):
        """Remove the temporary directory and registered hook files, then run the base cleanup."""
        self.log.info("Performing CLI_Import_v2_Tester cleanup...")
        self.log.info("Purging %r", self.tmpdir)
        shutil.rmtree(self.tmpdir, ignore_errors=True)
        for hook_fn in self.hook_fn_set:
            try:
                os.remove(hook_fn)
            except (IOError, OSError):
                self.log.warning("Failed to remove %r" % (hook_fn,))
        reset_notifier_restart_burst_limit()
        super(CLI_Import_v2_Tester, self).cleanup()
        self.log.info("CLI_Import_v2_Tester cleanup done")

    def create_config_json(self, values=None, config=None):
        """
        Creates a config file for "ucs-school-user-import".
        Default values may be overridden via a dict called values.

        >>> values = {'user_role': 'teacher', 'input:type': 'csv' }
        >>> create_config_json(values=values)
        '/tmp/config.dkgfcsdz'
        >>> create_config_json(values=values, config=DEFAULT_CONFIG)
        '/tmp/config.dkgfcsdz'

        :param dict values: ``{key: value}`` pairs applied with
            ``config.update_entry(key, value)``
        :param config: configuration to start from; if empty or None, a deep
            copy of ``self.default_config`` is used. A passed in object is
            modified in place when ``values`` is given.
        :return: path of the created file (inside ``self.tmpdir``)
        :rtype: str
        """
        if not config:
            config = copy.deepcopy(self.default_config)
        if values:
            for config_option, value in iteritems(values):
                config.update_entry(config_option, value)
        fd, fn = tempfile.mkstemp(prefix="config.", dir=self.tmpdir)
        # mkstemp() returns an open descriptor: close it, only the path is needed.
        os.close(fd)
        with open(fn, "w") as config_file:
            json.dump(config, config_file)
        self.log.info("Config: %r" % (config,))
        return fn

    def create_csv_file(
        self, person_list, sisopi_school=None, mapping=None, fn_csv=None, prefix_schools=True
    ):
        """
        Create CSV file for given persons

        >>> from univention.testing.ucsschool.importusers import Person
        >>> create_csv_file([Person('schoolA', 'student'), Person('schoolB', 'teacher')])
        '/tmp/import.sldfhgsg.csv'
        >>> create_csv_file([Person('schoolA', 'student'), Person('schoolB', 'teacher')],
        ... fn_csv='/tmp/import.foo.csv')
        '/tmp/import.foo.csv'
        >>> create_csv_file([Person('schoolA', 'student'), Person('schoolB', 'teacher')],
        ... mapping={'Vorname': 'firstname', ...})
        '/tmp/import.cetjdfgj.csv'

        :param person_list: objects providing ``map_to_dict()``
        :param str sisopi_school: if set, each person is written as a copy
            that belongs to this school only
        :param dict mapping: CSV header -> property; defaults to
            ``self.default_config["csv"]["mapping"]``
        :param str fn_csv: path of the file to write; a new file in
            ``self.tmpdir`` is created if unset
        :param bool prefix_schools: passed on to ``map_to_dict()``
        :return: path of the written CSV file
        :rtype: str
        """
        header2properties = mapping or self.default_config["csv"]["mapping"]
        properties2headers = {v: k for k, v in iteritems(header2properties)}

        # The column order is shuffled on purpose.
        header_row = list(header2properties)
        random.shuffle(header_row)
        self.log.info("Header row = %r", header_row)

        if fn_csv:
            fn = fn_csv
        else:
            fd, fn = tempfile.mkstemp(prefix="users.", dir=self.tmpdir)
            # mkstemp() returns an open descriptor: close it, only the path is needed.
            os.close(fd)

        # Closing the file guarantees the content is flushed to disk before
        # the path is handed to the import. newline="" as recommended by the
        # csv module documentation.
        with open(fn, "w", newline="") as csv_file:
            writer = csv.DictWriter(
                csv_file,
                header_row,
                restval="",
                delimiter=",",
                quotechar='"',
                quoting=csv.QUOTE_ALL,
            )
            writer.writeheader()
            for person in person_list:
                if sisopi_school:
                    # Work on a copy: the callers Person object must stay untouched.
                    tmp = copy.deepcopy(person)
                    tmp.school = sisopi_school
                    tmp.schools = [sisopi_school]
                    if tmp.role != "staff":
                        tmp.school_classes = {}
                        tmp.school_classes[sisopi_school] = person.school_classes[sisopi_school]
                    person_dict = tmp.map_to_dict(properties2headers, prefix_schools=prefix_schools)
                else:
                    person_dict = person.map_to_dict(
                        properties2headers, prefix_schools=prefix_schools
                    )
                self.log.info("Person data = %r", person_dict)
                writer.writerow(person_dict)
        return fn

    def check_for_non_empty_config(self, raise_exc=True):
        """
        Check that the import configuration files below
        ``/var/lib/ucs-school-import/configs/`` (``user_import.json`` and one
        ``<OU>.json`` per used OU) are empty, if they exist.

        :param bool raise_exc: raise instead of only logging a warning
        :raises ImportException: if a non-empty config was found and
            ``raise_exc`` is true
        """
        base_dir = "/var/lib/ucs-school-import/configs/"
        user_config = ["user_import.json"]
        school_configs = [
            "{}.json".format(ou.name) for ou in [self.ou_A, self.ou_B, self.ou_C] if ou is not None
        ]
        configs = school_configs + user_config
        for config in configs:
            config_path = os.path.join(base_dir, config)
            if not os.path.isfile(config_path):
                continue
            with open(config_path) as config_file:
                content = json.load(config_file)
            if len(content) != 0:
                # The "{}" in the message is literal: it shows an empty JSON config.
                msg = (
                    'The config under "%s" seems to be non-empty. That often causes problems for '
                    'tests. Please replace it with an empty config: "{}".' % (config_path,)
                )
                if raise_exc:
                    raise ImportException(msg)
                else:
                    self.log.warning("*" * 40)
                    self.log.warning(msg)
                    self.log.warning("*" * 40)

    def check_for_non_empty_pyhooks(self, raise_exc=True):
        """
        Check that ``/usr/share/ucs-school-import/pyhooks`` contains nothing
        but (optionally) a ``__pycache__`` entry.

        :param bool raise_exc: raise instead of only logging a warning
        :raises ImportException: if the directory is not empty and
            ``raise_exc`` is true
        """
        path = "/usr/share/ucs-school-import/pyhooks"
        dir_content = [x for x in os.listdir(path) if x != "__pycache__"]
        if dir_content:
            msg = (
                "The directory {!r} seems to be non-empty: {!r} That often causes problems for tests. "
                "Please remove all files in it.".format(path, dir_content)
            )
            if raise_exc:
                raise ImportException(msg)
            else:
                self.log.warning("*" * 40)
                self.log.warning(msg)
                self.log.warning("*" * 40)

    def run_import(
        self, args, fail_on_error=True, fail_on_preexisting_config=True, fail_on_preexisting_pyhook=True
    ):
        """
        Run ``/usr/share/ucs-school-import/scripts/ucs-school-user-import -v``
        with additional arguments.

        :param list args: arguments appended to the command line
        :param bool fail_on_error: raise if the import exits with a non-zero
            exit code
        :param bool fail_on_preexisting_config: passed as ``raise_exc`` to
            :py:meth:`check_for_non_empty_config`
        :param bool fail_on_preexisting_pyhook: passed as ``raise_exc`` to
            :py:meth:`check_for_non_empty_pyhooks`
        :return: exit code of the import process
        :rtype: int
        :raises ImportException: on a failed pre-flight check or (with
            ``fail_on_error``) a non-zero exit code
        """
        self.check_for_non_empty_config(fail_on_preexisting_config)
        self.check_for_non_empty_pyhooks(fail_on_preexisting_pyhook)
        cmd = ["/usr/share/ucs-school-import/scripts/ucs-school-user-import", "-v"] + args
        self.log.info("Starting import: %r", cmd)
        # Flush our own output first, so it doesn't get mixed up with the
        # output of the child process.
        sys.stdout.flush()
        sys.stderr.flush()
        if fail_on_error:
            try:
                exitcode = subprocess.check_call(cmd)
            except subprocess.CalledProcessError as exc:
                self.log.error("As requested raising an exception due to non-zero exit code")
                raise ImportException(
                    "Command '%r' returned non-zero exit status %r (output=%r)"
                    % (exc.cmd, exc.returncode, exc.output)
                )
        else:
            exitcode = subprocess.call(cmd)
        self.log.info("Import process exited with exit code %r", exitcode)
        return exitcode
class UniqueObjectTester(CLI_Import_v2_Tester):
    """
    Import tester that additionally removes the ``unique-usernames`` counter
    objects listed in ``self.unique_basenames_to_remove`` during cleanup.
    """

    def __init__(self):
        super(UniqueObjectTester, self).__init__()
        # Usernames whose counter object is deleted in cleanup().
        self.unique_basenames_to_remove = []

    def cleanup(self):
        """
        Delete the ``cn=<name>,cn=unique-usernames,cn=ucsschool,cn=univention``
        objects of all collected names, then run the base class cleanup.

        Missing objects are ignored, other LDAP errors are logged.
        """
        self.log.info("Removing new unique-usernames,cn=ucsschool entries...")
        try:
            if not self.lo:
                self.lo = utu.UCSTestSchool.open_ldap_connection(admin=True)
            for username in self.unique_basenames_to_remove:
                dn = "cn={},cn=unique-usernames,cn=ucsschool,cn=univention,{}".format(
                    escape_dn_chars(username), self.lo.base
                )
                self.log.debug("Removing %r", dn)
                try:
                    self.lo.delete(dn)
                except noObject:
                    pass
                except ldapError as exc:
                    self.log.error("DN %r -> %s", dn, exc)
        finally:
            # Temporary files and UCR changes must be cleaned up even if the
            # LDAP part failed unexpectedly.
            super(UniqueObjectTester, self).cleanup()

    def check_unique_obj(self, obj_name, prefix, next_num):
        """
        check if history object exists

        :param str obj_name: name of the container below
            ``cn=ucsschool,cn=univention`` (also used in log messages)
        :param str prefix: ``cn`` of the counter object
        :param next_num: expected value of ``ucsschoolUsernameNextNumber``
        """
        self.log.info("Checking for %s object...", obj_name)
        # Escape the RDN value the same way cleanup() does.
        dn = "cn={},cn={},cn=ucsschool,cn=univention,{}".format(
            escape_dn_chars(prefix), obj_name, self.lo.base
        )
        attrs = {
            "objectClass": ["ucsschoolUsername"],
            "ucsschoolUsernameNextNumber": [next_num],
            "cn": [prefix],
        }
        utils.verify_ldap_object(dn, expected_attr=attrs, strict=True, should_exist=True)
        self.log.debug(
            "%s object %r:\n%s", obj_name, dn, pprint.PrettyPrinter(indent=2).pformat(self.lo.get(dn))
        )
        self.log.info("%s object has been found and is correct.", obj_name)