# -*- coding: utf-8 -*-
# Univention UCS@school
# Copyright 2016-2021 Univention GmbH
# All rights reserved.
# The source code of this program is made available
# under the terms of the GNU Affero General Public License version 3
# (GNU AGPL V3) as published by the Free Software Foundation.
# Binary versions of this program provided by Univention to you as
# well as other copyrighted, protected or trademarked materials like
# Logos, graphics, fonts, specific documentations and configurations,
# cryptographic keys etc. are subject to a license agreement between
# you and Univention and not subject to the GNU AGPL V3.
# In the case you use this program under the terms of the GNU AGPL V3,
# the program is provided in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public
# License with the Debian GNU/Linux or Univention distribution in file
# /usr/share/common-licenses/AGPL-3; if not, see
# <>.

Loader for Python based hooks.

import imp
import importlib
import inspect
import logging
import os.path
from collections import defaultdict
from os import listdir
from typing import IO, TYPE_CHECKING, Any, Callable, Dict, List, Optional, Tuple, Type, TypeVar, Union

from six import iteritems, string_types

    import logging.Logger

    from .pyhooks import PyHook

    PyHookTV = TypeVar("PyHookTV", bound=PyHook)

[docs]class PyHooksLoader(object): """ Loader for PyHooks. Use get_hook_objects() to get initialized and sorted objects. Use get_hook_classes() if you want to initialize them yourself. """ _hook_classes = {} # type: Dict[str, List[Type[PyHookTV]]] def __init__(self, base_dir, base_class, logger=None, filter_func=None): # type: (str, Union[str, Type[PyHookTV]], Optional[logging.Logger], Optional[Callable[[Type[PyHookTV]], bool]]) -> None # noqa: E501 """ Hint: if you wish to pass a logging instance to a hook, add it to the arguments list of :py:meth:`get_hook_objects()` and receive it in the hooks :py:meth:`__init__()` method. If `filter_func` is a callable, it will be passed each class that is considered for loading and it can decide if it should be loaded or not. Thus its signature is `(type) -> bool`. :param str base_dir: path to a directory containing Python files :param base_class: only subclasses of this class will be imported. This can be either a class object or the fully dotted Python path to a class (the latter helps to prevent import loops). :type base_class: str or type :param logging.Logger logger: Python logging instance to use for loader logging (deprecated, ignored) :param Callable filter_func: function that takes a class and returns a bool """ self.base_dir = base_dir self.base_class = self.hook_cls2importpyhook(base_class, "base_class") self.base_class_name = self.base_class.__name__ self.logger = logging.getLogger(__name__) # type: logging.Logger if filter_func and not callable(filter_func): raise TypeError("Argument 'filter_func' must be a callable, got {!r}.".format(filter_func)) self._filter_func = filter_func self._pyhook_obj_cache = None # type: Union[None, Dict[str, List[Callable[..., Any]]]]
[docs] def drop_cache(self): # type: () -> None """ Drop the cache of loaded hook classes and force a rerun of the filesystem search, next time get_hook_classes() or get_pyhook_objects() is called. :return: None """ self._pyhook_obj_cache = None if self.base_class_name in self._hook_classes: del self._hook_classes[self.base_class_name]
[docs] def get_hook_classes(self): # type: () -> List[Type[PyHookTV]] """ Search hook files in filesystem and load classes. No objects are initialized, no sorting is done. :return: list of PyHook subclasses :rtype: list[type] """ if self._hook_classes.get(self.base_class_name) is None: "Searching for hooks of type %r in: %s...", self.base_class_name, self.base_dir ) self._hook_classes[self.base_class_name] = [] if self._filter_func: filter_func = self._filter_func else: filter_func = lambda x: True # noqa: E731 for filename in listdir(self.base_dir): if filename.endswith(".py") and os.path.isfile(os.path.join(self.base_dir, filename)): info = None try: info = imp.find_module(filename[:-3], [self.base_dir]) a_class = self._load_hook_class(filename[:-3], info, self.base_class) finally: if info and isinstance(info, tuple) and info[0] is not None: info[0].close() if a_class: if filter_func(a_class): self._hook_classes[self.base_class_name].append(a_class) else: "Hook class %r filtered out by %s().", a_class.__name__, filter_func.__name__, ) "Found hook classes: %s", ", ".join(c.__name__ for c in self._hook_classes[self.base_class_name]), ) return self._hook_classes[self.base_class_name]
[docs] def get_hook_objects(self, *args, **kwargs): # type: (*Any, **Any) -> Dict[str, List[Callable[..., Any]]] """ Get initialized hook objects, sorted by method and priority. :param tuple args: arguments to pass to __init__ of hooks :param dict kwargs: arguments to pass to __init__ of hooks :return: mapping from method names to list of methods of initialized hook objects, sorted by method priority :rtype: Dict[str, List[Callable]] """ if self._pyhook_obj_cache is None: pyhook_objs = [] for pyhook_cls in self.get_hook_classes(): try: pyhook_objs.append(pyhook_cls(*args, **kwargs)) except Exception as exc: self.logger.exception( "Initializing hook class %r with args=%r and kwargs=%r: %s", pyhook_cls, args, kwargs, exc, ) raise # fill cache: find all enabled hook methods methods = defaultdict(list) # type: Dict[str, List[Tuple[Callable[..., Any], int]]] for pyhook_obj in pyhook_objs: if not hasattr(pyhook_obj, "priority") or not isinstance(pyhook_obj.priority, dict): self.logger.warning( 'Ignoring hook %r without/invalid "priority" attribute.', pyhook_obj ) continue for meth_name, prio in iteritems(pyhook_obj.priority): if hasattr(pyhook_obj, meth_name) and isinstance( pyhook_obj.priority.get(meth_name), int ): methods[meth_name].append( (getattr(pyhook_obj, meth_name), pyhook_obj.priority[meth_name]) ) elif hasattr(pyhook_obj, meth_name) and pyhook_obj.priority.get(meth_name) is None: pass else: self.logger.warning("Ignoring invalid priority item (%r : %r).", meth_name, prio) # sort by priority self._pyhook_obj_cache = {} for meth_name, meth_list in iteritems(methods): self._pyhook_obj_cache[meth_name] = [ x[0] for x in sorted(meth_list, key=lambda x: x[1], reverse=True) ] "Loaded hooks: %r.", dict( [ ( meth_name, [ "{}.{}".format(m.__self__.__class__.__name__, m.__func__.__name__) for m in meths ], ) for meth_name, meths in iteritems(self._pyhook_obj_cache) ] ), ) return self._pyhook_obj_cache
@staticmethod def _load_hook_class(module_name, info, super_class): # type: (str, Tuple[IO, str, Tuple[str, str, int]], Type[PyHookTV]) -> Optional[Type[PyHookTV]] try: res = imp.load_module(module_name, *info) except (ImportError, NameError) as exc: logging.getLogger(__name__).exception("Loading modul %r (%r): %s", module_name, info[1], exc) return None for thing in dir(res): candidate = getattr(res, thing) if ( inspect.isclass(candidate) and issubclass(candidate, super_class) and candidate is not super_class ): return candidate return None
[docs] @staticmethod def hook_cls2importpyhook(hook_cls_arg, arg_name): # type: (Union[Type[PyHookTV], str], str) -> Type[PyHookTV] error_msg = ( "Argument {!r} must be a class object or the fully dotted Python path to a class.".format( arg_name ) ) if isinstance(hook_cls_arg, string_types): try: _module_name, _class_name = hook_cls_arg.rsplit(".", 1) _module = importlib.import_module(_module_name) base_class = getattr(_module, _class_name) # type: Type[PyHookTV] except (AttributeError, ImportError, ValueError) as exc: raise TypeError("{} : {}".format(error_msg, exc)) if not inspect.isclass(base_class): raise ValueError( "Loaded module {!r} and its attribute {!r}, but it is not a class.".format( _module, base_class ) ) elif inspect.isclass(hook_cls_arg): base_class = hook_cls_arg # type: Type[PyHookTV] else: raise TypeError(error_msg) return base_class