# SPDX-FileCopyrightText: 2017-2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only
"""
Get a Python logging object below a listener module root logger.
The new logging object can log to a stream or a file.
The listener module root logger additionally writes the messages of all of
its children to the common `listener.log`.
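Example (a minimal sketch, assuming this module is importable as
``univention.listener.handler_logging`` and a listener module named
``my_module``)::

    from univention.listener.handler_logging import get_logger

    logger = get_logger('my_module')
    logger.info('Written to /var/log/univention/listener_modules/my_module.log and to listener.log.')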
"""
#
# Code mostly copied and adapted from
# ucs-school-4.2/ucs-school-lib/python/models/utils.py
#
import grp
import logging
import os
import pwd
import stat
import syslog
from collections.abc import Mapping
from logging.handlers import WatchedFileHandler
from typing import IO, Any
import univention.debug as ud
from univention.config_registry import ConfigRegistry
import listener
__syslog_opened = False
class UniFileHandler(WatchedFileHandler):
"""
Used by listener modules using the :py:mod:`univention.listener` API to
write log files below :file:`/var/log/univention/listener_modules/`.
Configuration can be done through the `handler_kwargs` argument of
:py:func:`get_listener_logger`.
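Example (a sketch; the module name and log path are hypothetical, `delay` and
`encoding` are standard :py:class:`logging.FileHandler` arguments)::

    get_listener_logger(
        'my_module',
        '/var/log/univention/listener_modules/my_module.log',
        handler_kwargs={'delay': True, 'encoding': 'utf-8'},
    )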
"""
def _open(self) -> IO[str]:
newly_created = not os.path.exists(self.baseFilename)
try:
stream = WatchedFileHandler._open(self)
except PermissionError:
self._set_listener_module_log_file_permissions()
stream = WatchedFileHandler._open(self)
if newly_created:
# Bug #55610: when Python creates a new file, every user is able to read it
self._set_listener_module_log_file_permissions()
return stream
def _set_listener_module_log_file_permissions(self):
listener_uid = pwd.getpwnam('listener').pw_uid
adm_gid = grp.getgrnam('adm').gr_gid
old_uid = os.geteuid()
try:
if old_uid != 0:
listener.setuid(0)
os.chown(self.baseFilename, listener_uid, adm_gid)
os.chmod(self.baseFilename, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP)
finally:
if old_uid != 0:
listener.unsetuid()
class ModuleHandler(logging.Handler):
"""
Used by listener modules using the :py:mod:`univention.listener` API to
write log messages through :py:mod:`univention.debug` to
:file:`/var/log/univention/listener.log`
"""
LOGGING_TO_UDEBUG = {
"CRITICAL": ud.ERROR,
"ERROR": ud.ERROR,
"WARN": ud.WARN,
"WARNING": ud.WARN,
"INFO": ud.PROCESS,
"DEBUG": ud.INFO,
"NOTSET": ud.INFO,
}
def __init__(self, level: int = logging.NOTSET, udebug_facility: int = ud.LISTENER) -> None:
self._udebug_facility = udebug_facility
super().__init__(level)
def emit(self, record: logging.LogRecord) -> None:
msg = self.format(record)
msg = '{}: {}'.format(record.name.rsplit('.')[-1], msg)
udebug_level = self.LOGGING_TO_UDEBUG[record.levelname]
ud.debug(self._udebug_facility, udebug_level, msg)
__LF_D = '%(asctime)s %(levelname)-7s %(module)s.%(funcName)s:%(lineno)d %(message)s'
__LF_I = '%(asctime)s %(levelname)-7s %(message)s'
FILE_LOG_FORMATS = {
"NOTSET": __LF_D,
"DEBUG": __LF_D,
"INFO": __LF_I,
"WARNING": __LF_I,
"WARN": __LF_I,
"ERROR": __LF_I,
"CRITICAL": __LF_I,
}
__LC_D = '%(asctime)s %(levelname)-7s %(module)s.%(funcName)s:%(lineno)d %(message)s'
__LC_I = '%(message)s'
__LC_W = '%(levelname)-7s %(message)s'
CMDLINE_LOG_FORMATS = {
"NOTSET": __LC_D,
"DEBUG": __LC_D,
"INFO": __LC_I,
"WARNING": __LC_W,
"WARN": __LC_W,
"ERROR": __LC_W,
"CRITICAL": __LC_W,
}
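# Map UCR debug levels (0-4) to Python logging level names.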
UCR_DEBUG_LEVEL_TO_LOGGING_LEVEL = {
0: 'ERROR',
1: 'WARN',
2: 'INFO',
3: 'DEBUG',
4: 'DEBUG',
}
LOG_DATETIME_FORMAT = '%Y-%m-%d %H:%M:%S'
_logger_cache: dict[str, logging.Logger] = {}
_handler_cache: dict[str, UniFileHandler] = {}
_ucr = ConfigRegistry()
_ucr.load()
def _get_ucr_int(ucr_key: str, default: int) -> int:
try:
return int(_ucr.get(ucr_key, default))
except ValueError:
return default
_listener_debug_level = _get_ucr_int('listener/debug/level', 2)
_listener_debug_level_str = UCR_DEBUG_LEVEL_TO_LOGGING_LEVEL[max(0, min(4, _listener_debug_level))]
_listener_module_handler = ModuleHandler(level=getattr(logging, _listener_debug_level_str))
listener_module_root_logger = logging.getLogger('listener module')
listener_module_root_logger.setLevel(getattr(logging, _listener_debug_level_str))
def get_logger(name: str, path: str | None = None) -> logging.Logger:
"""
Get a logging instance. Caching wrapper for
:py:func:`get_listener_logger()`.
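Example (a minimal sketch; the module names and the custom path are
hypothetical)::

    logger = get_logger('my_module')
    logger.info('Written to /var/log/univention/listener_modules/my_module.log')

    # A custom log file path is only honoured on the first call for a given name.
    other_logger = get_logger('other_module', path='/var/log/univention/other_module.log')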
:param str name: name of the logger instance; the full logger name will be
`<root logger's name>.<name>`
:param str path: path of the log file to create. If unset, it will be
`/var/log/univention/listener_modules/<name>.log`.
:return: a Python logging object
:rtype: logging.Logger
"""
if name not in _logger_cache:
file_name = name.replace('/', '_')
logger_name = name.replace('.', '_')
log_dir = '/var/log/univention/listener_modules'
file_path = path or os.path.join(log_dir, f'{file_name}.log')
listener_uid = pwd.getpwnam('listener').pw_uid
adm_grp = grp.getgrnam('adm').gr_gid
if not os.path.isdir(log_dir):
old_uid = os.geteuid()
try:
if old_uid != 0:
listener.setuid(0)
os.mkdir(log_dir)
os.chown(log_dir, listener_uid, adm_grp)
os.chmod(
log_dir,
stat.S_ISGID | stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR | stat.S_IRGRP | stat.S_IXGRP,
)
finally:
if old_uid != 0:
listener.unsetuid()
_logger_cache[name] = get_listener_logger(logger_name, file_path)
return _logger_cache[name]
def calculate_loglevel(name: str) -> str:
"""
Return the logging level to use, derived from the UCR variables
`listener/debug/level` and `listener/module/<name>/debug/level`: the higher
of the two UCR debug levels wins, which corresponds to the lower (more
verbose) logging level.
:param str name: name of the logger instance
:return: name of the logging level (e.g. `INFO`)
:rtype: str
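Example (a sketch; the UCR values are hypothetical)::

    # With listener/debug/level=2 (INFO) and
    # listener/module/my_module/debug/level=3 (DEBUG):
    calculate_loglevel('my_module')  # -> 'DEBUG'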
"""
listener_module_debug_level = _get_ucr_int(f'listener/module/{name}/debug/level', 2)
# 0 <= ucr level <= 4
return UCR_DEBUG_LEVEL_TO_LOGGING_LEVEL[min(4, max(0, _listener_debug_level, listener_module_debug_level))]
def get_listener_logger(name: str, filename: str, level: str | None = None, handler_kwargs: dict[str, Any] | None = None, formatter_kwargs: dict[str, Any] | None = None) -> logging.Logger:
"""
Get a logger object below the listener module root logger. The logger
will additionally log to the common `listener.log`.
* The logger will use a :py:class:`UniFileHandler` (a
:py:class:`~logging.handlers.WatchedFileHandler` subclass) to write the log
file; its constructor arguments can be set through `handler_kwargs`.
* A call with the same name will return the same logging object.
* There is only one handler per name-filename combination.
* If name and filename are the same, and only the log level changes, it will
return the logging object with the same handlers and change both the log
level of the respective handler and of the logger object to be the lowest
of the previous and the new level.
* If `level` is not set, the log level is derived from the UCR variables
`listener/debug/level` and `listener/module/<name>/debug/level` (see
:py:func:`calculate_loglevel`).
* Complete output customization is possible, setting kwargs for the
constructors of the handler and formatter.
* Using a custom formatter class is possible by configuring the `cls` key of
`formatter_kwargs` (see the example below).
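Example (a sketch; the module name, log file path and format string are
hypothetical)::

    logger = get_listener_logger(
        'my_module',
        '/var/log/univention/listener_modules/my_module.log',
        level='DEBUG',
        formatter_kwargs={'fmt': '%(asctime)s %(levelname)s %(message)s'},
    )
    logger.info('Written to the module log file and, via univention.debug, to listener.log.')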
:param str name: name of the logger instance; the full logger name will be
`<root logger's name>.<name>`
:param str filename: path of the log file to write to
:param str level: log level (`DEBUG`, `INFO` etc.). If not set, it will be
chosen automatically (see above).
:param dict handler_kwargs: will be passed to the handler's constructor.
It cannot be used to modify an existing handler, as it is only used at
creation time.
:param dict formatter_kwargs: will be passed to the formatter's constructor.
If it has a key `cls`, that class will be used to create the formatter
instead of :py:class:`logging.Formatter`.
:return: a Python logging object
:rtype: logging.Logger
"""
assert isinstance(filename, str)
assert isinstance(name, str)
if not name:
name = 'noname'
name = name.replace('.', '_') # don't mess up logger hierarchy
cache_key = f'{name}-{filename}'
logger_name = f'{listener_module_root_logger.name}.{name}'
_logger = logging.getLogger(logger_name)
if not level:
level = calculate_loglevel(name)
if cache_key in _handler_cache and getattr(logging, level) >= _handler_cache[cache_key].level:
return _logger
if not isinstance(handler_kwargs, Mapping):
handler_kwargs = {}
if not isinstance(formatter_kwargs, Mapping):
formatter_kwargs = {}
# The logger object's level must be the lowest of all its handlers' levels,
# otherwise handlers with a lower level than the logger would never receive
# any records.
if getattr(logging, level) < _logger.level:
_logger.setLevel(level)
if _logger.level == logging.NOTSET:
# fresh logger
_logger.setLevel(level)
fmt = FILE_LOG_FORMATS[level]
fmt_kwargs: dict[str, Any] = {"cls": logging.Formatter, "fmt": fmt, "datefmt": LOG_DATETIME_FORMAT}
fmt_kwargs.update(formatter_kwargs)
if cache_key in _handler_cache:
# Check if the log level from this request is lower than the one used in
# the cached logger's handler. We only ever lower the level, never raise it.
if getattr(logging, level) < _handler_cache[cache_key].level:
handler = _handler_cache[cache_key]
handler.setLevel(level)
formatter_cls: type[logging.Formatter] = fmt_kwargs.pop('cls')
formatter = formatter_cls(**fmt_kwargs)
handler.setFormatter(formatter)
else:
# Create handler and formatter from scratch.
handler = UniFileHandler(filename=filename, **handler_kwargs)
handler.set_name(logger_name)
handler.setLevel(level)
formatter_cls = fmt_kwargs.pop('cls')
formatter = formatter_cls(**fmt_kwargs)
handler.setFormatter(formatter)
_logger.addHandler(handler)
_handler_cache[cache_key] = handler
if _listener_module_handler not in listener_module_root_logger.handlers:
listener_module_root_logger.addHandler(_listener_module_handler)
return _logger
def _log_to_syslog(level: int, msg: str) -> None:
    global __syslog_opened
    if not __syslog_opened:
        syslog.openlog('Listener', 0, syslog.LOG_SYSLOG)
        __syslog_opened = True
    syslog.syslog(level, msg)
def info_to_syslog(msg: str) -> None:
_log_to_syslog(syslog.LOG_INFO, msg)