#!/usr/bin/python3
#
# Univention Management Console
# module: system setup
#
# SPDX-FileCopyrightText: 2011-2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only
from __future__ import annotations
import copy
import json
import locale as _locale
import os
import re
import stringprep
import subprocess
import threading
import time
import traceback
from typing import Any
import lxml.etree
import psutil
import univention.config_registry
from univention.lib.admember import connectionFailed, failedADConnect, lookup_adds_dc
from univention.lib.i18n import Locale, Translation
from univention.management.console.log import MODULE
from univention.management.console.modules import Base, UMC_Error
from univention.management.console.modules.decorators import SimpleThread, sanitize, simple_response, threaded
from univention.management.console.modules.mixins import ProgressMixin
from univention.management.console.modules.sanitizers import IntegerSanitizer, PatternSanitizer, StringSanitizer
# FIXME: this triggers imports from univention-lib during build time test execution.
# This in effect imports univention-ldap which is not an explicit dependency for
# univention-lib as of writing.
# The try except can be removed as soon as the dependency is added in the
# univention-lib package.
try:
from univention.appcenter.app_cache import AppCache
except ImportError as exc:
MODULE.warning('Ignoring import error: %s', exc)
from univention.management.console.modules.setup import network, util
from univention.management.console.modules.setup.checks.ldap import check_if_uid_is_available
from univention.management.console.modules.setup.checks.repositories import get_unreachable_repository_servers
from univention.management.console.modules.setup.checks.univention_join import (
check_for_school_domain, receive_domaincontroller_master_information, set_role_and_check_if_join_will_work,
)
# Module-wide UCR handle, loaded once at import time; validate() re-loads it
# before it tries to guess the domain name.
ucr = univention.config_registry.ConfigRegistry()
ucr.load()
# Translation object for this module's own messages; `_` is the shortcut used
# throughout the file. reset_locale() switches its language at runtime.
_translation = Translation('univention-management-console-module-setup')
_ = _translation.translate
# Separate translation domain used for keyboard model/layout/variant labels.
i18nXKeyboard = Translation('xkeyboard-config')
# Keys of the form `interfaces/<device>/address|netmask`; <device> may end in
# a single-digit `_N` suffix.
RE_IPV4 = re.compile(r'^interfaces/(([^/]+?)(_[0-9])?)/(address|netmask)$')
# Keys of the form `interfaces/<device>/ipv6/default/prefix|address`.
RE_IPV6_DEFAULT = re.compile(r'^interfaces/([^/]+)/ipv6/default/(prefix|address)$')
# One or more whitespace characters.
RE_SPACE = re.compile(r'\s+')
# Any key below `ssl/`.
RE_SSL = re.compile(r'^ssl/.*')
[docs]
class RequestTimeout(UMC_Error):
    """UMC error carrying status code 408 and the message 'Request Timeout'.

    Raised by ``net_apply_check_finished`` while a ``net_apply`` run is still
    in progress.
    """

    status = 408
    msg = _('Request Timeout')
[docs]
class Instance(Base, ProgressMixin):
def __init__(self, *args, **kwargs):
    """Initialise both base classes and the bookkeeping state of the module."""
    Base.__init__(self, *args, **kwargs)
    ProgressMixin.__init__(self)

    # state shared between save()/join() and check_finished()
    self._finishedLock = threading.Lock()
    self._finishedResult = True
    self._progressParser = util.ProgressParser()

    # request parked by ping() with the "keep_alive" option
    self.__keep_alive_request = None

    # counter used by net_apply() / net_apply_check_finished()
    self._net_apply_running = 0

    # reset umask to default
    os.umask(0o022)
[docs]
def init(self) -> None:
    """Apply the module locale to the process and warm up the city data.

    Exports ``LANG`` and calls ``setlocale(LC_ALL, ...)`` with the current
    ``self.locale``. The city/country data is preloaded only while the system
    is not joined.
    """
    locale_name = str(self.locale)
    os.putenv('LANG', locale_name)
    _locale.setlocale(_locale.LC_ALL, locale_name)

    if util.is_system_joined():
        return
    self._preload_city_data()
def _preload_city_data(self) -> None:
    """Call the city and country data loaders once and discard the results.

    Presumably this warms a cache inside ``util`` -- confirm there.
    """
    for loader in (util.get_city_data, util.get_country_data):
        loader()
def _get_localized_label(self, label_dict: dict[str, str]) -> str:
    """Pick the best label from a ``{language: label}`` mapping.

    Preference order: the language of ``self.locale``, then ``'en'``, then the
    entry stored under the empty key. Empty labels are skipped for the first
    two candidates.
    """
    for language in (self.locale.language, 'en'):
        label = label_dict.get(language, '')
        if label:
            return label
    return label_dict.get('', '')
[docs]
def ping(self, request):
    """Answer a ping request.

    A request carrying a truthy ``keep_alive`` option is not answered here; it
    is stored and finished later by the ``_finished`` callbacks of save()/join().
    Every other request is finished immediately with ``None``.
    """
    if not request.options.get('keep_alive'):
        self.finished(request.id, None)
        return
    self.__keep_alive_request = request
