Source code for univention.management.console.oidc

#!/usr/bin/python3
#
# Univention Management Console
#  OpenID Connect implementation for the UMC
#
# SPDX-FileCopyrightText: 2022-2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only

import asyncio
import base64
import binascii
import hashlib
import json
import os
import time
import uuid
from time import monotonic
from urllib.parse import urlencode, urlparse, urlunsplit

import jwt
from jwt.algorithms import RSAAlgorithm
from sqlalchemy import exc
from tornado import escape
from tornado.auth import OAuth2Mixin
from tornado.httpclient import HTTPClientError, HTTPRequest

from univention.management.console.config import ucr
from univention.management.console.error import (
    BadRequest, Forbidden, NotFound, OpenIDProvideUnavailable, UMC_Error, Unauthorized,
)
from univention.management.console.ldap import get_machine_connection
from univention.management.console.log import CORE
from univention.management.console.resource import Resource
from univention.management.console.session import Session
from univention.management.console.session_db import DBDisabledException, get_session
from univention.management.console.shared_memory import shared_memory


def create_federated_account(uuid):
    """Ensure a ``users/federated_account`` object exists for *uuid*.

    The object is looked up below ``cn=federated_accounts,cn=univention,<ldap/base>``
    via the writable machine connection and is created when the lookup
    yields nothing.

    :param uuid: value stored in (and used to address the object by)
        ``univentionObjectIdentifier``.
    """
    import ldap

    from univention.admin import modules, objects
    from univention.admin.uexceptions import noObject

    lo, position = get_machine_connection(write=True)
    base = ucr['ldap/base']
    module = modules.get('users/federated_account')
    modules.init(lo, position, module)
    container = f'cn=federated_accounts,cn=univention,{base}'

    # the identifier ends up inside a DN, so it has to be escaped
    dn = f'univentionObjectIdentifier={ldap.dn.escape_dn_chars(uuid)},{container}'
    try:
        existing = objects.get(module, None, lo, position, dn=dn, authz=False)
    except noObject:
        existing = None
    if existing:
        return

    position.setDn(container)
    account = module.object(None, lo, position)
    account.open()
    account['univentionObjectIdentifier'] = uuid
    account.create()
    CORE.process('Created federated account', dn=account.dn)
class OIDCUser:
    """OIDC tokens of the authenticated user."""

    __slots__ = ('access_token', 'claims', 'federated_account', 'id_token', 'refresh_token', 'roles', 'session_refresh_future', 'username', 'uuid')

    def __init__(self, id_token, access_token, refresh_token, claims):
        """Store the tokens and derive the user attributes from *claims*.

        :param id_token: encoded ID token (may be ``None``).
        :param access_token: encoded access token.
        :param refresh_token: encoded refresh token (may be ``None``).
        :param claims: decoded claims; must contain ``uid``.
        :raises KeyError: if *claims* has no ``uid`` entry.
        """
        self.id_token = id_token
        self.access_token = access_token
        self.refresh_token = refresh_token
        self.claims = claims
        self.roles = None
        self.uuid = None
        self.session_refresh_future = None
        self.username = claims['uid']
        # any truthy claim value marks the account as federated
        self.federated_account = bool(claims.get('nubus_federated_account', False))

    @staticmethod
    def _to_monotonic(expiry):
        # translate a wall-clock "exp" timestamp into the monotonic clock domain
        return int(monotonic() + (expiry - time.time()))

    @property
    def session_end_time(self):
        """Monotonic time at which the session ends.

        Taken from the refresh token's ``exp`` when a refresh token is set,
        otherwise from the ``exp`` claim.
        """
        # We can't verify the refresh token here, maybe we should get the session expiry from the token response instead
        if self.refresh_token:
            expiry = jwt.decode(self.refresh_token, options={"verify_signature": False})['exp']
        else:
            expiry = self.claims['exp']
        return self._to_monotonic(expiry)

    @property
    def token_end_time(self):
        """Monotonic time at which the ``exp`` claim is reached."""
        return self._to_monotonic(self.claims['exp'])
