Source code for univention.management.console.modules.join

#!/usr/bin/python3
#
# Univention Management Console
#  module: domain join and join script management
#
# SPDX-FileCopyrightText: 2011-2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only

import errno
import fcntl
import glob
import os
import re
import select
import socket
import subprocess
import tempfile
import traceback
from collections.abc import Callable

import apt_pkg
import dns.exception
import dns.resolver

import univention.management.console as umc
from univention.management.console.config import ucr
from univention.management.console.log import MODULE
from univention.management.console.modules import Base, UMC_Error
from univention.management.console.modules.decorators import SimpleThread, sanitize, simple_response
from univention.management.console.modules.sanitizers import BooleanSanitizer, ListSanitizer, StringSanitizer


# Translation helper used to wrap the user-visible strings of this module.
_ = umc.Translation('univention-management-console-module-join').translate

# Helper commands executed after / before the join process to re-enable /
# disable restarts of the UMC server and web server.
CMD_ENABLE_EXEC = ['/usr/share/univention-updater/enable-apache2-umc', '--no-restart']
CMD_DISABLE_EXEC = ['/usr/share/univention-updater/disable-apache2-umc']

# Accepted host names: a lower-case label that starts with a letter,
# optionally followed by a dot and further dot-separated labels.
# The label bodies are written as `(...)?` instead of `(...)*`: a sequence of
# chunks that each end in an alphanumeric character is itself one such chunk,
# so both forms accept exactly the same strings, but the nested `*` made the
# regex engine backtrack exponentially on long non-matching input.
RE_HOSTNAME = re.compile(r'^[a-z]([a-z0-9-]*[a-z0-9])?(\.([a-z0-9]([a-z0-9-]*[a-z0-9])?[.])*[a-z0-9]([a-z0-9-]*[a-z0-9])?)?$')


def get_master_dns_lookup() -> dict:
    """Look up the Primary Directory Node through its DNS SRV record.

    The record ``_domaincontroller_master._tcp.<domainname>.`` is queried,
    where ``<domainname>`` is read from the UCR variable ``domainname``. The
    resolver lifetime is limited to three seconds.

    :returns: a dict with the keys ``master`` (name taken from the first SRV
        answer, or ``None``) and ``error_message`` (a text describing the
        failure, or ``None``). A ``NoAnswer`` result is only logged and
        leaves both values at ``None``.
    """
    master_fqdn = None
    error_text = None
    domainname = ucr.get('domainname')
    srv_query = f'_domaincontroller_master._tcp.{domainname}.'

    try:
        resolver = dns.resolver.Resolver()
        resolver.lifetime = 3.0  # max. 3 seconds
        answers = resolver.query(srv_query, 'SRV')
        if answers:
            target = answers[0].target.canonicalize()
            # keep everything but the last label of the target name
            master_fqdn = target.split(1)[0].to_text()
    except dns.resolver.NXDOMAIN:
        MODULE.error("No record found for %s.", srv_query)
        error_text = _('No DNS record for the Primary Directory Node was found. This might be a problem with the configured DNS server. Please make sure the DNS settings are correct.')
    except dns.resolver.Timeout:
        MODULE.error("Timeout when looking up %s.", srv_query)
        error_text = _('The lookup of the Primary Directory Node record timed out. There might be a problem with the configured DNS server. Make sure the DNS server is up and running or check the DNS settings.')
    except dns.resolver.NoAnswer:
        MODULE.error("Non-Authoritative answer during lookup of %s.", srv_query)
    except dns.exception.DNSException as exc:
        MODULE.exception("Error during Primary Directory Node lookup")
        error_text = f'Error during Primary Directory Node lookup: {exc}.'

    return {'master': master_fqdn, 'error_message': error_text}
class HostSanitizer(StringSanitizer):
    """String sanitizer that returns ``socket.getfqdn()`` of the given value."""

    def _sanitize(self, value: str, name: str, further_args: list[str]) -> str:
        """Apply the string checks of the base class, then resolve the value.

        A ``socket.gaierror`` raised by the lookup is reported as a
        validation error.
        """
        checked = super()._sanitize(value, name, further_args)
        try:
            fqdn = socket.getfqdn(checked)
        except socket.gaierror:
            # invalid FQDN
            self.raise_validation_error(_('The entered FQDN is not a valid value'))
        else:
            return fqdn
