#!/usr/bin/python3
# SPDX-FileCopyrightText: 2014-2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only
"""Python library for AD Member Mode."""
from __future__ import annotations
import ipaddress
import locale
import os
import socket
import subprocess
import sys
import tempfile
import time
from collections import namedtuple
from datetime import datetime, timedelta
from shlex import quote
from typing import TYPE_CHECKING, Any
from urllib.parse import urlparse
import dns.resolver
import ldap
import ldap.sasl
from ldap.filter import filter_format
from samba.dcerpc import nbt, security
from samba.dcerpc.security import DOMAIN_RID_ADMINISTRATOR, DOMAIN_RID_ADMINS
from samba.ndr import ndr_unpack
from samba.net import Net
from samba.param import LoadParm
import univention.debug as ud
import univention.lib.package_manager
import univention.uldap
from univention.config_registry import ConfigRegistry
from univention.config_registry.interfaces import Interfaces
from univention.lib.misc import custom_groupname
if TYPE_CHECKING:
from collections.abc import Callable, Iterable
# Ensure univention debug is initialized
[docs]
def initialize_debug() -> None:
# Use a little hack to determine if univention.debug has been initialized
# get_level(..) returns always ud.ERROR if univention.debug is not initialized
oldLevel = ud.get_level(ud.MODULE)
if oldLevel == ud.PROCESS:
ud.set_level(ud.MODULE, ud.DEBUG)
is_ready = (ud.get_level(ud.MODULE) == ud.DEBUG)
else:
ud.set_level(ud.MODULE, ud.PROCESS)
is_ready = (ud.get_level(ud.MODULE) == ud.PROCESS)
if not is_ready:
ud.init('/var/log/univention/join.log', ud.FLUSH, ud.FUNCTION)
ud.set_level(ud.MODULE, ud.PROCESS)
else:
ud.set_level(ud.MODULE, oldLevel)
[docs]
class failedToSetService(Exception):
"""ucs_addServiceToLocalhost failed"""
[docs]
class invalidUCSServerRole(Exception):
"""Invalid UCS Server Role"""
[docs]
class failedADConnect(Exception):
"""Connection to AD Server failed"""
[docs]
class failedToSetAdministratorPassword(Exception):
"""Failed to set the password of the UCS Administrator to the AD password"""
[docs]
class failedToCreateAdministratorAccount(Exception):
"""Failed to create the administrator account in UCS"""
[docs]
class sambaSidNotSetForAdministratorAccount(Exception):
"""sambaSID is not set for Administrator account in UCS"""
[docs]
class failedToSearchForWellKnownSid(Exception):
"""failed to search for well known SID"""
[docs]
class failedToAddAdministratorAccountToDomainAdmins(Exception):
"""failed to add Administrator account to Domain Admins"""
[docs]
class domainnameMismatch(Exception):
"""Domain Names don't match"""
[docs]
class connectionFailed(Exception):
"""Connection to AD failed"""
[docs]
class notDomainAdminInAD(Exception):
"""User is not member of Domain Admins group in AD"""
[docs]
class univentionSambaWrongVersion(Exception):
"""univention-samba candidate has wrong version"""
[docs]
class timeSyncronizationFailed(Exception):
"""Time synchronization failed."""
[docs]
class manualTimeSyncronizationRequired(timeSyncronizationFailed):
"""Time difference critical for Kerberos but synchronization aborted."""
[docs]
class sambaJoinScriptFailed(Exception):
"""26univention-samba.inst failed"""
[docs]
class failedToAddServiceRecordToAD(Exception):
"""failed to add SRV record in AD"""
[docs]
class failedToGetUcrVariable(Exception):
"""failed to get ucr variable"""
[docs]
def is_localhost_in_admember_mode(ucr: ConfigRegistry | None = None) -> bool:
if not ucr:
ucr = ConfigRegistry()
ucr.load()
return ucr.is_true('ad/member', False)
[docs]
def is_localhost_in_adconnector_mode(ucr: ConfigRegistry | None = None) -> bool:
if not ucr:
ucr = ConfigRegistry()
ucr.load()
return bool(ucr.is_false('ad/member', True) and ucr.get('connector/ad/ldap/host'))
[docs]
def is_domain_in_admember_mode(ucr: ConfigRegistry | None = None) -> bool:
if not ucr:
ucr = ConfigRegistry()
ucr.load()
lo = univention.uldap.getMachineConnection()
res = lo.search(base=ucr.get('ldap/base'), filter='(&(univentionServerRole=master)(univentionService=AD Member))')
return bool(res)
def _get_kerberos_ticket(principal: str, password: str, ucr: ConfigRegistry | None = None) -> None:
ud.debug(ud.MODULE, ud.INFO, "running _get_kerberos_ticket")
if not ucr:
ucr = ConfigRegistry()
ucr.load()
# We need to remove the target credential cache first,
# otherwise kinit may use an old ticket and run into "krb5_get_init_creds: Clock skew too great".
cmd1 = ("/usr/bin/kdestroy",)
p1 = subprocess.Popen(cmd1, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, close_fds=True)
stdout, _ = p1.communicate()
if p1.returncode:
ud.debug(ud.MODULE, ud.ERROR, "kdestroy failed:\n%s" % stdout.decode('UTF-8', 'replace'))
with tempfile.NamedTemporaryFile('w+') as f:
os.fchmod(f.fileno(), 0o600)
f.write(password)
f.flush()
cmd2 = ("/usr/bin/kinit", "--no-addresses", "--password-file=%s" % (f.name,), principal)
p1 = subprocess.Popen(cmd2, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, close_fds=True)
stdout, _ = p1.communicate()
if p1.returncode:
msg = "kinit failed:\n%s" % (stdout.decode('UTF-8', 'replace'),)
ud.debug(ud.MODULE, ud.ERROR, msg)
raise connectionFailed(msg)
if stdout:
ud.debug(ud.MODULE, ud.WARN, "kinit output:\n%s" % stdout.decode('UTF-8', 'replace'))
[docs]
def check_connection(ad_domain_info, username, password):
ud.debug(ud.MODULE, ud.INFO, "running check_connection")
test_share = '//%s/sysvol' % ad_domain_info["DC IP"]
cmd = ('smbclient', '-U', '%s%%%s' % (username, password), '-c', 'quit', test_share)
p1 = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, close_fds=True)
stdout, _ = p1.communicate()
if p1.returncode:
raise connectionFailed(stdout.decode('UTF-8', 'replace'))
[docs]
def flush_nscd_hosts_cache() -> None:
if os.path.exists("/usr/sbin/nscd"):
cmd = ("/usr/sbin/nscd", "--invalidate=hosts")
subprocess.call(cmd)
[docs]
def decode_sid(value):
return ndr_unpack(security.dom_sid, value)
[docs]
def check_ad_account(ad_domain_info: dict[str, str], username: str, password: str, ucr: ConfigRegistry | None = None) -> bool:
"""
returns True if account is Administrator in AD
returns False if account is just a member of Domain Admins
raises exception notDomainAdminInAD if neither criterion is met.
"""
ud.debug(ud.MODULE, ud.INFO, "running check_account")
ad_server_ip = ad_domain_info["DC IP"]
ad_server_name = ad_domain_info["DC DNS Name"]
ad_ldap_base = ad_domain_info["LDAP Base"]
ad_domain = ad_domain_info["Domain"]
ad_realm = ad_domain.upper()
if not ucr:
ucr = ConfigRegistry()
ucr.load()
try:
time_sync(ad_server_ip)
except timeSyncronizationFailed as ex:
ud.debug(ud.MODULE, ud.WARN, "Time sync failed, trying to authenticate anyway. Original exception: %s" % (ex,))
(previous_dns_ucr_set, previous_dns_ucr_unset) = set_nameserver([ad_server_ip], ucr)
(previous_krb_ucr_set, previous_krb_ucr_unset) = prepare_kerberos_ucr_settings(realm=ad_realm, ucr=ucr)
(previous_host_static_ucr_set, previous_host_static_ucr_unset) = prepare_dns_reverse_settings(ad_domain_info, ucr=ucr)
try:
principal = "%s@%s" % (username, ad_realm)
_get_kerberos_ticket(principal, password, ucr)
auth = ldap.sasl.gssapi("")
except Exception:
set_ucr(previous_dns_ucr_set, previous_dns_ucr_unset)
set_ucr(previous_krb_ucr_set, previous_krb_ucr_unset)
set_ucr(previous_host_static_ucr_set, previous_host_static_ucr_unset)
flush_nscd_hosts_cache()
raise
# Ok, ready and set for kerberized LDAP lookup
try:
subprocess.call(['systemctl', 'stop', 'nscd'])
lo_ad = univention.uldap.access(host=ad_server_name, port=389, base=ad_ldap_base, binddn=None, bindpw=None, start_tls=0, use_ldaps=False)
lo_ad.lo.set_option(ldap.OPT_PROTOCOL_VERSION, ldap.VERSION3)
lo_ad.lo.set_option(ldap.OPT_REFERRALS, 0)
lo_ad.lo.sasl_interactive_bind_s("", auth)
except (ldap.INVALID_CREDENTIALS, ldap.UNWILLING_TO_PERFORM) as exc:
ud.debug(ud.MODULE, ud.ERROR, str(exc))
raise connectionFailed(exc)
finally:
subprocess.call(['systemctl', 'start', 'nscd'])
set_ucr(previous_dns_ucr_set, previous_dns_ucr_unset)
set_ucr(previous_krb_ucr_set, previous_krb_ucr_unset)
set_ucr(previous_host_static_ucr_set, previous_host_static_ucr_unset)
flush_nscd_hosts_cache()
try:
res = lo_ad.search(scope="base", attr=["objectSid"])
except ldap.OPERATIONS_ERROR:
# Try again
try:
subprocess.call(['systemctl', 'stop', 'nscd'])
lo_ad = univention.uldap.access(host=ad_server_name, port=389, base=ad_ldap_base, binddn=None, bindpw=None, start_tls=0, use_ldaps=False)
lo_ad.lo.set_option(ldap.OPT_PROTOCOL_VERSION, ldap.VERSION3)
lo_ad.lo.set_option(ldap.OPT_REFERRALS, 0)
lo_ad.lo.sasl_interactive_bind_s("", auth)
except (ldap.INVALID_CREDENTIALS, ldap.UNWILLING_TO_PERFORM) as exc:
msg = "second attempt: " + str(exc)
ud.debug(ud.MODULE, ud.ERROR, msg)
raise connectionFailed(exc)
finally:
subprocess.call(['systemctl', 'start', 'nscd'])
set_ucr(previous_dns_ucr_set, previous_dns_ucr_unset)
set_ucr(previous_krb_ucr_set, previous_krb_ucr_unset)
set_ucr(previous_host_static_ucr_set, previous_host_static_ucr_unset)
flush_nscd_hosts_cache()
res = lo_ad.search(scope="base", attr=["objectSid"])
if not res or "objectSid" not in res[0][1]:
msg = "Determination of AD domain SID failed"
ud.debug(ud.MODULE, ud.ERROR, msg)
raise connectionFailed(msg)
domain_sid = decode_sid(res[0][1]["objectSid"][0])
res = lo_ad.search(filter=filter_format("(sAMAccountName=%s)", [username]), attr=["objectSid", "primaryGroupID"])
if not res or "objectSid" not in res[0][1]:
msg = "Determination user SID failed"
ud.debug(ud.MODULE, ud.ERROR, msg)
raise connectionFailed(msg)
user_sid = decode_sid(res[0][1]["objectSid"][0])
admin_sid = "%s-%d" % (domain_sid, DOMAIN_RID_ADMINISTRATOR)
admins_sid = "%s-%d" % (domain_sid, DOMAIN_RID_ADMINS)
admin_sid = security.dom_sid(admin_sid)
admins_sid = security.dom_sid(admins_sid)
if user_sid == admin_sid:
ud.debug(ud.MODULE, ud.PROCESS, "User is default AD Administrator")
return True
if int(res[0][1]["primaryGroupID"][0]) == DOMAIN_RID_ADMINS:
ud.debug(ud.MODULE, ud.PROCESS, "User is primary member of Domain Admins")
return False
user_dn = res[0][0]
res = lo_ad.search(filter=filter_format("(sAMAccountName=%s)", [username]), base=user_dn, scope="base", attr=["tokenGroups"])
if not res or "tokenGroups" not in res[0][1]:
msg = "Lookup of AD group memberships for user failed"
ud.debug(ud.MODULE, ud.ERROR, msg)
raise connectionFailed(msg)
if "tokenGroups" not in res[0][1]:
raise notDomainAdminInAD()
for group_sid_ndr in res[0][1]["tokenGroups"]:
group_sid = decode_sid(group_sid_ndr)
if group_sid == admins_sid:
return False
ud.debug(ud.MODULE, ud.ERROR, "User is not member of Domain Admins")
raise notDomainAdminInAD()
def _sid_of_ucs_sambadomain(lo: univention.uldap.access | None = None, ucr: ConfigRegistry | None = None) -> str:
if not lo:
lo = univention.uldap.getMachineConnection()
if not ucr:
ucr = ConfigRegistry()
ucr.load()
res = lo.search(filter=filter_format("(&(objectclass=sambadomain)(sambaDomainName=%s))", [ucr.get("windows/domain")]), attr=["sambaSID"], unique=True)
if not res:
ud.debug(ud.MODULE, ud.ERROR, "No UCS LDAP search result for sambaDomainName=%s" % ucr.get("windows/domain"))
raise ldap.NO_SUCH_OBJECT({'desc': 'no object'})
ucs_domain_sid = res[0][1].get("sambaSID", [None])[0]
if not ucs_domain_sid:
ud.debug(ud.MODULE, ud.ERROR, "No sambaSID found for sambaDomainName=%s" % ucr.get("windows/domain"))
raise ldap.NO_SUCH_OBJECT({'desc': 'no object'})
return ucs_domain_sid.decode('ASCII')
def _dn_of_udm_domain_admins(lo: univention.uldap.access | None = None, ucr: ConfigRegistry | None = None) -> str:
if not lo:
lo = univention.uldap.getMachineConnection()
if not ucr:
ucr = ConfigRegistry()
ucr.load()
ucs_domain_sid = _sid_of_ucs_sambadomain(lo, ucr)
domain_admins_sid = "%s-%d" % (ucs_domain_sid, DOMAIN_RID_ADMINS)
res = lo.searchDn(filter=filter_format("(sambaSID=%s)", [domain_admins_sid]), unique=True)
if not res:
ud.debug(ud.MODULE, ud.ERROR, "Failed to determine DN of UCS Domain Admins group")
raise ldap.NO_SUCH_OBJECT({'desc': 'no object'})
return res[0]
def _create_domain_admin_account_in_udm(username: str, password: str, lo: univention.uldap.access | None = None, ucr: ConfigRegistry | None = None) -> bool:
if not lo:
lo = univention.uldap.getMachineConnection()
ud.debug(ud.MODULE, ud.INFO, "running _create_domain_admin_account_in_udm")
if not ucr:
ucr = ConfigRegistry()
ucr.load()
domain_admins_dn = _dn_of_udm_domain_admins(lo, ucr)
cmd = ("univention-directory-manager", "users/user", "create", "--position", "cn=users,%s" % ucr.get("ldap/base"), "--set", "username=%s" % username, "--set", "lastname=tmp", "--set", "password=%s" % password, "--set", "primaryGroup=%s" % domain_admins_dn)
p1 = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, close_fds=True)
stdout, _ = p1.communicate()
if p1.returncode:
ud.debug(ud.MODULE, ud.ERROR, "Account creation for %s failed" % username)
if stdout:
ud.debug(ud.MODULE, ud.ERROR, "udm users/user create output:\n%s" % stdout.decode('UTF-8', 'replace'))
return False
return True
def _ucs_sid_is_well_known_administrator(user_sid: str, lo: univention.uldap.access | None = None, ucr: ConfigRegistry | None = None) -> bool:
if not lo:
lo = univention.uldap.getMachineConnection()
if not ucr:
ucr = ConfigRegistry()
ucr.load()
ucs_domain_sid = _sid_of_ucs_sambadomain(lo, ucr)
administrator_sid = "%s-%d" % (ucs_domain_sid, DOMAIN_RID_ADMINISTRATOR)
return user_sid == administrator_sid
def _add_udm_account_to_domain_admins(user_dn: str, lo: univention.uldap.access | None = None, ucr: ConfigRegistry | None = None) -> bool:
if not lo:
lo = univention.uldap.getMachineConnection()
if not ucr:
ucr = ConfigRegistry()
ucr.load()
domain_admins_dn = _dn_of_udm_domain_admins(lo, ucr)
cmd = ("univention-directory-manager", "users/user", "modify", "--dn", user_dn, "--append", "groups=%s" % domain_admins_dn)
p1 = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, close_fds=True)
stdout, _ = p1.communicate()
if p1.returncode:
ud.debug(ud.MODULE, ud.ERROR, "Adding %s to Domain Admins failed" % user_dn)
if stdout:
ud.debug(ud.MODULE, ud.ERROR, "udm users/user modify groups output:\n%s" % stdout.decode('UTF-8', 'replace'))
return False
return True
def _set_udm_account_password(user_dn: str, password: str) -> bool:
cmd = ('univention-directory-manager', 'users/user', 'modify', '--dn', user_dn, '--set', 'password=%s' % password, '--set', 'overridePWHistory=1', '--set', 'overridePWLength=1')
p1 = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, close_fds=True)
stdout, _ = p1.communicate()
if p1.returncode:
ud.debug(ud.MODULE, ud.ERROR, "Failed to set AD password in UDM for %s" % user_dn)
if stdout:
ud.debug(ud.MODULE, ud.ERROR, "udm users/user modify password output:\n%s" % stdout.decode('UTF-8', 'replace'))
return False
return True
[docs]
def prepare_administrator(username: str, password: str, ucr: ConfigRegistry | None = None) -> None:
ud.debug(ud.MODULE, ud.PROCESS, "Prepare administrator account")
if not ucr:
ucr = ConfigRegistry()
ucr.load()
# First check if account exists in LDAP, otherwise create it:
lo = univention.uldap.getMachineConnection()
res = lo.search(filter=filter_format("(&(uid=%s)(objectClass=shadowAccount))", (username,)), attr=["userPassword", "sambaSID"])
if not res:
ud.debug(ud.MODULE, ud.INFO, "No UCS LDAP search result for uid=%s" % username)
try:
success = _create_domain_admin_account_in_udm(username, password, lo, ucr)
except ldap.NO_SUCH_OBJECT:
success = False
if not success:
raise failedToCreateAdministratorAccount()
return
# Second, if the account existed already, check if it has the well known Administrator SID
user_dn = res[0][0]
user_sid = res[0][1].get("sambaSID", [None])[0]
old_hash = res[0][1].get("userPassword", [None])[0]
if not user_sid:
ud.debug(ud.MODULE, ud.ERROR, "UCS LDAP search for sambaSID of uid=%s failed" % username)
raise sambaSidNotSetForAdministratorAccount()
is_well_known_admin = False
try:
is_well_known_admin = _ucs_sid_is_well_known_administrator(user_sid.decode('ASCII'), lo, ucr)
except ldap.NO_SUCH_OBJECT:
raise failedToSearchForWellKnownSid()
# Third, if the account doesn't have the well known Administrator SID, add it to Domain Admins
if not is_well_known_admin:
try:
success = _add_udm_account_to_domain_admins(user_dn, lo, ucr)
except ldap.NO_SUCH_OBJECT:
success = False
if not success:
raise failedToAddAdministratorAccountToDomainAdmins()
return
# Finally, if the account does have the Administrator SID, set it's UDM password to the AD one.
if old_hash == b'{KINIT}':
return
success = _set_udm_account_password(user_dn, password)
if not success:
raise failedToSetAdministratorPassword()
def _mapped_ad_dn(ad_dn: str, ad_ldap_base: str, ucr: ConfigRegistry | None = None) -> str | None:
"""
>>> _mapped_ad_dn('uid=Administrator + CN=admin,OU=users,CN=univention,Foo=univention,bar=base', 'foo=univention,bar = base', {'ldap/base': 'dc=base'})
'uid=Administrator+cn=admin,ou=users,cn=univention,dc=base'
"""
parent = ad_dn
while parent:
if univention.uldap.access.compare_dn(parent, ad_ldap_base):
break
parent = univention.uldap.parentDn(parent)
else:
ud.debug(ud.MODULE, ud.ERROR, "Mapping of AD DN %r failed, base is not %r" % (ad_dn, ad_ldap_base))
return None
if not ucr:
ucr = ConfigRegistry()
ucr.load()
base = ldap.dn.str2dn(ad_ldap_base)
dn = [[(attr[0].lower() if attr[0] in ('CN', 'OU') else attr[0], attr[1], attr[2]) for attr in x] for x in ldap.dn.str2dn(ad_dn)[:-len(base)]]
return ldap.dn.dn2str(dn + ldap.dn.str2dn(ucr.get("ldap/base")))
[docs]
def synchronize_account_position(ad_domain_info: dict[str, str], username: str, password: str, ucr: ConfigRegistry | None = None) -> bool:
ud.debug(ud.MODULE, ud.PROCESS, "running synchronize_account_position")
if not ucr:
ucr = ConfigRegistry()
ucr.load()
# First determine target position from AD:
ad_server_ip = ad_domain_info["DC IP"]
ad_server_name = ad_domain_info["DC DNS Name"]
ad_ldap_base = ad_domain_info["LDAP Base"]
ad_domain = ad_domain_info["Domain"]
ad_realm = ad_domain.upper()
try:
time_sync(ad_server_ip)
except timeSyncronizationFailed as ex:
ud.debug(ud.MODULE, ud.WARN, "Time sync failed, trying to authenticate anyway. Original exception: %s" % (ex,))
principal = "%s@%s" % (username, ad_realm)
_get_kerberos_ticket(principal, password, ucr)
try:
lo_ad = univention.uldap.access(host=ad_server_name, port=389, base=ad_ldap_base, binddn=None, bindpw=None, start_tls=0, use_ldaps=False)
lo_ad.lo.set_option(ldap.OPT_PROTOCOL_VERSION, ldap.VERSION3)
lo_ad.lo.set_option(ldap.OPT_REFERRALS, 0)
auth = ldap.sasl.gssapi("")
lo_ad.lo.sasl_interactive_bind_s("", auth)
except (ldap.INVALID_CREDENTIALS, ldap.UNWILLING_TO_PERFORM):
return False # Massive failure, but no issue to be raised here.
res = lo_ad.searchDn(filter=filter_format("(sAMAccountName=%s)", [username]))
if not res:
ud.debug(ud.MODULE, ud.ERROR, "Lookup of AD DN for user %s failed" % username)
return False # Massive failure, but no issue to be raised here.
ad_user_dn = res[0]
# Second determine position in UCS LDAP:
lo = univention.uldap.getMachineConnection()
res = lo.searchDn(filter=filter_format("(&(uid=%s)(objectClass=shadowAccount))", (username,)), unique=True)
if not res:
ud.debug(ud.MODULE, ud.ERROR, "No UCS LDAP search result for uid=%s" % username)
return False # Massive failure, but no issue to be raised here.
ucs_user_dn = res[0]
if ucs_user_dn.lower() == ad_user_dn.lower():
return True
mapped_ad_user_dn = _mapped_ad_dn(ad_user_dn, ad_ldap_base, ucr)
target_position = lo.parentDn(mapped_ad_user_dn)
cmd = ("univention-directory-manager", "users/user", "move", "--dn", ucs_user_dn, "--position", target_position)
p1 = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, close_fds=True)
stdout, _ = p1.communicate()
if p1.returncode:
ud.debug(ud.MODULE, ud.ERROR, "Moving UDM object %s to %s failed" % (ucs_user_dn, target_position))
if stdout:
ud.debug(ud.MODULE, ud.ERROR, "udm users/user modify groups output:\n%s" % stdout.decode('UTF-8', 'replace'))
return False
return True
def _server_supports_ssl(server: str) -> bool:
ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER)
ldapuri = "ldap://%s:389" % (server)
lo = ldap.initialize(ldapuri)
try:
lo.start_tls_s()
except ldap.UNAVAILABLE:
return False
except ldap.SERVER_DOWN:
return False
return True
[docs]
def server_supports_ssl(server: str) -> bool:
ud.debug(ud.MODULE, ud.PROCESS, "Check if server supports SSL")
# we have to create a new process because there is only one sec context allowed in python-ldap
p1 = subprocess.Popen([sys.executable, "-c", 'import univention.lib.admember; print(univention.lib.admember._server_supports_ssl(%r))' % (server,)], close_fds=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, _ = p1.communicate()
if p1.returncode == 0 and stdout.strip() == b'True':
ud.debug(ud.MODULE, ud.PROCESS, "SSL True")
return True
else:
ud.debug(ud.MODULE, ud.PROCESS, "SSL False")
return False
[docs]
def enable_ssl() -> None:
ud.debug(ud.MODULE, ud.PROCESS, "Enable connector SSL")
univention.config_registry.handler_set([
'connector/ad/ldap/ssl=yes',
'ldap/sasl/secprops/maxssf=128',
])
[docs]
def disable_ssl() -> None:
ud.debug(ud.MODULE, ud.PROCESS, "Disable connector SSL")
univention.config_registry.handler_set(['connector/ad/ldap/ssl=no'])
univention.config_registry.handler_unset(['ldap/sasl/secprops/maxssf'])
def _add_service_to_localhost(service: str) -> None:
ud.debug(ud.MODULE, ud.PROCESS, "Adding service %s to localhost" % service)
res = subprocess.call('. /usr/share/univention-lib/ldap.sh; ucs_addServiceToLocalhost %s' % (quote(service),), shell=True)
if res:
raise failedToSetService()
def _remove_service_from_localhost(service: str) -> None:
ud.debug(ud.MODULE, ud.PROCESS, "Remove service %s from localhost" % service)
res = subprocess.call('. /usr/share/univention-lib/ldap.sh; ucs_removeServiceFromLocalhost %s' % (quote(service),), shell=True)
if res:
raise failedToSetService()
[docs]
def add_admember_service_to_localhost() -> None:
_add_service_to_localhost('AD Member')
[docs]
def add_adconnector_service_to_localhost() -> None:
_add_service_to_localhost('AD Connector')
[docs]
def remove_admember_service_from_localhost() -> None:
_remove_service_from_localhost('AD Member')
[docs]
def info_handler(msg: Any) -> None:
ud.debug(ud.MODULE, ud.PROCESS, msg)
[docs]
def error_handler(msg: Any) -> None:
ud.debug(ud.MODULE, ud.ERROR, msg)
[docs]
def remove_install_univention_samba(info_handler: Callable[..., None] = info_handler, step_handler: Callable[..., None] | None = None, error_handler: Callable[..., None] = error_handler, install: bool = True, uninstall: bool = True) -> bool: # TODO: replace with univention-remove?
pm = univention.lib.package_manager.PackageManager(
info_handler=info_handler,
step_handler=step_handler,
error_handler=error_handler,
always_noninteractive=True,
)
if not pm.update():
return False
pm.noninteractive()
# uninstall first to get rid of the configured samba/* ucr vars
if uninstall and pm.is_installed('univention-samba'):
ud.debug(ud.MODULE, ud.PROCESS, "Uninstall univention-samba")
if not pm.uninstall('univention-samba'):
return False
# install
if install:
ud.debug(ud.MODULE, ud.PROCESS, "Install univention-samba")
if not pm.install('univention-samba'):
ud.debug(ud.MODULE, ud.PROCESS, "Installation of univention-samba failed. Try to re-create sources.list and try again.")
univention.config_registry.handler_commit(['/etc/apt/sources.list.d/15_ucs-online-version.list', '/etc/apt/sources.list.d/20_ucs-online-component.list'])
if not pm.update():
return False
if not pm.install('univention-samba'):
ud.debug(ud.MODULE, ud.ERROR, "Installation of univention-samba failed. Abort.")
return False
return True
SAMBA_TOOL_FIELDNAMES_TO_CLDAP_RES = {
'Forest': 'forest',
'Domain': 'dns_domain',
'Netbios domain': 'domain_name',
'DC name': 'pdc_dns_name',
'DC netbios name': 'pdc_name',
'Server site': 'server_site',
'Client site': 'client_site',
}
CLDAP_RES = namedtuple('CLDAP_RES', 'forest dns_domain domain_name pdc_dns_name pdc_name server_site client_site')
[docs]
def cldap_finddc(ip: str) -> CLDAP_RES:
lp = LoadParm()
lp.load('/dev/null')
net = Net(creds=None, lp=lp)
cldap_res = net.finddc(address=ip, flags=nbt.NBT_SERVER_LDAP | nbt.NBT_SERVER_DS | nbt.NBT_SERVER_WRITABLE)
return cldap_res
[docs]
def get_defaultNamingContext(ad_server_ip: str) -> str:
lo = ldap.initialize(f'ldap://{ad_server_ip}')
res = lo.search_s('', ldap.SCOPE_BASE, None, ['defaultNamingContext'])
default_naming_context = res[0][1]['defaultNamingContext'][0].decode('UTF-8')
return default_naming_context
[docs]
def lookup_adds_dc(ad_server: str = "", ucr: ConfigRegistry | None = None, check_dns: bool = True) -> dict[str, str]:
"""CLDAP lookup"""
ud.debug(ud.MODULE, ud.PROCESS, "Lookup ADDS DC")
ad_domain_info = {}
ips = []
if not ucr:
ucr = ConfigRegistry()
ucr.load()
if not ad_server:
ad_server = ucr['domainname']
# get ip addresses
try:
ipaddress.ip_address('%s' % (ad_server,))
ips.append(ad_server)
except ValueError:
dig_sources_ucr = []
for source in ['dns/forwarder1', 'dns/forwarder2', 'dns/forwarder3', 'nameserver1', 'nameserver2', 'nameserver3']:
ip = ucr.get(source)
if not ip:
continue
dig_sources_ucr.append(source)
try:
cmd = ['dig', '@' + ip, ad_server, '+short', '+nocookie']
ud.debug(ud.MODULE, ud.PROCESS, "running %s" % cmd)
p1 = subprocess.Popen(cmd, close_fds=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, err = p1.communicate()
stdout, stderr = out.decode('UTF-8', 'replace'), err.decode('UTF-8', 'replace')
ud.debug(ud.MODULE, ud.PROCESS, "stdout: %s" % stdout)
ud.debug(ud.MODULE, ud.PROCESS, "stderr: %s" % stderr)
if p1.returncode == 0:
for i in stdout.split('\n'):
if i:
ips.append(i)
if ips:
break
except OSError as ex:
ud.debug(ud.MODULE, ud.ERROR, "%s failed: %s" % (cmd, ex.args[1]))
# no ip addresses
if not ips:
raise failedADConnect(["DNS lookup of AD Server %s failed. Sources: %s" % (ad_server, ", ".join(dig_sources_ucr))])
ad_server_ip = None
check_results = []
for ip in ips:
if not ip:
continue
try: # check cldap
cldap_res = cldap_finddc(ip)
except RuntimeError as ex:
ud.debug(ud.MODULE, ud.ERROR, "Connection to AD Server %s failed: %s" % (ip, ex.args[0]))
check_results.append("CLDAP: %s" % ex.args[0])
else:
if not check_dns:
ad_server_ip = ip
break
try: # check dns
cmd = ['dig', '@%s' % ip, '+nocookie']
ud.debug(ud.MODULE, ud.PROCESS, "running %s" % cmd)
p1 = subprocess.Popen(cmd, close_fds=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, err = p1.communicate()
ud.debug(ud.MODULE, ud.PROCESS, "stdout: %s" % out.decode('UTF-8', 'replace'))
ud.debug(ud.MODULE, ud.PROCESS, "stderr: %s" % err.decode('UTF-8', 'replace'))
if p1.returncode == 0: # yes, this is also a DNS server, we are good
ad_server_ip = ip
break
except OSError as ex:
ud.debug(ud.MODULE, ud.ERROR, "%s failed: %s" % (cmd, ex.args[1]))
check_results.append("DNS: %s" % ex.args[1])
if ad_server_ip is None:
raise failedADConnect(["Connection to AD Server %s failed (%s)" % (ad_server, ",".join(check_results))])
ad_ldap_base = None
try:
ad_ldap_base = get_defaultNamingContext(ad_server_ip)
except RuntimeError as ex:
raise failedADConnect(["Could not detect LDAP base on %s: %s" % (ad_server_ip, ex.args[1])])
ad_domain_info = {
"Forest": cldap_res.forest,
"Domain": cldap_res.dns_domain,
"Netbios Domain": cldap_res.domain_name,
"DC DNS Name": cldap_res.pdc_dns_name,
"DC Netbios Name": cldap_res.pdc_name,
"Server Site": cldap_res.server_site,
"Client Site": cldap_res.client_site,
"LDAP Base": ad_ldap_base,
"DC IP": ad_server_ip,
}
ud.debug(ud.MODULE, ud.PROCESS, "AD Info: %s" % ad_domain_info)
return ad_domain_info
[docs]
def set_timeserver(timeserver: str, ucr: ConfigRegistry | None = None) -> None:
ud.debug(ud.MODULE, ud.PROCESS, "Setting timeserver to %s" % timeserver)
univention.config_registry.handler_set(['timeserver=%s' % (timeserver,)])
restart_service("ntpsec")
[docs]
def stop_service(service: str) -> None:
return invoke_service(service, "stop")
[docs]
def start_service(service: str) -> None:
return invoke_service(service, "start")
[docs]
def restart_service(service: str) -> None:
return invoke_service(service, "restart")
[docs]
def invoke_service(service: str, cmd: str) -> None:
init_script = '/etc/init.d/%s' % service # FIXME: SysV-init → systemd.service
if not os.path.exists(init_script):
return
try:
p1 = subprocess.Popen([init_script, cmd], close_fds=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
stdout, _ = p1.communicate()
except OSError as ex:
ud.debug(ud.MODULE, ud.ERROR, "%s %s failed: %s" % (init_script, cmd, ex.args[1]))
return
if p1.returncode:
ud.debug(ud.MODULE, ud.ERROR, "%s %s failed (%d)" % (init_script, cmd, p1.returncode))
return
ud.debug(ud.MODULE, ud.PROCESS, "%s %s: %s" % (init_script, cmd, stdout.decode('UTF-8', 'replace')))
[docs]
def do_time_sync(ad_ip: str) -> bool:
ud.debug(ud.MODULE, ud.PROCESS, "Synchronizing time to %s" % ad_ip)
p1 = subprocess.Popen(["rdate", "-s", "-n", ad_ip], close_fds=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
p1.communicate()
if p1.returncode:
ud.debug(ud.MODULE, ud.ERROR, "rdate -s -p failed (%d)" % (p1.returncode,))
return False
return True
[docs]
def time_sync(ad_ip: str, tolerance: int = 180, critical_difference: int = 360) -> bool:
"""Try to sync the local time with an AD server"""
stdout = b""
env = os.environ.copy()
env["LC_ALL"] = "C"
try:
p1 = subprocess.Popen(["rdate", "-p", "-n", ad_ip], close_fds=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env)
stdout, _ = p1.communicate()
except OSError as ex:
ud.debug(ud.MODULE, ud.ERROR, "rdate -p -n %s: %s" % (ad_ip, ex.args[1]))
return False
if p1.returncode:
ud.debug(ud.MODULE, ud.ERROR, "rdate failed (%d)" % (p1.returncode,))
return False
TIME_FORMAT = "%a %b %d %H:%M:%S %Z %Y"
time_string = stdout.strip().decode('ASCII')
old_locale = locale.getlocale(locale.LC_TIME)
try:
locale.setlocale(locale.LC_TIME, (None, None)) # 'C' as env['LC_ALL'] some lines earlier
remote_datetime = datetime.strptime(time_string, TIME_FORMAT)
except ValueError:
raise timeSyncronizationFailed("AD Server did not return proper time string: %s" % time_string)
finally:
locale.setlocale(locale.LC_TIME, old_locale)
local_datetime = datetime.today()
delta_t = local_datetime - remote_datetime
if abs(delta_t) < timedelta(0, tolerance):
ud.debug(ud.MODULE, ud.PROCESS, "Time difference is less than %d seconds, skipping reset of local time" % (tolerance,))
elif local_datetime > remote_datetime:
if abs(delta_t) >= timedelta(0, critical_difference):
raise manualTimeSyncronizationRequired("Remote clock is behind local clock by more than %s seconds, refusing to turn back time." % critical_difference)
else:
ud.debug(ud.MODULE, ud.WARN, "Remote clock is behind local clock by more than %s seconds, refusing to turn back time, should be accurate enough." % (tolerance,))
return False
else:
if not do_time_sync(ad_ip):
raise timeSyncronizationFailed("Time synchronization failed")
return True
[docs]
def check_server_role(ucr: ConfigRegistry | None = None) -> None:
if not ucr:
ucr = ConfigRegistry()
ucr.load()
if ucr.get("server/role") != "domaincontroller_master":
raise invalidUCSServerRole("The function become_ad_member can only be run on an UCS Primary Directory Node")
[docs]
def check_domain(ad_domain_info: dict[str, str], ucr: ConfigRegistry | None = None) -> None:
if not ucr:
ucr = ConfigRegistry()
ucr.load()
if ad_domain_info["Domain"].lower() != ucr["domainname"].lower():
raise domainnameMismatch("The domain of the AD Server does not match the local domain: %s" % (ad_domain_info["Domain"],))
[docs]
def set_nameserver(server_ips: Iterable[str], ucr: ConfigRegistry | None = None) -> tuple[list[str], list[str]]:
previous_ucr_set = []
previous_ucr_unset = []
if not ucr:
ucr = ConfigRegistry()
ucr.load()
count = 1
for server_ip in server_ips:
var = 'nameserver%d' % count
val = ucr.get(var)
if val is not None:
previous_ucr_set.append('%s=%s' % (var, val))
else:
previous_ucr_unset.append('%s' % (var,))
univention.config_registry.handler_set(['%s=%s' % (var, server_ip)])
count += 1
for i in range(count, 4):
var = 'nameserver%s' % i
val = ucr.get(var)
if val is not None:
previous_ucr_set.append('%s=%s' % (var, val))
univention.config_registry.handler_unset([var])
return (previous_ucr_set, previous_ucr_unset)
[docs]
def rename_well_known_sid_objects(username: str, password: str, ucr: ConfigRegistry | None = None) -> None:
if not ucr:
ucr = ConfigRegistry()
ucr.load()
ud.debug(ud.MODULE, ud.PROCESS, "Matching well known object names")
# First determine current name Domain Admins (trivial)
lo = univention.uldap.getMachineConnection()
ucs_domain_sid = _sid_of_ucs_sambadomain(lo, ucr)
domain_admins_sid = "%s-%d" % (ucs_domain_sid, DOMAIN_RID_ADMINS)
res = lo.search(filter=filter_format("(&(sambaSID=%s)(objectClass=sambaGroupMapping))", [domain_admins_sid]), attr=["cn"], unique=True)
if not res or "cn" not in res[0][1]:
ud.debug(ud.MODULE, ud.ERROR, "Lookup of group name for Domain Admins sid failed")
domain_admins_name = "Domain Admins" # sensible guess
else:
domain_admins_name = res[0][1]["cn"][0].decode('UTF-8')
# Next run the renaming script
binddn = '%s@%s' % (username, ucr.get('kerberos/realm'))
with tempfile.NamedTemporaryFile('w+') as fd:
fd.write(password)
fd.flush()
p1 = subprocess.Popen(
['/usr/share/univention-ad-connector/scripts/well-known-sid-object-rename', '--binddn', binddn, '--bindpwdfile', fd.name],
stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True)
stdout, stderr = p1.communicate()
ud.debug(ud.MODULE, ud.PROCESS, "%s" % stdout.decode('UTF-8', 'replace'))
if p1.returncode:
msg = "well-known-sid-object-rename failed with %d (%s)" % (p1.returncode, stderr.decode('UTF-8', 'replace'))
ud.debug(ud.MODULE, ud.ERROR, msg)
raise connectionFailed(msg)
# Finally wait for replication and slapd restart to ensure that new LDAP ACLs are active:
res = lo.search(filter=filter_format("(&(sambaSID=%s)(objectClass=sambaGroupMapping))", [domain_admins_sid]), attr=["cn"], unique=True)
if not res or "cn" not in res[0][1]:
ud.debug(ud.MODULE, ud.ERROR, "Lookup of new group name for Domain Admins sid failed")
new_domain_admins_name = "Domain Admins"
else:
new_domain_admins_name = res[0][1]["cn"][0].decode('UTF-8')
wait_for_postrun = False
if new_domain_admins_name != domain_admins_name:
t0 = time.time()
ud.debug(ud.MODULE, ud.INFO, "Waiting for well-known-sid-name-mapping listener to map Domain Admins")
while custom_groupname(domain_admins_name) != new_domain_admins_name:
if (time.time() - t0) > 15:
break
time.sleep(1)
else:
wait_for_postrun = True
if wait_for_postrun:
ud.debug(ud.MODULE, ud.ERROR, "Waiting for postrun of well-known-sid-name-mapping")
time.sleep(15)
[docs]
def make_deleted_objects_readable_for_this_machine(username: str, password: str, ucr: ConfigRegistry | None = None) -> None:
if not ucr:
ucr = ConfigRegistry()
ucr.load()
ud.debug(ud.MODULE, ud.PROCESS, "Make Deleted Objects readable for this machine")
with tempfile.NamedTemporaryFile('w+') as fd:
fd.write(password)
fd.flush()
binddn = '%s@%s' % (username, ucr.get('kerberos/realm'))
p1 = subprocess.Popen(
['/usr/share/univention-ad-connector/scripts/make-deleted-objects-readable-for-this-machine', '--binddn', binddn, '--bindpwdfile', fd.name],
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
close_fds=True)
stdout, stderr = p1.communicate()
ud.debug(ud.MODULE, ud.PROCESS, "%s" % stdout.decode('UTF-8', 'replace'))
if p1.returncode:
msg = "make-deleted-objects-readable-for-this-machine failed with %d (%s)" % (p1.returncode, stderr.decode('UTF-8', 'replace'))
ud.debug(ud.MODULE, ud.ERROR, msg)
raise connectionFailed(msg)
[docs]
def prepare_dns_reverse_settings(ad_domain_info: dict[str, str], ucr: ConfigRegistry | None = None) -> tuple[list[str], list[str]]:
# For python-ldap / GSSAPI / AD we need working reverse DNS lookups
# Otherwise one ends up with:
#
# SASL(-1): generic failure: GSSAPI Error: Miscellaneous failure (see text)
# (Matching credential (ldap/10.20.30.123@10.20.30.123) not found)
#
# Or even worse, in case there had been a (nscd cached?) PTR record
# in the ucs.domain:
#
# SASL(-1): generic failure: GSSAPI Error: Miscellaneous failure (see text)
# (Matching credential (ldap/adhost.ucs.domain@UCS.DOMAIN) not found)
#
if not ucr:
ucr = ConfigRegistry()
ucr.load()
# Flush the cache, just in case
flush_nscd_hosts_cache()
# Test DNS resolution (just for fun)
try:
hostname, _aliaslist, _ipaddrlist = socket.gethostbyaddr(ad_domain_info['DC IP'])
ud.debug(ud.MODULE, ud.INFO, "%s resolves to %s" % (ad_domain_info['DC IP'], hostname))
except (socket.herror, socket.gaierror) as exc:
ud.debug(ud.MODULE, ud.INFO, "Resolving %s failed: %s" % (ad_domain_info['DC IP'], exc.args[1]))
# Set a hosts/static anyway, to be safe from DNS issues (Bug #38285)
previous_ucr_set = []
previous_ucr_unset = []
ad_server_name = ad_domain_info['DC DNS Name']
ip = socket.gethostbyname(ad_server_name)
ucr_key = 'hosts/static/%s' % (ip,)
ucr_set = ['%s=%s' % (ucr_key, ad_server_name)]
for setting in ucr_set:
var = setting.split("=", 1)[0]
old_val = ucr.get(var)
if old_val is not None:
previous_ucr_set.append('%s=%s' % (var, old_val))
else:
previous_ucr_unset.append('%s' % (var,))
ud.debug(ud.MODULE, ud.PROCESS, "Setting UCR variables: %s" % ucr_set)
univention.config_registry.handler_set(ucr_set)
return (previous_ucr_set, previous_ucr_unset)
[docs]
def prepare_kerberos_ucr_settings(realm: str | None = None, ucr: ConfigRegistry | None = None) -> tuple[list[str], list[str]]:
ud.debug(ud.MODULE, ud.PROCESS, "Prepare Kerberos UCR settings")
if not ucr:
ucr = ConfigRegistry()
ucr.load()
previous_ucr_set = []
previous_ucr_unset = []
ucr_set = [
'kerberos/defaults/dns_lookup_kdc=true',
]
if realm and realm != ucr.get('kerberos/realm'):
ucr_set.append('kerberos/realm=%s' % realm)
for setting in ucr_set:
var = setting.split("=", 1)[0]
old_val = ucr.get(var)
if old_val is not None:
previous_ucr_set.append('%s=%s' % (var, old_val))
else:
previous_ucr_unset.append('%s' % (var,))
ud.debug(ud.MODULE, ud.PROCESS, "Setting UCR variables: %s" % ucr_set)
univention.config_registry.handler_set(ucr_set)
ucr_unset = [
'kerberos/kdc',
'kerberos/kpasswdserver',
'kerberos/adminserver',
]
for var in ucr_unset:
val = ucr.get(var)
if val is not None:
previous_ucr_set.append('%s=%s' % (var, val))
ud.debug(ud.MODULE, ud.PROCESS, "Unsetting UCR variables: %s" % ucr_unset)
univention.config_registry.handler_unset(ucr_unset)
return (previous_ucr_set, previous_ucr_unset)
[docs]
def set_ucr(ucr_set: list[str], ucr_unset: list[str]) -> None:
univention.config_registry.handler_set(ucr_set)
univention.config_registry.handler_unset(ucr_unset)
[docs]
def prepare_ucr_settings() -> None:
ud.debug(ud.MODULE, ud.PROCESS, "Prepare UCR settings")
# Show warnings in UMC
# Change displayed name of users from "username" to "displayName" (as in AD)
ucr_set = [
'ad/member=true',
'connector/ad/mapping/user/password/kinit=true',
'directory/manager/web/modules/users/user/display=displayName',
'nameserver/external=true',
'connector/ad/mapping/group/primarymail=true',
'connector/ad/mapping/user/primarymail=true',
]
modules = ('computers/computer', 'groups/group', 'users/user', 'dns/dns')
ucr_set += ['directory/manager/web/modules/%s/show/adnotification=true' % (module,) for module in modules]
ud.debug(ud.MODULE, ud.PROCESS, "Setting UCR variables: %s" % ucr_set)
univention.config_registry.handler_set(ucr_set)
prepare_kerberos_ucr_settings()
[docs]
def revert_ucr_settings() -> None:
ud.debug(ud.MODULE, ud.PROCESS, "Revert UCR settings")
# TODO: something else?
ucr_unset = [
'ad/member',
'directory/manager/web/modules/users/user/display',
'kerberos/defaults/dns_lookup_kdc',
]
modules = ('computers/computer', 'groups/group', 'users/user', 'dns/dns')
ucr_unset += ['directory/manager/web/modules/%s/show/adnotification' % (module,) for module in modules]
ud.debug(ud.MODULE, ud.PROCESS, "Unsetting UCR variables: %s" % ucr_unset)
univention.config_registry.handler_unset(ucr_unset)
ucr_set = [
'nameserver/external=false',
]
ud.debug(ud.MODULE, ud.PROCESS, "Setting UCR variables: %s" % ucr_set)
univention.config_registry.handler_set(ucr_set)
[docs]
def prepare_connector_settings(username: str, password: str, ad_domain_info: dict[str, str], ucr: ConfigRegistry | None = None) -> None:
if not ucr:
ucr = ConfigRegistry()
ucr.load()
ud.debug(ud.MODULE, ud.PROCESS, "Prepare connector settings")
binddn = '%(hostname)s$' % ucr
ucr_set = [
'connector/ad/ldap/host=%s' % ad_domain_info["DC DNS Name"],
'connector/ad/ldap/base=%s' % ad_domain_info["LDAP Base"],
'connector/ad/ldap/binddn=%s' % binddn,
'connector/ad/ldap/bindpw=/etc/machine.secret',
'connector/ad/ldap/kerberos=true',
'connector/ad/mapping/syncmode=read',
'connector/ad/mapping/user/ignorelist=krbtgt,root,pcpatch',
]
ud.debug(ud.MODULE, ud.PROCESS, "Setting UCR variables: %s" % ucr_set)
univention.config_registry.handler_set(ucr_set)
[docs]
def revert_connector_settings(ucr: ConfigRegistry | None = None) -> None:
ud.debug(ud.MODULE, ud.PROCESS, "Revert connector settings")
# TODO: something else?
ucr_unset = [
'connector/ad/ldap/host',
'connector/ad/ldap/base',
'connector/ad/ldap/binddn',
'connector/ad/ldap/bindpw',
'connector/ad/ldap/kerberos',
'connector/ad/mapping/syncmode',
'connector/ad/mapping/user/ignorelist',
]
ud.debug(ud.MODULE, ud.PROCESS, "Unsetting UCR variables: %s" % ucr_unset)
univention.config_registry.handler_unset(ucr_unset)
[docs]
def disable_local_samba4() -> None:
ud.debug(ud.MODULE, ud.PROCESS, "Disable local samba4")
stop_service("samba")
univention.config_registry.handler_set(['samba4/autostart=false'])
[docs]
def disable_local_heimdal() -> None:
ud.debug(ud.MODULE, ud.PROCESS, "Disable local heimdal")
stop_service("heimdal-kdc")
univention.config_registry.handler_set(['kerberos/autostart=false'])
[docs]
def run_samba_join_script(username: str, password: str, ucr: ConfigRegistry | None = None) -> None:
if not ucr:
ucr = ConfigRegistry()
ucr.load()
ud.debug(ud.MODULE, ud.PROCESS, "Running samba join script")
lo = univention.uldap.getMachineConnection()
res = lo.searchDn(filter=filter_format("(&(uid=%s)(objectClass=shadowAccount))", (username,)), unique=True)
if not res:
ud.debug(ud.MODULE, ud.ERROR, "No UCS LDAP search result for uid=%s" % username)
raise sambaJoinScriptFailed()
binddn = res[0]
with tempfile.NamedTemporaryFile('w+') as fd:
fd.write(password)
fd.flush()
my_env = os.environ
my_env['SMB_CONF_PATH'] = '/etc/samba/smb.conf'
cmd = ('/usr/lib/univention-install/26univention-samba.inst', '--binddn', binddn, '--bindpwdfile', fd.name)
p1 = subprocess.Popen(cmd, close_fds=True, env=my_env, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = p1.communicate()
ud.debug(ud.MODULE, ud.PROCESS, "%s" % stdout.decode('UTF-8', 'replace'))
if p1.returncode:
if stderr:
ud.debug(ud.MODULE, ud.ERROR, "stderr:\n%s" % (stderr.decode('UTF-8', 'replace'),))
ud.debug(ud.MODULE, ud.ERROR, "26univention-samba.inst failed with %d" % (p1.returncode,))
raise sambaJoinScriptFailed()
[docs]
def add_host_record_in_ad(uid: str | None = None, binddn: str | None = None, bindpw: str | None = None, bindpwdfile: str | None = None, fqdn: str | None = None, ip: str | None = None, sso: bool = False) -> bool:
pwdfile = None
create_pwdfile = False
ucr = ConfigRegistry()
ucr.load()
domainname = ucr.get('domainname')
if binddn:
uids = [y[1] for x in ldap.dn.str2dn(binddn) for y in x if ('uid' in y)]
if uids:
uid = uids[0]
if bindpwdfile:
create_pwdfile = False
pwdfile = bindpwdfile
elif bindpw:
create_pwdfile = True
pwdfile = bindpw
# take myself as default
if not ip:
ip = Interfaces().get_default_ip_address().ip
if sso and not fqdn:
uri = ucr.get('ucs/server/sso/uri', 'https://ucs-sso-ng.' + domainname)
fqdn = urlparse(uri).netloc
if not uid or not pwdfile or not fqdn or not ip:
print('Missing binddn/bindpw/bindpwdfile/fqdn or ip, do nothing!')
return False
ad_domain_info = lookup_adds_dc()
ad_ip = ad_domain_info['DC IP']
found = False
print("Create %s (%s) A record on %s" % (fqdn, ip, ad_ip))
# check if we are already defined as host record
try:
resolver = dns.resolver.Resolver()
resolver.lifetime = 10
resolver.nameservers = [ad_ip]
response = resolver.query(fqdn, 'A')
for data in response:
if str(data) == str(ip):
found = True
except dns.resolver.NXDOMAIN:
found = False
except Exception as err:
print('failed to query for A record (%s, %s)' % (err.__class__.__name__, err))
found = False
if found:
print('%s A record for %s found' % (fqdn, ip))
return True
# create host record # FIXME: missing quoting
fd = tempfile.NamedTemporaryFile('w+', delete=False)
fd.write('server %s\n' % ad_ip)
fd.write('update add %s 86400 A %s\n' % (fqdn, ip))
fd.write('send\n')
fd.write('quit\n')
fd.close()
# create pwd file
if create_pwdfile:
tmp = tempfile.NamedTemporaryFile('w+', delete=False)
tmp.write('%s' % pwdfile)
tmp.close()
pwdfile = tmp.name
cmd = ['kinit', '--password-file=%s' % pwdfile, uid]
cmd += ['nsupdate', '-v', '-g', fd.name]
try:
p1 = subprocess.Popen(cmd, close_fds=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = p1.communicate()
ud.debug(ud.MODULE, ud.PROCESS, '%s' % stdout.decode('UTF-8', 'replace'))
if p1.returncode:
print('%s failed with %d (%s)' % (cmd, p1.returncode, stderr.decode('UTF-8', 'replace')))
print('failed to add A record for ucs-sso to %s' % ad_ip)
return False
finally:
os.unlink(fd.name)
if create_pwdfile:
os.unlink(pwdfile)
return True
[docs]
def get_domaincontroller_srv_record(domain: str, nameserver: str | None = None) -> bool | str | None:
if not domain:
return False
resolver = dns.resolver.Resolver()
resolver.lifetime = 10 # make sure that we get an early timeout
if nameserver:
resolver.nameservers = [nameserver]
# perform a SRV lookup
try:
response = resolver.query('_domaincontroller_master._tcp.%s.' % domain, 'SRV')
if len(response) != 1:
ud.debug(ud.MODULE, ud.ERROR, 'Non-unique SRV record: %s!' % (response.rrset,))
return None
return str(response[0].target)
except dns.resolver.NoAnswer:
ud.debug(ud.MODULE, ud.WARN, 'Received no answer to query for _domaincontroller_master._tcp.%s. SRV record.' % (domain,))
except dns.resolver.NXDOMAIN:
ud.debug(ud.MODULE, ud.WARN, 'Domain (%s) not resolvable!' % (domain,))
except dns.resolver.NoNameservers:
ud.debug(ud.MODULE, ud.WARN, 'No name servers in domain (%s) available to answer the query.' % (domain,))
except dns.exception.Timeout as exc:
ud.debug(ud.MODULE, ud.WARN, 'Lookup for Primary Directory Node record timed out: %s' % (exc,))
return None
[docs]
def add_domaincontroller_srv_record_in_ad(ad_ip: str, username: str, password: str, ucr: ConfigRegistry | None = None) -> bool:
if not ucr:
ucr = ConfigRegistry()
ucr.load()
ud.debug(ud.MODULE, ud.PROCESS, "Create _domaincontroller_master SRV record on %s" % ad_ip)
hostname = ucr.get('hostname')
domainname = ucr.get('domainname')
fqdn_with_trailing_dot = "%s.%s." % (hostname, domainname)
srv_record = "_domaincontroller_master._tcp.%s" % (domainname,)
current_record = get_domaincontroller_srv_record(domainname)
if current_record == fqdn_with_trailing_dot:
ud.debug(ud.MODULE, ud.PROCESS, "Ok, SRV record %s already points to this server" % (srv_record,))
return True
if current_record:
# remove the existing SRV record. Important when replacing an existing Primary Directory Node system!
# we need Administrator permissions to do this.
ud.debug(ud.MODULE, ud.PROCESS, "Removing previous SRV record %s" % (current_record,))
with tempfile.NamedTemporaryFile('w+') as fd, tempfile.NamedTemporaryFile('w+') as fd2:
fd2.write(password)
fd2.flush()
# FIXME: missing quoting
fd.write('server %s\n' % ad_ip)
fd.write('update delete %s. SRV\n' % (srv_record,))
fd.write('send\n')
fd.write('quit\n')
fd.flush()
cmd = ['kinit', '--password-file=%s' % (fd2.name,), username, 'nsupdate', '-v', '-g', fd.name]
p1 = subprocess.Popen(cmd, close_fds=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = p1.communicate()
ud.debug(ud.MODULE, ud.PROCESS, "%s" % stdout.decode('UTF-8', 'replace'))
if p1.returncode:
ud.debug(ud.MODULE, ud.ERROR, "%s failed with %d (%s)" % (cmd, p1.returncode, stderr.decode('UTF-8', 'replace')))
ud.debug(ud.MODULE, ud.ERROR, "failed to remove SRV record. Ignoring error.")
subprocess.call(['kdestroy'])
# FIXME: missing quoting
fd = tempfile.NamedTemporaryFile('w+', delete=False)
fd.write('server %s\n' % ad_ip)
fd.write('update add %s. 10800 SRV 0 0 0 %s\n' % (srv_record, fqdn_with_trailing_dot))
fd.write('send\n')
fd.write('quit\n')
fd.close()
cmd = ['kinit', '--password-file=/etc/machine.secret']
# use the machine account so that the server has permissions to modify this record
cmd += [r'%s\$' % hostname]
cmd += ['nsupdate', '-v', '-g', fd.name]
try:
p1 = subprocess.Popen(cmd, close_fds=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = p1.communicate()
ud.debug(ud.MODULE, ud.PROCESS, "%s" % stdout.decode('UTF-8', 'replace'))
if p1.returncode:
ud.debug(ud.MODULE, ud.ERROR, "%s failed with %d (%s)" % (cmd, p1.returncode, stderr.decode('UTF-8', 'replace')))
ud.debug(ud.MODULE, ud.ERROR, "failed to add SRV record to %s" % ad_ip)
# raise failedToAddServiceRecordToAD("failed to add SRV record to %s" % ad_ip)
return False
finally:
os.unlink(fd.name)
return True
[docs]
def get_ucr_variable_from_ucs(host: str, server: str, var: str) -> str:
cmd = ['univention-ssh', '/etc/machine.secret']
cmd += [r'%s\$@%s' % (host, server)]
cmd += ['/usr/sbin/ucr get %s' % (quote(var),)]
p1 = subprocess.Popen(cmd, close_fds=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = p1.communicate()
if p1.returncode:
ud.debug(ud.MODULE, ud.ERROR, "%s failed with %d (%s)" % (cmd, p1.returncode, stderr.decode('UTF-8', 'replace')))
raise failedToGetUcrVariable("failed to get UCR variable %s from %s" % (var, server))
return stdout.decode('UTF-8', 'replace').strip()
[docs]
def set_nameserver_from_ucs_master(ucr: ConfigRegistry | None = None) -> None:
if not ucr:
ucr = ConfigRegistry()
ucr.load()
ud.debug(ud.MODULE, ud.PROCESS, "Set nameservers")
for var in ['nameserver1', 'nameserver2', 'nameserver3']:
value = get_ucr_variable_from_ucs(ucr.get('hostname'), ucr.get('ldap/master'), var)
if value:
ud.debug(ud.MODULE, ud.PROCESS, "Setting %s=%s" % (var, value))
univention.config_registry.handler_set(['%s=%s' % (var, value)])
[docs]
def revert_backup_ad_member() -> None:
# TODO: something else?
remove_install_univention_samba(install=False)
revert_ucr_settings()
[docs]
def revert_slave_ad_member() -> None:
# TODO: something else?
remove_install_univention_samba(install=False)
revert_ucr_settings()
[docs]
def revert_member_ad_member() -> None:
# TODO: something else?
remove_install_univention_samba(install=False)
revert_ucr_settings()
[docs]
def revert_container_ad_member() -> None:
revert_ucr_settings()