class OIDCResource(OAuth2Mixin, Resource):
    """Base class for all OIDC resources."""

    requires_authentication = False

    async def prepare(self):
        """Select the OpenID Provider settings for this request.

        The issuer is taken from the stored PKCE state (looked up via the
        ``state`` query argument), then from the ``iss`` query argument and
        finally from the application's ``default_authorization_server``.
        """
        await super().prepare()
        state = shared_memory.pkce.get(self.get_query_argument('state', ''), {})
        default_issuer = self.get_query_argument('iss', self.application.settings['default_authorization_server'])
        self.set_settings(state.get('iss', default_issuer))

    def get_openid_provider(self, issuer):
        """Return the configured provider whose ``issuer`` equals *issuer*.

        :raises KeyError: if no configured provider has that issuer.
        """
        settings = self.application.settings
        providers = settings['oidc']
        preferred = settings['umc_oidc_rp_server']
        # We may have multiple configs in oidc.json, use the one that refers to the value of umc/oidc/rp/server
        if preferred and preferred in providers and providers[preferred]['issuer'] == issuer:
            return providers[preferred]
        for openid_provider in providers.values():
            if openid_provider['issuer'] == issuer:
                return openid_provider
        raise KeyError(issuer)

    def set_settings(self, issuer):
        """Copy the provider configuration for *issuer* onto this handler.

        :raises NotFound: if *issuer* is not a configured provider.
        """
        try:
            settings = self.get_openid_provider(issuer)
        except KeyError:
            raise NotFound(self._('The OpenID Provider is not available. This might be a misconfiguration.'))
        op = settings["op"]
        self.client_id = settings['client_id']
        self.issuer = settings['issuer']
        self.JWKS = settings["jwks"]
        self.client_secret = settings['client_secret']
        self._OAUTH_AUTHORIZE_URL = op["authorization_endpoint"]
        self._OAUTH_ACCESS_TOKEN_URL = op["token_endpoint"]
        self._OAUTH_END_SESSION_URL = op["end_session_endpoint"]
        self._OAUTH_USERINFO_URL = op["userinfo_endpoint"]
        self._OAUTH_CERT_URL = op["jwks_uri"]
        self.id_token_signing_alg_values_supported = op["id_token_signing_alg_values_supported"]
        self.extra_parameters = [x.strip() for x in settings.get('extra_parameters', '').split(',') if x.strip()]

    async def bearer_authorization(self, bearer_token):
        """Authenticate the request with an OAuth 2.0 bearer access token.

        Does nothing when the current session is already authenticated with
        exactly this token.

        :raises Unauthorized: if the token does not verify; a
            ``WWW-Authenticate`` header is added before re-raising.
        """
        if self.current_user and self.current_user.user.authenticated and self.current_user.oidc.access_token == bearer_token:
            return
        try:
            claims = self.verify_access_token(bearer_token)
        except Unauthorized as exc:
            self.add_header('WWW-Authenticate', 'Bearer realm="Univention Management Console" scope="openid" error="invalid_token" error_description="%s"' % (exc,))
            raise
        # FIXME: Access Denied from Keycloak: https://github.com/keycloak/keycloak/issues/16844 because we don't have a "openid" scope
        # await self.get_user_information(bearer_token)
        oidc = OIDCUser(None, bearer_token, None, claims)
        await self.pam_oidc_authentication(oidc)

    async def authenticate(self, code, code_verifier, nonce):
        """Exchange the authorization *code* for tokens and log the user in.

        :param code: authorization code received from the OP.
        :param code_verifier: PKCE verifier stored for this login attempt.
        :param nonce: nonce expected in the ID token (skipped when falsy).
        :raises OpenIDProvideUnavailable: if the token request fails or the
            response lacks one of the tokens.
        :raises Unauthorized: if the ID token does not verify.
        """
        CORE.debug('OIDC authenticate')
        try:
            response = await self.get_access_token(
                redirect_uri=self.reverse_abs_url('oidc-login'),
                code=code,
                code_verifier=code_verifier,
            )
        except HTTPClientError as exc:
            body = exc.response.body if exc.response else None
            CORE.error('Could not get access token: %s', body or exc)
            raise OpenIDProvideUnavailable(self._('Could not receive token from authorization server.'))
        try:
            id_token = response['id_token']
            access_token = response['access_token']
            refresh_token = response['refresh_token']
        except KeyError:
            raise OpenIDProvideUnavailable(self._("Authorization server response did not contain token."))
        CORE.log(1, 'Access token: %s', access_token)
        CORE.log(1, 'ID token: %s', id_token)
        CORE.log(1, 'Refresh token: %s', refresh_token)
        claims = self.verify_id_token(id_token, nonce)
        oidc = OIDCUser(id_token, access_token, refresh_token, claims)
        if oidc.federated_account:
            await self.handle_federated_account(oidc)
        await self.pam_oidc_authentication(oidc)

    async def handle_federated_account(self, oidc):
        """Fill *oidc* from the userinfo endpoint and ensure the federated account object exists.

        Sets ``oidc.uuid``, ``oidc.roles`` and ``oidc.username`` from the
        ``nubus_id``, ``nubus_roles`` and ``nubus_external_username`` entries
        of the userinfo response.

        :raises Forbidden: if the userinfo response has no ``nubus_id``.
        """
        from .server import pool
        info = await self.get_user_information(oidc.access_token)
        oidc.uuid = info.get('nubus_id')
        if not oidc.uuid:
            CORE.error('TOKEN CLAIM ERROR: Claim nubus_id is missing for this account!')
            raise Forbidden('Login is not possible with this account! Please contact your administrator.')
        oidc.roles = info.get('nubus_roles', [])
        oidc.username = info.get('nubus_external_username') or oidc.uuid
        # TODO: Move to keycloak ad-hoc federation
        # the account creation is submitted to the pool and awaited here
        future = pool.submit(create_federated_account, oidc.uuid)
        await asyncio.wrap_future(future)
        CORE.process('OIDC login federated user %r with roles %r', oidc.uuid, oidc.roles)

    async def pam_oidc_authentication(self, oidc):
        """Authenticate the session for *oidc* and bind the tokens to it.

        Non-federated users are authenticated through
        ``current_user.authenticate`` with ``auth_type`` ``OIDC``; federated
        users only get their credentials set on the session.

        :raises UMC_Error: if the authentication of a non-federated user fails.
        """
        # important: must be called before the auth, to preserve session id in case of re-auth and that a user cannot choose his own session ID by providing a cookie
        sessionid = self.create_sessionid()
        if not oidc.federated_account:
            # TODO: drop in the future to gain performance
            result = await self.current_user.authenticate({
                'locale': self.locale.code,
                'username': oidc.username,
                'password': oidc.access_token,
                'auth_type': 'OIDC',
            })
            if not self.current_user.user.authenticated:
                CORE.error('SECURITY WARNING: PAM OIDC Authentication failed while JWT verification succeeded!')
                raise UMC_Error(result.message, result.status, result.result)
        else:
            self.current_user.set_credentials(oidc.username, oidc.access_token, 'OIDC', object_id=oidc.uuid, roles=oidc.roles, federated_account=oidc.federated_account)

        # as an alternative to PAM we could just set the user as authenticated because jwt.decode() already ensured this.
        # but we keep the behavior for now because this is what happened prior to the UMC-Web-Server and UMC-Sever unification
        # PAM also makes acct_mgmt. This is of course also done by the OP but nevertheless we don't know if this is still required.
        # self.current_user.set_credentials(oidc.username, oidc.access_token, 'OIDC')
        self.current_user.oidc = oidc
        self.set_session(sessionid)

    def verify_id_token(self, token, nonce):
        """Verify an ID token and return its claims.

        :param nonce: expected ``nonce`` claim; not compared when falsy.
        :raises Unauthorized: if the token is invalid or the nonce differs.
        """
        claims = self._verify_jwt(id_token=token)
        if nonce and claims.get('nonce') != nonce:
            raise Unauthorized('The nonce is not matching.')
        return claims

    def verify_access_token(self, token):
        """Verify an access token and return its claims.

        :raises Unauthorized: if the token is invalid.
        """
        return self._verify_jwt(access_token=token)

    def verify_logout_token(self, token):
        """Verify a logout token and return its claims.

        :raises Unauthorized: if the token is invalid, has neither ``sub``
            nor ``sid``, has no ``events`` claim, or carries a ``nonce`` claim.
        """
        claims = self._verify_jwt(logout_token=token)
        # TODO: verify contents of sub or/and sid, events
        # .get() for both: a token lacking both claims must yield Unauthorized, not KeyError
        if not (claims.get('sub') or claims.get('sid')):
            raise Unauthorized(self._('The logout token is missing a sub or sid claim'))
        if not claims.get('events'):
            raise Unauthorized(self._('The logout token is missing a events claim'))
        # the claim is spelled "nonce"; the message text is left untouched so it still matches its translation id
        if 'nonce' in claims:
            raise Unauthorized(self._('The logout token must not have a nounce claim'))
        return claims

    def _verify_jwt(self, id_token=None, access_token=None, logout_token=None):
        """Decode and verify exactly one of the given tokens.

        The audience is only checked for ID and logout tokens; ``nbf`` is not
        checked for logout tokens.

        :returns: the decoded claims.
        :raises TypeError: unless exactly one token is given.
        :raises Unauthorized: if the token cannot be decoded or verified.
        """
        if len([tok for tok in (id_token, access_token, logout_token) if tok]) != 1:
            raise TypeError('exactly one of id_token, access_token, logout_token is required')

        options = {
            'verify_signature': True,
            'verify_exp': True,
            'verify_nbf': True,
            'verify_iat': True,
            'verify_aud': True,
            'verify_iss': True,
        }
        if id_token:
            token = id_token
            audience = self.client_id
        elif access_token:
            token = access_token
            audience = None
            options['verify_aud'] = False
            # TODO: verify azp
        else:
            token = logout_token
            audience = self.client_id
            options['verify_nbf'] = False

        try:
            claims = jwt.decode(
                token, self._get_public_key(token),
                algorithms=self.id_token_signing_alg_values_supported,
                options=options,
                issuer=self.issuer,
                audience=audience,
                leeway=ucr.get_int('umc/oidc/grace-time', 3),  # seconds
            )
        except jwt.ExpiredSignatureError:
            CORE.warning("Signature expired")
            raise Unauthorized(self._("The Token signature is expired."))
        except jwt.InvalidSignatureError as exc:
            CORE.error("Invalid signature: %s", exc)
            raise Unauthorized(self._('The Token contains an invalid signature: %s') % (exc,))
        except jwt.InvalidIssuerError as exc:
            CORE.warning("Invalid issuer: %s", exc)
            raise Unauthorized(self._('The Token contains an invalid issuer: %s') % (exc,))
        except jwt.InvalidAudienceError as exc:
            CORE.warning("Invalid audience: %s", exc)
            raise Unauthorized(self._('The Token contains an invalid audience: %s') % (exc,))
        except jwt.MissingRequiredClaimError as exc:
            CORE.warning("Missing claim: %s", exc)
            raise Unauthorized(self._('The Token is missing a required claim: %s') % (exc,))
        except jwt.ImmatureSignatureError as exc:
            CORE.warning("Immature signature: %s", exc)
            raise Unauthorized(self._('The Token contains an immature signature: %s') % (exc,))
        except jwt.exceptions.PyJWTError as exc:
            # must stay last: every remaining PyJWT failure (e.g. a token that cannot be decoded) is also an authentication failure
            CORE.warning("Invalid token: %s", exc)
            raise Unauthorized(self._('The Token is invalid: %s') % (exc,))
        CORE.debug('OIDC JWK-Payload: \n%r', claims)
        return claims

    def _get_public_key(self, token):
        """Return the key from the configured JWKS whose ``kid`` matches the token header.

        :raises Unauthorized: if the header has no ``kid`` or no key matches.
        """
        kid = jwt.get_unverified_header(token).get('kid')
        if kid is None:
            raise Unauthorized(self._('The Token header does not contain a key ID.'))
        for key in self.JWKS['keys']:
            if key.get('kid') == kid:
                return RSAAlgorithm.from_jwk(json.dumps(key))
        CORE.warning("Unknown signing key: %s", kid)
        raise Unauthorized(self._('The Token is signed with an unknown key: %s') % (kid,))

    async def get_user_information(self, bearer_token):
        """Fetch and return the decoded userinfo response for *bearer_token*.

        :raises OpenIDProvideUnavailable: if the request fails.
        """
        user_info_req = HTTPRequest(
            self._OAUTH_USERINFO_URL,
            method="GET",
            headers={
                "Accept": "application/json",
                "Authorization": "Bearer %s" % (bearer_token,),
            },
        )
        http_client = self.get_auth_http_client()
        try:
            user_info_res = await http_client.fetch(user_info_req)
        except HTTPClientError as exc:
            CORE.warning("Fetching user info failed: %s %s", user_info_req.url, exc)
            raise OpenIDProvideUnavailable(self._("Could not receive user information from OP."))
        user_info = json.loads(user_info_res.body.decode('utf-8'))
        CORE.debug('OIDC User-Info: %r', user_info)
        return user_info

    async def download_jwks(self):
        """Download and return the decoded JWKS document from the OP's ``jwks_uri``.

        :raises OpenIDProvideUnavailable: if the request fails or the status is not 200.
        """
        request = HTTPRequest(self._OAUTH_CERT_URL, method='GET')
        http_client = self.get_auth_http_client()
        try:
            response = await http_client.fetch(request, raise_error=False)
        except HTTPClientError as exc:
            CORE.warning("Fetching certificate failed: %s %s", request.url, exc)
            raise OpenIDProvideUnavailable(self._("Could not receive certificate from OP."))
        if response.code != 200:
            CORE.warning("Fetching certificate failed")
            raise OpenIDProvideUnavailable(self._("Could not receive certificate from OP."))
        return json.loads(response.body.decode('utf-8'))

    async def get_access_token(self, redirect_uri, code, code_verifier):
        """Request tokens with the ``authorization_code`` grant."""
        return await self._get_access_token(redirect_uri, {"code": code, "grant_type": "authorization_code", "code_verifier": code_verifier})

    async def get_new_access_token(self, redirect_uri, refresh_token):
        """Request tokens with the ``refresh_token`` grant."""
        return await self._get_access_token(redirect_uri, {"refresh_token": refresh_token, "grant_type": "refresh_token"})

    async def _get_access_token(self, redirect_uri, data):
        """POST *data* plus the client credentials to the token endpoint.

        :returns: the decoded JSON response.
        :raises HTTPClientError: passed through; callers handle it.
        """
        http_client = self.get_auth_http_client()
        body = urlencode(dict(
            data,
            redirect_uri=redirect_uri,
            client_id=self.client_id,
            client_secret=self.client_secret,
        ))
        # TODO: request specific AUD for ldap server
        response = await http_client.fetch(
            self._OAUTH_ACCESS_TOKEN_URL,
            method="POST",
            headers={"Content-Type": "application/x-www-form-urlencoded"},
            body=body,
        )
        return escape.json_decode(response.body)

    async def refresh_session_tokens(self, user):
        """Refresh the tokens using the refresh token.

        When the OP answers with ``invalid_grant`` the session of *user* is
        logged out.

        :raises OpenIDProvideUnavailable: if no new tokens could be obtained.
        :raises Unauthorized: if the new ID token does not verify.
        """
        CORE.debug('Refreshing OIDC session')
        try:
            response = await self.get_new_access_token(
                redirect_uri=self.reverse_abs_url('oidc-login', ()),
                refresh_token=user.oidc.refresh_token,
            )
        except HTTPClientError as exc:
            if not exc.response or exc.response.body is None:
                CORE.error('OP response was empty or timed out. Could not get new access token: %s', exc)
                raise OpenIDProvideUnavailable(self._('Could not receive token from authorization server.'))
            # the error body is not guaranteed to be JSON (or a JSON object)
            try:
                json_response = escape.json_decode(exc.response.body)
            except ValueError:
                json_response = None
            if isinstance(json_response, dict) and json_response.get('error') == 'invalid_grant':
                if user.session_id in Session.sessions:
                    Session.sessions[user.session_id].logout(reload=False)
            CORE.error('Could not get new access token: %s', exc.response.body if json_response is None else json_response)
            raise OpenIDProvideUnavailable(self._('Could not receive token from authorization server.'))

        try:
            id_token = response['id_token']
            access_token = response['access_token']
            refresh_token = response['refresh_token']
        except KeyError:
            raise OpenIDProvideUnavailable(self._("Authorization server response did not contain token."))

        user.oidc.id_token = id_token
        user.oidc.access_token = access_token
        user.oidc.refresh_token = refresh_token

        # TODO: do we need to re-authenticate?
        claims = self.verify_id_token(id_token, None)
        oidc = OIDCUser(id_token, access_token, refresh_token, claims)
        if oidc.federated_account:
            await self.handle_federated_account(oidc)
        await self.pam_oidc_authentication(oidc)

    def _logout_success(self):
        """Drop the OIDC tokens from the current user and redirect to ``/univention/logout``."""
        user = self.current_user
        if user:
            user.oidc = None
        self.redirect('/univention/logout', status=303)
