"""Code coverage measurement for ucs-test"""
from __future__ import absolute_import, print_function
import atexit
import distutils.spawn
import os
import signal
import subprocess
import time
from argparse import ArgumentParser, Namespace, _ArgumentGroup # noqa: F401
from typing import Any, Callable, List # noqa: F401
import six
[docs]class MissingCoverage(Exception):
pass
[docs]class Coverage(object):
COVERAGE_PTH = '/usr/lib/python2.7/dist-packages/ucstest-coverage.pth' if six.PY2 else '/usr/lib/python3/dist-packages/ucstest-coverage.pth'
COVERAGE_PTH_CONTENT = '''import univention.testing.coverage; univention.testing.coverage.Coverage.startup()'''
COVERAGE_DEBUG_PATH = '/tmp/ucs-test-coverage'
COVERAGE_DEBUG = os.path.exists(COVERAGE_DEBUG_PATH)
coverage = None
def __init__(self, options):
# type: (Namespace) -> None
self.coverage_config = options.coverage_config
self.branch_coverage = options.branch_coverage
self.coverage = options.coverage
self.coverage_sources = options.coverage_sources or ['univention']
self.services = options.coverage_restart_services or [
'univention-management-console-server',
'univention-management-console-web-server',
'univention-s4-connector',
'univention-directory-listener',
'univention-portal-server',
'univention-directory-manager-rest',
]
self.show_missing = options.coverage_show_missing
self.output_directory = options.coverage_output_directory
try:
subprocess.check_call(
["dpkg", "-l", "python-ucs-school"],
stderr=open("/dev/null", "a"),
stdout=open("/dev/null", "a")
)
self.coverage_sources.append('ucsschool')
subprocess.check_call(
["dpkg", "-l", "ucs-school-import-http-api"],
stderr=open("/dev/null", "a"),
stdout=open("/dev/null", "a")
)
self.services.extend([
'celery-worker-ucsschool-import',
'ucs-school-import-http-api',
])
except subprocess.CalledProcessError:
pass
if self.coverage and options.coverage_debug:
with open(self.COVERAGE_DEBUG_PATH, 'w'):
self.COVERAGE_DEBUG = True
[docs] def start(self):
# type: () -> None
"""Start measuring of coverage. Only called by ucs-test-framework once. Sets up the configuration."""
if not self.coverage:
return
self.write_config_file()
os.environ['COVERAGE_PROCESS_START'] = self.coverage_config
self.restart_python_services()
[docs] def write_config_file(self):
# type: () -> None
"""Write a python .pth file which is invoked before any python process"""
with open(self.COVERAGE_PTH, 'w') as fd:
fd.write(self.COVERAGE_PTH_CONTENT)
with open(self.coverage_config, 'w') as fd:
fd.write('''[run]
data_file = {data_file}
branch = {branch}
parallel = True
source = {source}
[report]
ignore_errors = True
show_missing = {show_missing}
omit = handlers/ucstest
syntax.d/*
hooks.d/*
[html]
directory = {directory}
'''.format(
data_file=os.path.join(os.path.dirname(self.coverage_config), '.coverage'),
branch=repr(self.branch_coverage),
source='\n\t'.join(self.coverage_sources),
show_missing=self.show_missing,
directory=self.output_directory,
))
[docs] def restart_python_services(self):
# type: () -> None
"""Restart currently running python services, so that they start/stop measuring code"""
for service in self.services:
try:
subprocess.call(['/usr/sbin/service', service, 'restart'])
except EnvironmentError:
pass
try:
subprocess.call(['pkill', '-f', 'python.*univention-cli-server'])
except EnvironmentError:
pass
[docs] def stop(self):
# type: () -> None
"""Stop coverage measuring. Only called by ucs-test-framework once. Stores the results."""
if not self.coverage:
return
# stop all services, so that their atexit-handler/signal handler stores the result before evaluating the result
if os.path.exists(self.COVERAGE_PTH):
os.remove(self.COVERAGE_PTH)
self.restart_python_services()
for exe in ("coverage", "python3-coverage", "python-coverage"):
coverage_bin = distutils.spawn.find_executable(exe)
if coverage_bin:
break
else:
raise MissingCoverage()
subprocess.call([coverage_bin, '--version'])
subprocess.call([coverage_bin, 'combine'])
subprocess.call([coverage_bin, 'html'])
subprocess.call([coverage_bin, 'report'])
subprocess.call([coverage_bin, 'erase'])
if os.path.exists(self.coverage_config):
os.remove(self.coverage_config)
[docs] @classmethod
def get_argument_group(cls, parser):
# type: (ArgumentParser) -> _ArgumentGroup
"""The option group for ucs-test-framework"""
coverage_group = parser.add_argument_group('Code coverage measurement options')
coverage_group.add_argument("--with-coverage", dest="coverage", action='store_true')
coverage_group.add_argument("--coverage-config", default=os.path.abspath(os.path.expanduser('~/.coveragerc'))) # don't use this, doesn't work!
coverage_group.add_argument("--branch-coverage", action='store_true')
coverage_group.add_argument('--coverage-sources', action='append', default=[])
coverage_group.add_argument("--coverage-debug", action='store_true')
coverage_group.add_argument('--coverage-restart-services', action='append', default=[])
coverage_group.add_argument('--coverage-show-missing', action='store_true')
coverage_group.add_argument("--coverage-output-directory", default=os.path.abspath(os.path.expanduser('~/htmlcov')))
return coverage_group
[docs] @classmethod
def is_candidate(cls, argv):
# type: (List[str]) -> bool
if os.getuid():
return False
exe = os.path.basename(argv[0])
if exe not in {'python', 'python2', 'python2.7', 'python3', 'python3.5', 'python3.7'}:
return False
if not any(s in arg for arg in argv for s in {'univention', 'udm', 'ucs', 'ucr'}):
cls.debug_message('skip non-ucs process', argv)
return False
if any(s in arg for arg in argv[2:] for s in {'listener', 'notifier'}):
# we don't need to cover the listener currently. some tests failed, maybe because of measuring the listener?
cls.debug_message('skip UDL/UDN', argv)
return False
return True
[docs] @classmethod # noqa: C901
def startup(cls):
# type: () -> None
"""Startup function which is invoked by every(!) python process during coverage measurement. If the process is relevant we start measuring coverage."""
argv = open('/proc/%s/cmdline' % os.getpid()).read().split('\x00')
if not cls.is_candidate(argv):
return
cls.debug_message('START', argv)
atexit.register(lambda: cls.debug_message('STOP'))
if not os.environ.get('COVERAGE_PROCESS_START'):
os.environ["COVERAGE_PROCESS_START"] = os.path.abspath(os.path.expanduser('~/.coveragerc'))
cls.debug_message('ENVIRON WAS CLEARED BY PARENT PROCESS', argv)
import coverage
cov = coverage.process_startup()
if not cov:
cls.debug_message('no coverage startup (already started?, environ cleared?): %r' % (os.environ.get('COVERAGE_PROCESS_START'),))
return
cls.coverage = cov
# FIXME: univention-cli-server calls os.fork() which causes the coverage measurement not to start in the forked process
# https://github.com/nedbat/coveragepy/issues/310 # Coverage fails with os.fork and os._exit
osfork = getattr(os, 'fork')
def fork(*args, **kwargs):
# type: (*Any, **Any) -> int
pid = osfork(*args, **kwargs)
if pid == 0:
cls.debug_message('FORK CHILD')
cls.startup()
else:
cls.debug_message('FORK PARENT')
cls.stop_measurement(True)
return pid
os.fork = fork
# https://github.com/nedbat/coveragepy/issues/43 # Coverage measurement fails on code containing os.exec* methods
# if the process calls one of the process-replacement functions the coverage must be started in the new process
for method in ['execl', 'execle', 'execlp', 'execlpe', 'execv', 'execve', 'execvp', 'execvpe', '_exit']:
if isinstance(getattr(os, method), StopCoverageDecorator):
continue # restarted in the same process (e.g. os.fork())
setattr(os, method, StopCoverageDecorator(getattr(os, method)))
# There are test cases which e.g. kill the univention-cli-server.
# The atexit-handler of coverage will not be called for SIGTERM, so we need to stop coverage manually
def sigterm(sig, frame):
# type: (int, Any) -> None
cls.debug_message('signal handler', sig, argv)
cls.stop_measurement()
signal.signal(signal.SIGTERM, previous)
os.kill(os.getpid(), sig)
previous = signal.signal(signal.SIGTERM, sigterm)
[docs] @classmethod
def stop_measurement(cls, start=False):
# type: (bool) -> None
cover = cls.coverage
cls.debug_message('STOP MEASURE', bool(cover))
if not cover:
return
cover.stop()
cover.save()
if start:
cover.start()
[docs] @classmethod
def debug_message(cls, *messages):
# type: (object) -> None
if not cls.COVERAGE_DEBUG:
return
try:
with open(cls.COVERAGE_DEBUG_PATH, 'a') as fd:
fd.write('%s : %s: %s\n' % (os.getpid(), time.time(), ' '.join(repr(m) for m in messages),))
except EnvironmentError:
pass
[docs]class StopCoverageDecorator(object):
inDecorator = False
def __init__(self, method):
# type: (Callable[..., Any]) -> None
self.method = method
def __call__(self, *args, **kw):
# type: (*Any, **Any) -> None
if not StopCoverageDecorator.inDecorator:
StopCoverageDecorator.inDecorator = True
Coverage.debug_message('StopCoverageDecorator', self.method.__name__, open('/proc/%s/cmdline' % os.getpid()).read().split('\x00'))
Coverage.stop_measurement(True)
try:
self.method(*args, **kw)
finally:
StopCoverageDecorator.inDecorator = False
def __repr__(self):
# type: () -> str
return '<StopCoverageDecorator %r>' % (self.method,)