Source code for univention.testing.ucsschool.importusers_http

# -*- coding: utf-8 -*-

import json
import os
import tempfile
import time
from typing import TYPE_CHECKING, Dict, List, Optional, Tuple  # noqa: F401

from univention.testing import utils
from univention.testing.ucsschool.importusers_cli_v2 import ImportTestbase

if TYPE_CHECKING:
    from ucsschool.http_api.client import Client, ResourceRepresentation  # noqa: F401


[docs] class HttpApiImportTester(ImportTestbase): default_config_path = "/usr/share/ucs-school-import/configs/ucs-school-testuser-http-import.json" _default_config = {}
[docs] def create_import_security_group( self, ou_dn, # type: str allowed_ou_names=None, # type: Optional[List[str]] roles=None, # type: Optional[List[str]] user_dns=None, # type: Optional[List[str]] ): # type: (...) -> Tuple[str, str] """ Create a import security group. :param str ou_dn: DN of OU in which to store the group :param list allowed_ou_names: list of OU names, none if None :param list roles: list of user roles (staff, student...), all if None :param list user_dns: list of DNs of users to add to group, none if None :return: group_dn, group_name :rtype: Tuple[str, str] """ allowed_ou_names = [] if allowed_ou_names is None else allowed_ou_names roles = self.all_roles if roles is None else roles user_dns = [] if user_dns is None else user_dns group_dn, group_name = self.udm.create_group( position="cn=groups,{}".format(ou_dn), options=["posix", "samba", "ucsschoolImportGroup"], append={ "users": user_dns, "ucsschoolImportRole": roles, "ucsschoolImportSchool": allowed_ou_names, }, ) utils.verify_ldap_object( group_dn, expected_attr={ "cn": [group_name], "ucsschoolImportRole": roles, "ucsschoolImportSchool": allowed_ou_names, "uniqueMember": user_dns, }, strict=False, should_exist=True, ) return group_dn, group_name
@property def default_config(self): if not self._default_config: with open(self.default_config_path) as fp: self._default_config.update(json.load(fp)) return self._default_config
[docs] def run_http_import_through_python_client( self, client, # type: Client filename, # type: str school, # type: str role, # type: str dryrun=True, # type: Optional[bool] timeout=600, # type: Optional[int] config=None, # type: Optional[Dict[str, str]] ): # type: (...) -> ResourceRepresentation.UserImportJobResource """ Run an import through the Python client of the HTTP-API. :param ucsschool.http_api.client.Client client: an instance of ucsschool.http_api.client.Client :param str filename: the CSV file to import :param str school: OU to import into :param str role: UCS@school user role :param bool dryrun: whether to do a dry-run or a real import :param int timeout: seconds to wait for the import to finish :param dict config: if not None: configuration to temporarily write to `/var/lib/ucs-school-import/configs/user_import_http-api.json` :return: UserImportJob resource representation object :rtype: ResourceRepresentation.UserImportJobResource """ if not config and config is not None: self.log.warning('Empty "config" passed!') with TempHttpApiConfig(config): t0 = time.time() import_job = client.userimportjob.create( filename=filename, school=school, user_role=role, dryrun=dryrun ) while time.time() - t0 < timeout: job = client.userimportjob.get( import_job.id ) # type: ResourceRepresentation.UserImportJobResource if job.status in ("Finished", "Aborted"): return job if job.result and isinstance(job.result.result, dict): progress = float(job.result.result.get("percentage", 0.0)) else: progress = 0.0 self.log.debug( "Waiting for import job %r to finish (%d%% - %d/%ds)...", import_job.id, int(progress), int(time.time() - t0), timeout, ) time.sleep(1) else: utils.fail("Import job did not finish in {} seconds.".format(timeout))
[docs] def fail(self, msg, returncode=1, import_job=None): """ Print import jobs logfile, then print package versions, traceback and error message. """ if import_job: self.log.debug("------ Start logfile of job %r ------", import_job.id) self.log.debug(import_job.log_file) self.log.debug("------ End logfile of job %r ------", import_job.id) else: self.log.debug("No import_job - no logfile.") return super(HttpApiImportTester, self).fail(msg, returncode)
[docs] class TempHttpApiConfig(object): default_config_path = "/var/lib/ucs-school-import/configs/user_import_http-api.json" def __init__(self, config): self.config = config if config is not None: _fd, self.original_config_backup = tempfile.mkstemp( dir=os.path.dirname(self.default_config_path) ) def __enter__(self): if self.config is None: return # copy original to backup file. copy only content, leaving permissions intact. with open(self.original_config_backup, "w") as fpw: with open(self.default_config_path) as fpr: fpw.write(fpr.read()) with open(self.default_config_path, "w") as fpw: json.dump(self.config, fpw) def __exit__(self, exc_type, exc_val, exc_tb): if self.config is None: return with open(self.default_config_path, "w") as fpw: with open(self.original_config_backup) as fpr: fpw.write(fpr.read()) os.remove(self.original_config_backup)