Source code for univention.portal.extensions.authenticator

#!/usr/bin/python3
#
# Univention Portal
#
# SPDX-FileCopyrightText: 2019-2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only

import base64
import binascii
import hashlib
import hmac
import json
from urllib.parse import urljoin

from tornado.httpclient import AsyncHTTPClient, HTTPError, HTTPRequest

from univention.portal import Plugin, config
from univention.portal.log import get_logger
from univention.portal.user import User


class Session:
    """
    Holds the nonce of a session together with its user.

    The ``user`` attribute is ``None`` until something assigns it.
    """

    def __init__(self, nonce):
        # A freshly created session has no user attached yet.
        self.user = None
        self.nonce = nonce

    def is_valid(self):
        """Report whether the session may be used; this implementation always says yes."""
        return True
class Authenticator(metaclass=Plugin):
    """
    Base class for authentication.

    May hold all the sessions, set cookies, etc.

    Subclasses implement the hooks the Portal calls:

    `login_request`: a user GETs to the login action
    `login_user`: credentials are POSTed to this action
    `logout_user`: counterpart hook for logging out
    `get_user`: called while the portal data is gathered; returns the user
    for the request

    Every hook in this base class is a no-op; `get_user` returns a user
    without a name.
    """

    def get_auth_mode(self, request):  # pragma: no cover
        """Return the authentication mode name, which is always ``"ucs"`` here."""
        mode = "ucs"
        return mode

    async def login_request(self, request):  # pragma: no cover
        """Hook for a login request; does nothing in the base class."""
        return None

    async def login_user(self, request):  # pragma: no cover
        """Hook for posted credentials; does nothing in the base class."""
        return None

    async def logout_user(self, request):  # pragma: no cover
        """Hook for a logout; does nothing in the base class."""
        return None

    async def get_user(self, request):  # pragma: no cover
        """Return a `User` with no username, no display name, no groups and no headers."""
        anonymous = User(username=None, display_name=None, groups=[], headers={})
        return anonymous

    def refresh(self, reason=None):  # pragma: no cover
        """Hook for refreshing cached data; does nothing in the base class."""
        return None
class UMCAuthenticator(Authenticator):
    """
    Specialized Authenticator that relies on a UMC that actually holds any session.
    Asks UMC for every request if this session is known.

    auth_mode:
        The preferred mode for auth. The portal hands it over to the frontend.
    umc_session_url:
        Accepted for interface compatibility but not used: the session URL is
        built from the ``umc_base_url`` configuration value instead.
    group_cache:
        As UMC does not return groups, we need a cache object that gets us the
        groups for the username.
    """

    def __init__(self, auth_mode, umc_session_url, group_cache):
        self.auth_mode = auth_mode
        # NOTE(review): the ``umc_session_url`` argument is ignored here; the
        # URL always comes from the configuration. Confirm this is intended.
        umc_base_url = config.fetch("umc_base_url")
        self.umc_session_url = urljoin(umc_base_url, 'get/session-info')
        self.group_cache = group_cache

    def get_auth_mode(self, request):
        """Return the auth mode given at construction time."""
        return self.auth_mode

    def refresh(self, reason=None):
        """Refresh the group cache and return the result of that refresh."""
        return self.group_cache.refresh(reason=reason)

    async def get_user(self, request):
        """
        Build the `User` for this request.

        The request cookies are handed to UMC to find the username; the
        groups are looked up in the group cache (empty list if unknown).
        When no user is found, username and display name are ``None``.
        """
        cookies = {key: morsel.value for key, morsel in request.cookies.items()}
        username, display_name = await self._get_username(cookies)
        groups = self.group_cache.get().get(username, [])
        return User(username, display_name=display_name, groups=groups, headers=dict(request.request.headers))

    async def _get_username(self, cookies):
        """
        Resolve the username belonging to the given cookies.

        :param cookies: mapping of cookie name to cookie value
        :returns: ``(lowercased username, username as answered by UMC)`` or
            ``(None, None)`` when there is no UMC session cookie or UMC does
            not know the session
        """
        logger = get_logger("user")
        headers = {}
        for cookie in cookies:
            if cookie.startswith("UMCSessionId"):
                # UMCSessionId-1234 -> Host: localhost:1234
                host_port = cookie[13:]
                if host_port:
                    headers = {"Host": f"localhost:{host_port}"}
                # The first session cookie wins, with or without a port suffix.
                break
        else:
            logger.debug("no user given")
            return None, None

        # Only the cookie names are logged: the values are live session
        # tokens and must not end up in log files.
        logger.debug("searching user for cookies=%r", sorted(cookies))
        username = await self._ask_umc(cookies, headers)
        if username is None:
            logger.debug("no user found")
            return None, None
        logger.debug("found %s", username)
        return username.lower(), username

    async def _ask_umc(self, cookies, headers):
        """
        Ask UMC which user owns the session described by the cookies.

        :param cookies: mapping of cookie name to cookie value, forwarded to
            UMC as a ``Cookie`` header
        :param headers: extra request headers; a ``Cookie`` entry is added to
            this dict
        :returns: the username from the answer's ``result.username`` field, or
            ``None`` when the request fails or the answer cannot be used
        """
        logger = get_logger("user")
        headers['Cookie'] = '; '.join('='.join(c) for c in cookies.items())
        try:
            req = HTTPRequest(self.umc_session_url, method="GET", headers=headers)
            http_client = AsyncHTTPClient()
            response = await http_client.fetch(req)
            data = json.loads(response.body.decode('UTF-8'))
            username = data["result"]["username"]
        except HTTPError as exc:
            logger.error("request failed: %s", exc)
        except OSError as exc:
            logger.error("connection failed: %s", exc)
        except (ValueError, TypeError):
            # ValueError: body is not valid UTF-8 / JSON.
            # TypeError: valid JSON, but not the expected nested mapping
            # (e.g. ``{"result": null}`` or a top-level list).
            logger.error("malformed answer!")
        except KeyError:
            logger.warning("session unknown!")
        else:
            return username
        return None
