Source code for univention.debug2

"""
Python native Univention debugging library.

See :py:mod:`univention.debug` for an alternative being a wrapper for the C
implementation.
"""
# SPDX-FileCopyrightText: 2008-2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only


import logging
import sys
from functools import wraps
from itertools import chain
from warnings import warn


# import logging.handlers

ERROR = 0
WARN = 1
PROCESS = 2
INFO = 3
ALL = DEBUG = 4
TRACE = 5
DEFAULT = WARN

# The default levels provided are DEBUG(10), INFO(20), WARNING(30), ERROR(40) and CRITICAL(50).
# Mapping old levels to new ones
_map_lvl_old2new = {
    0: logging.ERROR,    # 40
    1: logging.WARNING,  # 30
    2: 25,               # 25
    3: logging.INFO,     # 20
    4: logging.DEBUG + 5,  # 15
    5: 5,                # 5
}

MAIN = 0x00
LDAP = 0x01
USERS = 0x02
NETWORK = 0x03
SSL = 0x04
SLAPD = 0x05
SEARCH = 0x06
TRANSFILE = 0x07
LISTENER = 0x08
POLICY = 0x09
ADMIN = 0x0A
CONFIG = 0x0B
LICENSE = 0x0C
KERBEROS = 0x0D
DHCP = 0x0E
PROTOCOL = 0x0F
MODULE = 0x10
ACL = 0x11
RESOURCES = 0x12
PARSER = 0x13
LOCALE = 0x14
AUTH = 0x15

NO_FLUSH = 0x00
FLUSH = 0x01

NO_FUNCTION = 0x00
FUNCTION = 0x01

_map_id_old2new = {
    MAIN: "MAIN",
    LDAP: "LDAP",
    USERS: "USERS",
    NETWORK: "NETWORK",
    SSL: "SSL",
    SLAPD: "SLAPD",
    SEARCH: "SEARCH",
    TRANSFILE: "TRANSFILE",
    LISTENER: "LISTENER",
    POLICY: "POLICY",
    ADMIN: "ADMIN",
    CONFIG: "CONFIG",
    LICENSE: "LICENSE",
    KERBEROS: "KERBEROS",
    DHCP: "DHCP",
    PROTOCOL: "PROTOCOL",
    MODULE: "MODULE",
    ACL: "ACL",
    RESOURCES: "RESOURCES",
    PARSER: "PARSER",
    LOCALE: "LOCALE",
    AUTH: "AUTH",
}


# 13.08.2008 13:13:57.123  LISTENER    ( ERROR   ) : listener: 1
# 13.08.2008 13:13:57.123  LISTENER    ( WARN    ) : received signal 2
# 13.08.2008 13:14:02.123  DEBUG_INIT
_outfmt = '%(asctime)s.%(msecs)03d %(name)-11s (%(levelname)-7s): %(message)s'
_outfmt_syslog = '%(name)-11s (%(levelname)-7s): %(message)s'
_datefmt = '%d.%m.%Y %H:%M:%S'


class _Formatter(logging.Formatter):
    def format(self, record):
        if record.name.startswith('ud2.'):
            record.name = record.name.split('.', 1)[-1]
        return super().format(record)


_logfilename = None
_handler_console = None
_handler_file = None
_handler_syslog = None
_do_flush = False
_enable_function = False
_enable_syslog = False
_logger_level = {key: DEFAULT for key in _map_id_old2new.values()}  # noqa: C420
_use_structured = False