[docs]
@simple_response
def close_browser(self) -> bool:
    """Kill the browser whose PID is stored in the system-setup PID file.

    :returns: True when the process was killed, False when the PID file could
        not be read, did not contain a number, or the process does not exist.
    """
    pid_file = '/var/cache/univention-system-setup/browser.pid'
    killed = False
    try:
        with open(pid_file, 'rb') as handle:
            first_line = handle.readline()
        browser_pid = int(first_line.strip())
        psutil.Process(browser_pid).kill()
        killed = True
    except OSError as exc:
        MODULE.warning('cannot open browser PID file: %s', exc)
    except ValueError as exc:
        MODULE.error('browser PID is not a number: %s', exc)
    except psutil.NoSuchProcess as exc:
        MODULE.error('cannot kill process with PID: %s', exc)
    return killed
[docs]
@simple_response
def load(self):
    """
    Return the values needed by system-setup, read from the current state of
    the system for the language of the active locale.
    """
    language = self.locale.language
    return util.load_values(language)
[docs]
@simple_response
def save_keymap(self, layout=None) -> bool:
    """
    Set the X keyboard layout of display ``:0`` to *layout* via ``setxkbmap``.

    Nothing is changed when running in debian installer mode
    (``system/setup/boot/installer``) or when no layout is given.
    The method always returns True.
    """
    # Don't set in debian installer mode
    installer_mode = ucr.is_true('system/setup/boot/installer')
    if not installer_mode and layout:
        subprocess.call(['/usr/bin/setxkbmap', '-display', ':0', '-layout', layout])
    return True
[docs]
def save(self, request):
    '''
    Reconfigures the system according to the values specified in the dict given as
    option named "values".

    Request options read here:

    * "values": dict of profile values (default: empty dict)
    * "run_hooks": when true, a status file is created, the setup scripts get
      appliance specific arguments and the cleanup scripts including the
      appliance hooks are run afterwards (default: False)

    The actual work is handed to a SimpleThread; the request itself is
    finished with None right after the thread has been started. While the
    worker holds self._finishedLock, check_finished() reports progress only.
    '''
    # get new values
    values = request.options.get('values', {})
    run_hooks = request.options.get('run_hooks', False)
    script_args = []
    if run_hooks:
        # create a status file that indicates that save has been triggered
        util.create_status_file()
        # enforce particular arguments for setup scripts
        script_args = ['--appliance-mode', '--force-recreate', '--demo-mode']

    def _thread(request, obj):
        # worker: runs the setup scripts; returns True on success and False
        # when no values were given
        # acquire the lock until the scripts have been executed
        self._finishedResult = False
        obj._finishedLock.acquire()
        try:
            # restrict the script subfolders for the known flavors;
            # any other flavor yields None
            subfolders = {
                'network': ['30_net'],
                'certificate': ['40_ssl'],
                'languages': ['15_keyboard', '20_language', '35_timezone'],
            }.get(request.flavor)
            self._progressParser.reset(subfolders)
            if request.flavor == 'setup':
                # adjust progress fractions for setup wizard with pre-configured settings
                fractions = self._progressParser.fractions
                fractions['05_role/10role'] = 0
                fractions['10_basis/12domainname'] = 0
                fractions['10_basis/14ldap_basis'] = 0
                fractions['90_postjoin/10admember'] = 0
                self._progressParser.calculateFractions()
            MODULE.info('saving profile values')
            util.write_profile(values)
            if not values:
                # the (empty) profile has already been written at this point
                MODULE.error('No property "values" given for save().')
                return False
            # in case of changes of the IP address, restart UMC server and web server
            # for this we ignore changes of virtual or non-default devices
            # ... no need to restart the UMC server if cleanup scripts are run anyway
            restart = False
            if not run_hooks:
                MODULE.info('Check whether ip addresses have been changed')
                for ikey in values.keys():
                    if RE_IPV4.match(ikey) or RE_IPV6_DEFAULT.match(ikey) or RE_SSL.match(ikey):
                        restart = True
                        break
                MODULE.info('Restart servers: %s', restart)
            # on a joined system we can run the setup scripts
            MODULE.info('runnning system setup scripts (flavor %r)', request.flavor)
            util.run_scripts(self._progressParser, restart, subfolders, lang=str(self.locale), args=script_args)
            # run cleanup scripts and appliance hooks if needed
            if run_hooks:
                util.cleanup(with_appliance_hooks=True)
            # done :)
            self._finishedResult = True
            return True
        finally:
            # always give the lock back so that check_finished() can complete
            obj._finishedLock.release()

    def _finished(thread, result) -> None:
        # thread callback: answer a parked keep-alive ping, if there is one
        if self.__keep_alive_request:
            self.finished(self.__keep_alive_request.id, None)
            self.__keep_alive_request = None
        if isinstance(result, BaseException):
            # the worker raised: log the traceback and record a critical
            # error in the current progress state
            msg = ''.join(thread.trace + traceback.format_exception_only(*thread.exc_info[:2]))
            MODULE.warning('Exception during saving the settings', traceback=msg)
            self._progressParser.current.errors.append(_('Encountered unexpected error during setup process: %s') % result)
            self._progressParser.current.critical = True
            self._finishedResult = True

    thread = SimpleThread('save', _thread, _finished)
    thread.run(request, self)
    self.finished(request.id, None)
