Source code for univention.debug2

# -*- coding: utf-8 -*-
"""
Python native Univention debugging library.

See :py:mod:`univention.debug` for an alternative being a wrapper for the C
implementation.
"""
# Copyright 2008-2022 Univention GmbH
#
# https://www.univention.de/
#
# All rights reserved.
#
# The source code of this program is made available
# under the terms of the GNU Affero General Public License version 3
# (GNU AGPL V3) as published by the Free Software Foundation.
#
# Binary versions of this program provided by Univention to you as
# well as other copyrighted, protected or trademarked materials like
# Logos, graphics, fonts, specific documentations and configurations,
# cryptographic keys etc. are subject to a license agreement between
# you and Univention and not subject to the GNU AGPL V3.
#
# In the case you use this program under the terms of the GNU AGPL V3,
# the program is provided in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public
# License with the Debian GNU/Linux or Univention distribution in file
# /usr/share/common-licenses/AGPL-3; if not, see
# <https://www.gnu.org/licenses/>.

from __future__ import print_function
from __future__ import absolute_import
import sys
from functools import wraps
from itertools import chain
from warnings import warn
import logging
# import logging.handlers

# Univention severity levels, ordered from most important (ERROR) to most
# verbose (ALL).
ERROR, WARN, PROCESS, INFO, ALL = range(5)
# Severity threshold initially assigned to every category.
DEFAULT = WARN

# The default levels provided by :py:mod:`logging` are DEBUG(10), INFO(20),
# WARNING(30), ERROR(40) and CRITICAL(50).
# Translation of the Univention levels to `logging` levels; PROCESS and ALL
# have no standard counterpart and use the numeric levels 25 and 15.
_map_lvl_old2new = {
	ERROR: logging.ERROR,  # 40
	WARN: logging.WARNING,  # 30
	PROCESS: 25,
	INFO: logging.INFO,  # 20
	ALL: 15,
}

# Numeric IDs of the logging categories (facilities) accepted by
# debug(), set_level() and get_level().
MAIN = 0x00
LDAP = 0x01
USERS = 0x02
NETWORK = 0x03
SSL = 0x04
SLAPD = 0x05
SEARCH = 0x06
TRANSFILE = 0x07
LISTENER = 0x08
POLICY = 0x09
ADMIN = 0x0A
CONFIG = 0x0B
LICENSE = 0x0C
KERBEROS = 0x0D
DHCP = 0x0E
PROTOCOL = 0x0F
MODULE = 0x10
ACL = 0x11
RESOURCES = 0x12
PARSER = 0x13
LOCALE = 0x14
AUTH = 0x15

# NOTE(review): presumably the symbolic values for the `force_flush`
# argument of init() -- not referenced elsewhere in this module.
NO_FLUSH = 0x00
FLUSH = 0x01

# NOTE(review): presumably the symbolic values for the `enable_function`
# argument of init() -- not referenced elsewhere in this module.
NO_FUNCTION = 0x00
FUNCTION = 0x01

# Maps each numeric category ID to the name of the `logging` logger used for it.
_map_id_old2new = {
	MAIN: "MAIN",
	LDAP: "LDAP",
	USERS: "USERS",
	NETWORK: "NETWORK",
	SSL: "SSL",
	SLAPD: "SLAPD",
	SEARCH: "SEARCH",
	TRANSFILE: "TRANSFILE",
	LISTENER: "LISTENER",
	POLICY: "POLICY",
	ADMIN: "ADMIN",
	CONFIG: "CONFIG",
	LICENSE: "LICENSE",
	KERBEROS: "KERBEROS",
	DHCP: "DHCP",
	PROTOCOL: "PROTOCOL",
	MODULE: "MODULE",
	ACL: "ACL",
	RESOURCES: "RESOURCES",
	PARSER: "PARSER",
	LOCALE: "LOCALE",
	AUTH: "AUTH",
}


# Sample output as rendered with the formats defined below:
# 13.08.2008 13:13:57.123 LISTENER    (ERROR  ): listener: 1
# 13.08.2008 13:13:57.123 LISTENER    (WARNING): received signal 2
# 13.08.2008 13:14:02.123 MAIN        (------ ): DEBUG_INIT
_datefmt = '%d.%m.%Y %H:%M:%S'
_outfmt = '%(asctime)s.%(msecs)03d %(name)-11s (%(levelname)-7s): %(message)s'
_outfmt_syslog = '%(name)-11s (%(levelname)-7s): %(message)s'

# Module state, filled in by init() and cleared again by exit().
_logfilename = None
_handler_console = _handler_file = _handler_syslog = None
_do_flush = _enable_function = _enable_syslog = False
# Current severity threshold per logger name; every category starts at DEFAULT.
_logger_level = dict.fromkeys(_map_id_old2new.values(), DEFAULT)


