Source code for univention.testing.umc

#!/usr/bin/python3
# -*- coding: utf-8 -*-
#
# UCS test connections to remote UMC Servers
#
# Copyright 2016-2022 Univention GmbH
#
# https://www.univention.de/
#
# All rights reserved.
#
# The source code of this program is made available
# under the terms of the GNU Affero General Public License version 3
# (GNU AGPL V3) as published by the Free Software Foundation.
#
# Binary versions of this program provided by Univention to you as
# well as other copyrighted, protected or trademarked materials like
# Logos, graphics, fonts, specific documentations and configurations,
# cryptographic keys etc. are subject to a license agreement between
# you and Univention and not subject to the GNU AGPL V3.
#
# In the case you use this program under the terms of the GNU AGPL V3,
# the program is provided in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public
# License with the Debian GNU/Linux or Univention distribution in file
# /usr/share/common-licenses/AGPL-3; if not, see
# <https://www.gnu.org/licenses/>.

from __future__ import print_function

import pprint
import sys
from typing import Any, Iterable, Optional, Tuple  # noqa: F401

import requests
from six.moves.html_parser import HTMLParser

from univention.config_registry import ConfigRegistry
from univention.lib.umc import Client as _Client


[docs]class Client(_Client): print_response = True print_request_data = True
[docs] @classmethod def get_test_connection(cls, hostname=None, *args, **kwargs): # type: (Optional[str], *Any, **Any) -> Client ucr = ConfigRegistry() ucr.load() username = ucr.get('tests/domainadmin/account') username = username.split(',')[0][len('uid='):] password = ucr.get('tests/domainadmin/pwd') return cls(hostname, username, password, *args, **kwargs)
[docs] def umc_command(self, *args, **kwargs): # type: (*Any, **Any) -> Client self.print_request_data = kwargs.pop('print_request_data', True) self.print_response = kwargs.pop('print_response', True) try: return super(Client, self).umc_command(*args, **kwargs) finally: self.print_request_data = True self.print_response = True
[docs] def request(self, method, path, data=None, headers=None): # type: (str, str, Any, Any) -> Any print('') print('*** UMC request: "%s %s" %s' % (method, path, '(%s)' % (data.get('flavor'),) if isinstance(data, dict) else '')) if self.print_request_data: print('UMC request payload: \n%s' % (pprint.pformat(data), )) try: response = super(Client, self).request(method, path, data, headers) except Exception: print('UMC request failed: %s' % (sys.exc_info()[1],)) print('') raise if self.print_response: print('*** UMC response: \n%s\n***' % (pprint.pformat(response.data),)) else: print('*** UMC response received') print('') return response
[docs]class SamlLoginError(Exception): pass
[docs]class GetHtmlTagValue(HTMLParser, object): def __init__(self, tag, condition, value_name): # type: (str, Tuple[str, str], str) -> None self.tag = tag self.condition = condition self.value_name = value_name self.value = None # type: Optional[str] super(GetHtmlTagValue, self).__init__()
[docs] def handle_starttag(self, tag, attrs): # type: (str, Iterable[Tuple[str, Optional[str]]]) -> None if tag == self.tag and self.condition in attrs: for attr in attrs: if attr[0] == self.value_name: self.value = attr[1]
[docs]def get_html_tag_value(page, tag, condition, value_name): # type: (str, str, Tuple[str, str], str) -> str htmlParser = GetHtmlTagValue(tag, condition, value_name) htmlParser.feed(page) htmlParser.close() assert htmlParser.value is not None return htmlParser.value
[docs]class ClientSaml(Client):
[docs] def authenticate(self, *args): # type: (*Any) -> None self.authenticate_saml(*args)
[docs] def authenticate_saml(self, *args): # type: (*Any) -> None self.__samlSession = requests.Session() saml_login_url = "https://%s/univention/saml/" % self.hostname print('GET SAML login form at: %s' % saml_login_url) saml_login_page = self.__samlSession.get(saml_login_url) saml_login_page.raise_for_status() saml_idp_login_ans = self._login_at_idp_with_credentials(saml_login_page) print('SAML message received from %s' % saml_idp_login_ans.url) self._send_saml_response_to_sp(saml_idp_login_ans) self.cookies.update(self.__samlSession.cookies.items())
def _login_at_idp_with_credentials(self, saml_login_page): # type: (Any) -> Any """Send login form to IdP""" auth_state = get_html_tag_value(saml_login_page.text, 'input', ('name', 'AuthState', ), 'value') data = {'username': self.username, 'password': self.password, 'AuthState': auth_state} print('Post SAML login form to: %s' % saml_login_page.url) saml_idp_login_ans = self.__samlSession.post(saml_login_page.url, data=data) saml_idp_login_ans.raise_for_status() if 'umcLoginWarning' in saml_idp_login_ans.text: raise SamlLoginError('Login failed?:\n{}'.format(saml_idp_login_ans.text)) return saml_idp_login_ans def _send_saml_response_to_sp(self, saml_idp_login_ans): # type: (Any) -> None sp_login_url = get_html_tag_value(saml_idp_login_ans.text, 'form', ('method', 'post', ), 'action') saml_msg = get_html_tag_value(saml_idp_login_ans.text, 'input', ('name', 'SAMLResponse', ), 'value') relay_state = get_html_tag_value(saml_idp_login_ans.text, 'input', ('name', 'RelayState', ), 'value') print('Post SAML msg to: %s' % sp_login_url) self.__samlSession.post(sp_login_url, data={'SAMLResponse': saml_msg, 'RelayState': relay_state}).raise_for_status()