Source code for univention.management.console.modules.adconnector

#!/usr/bin/python3
#
# Univention Management Console
#  module: AD connector
#
# SPDX-FileCopyrightText: 2011-2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only

import functools
import os.path
import re
import subprocess
import time
from contextlib import contextmanager

import ldap.dn
import ldap.filter
import ldb
import psutil
import tornado.process
from ldap import explode_rdn
from samba.auth import system_session
from samba.credentials import Credentials
from samba.param import LoadParm
from samba.samdb import SamDB

import univention.config_registry
from univention.lib import Translation, admember
from univention.management.console.base import Base, UMC_Error
from univention.management.console.config import ucr
from univention.management.console.log import MODULE
from univention.management.console.modules.decorators import file_upload, sanitize, simple_response
from univention.management.console.modules.mixins import ProgressMixin
from univention.management.console.modules.sanitizers import ChoicesSanitizer, StringSanitizer


# Translation function for the user-facing messages of this module.
_ = Translation('univention-management-console-module-adconnector').translate

# Fallback path of the bind password file; used when the UCR variable
# connector/ad/ldap/bindpw is not set.
FN_BINDPW = '/etc/univention/connector/ad/bindpw'
# Placeholder value for the password option: when the request carries exactly
# this string, the stored bind password is left untouched.
DO_NOT_CHANGE_PWD = '********************'


class ADNotAvailable(Exception):
    """Raised by test_connection() when the AD test search exits with a non-zero code."""
@contextmanager
def ucr_rollback(ucr, variables):
    """
    Context manager that restores UCR variables if the managed block fails.

    The registry is reloaded and the current values of ``variables`` are
    remembered. If the body raises anything (including ``BaseException``
    subclasses), the remembered values are written back through
    ``ucr_update`` and the exception is re-raised. On success nothing is
    reverted.

    :param ucr: config registry object providing ``load()`` and ``get()``.
    :param variables: iterable of UCR variable names to protect.
    """
    ucr.load()
    previous = {name: ucr.get(name) for name in variables}
    try:
        yield
    except BaseException:
        univention.config_registry.frontend.ucr_update(ucr, previous)
        raise
def test_connection():
    """
    Search a query that should never fail: RDN of connector/ad/ldap/base.

    The first RDN of the configured AD LDAP base is passed to adsearch().
    Anything written to stderr is logged as a warning.

    :returns: True when the search process exited with code 0.
    :raises ADNotAvailable: when the search process exited with a non-zero code.
    """
    ldap_base = ucr.get('connector/ad/ldap/base')
    first_rdn = explode_rdn(ldap_base)[0]
    process, _out, errors = adsearch(first_rdn)
    if errors:
        MODULE.warning(errors)
    if process.returncode == 0:
        return True
    raise ADNotAvailable()