class Progress:
    """Mutable progress record of a join run; a snapshot is built by :py:meth:`poll`."""

    def __init__(self, max_steps=100):
        self.reset(max_steps)

    def reset(self, max_steps: int = 100) -> None:
        """Put every field back to its initial value."""
        self.max_steps = max_steps
        self.steps = 0
        self.finished = False
        self.critical = False
        self.errors = []
        self.info = ''
        self.component = _('Initializing')

    def poll(self) -> dict:
        """Return the current state as a dict; ``steps`` is scaled to percent of ``max_steps``."""
        percent = 100 * float(self.steps) / self.max_steps
        return dict(
            finished=self.finished,
            steps=percent,
            component=self.component,
            info=self.info,
            errors=self.errors,
            critical=self.critical,
        )

    def finish(self) -> None:
        """Mark the run as finished."""
        self.finished = True

    def info_handler(self, info: str) -> None:
        """Log *info* and store it as the current info text."""
        MODULE.process(info)
        self.info = info

    def error_handler(self, err: str) -> None:
        """Log *err* as a warning and append it to the collected errors."""
        MODULE.warning(err)
        self.errors.append(err)

    def component_handler(self, component: str) -> None:
        """Store the name of the component currently being processed."""
        self.component = component

    def critical_handler(self, critical: bool) -> None:
        """Store the critical flag."""
        self.critical = critical

    def step_handler(self, steps: int) -> None:
        """Overwrite the step counter with *steps*."""
        self.steps = steps

    def add_steps(self, steps: int = 1) -> None:
        """Increase the step counter by *steps*."""
        self.steps += steps
def _dummyFunc(*args: list) -> None:
    """Accept any positional arguments and do nothing.

    Used as the default value for the optional handler callbacks.
    """
    return None
def system_join(
    hostname: str,
    username: str,
    password: str,
    info_handler: Callable = _dummyFunc,
    error_handler: Callable = _dummyFunc,
    critical_handler: Callable = _dummyFunc,
    step_handler: Callable = _dummyFunc,
    component_handler: Callable = _dummyFunc,
) -> None:
    """Join the system by calling ``/usr/sbin/univention-join``.

    The password is written to a temporary file whose path is handed to the
    command via ``-dcpwd``; the file exists only while the command runs.

    :param hostname: value passed as ``-dcname``.
    :param username: value passed as ``-dcaccount``.
    :param password: password written to the temporary password file.
    :param info_handler: callback forwarded to :py:func:`run`.
    :param error_handler: callback forwarded to :py:func:`run`.
    :param critical_handler: callback forwarded to :py:func:`run`.
    :param step_handler: callback forwarded to :py:func:`run`.
    :param component_handler: callback forwarded to :py:func:`run`.
    """
    # the progress share per script is derived from the number of *.inst files
    join_scripts = glob.glob(f'{INSTDIR}/*.inst')
    steps_per_script = 100.0 / (len(join_scripts) + 1)

    with tempfile.NamedTemporaryFile() as password_file:
        password_file.write(password.encode('UTF-8'))
        password_file.flush()

        MODULE.process('Performing system join...')
        cmd = ['/usr/sbin/univention-join']
        cmd += ['-dcname', hostname]
        cmd += ['-dcaccount', username]
        cmd += ['-dcpwd', password_file.name]

        return run(
            cmd,
            steps_per_script,
            info_handler,
            error_handler,
            critical_handler,
            step_handler,
            component_handler,
        )
