#!/usr/bin/python3
# SPDX-FileCopyrightText: 2022-2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only
import base64
import json
import os
import sys
import time
import zlib
from urllib.parse import urlparse, urlunsplit
from saml2 import BINDING_HTTP_ARTIFACT, BINDING_HTTP_POST, BINDING_HTTP_REDIRECT
from saml2.client import Saml2Client
from saml2.ident import code as encode_name_id, decode as decode_name_id
from saml2.metadata import create_metadata_string
from saml2.response import StatusError, UnsolicitedResponse, VerificationError
from saml2.s_utils import UnknownPrincipal, UnsupportedBinding
from saml2.sigver import MissingKey, SignatureError
from tornado.web import HTTPError
from univention.lib.i18n import NullTranslation
from univention.management.console.error import UMC_Error
from univention.management.console.log import CORE
from univention.management.console.resource import Resource
from univention.management.console.shared_memory import shared_memory
PORT = None
try:
from time import monotonic
except ImportError:
from monotonic import monotonic
_ = NullTranslation('univention-management-console-frontend').translate
SERVICE_UNAVAILABLE = 503
[docs]
class SAMLUser:
"""SAML specific user information"""
__slots__ = ('message', 'name_id', 'session_end_time', 'username')
def __init__(self, response, message):
self.name_id = encode_name_id(response.name_id)
self.message = message
self.username = ''.join(response.ava['uid'])
self.session_end_time = 0
if response.not_on_or_after:
self.session_end_time = int(monotonic() + (response.not_on_or_after - time.time()))
[docs]
def on_logout(self):
SAMLResource.on_logout(self.name_id)
[docs]
class SamlError(HTTPError):
"""Errors caused during SAML authentication"""
def __init__(self, _=_):
self._ = _
[docs]
def error(func=None, status=400):
def _decorator(func):
def _decorated(self, *args, **kwargs):
message = func(self, *args, **kwargs) or ()
super().__init__(status, message)
if "Passive authentication not supported." not in message:
# "Passive authentication not supported." just means an active login is required. That is expected and needs no logging. It still needs to be raised though.
CORE.warning('SamlError: %s %s', status, message)
return self
return _decorated
if func is None:
return _decorator
return _decorator(func)
[docs]
def from_exception(self, etype, exc, etraceback):
if isinstance(exc, UnknownPrincipal):
return self.unknown_principal(exc)
if isinstance(exc, UnsupportedBinding):
return self.unsupported_binding(exc)
if isinstance(exc, VerificationError):
return self.verification_error(exc)
if isinstance(exc, UnsolicitedResponse):
return self.unsolicited_response(exc)
if isinstance(exc, StatusError):
return self.status_error(exc)
if isinstance(exc, MissingKey):
return self.missing_key(exc)
if isinstance(exc, SignatureError):
return self.signature_error(exc)
raise exc.with_traceback(etraceback)
[docs]
@error
def unknown_principal(self, exc):
return self._('The principal is unknown: %s') % (exc,)
[docs]
@error
def unsupported_binding(self, exc):
return self._('The requested SAML binding is not known: %s') % (exc,)
[docs]
@error
def unknown_logout_binding(self, binding):
return self._('The logout binding is not known.')
[docs]
@error
def verification_error(self, exc):
return self._('The SAML response could not be verified: %s') % (exc,)
[docs]
@error
def unsolicited_response(self, exc):
return self._('Received an unsolicited SAML response. Please try to single sign on again by accessing /univention/saml/. Error message: %s') % (exc,)
[docs]
@error
def status_error(self, exc):
return self._('The identity provider reported a status error: %s') % (exc,)
[docs]
@error(status=500)
def missing_key(self, exc):
return self._('The issuer %r is not known to the SAML service provider. This is probably a misconfiguration and might be resolved by restarting the univention-management-console-server.') % (str(exc),)
[docs]
@error
def signature_error(self, exc):
return self._('The SAML response contained a invalid signature: %s') % (exc,)
[docs]
@error
def unparsed_saml_response(self):
return self._("The SAML message is invalid for this service provider.")
[docs]
@error(status=500)
def no_identity_provider(self):
return self._('There is a configuration error in the service provider: No identity provider are set up for use.')
[docs]
@error # TODO: multiple choices redirection status
def multiple_identity_provider(self, idps, idp_query_param):
return self._('Could not pick an identity provider. You can specify one via the query string parameter %(param)r from %(idps)r') % {'param': idp_query_param, 'idps': idps}
[docs]
class SAMLResource(Resource):
"""Base class for all SAML resources"""
requires_authentication = False
SP = None
configfile = '/usr/share/univention-management-console/saml/sp.py'
idp_query_param = "IdpQuery"
bindings = [BINDING_HTTP_REDIRECT, BINDING_HTTP_POST, BINDING_HTTP_ARTIFACT]
outstanding_queries = {}
[docs]
@classmethod
def on_logout(cls, name_id):
if cls.SP:
try:
cls.SP.local_logout(decode_name_id(name_id))
except Exception as exc: # e.g. bsddb.DBNotFoundError
CORE.warning('Could not remove SAML session: %s', exc)
[docs]
class SamlACS(SAMLResource):
"""SAML attribute consuming service (or Single Sign On redirection)"""
@property
def sp(self):
if not self.SP and not self.reload():
raise HTTPError(SERVICE_UNAVAILABLE, 'Single sign on is not available due to misconfiguration. See logfiles.')
return self.SP
[docs]
@classmethod
def reload(cls):
CORE.info('Reloading SAML service provider configuration')
sys.modules.pop(os.path.splitext(os.path.basename(cls.configfile))[0], None)
try:
cls.SP = Saml2Client(config_file=cls.configfile, identity_cache=None, state_cache=shared_memory.saml_state_cache)
return True
except Exception:
CORE.warning('Startup of SAML2.0 service provider failed', exc_info=True)
return False
[docs]
async def get(self):
binding, message, relay_state = self._get_saml_message()
if message is None:
self.do_single_sign_on(relay_state=self.get_query_argument('location', '/univention/management/'))
return
acs = self.attribute_consuming_service
if relay_state == 'iframe-passive':
acs = self.attribute_consuming_service_iframe
await acs(binding, message, relay_state)
post = get
[docs]
async def attribute_consuming_service(self, binding, message, relay_state):
response = self.parse_authn_response(message, binding)
saml = SAMLUser(response, message)
await self.pam_saml_authentication(saml)
# protect against javascript:alert('XSS'), mailto:foo and other non relative links!
location = urlparse(relay_state)
if location.path.startswith('//'):
location = urlparse('')
location = urlunsplit(('', '', location.path, location.query, location.fragment))
self.redirect(location, status=303)
[docs]
async def attribute_consuming_service_iframe(self, binding, message, relay_state):
self.request.headers['Accept'] = 'application/json' # enforce JSON response in case of errors
self.request.headers['X-Iframe-Response'] = 'true' # enforce textarea wrapping
response = self.parse_authn_response(message, binding)
saml = SAMLUser(response, message)
await self.pam_saml_authentication(saml)
self.set_header('Content-Type', 'text/html')
data = {"status": 200, "result": {"username": saml.username}}
self.finish(b'<html><body><textarea>%s</textarea></body></html>' % (json.dumps(data).encode('ASCII'),))
[docs]
async def pam_saml_authentication(self, saml):
# important: must be called before the auth, to preserve session id in case of re-auth and that a user cannot choose his own session ID by providing a cookie
sessionid = self.create_sessionid()
# TODO: drop in the future to gain performance
result = await self.current_user.authenticate({
'locale': self.locale.code,
'username': saml.username,
'password': saml.message,
'auth_type': 'SAML',
})
if not self.current_user.user.authenticated:
CORE.error('SECURITY WARNING: PAM SAML Authentication failed while pysaml2 succeeded!')
raise UMC_Error(result.message, result.status, result.result)
# as an alternative to PAM we could just set the user as authenticated because pysaml2 already ensured this.
# but we keep the behavior for now because this is what happened prior to the UMC-Web-Server and UMC-Sever unification
# PAM also makes acct_mgmt. This is of course also done by the IDP but nevertheless we don't know if this is still required.
# self.current_user.set_credentials(saml.username, saml.message, 'SAML')
self.current_user.saml = saml
self.set_session(sessionid)
def _logout_success(self):
user = self.current_user
if user:
user.saml = None
self.redirect('/univention/logout', status=303)
def _get_saml_message(self):
"""Get the SAML message and corresponding binding from the HTTP request"""
if self.request.method not in ('GET', 'POST'):
self.set_header('Allow', 'GET, HEAD, POST')
raise HTTPError(405)
if self.request.method == 'GET':
binding = BINDING_HTTP_REDIRECT
args = self.request.query_arguments
elif self.request.method == "POST":
binding = BINDING_HTTP_POST
args = self.request.body_arguments
relay_state = args.get('RelayState', [b''])[0].decode('UTF-8')
try:
message = args['SAMLResponse'][0].decode('UTF-8')
except KeyError:
try:
message = args['SAMLRequest'][0].decode('UTF-8')
except KeyError:
try:
message = args['SAMLart'][0].decode('UTF-8')
except KeyError:
return None, None, None
message = self.sp.artifact2message(message, 'spsso')
binding = BINDING_HTTP_ARTIFACT
if isinstance(message, list):
message = message[0]
return binding, message, relay_state
[docs]
def parse_authn_response(self, message, binding):
try:
response = self.sp.parse_authn_request_response(message, binding, self.outstanding_queries)
except (UnknownPrincipal, UnsupportedBinding, VerificationError, UnsolicitedResponse, StatusError, MissingKey, SignatureError):
raise SamlError(self._).from_exception(*sys.exc_info())
if response is None:
CORE.warning('The SAML message could not be parsed with binding %r: %r', binding, message)
raise SamlError(self._).unparsed_saml_response()
self.outstanding_queries.pop(response.in_response_to, None)
return response
[docs]
def do_single_sign_on(self, **kwargs):
binding, http_args = self.create_authn_request(**kwargs)
self.http_response(binding, http_args)
[docs]
def create_authn_request(self, **kwargs):
"""
Creates the SAML <AuthnRequest> request and returns the SAML binding and HTTP response.
Returns (binding, http-arguments)
"""
identity_provider_entity_id = self.select_identity_provider()
binding, destination = self.get_identity_provider_destination(identity_provider_entity_id)
relay_state = kwargs.pop('relay_state', None)
reply_binding, service_provider_url = self.select_service_provider()
sid, message = self.sp.create_authn_request(destination, binding=reply_binding, assertion_consumer_service_urls=(service_provider_url,), **kwargs)
http_args = self.sp.apply_binding(binding, message, destination, relay_state=relay_state)
self.outstanding_queries[sid] = service_provider_url # self.request.full_url() # TODO: shouldn't this contain service_provider_url?
return binding, http_args
[docs]
def select_identity_provider(self):
"""
Select an identity provider based on the available identity providers.
If multiple IDP's are set up the client might have specified one in the query string.
Otherwise an error is raised where the user can choose one.
Returns the EntityID of the IDP.
"""
idps = self.sp.metadata.with_descriptor("idpsso")
if not idps and self.reload():
idps = self.sp.metadata.with_descriptor("idpsso")
if self.get_query_argument(self.idp_query_param, None) in idps:
return self.get_query_argument(self.idp_query_param)
if len(idps) == 1:
return list(idps.keys())[0] # noqa: RUF015
if not idps:
raise SamlError(self._).no_identity_provider()
raise SamlError(self._).multiple_identity_provider(list(idps.keys()), self.idp_query_param)
[docs]
def get_identity_provider_destination(self, entity_id):
"""
Get the destination (with SAML binding) of the specified entity_id.
Returns (binding, destination-URI)
"""
return self.sp.pick_binding("single_sign_on_service", self.bindings, "idpsso", entity_id=entity_id)
[docs]
def select_service_provider(self):
"""
Select the ACS-URI and binding of this service provider based on the request uri.
Tries to preserve the current scheme (HTTP/HTTPS) and netloc (host/IP) but falls back to FQDN if it is not set up.
Returns (binding, service-provider-URI)
"""
acs = self.sp.config.getattr("endpoints", "sp")["assertion_consumer_service"]
service_url, reply_binding = acs[0]
netloc = False
p2 = urlparse(self.request.full_url())
for _url, _binding in acs:
p1 = urlparse(_url)
if p1.scheme == p2.scheme and p1.netloc == p2.netloc:
netloc = True
service_url, reply_binding = _url, _binding
if p1.path == p2.path:
break
elif not netloc and p1.netloc == p2.netloc:
service_url, reply_binding = _url, _binding
CORE.info('SAML: picked %r for %r with binding %r', service_url, self.request.full_url(), reply_binding)
return reply_binding, service_url
[docs]
def http_response(self, binding, http_args):
"""Converts the HTTP arguments from pysaml2 into the tornado response."""
body = ''.join(http_args["data"])
for key, value in http_args["headers"]:
self.set_header(key, value)
if binding in (BINDING_HTTP_ARTIFACT, BINDING_HTTP_REDIRECT):
self.set_status(303 if self.request.version != "HTTP/1.0" and self.request.method == 'POST' else 302)
if not body:
self.redirect(self._headers['Location'], status=self.get_status())
return
self.finish(body.encode('UTF-8'))
[docs]
class SamlIframeACS(SamlACS):
"""Passive SAML authentication via hidden iframe"""
[docs]
def get(self):
self.do_single_sign_on(is_passive='true', relay_state='iframe-passive')
post = get
[docs]
class SamlSingleLogout(SamlACS):
"""SAML Single Logout by IDP"""
[docs]
def get(self, *args, **kwargs): # single logout service
binding, message, relay_state = self._get_saml_message()
if message is None:
raise HTTPError(400, 'The HTTP request is missing required SAML parameter.')
try:
data = base64.b64decode(message.encode('UTF-8'))
try:
data = zlib.decompress(data, -15).split(b'>', 1)[0]
except zlib.error:
pass
is_logout_request = b'LogoutRequest' in data
except Exception:
CORE.exception("Could not parse SAML logout request/response.")
is_logout_request = False
if is_logout_request:
user = self.current_user
if not user or user.saml is None:
# The user is either already logged out or has no cookie because he signed in via IP and gets redirected to the FQDN
name_id = None
else:
name_id = user.saml.name_id
user.saml = None
http_args = self.sp.handle_logout_request(message, name_id, binding, relay_state=relay_state)
self.expire_session()
self.http_response(binding, http_args)
return
else:
response = self.sp.parse_logout_request_response(message, binding)
self.sp.handle_logout_response(response)
self._logout_success()
post = get
[docs]
class SamlLogout(SamlACS):
"""Initiate SAML Logout at the IDP"""
[docs]
def get(self):
user = self.current_user
if user is None or user.saml is None:
return self._logout_success()
# What if more than one
try:
data = self.sp.global_logout(user.saml.name_id)
except KeyError:
try:
tb = sys.exc_info()[2]
while tb.tb_next:
tb = tb.tb_next
if tb.tb_frame.f_code.co_name != 'entities':
raise
finally:
tb = None
# already logged out or UMC-Webserver restart
user.saml = None
data = {}
for logout_info in data.values():
if not isinstance(logout_info, tuple):
continue # result from logout, should be OK
binding, http_args = logout_info
if binding not in (BINDING_HTTP_POST, BINDING_HTTP_REDIRECT):
raise SamlError(self._).unknown_logout_binding(binding)
self.http_response(binding, http_args)
return
self._logout_success()
post = get