def adsearch(query):
    """
    Run ``/usr/sbin/univention-adsearch`` with the given query.

    :param query: search expression handed to the tool as its only argument.
    :returns: tuple ``(process, stdout, stderr)``; ``process`` is the finished
        ``subprocess.Popen`` object, both outputs are decoded as UTF-8 with
        undecodable bytes replaced.
    """
    process = subprocess.Popen(
        ['/usr/sbin/univention-adsearch', query],
        close_fds=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    raw_out, raw_err = process.communicate()
    return process, raw_out.decode('UTF-8', 'replace'), raw_err.decode('UTF-8', 'replace')
def guess_ad_domain_language():
    """
    Guess the language of the AD domain.

    AD Connector supports "en" and "de". A German domain is detected by
    searching for the group account "Domänen-Admins"; "en" is the fallback.

    :returns: ``'de'`` if the search output contains the German group name,
        otherwise ``'en'``.
    """
    _proc, output, errors = adsearch('sAMAccountName=Domänen-Admins')
    if errors:
        MODULE.warning('adsearch "sAMAccountName=Domänen-Admins" stderr: %s', errors)
    german_marker = 'samaccountname: domänen-admins'
    is_german = any(entry.lower().strip() == german_marker for entry in output.split('\n'))
    return 'de' if is_german else 'en'
def get_ad_binddn_from_name(base, server, username, password):
    """
    Determine the bind DN of an AD account from its account name.

    The AD server is searched (subtree below ``base``) for an object whose
    ``samAccountName`` equals ``username``, authenticating with the given
    credentials. If exactly one entry is found its DN is returned; otherwise,
    or if the LDAP operation fails, the conventional guess
    ``cn=<username>,cn=users,<base>`` is returned.

    :param base: LDAP base DN of the AD domain.
    :param server: address of the AD server, used as ``ldap://<server>``.
    :param username: account name to look up (also used for authentication).
    :param password: password of that account.
    :returns: the bind DN as a string.
    """
    lp = LoadParm()
    creds = Credentials()
    creds.guess(lp)
    creds.set_username(username)
    creds.set_password(password)
    # Fallback if the lookup fails or is ambiguous; the user name is escaped
    # so that special characters cannot break the DN syntax.
    binddn = f'cn={ldap.dn.escape_dn_chars(username)},cn=users,{base}'
    try:
        samdb = SamDB(url=f'ldap://{server}', session_info=system_session(), credentials=creds, lp=lp)
        res = samdb.search(
            base,
            scope=ldb.SCOPE_SUBTREE,
            expression=ldap.filter.filter_format('(samAccountName=%s)', [username]),
            attrs=['samaccountname'],
        )
        if res.count == 1:
            binddn = res.msgs[0].get('dn', idx=0).extended_str()
    except ldb.LdbError as ex:
        # Best effort only: keep the guessed DN and just log the failure.
        MODULE.warning('get_ad_binddn_from_name() could not get binddn for user %s: %s', username, ex)
    return binddn
class Instance(Base, ProgressMixin):
    """UMC module instance for the AD connector and the AD member mode setup."""

    # (UMC option name, UCR variable, default value) triples; used by load()
    # to read and by adconnector_save() to write the connector settings.
    OPTION_MAPPING = (
        ('LDAP_Host', 'connector/ad/ldap/host', ''),
        ('LDAP_Base', 'connector/ad/ldap/base', ''),
        ('LDAP_BindDN', 'connector/ad/ldap/binddn', ''),
        ('KerberosDomain', 'connector/ad/mapping/kerberosdomain', ''),
        ('PollSleep', 'connector/ad/poll/sleep', 5),
        ('RetryRejected', 'connector/ad/retryrejected', 10),
        ('DebugLevel', 'connector/debug/level', 2),
        ('DebugFunction', 'connector/debug/function', False),
        ('MappingSyncMode', 'connector/ad/mapping/syncmode', 'sync'),
        ('MappingGroupLanguage', 'connector/ad/mapping/group/language', 'de'),
    )

    def init(self):
        """Initialise the ``status_*`` attributes from UCR and the process list."""
        self.__update_status()

    def state(self, request):
        """
        Retrieve current status of the Active Directory connection configuration and the service

        options: {}

        return: {
            'ssl_enabled': (True|False),
            'password_sync_enabled': (True|False),
            'running': (True|False),
            'certificate': (True|False),
            'mode_admember': (True|False),
            'mode_adconnector': (True|False),
            'configured': (True|False),  # either of the two modes is active
            'server_role': <value of UCR variable server/role>,
        }
        """
        self.__update_status()
        self.finished(request.id, {
            'ssl_enabled': self.status_ssl,
            'password_sync_enabled': self.status_password_sync,
            'running': self.status_running,
            'certificate': self.status_certificate,
            'mode_admember': self.status_mode_admember,
            'mode_adconnector': self.status_mode_adconnector,
            'configured': self.status_mode_adconnector or self.status_mode_admember,
            'server_role': ucr.get('server/role'),
        })

    def load(self, request):
        """
        Retrieve the current Active Directory connector settings

        options: {}

        return: { <one entry per OPTION_MAPPING item>, 'passwordExists': (True|False) }
        """
        result = {}
        for option, var, default in Instance.OPTION_MAPPING:
            result[option] = ucr.get(var, default)

        # Only report whether a password file exists, never its content.
        pwd_file = ucr.get('connector/ad/ldap/bindpw')
        result['passwordExists'] = bool(pwd_file and os.path.exists(pwd_file))

        self.finished(request.id, result)

    @sanitize(
        LDAP_Host=StringSanitizer(required=True),
        Host_IP=StringSanitizer(required=True),
        LDAP_Base=StringSanitizer(required=True),
        LDAP_BindDN=StringSanitizer(required=True),
        KerberosDomain=StringSanitizer(required=True),
    )
    def adconnector_save(self, request):
        """
        Saves the Active Directory connection configuration

        options:
            Host_IP: IP address of the AD server
            LDAP_Host: hostname of the AD server
            LDAP_Base: LDAP base of the AD server
            LDAP_BindDN: LDAP DN to use for authentication
            LDAP_Password: bind password (optional; empty or the placeholder keeps the stored one)
            KerberosDomain: kerberos domain
            PollSleep: time in seconds between polls
            RetryRejected: how many time to retry a synchronisation
            MappingSyncMode: synchronisation mode
            MappingGroupLanguage: language of the AD server

        return: { 'success' : (True|False), 'message' : <details> }
        """
        for umckey, ucrkey, default in Instance.OPTION_MAPPING:
            val = request.options.get(umckey, default)
            # Falsy values (empty string, 0, False) are skipped and leave the
            # UCR variable unchanged.
            if val:
                if isinstance(val, bool):
                    val = 'yes' if val else 'no'
                MODULE.info('Setting %s=%s', ucrkey, val)
                univention.config_registry.handler_set([f'{ucrkey}={val}'])

        ucr.load()
        if ucr.get('connector/ad/ldap/ldaps'):
            MODULE.info('Unsetting connector/ad/ldap/ldaps')
            univention.config_registry.handler_unset(['connector/ad/ldap/ldaps'])
        if ucr.get('connector/ad/ldap/port') == '636':
            MODULE.info('Setting ldap port to 389')
            univention.config_registry.handler_set(['connector/ad/ldap/port=389'])

        password = request.options.get('LDAP_Password')
        if password not in (None, '', DO_NOT_CHANGE_PWD):
            fn = ucr.get('connector/ad/ldap/bindpw', FN_BINDPW)
            try:
                # Create/truncate the file with restrictive permissions and fix
                # mode and ownership *before* the secret is written, so the
                # password is never readable by other users, not even briefly.
                fd = os.open(fn, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
                with os.fdopen(fd, 'w') as pwd_fd:
                    os.fchmod(fd, 0o600)
                    os.fchown(fd, 0, 0)
                    pwd_fd.write(password)
                univention.config_registry.handler_set([f'connector/ad/ldap/bindpw={fn}'])
            except Exception as e:
                MODULE.info('Saving bind password failed (filename=%s ; exception=%s)', fn, e.__class__)
                self.finished(request.id, {'success': False, 'message': _('Saving bind password failed (filename=%(fn)s ; exception=%(exception)s)') % {'fn': fn, 'exception': str(e.__class__)}})
                return

        ssldir = '/etc/univention/ssl/{}'.format(request.options.get('LDAP_Host'))
        if not os.path.exists(ssldir):
            # The certificate is created asynchronously; the request is
            # answered from the exit callback in _create_certificate().
            self._create_certificate(request)
            return

        # enter a static host entry such that the AD server's FQDN can be resolved
        univention.config_registry.handler_set(['hosts/static/{Host_IP}={LDAP_Host}'.format(**request.options)])

        # check for SSL support on AD side
        if admember.server_supports_ssl(server=request.options.get('LDAP_Host')):
            MODULE.process('Enabling SSL...')
            admember.enable_ssl()
        else:
            MODULE.warning('SSL is not supported')
            admember.disable_ssl()

        # UCR variables are set, and now we can try to guess the language of
        # the AD domain
        ad_lang = guess_ad_domain_language()
        univention.config_registry.handler_set([f'connector/ad/mapping/group/language={ad_lang}'])

        self.finished(request.id, {'success': True, 'message': _('Active Directory connection settings have been saved.')})

    def _create_certificate(self, request):
        """
        Create an SSL certificate for the AD server named in ``LDAP_Host``.

        ``univention-certificate new`` is started as a tornado subprocess. The
        request is finished from its exit callback: with a failure message if
        the certificate directory still does not exist, otherwise with success.
        """
        ssldir = '/etc/univention/ssl/{}'.format(request.options.get('LDAP_Host'))

        def _return(request, status):
            if not os.path.exists(ssldir):
                MODULE.error('Creation of certificate failed (%s)', ssldir)
                self.finished(request.id, {'success': False, 'message': _('Creation of certificate failed (%s)') % ssldir})
                # Do not fall through: the request must be answered only once.
                return
            self.finished(request.id, {'success': True, 'message': _('Active Directory connection settings have been saved and a new certificate for the Active Directory server has been created.')})

        cmd = ['/usr/sbin/univention-certificate', 'new', '-name', request.options['LDAP_Host']]
        MODULE.info('Creating new SSL certificate: %s', cmd)
        proc = tornado.process.Subprocess(cmd, stdout=subprocess.PIPE)
        proc.set_exit_callback(functools.partial(_return, request))

    @file_upload
    def upload_certificate(self, request):
        """
        Convert an uploaded DER certificate to PEM and activate it.

        The uploaded temporary file is converted with ``openssl x509`` into a
        timestamped file below ``/etc/univention/connector/ad/``. When the
        conversion succeeds, SSL is enabled and the connection is tested with
        the new certificate. The request is answered from the subprocess exit
        callback with ``[{'success': (True|False), 'message': <details>}]``.
        """
        def _return(stdout, request, fn, status):
            bufstdout = stdout.read().decode('UTF-8', 'replace')
            success = True
            if status == 0:
                message = _('Certificate has been uploaded successfully.')
                MODULE.info('Certificate has been uploaded successfully. status=%s\nSTDOUT:\n%s', status, bufstdout)
                try:
                    self._enable_ssl_and_test_connection(fn)
                except UMC_Error:
                    message = _('Could not establish connection. Either the certificate is wrong, the Active Directory server is unreachable or it does not support SSL.')
                    success = False
            else:
                success = False
                message = _('Certificate upload or conversion failed.')
                MODULE.process('Certificate upload or conversion failed. status=%s\nSTDOUT:\n%s', status, bufstdout)

            self.finished(request.id, [{'success': success, 'message': message}])

        upload = request.options[0]['tmpfile']
        now = time.strftime('%Y%m%d_%H%M%S', time.localtime())
        fn = f'/etc/univention/connector/ad/ad_cert_{now}.pem'
        cmd = ['/usr/bin/openssl', 'x509', '-inform', 'der', '-outform', 'pem', '-in', upload, '-out', fn]

        MODULE.info('Converting certificate into correct format: %s', cmd)
        # stderr is merged into stdout so that the callback logs both.
        proc = tornado.process.Subprocess(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        proc.set_exit_callback(functools.partial(_return, proc.stdout, request, fn))

    @sanitize(
        action=ChoicesSanitizer(['start', 'stop'], required=True),
    )
    @simple_response
    def service(self, action):
        """
        Start or stop the ``univention-ad-connector`` service.

        :param action: ``'start'`` or ``'stop'``.
        :returns: ``{'success': True, 'message': ...}`` directly when the
            service is already in the requested state; otherwise a callable
            that performs the switch and returns such a dict.
        """
        self.__update_status()
        MODULE.info('State: action=%s status_running=%s', action, self.status_running)

        message = None
        if self.status_running and action == 'start':
            message = _('Active Directory Connector is already running. Nothing to do.')
        elif not self.status_running and action == 'stop':
            message = _('Active Directory Connector is already stopped. Nothing to do.')

        if message is not None:
            return {'success': True, 'message': message}

        # NOTE(review): presumably the simple_response decorator invokes the
        # returned callable with (self, request) outside the main loop --
        # confirm against the decorator implementation.
        def _run_it(self, request):
            result = subprocess.call(('service', 'univention-ad-connector', action))
            success = not result
            if result:
                message = _('Switching running state of Active Directory Connector failed.')
                MODULE.info('Switching running state of Active Directory Connector failed. exitcode=%s', result)
            else:
                if action == 'start':
                    message = _('Active Directory connection service has been started.')
                else:
                    message = _('Active Directory connection service has been stopped.')
            return {'success': success, 'message': message}

        return _run_it

    def __update_status(self):
        """Reload UCR and refresh all ``status_*`` attributes of this instance."""
        ucr.load()
        fn = ucr.get('connector/ad/ldap/certificate')
        self.status_ssl = ucr.is_true('connector/ad/ldap/ssl')
        self.status_password_sync = ucr.is_true('connector/ad/mapping/user/password/kinit')
        self.status_certificate = bool(fn and os.path.exists(fn))
        self.status_running = self.__is_process_running('^([^ ]+)?python3.*univention.connector.ad.main$')
        self.status_mode_admember = admember.is_localhost_in_admember_mode(ucr)
        self.status_mode_adconnector = admember.is_localhost_in_adconnector_mode(ucr)

    def __is_process_running(self, command_pattern):
        """
        Check whether any process's command line matches a regular expression.

        :param command_pattern: pattern matched (anchored at the start) against
            the space-joined command line of every process.
        :returns: True if at least one process matches, False otherwise.
        """
        pattern = re.compile(command_pattern)
        for proc in psutil.process_iter():
            try:
                # cmdline is a method or a plain attribute depending on the psutil version
                cmdline = proc.cmdline() if callable(proc.cmdline) else proc.cmdline
            except psutil.NoSuchProcess:
                # process vanished while iterating
                continue
            if cmdline and pattern.match(' '.join(cmdline)):
                return True
        return False

    @sanitize(
        username=StringSanitizer(required=True),
        password=StringSanitizer(required=True),
        ad_server_address=StringSanitizer(required=True),
        mode=StringSanitizer(default='admember'),
    )
    @simple_response
    def check_domain(self, username, password, ad_server_address, mode):
        """
        Check that the given AD server and credentials are usable.

        In mode ``'admember'`` the local server role and the domain name are
        checked in addition to the connection and account checks.

        :returns: the AD domain info (spaces in the keys replaced by ``_``)
            extended by ``ssl_supported`` and ``LDAP_BindDN``.
        :raises UMC_Error: with a user-readable message if any check fails.
        """
        ad_domain_info = {}
        try:
            if mode == 'admember':
                admember.check_server_role()
            ad_domain_info = admember.lookup_adds_dc(ad_server_address)
            ad_server_ip = ad_domain_info['DC IP']
            if mode == 'admember':
                admember.check_domain(ad_domain_info)
            admember.check_connection(ad_domain_info, username, password)
            admember.check_ad_account(ad_domain_info, username, password)
        except admember.invalidUCSServerRole as exc:  # check_server_role()
            MODULE.warning('Failure: %s', exc)
            raise UMC_Error(_('The AD member mode can only be configured on a Primary Directory Node.'))
        except admember.failedADConnect as exc:  # lookup_adds_dc()
            MODULE.warning('Failure: %s', exc)
            raise UMC_Error(_('Could not connect to AD Server %s. Please verify that the specified address is correct. (%s)') % (ad_server_address, f'check_domain: {exc}'))
        except admember.domainnameMismatch as exc:  # check_domain()
            MODULE.warning('Failure: %s', exc)
            raise UMC_Error(_('The domain name of the AD Server (%(ad_domain)s) does not match the local UCS domain name (%(ucs_domain)s). For the AD member mode, it is necessary to setup a UCS system with the same domain name as the AD Server.') % {'ad_domain': ad_domain_info.get("Domain"), 'ucs_domain': ucr['domainname']})
        except admember.connectionFailed as exc:  # check_connection()
            MODULE.warning('Failure: %s', exc)
            raise UMC_Error(_('Could not connect to AD Server %s. Please verify that username and password are correct. (Details:\n%s)') % (ad_domain_info.get('DC DNS Name'), exc))
        except admember.notDomainAdminInAD as exc:  # check_ad_account()
            MODULE.warning('Failure: %s', exc)
            raise UMC_Error(_('The given user is not member of the Domain Admins group in Active Directory. This is a requirement for the Active Directory domain join.'))

        # final info dict that is returned... replace spaces in the keys with '_'
        MODULE.info('Preparing info dict...')
        info = {key.replace(' ', '_'): value for key, value in ad_domain_info.items()}
        info['ssl_supported'] = admember.server_supports_ssl(ad_server_ip)
        # try to get binddn
        info['LDAP_BindDN'] = get_ad_binddn_from_name(info['LDAP_Base'], ad_server_ip, username, password)
        MODULE.info(str(info))
        return info

    @sanitize(
        username=StringSanitizer(required=True),
        password=StringSanitizer(required=True),
        ad_server_address=StringSanitizer(required=True),
    )
    @simple_response(with_progress=True)
    def admember_join(self, username, password, ad_server_address, progress):
        """
        Join this UCS system into an Active Directory domain (AD member mode).

        The individual steps are reported through ``progress``. If any step
        fails, the error is recorded via ``progress.finish_with_result()`` and
        the UCR and connector settings are reverted.

        :returns: ``progress.result`` if that attribute has been set,
            otherwise ``{'success': <whether all steps completed>}``.
        """
        progress.title = _('Joining UCS into Active Directory domain')
        progress.total = 100.0
        progress.warnings = []
        overall_success = False
        MODULE.process(progress.title)

        def _progress(steps, msg):
            progress.current = float(steps)
            progress.message = msg
            MODULE.process(msg)
            time.sleep(0.2)

        def _err(exc=None, msg=None):
            exc_str = ''
            if exc is not None:
                exc_str = str(exc) or exc.__doc__  # if no message, take the doc string
                exc_class_name = exc.__class__.__name__
                MODULE.error('Join process failed [%s]: %s', exc_class_name, exc_str)

            if msg:
                MODULE.error(msg)
            else:
                msg = _('An unexpected error occurred: %s') % exc_str

            progress.finish_with_result({
                'success': False,
                'error': msg,
                'warnings': progress.warnings,
            })

        ad_domain_info = {}
        try:
            admember.check_server_role()
            ad_domain_info = admember.lookup_adds_dc(ad_server_address)
            ad_server_ip = ad_domain_info['DC IP']

            _progress(5, _('Configuring time synchronization...'))
            admember.time_sync(ad_server_ip)
            admember.set_timeserver(ad_server_ip)

            _progress(10, _('Configuring DNS server...'))
            admember.set_nameserver([ad_server_ip])
            admember.prepare_ucr_settings()

            _progress(15, _('Configuring Kerberos settings...'))
            admember.disable_local_heimdal()
            admember.disable_local_samba4()

            _progress(20, _('Configuring reverse DNS settings...'))
            admember.prepare_dns_reverse_settings(ad_domain_info)

            _progress(25, _('Configuring software components...'))

            # The package manager reports 0..100; map that onto the
            # 30..65 range of the overall progress bar.
            _step_offset = 30.0
            _nsteps = 35.0

            def _step_handler(step):
                MODULE.process('Package manager progress: %.1f', step)
                progress.current = (step / 100.0) * _nsteps + _step_offset

            def _err_handler(err):
                MODULE.warning(err)
                progress.warnings.append(err)

            success = admember.remove_install_univention_samba(info_handler=MODULE.process, error_handler=_err_handler, step_handler=_step_handler)
            if not success:
                raise RuntimeError(_('An error occurred while installing necessary software components.'))

            _progress(65, _('Configuring synchronization from AD...'))
            admember.prepare_connector_settings(username, password, ad_domain_info)
            admember.disable_ssl()

            _progress(70, _('Renaming well known SID objects...'))
            admember.rename_well_known_sid_objects(username, password)

            _progress(75, _('Configuring Administrator account...'))
            admember.prepare_administrator(username, password)

            _progress(80, _('Running Samba join script...'))
            admember.run_samba_join_script(username, password)

            _progress(85, _('Configuring DNS entries...'))
            admember.add_domaincontroller_srv_record_in_ad(ad_server_ip, username, password)
            admember.add_host_record_in_ad(uid=username, bindpw=password, sso=True)

            admember.make_deleted_objects_readable_for_this_machine(username, password)
            admember.synchronize_account_position(ad_domain_info, username, password)

            _progress(90, _('Starting Active Directory connection service...'))
            admember.start_service('univention-ad-connector')

            _progress(95, _('Registering LDAP service entry...'))
            admember.add_admember_service_to_localhost()

            overall_success = True
            _progress(100, _('Join has been finished successfully.'))

        # error handling...
        except admember.invalidUCSServerRole as exc:
            _err(exc, _('The AD member mode can only be configured on a Primary Directory Node.'))
        except admember.failedADConnect as exc:
            # If the lookup itself failed, ad_domain_info is still empty; fall
            # back to the address the user entered instead of printing "None".
            _err(exc, _('Could not connect to AD Server %s. Please verify that the specified address is correct. (%s)') % (ad_domain_info.get('DC DNS Name', ad_server_address), f'admember_join: {exc}'))
        except admember.domainnameMismatch as exc:
            # .get(): a missing key must not raise KeyError inside the handler.
            _err(exc, _('The domain name of the AD Server (%(ad_domain)s) does not match the local UCS domain name (%(ucs_domain)s). For the AD member mode, it is necessary to setup a UCS system with the same domain name as the AD Server.') % {'ad_domain': ad_domain_info.get("Domain"), 'ucs_domain': ucr['domainname']})
        except admember.connectionFailed as exc:
            _err(exc, _('Could not connect to AD Server %s. Please verify that username and password are correct. (Details:\n%s)') % (ad_domain_info.get('DC DNS Name'), exc))
        except admember.failedToSetAdministratorPassword as exc:
            _err(exc, _('Failed to set the password of the UCS Administrator to the Active Directory Administrator password.'))
        except admember.failedToCreateAdministratorAccount as exc:
            _err(exc, _('Failed to create the Administrator account in UCS.'))
        except admember.sambaSidNotSetForAdministratorAccount as exc:
            _err(exc, _('The sambaSID could not set for the Administrator account in UCS.'))
        except admember.failedToSearchForWellKnownSid as exc:
            _err(exc, _('Failed to search for the well known SID.'))
        except admember.failedToAddAdministratorAccountToDomainAdmins as exc:
            _err(exc, _('Failed to add the Administrator account to the Domain Admins group.'))
        except admember.timeSyncronizationFailed as exc:
            _err(exc, _('Could not synchronize the time between the UCS system and the Active Directory domain controller: %s') % exc)
        except RuntimeError as exc:
            _err(exc)
        except Exception as exc:
            # catch all other errors that are unlikely to occur
            _err(exc)
            MODULE.exception('Traceback')

        if not overall_success:
            _progress(100, _('Join has been finished with errors.'))
            admember.revert_ucr_settings()
            admember.revert_connector_settings()

        if hasattr(progress, 'result'):
            # some error probably occurred -> return the result in the progress
            return progress.result

        # Report the overall outcome; the package-install result may not even
        # be bound if an early step failed.
        return {'success': overall_success}

    def _enable_ssl_and_test_connection(self, certificate_fname=None):
        """
        Enable SSL for the AD connection and verify that it works.

        The UCR variables for SSL and the certificate are rolled back if the
        test fails. Without a configured AD host only a warning is logged.

        :param certificate_fname: optional path of a certificate to register
            before testing.
        :raises UMC_Error: if the server does not support SSL or the test
            search fails.
        """
        with ucr_rollback(ucr, ['connector/ad/ldap/ssl', 'connector/ad/ldap/certificate']):
            if certificate_fname:
                univention.config_registry.handler_set([f'connector/ad/ldap/certificate={certificate_fname}'])
            server = ucr.get('connector/ad/ldap/host')
            if server:
                success = False
                if admember.server_supports_ssl(server):
                    admember.enable_ssl()
                    try:
                        success = test_connection()
                    except ADNotAvailable:
                        success = False
                if not success:
                    raise UMC_Error(_('Could not establish an encrypted connection. Either "%r" is not reachable or does not support encryption.') % server)
            else:
                MODULE.warning('connector is not configured yet, cannot test connection')

    @simple_response
    def enable_ssl(self):
        """Enable SSL, test the connection and restart the connector service.

        :returns: exit code of the service restart command.
        """
        self._enable_ssl_and_test_connection()
        return subprocess.call(['service', 'univention-ad-connector', 'restart'])

    @simple_response
    def password_sync_service(self, enable=True):
        """Switch password synchronisation on or off and restart the connector service.

        :param enable: True to sync passwords, False to use Kerberos authentication instead.
        :returns: exit code of the service restart command.
        """
        # kinit=true  -> do not sync passwords, but use Kerberos authentication
        # kinit=false -> sync passwords
        value = str(not enable).lower()
        univention.config_registry.handler_set([f'connector/ad/mapping/user/password/kinit={value}'])
        return subprocess.call(['service', 'univention-ad-connector', 'restart'])

    @simple_response
    def check_dcmaster_srv_rec(self):
        """Report whether a domain controller SRV record is found for the local domain name.

        :returns: ``{'success': (True|False)}``
        """
        result = bool(admember.get_domaincontroller_srv_record(ucr.get('domainname')))
        return {'success': result}