Source code for univention.management.console.modules.setup.checks.univention_join

# SPDX-FileCopyrightText: 2024-2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only

import shlex
import subprocess
from typing import Any

from packaging.version import Version

import univention.config_registry
from univention.lib.i18n import Translation
from univention.management.console.log import MODULE
from univention.management.console.modules import UMC_Error
from univention.management.console.modules.setup.util import _temporary_password_file, get_ucs_domain


UCR = univention.config_registry.ConfigRegistry()
UCR.load()
_ = Translation('univention-management-console-module-setup').translate


[docs] def set_role_and_check_if_join_will_work(role: str, master_fqdn: str, admin_username: str, admin_password: str) -> None: orig_role = UCR.get('server/role') try: univention.config_registry.handler_set(['server/role=%s' % (role,)]) with _temporary_password_file(admin_password) as password_file: p1 = subprocess.Popen([ 'univention-join', '-dcname', master_fqdn, '-dcaccount', admin_username, '-dcpwd', password_file, '-checkPrerequisites', ], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, close_fds=True) stdout, _stderr = p1.communicate() if p1.returncode != 0: messages = [line[11:] for line in stdout.decode("UTF-8", "replace").split('\n') if line.startswith("* Message: ")] raise UMC_Error(_( "univention-join -checkPrerequisites reported a problem. " "Output of check:\n\n", ) + "\n".join(messages)) finally: if orig_role: univention.config_registry.handler_set(['server/role=%s' % (orig_role,)]) else: univention.config_registry.handler_unset(['server/role'])
[docs] def receive_domaincontroller_master_information(dns: str, nameserver: str, address: str, username: str, password: str) -> dict[str, Any]: result: dict[str, Any] = {} result['domain'] = check_credentials_nonmaster(dns, nameserver, address, username, password) check_domain_has_activated_license(address, username, password) check_domain_is_higher_or_equal_version(address, username, password) return result
[docs] def check_credentials_nonmaster(dns: str, nameserver: str, address: str, username: str, password: str) -> str: domain = get_ucs_domain(nameserver) if dns else '.'.join(address.split('.')[1:]) if not domain: # Not checked... no UCS domain! raise UMC_Error(_('No UCS Primary Directory Node could be found at the address.')) with _temporary_password_file(password) as password_file: if subprocess.call(['univention-ssh', password_file, '%s@%s' % (username, address), '/bin/true']): raise UMC_Error(_('The connection to the UCS Primary Directory Node was refused. Please recheck the password.')) return domain
[docs] def check_domain_has_activated_license(address: str, username: str, password: str) -> None: appliance_name = UCR.get("umc/web/appliance/name") if not appliance_name: return # the license must only be checked in an appliance scenario valid_license = True error = None with _temporary_password_file(password) as password_file: try: license_uuid = subprocess.check_output([ 'univention-ssh', password_file, '%s@%s' % (username, address), '/usr/sbin/ucr', 'get', 'uuid/license', ], stderr=subprocess.STDOUT).decode('UTF-8').rstrip() except subprocess.CalledProcessError as exc: valid_license = False error = exc.output.decode("UTF-8") else: valid_license = len(license_uuid) == 36 error = _('The license %s is not valid.') % (license_uuid,) if not valid_license: raise UMC_Error(' '.join(( _('To install the {appliance_name} appliance it is necessary to have an activated UCS license on the Primary Directory Node.').format(appliance_name=appliance_name), _('During the check of the license status the following error occurred:\n{error}').format(error=error), )))
[docs] def check_domain_is_higher_or_equal_version(address: str, username: str, password: str) -> None: with _temporary_password_file(password) as password_file: try: master_ucs_version = subprocess.check_output(['univention-ssh', password_file, '%s@%s' % (username, address), 'echo $(/usr/sbin/ucr get version/version)-$(/usr/sbin/ucr get version/patchlevel)'], stderr=subprocess.STDOUT).rstrip().decode('UTF-8', 'replace') except subprocess.CalledProcessError: MODULE.exception('Failed to retrieve UCS version') return nonmaster_ucs_version = '{}-{}'.format(UCR.get('version/version'), UCR.get('version/patchlevel')) if Version(nonmaster_ucs_version) > Version(master_ucs_version): raise UMC_Error(_('The UCS version of the domain you are trying to join ({}) is lower than the local one ({}). This constellation is not supported.').format(master_ucs_version, nonmaster_ucs_version))
[docs] def check_for_school_domain(hostname: str, address: str, username: str, password: str) -> dict[str, Any]: MODULE.process('univention-join:school: check_for_school_domain(%r, %r, %r, %r)', hostname, address, username, '$PASSWORD') is_school_multiserver_domain = check_is_school_multiserver_domain(address, username, password) if is_school_multiserver_domain: server_school_roles = get_server_school_roles(hostname, address, username, password) else: server_school_roles = [] MODULE.process('univention-join:school: check_for_school_domain = %r', {'server_school_roles': server_school_roles, 'is_school_multiserver_domain': is_school_multiserver_domain}) return {'server_school_roles': server_school_roles, 'is_school_multiserver_domain': is_school_multiserver_domain}
[docs] def check_is_school_multiserver_domain(address: str, username: str, password: str) -> bool: MODULE.process('univention-join:school: check_is_school_multiserver_domain(%r, %r, %r)', address, username, '$PASSWORD') is_school_multiserver_domain = False with _temporary_password_file(password) as password_file: try: master_hostdn = subprocess.check_output([ 'univention-ssh', password_file, '%s@%s' % (username, address), '/usr/sbin/univention-config-registry', 'get', 'ldap/hostdn', ]).strip().decode('UTF-8') ldap_base = subprocess.check_output([ 'univention-ssh', password_file, '%s@%s' % (username, address), '/usr/sbin/univention-config-registry', 'get', 'ldap/base', ]).strip().decode('UTF-8') remote_cmd = ' '.join(shlex.quote(x) for x in [ 'univention-ldapsearch', '-D', f'cn=admin,{ldap_base}', '-y', '/etc/ldap.secret', '-b', f'{master_hostdn}', '(&(ucsschoolRole=dc_master:school:-)(!(ucsschoolRole=single_master:school:-))(univentionService=UCS@school))', 'dn', ]) is_school_multiserver_domain = f'dn: {master_hostdn}' in subprocess.check_output([ 'univention-ssh', '--no-split', password_file, '%s@%s' % (username, address), remote_cmd, ]).strip().decode('UTF-8').splitlines() except subprocess.CalledProcessError as exc: MODULE.error('univention-join:school: Could not query Primary Directory Node if the domain is a multiserver school domain: %s', exc) MODULE.process('univention-join:school: check_is_school_multiserver_domain = %r', is_school_multiserver_domain) return is_school_multiserver_domain
[docs] def get_server_school_roles(hostname: str, address: str, username: str, password: str) -> list[str]: MODULE.process('univention-join:school: get_server_school_roles(%r, %r, %r, %r)', hostname, address, username, '$PASSWORD') school_roles = [] with _temporary_password_file(password) as password_file: try: ldap_base = subprocess.check_output([ 'univention-ssh', password_file, '%s@%s' % (username, address), '/usr/sbin/univention-config-registry', 'get', 'ldap/base', ]).strip().decode('UTF-8') remote_cmd = ' '.join(shlex.quote(x) for x in [ 'univention-ldapsearch', '-D', f'cn=admin,{ldap_base}', '-y', '/etc/ldap.secret', '-LLL', f'(uid={hostname}$)', 'ucsschoolRole', ]) school_roles = subprocess.check_output([ 'univention-ssh', '--no-split', password_file, '%s@%s' % (username, address), remote_cmd, ]).strip().decode('UTF-8').splitlines()[1:] school_roles = [role.split()[-1] for role in school_roles] except (subprocess.CalledProcessError, IndexError) as exc: MODULE.error('univention-join:school: Could not query Primary Directory Node for ucsschoolRole: %s', exc) MODULE.process('univention-join:school: get_server_school_roles = %r', school_roles) return school_roles