class OIDCLogin(OIDCResource):
    """User initiated login at the OP using Authentication Code Flow."""

    async def get(self):
        """Start the login at the OP, or finish it when a ``code`` argument is present.

        After a successful code exchange the user is redirected to the
        location stored with the login state, reduced to path, query and
        fragment.
        """
        code = self.get_argument('code', False)
        if not code:
            fallback = self.get_query_argument('location', '/univention/management/')
            await self.do_single_sign_on(
                self.get_query_argument('target_link_uri', fallback),
                self.get_query_argument('login_hint', None),
            )
            return

        state = shared_memory.pkce.pop(self.get_query_argument('state', ''), {})
        await self.authenticate(code, state.get('code_verifier'), state.get('nonce'))

        # protect against javascript:alert('XSS'), mailto:foo and other non relative links!
        parsed = urlparse(state.get('location', ''))
        if parsed.path.startswith('//'):
            parsed = urlparse('')
        target = urlunsplit(('', '', parsed.path, parsed.query, parsed.fragment))
        self.redirect(target or self.reverse_abs_url('index'), status=303)

    async def post(self):
        """Handle POST exactly like GET."""
        return await self.get()

    async def do_single_sign_on(self, location, login_hint):
        """Redirect the user agent to the OP's authorization endpoint.

        A fresh ``state``, ``nonce`` and PKCE verifier are generated and
        stored in ``shared_memory.pkce`` under the state value together with
        the issuer and *location*.

        :param location: where to send the user after the login finished.
        :param login_hint: optional ``login_hint`` forwarded to the OP.
        """
        CORE.debug('OIDC single sign on')
        # TODO: The Client MUST understand the login_hint and iss parameters and SHOULD support the target_link_uri parameter.
        params = {'approval_prompt': 'auto'}
        # forward only the configured extra parameters that carry a value
        for name in self.extra_parameters:
            value = self.get_query_argument(name, None)
            if value:
                params[name] = value

        state = str(uuid.uuid4())
        nonce = uuid.uuid4().hex
        params['state'] = state
        params['nonce'] = nonce
        params['display'] = 'page'
        # params['prompt'] = 'login'  # 'content'
        # params['max_age'] = ''
        params['ui_locales'] = self.locale.code
        if login_hint:
            params['login_hint'] = login_hint

        # PKCE: the challenge is the unpadded urlsafe base64 of the verifier's SHA-256 digest
        code_verifier = base64.urlsafe_b64encode(os.urandom(43)).decode('ASCII').rstrip('=')
        digest = hashlib.sha256(code_verifier.encode()).digest()
        code_challenge = base64.urlsafe_b64encode(digest).decode('ASCII').rstrip('=')
        shared_memory.pkce[state] = {'iss': self.issuer, 'location': location, 'code_verifier': code_verifier, 'nonce': nonce}
        params['code_challenge'] = code_challenge
        params['code_challenge_method'] = 'S256'

        self.authorize_redirect(
            redirect_uri=self.reverse_abs_url('oidc-login'),
            client_id=self.client_id,
            scope=['openid'],
            response_type='code',
            extra_params=params,
        )