[docs]
@simple_response
def join(self, values=None, dcname: str | None = None, username: str | None = None, password: str | None = None) -> None:
    '''
    Join and reconfigure the system according to the values specified in the dict given as
    option named "values".

    :param values: profile values; a missing value is treated as an empty dict
    :param dcname: passed on to util.run_joinscript()
    :param username: passed on to util.run_joinscript()
    :param password: passed on to util.run_joinscript()

    The join itself runs in a SimpleThread which holds self._finishedLock
    until it is done; check_finished() waits on that lock.
    '''
    # get old and new values
    orgValues = util.load_values()
    values = values or {}
    # determine new system role
    oldrole = orgValues.get('server/role', '')
    newrole = values.get('server/role', oldrole)
    # create a status file that indicates that save has been triggered
    util.create_status_file()

    def _thread(obj, username: str, password: str) -> bool:
        # worker: writes the profile and runs the join script
        # acquire the lock until the scripts have been executed
        self._finishedResult = False
        obj._finishedLock.acquire()
        try:
            self._progressParser.reset()
            # write the profile file and run setup scripts
            util.auto_complete_values_for_join(values)
            # on unjoined Primary Directory Node the nameserver must be set to the external nameserver
            if newrole == 'domaincontroller_master' and not orgValues.get('joined'):
                for i in range(1, 4):
                    # overwrite these values only if they are set, because the UMC module
                    # will save only changed values
                    if values.get('dns/forwarder%d' % i):
                        values['nameserver%d' % i] = values.get('dns/forwarder%d' % i)
            MODULE.info('saving profile values')
            util.write_profile(values)
            # unjoined Primary Directory Node -> run the join script
            MODULE.info('runnning system setup join script')
            util.run_joinscript(self._progressParser, values, username, password, dcname, lang=str(self.locale))
            # done :)
            self._finishedResult = True
            return True
        finally:
            # always give the lock back so that check_finished() can complete
            obj._finishedLock.release()

    def _finished(thread, result) -> None:
        # thread callback: answer a parked keep-alive ping, if there is one
        if self.__keep_alive_request:
            self.finished(self.__keep_alive_request.id, None)
            self.__keep_alive_request = None
        if isinstance(result, BaseException):
            # the worker raised: log the traceback and record a critical
            # error in the current progress state
            msg = ''.join(thread.trace + traceback.format_exception_only(*thread.exc_info[:2]))
            MODULE.warning('Exception during saving the settings', traceback=msg)
            self._progressParser.current.errors.append(_('Encountered unexpected error during setup process: %s') % (result,))
            self._progressParser.current.critical = True
            self._finishedResult = True

    thread = SimpleThread('join', _thread, _finished)
    thread.run(self, username, password)
[docs]
@threaded
def check_finished(self, request) -> dict[str, Any]:
    """
    Check whether the join/setup scripts are finished. This method implements a long
    polling request, i.e., the request is only finished at the moment when all scripts
    have been executed or due to a timeout. If it returns because of the timeout, a new
    try can be started.

    :returns: a dict with the keys ``component``, ``info``, ``errors``,
        ``critical``, ``steps`` (taken from the current progress state) and
        ``finished``. ``finished`` is False while the scripts are still running,
        otherwise it carries the result flag stored by save()/join().
    """
    def progress_info(state, **kwargs):
        # flatten the progress state into the dict that is sent to the frontend
        info = {
            'component': state.fractionName,
            'info': state.message,
            'errors': state.errors,
            'critical': state.critical,
            'steps': state.percentage,
        }
        info.update(kwargs)
        MODULE.info('Progress state: %(steps).1f%% - %(component)s - %(info)s', info)
        return info

    # acquire the lock in order to wait for the join/setup scripts to finish
    # do this for 30 sec and then return anyway
    SLEEP_TIME = 0.200
    WAIT_TIME = 30
    ntries = WAIT_TIME / SLEEP_TIME
    while not self._finishedLock.acquire(False):
        # report early as soon as the progress changed, or when the time is up
        if ntries <= 0 or self._progressParser.changed and self._progressParser.current:
            state = self._progressParser.current
            return progress_info(state, finished=False)
        time.sleep(SLEEP_TIME)
        ntries -= 1
    # the lock was only taken to detect that the worker is done
    self._finishedLock.release()

    # scripts are done, return final result
    # return all errors that we gathered throughout the setup
    state = self._progressParser.current
    return progress_info(state, finished=self._finishedResult)