def run_join_scripts(
    scripts: list,
    force: bool,
    username: str,
    password: str,
    info_handler: Callable = _dummyFunc,
    error_handler: Callable = _dummyFunc,
    critical_handler: Callable = _dummyFunc,
    step_handler: Callable = _dummyFunc,
    component_handler: Callable = _dummyFunc,
) -> None:
    """Execute join scripts by calling ``/usr/sbin/univention-run-join-scripts``.

    :param scripts: names of the scripts to execute (passed after
        ``--run-scripts``); when empty, no script selection is passed.
    :param force: add ``--force`` to the command line.
    :param username: account passed as ``-dcaccount``; only used when a
        password is given as well.
    :param password: password written to a temporary file that is passed as
        ``-dcpwd``; only used when a username is given as well.
    :param info_handler: callback forwarded to :py:func:`run`.
    :param error_handler: callback forwarded to :py:func:`run`.
    :param critical_handler: callback forwarded to :py:func:`run`.
    :param step_handler: callback forwarded to :py:func:`run`.
    :param component_handler: callback forwarded to :py:func:`run`.
    """
    with tempfile.NamedTemporaryFile() as passwordFile:
        cmd = ['/usr/sbin/univention-run-join-scripts']
        if username and password:
            # credentials are provided
            passwordFile.write(password.encode('UTF-8'))
            passwordFile.flush()
            cmd += ['-dcaccount', username, '-dcpwd', passwordFile.name]

        if force:
            cmd += ['--force']

        if scripts:
            # if scripts are provided only execute them instead of running all join scripts
            cmd += ['--run-scripts', *scripts]
            n_scripts = len(scripts)
        else:
            # We need the number of join scripts for the progress bar. Count
            # only *.inst files, exactly as system_join() does, so that other
            # files in the directory do not shrink the per-script step.
            n_scripts = len(glob.glob(f'{INSTDIR}/*.inst'))

        steps_per_script = 100.0 / (n_scripts + 1)

        MODULE.process('Executing join scripts ...')
        return run(cmd, steps_per_script, info_handler, error_handler, critical_handler, step_handler, component_handler)
