Source code for univention.testing.license_client

#!/usr/bin/python3
# SPDX-FileCopyrightText: 2014-2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only

"""A tool to obtain licenses for the UCS test environments."""


import cgi
import logging
from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser
from collections.abc import Iterable
from datetime import datetime
from html.parser import HTMLParser
from http.client import HTTPException, HTTPResponse, HTTPSConnection
from os import path
from sys import exit
from typing import Any
from urllib.parse import urlencode


[docs] class CredentialsMissing(Exception): """A custom exception to be raised when a 'license.secret' file is not found"""
[docs] class ShopParser(HTMLParser): def __init__(self, log: logging.Logger) -> None: HTMLParser.__init__(self) # old style class self.log = log self.link_to_license: str | None = None
[docs] def error(self, message): self.log.error("Failed parsing HTML: %s", message)
[docs] def handle_starttag(self, tag: str, attrs: Iterable[tuple[str, str | None]]) -> None: """ Method is called every time a new start tag is found in the html feed. When the link tag with 'orders/' attribute is found, the attribute is saved to 'self.link_to_license' """ self.log.debug("In 'handle_starttag' method: tag='%s', attrs='%s'", tag, attrs) if tag != 'a': return for (name, value) in attrs: if name != 'href': continue if not value: continue if not value.startswith('orders/'): continue self.link_to_license = value break
[docs] class TestLicenseClient: def __init__(self, parser: ArgumentParser | None = None) -> None: """Class constructor for the test license client and HTMLParser""" self.log = logging.getLogger("License_Client") self.setup_logging() self.parser = parser self.license_server_url = 'license.univention.de' self.license_filename = 'ValidTest.license' self.connection: HTTPSConnection | None = None self.server_username = 'ucs-test' self.server_password = '' self.secret_file = '/etc/license.secret' self.cookie = '' self.license_shop = 'testing' self.license_params: dict[str, Any] = { "kundeUnternehmen": "Univention", "kundeEmail": "umc-test@univention.de", "BaseDN": "", "EndDate": "", "Servers": 50, "Support": 0, "PremiumSupport": 0, "Users": 50, "ManagedClients": 50, "CorporateClients": 50, "VirtualDesktopUsers": 0, "VirtualDesktopClients": 0, "Type": "UCS", }
[docs] def setup_logging(self) -> None: """Creates and configures the logger with an INFO level""" self.log.setLevel(logging.INFO) ch = logging.StreamHandler() ch.setLevel(logging.DEBUG) ch.setFormatter(logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")) self.log.addHandler(ch)
[docs] def create_connection(self) -> None: """ Creates a HTTPS connection instance on a default port (443) to the 'self.license_server_url' """ self.log.debug("In 'create_connection'") self.connection = HTTPSConnection(self.license_server_url, timeout=60)
[docs] def close_connection(self) -> None: """ Closes the license server connection if the connection instance was created """ self.log.debug("In 'close_connection'") if self.connection: try: self.connection.close() except HTTPException as exc: self.log.exception("An HTTP Exception occurred while closing the connection: '%s'", exc) finally: self.connection = None
[docs] def get_server_password(self, secret_file: str = '/etc/license.secret') -> None: """ Opens and reads the 'secret_file'. Saves the result to a 'self.server_password' """ self.log.debug("In 'get_server_password': secret_file='%s'", secret_file) if not path.exists(secret_file): self.log.critical("The '%s' secret file does not exist, cannot proceed without password", secret_file) raise CredentialsMissing("The '%s' secret file does not exist" % secret_file) try: with open(secret_file) as password: self.server_password = password.read() except (OSError, ValueError) as exc: self.log.exception("Failed to get the password from the '%s', an error occurred: %r", secret_file, exc) exit(1) if not self.server_password: self.log.critical("The password to access the license service cannot be empty") exit(1)
[docs] def make_post_request(self, url: str, body: str, headers: dict[str, str]) -> HTTPResponse: """ Makes a POST request with the given 'url', 'body', 'headers' and returns the response """ self.log.debug("In 'make_post_request' method: url='%s', body='%s', headers='%s'", url, body, headers) assert self.connection try: self.connection.request("POST", url, body, headers) return self.connection.getresponse() except HTTPException as exc: self.log.exception("An HTTP Exception occurred while making '%s' POST request: '%s'", url, exc) exit(1)
[docs] def make_get_request(self, url: str, headers: dict[str, str]) -> HTTPResponse: """ Makes a GET request with the given 'url', 'headers' and returns the response """ self.log.debug("In 'make_get_request' method: url='%s', headers='%s'", url, headers) assert self.connection try: self.connection.request("GET", url, headers=headers) return self.connection.getresponse() except HTTPException as exc: self.log.exception("An HTTP Exception occurred while making '%s' GET request: '%s'", url, exc) exit(1)
[docs] def get_the_license(self, body: str) -> None: """ Processes the given 'body' with HTMLParser to find the link to a created license file and downloads the license after. """ self.log.debug("In 'get_the_license' method: body='%s'", body) parser = ShopParser(self.log) parser.feed(body) # process the response 'body' for license link if not parser.link_to_license: self.log.critical("The link to the license file was not found in the body from server: '%s'", body) exit(1) self.download_license_file(parser.link_to_license)
[docs] def order_a_license(self) -> str: """ Makes a POST request with encoded 'self.license_params' as a body to order a new license. Returns the response body. """ self.log.debug("In 'order_a_license' method") body = self.license_params headers = { "Cookie": self.cookie, "Content-type": "application/x-www-form-urlencoded", } response = self.make_post_request(f'/shop/{self.license_shop}/order', urlencode(body), headers) assert response.status == 202 return self.get_body(response)
[docs] def get_body(self, response: HTTPResponse) -> str: self.log.debug("The response status is '%s', reason is '%s', headers are '%s'", response.status, response.reason, response.getheaders()) content_type = response.getheader('Content-Type') _mimetype, options = cgi.parse_header(content_type) encoding = options.get("charset", "ascii") return response.read().decode(encoding, "replace")
[docs] def download_license_file(self, link_to_license: str) -> None: """ Downloads the license located at `filename` and saves it to the file 'self.license_filename' """ self.log.debug("In 'download_license_file' method") headers = { "Cookie": self.cookie, "Accept": "text/plain", } response = self.make_get_request(f'/shop/{self.license_shop}/{link_to_license}', headers) try: body = self.get_body(response) with open(self.license_filename, 'w') as license_file: license_file.write(body) self.log.info("The license was written to file '%s'", self.license_filename) except (OSError, ValueError) as exc: self.log.exception("An error happened while writing the downloaded license to a file '%s': '%s'", self.license_filename, exc) exit(1)
[docs] def check_date_format(self) -> None: """Checks if the 'EndDate' format is correct.""" try: if self.license_params['EndDate'] != 'unlimited': datetime.strptime(self.license_params['EndDate'], '%d.%m.%Y') except ValueError as exc: self.log.exception("The 'EndDate' for the license has a wrong format, supported format is 'dd.mm.yyyy': %r", exc) exit(1)
[docs] def update_with_parsed_args(self, args: dict[str, Any]) -> None: """ Updates the loglevel and license filename settings if given among the parsed arguments. Merges parsed data with default license parameters. """ log_level = args.pop('LogLevel') if log_level: numeric_level = getattr(logging, log_level.upper(), None) if isinstance(numeric_level, int): self.log.setLevel(numeric_level) else: self.log.info("The LogLevel was not changed, unknown '%s' log level given", log_level) self.license_shop = args.pop('shop') self.server_username = args.pop('username') self.secret_file = args.pop('secret_file') license_file = args.pop('FileName') if license_file: self.license_filename = license_file self.log.debug("The filename for the license will be '%s'", self.license_filename) # merging parsed args with the default values: self.license_params.update((key, val) for key, val in args.items() if val is not None) self.log.info("Requested license parameters are: '%s'", self.license_params)
[docs] def process_cmd_arguments(self) -> None: """ Populates self.parser class with positional and optional arguments and processes the user input, checks the date format and than merges it with the default values in the 'self.license_params' dictionary """ self.log.debug("In 'process_cmd_arguments' method") assert self.parser self.parser.add_argument("BaseDN", help="A base DN for the license") self.parser.add_argument("EndDate", help="The date till which the license will be valid (max 1 year from now)") self.parser.add_argument("-f", "--FileName", help="The filename to be used for the issued license", default="ValidTest.license") self.parser.add_argument("-s", "--Servers", type=int, help="Max amount of servers allowed with the license", default=50) self.parser.add_argument("-u", "--Users", type=int, help="Max amount of users allowed with the license", default=50) self.parser.add_argument("-mc", "--ManagedClients", type=int, help="Max amount of managed clients allowed with the license", default=50) self.parser.add_argument("-cc", "--CorporateClients", type=int, help="Max amount of corporate clients allowed with the license", default=50) self.parser.add_argument("-ll", "--LogLevel", help="Logging level", choices=("INFO", "DEBUG", "ERROR", "CRITICAL"), default="INFO") self.parser.add_argument("--shop", help="The shop", default=self.license_shop) self.parser.add_argument("--username", help="username", default=self.server_username) self.parser.add_argument("--secret-file", help="password file", default=self.secret_file) opts = self.parser.parse_args() args = vars(opts) # converting Namespace to a dictionary self.log.debug("Parsed arguments are: '%s'", args) self.update_with_parsed_args(args)
[docs] def main(self, base_dn: str = "", end_date: str = "", server_url: str = "", license_file: str = "") -> None: """ A method to order and download a test license from the license server. 'base_dn' and 'end_date' should be provided if argument parser is not used. 'server_url' is an optional argument for the license shop server. 'license_file' is an optional argument for the license filename. """ self.log.debug("In 'main' method: server_url='%s', license_file='%s', base_dn='%s', end_date='%s'", server_url, license_file, base_dn, end_date) if self.parser: self.process_cmd_arguments() elif base_dn and end_date: self.license_params['BaseDN'] = base_dn self.license_params['EndDate'] = end_date else: self.log.error("The 'BaseDN' or/and 'EndDate' were not provided for the license to create") exit(1) self.check_date_format() if server_url: self.license_server_url = server_url if license_file: self.license_filename = license_file self.get_server_password(self.secret_file) try: self.create_connection() self.get_cookie() self.get_the_license(self.order_a_license()) finally: self.close_connection()
if __name__ == '__main__': Client = TestLicenseClient(ArgumentParser(description=__doc__, formatter_class=ArgumentDefaultsHelpFormatter)) Client.main()