[docs]
@simple_response(with_flavor=True)
def validate(self, values: dict | None = None, flavor: str | None = None) -> list[dict[str, Any]]:
    '''
    Validate the specified values given in the dict as option named "values".

    Returns a list of message dicts. Each failed check contributes
    { "key": ..., "valid": True/False, "message": "..." }; a guessed domain
    name is reported as { "key": "domainname", "valid": True, "value": ... }.

    :param values: the new values to validate; missing values are treated as an empty dict
    :param flavor: 'wizard' enables the additional wizard-only checks
    '''
    # init variables
    messages = []
    values = values or {}
    orgValues = util.load_values()
    is_wizard_mode = flavor == 'wizard'
    # determine new system role
    newrole = values.get('server/role', orgValues.get('server/role', ''))
    ad_member = values.get('ad/member', orgValues.get('ad/member', ''))
    # mix original and new values
    allValues = copy.copy(values)
    for ikey, ival in orgValues.items():
        if ikey not in allValues:
            allValues[ikey] = ival

    # helper functions
    # TODO: 'valid' is not correctly evaluated in frontend
    # i.e. if valid you may continue without getting message
    def _check(key: str, check, message: str, critical: bool = True) -> None:
        # only keys that are part of the new values are checked; a failed
        # non-critical check is reported with valid=True
        assert values is not None
        if key not in values:
            return
        if not check(values[key]):
            messages.append({
                'message': message,
                'valid': not critical,
                'key': key,
            })

    def _append(key: str, message: str) -> None:
        # unconditionally record (and log) a failed validation
        MODULE.warning('Validation failed for key %s: %s', key, message)
        messages.append({
            'key': key,
            'valid': False,
            'message': message,
        })

    # host and domain name
    packages = set(values.get('components', []))
    _check('hostname', util.is_hostname, _('The hostname or the hostname part of the fully qualified domain name is invalid. Please go back to the host setting and make sure, that the hostname only contains letter (a-zA-Z) and digits (0-9).'))
    # a too long hostname is only critical for AD members and Samba systems
    hostname_length_critical = ad_member or 'univention-samba' in packages or 'univention-samba4' in packages
    appliance_str = _('the UCS system')
    if ucr['umc/web/appliance/name']:
        appliance_str = _('the %s appliance') % (ucr['umc/web/appliance/name'],)
    hostname_length_message = _('A valid NetBIOS name can not be longer than 15 characters. If Samba is installed, the hostname should be shortened.') if hostname_length_critical else _('The hostname %s is longer than 15 characters. It will not be possible to install an Active Directory compatible Domaincontroller (Samba 4) or UCS@school. The hostname cannot be changed after the installation of %s. It is recommended to shorten the hostname to maximal 15 characters.') % (values.get('hostname', ''), appliance_str)
    _check('hostname', lambda x: len(x) <= 15, hostname_length_message, critical=hostname_length_critical)
    _check('domainname', util.is_domainname, _("Please enter a valid fully qualified domain name (e.g. host.example.com)."))
    hostname = allValues.get('hostname', '')
    domainname = allValues.get('domainname', '')
    if hostname or domainname:
        if len('%s%s' % (hostname, domainname)) >= 63:
            _append('domainname', _('The length of fully qualified domain name is greater than 63 characters.'))
        if hostname == domainname.split('.')[0]:
            _append('domainname', _("Hostname is equal to domain name."))
    if is_wizard_mode and not util.is_system_joined():
        if newrole == 'domaincontroller_master' and not values.get('domainname'):
            _append('domainname', _("No fully qualified domain name has been specified for the system."))
        elif not values.get('hostname'):
            _append('hostname', _("No hostname has been specified for the system."))

    # windows domain
    _check('windows/domain', lambda x: x == x.upper(), _("The windows domain name can only consist of upper case characters."))
    _check('windows/domain', lambda x: len(x) <= 15, _("The windows domain name cannot be longer than 15 characters."))
    _check('windows/domain', util.is_windowsdomainname, _("The windows domain name is not valid."))

    # LDAP base
    _check('ldap/base', util.is_ldap_base, _("The LDAP base may neither contain blanks nor any special characters. Its structure needs to consist of at least two relative distinguished names (RDN) which may only use the attribute tags 'dc', 'cn', 'c', 'o', or 'l' (e.g., dc=test,dc=net)."))

    # root password
    _check('root_password', lambda x: len(x) >= 8, _("The root password is too short. For security reasons, your password must contain at least 8 characters."))
    _check('root_password', util.is_ascii, _("The root password may only contain ascii characters."))

    # ssl + email
    labels = {
        'ssl/country': _('Country'),
        'ssl/state': _('State'),
        'ssl/locality': _('Location'),
        'ssl/organization': _('Organization'),
        'organization': _('Organization'),
        'ssl/organizationalunit': _('Business unit'),
        'ssl/email': _('Email address'),
        'email_address': _('Email address'),
        'ssl/common': _('Common name for the root SSL certificate'),
    }
    # maximum length per group of keys
    for maxlenth, keys in [(2, ('ssl/country',)), (128, ('ssl/state', 'ssl/locality')), (64, ('organization', 'ssl/organization', 'ssl/organizationalunit', 'ssl/email', 'email_address', 'ssl/common'))]:
        for ikey in keys:
            # maxlenth is bound as default argument to avoid late binding in the lambda
            _check(ikey, lambda x, maxlenth=maxlenth: len(x) <= maxlenth, _('The following value is too long, only %(max)s characters allowed: %(name)s') % {'max': maxlenth, 'name': labels[ikey]})
    # reject values containing a character from one of the listed stringprep
    # tables or the replacement character U+FFFD
    for ikey in ('ssl/country', 'ssl/state', 'ssl/locality', 'ssl/organization', 'ssl/organizationalunit', 'ssl/email', 'ssl/common'):
        for table in (stringprep.in_table_c21_c22, stringprep.in_table_a1, stringprep.in_table_c8, stringprep.in_table_c3, stringprep.in_table_c4, stringprep.in_table_c5, lambda c: c == '\ufffd'):
            _check(ikey, lambda x, table=table: not any(map(table, x)), _('The value for %s contains invalid characters.') % (labels[ikey],))
    _check('ssl/country', lambda x: len(x) == 2, _('Country must be a country code consisting of 2 characters.'))
    for ikey in ['ssl/email', 'email_address']:
        # an '@' must be present and must not be the first character
        _check(ikey, lambda x: x.find('@') > 0, _("Please enter a valid email address"))

    # net
    try:
        interfaces = network.Interfaces()
        interfaces.from_dict(allValues.get('interfaces', {}))
        interfaces.check_consistency()
    except network.DeviceError as exc:
        _append('interfaces', str(exc))
    # validate the primary network interface
    _check('interfaces/primary', lambda x: not x or x in interfaces, _('The primary network device must exist.'))

    # check nameservers
    for ikey, iname in [('nameserver[1-3]', _('Domain name server')), ('dns/forwarder[1-3]', _('External name server'))]:
        reg = re.compile('^(%s)$' % ikey)
        for jkey, jval in values.items():
            if reg.match(jkey):
                if not values.get(jkey):
                    # allow empty value
                    continue
                _check(jkey, util.is_ipaddr, _('The specified IP address (%(name)s) is not valid: %(value)s') % {'name': iname, 'value': jval})

    if is_wizard_mode and not util.is_system_joined() and (newrole not in ['domaincontroller_master'] or ad_member):
        if all(nameserver in values and not values[nameserver] for nameserver in ('nameserver1', 'nameserver2', 'nameserver3')):
            # 'nameserver1'-key exists → widget is displayed → = not in UCS/debian installer mode
            if not any(interface.ip4dynamic or interface.ip6dynamic for interface in interfaces.values()):
                _append('nameserver1', _('A domain name server needs to be specified.'))
                # _append('nameserver1', _('At least one domain name server needs to be given if DHCP or SLAAC is not specified.'))
        # see whether the domain can be determined automatically
        ucr.load()
        guessed_domain = None
        # the new values take precedence over what is stored in UCR
        for obj in [values, ucr]:
            for nameserver in ('nameserver1', 'nameserver2', 'nameserver3'):
                # from here on `nameserver` is the configured address, not the key
                nameserver = obj.get(nameserver)
                if nameserver:
                    guessed_domain = None
                    if obj.get('ad/member') and obj.get('ad/address'):
                        try:
                            ad_domain_info = lookup_adds_dc(obj.get('ad/address'), ucr={'nameserver1': nameserver})
                        except failedADConnect:
                            pass
                        else:
                            guessed_domain = ad_domain_info['Domain']
                    else:
                        guessed_domain = util.get_ucs_domain(nameserver)
                    if guessed_domain:
                        differing_domain_name = values.get('domainname') and values['domainname'].lower() != guessed_domain.lower()
                        if differing_domain_name:
                            _append('domainname', _('The specified domain name is different to the %s domain name found via the configured DNS server: %s') % (_('Active Directory') if ad_member else _('UCS'), guessed_domain))
                        else:
                            # communicate guessed domainname to frontend
                            messages.append({
                                'valid': True,
                                'key': 'domainname',
                                'value': guessed_domain,
                            })
                        break
            if guessed_domain:
                break
        if not guessed_domain:
            if not values.get('domainname'):
                _append('domainname', _("Cannot automatically determine the domain. Please specify the server's fully qualified domain name."))
            if values.get('nameserver1') and values.get('start/join'):
                _append('nameserver1', _('The specified nameserver %s is not part of a valid UCS domain.') % (values['nameserver1'],))

    # check gateways
    if values.get('gateway'):  # allow empty value
        _check('gateway', util.is_ipv4addr, _('The specified gateway IPv4 address is not valid: %s') % values.get('gateway'))
    if values.get('ipv6/gateway'):  # allow empty value
        _check('ipv6/gateway', util.is_ipv6addr, _('The specified gateway IPv6 address is not valid: %s') % values.get('ipv6/gateway'))

    # proxy
    _check('proxy/http', util.is_proxy, _('The specified proxy address is not valid (e.g., http://10.201.1.1:8080): %s') % allValues.get('proxy/http', ''))

    # software checks
    if 'univention-samba' in packages and 'univention-samba4' in packages:
        _append('components', _('It is not possible to install Samba 3 and Samba 4 on one system. Please select only one of these components.'))
    return messages