[docs] def init(logfile, force_flush=0, enable_function=0, enable_syslog=0): """ Initialize debugging library for logging to 'logfile'. :param str logfile: name of the logfile, or 'stderr', or 'stdout'. :param bool force_flush: force flushing of messages (True). :param bool trace_function: enable (True) or disable (False) function tracing. :param bool enable_syslog: enable (True) or disable (False) logging to SysLog. :returns: output file or None. """ global _logfilename, _handler_console, _handler_file, _do_flush, _enable_function, _enable_syslog # _handler_syslog result = None _logfilename = logfile null_handler = logging.NullHandler() ud2_base_logger = logging.getLogger('ud2') ud2_base_logger.setLevel(_map_lvl_old2new[TRACE]) ud2_base_logger.handlers = [null_handler] ud2_base_logger.propagate = False if _use_structured: from univention.logging import StructuredFormatter formatter = StructuredFormatter(with_date_prefix=True) else: formatter = _Formatter(_outfmt, _datefmt) exit() if logfile in ('stderr', '/dev/stderr', 'stdout', '/dev/stdout'): # add stderr or stdout handler _handler_console = logging.StreamHandler(sys.stdout if logfile in ('stdout', '/dev/stdout') else sys.stderr) _handler_console.setLevel(_map_lvl_old2new[TRACE]) _handler_console.setFormatter(formatter) logging.getLogger('ud2').addHandler(_handler_console) logging.getLogger('ud2').removeHandler(null_handler) result = _handler_console.stream else: try: # add file handler _handler_file = logging.FileHandler(logfile, 'a+') _handler_file.setLevel(_map_lvl_old2new[TRACE]) _handler_file.setFormatter(formatter) logging.getLogger('ud2').addHandler(_handler_file) logging.getLogger('ud2').removeHandler(null_handler) result = _handler_file.stream except OSError as ex: print('opening %s failed: %s' % (logfile, ex)) # if enable_syslog: # try: # add syslog handler # _handler_syslog = logging.handlers.SysLogHandler( ('localhost', 514), logging.handlers.SysLogHandler.LOG_ERR ) # _handler_syslog.setLevel( _map_lvl_old2new[ERROR] ) # _handler_syslog.setFormatter(formatter) # logging.getLogger('').addHandler(_handler_syslog) # except: # raise # print('opening syslog failed') logging.addLevelName(25, 'PROCESS') logging.addLevelName(15, 'ALL') logging.addLevelName(5, 'TRACE') logging.addLevelName(100, '------') logging.getLogger('ud2').getChild('MAIN').log(100, 'DEBUG_INIT') _do_flush = force_flush _enable_function = enable_function _enable_syslog = enable_syslog return result
[docs] def exit(): """Close debug logfile.""" global _handler_console, _handler_file logging.getLogger('ud2').getChild('MAIN').log(100, 'DEBUG_EXIT') if _handler_console: logging.getLogger('').removeHandler(_handler_console) _handler_console = None if _handler_file: logging.getLogger('').removeHandler(_handler_file) _handler_file = None
[docs] def reopen(): """Close and re-open the debug logfile.""" logging.getLogger('ud2').getChild('MAIN').log(100, 'DEBUG_REINIT') init(_logfilename, _do_flush, _enable_function, _enable_syslog)
[docs] def set_level(category, level): """ Set minimum required severity 'level' for facility 'category'. :param int category: ID of the category, e.g. MAIN, LDAP, USERS, ... :param int level: Level of logging, e.g. ERROR, WARN, PROCESS, INFO, ALL, TRACE """ new_id = _map_id_old2new.get(category, 'MAIN') if level > TRACE: # pragma: no cover level = TRACE elif level < ERROR: # pragma: no cover level = ERROR _logger_level[new_id] = level
[docs] def get_level(category): """ Get minimum required severity for facility 'category'. :param int category: ID of the category, e.g. MAIN, LDAP, USERS, ... :return: Return debug level of category. :rtype: int """ new_id = _map_id_old2new.get(category, 'MAIN') return _logger_level[new_id]
[docs] def set_function(activate): """ Enable or disable the logging of function begins and ends. :param bool activate: enable (True) or disable (False) function tracing. .. deprecated:: 4.4 Use function decorator :py:func:`trace` instead. """ global _enable_function _enable_function = activate
[docs] def debug(category, level, message, utf8=True): """ Log message 'message' of severity 'level' to facility 'category'. :param int category: ID of the category, e.g. MAIN, LDAP, USERS, ... :param int level: Level of logging, e.g. ERROR, WARN, PROCESS, INFO, ALL, TRACE :param str message: The message to log. :param bool utf8: Assume the message is UTF-8 encoded. """ new_id = _map_id_old2new.get(category, 'MAIN') if level <= _logger_level[new_id]: new_level = _map_lvl_old2new[level] logging.getLogger('ud2').getChild(new_id).log(new_level, message) _flush()
[docs] class function: """ Log function call begin and end. :param str fname: name of the function starting. :param bool utf8: Assume the message is UTF-8 encoded. .. deprecated:: 4.4 Use function decorator :py:func:`trace` instead. >>> def my_func(agr1, agr2=None): ... _d = function('my_func(...)') # noqa: F841 ... return 'yes' >>> my_func(42) 'yes' """ def __init__(self, fname, utf8=True): warn('univention.debug2.function is deprecated and will be removed with UCS-5', PendingDeprecationWarning, stacklevel=2) self.fname = fname if _enable_function: logging.getLogger('ud2').getChild('MAIN').log(100, 'UNIVENTION_DEBUG_BEGIN : ' + self.fname) _flush() def __del__(self): """Log the end of function.""" if _enable_function: logging.getLogger('ud2').getChild('MAIN').log(100, 'UNIVENTION_DEBUG_END : ' + self.fname) _flush()
[docs] def set_structured(use_structured=False): global _use_structured _use_structured = use_structured
[docs] def trace(with_args=True, with_return=False, repr=object.__repr__): """ Log function call, optional with arguments and result. :param bool with_args: Log function arguments. :param bool with_return: Log function result. :param repr: Function accepting a single object and returing a string representation for the given object. Defaults to :py:func:`object.__repr__`, alternative :py:func:`repr`. >>> @trace(with_args=True, with_return=True) ... def my_func(arg1, arg2=None): ... return 'yes' >>> my_func(42) 'yes' >>> class MyClass(object): ... @trace(with_args=True, with_return=True, repr=repr) ... def my_meth(self, arg1, arg2=None): ... return 'yes' >>> MyClass().my_meth(42) 'yes' >>> @trace() ... def my_bug(): ... 1 / 0 >>> my_bug() Traceback (most recent call last): ... ZeroDivisionError: integer division or modulo by zero """ def decorator(f): @wraps(f) def wrapper(*args, **kwargs): fname = '%s.%s' % (f.__module__, f.__name__) _args = ', '.join( chain( (repr(arg) for arg in args), ('%s=%s' % (k, repr(v)) for (k, v) in kwargs.items()), ), ) if with_args else '...' logger = logging.getLogger('ud2').getChild('MAIN') logger.log(100, 'UNIVENTION_DEBUG_BEGIN : %s(%s): ...', fname, _args) _flush() try: ret = f(*args, **kwargs) except BaseException: try: (exctype, value) = sys.exc_info()[:2] logger.log(100, 'UNIVENTION_DEBUG_END : %s(...): %s(%s)', fname, exctype, value) finally: exctype = value = None raise else: logger.log(100, 'UNIVENTION_DEBUG_END : %s(...): %s', fname, repr(ret) if with_return else '...') return ret return wrapper return decorator
def _flush(): """Flushing all messages.""" if _do_flush: for handler in [_handler_console, _handler_file, _handler_syslog]: if handler: handler.flush()