Source code for univention.portal.extensions.reloader

#!/usr/bin/python3
#
# Univention Portal
#
# Copyright 2020-2022 Univention GmbH
#
# https://www.univention.de/
#
# All rights reserved.
#
# The source code of this program is made available
# under the terms of the GNU Affero General Public License version 3
# (GNU AGPL V3) as published by the Free Software Foundation.
#
# Binary versions of this program provided by Univention to you as
# well as other copyrighted, protected or trademarked materials like
# Logos, graphics, fonts, specific documentations and configurations,
# cryptographic keys etc. are subject to a license agreement between
# you and Univention and not subject to the GNU AGPL V3.
#
# In the case you use this program under the terms of the GNU AGPL V3,
# the program is provided in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public
# License with the Debian GNU/Linux or Univention distribution in file
# /usr/share/common-licenses/AGPL-3; if not, see
# <https://www.gnu.org/licenses/>.
#

import importlib
import json
import os.path
import shutil
import tempfile
from imghdr import what

import ldap
from ldap.dn import str2dn
from six import BytesIO, with_metaclass
from six.moves.urllib.parse import quote
from univention.portal import Plugin
from univention.portal.log import get_logger


[docs]class Reloader(with_metaclass(Plugin)): """ Our base class for reloading The idea is that this class handles the reloading for caches. `refresh`: In fact the only method. Gets a "reason" so that it can decide that a refresh is not necessary. If it was necessary, it should return True A reason "force" should be treated as very important. If the reloader refreshed the content, the overlying cache will reload itself. """
[docs] def refresh(self, reason=None): # pragma: no cover pass
[docs]class MtimeBasedLazyFileReloader(Reloader): """ Specialized class that reloads if a certain (cache) file was updated. So if a seconds process updated the file and this class is asked to reload, it just returns True. If the reason fits, it actually refreshes the content and writes it into the file. cache_file: Filename this object is responsible for """ def __init__(self, cache_file): self._cache_file = cache_file self._mtime = self._get_mtime() def _get_mtime(self): try: return os.stat(self._cache_file).st_mtime except (EnvironmentError, AttributeError) as exc: get_logger("cache").warning("Unable to get mtime for {}".format(exc)) return 0 def _file_was_updated(self): mtime = self._get_mtime() if mtime > self._mtime: self._mtime = mtime return True def _check_reason(self, reason, content=None): if reason is None: return False if reason == "force": return True
[docs] def refresh(self, reason=None, content=None): if self._check_reason(reason, content=content): get_logger("cache").info("refreshing cache") fd = None try: fd = self._refresh() except Exception: get_logger("cache").exception("Error during refresh") # hopefully, we can still work with an older cache? else: if fd: try: os.makedirs(os.path.dirname(self._cache_file)) except EnvironmentError: pass shutil.move(fd.name, self._cache_file) self._mtime = self._get_mtime() return True return self._file_was_updated()
def _refresh(self): # pragma: no cover pass
[docs]class PortalReloaderUDM(MtimeBasedLazyFileReloader): """ Specialized class that reloads a cache file with the content of a certain portal object using UDM. Reacts on reasons like "ldap:portal:<correct_dn>". portal_dn: DN of the portals/portal object cache_file: Filename this object is responsible for """ def __init__(self, portal_dn, cache_file): super(PortalReloaderUDM, self).__init__(cache_file) self._portal_dn = portal_dn def _check_reason(self, reason, content=None): if super(PortalReloaderUDM, self)._check_reason(reason, content): return True if reason is None: return False reason_args = reason.split(":", 2) if len(reason_args) < 2: return False if reason_args[0] != "ldap": return False return reason_args[1] in ["portal", "category", "entry", "folder"] def _refresh(self): udm_lib = importlib.import_module("univention.udm") try: udm = udm_lib.UDM.machine().version(2) portal = udm.get("portals/portal").get(self._portal_dn) except udm_lib.ConnectionError: get_logger("cache").warning("Could not establish UDM connection. Is the LDAP server accessible?") return None except udm_lib.UnknownModuleType: get_logger("cache").warning("UDM not up to date? Portal module not found.") return None except udm_lib.NoObject: get_logger("cache").warning("Portal %s not found", self._portal_dn) return None content = {} content["portal"] = self._extract_portal(portal) content["categories"] = categories = self._extract_categories(udm, portal) content["folders"] = folders = self._extract_folders(udm, portal, list(categories.values())) content["entries"] = self._extract_entries(udm, portal, list(categories.values()), list(folders.values())) content["user_links"] = self._extract_user_links(portal) content["menu_links"] = self._extract_menu_links(portal) with tempfile.NamedTemporaryFile(mode="w", delete=False) as fd: json.dump(content, fd, sort_keys=True, indent=4) return fd def _extract_portal(self, portal): ret = {} ret["dn"] = portal.dn ret["showUmc"] = portal.props.showUmc if portal.props.logo: ret["logo"] = self._write_image(portal.props.name, portal.props.logo.raw, "logos") else: ret["logo"] = None if portal.props.background: ret["background"] = self._write_image( portal.props.name, portal.props.background.raw, "backgrounds" ) else: ret["background"] = None ret["name"] = portal.props.displayName ret["defaultLinkTarget"] = portal.props.defaultLinkTarget ret["ensureLogin"] = portal.props.ensureLogin ret["categories"] = portal.props.categories return ret def _extract_user_links(self, portal): return portal.props.userLinks def _extract_menu_links(self, portal): return portal.props.menuLinks def _extract_categories(self, udm, portal): ret = {} for category in udm.get("portals/category").search(): in_portal = category.dn in portal.props.categories ret[category.dn] = { "dn": category.dn, "in_portal": in_portal, "display_name": category.props.displayName, "entries": category.props.entries, } return ret def _extract_entries(self, udm, portal, categories, folders): ret = {} def add(entry, ret, in_portal): if entry.dn not in ret: ret[entry.dn] = { "dn": entry.dn, "in_portal": in_portal, "name": entry.props.displayName, "description": entry.props.description, "logo_name": self._save_image(portal, entry), "activated": entry.props.activated, "anonymous": entry.props.anonymous, "allowedGroups": entry.props.allowedGroups, "links": entry.props.link, "linkTarget": entry.props.linkTarget, "backgroundColor": entry.props.backgroundColor, } for obj in udm.get("portals/entry").search(): if obj.dn in portal.props.menuLinks: add(obj, ret, True) continue if obj.dn in portal.props.userLinks: add(obj, ret, True) continue if any(obj.dn in category["entries"] for category in categories if category["in_portal"]): add(obj, ret, True) continue if any(obj.dn in folder["entries"] for folder in folders if folder["in_portal"]): add(obj, ret, True) continue add(obj, ret, False) return ret def _extract_folders(self, udm, portal, categories): ret = {} def add(folder, ret, in_portal): ret[folder.dn] = { "dn": folder.dn, "in_portal": in_portal, "name": folder.props.displayName, "entries": folder.props.entries, } for obj in udm.get("portals/folder").search(): if obj.dn in portal.props.menuLinks: add(obj, ret, True) continue if obj.dn in portal.props.userLinks: add(obj, ret, True) continue if any(obj.dn in category["entries"] for category in categories if category["in_portal"]): add(obj, ret, True) continue add(obj, ret, False) return ret def _write_image(self, name, img, dirname): try: name = name.replace( "/", "-" ) # name must not contain / and must be a path which can be accessed via the web! string_buffer = BytesIO(img) suffix = what(string_buffer) or "svg" fname = "/usr/share/univention-portal/icons/%s/%s.%s" % (dirname, name, suffix) with open(fname, "wb") as fd: fd.write(img) except (EnvironmentError, TypeError, IOError): get_logger("img").exception("Error saving image for %s" % name) else: return "./icons/%s/%s.%s" % ( quote(dirname), quote(name), quote(suffix), ) def _save_image(self, portal, entry): img = entry.props.icon if img: return self._write_image(entry.props.name, img.raw, "entries")
[docs]class GroupsReloaderLDAP(MtimeBasedLazyFileReloader): """ Specialized class that reloads a cache file with the content of group object in LDAP. Reacts on the reason "ldap:group" ldap_uri: URI for the LDAP connection, e.g. "ldap://ucs:7369" binddn: The bind dn for the connection, e.g. "cn=ucs,cn=computers,..." password_file: Filename that holds the password for the binddn, e.g. "/etc/machine.secret" ldap_base: Base in which the groups are searched in. E.g., "dc=base,dc=com" or "cn=groups,ou=OU1,dc=base,dc=com" cache_file: Filename this object is responsible for """ def __init__(self, ldap_uri, binddn, password_file, ldap_base, cache_file): super(GroupsReloaderLDAP, self).__init__(cache_file) self._ldap_uri = ldap_uri self._bind_dn = binddn self._password_file = password_file self._ldap_base = ldap_base def _check_reason(self, reason, content=None): if super(GroupsReloaderLDAP, self)._check_reason(reason, content): return True if reason is None: return False if reason.startswith("ldap:group"): return True def _refresh(self): try: with open(self._password_file) as fd: password = fd.read().rstrip("\n") except EnvironmentError: get_logger("cache").warning("Unable to read {}".format(self._password_file)) return None con = ldap.initialize(self._ldap_uri) con.simple_bind_s(self._bind_dn, password) ldap_content = {} users = {} groups = con.search_s(self._ldap_base, ldap.SCOPE_SUBTREE, u"(objectClass=posixGroup)") for dn, attrs in groups: usernames = [] groups = [] member_uids = [member.decode("utf-8").lower() for member in attrs.get("memberUid", [])] unique_members = [ member.decode("utf-8").lower() for member in attrs.get("uniqueMember", []) ] for member in member_uids: if not member.endswith("$"): usernames.append(member.lower()) for member in unique_members: if member.startswith("cn="): member_uid = str2dn(member)[0][0][1] if "{}$".format(member_uid) not in member_uids: groups.append(member) ldap_content[dn.lower()] = {"usernames": usernames, "groups": groups} groups_with_nested_groups = {} for group_dn in ldap_content: self._nested_groups(group_dn, ldap_content, groups_with_nested_groups) for group_dn, attrs in ldap_content.items(): for user in attrs["usernames"]: groups = users.setdefault(user, set()) groups.update(groups_with_nested_groups[group_dn]) users = dict((user, list(groups)) for user, groups in users.items()) with tempfile.NamedTemporaryFile(mode="w", delete=False) as fd: json.dump(users, fd, sort_keys=True, indent=4) return fd def _nested_groups(self, dn, ldap_content, nested_groups_cache): if dn in nested_groups_cache: return nested_groups_cache[dn] ret = set([dn]) for group_dn in ldap_content.get(dn, {}).get("groups", []): ret.update(self._nested_groups(group_dn, ldap_content, nested_groups_cache)) nested_groups_cache[dn] = ret return ret