def run(
    cmd: list,
    steps_per_script: float,
    info_handler: Callable = _dummyFunc,
    error_handler: Callable = _dummyFunc,
    critical_handler: Callable = _dummyFunc,
    step_handler: Callable = _dummyFunc,
    component_handler: Callable = _dummyFunc) -> None:
    """Run *cmd* and translate its stdout into progress callbacks.

    Before the command is started ``CMD_DISABLE_EXEC`` is called; afterwards
    ``CMD_ENABLE_EXEC`` is called in a ``finally`` clause, i.e. also when an
    exception is raised.

    :param cmd: command line (argument list) to execute.
    :param steps_per_script: value passed to *step_handler* for every newly
        seen join script; a tenth of it is passed for every other info line.
    :param info_handler: called with informational text.
    :param error_handler: called with error messages (they contain HTML markup).
    :param critical_handler: called with ``True`` when an error message starts
        with ``ssh-login for`` or ``binddn for``.
    :param step_handler: called with the number of steps to report.
    :param component_handler: called with the name of the current component.
    """
    # disable restart of UMC server/web-server
    MODULE.info('disabling restart of UMC server/web-server')
    subprocess.call(CMD_DISABLE_EXEC)

    try:
        # regular expressions for output parsing
        error_pattern = re.compile(r'^\* Message:\s*(?P<message>.*)\s*$')
        joinscript_pattern = re.compile(r'(Configure|Running)\s+(?P<script>.*)\.inst.*$')
        info_pattern = re.compile(r'^(?P<message>.*?)\s*:?\s*\x1b.*$')

        # call to univention-join
        MODULE.info('calling "%s"', ' '.join(cmd))
        # NOTE(review): stderr is a pipe as well, but it is only read by
        # communicate() after stdout reached EOF - confirm that the called
        # tools never write large amounts of data to stderr.
        process = subprocess.Popen(cmd, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

        failed_join_scripts = set()
        executed_join_scripts = set()

        def parse(line: str) -> None:
            # Classify one (possibly still incomplete) output line; the first
            # matching category wins: error message, join script, other info.
            if not line.strip():
                return

            # parse output... first check for errors
            matches = error_pattern.match(line)
            if matches:
                message = matches.groupdict().get('message')
                error_handler(_("The system join process could not be completed:<br/><br/><i>%s</i><br/><br/> More details can be found in the log file <i>/var/log/univention/join.log</i>.<br/>Please retry after resolving any conflicting issues.") % message)
                if message.startswith(('ssh-login for', 'binddn for')):
                    # invalid credentials or non existent user
                    # do a critical error, the script will stop here
                    critical_handler(True)
                return

            # check for currently called join script
            matches = joinscript_pattern.match(line)
            if matches:
                current_script = matches.groupdict().get("script")
                # the same line can be seen several times (see the read loop
                # below), so each script is reported only once
                if current_script not in executed_join_scripts:
                    executed_join_scripts.add(current_script)
                    component_handler(_('Executing join scripts'))
                    info_handler(_('Executing join script %s') % (current_script,))
                    step_handler(steps_per_script)
                if '\x1b[60Gfailed' in line:
                    failed_join_scripts.add(current_script)
                return

            # check for other information
            matches = info_pattern.match(line)
            if matches:
                info_handler(matches.groupdict().get('message'))
                step_handler(steps_per_script / 10)
                return

            # anything else is only written to the log
            MODULE.process(repr(line.strip()).strip('"\''))

        # make stdout file descriptor of the process non-blocking
        fd = process.stdout.fileno()
        fl = fcntl.fcntl(fd, fcntl.F_GETFL)
        fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)

        # text received after the last newline, i.e. the unfinished line
        buf = ''
        while True:
            try:
                fd = select.select([process.stdout], [], [])[0][0]
            except IndexError:
                continue  # not ready / no further data
            except OSError as exc:
                if exc.args[0] == errno.EINTR:
                    continue
                raise

            # get the next lines
            line = fd.read().decode('utf-8', 'replace')
            eof = not line
            buf += line
            lines, _newline, buf = buf.rpartition('\n')
            # parse completed joinscript executions
            for line in lines.splitlines():
                parse(line)
            # parse the currently executed joinscript
            # (buf is kept, so the same partial line is parsed again once more
            # data arrives)
            if buf:
                parse(buf)
            if eof:
                break  # no more text from stdout

        # get all remaining output
        stdout, stderr = process.communicate()
        stdout, stderr = stdout.decode('UTF-8', 'replace'), stderr.decode('UTF-8', 'replace')
        if stderr:
            MODULE.warning("stderr: %s", stderr)

        if process.returncode != 0:
            MODULE.warning("Could not perform system join: %s%s", stdout, stderr)
            error_handler(_('The join process could not be executed. More details can be found in the log file <i>/var/log/univention/join.log</i>.<br/>Please retry to join the system after resolving any conflicting issues.'))
        elif failed_join_scripts:
            MODULE.warning("The following join scripts could not be executed: %s", failed_join_scripts)
            error_handler(_('Some join scripts could not be executed. More details can be found in the log file <i>/var/log/univention/join.log</i>.<br/>Please retry to execute the join scripts after resolving any conflicting issues.'))
    finally:
        # make sure that UMC servers and apache can be restarted again
        MODULE.info('enabling UMC and apache server restart')
        subprocess.call(CMD_ENABLE_EXEC)
# Directory that is scanned for join script files (*.inst / *.uinst).
INSTDIR = '/usr/lib/univention-install'
# Log file whose tail is returned by the log view.
LOGFILE = '/var/log/univention/join.log'
# Lock file whose existence marks a running join process.
LOCKFILE = '/var/lock/univention_umc_join.lock'