class _OIDCLogoutBase(OIDCResource):
    """Shared behaviour of the logout resources."""

    def _logout_success(self):
        """Forget the OIDC tokens of the current user and redirect to ``/univention/logout``."""
        session_user = self.current_user
        if session_user:
            session_user.oidc = None
        self.redirect('/univention/logout', status=303)
class OIDCLogout(_OIDCLogoutBase):
    """User initiated logout at the OP"""

    def _oidc_user(self):
        # the OIDC part of the current session, or None when there is nothing to log out
        session_user = self.current_user
        if session_user is None:
            return None
        return session_user.oidc

    def get(self):
        """User initiated front channel logout at OP.

        Redirects the user agent to the OP's end-session endpoint.

        :raises BadRequest: if the session has no access token.
        """
        CORE.debug('frontchannel logout')
        oidc = self._oidc_user()
        if oidc is None:
            return self._logout_success()

        if not oidc.access_token:
            raise BadRequest(self._("Not logged in"))

        query = urlencode({
            'post_logout_redirect_uri': self.reverse_abs_url('oidc-logout-done'),
            'client_id': self.client_id,
            'id_token_hint': oidc.id_token,
            # 'logout_hint': None,
            # 'ui_locales': None,
        })
        self.redirect(f'{self._OAUTH_END_SESSION_URL}?{query}')

    async def post(self):
        """User initiated back channel logout at OP.

        Sends the ID token to the OP's end-session endpoint; an
        ``HTTPClientError`` from that request propagates to the caller.

        :raises BadRequest: if the session has no ID token.
        """
        CORE.debug('backchannel logout')
        oidc = self._oidc_user()
        if oidc is None:
            return self._logout_success()

        id_token = oidc.id_token
        if not id_token:
            raise BadRequest(self._("Not logged in"))

        payload = urlencode({
            'id_token_hint': id_token,
            'client_id': self.client_id,
            # 'logout_hint': None,
            # 'ui_locales': None,
        })
        # FIXME: failures of this request are currently not handled here
        await self.get_auth_http_client().fetch(
            self._OAUTH_END_SESSION_URL,
            method="POST",
            headers={"Content-Type": "application/x-www-form-urlencoded"},
            body=payload,
        )
        return self._logout_success()