def init(logfile, force_flush=0, enable_function=0, enable_syslog=0):
	"""
	Initialize debugging library for logging to 'logfile'.

	Handlers installed by a previous call are detached first by calling
	:py:func:`exit`, so the function may be called repeatedly.

	:param str logfile: name of the logfile, or 'stderr', or 'stdout'.
	:param bool force_flush: force flushing of messages (True).
	:param bool enable_function: enable (True) or disable (False) function tracing.
	:param bool enable_syslog: enable (True) or disable (False) logging to SysLog.
		The value is only remembered; no SysLog handler is installed.
	:returns: output file or None.
	"""
	global _logfilename, _handler_console, _handler_file, _handler_syslog, _do_flush, _enable_function, _enable_syslog

	result = None
	_logfilename = logfile

	# Register the custom level names before the first message is logged, so
	# the 'DEBUG_EXIT' marker written by exit() below is already labelled.
	logging.addLevelName(25, 'PROCESS')
	logging.addLevelName(15, 'ALL')
	logging.addLevelName(100, '------')

	# create root logger
	logging.basicConfig(
		level=logging.DEBUG,
		filename='/dev/null',  # disabled
		format=_outfmt,
		datefmt=_datefmt
	)
	formatter = logging.Formatter(_outfmt, _datefmt)
	root = logging.getLogger('')

	# drop the handlers of a previous initialization
	exit()

	if logfile in ('stderr', '/dev/stderr', 'stdout', '/dev/stdout'):
		# add stderr or stdout handler
		_handler_console = logging.StreamHandler(sys.stdout if logfile in ('stdout', '/dev/stdout') else sys.stderr)
		_handler_console.setLevel(logging.DEBUG)
		_handler_console.setFormatter(formatter)
		root.addHandler(_handler_console)
		result = _handler_console.stream
	else:
		try:
			# add file handler
			_handler_file = logging.FileHandler(logfile, 'a+')
			_handler_file.setLevel(logging.DEBUG)
			_handler_file.setFormatter(formatter)
			root.addHandler(_handler_file)
			result = _handler_file.stream
		except EnvironmentError as ex:
			# Best effort: report the problem and continue without a log file.
			print('opening %s failed: %s' % (logfile, ex))

	# NOTE: logging to SysLog is not implemented; `enable_syslog` is only
	# stored below and `_handler_syslog` is never set.

	logging.getLogger('MAIN').log(100, 'DEBUG_INIT')

	_do_flush = force_flush
	_enable_function = enable_function
	_enable_syslog = enable_syslog

	return result
def exit():
	"""
	Close debug logfile.

	Writes a 'DEBUG_EXIT' marker, detaches the console and file handlers
	from the root logger and closes the log file. A file object previously
	returned by :py:func:`init` for a log file is closed by this call.
	"""
	global _handler_console, _handler_file, _handler_syslog

	logging.getLogger('MAIN').log(100, 'DEBUG_EXIT')
	root = logging.getLogger('')
	if _handler_console:
		# Only detach: the underlying stream is sys.stdout or sys.stderr.
		root.removeHandler(_handler_console)
		_handler_console = None
	if _handler_file:
		root.removeHandler(_handler_file)
		# removeHandler() alone leaves the file open; close it so repeated
		# init()/reopen() calls do not leak file descriptors.
		_handler_file.close()
		_handler_file = None
def reopen():
	"""
	Close and re-open the debug logfile.

	Writes a 'DEBUG_REINIT' marker and then runs :py:func:`init` again with
	the log file name and flags remembered from the previous initialization.
	"""
	main_logger = logging.getLogger('MAIN')
	main_logger.log(100, 'DEBUG_REINIT')
	init(
		_logfilename,
		force_flush=_do_flush,
		enable_function=_enable_function,
		enable_syslog=_enable_syslog,
	)
def set_level(category, level):
	"""
	Set minimum required severity 'level' for facility 'category'.

	Levels outside the range ERROR..ALL are clamped to the nearest bound.
	An unknown category changes the level of MAIN.

	:param int category: ID of the category, e.g. MAIN, LDAP, USERS, ...
	:param int level: Level of logging, e.g. ERROR, WARN, PROCESS, INFO, ALL
	"""
	logger_name = _map_id_old2new.get(category, 'MAIN')
	# clamp into [ERROR, ALL]
	clamped = max(min(level, ALL), ERROR)
	_logger_level[logger_name] = clamped
def get_level(category):
	"""
	Get minimum required severity for facility 'category'.

	An unknown category yields the level of MAIN.

	:param int category: ID of the category, e.g. MAIN, LDAP, USERS, ...
	:return: Return debug level of category.
	:rtype: int
	"""
	return _logger_level[_map_id_old2new.get(category, 'MAIN')]
def set_function(activate):
	"""
	Switch the logging of function begins and ends on or off.

	The flag is read by :py:class:`function` each time a message would be
	written.

	:param bool activate: enable (True) or disable (False) function tracing.

	.. deprecated:: 4.4
		Use function decorator :py:func:`trace` instead.
	"""
	global _enable_function

	_enable_function = activate
