#!/usr/bin/python3
#
# Univention Portal
#
# SPDX-FileCopyrightText: 2020-2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only
#
import os.path
import time
from urllib.parse import urljoin
import requests
import requests.exceptions
from univention.portal import Plugin, config
from univention.portal.log import get_logger
from univention.portal.util import is_current_time_between as is_announcement_visible_now
[docs]
class Portal(metaclass=Plugin):
"""
Base (and maybe only) class for a Portal.
It is the only interface exposed to the portal tools, so you could
replace it entirely. But these methods need to be implemented:
:param get_user: Get the user for the current request
:param login_user: New login for a user
:param login_request: An anonymous user wants to login
:param get_visible_content:
The content that the frontend shall present.
Should be filtered by the "user". Also gets "admin_mode", a
boolean indicating whether the user requested all the content
(and is authorized to do so)
:param get_user_links:
Get the user links in the portal, filtered by "user"
and "admin_mode"
:param get_menu_links:
Get the menu links in the portal, filtered by "user"
and "admin_mode"
:param get_entries:
Get all entries of "content", which in turn was the
return value of ``get_visible_content``
:param get_folders:
Get all folders of "content", which in turn was the
return value of ``get_visible_content``
:param get_categories:
Get all categories of "content", which in turn was the
return value of ``get_visible_content``
:param auth_mode: Mode for auth based on given "request"
:param may_be_edited: Whether a "user" may edit this portal
:param get_meta:
Get some information about the portal itself, given
"content" and "categories". Those were return values of
``get_visible_content`` and ``get_categories``.
:param refresh:
Refresh the portal data if needed ("reason" acts as a hint).
Thereby allows the object to cache its content.
:param score:
If multiple portals are configured, use the one with the
highest score for a given "request".
:ivar scorer: Object that does the actual scoring. Meant to get a ``Scorer`` object
:ivar portal_cache: Object that holds the cache. Meant to get a ``Cache`` object
:ivar authenticator: Object that does the whole auth thing. Meant to the a ``Authenticator`` object
"""
def __init__(self, scorer, portal_cache, authenticator):
self.scorer = scorer
self.portal_cache = portal_cache
self.authenticator = authenticator
[docs]
def get_cache_id(self):
return self.portal_cache.get_id()
[docs]
async def get_user(self, request):
return await self.authenticator.get_user(request)
[docs]
async def login_user(self, request):
return await self.authenticator.login_user(request)
[docs]
async def login_request(self, request):
return await self.authenticator.login_request(request)
[docs]
async def logout_user(self, request):
return await self.authenticator.logout_user(request)
[docs]
def get_visible_content(self, user, admin_mode):
entries = self.portal_cache.get_entries()
folders = self.portal_cache.get_folders()
categories = self.portal_cache.get_categories()
announcements = self.portal_cache.get_announcements()
visible_entry_dns = self._filter_entry_dns(entries.keys(), entries, user, admin_mode)
visible_folder_dns = [
folder_dn
for folder_dn in folders.keys()
if admin_mode or len(
[
entry_dn
for entry_dn in self._get_all_entries_of_folder(folder_dn, folders, entries)
if entry_dn in visible_entry_dns
],
) > 0
]
visible_category_dns = [
category_dn
for category_dn in categories.keys()
if admin_mode or len(
[
entry_dn
for entry_dn in categories[category_dn]["entries"]
if entry_dn in visible_entry_dns or entry_dn in visible_folder_dns
],
) > 0
]
visible_announcement_dns = [
announcement_dn
for announcement_dn, announcement in announcements.items()
if self._announcement_visible(user, announcement)
]
return {
"entry_dns": visible_entry_dns,
"folder_dns": visible_folder_dns,
"category_dns": visible_category_dns,
"announcement_dns": visible_announcement_dns,
}
[docs]
def get_user_links(self, content):
links = self.portal_cache.get_user_links()
return [
dn for dn in links if dn in content["entry_dns"] or dn in content["folder_dns"]
]
[docs]
def get_entries(self, content):
entries = self.portal_cache.get_entries()
return [entries[entry_dn] for entry_dn in content["entry_dns"]]
[docs]
def get_folders(self, content):
folders = self.portal_cache.get_folders()
folders = [folders[folder_dn] for folder_dn in content["folder_dns"]]
for folder in folders:
folder["entries"] = [
entry_dn
for entry_dn in folder["entries"]
if entry_dn in content["entry_dns"] or entry_dn in content["folder_dns"]
]
return folders
[docs]
def get_categories(self, content):
categories = self.portal_cache.get_categories()
categories = [categories[category_dn] for category_dn in content["category_dns"]]
for category in categories:
category["entries"] = [
entry_dn
for entry_dn in category["entries"]
if entry_dn in content["entry_dns"] or entry_dn in content["folder_dns"]
]
return categories
[docs]
def auth_mode(self, request):
return self.authenticator.get_auth_mode(request)
[docs]
def may_be_edited(self, user):
return config.fetch('editable') and user.is_admin()
def _announcement_visible(self, user, announcement: dict) -> bool:
return self._announcement_matches_time(announcement) and self._announcement_matches_group(user, announcement)
def _announcement_matches_time(self, announcement: dict):
return is_announcement_visible_now(
announcement.get('visibleFrom'),
announcement.get('visibleUntil'),
)
def _announcement_matches_group(self, user, announcement: dict):
visible = False
allowed_groups = announcement.get("allowedGroups")
if not allowed_groups or allowed_groups == []:
visible = True
else:
for group in allowed_groups:
if user.is_member_of(group):
visible = True
return visible
[docs]
def get_announcements(self, content):
announcements = self.portal_cache.get_announcements()
return [announcements[announcement_dn] for announcement_dn in content["announcement_dns"]]
def _filter_entry_dns(self, entry_dns, entries, user, admin_mode):
filtered_dns = []
for entry_dn in entry_dns:
entry = entries.get(entry_dn)
if entry is None:
continue
if not admin_mode:
if not entry["in_portal"]:
continue
if not entry["activated"]:
continue
if entry["anonymous"] and not user.is_anonymous():
continue
if entry["allowedGroups"]:
for group in entry["allowedGroups"]:
if user.is_member_of(group):
break
else:
continue
filtered_dns.append(entry_dn)
return filtered_dns
def _get_all_entries_of_folder(self, folder_dn, folders, entries):
def _flatten(folder_dn, folders, entries, ret, already_unpacked_folder_dns):
for entry_dn in folders[folder_dn]["entries"]:
if entry_dn in entries:
if entry_dn not in ret:
ret.append(entry_dn)
elif entry_dn in folders and entry_dn not in already_unpacked_folder_dns:
already_unpacked_folder_dns.append(entry_dn)
_flatten(entry_dn, folders, entries, ret, already_unpacked_folder_dns)
ret = []
_flatten(folder_dn, folders, entries, ret, [])
return ret
[docs]
def refresh(self, reason=None):
touched = self.portal_cache.refresh(reason=reason)
touched = self.authenticator.refresh(reason=reason) or touched
return touched
def _get_umc_portal(self):
return UMCPortal(self.scorer, self.authenticator)
[docs]
def score(self, request):
return self.scorer.score(request)
[docs]
class UMCPortal(Portal):
def __init__(self, scorer, authenticator):
self.scorer = scorer
self.authenticator = authenticator
[docs]
def auth_mode(self, request):
return "ucs"
[docs]
def may_be_edited(self, user):
return False
def _request_umc_get(self, get_path, headers):
umc_base_url = config.fetch("umc_base_url")
uri = urljoin(urljoin(umc_base_url, 'get/'), get_path)
body = {"options": {}}
try:
response = requests.post(uri, json=body, headers=headers)
except requests.exceptions.RequestException as exc:
get_logger("umc").warning("Exception while getting %s: %s", get_path, exc)
return []
else:
if response.status_code != 200:
get_logger("umc").debug("Status %r while getting %s", response.status_code, get_path)
return []
return response.json()[get_path]
[docs]
def get_visible_content(self, user, admin_mode):
headers = user.headers
categories = self._request_umc_get("categories", headers)
modules = self._request_umc_get("modules", headers)
return {
"umc_categories": categories,
"umc_modules": modules,
}
[docs]
def get_user_links(self, content):
return []
[docs]
def get_entries(self, content):
entries = []
colors = {cat["id"]: cat["color"] for cat in content["umc_categories"] if cat["id"] != "_favorites_"}
for module in content["umc_modules"]:
if "apps" in module["categories"]:
continue
entries.append(self._module_to_entry(module, colors))
return entries
def _module_to_entry(self, module, colors, locale='en_US'):
icon_url = self._module_icon_url(module)
color = self._module_background_color(module, colors)
entry = {
"dn": self._entry_id(module),
"name": {
locale: module["name"],
},
"description": {
locale: module["description"],
},
"keywords": {
locale: ' '.join(module["keywords"]),
},
"linkTarget": "embedded",
"target": None,
"icon_url": icon_url,
"backgroundColor": color,
"links": [{
"locale": locale,
"value": "/univention/management/?header=try-hide&overview=false&menu=false#module={}:{}".format(module["id"], module.get("flavor", "")),
}],
# TODO: missing: in_portal, anonymous, activated, allowedGroups
}
return entry
def _module_icon_url(self, module):
sub_path = "js/dijit/themes/umc/icons/scalable/{}.svg".format(module["icon"])
filename = os.path.join('/usr/share/univention-management-console-frontend/', sub_path)
if os.path.exists(filename):
return urljoin("/univention/management/", sub_path)
def _module_background_color(self, module, colors):
color = None
for cat in module["categories"]:
if cat in colors:
color = colors[cat]
break
return color
def _entry_id(self, module):
return "umc:module:{}:{}".format(module["id"], module.get("flavor", ""))
[docs]
def get_folders(self, content):
folders = []
for category in content["umc_categories"]:
if category["id"] == "apps":
continue
if category["id"] == "_favorites_":
continue
entries = [[-module["priority"], module["name"], self._entry_id(module)] for module in content["umc_modules"] if category["id"] in module["categories"]]
entries = sorted(entries)
folders.append({
"name": {
"en_US": category["name"],
"de_DE": category["name"],
},
"dn": category["id"],
"entries": [entry[2] for entry in entries],
})
return folders
[docs]
def get_categories(self, content):
ret = []
categories = content["umc_categories"]
categories = sorted(categories, key=lambda entry: entry["priority"], reverse=True)
modules = content["umc_modules"]
modules = sorted(modules, key=lambda entry: entry["priority"], reverse=True)
fav_cat = [cat for cat in categories if cat["id"] == "_favorites_"]
if fav_cat:
fav_cat = fav_cat[0]
ret.append({
"display_name": {
"en_US": fav_cat["name"],
},
"dn": "umc:category:favorites",
"entries": [self._entry_id(mod) for mod in modules if "_favorites_" in mod.get("categories", [])],
})
else:
ret.append({
"display_name": {
"en_US": "Favorites",
},
"dn": "umc:category:favorites",
"entries": [],
})
ret.append({
"display_name": {
"en_US": "Univention Management Console",
},
"dn": "umc:category:umc",
"entries": [cat["id"] for cat in categories if cat["id"] not in ["_favorites_", "apps"]],
})
return ret
[docs]
def get_announcements(self, content):
return []
[docs]
def refresh(self, reason=None):
pass
[docs]
def get_cache_id(self):
return str(time.time())