class OIDCLogoutFinished(_OIDCLogoutBase):
    """Resource whose GET completes the logout on this side."""

    def get(self):
        """Drop the OIDC tokens of the session and redirect to the logout page."""
        self._logout_success()
class OIDCFrontchannelLogout(_OIDCLogoutBase):
    """OP initiated frontchannel logout at this RP."""

    def get(self):
        """Forget the OIDC tokens of the current user and expire the session."""
        CORE.debug('frontchannel OP logout')
        self.add_header('Cache-Control', 'no-store')
        # the "iss" and "sid" query arguments are currently not evaluated
        session_user = self.current_user
        if session_user:
            session_user.oidc = None
        self.expire_session()
class OIDCBackchannelLogout(OIDCResource):
    """OP initiated backchannel logout at this RP."""

    def post(self):
        """Terminate every session that matches the claims of the ``logout_token``.

        An invalid token is answered with status 400 and a JSON error
        document. Sessions known to this process are logged out; all others
        are deleted from the session database, where database errors are
        logged and a disabled database is ignored.
        """
        CORE.debug('backchannel OP logout')
        logout_token = self.get_argument('logout_token')
        self.add_header('Cache-Control', 'no-store')
        try:
            claims = self.verify_logout_token(logout_token)
        except Unauthorized as exception:
            self.add_header('Content-Type', 'application/json')
            self.set_status(400)
            self.finish({'error': 'invalid_request', 'error_description': str(exception)})
            return

        for session in Session.sessions.get_oidc_sessions(claims):
            sid = session.session_id
            if sid in Session.sessions.sessions:
                Session.sessions[sid].logout()
                continue
            # not held by this process: remove it from the session database instead
            try:
                with get_session() as db_session:
                    session.delete(db_session, session.session_id, True)
            except exc.DBAPIError as err:
                CORE.error('Deleting the session from the database during OIDC backchannel logout failed: %s', err)
            except exc.TimeoutError as err:
                CORE.error('Deleting the session from the database during OIDC backchannel logout timed out: %s', err)
            except DBDisabledException:
                pass
        self.finish()