def debug(category, level, message, utf8=True):
	"""
	Log message 'message' of severity 'level' to facility 'category'.

	The message is written only if 'level' does not exceed the level
	currently configured for the category.

	:param int category: ID of the category, e.g. MAIN, LDAP, USERS, ...
		Unknown IDs are logged to MAIN.
	:param int level: Level of logging, e.g. ERROR, WARN, PROCESS, INFO, ALL.
		Values without a mapping (e.g. negative numbers) are logged as ERROR.
	:param str message: The message to log.
	:param bool utf8: Assume the message is UTF-8 encoded. Ignored by this implementation.
	"""
	new_id = _map_id_old2new.get(category, 'MAIN')
	if level <= _logger_level[new_id]:
		# Levels below ERROR always pass the check above but have no entry in
		# the map; log them as ERROR instead of raising KeyError.
		new_level = _map_lvl_old2new.get(level, _map_lvl_old2new[ERROR])
		logging.getLogger(new_id).log(new_level, message)
		_flush()
class function(object):
	"""
	Log function call begin and end.

	The begin message is written when the object is created, the end
	message when it is destroyed. Both are only written while function
	tracing is enabled.

	:param str fname: name of the function starting.
	:param bool utf8: Assume the message is UTF-8 encoded. Ignored by this implementation.

	.. deprecated:: 4.4
		Use function decorator :py:func:`trace` instead.

	>>> def my_func(arg1, arg2=None):
	...     _d = function('my_func(...)')  # noqa: F841
	...     return 'yes'
	>>> my_func(42)
	'yes'
	"""

	def __init__(self, fname, utf8=True):
		# stacklevel=2 attributes the warning to the code creating the object
		# instead of to this module.
		warn('univention.debug2.function is deprecated and will be removed with UCS-5', PendingDeprecationWarning, stacklevel=2)
		self.fname = fname
		if _enable_function:
			logging.getLogger('MAIN').log(100, 'UNIVENTION_DEBUG_BEGIN : ' + self.fname)
			_flush()

	def __del__(self):
		"""
		Log the end of function.
		"""
		# `fname` is missing if __init__() was aborted by the warning being
		# turned into an exception; nothing was begun then.
		fname = getattr(self, 'fname', None)
		if _enable_function and fname is not None:
			logging.getLogger('MAIN').log(100, 'UNIVENTION_DEBUG_END : ' + fname)
			_flush()
def trace(with_args=True, with_return=False, repr=object.__repr__):
	"""
	Log function call, optional with arguments and result.

	The begin and end messages are written to the 'MAIN' logger on every
	call of the decorated function. An exception raised by the function is
	logged and re-raised unchanged.

	:param bool with_args: Log function arguments.
	:param bool with_return: Log function result.
	:param repr: Function accepting a single object and returning a string representation for the given object. Defaults to :py:func:`object.__repr__`, alternative :py:func:`repr`.

	>>> @trace(with_args=True, with_return=True)
	... def my_func(arg1, arg2=None):
	...     return 'yes'
	>>> my_func(42)
	'yes'
	>>> class MyClass(object):
	...     @trace(with_args=True, with_return=True, repr=repr)
	...     def my_meth(self, arg1, arg2=None):
	...         return 'yes'
	>>> MyClass().my_meth(42)
	'yes'
	>>> @trace()
	... def my_bug():
	...     1 / 0
	>>> my_bug()  # doctest: +IGNORE_EXCEPTION_DETAIL
	Traceback (most recent call last):
		...
	ZeroDivisionError: division by zero
	"""
	def decorator(f):
		@wraps(f)
		def wrapper(*args, **kwargs):
			fname = '%s.%s' % (f.__module__, f.__name__)
			if with_args:
				_args = ', '.join(
					chain(
						(repr(arg) for arg in args),
						('%s=%s' % (k, repr(v)) for (k, v) in kwargs.items()),
					)
				)
			else:
				_args = '...'

			logger = logging.getLogger('MAIN')
			logger.log(100, 'UNIVENTION_DEBUG_BEGIN : %s(%s): ...', fname, _args)
			_flush()
			try:
				ret = f(*args, **kwargs)
			except BaseException:
				try:
					(exctype, value) = sys.exc_info()[:2]
					logger.log(100, 'UNIVENTION_DEBUG_END : %s(...): %s(%s)', fname, exctype, value)
				finally:
					# drop the references to the exception again
					exctype = value = None
				raise
			else:
				logger.log(100, 'UNIVENTION_DEBUG_END : %s(...): %s', fname, repr(ret) if with_return else '...')
				return ret
			finally:
				# Flush the END message as well, like the BEGIN message above.
				_flush()

		return wrapper

	return decorator
def _flush():
	"""
	Flush every installed log handler, but only if forced flushing is enabled.
	"""
	if not _do_flush:
		return
	for active in (_handler_console, _handler_file, _handler_syslog):
		if active:
			active.flush()