#!/usr/bin/python3
#
# Univention Management Console
# OpenID Connect implementation for the UMC
#
# SPDX-FileCopyrightText: 2022-2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only
import asyncio
import base64
import binascii
import hashlib
import json
import os
import time
import uuid
from time import monotonic
from urllib.parse import urlencode, urlparse, urlunsplit
import jwt
from jwt.algorithms import RSAAlgorithm
from sqlalchemy import exc
from tornado import escape
from tornado.auth import OAuth2Mixin
from tornado.httpclient import HTTPClientError, HTTPRequest
from univention.management.console.config import ucr
from univention.management.console.error import (
BadRequest, Forbidden, NotFound, OpenIDProvideUnavailable, UMC_Error, Unauthorized,
)
from univention.management.console.ldap import get_machine_connection
from univention.management.console.log import CORE
from univention.management.console.resource import Resource
from univention.management.console.session import Session
from univention.management.console.session_db import DBDisabledException, get_session
from univention.management.console.shared_memory import shared_memory
[docs]
def create_federated_account(uuid):
import ldap
from univention.admin import modules, objects
from univention.admin.uexceptions import noObject
lo, position = get_machine_connection(write=True)
ldap_base = ucr['ldap/base']
federated_accounts = modules.get('users/federated_account')
modules.init(lo, position, federated_accounts)
position_string = f'cn=federated_accounts,cn=univention,{ldap_base}'
try:
dn = f'univentionObjectIdentifier={ldap.dn.escape_dn_chars(uuid)},{position_string}'
account = objects.get(federated_accounts, None, lo, position, dn=dn, authz=False)
except noObject:
account = None
if not account:
position.setDn(position_string)
account = federated_accounts.object(None, lo, position)
account.open()
account['univentionObjectIdentifier'] = uuid
account.create()
CORE.process('Created federated account', dn=account.dn)
[docs]
class OIDCUser:
"""OIDC tokens of the authenticated user."""
__slots__ = ('access_token', 'claims', 'federated_account', 'id_token', 'refresh_token', 'roles', 'session_refresh_future', 'username', 'uuid')
def __init__(self, id_token, access_token, refresh_token, claims):
self.id_token = id_token
self.access_token = access_token
self.refresh_token = refresh_token
self.claims = claims
self.roles = None
self.uuid = None
self.federated_account = False
self.username = claims['uid']
self.session_refresh_future = None
if claims.get('nubus_federated_account', False):
self.federated_account = True
@property
def session_end_time(self):
# We can't verify the refresh token here, maybe we should get the session expiry from the token response instead
exp = jwt.decode(self.refresh_token, options={"verify_signature": False})['exp'] if self.refresh_token else self.claims['exp']
return int(monotonic() + (exp - time.time()))
@property
def token_end_time(self):
return int(monotonic() + (self.claims['exp'] - time.time()))
[docs]
class OIDCResource(OAuth2Mixin, Resource):
"""Base class for all OIDC resources."""
requires_authentication = False
[docs]
async def prepare(self):
await super().prepare()
state = shared_memory.pkce.get(self.get_query_argument('state', ''), {})
self.set_settings(state.get('iss', self.get_query_argument('iss', self.application.settings['default_authorization_server'])))
[docs]
def get_openid_provider(self, issuer):
# We may have multiple configs in oidc.json, use the one that refers to the value of umc/oidc/rp/server
if self.application.settings['umc_oidc_rp_server']:
if self.application.settings['umc_oidc_rp_server'] in self.application.settings['oidc']:
if self.application.settings['oidc'][self.application.settings['umc_oidc_rp_server']]['issuer'] == issuer:
return self.application.settings['oidc'][self.application.settings['umc_oidc_rp_server']]
for openid_provider in self.application.settings['oidc'].values():
if openid_provider['issuer'] == issuer:
return openid_provider
raise KeyError(issuer)
[docs]
def set_settings(self, issuer):
try:
settings = self.get_openid_provider(issuer)
except KeyError:
raise NotFound(self._('The OpenID Provider is not available. This might be a misconfiguration.'))
self.client_id = settings['client_id']
self.issuer = settings['issuer']
self.JWKS = settings["jwks"]
self.client_secret = settings['client_secret']
self._OAUTH_AUTHORIZE_URL = settings["op"]["authorization_endpoint"]
self._OAUTH_ACCESS_TOKEN_URL = settings["op"]["token_endpoint"]
self._OAUTH_END_SESSION_URL = settings["op"]["end_session_endpoint"]
self._OAUTH_USERINFO_URL = settings["op"]["userinfo_endpoint"]
self._OAUTH_CERT_URL = settings["op"]["jwks_uri"]
self.id_token_signing_alg_values_supported = settings["op"]["id_token_signing_alg_values_supported"]
self.extra_parameters = [x.strip() for x in settings.get('extra_parameters', '').split(',') if x.strip()]
[docs]
async def bearer_authorization(self, bearer_token):
if self.current_user and self.current_user.user.authenticated and self.current_user.oidc.access_token == bearer_token:
return
try:
claims = self.verify_access_token(bearer_token)
except Unauthorized as exc:
self.add_header('WWW-Authenticate', 'Bearer realm="Univention Management Console" scope="openid" error="invalid_token" error_description="%s"' % (exc,))
raise
# FIXME: Access Denied from Keycloak: https://github.com/keycloak/keycloak/issues/16844 because we don't have a "openid" scope
# await self.get_user_information(bearer_token)
oidc = OIDCUser(None, bearer_token, None, claims)
await self.pam_oidc_authentication(oidc)
[docs]
async def authenticate(self, code, code_verifier, nonce):
CORE.debug('OIDC authenticate')
try:
response = await self.get_access_token(
redirect_uri=self.reverse_abs_url('oidc-login'),
code=code,
code_verifier=code_verifier,
)
except HTTPClientError as exc:
CORE.error('Could not get access token: %s', exc.response and exc.response.body or exc)
raise OpenIDProvideUnavailable(self._('Could not receive token from authorization server.'))
try:
id_token = response['id_token']
access_token = response['access_token']
refresh_token = response['refresh_token']
except KeyError:
raise OpenIDProvideUnavailable(self._("Authorization server response did not contain token."))
CORE.log(1, 'Access token: %s', access_token)
CORE.log(1, 'ID token: %s', id_token)
CORE.log(1, 'Refresh token: %s', refresh_token)
claims = self.verify_id_token(id_token, nonce)
oidc = OIDCUser(id_token, access_token, refresh_token, claims)
if oidc.federated_account:
await self.handle_federated_account(oidc)
await self.pam_oidc_authentication(oidc)
[docs]
async def handle_federated_account(self, oidc):
from .server import pool
info = await self.get_user_information(oidc.access_token)
oidc.uuid = info.get('nubus_id')
if not oidc.uuid:
CORE.error('TOKEN CLAIM ERROR: Claim nubus_id is missing for this account!')
raise Forbidden('Login is not possible with this account! Please contact your adminitrator.')
oidc.roles = info.get('nubus_roles', [])
oidc.username = info.get('nubus_external_username') or oidc.uuid
# TODO: Move to keycloak ad-hoc federation
future = pool.submit(create_federated_account, oidc.uuid)
await asyncio.wrap_future(future)
CORE.process('OIDC login federated user %r with roles %r', oidc.uuid, oidc.roles)
[docs]
async def pam_oidc_authentication(self, oidc):
# important: must be called before the auth, to preserve session id in case of re-auth and that a user cannot choose his own session ID by providing a cookie
sessionid = self.create_sessionid()
if not oidc.federated_account:
# TODO: drop in the future to gain performance
result = await self.current_user.authenticate({
'locale': self.locale.code,
'username': oidc.username,
'password': oidc.access_token,
'auth_type': 'OIDC',
})
if not self.current_user.user.authenticated:
CORE.error('SECURITY WARNING: PAM OIDC Authentication failed while JWT verification succeeded!')
raise UMC_Error(result.message, result.status, result.result)
else:
self.current_user.set_credentials(oidc.username, oidc.access_token, 'OIDC', object_id=oidc.uuid, roles=oidc.roles, federated_account=oidc.federated_account)
# as an alternative to PAM we could just set the user as authenticated because jwt.decode() already ensured this.
# but we keep the behavior for now because this is what happened prior to the UMC-Web-Server and UMC-Sever unification
# PAM also makes acct_mgmt. This is of course also done by the OP but nevertheless we don't know if this is still required.
# self.current_user.set_credentials(oidc.username, oidc.access_token, 'OIDC')
self.current_user.oidc = oidc
self.set_session(sessionid)
[docs]
def verify_id_token(self, token, nonce):
claims = self._verify_jwt(id_token=token)
if nonce and claims.get('nonce') != nonce:
raise Unauthorized('The nonce is not matching.')
return claims
[docs]
def verify_access_token(self, token):
return self._verify_jwt(access_token=token)
[docs]
def verify_logout_token(self, token):
claims = self._verify_jwt(logout_token=token)
# TODO: verify contents of sub or/and sid, events
if not (claims.get('sub') or claims['sid']):
raise Unauthorized(self._('The logout token is missing a sub or sid claim'))
if not claims.get('events'):
raise Unauthorized(self._('The logout token is missing a events claim'))
if claims.get('nounce'):
raise Unauthorized(self._('The logout token must not have a nounce claim'))
return claims
def _verify_jwt(self, id_token=None, access_token=None, logout_token=None):
if len(list(filter(None, [id_token, access_token, logout_token]))) != 1:
raise TypeError()
if id_token:
token = id_token
audience = self.client_id
options = {
'verify_signature': True,
'verify_exp': True,
'verify_nbf': True,
'verify_iat': True,
'verify_aud': True,
'verify_iss': True,
}
elif access_token:
token = access_token
audience = None
options = {
'verify_signature': True,
'verify_exp': True,
'verify_nbf': True,
'verify_iat': True,
'verify_aud': False,
'verify_iss': True,
}
# TODO: verify azp
elif logout_token:
token = logout_token
audience = self.client_id
options = {
'verify_signature': True,
'verify_exp': True,
'verify_nbf': False,
'verify_iat': True,
'verify_aud': True,
'verify_iss': True,
}
try:
claims = jwt.decode(
token, self._get_public_key(token),
algorithms=self.id_token_signing_alg_values_supported,
options=options,
issuer=self.issuer,
audience=audience,
leeway=ucr.get_int('umc/oidc/grace-time', 3), # seconds
)
except jwt.ExpiredSignatureError:
CORE.warning("Signature expired")
raise Unauthorized(self._("The Token signature is expired."))
except jwt.InvalidSignatureError as exc:
CORE.error("Invalid signature: %s", exc)
raise Unauthorized(self._('The Token contains an invalid signature: %s') % (exc,))
except jwt.InvalidIssuerError as exc:
CORE.warning("Invalid issuer: %s", exc)
raise Unauthorized(self._('The Token contains an invalid issuer: %s') % (exc,))
except jwt.InvalidAudienceError as exc:
CORE.warning("Invalid signature: %s", exc)
raise Unauthorized(self._('The Token contains an invalid audience: %s') % (exc,))
except jwt.MissingRequiredClaimError as exc:
CORE.warning("Missing claim: %s", exc)
raise Unauthorized(self._('The Token is missing a required claim: %s') % (exc,))
except jwt.ImmatureSignatureError as exc:
CORE.warning("Immature signature: %s", exc)
raise Unauthorized(self._('The Token contains an immature signature: %s') % (exc,))
CORE.debug('OIDC JWK-Payload: %r', claims)
return claims
def _get_public_key(self, token):
kid = jwt.get_unverified_header(token)['kid']
for key in self.JWKS['keys']:
if key['kid'] == kid:
return RSAAlgorithm.from_jwk(json.dumps(key))
[docs]
async def download_jwks(self):
request = HTTPRequest(self._OAUTH_CERT_URL, method='GET')
http_client = self.get_auth_http_client()
try:
response = await http_client.fetch(request, raise_error=False)
except HTTPClientError as exc:
CORE.warning("Fetching certificate failed: %s %s", request.url, exc)
raise OpenIDProvideUnavailable(self._("Could not receive certificate from OP."))
if response.code != 200:
CORE.warning("Fetching certificate failed")
raise OpenIDProvideUnavailable(self._("Could not receive certificate from OP."))
return json.loads(response.body.decode('utf-8'))
[docs]
async def get_access_token(self, redirect_uri, code, code_verifier):
return await self._get_access_token(redirect_uri, {"code": code, "grant_type": "authorization_code", "code_verifier": code_verifier})
[docs]
async def get_new_access_token(self, redirect_uri, refresh_token):
return await self._get_access_token(redirect_uri, {"refresh_token": refresh_token, "grant_type": "refresh_token"})
async def _get_access_token(self, redirect_uri, data):
http_client = self.get_auth_http_client()
body = urlencode(dict(
data,
redirect_uri=redirect_uri,
client_id=self.client_id,
client_secret=self.client_secret,
)) # TODO: request specific AUD for ldap server
try:
response = await http_client.fetch(
self._OAUTH_ACCESS_TOKEN_URL,
method="POST",
headers={"Content-Type": "application/x-www-form-urlencoded"},
body=body,
)
except HTTPClientError:
raise # handled in get()
return escape.json_decode(response.body)
[docs]
async def refresh_session_tokens(self, user):
"""Refresh the tokens using the refresh token."""
CORE.debug('Refreshing OIDC session')
try:
response = await self.get_new_access_token(
redirect_uri=self.reverse_abs_url('oidc-login', ()),
refresh_token=user.oidc.refresh_token,
)
except HTTPClientError as exc:
if not exc.response or exc.response.body is None:
CORE.error('OP response was empty or timed out. Could not get new access token: %s', exc)
raise OpenIDProvideUnavailable(self._('Could not receive token from authorization server.'))
json_response = escape.json_decode(exc.response.body)
if json_response.get('error') == 'invalid_grant':
if user.session_id in Session.sessions:
Session.sessions[user.session_id].logout(reload=False)
CORE.error('Could not get new access token: %s', json_response)
raise OpenIDProvideUnavailable(self._('Could not receive token from authorization server.'))
try:
id_token = response['id_token']
access_token = response['access_token']
refresh_token = response['refresh_token']
except KeyError:
raise OpenIDProvideUnavailable(self._("Authorization server response did not contain token."))
user.oidc.id_token = id_token
user.oidc.access_token = access_token
user.oidc.refresh_token = refresh_token
# TODO: do we need to re-authenticate?
claims = self.verify_id_token(id_token, None)
oidc = OIDCUser(id_token, access_token, refresh_token, claims)
if oidc.federated_account:
await self.handle_federated_account(oidc)
await self.pam_oidc_authentication(oidc)
def _logout_success(self):
user = self.current_user
if user:
user.oidc = None
self.redirect('/univention/logout', status=303)
[docs]
class OIDCLogin(OIDCResource):
"""User initiated login at the OP using Authentication Code Flow."""
[docs]
async def get(self):
code = self.get_argument('code', False)
if not code:
await self.do_single_sign_on(
self.get_query_argument('target_link_uri', self.get_query_argument('location', '/univention/management/')),
self.get_query_argument('login_hint', None),
)
return
state = shared_memory.pkce.pop(self.get_query_argument('state', ''), {})
await self.authenticate(code, state.get('code_verifier'), state.get('nonce'))
# protect against javascript:alert('XSS'), mailto:foo and other non relative links!
location = urlparse(state.get('location', ''))
if location.path.startswith('//'):
location = urlparse('')
location = urlunsplit(('', '', location.path, location.query, location.fragment))
self.redirect(location or self.reverse_abs_url('index'), status=303)
[docs]
async def post(self):
return await self.get()
[docs]
async def do_single_sign_on(self, location, login_hint):
CORE.debug('OIDC single sign on')
# TODO: The Client MUST understand the login_hint and iss parameters and SHOULD support the target_link_uri parameter.
extra_parameters = {'approval_prompt': 'auto'}
for extra_parameter in self.extra_parameters:
value = self.get_query_argument(extra_parameter, None)
if value:
extra_parameters[extra_parameter] = value
state = str(uuid.uuid4())
nonce = binascii.b2a_hex(uuid.uuid4().bytes).decode('ASCII')
extra_parameters['state'] = state
extra_parameters['nonce'] = nonce
extra_parameters['display'] = 'page'
# extra_parameters['prompt'] = 'login' # 'content'
# extra_parameters['max_age'] = ''
extra_parameters['ui_locales'] = self.locale.code
if login_hint:
extra_parameters['login_hint'] = login_hint
code_verifier = base64.urlsafe_b64encode(os.urandom(43)).decode('ASCII').rstrip('=')
code_challenge = base64.urlsafe_b64encode(hashlib.sha256(code_verifier.encode()).digest()).decode('ASCII').rstrip('=')
shared_memory.pkce[state] = {'iss': self.issuer, 'location': location, 'code_verifier': code_verifier, 'nonce': nonce}
extra_parameters['code_challenge'] = code_challenge
extra_parameters['code_challenge_method'] = 'S256'
self.authorize_redirect(
redirect_uri=self.reverse_abs_url('oidc-login'),
client_id=self.client_id,
scope=['openid'],
response_type='code',
extra_params=extra_parameters,
)
class _OIDCLogoutBase(OIDCResource):
def _logout_success(self):
user = self.current_user
if user:
user.oidc = None
self.redirect('/univention/logout', status=303)
[docs]
class OIDCLogout(_OIDCLogoutBase):
"""User initiated logout at the OP"""
[docs]
def get(self):
"""User initiated front channel logout at OP."""
CORE.debug('frontchannel logout')
user = self.current_user
if user is None or user.oidc is None:
return self._logout_success()
access_token = user.oidc.access_token
if not access_token:
raise BadRequest(self._("Not logged in"))
logout_url = '%s?%s' % (self._OAUTH_END_SESSION_URL, urlencode({
'post_logout_redirect_uri': self.reverse_abs_url('oidc-logout-done'),
'client_id': self.client_id,
'id_token_hint': user.oidc.id_token,
# 'logout_hint': None,
# 'ui_locales': None,
}))
self.redirect(logout_url)
[docs]
async def post(self):
"""User initiated back channel logout at OP."""
CORE.debug('backchannel logout')
user = self.current_user
if user is None or user.oidc is None:
return self._logout_success()
id_token = user.oidc.id_token
if not id_token:
raise BadRequest(self._("Not logged in"))
http_client = self.get_auth_http_client()
try:
await http_client.fetch(
self._OAUTH_END_SESSION_URL,
method="POST",
headers={"Content-Type": "application/x-www-form-urlencoded"},
body=urlencode({
'id_token_hint': id_token,
'client_id': self.client_id,
# 'logout_hint': None,
# 'ui_locales': None,
}),
)
# escape.json_decode(response.body)
except HTTPClientError:
raise # FIXME:
return self._logout_success()
[docs]
class OIDCLogoutFinished(_OIDCLogoutBase):
[docs]
def get(self):
self._logout_success()
[docs]
class OIDCFrontchannelLogout(_OIDCLogoutBase):
"""OP initiated frontchannel logout at this RP."""
[docs]
def get(self):
CORE.debug('frontchannel OP logout')
self.add_header('Cache-Control', 'no-store')
# self.get_query_argument('iss')
# sid = self.get_query_argument('sid')
user = self.current_user
if user:
user.oidc = None
self.expire_session()
[docs]
class OIDCBackchannelLogout(OIDCResource):
"""OP initiated backchannel logout at this RP."""
[docs]
def post(self):
CORE.debug('backchannel OP logout')
logout_token = self.get_argument('logout_token')
self.add_header('Cache-Control', 'no-store')
try:
claims = self.verify_logout_token(logout_token)
except Unauthorized as exception:
self.add_header('Content-Type', 'application/json')
self.set_status(400)
self.finish({'error': 'invalid_request', 'error_description': str(exception)})
return
for session in Session.sessions.get_oidc_sessions(claims):
if session.session_id in Session.sessions.sessions:
Session.sessions[session.session_id].logout()
else:
try:
with get_session() as db_session:
session.delete(db_session, session.session_id, True)
except exc.DBAPIError as err:
CORE.error('Deleting the session from the database during OIDC backchannel logout failed: %s', err)
except exc.TimeoutError as err:
CORE.error('Deleting the session from the database during OIDC backchannel logout timed out: %s', err)
except DBDisabledException:
pass
self.finish()
if __name__ == '__main__':
import sys
with open('/usr/share/univention-management-console/oidc/oidc.json') as fd:
oidc = next(iter(json.load(fd)['oidc'].values()))
with open(oidc['openid_configuration']) as fd:
op = json.load(fd)
with open(oidc['openid_certs']) as fd:
jwks = json.load(fd)
public_key = RSAAlgorithm.from_jwk(json.dumps(jwks['keys'][0]))
token = sys.stdin.read().strip()
claims_decoded = None
print(jwt.get_unverified_header(token), file=sys.stderr)
for verify, leeway in ((False, 60 * 60 * 24), (True, 0)):
try:
claims = jwt.decode(
token, public_key,
algorithms=op['id_token_signing_alg_values_supported'],
options={
'verify_signature': False,
'verify_exp': False,
'verify_nbf': False,
'verify_iat': False,
'verify_aud': False,
'verify_iss': False,
},
issuer=op['issuer'],
audience=oidc['client_id'],
leeway=60 * 60 * 24,
)
claims_decoded = claims
except jwt.exceptions.PyJWTError as exc:
print(exc, file=sys.stderr)
print(json.dumps(claims_decoded))