class OIDCMetadata(OIDCResource):
    """A client metadata document suitable for dynamic client registration."""

    def get(self):
        """Respond with the client metadata of this relying party.

        The redirect URIs are built for the value of ``umc/oidc/rp/server``
        if set, otherwise for the host's FQDN and every interface address,
        each with ``https`` and ``http``.
        """
        rp_server = ucr.get('umc/oidc/rp/server')
        if rp_server:
            fqdn = ucr['umc/oidc/rp/server']
            addresses = [fqdn]
        else:
            from univention.config_registry.interfaces import Interfaces
            interfaces = Interfaces()
            fqdn = '%(hostname)s.%(domainname)s' % ucr
            addresses = [fqdn]
            for _name, iface in interfaces.all_interfaces:
                if iface and iface.get('address'):
                    addresses.append(iface['address'])

        bases = []
        for addr in addresses:
            for scheme in ('https', 'http'):
                bases.append('%s://%s/univention/oidc/' % (scheme, addr))

        # Optional metadata intentionally left out: jwks_uri, jwks, sector_identifier_uri,
        # the id_token_*/userinfo_*/request_object_* algorithm fields,
        # token_endpoint_auth_signing_alg and default_acr_values.
        result = {
            'redirect_uris': bases,
            'response_types': ['code'],
            'grant_types': ['authorization_code', 'refresh_token'],
            'application_type': 'web',
            'contacts': [ucr.get('umc/oidc/contact-person/mail', '')],
            'client_name': 'Univention Management Console',
            'logo_uri': f'https://{fqdn}/favicon.ico',
            'client_uri': f'https://{fqdn}/univention/management/',
            'policy_uri': f'https://{fqdn}/univention/impress.html',
            'tos_uri': f'https://{fqdn}/univention/tos.html',
            'subject_type': 'pairwise',
            'token_endpoint_auth_method': 'client_secret_basic',
            'default_max_age': 1800,
            'require_auth_time': False,
            'initiate_login_uri': self.reverse_abs_url('oidc-login'),
            'request_uris': [],
            'post_logout_redirect_uris': [base + '*' for base in bases],
            'backchannel_logout_session_required': True,
            'backchannel_logout_uri': self.reverse_abs_url('backchannel-logout'),
            'frontchannel_logout_session_required': True,
            'frontchannel_logout_uri': self.reverse_abs_url('frontchannel-logout'),
        }
        self.content_negotiation(result, wrap=False)