[docs]
@sanitize(pattern=PatternSanitizer(default='.*', required=True, add_asterisks=True))
@simple_response
def lang_locales(self, pattern, category='language_en'):
    """Return the available locales matching *pattern* in the given *category*."""
    locales = util.get_available_locales(pattern, category)
    return locales
[docs]
def lang_timezones(self, request):
    """Return a list of all available time zones.

    The zones are read line by line from the system-setup locale data; lines
    starting with ``#`` are skipped. When the file cannot be opened the
    request is finished with ``None``.
    """
    try:
        fd = open('/usr/share/univention-system-setup/locale/timezone')
    except OSError:
        MODULE.error('Cannot find locale data for timezones in /usr/share/univention-system-setup/locale')
        self.finished(request.id, None)
        return
    # close the file deterministically; it used to be left to the garbage collector
    with fd:
        timezones = [i.strip('\n') for i in fd if not i.startswith('#')]
    self.finished(request.id, timezones)
[docs]
@simple_response
def lang_keyboard_model(self):
    """Return a list of all available keyboard models.

    Every entry is a dict with the translated description as ``label`` and
    the model name as ``id``, taken from the ``<model>`` elements of the XKB
    rules file.
    """
    # the context manager closes the rules file as soon as it has been parsed
    with open('/usr/share/X11/xkb/rules/base.xml') as fd:
        tree = lxml.etree.parse(fd)  # noqa: S320
    models = tree.xpath("//model")
    model_result = [{
        'label': i18nXKeyboard.translate(model.xpath('./configItem/description')[0].text),
        'id': model.xpath('./configItem/name')[0].text,
    } for model in models]
    return model_result
