Source code for univention.testing.coverage

# SPDX-FileCopyrightText: 2024-2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only

"""Code coverage measurement for ucs-test"""


import os
import shutil
import signal
import subprocess
import time
from argparse import ArgumentParser, Namespace, _ArgumentGroup
from collections.abc import Callable
from typing import Any

import atexit


[docs] class MissingCoverage(Exception): pass
[docs] class Coverage: COVERAGE_PTH = '/usr/lib/python3/dist-packages/ucstest-coverage.pth' COVERAGE_PTH_CONTENT = '''import univention.testing.coverage; univention.testing.coverage.Coverage.startup()''' COVERAGE_DEBUG_PATH = '/tmp/ucs-test-coverage' COVERAGE_DEBUG = os.path.exists(COVERAGE_DEBUG_PATH) coverage = None def __init__(self, options: Namespace) -> None: self.coverage_config = options.coverage_config self.branch_coverage = options.branch_coverage self.coverage = options.coverage self.coverage_sources = options.coverage_sources or ['univention'] self.services = options.coverage_restart_services or [ 'univention-management-console-server', 'univention-s4-connector', 'univention-directory-listener', 'univention-portal-server', 'univention-directory-manager-rest', ] self.show_missing = options.coverage_show_missing self.output_directory = options.coverage_output_directory try: subprocess.check_call( ["dpkg", "-l", "python3-ucs-school"], stderr=open("/dev/null", "a"), stdout=open("/dev/null", "a"), ) self.coverage_sources.append('ucsschool') subprocess.check_call( ["dpkg", "-l", "ucs-school-import-http-api"], stderr=open("/dev/null", "a"), stdout=open("/dev/null", "a"), ) self.services.extend([ 'celery-worker-ucsschool-import', 'ucs-school-import-http-api', ]) except subprocess.CalledProcessError: pass if self.coverage and options.coverage_debug: with open(self.COVERAGE_DEBUG_PATH, 'w'): self.COVERAGE_DEBUG = True
[docs] def start(self) -> None: """Start measuring of coverage. Only called by ucs-test-framework once. Sets up the configuration.""" if not self.coverage: return self.write_config_file() os.environ['COVERAGE_PROCESS_START'] = self.coverage_config self.restart_python_services()
[docs] def write_config_file(self) -> None: """Write a Python .pth file which is invoked before any Python process""" with open(self.COVERAGE_PTH, 'w') as fd: fd.write(self.COVERAGE_PTH_CONTENT) with open(self.coverage_config, 'w') as fd: fd.write('''[run] data_file = {data_file} branch = {branch} parallel = True source = {source} [report] ignore_errors = True show_missing = {show_missing} omit = handlers/ucstest syntax.d/* hooks.d/* [html] directory = {directory} '''.format( data_file=os.path.join(os.path.dirname(self.coverage_config), '.coverage'), branch=repr(self.branch_coverage), source='\n\t'.join(self.coverage_sources), show_missing=self.show_missing, directory=self.output_directory, ))
[docs] def restart_python_services(self) -> None: """Restart currently running Python services, so that they start/stop measuring code""" for service in self.services: try: subprocess.call(['/usr/sbin/service', service, 'restart']) except OSError: pass try: subprocess.call(['pkill', '-f', 'python3.*univention-cli-server']) except OSError: pass
[docs] def stop(self) -> None: """Stop coverage measuring. Only called by ucs-test-framework once. Stores the results.""" if not self.coverage: return # stop all services, so that their atexit-handler/signal handler stores the result before evaluating the result if os.path.exists(self.COVERAGE_PTH): os.remove(self.COVERAGE_PTH) self.restart_python_services() for exe in ("coverage", "python3-coverage"): coverage_bin = shutil.which(exe) if coverage_bin: break else: raise MissingCoverage() subprocess.call([coverage_bin, '--version']) subprocess.call([coverage_bin, 'combine']) subprocess.call([coverage_bin, 'html']) subprocess.call([coverage_bin, 'report']) subprocess.call([coverage_bin, 'erase']) if os.path.exists(self.coverage_config): os.remove(self.coverage_config)
[docs] @classmethod def get_argument_group(cls, parser: ArgumentParser) -> _ArgumentGroup: """The option group for ucs-test-framework""" coverage_group = parser.add_argument_group('Code coverage measurement options') coverage_group.add_argument("--with-coverage", dest="coverage", action='store_true') coverage_group.add_argument("--coverage-config", default=os.path.abspath(os.path.expanduser('~/.coveragerc'))) # don't use this, doesn't work! coverage_group.add_argument("--branch-coverage", action='store_true') coverage_group.add_argument('--coverage-sources', action='append', default=[]) coverage_group.add_argument("--coverage-debug", action='store_true') coverage_group.add_argument('--coverage-restart-services', action='append', default=[]) coverage_group.add_argument('--coverage-show-missing', action='store_true') coverage_group.add_argument("--coverage-output-directory", default=os.path.abspath(os.path.expanduser('~/htmlcov'))) return coverage_group
[docs] @classmethod def is_candidate(cls, argv: list[str]) -> bool: if os.getuid(): return False exe = os.path.basename(argv[0]) if exe not in {'python', 'python3', 'python3.7', 'python3.9', 'python3.10', 'python3.11'}: return False if not any(s in arg for arg in argv for s in ('univention', 'udm', 'ucs', 'ucr')): cls.debug_message('skip non-ucs process', argv) return False if any(s in arg for arg in argv[2:] for s in ('listener', 'notifier')): # we don't need to cover the listener currently. some tests failed, maybe because of measuring the listener? cls.debug_message('skip UDL/UDN', argv) return False return True
[docs] @classmethod def startup(cls) -> None: """Startup function which is invoked by every(!) Python process during coverage measurement. If the process is relevant we start measuring coverage.""" argv = open('/proc/%s/cmdline' % os.getpid()).read().split('\x00') if not cls.is_candidate(argv): return cls.debug_message('START', argv) atexit.register(lambda: cls.debug_message('STOP')) if not os.environ.get('COVERAGE_PROCESS_START'): os.environ["COVERAGE_PROCESS_START"] = os.path.abspath(os.path.expanduser('~/.coveragerc')) cls.debug_message('ENVIRON WAS CLEARED BY PARENT PROCESS', argv) import coverage cov = coverage.process_startup() if not cov: cls.debug_message('no coverage startup (already started?, environ cleared?): %r' % (os.environ.get('COVERAGE_PROCESS_START'),)) return cls.coverage = cov # FIXME: univention-cli-server calls os.fork() which causes the coverage measurement not to start in the forked process # https://github.com/nedbat/coveragepy/issues/310 # Coverage fails with os.fork and os._exit osfork = os.fork def fork(*args: Any, **kwargs: Any) -> int: pid = osfork(*args, **kwargs) if pid == 0: cls.debug_message('FORK CHILD') cls.startup() else: cls.debug_message('FORK PARENT') cls.stop_measurement(True) return pid os.fork = fork # https://github.com/nedbat/coveragepy/issues/43 # Coverage measurement fails on code containing os.exec* methods # if the process calls one of the process-replacement functions the coverage must be started in the new process for method in ['execl', 'execle', 'execlp', 'execlpe', 'execv', 'execve', 'execvp', 'execvpe', '_exit']: if isinstance(getattr(os, method), StopCoverageDecorator): continue # restarted in the same process (e.g. os.fork()) setattr(os, method, StopCoverageDecorator(getattr(os, method))) # There are test cases which e.g. kill the univention-cli-server. # The atexit-handler of coverage will not be called for SIGTERM, so we need to stop coverage manually def sigterm(sig: int, frame: Any) -> None: cls.debug_message('signal handler', sig, argv) cls.stop_measurement() signal.signal(signal.SIGTERM, previous) os.kill(os.getpid(), sig) previous = signal.signal(signal.SIGTERM, sigterm)
[docs] @classmethod def stop_measurement(cls, start: bool = False) -> None: cover = cls.coverage cls.debug_message('STOP MEASURE', bool(cover)) if not cover: return cover.stop() cover.save() if start: cover.start()
[docs] @classmethod def debug_message(cls, *messages: Any) -> None: if not cls.COVERAGE_DEBUG: return try: with open(cls.COVERAGE_DEBUG_PATH, 'a') as fd: fd.write('%s : %s: %s\n' % (os.getpid(), time.time(), ' '.join(repr(m) for m in messages))) except OSError: pass
[docs] class StopCoverageDecorator: inDecorator = False def __init__(self, method: Callable[..., Any]) -> None: self.method = method def __call__(self, *args: Any, **kw: Any) -> None: if not StopCoverageDecorator.inDecorator: StopCoverageDecorator.inDecorator = True Coverage.debug_message('StopCoverageDecorator', self.method.__name__, open('/proc/%s/cmdline' % os.getpid()).read().split('\x00')) Coverage.stop_measurement(True) try: self.method(*args, **kw) finally: StopCoverageDecorator.inDecorator = False def __repr__(self) -> str: return f'<StopCoverageDecorator {self.method!r}>'