if __name__ == '__main__':
    # Debugging helper: reads a JWT from stdin, prints its header and any
    # decoding/verification errors to stderr and the claims as JSON to stdout.
    import sys

    with open('/usr/share/univention-management-console/oidc/oidc.json') as fd:
        oidc = next(iter(json.load(fd)['oidc'].values()))
    with open(oidc['openid_configuration']) as fd:
        op = json.load(fd)
    with open(oidc['openid_certs']) as fd:
        jwks = json.load(fd)
    public_key = RSAAlgorithm.from_jwk(json.dumps(jwks['keys'][0]))

    token = sys.stdin.read().strip()
    claims_decoded = None
    print(jwt.get_unverified_header(token), file=sys.stderr)
    # first pass: decode without any verification so the claims can always be shown;
    # second pass: full verification, which only reports problems on stderr
    for verify, leeway in ((False, 60 * 60 * 24), (True, 0)):
        try:
            claims = jwt.decode(
                token, public_key,
                algorithms=op['id_token_signing_alg_values_supported'],
                options={
                    'verify_signature': verify,
                    'verify_exp': verify,
                    'verify_nbf': verify,
                    'verify_iat': verify,
                    'verify_aud': verify,
                    'verify_iss': verify,
                },
                issuer=op['issuer'],
                audience=oidc['client_id'],
                leeway=leeway,
            )
            claims_decoded = claims
        except jwt.exceptions.PyJWTError as err:
            # not named "exc": at module scope that would unbind the imported "exc" module afterwards
            print(err, file=sys.stderr)
    print(json.dumps(claims_decoded))