[docs]
@simple_response
def lang_keyboard_layout(self):
    """Return a list of all available keyboard layouts.

    Every entry is a dict with the translated description (``label``), the
    layout name (``id``), its short description (``language``) and the
    colon-separated texts of its country list (``countries``).
    """
    # the context manager closes the rules file as soon as it has been parsed
    with open('/usr/share/X11/xkb/rules/base.xml') as fd:
        tree = lxml.etree.parse(fd)  # noqa: S320
    layouts = tree.xpath("//layout")
    layout_result = [{
        'label': i18nXKeyboard.translate(layout.xpath('./configItem/description')[0].text),
        'id': layout.xpath('./configItem/name')[0].text,
        'language': layout.xpath('./configItem/shortDescription')[0].text,
        'countries': ':'.join([icountry.text for icountry in layout.xpath('./configItem/countryList/*')]),
    } for layout in layouts]
    return layout_result
[docs]
@sanitize(keyboardlayout=StringSanitizer(default='us'))
@simple_response
def lang_keyboard_variante(self, keyboardlayout):
    """Return a list of all available keyboard variants of *keyboardlayout*.

    The list always starts with an empty entry (``label`` and ``id`` are empty
    strings), followed by one dict per variant of the matching layout(s).
    """
    variante_result = []
    # the context manager closes the rules file as soon as it has been parsed
    with open('/usr/share/X11/xkb/rules/base.xml') as fd:
        tree = lxml.etree.parse(fd)  # noqa: S320
    layouts = tree.xpath("//layout")
    for layout in layouts:
        layoutID = layout.xpath("./configItem/name")[0].text
        if layoutID != keyboardlayout:
            continue
        variants = layout.xpath("./variantList/variant")
        variante_result += [{
            'label': i18nXKeyboard.translate(variant.xpath('./configItem/description')[0].text),
            'id': variant.xpath('./configItem/name')[0].text,
        } for variant in variants]
    # offer "no variant" as first choice
    variante_result.insert(0, {'label': '', 'id': ''})
    return variante_result
[docs]
def lang_countrycodes(self, request):
    """Finish the request with all countries and their two letter country codes.

    Each entry has the country code as ``id`` and the localized country name
    as ``label``; countries without label data are left out.
    """
    countries = []
    for code, data in util.get_country_data().items():
        if not data.get('label'):
            continue
        countries.append({
            'id': code,
            'label': self._get_localized_label(data.get('label', {})),
        })

    # add the value from ucr value to the list
    # this is required because invalid values will be unset in frontend
    # Bug #26409
    fresh_ucr = univention.config_registry.ConfigRegistry()
    fresh_ucr.load()
    ssl_country = fresh_ucr.get('ssl/country')
    known_ids = {entry['id'] for entry in countries}
    if ssl_country not in known_ids:
        countries.append({'label': ssl_country, 'id': ssl_country})

    self.finished(request.id, countries)