# File name of a join script: two-digit priority, name, and the suffix.
RE_JOINFILE = re.compile(r'^(?P<script>(?P<prio>\d\d)(?P<name>.+))\.(inst|uinst)$')
# Status line naming a script that is not configured. The trailing period is
# escaped so that it only matches a literal "." instead of any character.
RE_NOT_CONFIGURED = re.compile(r"^Warning: '([^']+)' is not configured\.$")
# Status line reporting a general error.
RE_ERROR = re.compile(r'^Error: (.*?)$')
class Instance(Base):
    """UMC module instance offering the domain join and join script commands."""

    def init(self) -> None:
        """Relax the process umask and create the progress state."""
        os.umask(0o022)  # umc umask is too restrictive for the join process and many listener modules
        self.progress_state = Progress()

    @simple_response
    def dpkg_locked(self) -> bool:
        """Do not execute join scripts when dpkg is running (e.g. via App Center)"""
        return self._dpkg_locked()

    def _dpkg_locked(self) -> bool:
        """Return ``True`` when the dpkg lock cannot be acquired."""
        fd = apt_pkg.get_lock('/var/lib/dpkg/lock')
        if fd == -1:
            return True
        else:
            # the lock was only taken as a probe; release it again
            os.close(fd)
            return False

    @simple_response
    def query(self) -> list[dict]:
        """Collect the status of all join scripts.

        :returns: one dict per join script file found in ``INSTDIR`` with the
            keys ``script``, ``prio``, ``name``, ``configured`` and ``status``;
            an empty list on a system that is not joined.
        :raises UMC_Error: when the status check reports a general error.
        """
        # unjoined system?
        if not self._joined:
            return []

        # List all join scripts
        files = {}
        for fname in os.listdir(INSTDIR):
            match = RE_JOINFILE.match(fname)
            if match:
                entry = match.groupdict()
                entry['configured'] = True
                entry['status'] = f'1:{entry["prio"]}'
                files[entry['name']] = entry

        # check for unconfigured scripts
        process = subprocess.Popen(['/usr/sbin/univention-check-join-status'], shell=False, stdout=subprocess.PIPE)
        stdout, _stderr = process.communicate()
        if process.returncode == 0:
            return list(files.values())

        for line in stdout.decode('UTF-8', 'replace').splitlines():
            # is there a general error?
            match = RE_ERROR.match(line)
            if match and not line.startswith('Error: Not all install files configured'):
                raise UMC_Error(_('Error: %s') % match.groups()[0])

            # unconfigured script
            match = RE_NOT_CONFIGURED.match(line)
            if match:
                name = match.groups()[0]
                if name not in files:
                    # The joinscripts does not exists in the filesystem or has a invalid name
                    MODULE.error('not existing join script or join script with invalid name mentioned in status file: %r', name)
                    continue
                files[name]['configured'] = False
                files[name]['status'] = f'0:{files[name]["prio"]}'

        return list(files.values())

    @simple_response
    def joined(self) -> bool:
        """Return whether the system is joined."""
        return self._joined

    @simple_response
    def progress(self) -> dict:
        """Return the current progress state."""
        return self.progress_state.poll()

    @simple_response
    def running(self) -> bool:
        """returns true if a join script is running."""
        return self._running

    @simple_response
    def master(self) -> dict:
        """Return the result of the DNS lookup for the Primary Directory Node.

        The dict has the keys ``master`` (FQDN or ``None``) and
        ``error_message``.
        """
        return get_master_dns_lookup()

    @property
    def _joined(self) -> bool:
        """``True`` when the file ``/var/univention-join/joined`` exists."""
        return os.path.exists('/var/univention-join/joined')

    @property
    def _running(self) -> bool:
        """``True`` when the lock file exists."""
        return os.path.exists(LOCKFILE)

    def _lock(self) -> None:
        """Create the lock file; failures are only logged."""
        try:
            open(LOCKFILE, 'a').close()
        except OSError as ex:
            MODULE.warning("_lock: %s", ex)

    def _unlock(self) -> None:
        """Remove the lock file if it exists; failures are only logged."""
        try:
            if self._running:
                os.unlink(LOCKFILE)
        except OSError as ex:
            MODULE.warning("_unlock: %s", ex)

    def __del__(self) -> None:
        self._unlock()

    # TODO: __finalize__?

    @simple_response
    def logview(self) -> str:
        """Returns the last 2MB of the join.log file"""
        with open(LOGFILE, 'rb') as fd:
            size = 2097152
            try:
                fd.seek(max(os.stat(fd.name).st_size - size, 0))
            except OSError:
                pass
            return fd.read(size).decode('utf-8', 'replace')

    @sanitize(
        username=StringSanitizer(required=True, minimum=1),
        password=StringSanitizer(required=True, minimum=1),
        hostname=HostSanitizer(required=True, regex_pattern=RE_HOSTNAME),
    )
    def join(self, request) -> None:
        """Start the system join in a background thread.

        The request is answered immediately with status 202.

        :raises UMC_Error: when a join process is already running, the system
            is a Primary Directory Node, or dpkg is locked.
        """
        username, password, hostname = (request.options['username'], request.options['password'], request.options['hostname'])

        # Check if already a join process is running
        if self._running:
            raise UMC_Error(_('A join process is already running.'))

        # check for valid server role
        if ucr.get('server/role') == 'domaincontroller_master':
            raise UMC_Error(_('Invalid server role! A Primary Directory Node cannot be joined.'))

        # check for dpkg lock
        if self._dpkg_locked():
            raise UMC_Error(_('Currently, software is being installed or uninstalled. Join scripts should not be run right now.'))

        def _thread():
            # reset progress state and lock against other join processes
            self.progress_state.reset()
            self.progress_state.component = _('Domain join')
            self._lock()
            return system_join(
                hostname, username, password,
                info_handler=self.progress_state.info_handler,
                step_handler=self.progress_state.add_steps,
                error_handler=self.progress_state.error_handler,
                component_handler=self.progress_state.component_handler,
                critical_handler=self.progress_state.critical_handler,
            )

        def _finished(thread, result: BaseException | None) -> None:
            MODULE.info('Finished joining')
            self._unlock()
            self.progress_state.info = _('finished...')
            self.progress_state.finish()
            if isinstance(result, BaseException):
                msg = ''.join(thread.trace + traceback.format_exception_only(*thread.exc_info[:2]))
                MODULE.warning("Exception during domain join", traceback=msg)
                self.progress_state.error_handler(_('An unexpected error occurred: %s') % result)

        # launch thread
        thread = SimpleThread('join', _thread, _finished)
        thread.run()

        self.finished(request.id, True, status=202)

    @sanitize(
        username=StringSanitizer(required=False, minimum=1),
        password=StringSanitizer(required=False, minimum=1),
        scripts=ListSanitizer(required=True, min_elements=1),
        force=BooleanSanitizer(default=False),
    )
    def run(self, request) -> None:
        """Run the given join scripts in a background thread.

        The request is answered immediately with status 202.

        :raises UMC_Error: when a join process is already running, dpkg is
            locked, or a script name does not start with a numeric priority.
        """
        # Check if already a join process is running
        if self._running:
            raise UMC_Error(_('A join process is already running.'))

        # check for dpkg lock
        if self._dpkg_locked():
            raise UMC_Error(_('Currently, software is being installed or uninstalled. Join scripts should not be run right now.'))

        scripts, username, password, force = (request.options['scripts'], request.options.get('username'), request.options.get('password'), request.options.get('force', False))

        def _priority(script) -> int:
            # The scripts are ordered by their leading number. The names come
            # from the request, so a missing prefix (or a non-string item) is
            # reported as a regular error instead of an unhandled exception.
            match = re.match(r'^(\d+)', script) if isinstance(script, str) else None
            if match is None:
                raise UMC_Error(_('Invalid join script name: %s') % (script,))
            return int(match.group())

        # sort scripts
        scripts.sort(key=_priority)

        def _thread() -> None:
            # reset progress state and lock against other join processes
            self.progress_state.reset()
            self.progress_state.component = _('Authentication')
            self._lock()
            return run_join_scripts(
                scripts, force, username, password,
                info_handler=self.progress_state.info_handler,
                step_handler=self.progress_state.add_steps,
                error_handler=self.progress_state.error_handler,
                component_handler=self.progress_state.component_handler,
                critical_handler=self.progress_state.critical_handler,
            )

        def _finished(thread, result: BaseException | None) -> None:
            MODULE.info('Finished running join scripts')
            self._unlock()
            self.progress_state.info = _('finished...')
            self.progress_state.finish()
            if isinstance(result, BaseException):
                msg = ''.join(thread.trace + traceback.format_exception_only(*thread.exc_info[:2]))
                MODULE.warning("Exception during running join scripts", traceback=msg)
                self.progress_state.error_handler(_('An unexpected error occurred: %s') % result)

        # launch thread
        thread = SimpleThread('run-joinscripts', _thread, _finished)
        thread.run()

        self.finished(request.id, True, status=202)