# SPDX-FileCopyrightText: 2024-2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only
import contextlib
import re
import socket
import sqlite3
import subprocess
import time
from collections.abc import Iterator
from typing import Any
import ldap
import ldb
from samba.auth import system_session
from samba.param import LoadParm
from samba.samdb import SamDB
from univention import config_registry
from univention.testing.utils import package_installed
[docs]
class DRSReplicationFailed(Exception):
pass
[docs]
class WaitForS4ConnectorTimeout(Exception):
pass
[docs]
@contextlib.contextmanager
def password_policy(complexity: bool = False, minimum_password_age: int = 0, maximum_password_age: int = 3) -> Iterator[None]:
if not package_installed('univention-samba4'):
print('skipping samba password policy adjustment')
yield
return
min_pwd_age = subprocess.check_output('samba-tool domain passwordsettings show | grep "Minimum password age" | sed s/[^0-9]*/""/', shell=True).strip()
max_pwd_age = subprocess.check_output('samba-tool domain passwordsettings show | grep "Maximum password age" | sed s/[^0-9]*/""/', shell=True).strip()
pwd_complexity = subprocess.check_output('samba-tool domain passwordsettings show | grep complexity | sed "s/Password complexity: //"', shell=True).strip()
if complexity != pwd_complexity or str(minimum_password_age) != min_pwd_age or str(maximum_password_age) != max_pwd_age:
subprocess.call(['samba-tool', 'domain', 'passwordsettings', 'set', '--min-pwd-age', str(minimum_password_age), '--max-pwd-age', str(maximum_password_age), '--complexity', 'on' if complexity else 'off'])
yield
if complexity != pwd_complexity or str(minimum_password_age) != min_pwd_age:
subprocess.call(['samba-tool', 'domain', 'passwordsettings', 'set', '--min-pwd-age', min_pwd_age, '--max-pwd-age', max_pwd_age, '--complexity', pwd_complexity])
[docs]
def wait_for_drs_replication(ldap_filter: str, attrs: list[str] | str | None = None, base: str | None = None, scope: int = ldb.SCOPE_SUBTREE, lp: LoadParm | None = None, timeout: int = 360, delta_t: int = 1, verbose: bool = True, should_exist: bool = True, controls: list[str] | None = None) -> None:
if not package_installed('univention-samba4'):
if package_installed('univention-samba'):
time.sleep(15)
print('Sleeping 15 seconds as a workaround for http://forge.univention.org/bugzilla/show_bug.cgi?id=52145')
elif verbose:
print('wait_for_drs_replication(): skip, univention-samba4 not installed.')
return
if not attrs:
attrs = ['dn']
elif not isinstance(attrs, list):
attrs = [attrs]
if not lp:
lp = LoadParm()
lp.load('/etc/samba/smb.conf')
samdb = SamDB("tdb://%s" % lp.private_path("sam.ldb"), session_info=system_session(lp), lp=lp)
if not controls:
controls = ["domain_scope:0"]
if base is None:
ucr = config_registry.ConfigRegistry()
ucr.load()
base = ucr['samba4/ldap/base']
else:
if len(ldap.dn.str2dn(base)[0]) > 1:
if verbose:
print('wait_for_drs_replication(): skip, multiple RDNs are not supported')
return
if not base:
if verbose:
print('wait_for_drs_replication(): skip, no samba domain found')
return
if verbose:
print(f"Waiting for DRS replication, filter: {ldap_filter!r}, base: {base!r}, scope: {scope!r}, should_exist: {should_exist!r}", end=' ')
t = t0 = time.monotonic()
while t < t0 + timeout:
try:
res = samdb.search(base=base, scope=scope, expression=ldap_filter, attrs=attrs, controls=controls)
if bool(res) is bool(should_exist):
if verbose:
print("\nDRS replication took %d seconds" % (t - t0, ))
return # res
except ldb.LdbError as exc:
(_num, msg) = exc.args
if _num == ldb.ERR_INVALID_DN_SYNTAX:
raise
if _num == ldb.ERR_NO_SUCH_OBJECT and not should_exist:
if verbose:
print("\nDRS replication took %d seconds" % (t - t0, ))
return
print(f"Error during samdb.search: {msg}")
print('.', end=' ')
time.sleep(delta_t)
t = time.monotonic()
raise DRSReplicationFailed("DRS replication for filter: %r failed due to timeout after %d sec." % (ldap_filter, t - t0))
[docs]
def get_available_s4connector_dc() -> str:
cmd = ("/usr/bin/univention-ldapsearch", "-LLL", "(univentionService=S4 Connector)", "uid")
p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
stdout, _stderr = p.communicate()
if not stdout:
print("WARNING: Automatic S4 Connector host detection failed")
return ""
matches = re.compile(r'^uid: (.*)\$$', re.M).findall(stdout.decode('utf-8', 'replace'))
if len(matches) == 1:
return matches[0]
elif len(matches) == 0:
print("WARNING: Automatic S4 Connector host detection failed")
return ""
# check if this is UCS@school
cmd = ("/usr/bin/univention-ldapsearch", "-LLL", "(univentionService=UCS@school)", "dn")
p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
stdout, _stderr = p.communicate()
if not stdout:
print("ERROR: Automatic S4 Connector host detection failed: Found %s S4 Connector services" % len(matches))
return ""
# Look for replicating DCs
dcs_replicating_with_this_one = []
for s4c in matches:
cmd = ("/usr/bin/samba-tool", "drs", "showrepl", s4c)
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, _stderr = p.communicate()
if p.returncode != 0:
continue
dcs_replicating_with_this_one.append(s4c)
if len(dcs_replicating_with_this_one) == 1:
return dcs_replicating_with_this_one[0]
else:
print("ERROR: Automatic S4 Connector host detection failed: Replicating with %s S4 Connector services" % len(dcs_replicating_with_this_one))
return ""
[docs]
def force_drs_replication(source_dc: str | None = None, destination_dc: str | None = None, partition_dn: str | None = None, direction: str = "in") -> int:
if not package_installed('univention-samba4'):
print('force_drs_replication(): skip, univention-samba4 not installed.')
return 0
src = source_dc or get_available_s4connector_dc()
if not src:
return 1
dst = destination_dc or socket.gethostname()
if src == dst:
return 0
if not partition_dn:
ucr = config_registry.ConfigRegistry()
ucr.load()
partition_dn = str(ucr.get('samba4/ldap/base'))
print("USING partition_dn:", partition_dn)
cmd = ("/usr/bin/samba-tool", "drs", "replicate", dst, src, partition_dn)
return subprocess.call(cmd)
def _ldap_replication_complete(verbose: bool = True) -> bool:
kwargs: dict[str, Any] = {}
if not verbose:
kwargs = {'stdout': open('/dev/null', 'w'), 'stderr': subprocess.STDOUT}
return subprocess.call('/usr/lib/nagios/plugins/check_univention_replication', **kwargs) == 0
[docs]
def wait_for_s4connector(timeout: int = 360, delta_t: int = 1, s4cooldown_t: int = 5) -> int:
ucr = config_registry.ConfigRegistry()
ucr.load()
if not package_installed('univention-s4-connector'):
print('wait_for_s4connector(): skip, univention-s4-connector not installed.')
return 0
if ucr.is_false('connector/s4/autostart'):
print('wait_for_s4connector(): skip, connector/s4/autostart is set to false.')
return 0
conn = sqlite3.connect('/etc/univention/connector/s4internal.sqlite')
c = conn.cursor()
static_count = 0
replication_complete = False
highestCommittedUSN = -1
lastUSN = -1
t = t0 = time.monotonic()
while t < t0 + timeout:
time.sleep(delta_t)
if not _ldap_replication_complete(verbose=False):
continue
else:
if not replication_complete:
print('Start waiting for S4-Connector replication')
replication_complete = True
previous_highestCommittedUSN = highestCommittedUSN
ldbresult = subprocess.Popen([
'ldbsearch',
'--url', '/var/lib/samba/private/sam.ldb',
'--scope', 'base',
'--basedn', '',
'highestCommittedUSN',
], stdout=subprocess.PIPE)
assert ldbresult.stdout
for chunk in ldbresult.stdout:
line = chunk.decode('utf-8').strip()
if line.startswith('highestCommittedUSN: '):
highestCommittedUSN = int(line[len('highestCommittedUSN: '):])
break
else:
raise KeyError('No highestCommittedUSN in ldbsearch')
previous_lastUSN = lastUSN
c.execute('select value from S4 where key=="lastUSN"')
lastUSN = int(c.fetchone()[0])
if not (lastUSN == highestCommittedUSN and lastUSN == previous_lastUSN and highestCommittedUSN == previous_highestCommittedUSN):
static_count = 0
print('Reset counter')
else:
static_count += 1
print(f'Counter: {static_count}; highestCommittedUSN: {highestCommittedUSN!r}; lastUSN: {lastUSN!r}')
if static_count * delta_t >= s4cooldown_t:
return 0
t = time.monotonic()
conn.close()
raise WaitForS4ConnectorTimeout()
[docs]
def append_dot(verify_list: list[str]) -> list[str]:
"""The S4-Connector appends dots to various dns records. Helper function to adjust a list."""
if not package_installed('univention-s4-connector'):
return verify_list
return [f'{x}.' for x in verify_list]