[docs]
def net_apply(self, request):
    """Apply the network settings from the request in a background thread.

    Reads the options ``values`` (default: empty dict) and ``demo_mode``
    (default: False). Returns False without starting anything when a previous
    run is still active; otherwise the request is finished with True from the
    thread's callback.
    """
    if self._net_apply_running > 0:
        # do not start another process applying the network settings
        return False

    options = request.options
    values = options.get('values', {})
    demo_mode = options.get('demo_mode', False)

    def _thread(instance):
        # worker: mark the run as active and execute the network scripts
        instance._net_apply_running += 1
        MODULE.process('Applying network settings')
        with util.written_profile(values):
            util.run_networkscrips(demo_mode)

    def _finished(thread, result):
        # callback: mark the run as done and answer the request
        self._net_apply_running -= 1
        self.finished(request.id, True)

    SimpleThread('net_apply', _thread, _finished).run(self)
[docs]
@simple_response
def net_apply_check_finished(self):
    """Tell whether no net_apply() run is active any more.

    :raises RequestTimeout: while a net_apply() run is still in progress;
        this allows long polling on the client side (poll until successful request)
    """
    running = self._net_apply_running
    if running > 0:
        raise RequestTimeout()
    return running == 0
[docs]
@simple_response
def net_interfaces(self):
    """Return the names of all network interfaces found by ``util.detect_interfaces()``."""
    names = []
    for device in util.detect_interfaces():
        names.append(device['name'])
    return names
# workaround: use with_progress to make the method threaded
[docs]
@simple_response(with_progress=True)
def net_dhclient(self, interface, timeout=10):
    """
    Request a DHCP address for *interface*.

    :param interface: name of the network interface
    :param timeout: timeout in seconds (default: 10)
    :returns: whatever ``util.dhclient()`` returns
    """
    lease = util.dhclient(interface, timeout)
    return lease
[docs]
@sanitize(locale=StringSanitizer(default='en_US'))
@simple_response
def reset_locale(self, locale):
    """Switch the module to the given locale.

    The codeset of the current locale is kept. ``LANG`` and the process
    locale are updated, ``self.locale`` is replaced, the translations of this
    module, of the keyboard labels and of the network module are switched,
    and the app cache is cleared.

    :param locale: locale name such as ``en_US`` (default: ``en_US``)
    """
    locale = Locale(locale)
    locale.codeset = self.locale.codeset
    MODULE.info('Switching language to: %s', locale)
    # export the *new* locale: self.locale still holds the previous one at
    # this point, so using it here would leave LANG on the old language
    os.putenv('LANG', str(locale))
    try:
        _locale.setlocale(_locale.LC_ALL, str(locale))
    except _locale.Error:
        MODULE.warning('Locale %s is not supported, using fallback locale "C" instead.', locale)
        _locale.setlocale(_locale.LC_ALL, 'C')
    self.locale = locale
    # dynamically change the translation methods
    _translation.set_language(str(self.locale))
    i18nXKeyboard.set_language(str(self.locale))
    network._translation.set_language(str(self.locale))
    AppCache().clear_cache()
[docs]
@sanitize(pattern=StringSanitizer(), max_results=IntegerSanitizer(minimum=1, default=5))
@simple_response
def find_city(self, pattern: str, max_results: int) -> list | None:
    """Search the city data for cities whose label contains *pattern*.

    The search is case-insensitive. Matches are ranked by the fraction of the
    label covered by the pattern plus a bonus proportional to the population.

    :param pattern: search string; an empty pattern yields an empty list
    :param max_results: maximum number of cities to return
    :returns: the best matches enriched with country information, or None
        when nothing matched
    """
    pattern = pattern.lower()
    MODULE.info('pattern: %s', pattern)
    if not pattern:
        return []

    # for the given pattern, find matching cities
    city_data = util.get_city_data()
    matches = []
    for icity in city_data:
        match = None
        for jlabel in icity.get('label', {}).values():
            label = jlabel.lower()
            if pattern in label:
                # matching score is the overlap if the search pattern and the matched text
                # (as fraction between 0 and 1)
                match_score = len(pattern) / float(len(label))
                if match and match_score < match['match_score']:
                    # just keep the best match of a city
                    continue
                if match_score > 0.1:
                    # found a match with more than 10% overlap :)
                    match = icity.copy()
                    match['match'] = jlabel
                    match['match_score'] = match_score
        if match:
            matches.append(match)
    MODULE.info('Search for pattern "%s" with %s matches', pattern, len(matches))
    if not matches:
        return None

    # add additional score w.r.t. the population size of the city
    # such that the largest city gains additional 0.6 on top
    max_population = max(imatch['population'] for imatch in matches)
    # no population bonus (instead of a ZeroDivisionError) if all matches have population 0
    weighted_inv_max_population = 0.6 / float(max_population) if max_population else 0.0
    for imatch in matches:
        imatch['final_score'] = imatch['match_score'] + weighted_inv_max_population * imatch['population']

    # sort matches...
    matches.sort(key=lambda x: x['final_score'], reverse=True)
    MODULE.info('Top 5 matches: %s', json.dumps(matches[:5], indent=2))
    matches = matches[:max_results]

    # add additional information about keyboard layout, time zone etc. and
    # get the correct localized labels
    country_data = util.get_country_data()
    for imatch in matches:
        match_country = country_data.get(imatch.get('country'))
        if match_country:
            imatch.update(util.get_random_nameserver(match_country))
            imatch.update({
                "default_lang": match_country.get('default_lang'),
                "country_label": self._get_localized_label(match_country.get('label', {})),
                "label": self._get_localized_label(imatch.get('label')) or imatch.get('match'),
            })
    return matches
