Source code for univention.portal.extensions.reloader

#!/usr/bin/python3
#
# Univention Portal
#
# SPDX-FileCopyrightText: 2020-2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only
#

import importlib
import json
import os.path
import shutil
import tempfile
from pathlib import Path
from urllib.parse import quote

from filetype import guess

from univention.portal import Plugin, config
from univention.portal.log import get_logger


[docs] class Reloader(metaclass=Plugin): """ Our base class for reloading The idea is that this class handles the reloading for caches. `refresh`: In fact the only method. Gets a "reason" so that it can decide that a refresh is not necessary. If it was necessary, it should return True A reason "force" should be treated as very important. If the reloader refreshed the content, the overlying cache will reload itself. """
[docs] def refresh(self, reason=None): # pragma: no cover pass
[docs] class MtimeBasedLazyFileReloader(Reloader): """ Specialized class that reloads if a certain (cache) file was updated. So if a seconds process updated the file and this class is asked to reload, it just returns True. If the reason fits, it actually refreshes the content and writes it into the file. cache_file: Filename this object is responsible for """ def __init__(self, cache_file): self._cache_file = cache_file self._mtime = self._get_mtime() def _get_mtime(self): try: return os.stat(self._cache_file).st_mtime except (OSError, AttributeError) as exc: get_logger("cache").warning(f"Unable to get mtime for {exc}") return 0 def _file_was_updated(self): mtime = self._get_mtime() if mtime > self._mtime: self._mtime = mtime return True def _check_reason(self, reason, content=None): if reason is None: return False if reason == "force": return True
[docs] def refresh(self, reason=None, content=None): if self._check_reason(reason, content=content): get_logger("cache").info("refreshing cache") fd = None try: fd = self._refresh() except Exception: get_logger("cache").exception("Error during refresh") # hopefully, we can still work with an older cache? else: if fd: try: os.makedirs(os.path.dirname(self._cache_file)) except OSError: pass shutil.move(fd.name, self._cache_file) self._mtime = self._get_mtime() return True return self._file_was_updated()
def _refresh(self): # pragma: no cover pass
[docs] class PortalReloaderUDM(MtimeBasedLazyFileReloader): """ Specialized class that reloads a cache file with the content of a certain portal object using UDM. Reacts on reasons like "ldap:portal:<correct_dn>". portal_dn: DN of the portals/portal object cache_file: Filename this object is responsible for """ def __init__(self, portal_dn, cache_file): super().__init__(cache_file) self._portal_dn = portal_dn def _check_reason(self, reason, content=None): if super()._check_reason(reason, content): return True if reason is None: return False reason_args = reason.split(":", 2) if len(reason_args) < 2: return False if reason_args[0] != "ldap": return False return reason_args[1] in ["portal", "category", "entry", "folder", "announcement"] def _refresh(self): udm_lib = importlib.import_module("univention.udm") try: udm = udm_lib.UDM.machine(prefer_local_connection=True).version(2) portal_data = udm.get("portals/portal").get(self._portal_dn) except udm_lib.ConnectionError: get_logger("cache").warning("Could not establish UDM connection. Is the LDAP server accessible?") return None except udm_lib.UnknownModuleType: get_logger("cache").warning("UDM not up to date? Portal module not found.") return None except udm_lib.NoObject: get_logger("cache").warning("Portal %s not found", self._portal_dn) return None portal = self._extract_portal(portal_data) categories = self._extract_categories(udm, portal_data) user_links = portal_data.props.userLinks menu_links = portal_data.props.menuLinks folders = self._extract_folders(udm, portal_data, list(categories.values())) entries = self._extract_entries(udm, portal_data, list(categories.values()), list(folders.values())) announcements = self._extract_announcements(udm, portal_data) with tempfile.NamedTemporaryFile(mode="w", delete=False) as fd: json.dump( { "portal": portal, "categories": categories, "folders": folders, "entries": entries, "user_links": user_links, "menu_links": menu_links, "announcements": announcements, }, fd, sort_keys=True, indent=4, ) return fd @classmethod def _extract_portal(cls, portal_data): portal = { "dn": portal_data.dn, "showUmc": portal_data.props.showUmc, "logo": portal_data.props.logo, "background": portal_data.props.background, "name": portal_data.props.displayName, "defaultLinkTarget": portal_data.props.defaultLinkTarget, "ensureLogin": portal_data.props.ensureLogin, "categories": portal_data.props.categories, } portal_name = portal_data.props.name if portal["logo"]: portal["logo"] = cls._write_image(portal_data.props.logo.raw, portal_name, "logos") if portal["background"]: portal["background"] = cls._write_image(portal_data.props.background.raw, portal_name, "backgrounds") return portal @classmethod def _extract_categories(cls, udm, portal): categories = {} for category in udm.get("portals/category").search(): categories[category.dn] = { "dn": category.dn, "in_portal": category.dn in portal.props.categories, "display_name": category.props.displayName, "entries": category.props.entries, } return categories @classmethod def _extract_folders(cls, udm, portal, categories): folders = {} for folder in udm.get("portals/folder").search(): in_portal = ( folder.dn in portal.props.menuLinks or folder.dn in portal.props.userLinks or any(folder.dn in category["entries"] for category in categories if category["in_portal"]) ) folders[folder.dn] = { "dn": folder.dn, "in_portal": in_portal, "name": folder.props.displayName, "entries": folder.props.entries, } return folders @classmethod def _extract_entries(cls, udm, portal, categories, folders): entries = {} for entry in udm.get("portals/entry").search(): if entry.dn in entries: continue in_portal = ( entry.dn in portal.props.menuLinks or entry.dn in portal.props.userLinks or any(entry.dn in category["entries"] for category in categories if category["in_portal"]) or any(entry.dn in folder["entries"] for folder in folders if folder["in_portal"]) ) icon_url = None if entry.props.icon: icon_url = cls._write_image(entry.props.icon.raw, entry.props.name, "entries") entries[entry.dn] = { "dn": entry.dn, "in_portal": in_portal, "name": entry.props.displayName, "description": entry.props.description, 'keywords': entry.props.keywords, "icon_url": icon_url, "activated": entry.props.activated, "anonymous": entry.props.anonymous, "allowedGroups": entry.props.allowedGroups, "links": entry.props.link, "linkTarget": entry.props.linkTarget, "target": entry.props.target, "backgroundColor": entry.props.backgroundColor, } return entries @classmethod def _extract_announcements(cls, udm, portal): udm_lib = importlib.import_module("univention.udm") announcements = {} try: announcement_module = udm.get("portals/announcement") except udm_lib.UnknownModuleType: announcement_module = None if not announcement_module: get_logger("cache").warning("UDM not up to date? Announcement module not found.") return announcements for announcement in announcement_module.search(): announcements[announcement.dn] = { "dn": announcement.dn, "allowedGroups": announcement.props.allowedGroups, "name": announcement.props.name, "message": announcement.props.message, "title": announcement.props.title, "visibleFrom": str(announcement.props.visibleFrom), "visibleUntil": str(announcement.props.visibleUntil), "isSticky": announcement.props.isSticky, "needsConfirmation": announcement.props.needsConfirmation, "severity": announcement.props.severity, } return announcements @classmethod def _write_image(cls, image, name, dirname): assets_root = Path(config.fetch("assets_root")) try: name = name.replace( "/", "-", ) # name must not contain / and must be a path which can be accessed via the web! extension = getattr(guess(image), "extension", "svg") path = assets_root / "icons" / dirname / f"{name}.{extension}" path.write_bytes(image) except (OSError, TypeError): get_logger("img").exception("Error saving image for %s" % name) else: return f"./icons/{quote(dirname)}/{quote(name)}.{extension}"
[docs] class GroupsReloaderLDAP(MtimeBasedLazyFileReloader): """ Specialized class that reloads a cache file with the content of group object in LDAP. Reacts on the reason "ldap:group". .. warning:: As of 4.0.7-8 we use univention-group-membership-cache to obtain groups user belongs to; but we cannot change the constructor kwargs because customers may have added entries to /usr/share/univention-portal/portals.json that still uses them. ldap_uri: URI for the LDAP connection, e.g. "ldap://ucs:7369" binddn: The bind dn for the connection, e.g. "cn=ucs,cn=computers,..." password_file: Filename that holds the password for the binddn, e.g. "/etc/machine.secret" ldap_base: Base in which the groups are searched in. E.g., "dc=base,dc=com" or "cn=groups,ou=OU1,dc=base,dc=com" cache_file: Filename this object is responsible for """ def __init__(self, ldap_uri, binddn, password_file, ldap_base, cache_file): super().__init__(cache_file) def _check_reason(self, reason, content=None): if super()._check_reason(reason, content): return True if reason is None: return False if reason.startswith("ldap:group"): return True def _refresh(self): from univention.ldap_cache.frontend import users_groups users = users_groups() with tempfile.NamedTemporaryFile(mode="w", delete=False) as fd: json.dump(users, fd, sort_keys=True, indent=4) return fd