class UMCAndSecretAuthenticator(UMCAuthenticator):
    """Authenticate with a private secret and become any user (god mode)"""

    async def get_user(self, request):
        """
        Return the user for this request, allowing impersonation via the portal secret.

        The UMC session is tried first. If it yields no username and the
        request carries an HTTP Basic ``Authorization`` header, the supplied
        password is compared with the content of the file configured as
        ``portal-secret-file``. On a match a `User` named after the supplied
        username (lowercased) is returned, with groups from the group cache.

        In every other case the user obtained from the UMC lookup is returned
        unchanged.

        :raises HTTPError: with code 400 when the ``Authorization`` header is
            present but is not well-formed Basic authentication
        """
        user = await super().get_user(request)
        if user and user.username:
            return user

        authorization = request.request.headers.get('Authorization')
        if not authorization:
            return user

        logger = get_logger("user")
        try:
            if not authorization.lower().startswith('basic '):
                raise ValueError()
            display_name, password = base64.b64decode(authorization.split(' ', 1)[1].encode('ISO8859-1')).decode('UTF-8').split(':', 1)
        except (ValueError, IndexError, binascii.Error):
            # NOTE(review): per this file's imports HTTPError is
            # tornado.httpclient.HTTPError; confirm the request handler turns
            # it into a 400 response.
            raise HTTPError(400)

        username = display_name.lower()
        logger.debug("received basic auth request with username=%r", username)

        try:
            with open(config.fetch("portal-secret-file")) as fd:  # noqa: ASYNC221, ASYNC230
                config_secret = fd.read().strip()
        except (KeyError, AttributeError):
            return user
        except OSError as exc:
            # A missing or unreadable secret file disables secret
            # authentication instead of failing the whole request.
            logger.error("could not read portal secret file: %s", exc)
            return user

        if not config_secret:
            # An empty secret would let an empty password impersonate anyone.
            logger.warning("portal secret is empty, refusing secret authentication")
            return user

        # Hash both values so they have equal length, then compare them in
        # constant time to prevent a time based side channel attack.
        supplied_digest = hashlib.sha512(password.encode('utf-8')).digest()
        expected_digest = hashlib.sha512(config_secret.encode('utf-8')).digest()
        if not hmac.compare_digest(supplied_digest, expected_digest):
            # Never log the secret or the supplied password.
            logger.warning("password mismatch for username=%r", username)
            return user

        groups = self.group_cache.get().get(username, [])
        return User(username, display_name, groups, headers=dict(request.request.headers))