[docs]
@simple_response
def apps_query(self) -> list[dict[str, Any]]:
    """Return the app list produced by ``util.get_apps(True)``."""
    apps = util.get_apps(True)
    return apps
[docs]
@simple_response
def check_domain(self, role: str, nameserver: str) -> dict[str, Any]:
    """Collect information about the domain reachable via *nameserver*.

    :param role: 'ad' looks for an Active Directory domain controller,
        'nonmaster' for a UCS domain; any other role yields an empty dict
    :param nameserver: the name server used for the lookups
    :returns: a dict that may contain ``dc_name``, ``domain``, ``ucs_master``,
        ``ucs_master_fqdn`` and ``ucs_master_reachable``, depending on the
        role and on what could be determined
    """
    result = {}
    if role == 'ad':
        try:
            ad_domain_info = lookup_adds_dc(nameserver)
            dc = ad_domain_info['DC DNS Name']
            if dc:
                result['dc_name'] = dc
                domain = ad_domain_info['Domain']
                result['domain'] = domain
                result['ucs_master'] = util.is_ucs_domain(nameserver, domain)
                ucs_master_fqdn = util.resolve_domaincontroller_master_srv_record(nameserver, domain)
                result['ucs_master_fqdn'] = ucs_master_fqdn
                result['ucs_master_reachable'] = util.is_ssh_reachable(ucs_master_fqdn)
        except (failedADConnect, connectionFailed) as exc:
            # a failed lookup is only logged; whatever was collected so far is returned
            MODULE.warning('ADDS DC lookup failed: %s', exc)
    elif role == 'nonmaster':
        domain = util.get_ucs_domain(nameserver)
        if domain:
            fqdn = util.resolve_domaincontroller_master_srv_record(nameserver, domain)
        else:
            fqdn = util.get_fqdn(nameserver)
        if fqdn:
            result['dc_name'] = fqdn
            # the domain is everything after the first label of the FQDN
            domain = '.'.join(fqdn.split('.')[1:])
            result['ucs_master'] = util.is_ucs_domain(nameserver, domain)
    return result
[docs]
@simple_response
def check_domain_join_information(self, domain_check_role: str, role: str, dns: str, nameserver: str, address: str, username: str, password: str) -> dict[str, str]:
    """Check the credentials and gather the information needed for joining.

    :param domain_check_role: 'ad' or 'nonmaster'; any other value skips all checks
    :param role: the system role handed to ``set_role_and_check_if_join_will_work``
    :param dns: truthy if the existing Primary Directory Node shall not be replaced
    :param nameserver: the name server used for the lookups
    :param address: address of the domain controller to check against
    :param username: account used for the checks
    :param password: password of that account
    :returns: the collected information; for 'ad' it contains at least ``domain``
    """
    result = {}
    if domain_check_role == 'ad':
        domain = util.check_credentials_ad(nameserver, address, username, password)
        result['domain'] = domain
        if dns:  # "dns" means we don't want to replace the existing Primary Directory Node
            ucs_master_fqdn = util.resolve_domaincontroller_master_srv_record(nameserver, domain)
            if ucs_master_fqdn:
                # if we found a _domaincontroller_master._tcp SRV record the system will be a Backup/Replica Directory Node or a Managed Node.
                # We need to check the credentials of this system, too, so we ensure that the System is reachable via SSH.
                # Otherwise the join will fail with strange error like "ping to ..." failed.
                result.update(receive_domaincontroller_master_information(False, nameserver, ucs_master_fqdn, username, password))
                set_role_and_check_if_join_will_work(role, ucs_master_fqdn, username, password)
    elif domain_check_role == 'nonmaster':
        result.update(receive_domaincontroller_master_information(dns, nameserver, address, username, password))
        set_role_and_check_if_join_will_work(role, address, username, password)
    # Primary Directory Node? no domain check necessary
    return result
[docs]
@simple_response
def check_repository_accessibility(self) -> list[str]:
    """Return the repository servers reported as unreachable."""
    unreachable = get_unreachable_repository_servers()
    return unreachable
[docs]
@simple_response
def check_uid(self, uid: str, role: str, address: str, username: str, password: str) -> bool:
    """Return the result of ``check_if_uid_is_available`` for the given arguments."""
    available = check_if_uid_is_available(uid, role, address, username, password)
    return available