# SPDX-FileCopyrightText: 2024-2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only
import os
import subprocess
import sys
import ldap
import ldap.dn
from ldap import modlist
from ldap.controls import LDAPControl
import univention.testing.connector_common as tcommon
from univention.config_registry import ConfigRegistry
# Module-level Univention Config Registry handle.  It is created and loaded
# once at import time; the connection classes below read their LDAP/AD
# settings (base DN, host, port, credentials files, ...) from it.
ucr = ConfigRegistry()
ucr.load()
def get_rdn(dn):
    r"""
    Return the first (leftmost) RDN of `dn` as a string.

    Multi-valued RDNs and escaped characters are kept intact because the DN
    is parsed with `ldap.dn.str2dn` instead of being split on commas.

    >>> get_rdn(r'a=b\,c+d=e,f=g+h=i\,j')
    'a=b\\,c+d=e'
    >>> get_rdn(r'a=b')
    'a=b'
    """
    components = ldap.dn.str2dn(dn)
    leading = components[0]
    return ldap.dn.dn2str([leading])
def get_parent_dn(dn):
    r"""
    Return `dn` without its first RDN, or `None` when nothing is left.

    >>> get_parent_dn(r'a=b\,c+d=e,f=g+h=i\,j')
    'f=g+h=i\\,j'
    >>> get_parent_dn(r'a=b')
    """
    ancestors = ldap.dn.str2dn(dn)[1:]
    if not ancestors:
        # A single-RDN (or empty) DN has no parent.
        return None
    return ldap.dn.dn2str(ancestors)
def to_bytes(value):
    """
    Encode `value` as UTF-8 bytes.

    * `bytes` are returned unchanged,
    * lists and tuples are converted element-wise (recursively) and returned
      as a list,
    * everything else must provide `.encode()` (i.e. `str`).

    Tuples used to fail with `AttributeError`; they are now handled like
    lists, matching the other helpers in this module which accept both.
    """
    if isinstance(value, (list, tuple)):
        return [to_bytes(item) for item in value]
    if isinstance(value, bytes):
        return value
    return value.encode('utf-8')
def get_first(value):
    """Return the first element of a list or tuple; return anything else as is."""
    is_sequence = isinstance(value, (list, tuple))
    return value[0] if is_sequence else value
class LDAPConnection:
    """
    Helper functions to modify LDAP objects, intended as glue for shell scripts.

    The connection parameters are plain instance attributes (`host`, `port`,
    `protocol`, `login_dn`, `pw_file`, `ca_file`, `kerberos`) which are read
    by :meth:`connect`.  The python-ldap connection object is stored in
    `self.lo`.  All modifying methods print what they are about to do to
    stderr before talking to the server.
    """

    def __init__(self, no_starttls=False):
        """
        Set the defaults for the local LDAP server and connect.

        The bind DN is `cn=admin,<ldap/base>`, the password is read from
        `/etc/ldap.secret` and the port from UCR `ldap/server/port`
        (default 389).

        :param no_starttls: do not issue StartTLS on the connection if true.
        """
        self.ldapbase = ucr['ldap/base']
        self.login_dn = 'cn=admin,%s' % self.ldapbase
        self.pw_file = '/etc/ldap.secret'
        self.host = 'localhost'
        self.port = ucr.get('ldap/server/port', 389)
        self.ca_file = None
        self.protocol = 'ldap'
        self.kerberos = False
        # Extra server controls sent with every add/modify request.
        self.serverctrls_for_add_and_modify = []
        self.connect(no_starttls)

    def connect(self, no_starttls=False):
        """
        Open the connection described by the instance attributes and bind.

        The bind password is the first line of `self.pw_file`.  With
        `self.kerberos` set, a ticket is fetched via
        :meth:`get_kerberos_ticket` and a SASL/GSSAPI bind is performed;
        otherwise a simple bind as `self.login_dn` is done, but only if a
        non-empty password was read.

        For `self.protocol == 'ldapi'` the attribute `self.socket` is used as
        the socket path; it is not set by this class and has to be provided by
        the caller or a subclass.

        :param no_starttls: do not issue StartTLS on the connection if true.
        :raises Exception: if connecting or binding fails.  The message holds
            the connection parameters plus the formatted traceback, and the
            original exception is chained as `__cause__`.
        """
        import traceback

        self.timeout = 5
        tls_mode = 0 if no_starttls else 2

        login_pw = ""
        if self.pw_file:
            with open(self.pw_file) as fp:
                login_pw = fp.readline().rstrip('\n')

        try:
            if self.protocol == 'ldapi':
                import urllib.parse

                # The socket path must be fully percent-encoded in an ldapi URI.
                socket = urllib.parse.quote(self.socket, '')
                ldapuri = f"{self.protocol}://{socket}"
            else:
                ldapuri = "%s://%s:%d" % (self.protocol, self.host, int(self.port))

            self.lo = ldap.initialize(ldapuri)
            if self.ca_file:
                self.lo.set_option(ldap.OPT_X_TLS_CACERTFILE, self.ca_file)
                self.lo.set_option(ldap.OPT_X_TLS_NEWCTX, 0)
            if tls_mode > 0:
                self.lo.start_tls_s()
        except Exception as exc:
            ex = f'LDAP Connection to "{self.host}:{self.port}" failed (TLS: {not no_starttls}, Certificate: {self.ca_file})\n'
            raise Exception(ex + traceback.format_exc()) from exc

        self.lo.set_option(ldap.OPT_REFERRALS, 0)

        try:
            if self.kerberos:
                os.environ['KRB5CCNAME'] = '/tmp/ucs-test-ldap-glue.cc'
                self.get_kerberos_ticket()
                auth = ldap.sasl.gssapi("")
                self.lo.sasl_interactive_bind_s("", auth)
            elif login_pw:
                self.lo.simple_bind_s(self.login_dn, login_pw)
        except Exception as exc:
            # NOTE(review): the bind password ends up in the exception message;
            # tolerable for test glue, but do not reuse this elsewhere.
            if self.kerberos:
                cred_msg = f'{self.principal!r} with Kerberos password {login_pw!r}'
            else:
                cred_msg = f'{self.login_dn!r} with simplebind password {login_pw!r}'
            ex = f'LDAP Bind as {cred_msg} failed over connection to "{self.host}:{self.port}" (TLS: {not no_starttls}, Certificate: {self.ca_file})\n'
            raise Exception(ex + traceback.format_exc()) from exc

    def get_kerberos_ticket(self):
        """
        Destroy the current ticket cache and run `kinit` for `self.principal`.

        The password is taken from `self.pw_file`.  `self.principal` is not
        set by this class; it must have been assigned before (e.g. by a
        subclass).

        :raises Exception: if `kinit` exits with a non-zero status; the
            message contains the command line, the exit code and the output.
        """
        p1 = subprocess.Popen(['kdestroy'], close_fds=True)
        p1.wait()

        cmd_block = ['kinit', '--no-addresses', '--password-file=%s' % self.pw_file, self.principal]
        p1 = subprocess.Popen(cmd_block, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, close_fds=True)
        stdout, _stderr = p1.communicate()
        if p1.returncode != 0:
            # Join with spaces so that the reported command line is readable.
            raise Exception('The following command failed: "%s" (%s): %s' % (' '.join(cmd_block), p1.returncode, stdout.decode('UTF-8')))

    def exists(self, dn):
        """Return `True` if a base search on `dn` succeeds, `False` on `NO_SUCH_OBJECT`."""
        try:
            self.lo.search_ext_s(dn, ldap.SCOPE_BASE, timeout=10)
        except ldap.NO_SUCH_OBJECT:
            return False
        return True

    def get_attribute(self, dn, attribute):
        """
        Get the values of `attribute` of the LDAP object at `dn`.

        :returns: the list of values, or `[]` if the search returned no entry
            or the entry does not have that attribute.  Errors raised by the
            search itself are not caught.
        """
        res = self.lo.search_ext_s(dn, ldap.SCOPE_BASE, timeout=10)
        try:
            return res[0][1][attribute]
        except LookupError:
            return []

    def get(self, dn, attr=None, required=False):
        """
        Return the attributes of the LDAP object at `dn` as a dictionary.

        :param dn: DN of the object; a false value skips the search.
        :param attr: list of attribute names to fetch; `None` (the default)
            is sent as an empty list.
        :param required: raise instead of returning `{}` if nothing was found.
        :returns: the attribute dictionary of the object or `{}`.
        :raises ldap.NO_SUCH_OBJECT: if `required` is true and no object was
            found.
        """
        # `None` instead of a mutable `[]` default; the search still gets a list.
        attrlist = [] if attr is None else attr
        if dn:
            try:
                result = self.lo.search_ext_s(dn, ldap.SCOPE_BASE, '(objectClass=*)', attrlist, timeout=10)
            except ldap.NO_SUCH_OBJECT:
                result = []
            if result:
                return result[0][1]
        if required:
            raise ldap.NO_SUCH_OBJECT({'desc': 'no object'})
        return {}

    def create(self, dn, attrs):
        """
        Create LDAP object at 'dn' with attributes 'attrs'.

        `attrs` is converted with `ldap.modlist.addModlist`; the controls in
        `self.serverctrls_for_add_and_modify` are sent with the request.
        """
        ldif = modlist.addModlist(attrs)
        print(f'Creating {dn!r} with {ldif!r}', file=sys.stderr)
        self.lo.add_ext_s(dn, ldif, serverctrls=self.serverctrls_for_add_and_modify)

    def delete(self, dn):
        """Delete LDAP object at 'dn'."""
        print(f'Deleting {dn!r}', file=sys.stderr)
        self.lo.delete_s(dn)

    def move(self, dn, newdn):
        """
        Move LDAP object from 'dn' to 'newdn'.

        If both DNs share the same parent only the RDN is changed, otherwise
        the object is renamed into the new parent.
        """
        newrdn = get_rdn(newdn)
        parent1 = get_parent_dn(dn)
        parent2 = get_parent_dn(newdn)
        if parent1 != parent2:
            print(f'Moving {dn!r} as {newdn!r} into {parent2!r}', file=sys.stderr)
            self.lo.rename_s(dn, newrdn, parent2)
        else:
            print(f'Renaming {dn!r} to {newrdn!r}', file=sys.stderr)
            self.lo.modrdn_s(dn, newrdn)

    def set_attribute(self, dn, key, value):
        """Set attribute 'key' of LDAP object at 'dn' to 'value'."""
        print(f'Replace {key!r}={value!r} at {dn!r}', file=sys.stderr)
        self.lo.modify_ext_s(dn, [(ldap.MOD_REPLACE, key, value)], serverctrls=self.serverctrls_for_add_and_modify)

    def set_attributes(self, dn, **attributes):
        """
        Set several attributes of the LDAP object at `dn` at once.

        The current values of the given attributes are fetched and only the
        difference (as computed by `ldap.modlist.modifyModlist`) is sent.
        Values which are neither list nor tuple are wrapped in a list.
        Nothing is sent if there is no difference.
        """
        # Pass a real list of names rather than a dict view to the search.
        old_attributes = self.get(dn, attr=list(attributes))
        attributes = {name: [attr] if not isinstance(attr, (list, tuple)) else attr for name, attr in attributes.items()}
        ldif = modlist.modifyModlist(old_attributes, attributes)
        if ldif:
            print(f'Modifying {dn!r}: {ldif!r}', file=sys.stderr)
            self.lo.modify_ext_s(dn, ldif, serverctrls=self.serverctrls_for_add_and_modify)

    def set_attribute_with_provision_ctrl(self, dn, key, value):
        """
        Set attribute 'key' of LDAP object at 'dn' to 'value', additionally
        sending the two non-critical controls named below in front of
        `self.serverctrls_for_add_and_modify`.
        """
        LDB_CONTROL_PROVISION_OID = '1.3.6.1.4.1.7165.4.3.16'
        DSDB_CONTROL_REPLICATED_UPDATE_OID = '1.3.6.1.4.1.7165.4.3.3'
        ctrls = [
            LDAPControl(LDB_CONTROL_PROVISION_OID, criticality=0),
            LDAPControl(DSDB_CONTROL_REPLICATED_UPDATE_OID, criticality=0),
            *self.serverctrls_for_add_and_modify,
        ]
        print(f'Replace {key!r}={value!r} at {dn!r} (with provision control)', file=sys.stderr)
        self.lo.modify_ext_s(dn, [(ldap.MOD_REPLACE, key, value)], serverctrls=ctrls)

    def delete_attribute(self, dn, key):
        """Delete attribute 'key' of LDAP object at 'dn'."""
        print(f'Removing {key!r} from {dn!r}', file=sys.stderr)
        self.lo.modify_ext_s(dn, [(ldap.MOD_DELETE, key, None)], serverctrls=self.serverctrls_for_add_and_modify)

    def append_to_attribute(self, dn, key, value):
        """Add 'value' to attribute 'key' of LDAP object at 'dn'."""
        print(f'Appending {key!r}={value!r} to {dn!r}', file=sys.stderr)
        self.lo.modify_ext_s(dn, [(ldap.MOD_ADD, key, value)], serverctrls=self.serverctrls_for_add_and_modify)

    def remove_from_attribute(self, dn, key, value):
        """Remove 'value' from attribute 'key' of LDAP object at 'dn'."""
        print(f'Removing {key!r}={value!r} from {dn!r}', file=sys.stderr)
        self.lo.modify_ext_s(dn, [(ldap.MOD_DELETE, key, value)], serverctrls=self.serverctrls_for_add_and_modify)
class ADConnection(LDAPConnection):
    """
    Helper functions to modify AD objects.

    All settings are read from UCR below `<configbase>/ad/ldap/`.
    `get()` is inherited from `LDAPConnection`; the override that used to
    live here was a line-for-line copy of the parent implementation.
    """

    def __init__(self, configbase='connector'):
        """
        Read the AD connection settings from UCR and connect.

        With `<configbase>/ad/ldap/kerberos` enabled the account from
        `tests/domainadmin/account` / `tests/domainadmin/pwdfile` is used and
        its first RDN value becomes the Kerberos principal; otherwise the
        configured bind DN and password file are used.

        :param configbase: UCR prefix of the connector instance.
        """
        self.configbase = configbase
        self.adldapbase = ucr['%s/ad/ldap/base' % configbase]
        # "DC=example,DC=com" -> "example.com"
        self.addomain = self.adldapbase.replace(',DC=', '.').replace('DC=', '')
        self.kerberos = ucr.is_true('%s/ad/ldap/kerberos' % configbase)
        if self.kerberos:  # i.e. if UCR ad/member=true
            # Note: tests/domainadmin/account is an OpenLDAP DN but
            # we only extract the username from it in ldap_glue
            self.login_dn = ucr['tests/domainadmin/account']
            self.principal = ldap.dn.str2dn(self.login_dn)[0][0][1]
            self.pw_file = ucr['tests/domainadmin/pwdfile']
        else:
            self.login_dn = ucr['%s/ad/ldap/binddn' % configbase]
            self.pw_file = ucr['%s/ad/ldap/bindpw' % configbase]
        self.host = ucr['%s/ad/ldap/host' % configbase]
        self.port = ucr['%s/ad/ldap/port' % configbase]
        self.ca_file = ucr['%s/ad/ldap/certificate' % configbase]
        self.protocol = 'ldaps' if ucr.is_true('%s/ad/ldap/ldaps' % configbase) else 'ldap'
        self.serverctrls_for_add_and_modify = []
        no_starttls = ucr.is_false('%s/ad/ldap/ssl' % configbase)
        self.connect(no_starttls)

    def delete(self, dn):
        """Delete LDAP object at 'dn', sending the critical SUBTREE_DELETE_CONTROL."""
        SUBTREE_DELETE_CONTROL = '1.2.840.113556.1.4.805'
        ctrls = [LDAPControl(SUBTREE_DELETE_CONTROL, criticality=True)]
        print(f'Deleting {dn!r} with SUBTREE_DELETE_CONTROL ({SUBTREE_DELETE_CONTROL}) control', file=sys.stderr)
        self.lo.delete_ext_s(dn, serverctrls=ctrls)

    def search(self, ldap_filter, attr=None, required=False):
        """
        Search the whole AD base for `ldap_filter`.

        :param ldap_filter: LDAP filter string.
        :param attr: list of attribute names to fetch; `None` (the default)
            is sent as an empty list.
        :param required: raise instead of returning `[]` if nothing matched.
        :returns: list of `(dn, attributes)` tuples; results without a DN are
            dropped.
        :raises ldap.NO_SUCH_OBJECT: if `required` is true and nothing matched.
        """
        attrlist = [] if attr is None else attr
        res = self.lo.search_ext_s(self.adldapbase, ldap.SCOPE_SUBTREE, ldap_filter, attrlist, timeout=10)
        # Distinct loop names: the original re-used `attr` and shadowed the parameter.
        result = [(dn, entry) for dn, entry in res if dn]
        if not result and required:
            raise ldap.NO_SUCH_OBJECT({'desc': 'no object'})
        return result

    def set_attributes(self, dn, **attributes):
        """
        Set several attributes of the AD object at `dn` at once.

        Only the difference to the current values (as computed by
        `ldap.modlist.modifyModlist`) is sent; nothing is sent if there is
        none.  Values which are neither list nor tuple are wrapped in a list,
        as `LDAPConnection.set_attributes` does, because `modifyModlist`
        iterates over every value.
        """
        old_attributes = self.get(dn, attr=list(attributes))
        new_attributes = {name: value if isinstance(value, (list, tuple)) else [value] for name, value in attributes.items()}
        ldif = modlist.modifyModlist(old_attributes, new_attributes)
        if ldif:
            self.lo.modify_ext_s(dn, ldif)

    def add_to_group(self, group_dn, member_dn):
        """Add `member_dn` to the `member` attribute of `group_dn`."""
        self.append_to_attribute(group_dn, 'member', member_dn)

    def remove_from_group(self, group_dn, member_dn):
        """Remove `member_dn` from the `member` attribute of `group_dn`."""
        self.remove_from_attribute(group_dn, 'member', member_dn)

    def getdn(self, filter):
        """Print the DN of every object below the AD base matching `filter`."""
        for dn, _attr in self.lo.search_ext_s(self.adldapbase, ldap.SCOPE_SUBTREE, filter, timeout=10):
            if dn:
                print(dn)

    def createuser(self, username, position=None, **attributes):
        """
        Create a AD user with attributes as given by the keyword-args
        `attributes`. The created user will be populated with some defaults if
        not otherwise set.

        The user is created below `position` (default `cn=users,<AD base>`)
        and named after `cn` (default: `username`).

        Returns the dn of the created user.
        """
        cn = to_bytes(attributes.get('cn', username))
        sn = to_bytes(attributes.get('sn', b'SomeSurName'))

        new_position = position or 'cn=users,%s' % self.adldapbase
        new_dn = 'cn=%s,%s' % (ldap.dn.escape_dn_chars(get_first(cn).decode("UTF-8")), new_position)

        defaults = (
            ('objectclass', [b'top', b'user', b'person', b'organizationalPerson']),
            ('cn', cn),
            ('sn', sn),
            ('sAMAccountName', to_bytes(username)),
            ('userPrincipalName', b'%s@%s' % (to_bytes(username), to_bytes(self.addomain))),
            ('displayName', b'%s %s' % (to_bytes(username), get_first(sn))),
        )
        # Explicitly passed attributes win over the defaults.
        new_attributes = dict(defaults)
        new_attributes.update(attributes)
        self.create(new_dn, new_attributes)
        return new_dn

    def rename_or_move_user_or_group(self, dn, name=None, position=None):
        """
        Rename and/or move the object at `dn`.

        :param name: new `cn` value; the current RDN is kept if omitted.
        :param position: DN of the new parent; the current parent is kept if
            omitted.
        :returns: the new DN of the object.
        """
        exploded = ldap.dn.str2dn(dn)
        new_rdn = [("cn", name, ldap.AVA_STRING)] if name else exploded[0]
        new_position = ldap.dn.str2dn(position) if position else exploded[1:]
        new_dn = ldap.dn.dn2str([new_rdn, *new_position])
        self.move(dn, new_dn)
        return new_dn

    def group_create(self, groupname, position=None, **attributes):
        """
        Create a AD group with attributes as given by the keyword-args
        `attributes`. The created group will be populated with some defaults if
        not otherwise set.

        The group is created below `position` (default `cn=groups,<AD base>`).

        Returns the dn of the created group.
        """
        new_position = position or 'cn=groups,%s' % self.adldapbase
        new_dn = f'cn={ldap.dn.escape_dn_chars(groupname)},{new_position}'

        defaults = (('objectclass', [b'top', b'group']), ('sAMAccountName', to_bytes(groupname)))
        new_attributes = dict(defaults)
        new_attributes.update(attributes)
        self.create(new_dn, new_attributes)
        return new_dn

    def windows_create(self, name, position=None, **attributes):
        """
        Create a AD windows computer object with attributes as given by the
        keyword-args `attributes`. The created object will be populated with
        some defaults if not otherwise set.

        The object is created below `position` (default
        `cn=Computers,<AD base>`).

        Returns the dn of the created object.
        """
        new_position = position or 'cn=Computers,%s' % self.adldapbase
        new_dn = f'cn={ldap.dn.escape_dn_chars(name)},{new_position}'

        defaults = (
            ('userAccountControl', [b'4098']),
            ('objectclass', [b'top', b'person', b'organizationalPerson', b'user', b'computer']),
            ('cn', to_bytes(name)),
        )
        new_attributes = dict(defaults)
        new_attributes.update(attributes)
        self.create(new_dn, new_attributes)
        return new_dn

    def getprimarygroup(self, user_dn):
        """
        Return the DN of the primary group of the user at `user_dn`.

        The group is the one whose decoded `objectSid` ends in
        `-<primaryGroupID of the user>`.

        NOTE(review): `self.decode_sid` is not defined in the visible part of
        this module; presumably a subclass provides it.

        :returns: the group DN, or `None` if the user lookup fails or no
            group matches.
        """
        import re

        try:
            res = self.lo.search_ext_s(user_dn, ldap.SCOPE_BASE, timeout=10)
        except Exception:
            return None
        primary_group_id = res[0][1]['primaryGroupID'][0].decode('UTF-8')

        res = self.lo.search_ext_s(
            self.adldapbase,
            ldap.SCOPE_SUBTREE,
            'objectClass=group',
            timeout=10,
        )

        # Compile once instead of re-parsing the pattern for every group.
        pattern = re.compile('^(.*?)-%s$' % primary_group_id)
        for r in res:
            if r[0] is None or r[0] == 'None':
                continue  # Referral
            if pattern.search(self.decode_sid(r[1]['objectSid'][0])):
                return r[0]
        return None

    def setprimarygroup(self, user_dn, group_dn):
        """
        Make the group at `group_dn` the primary group of the user at `user_dn`.

        The part after the last `-` of the group's decoded `objectSid` is
        written to the user's `primaryGroupID`.

        NOTE(review): `self.decode_sid` is not defined in the visible part of
        this module; presumably a subclass provides it.
        """
        import re

        res = self.lo.search_ext_s(group_dn, ldap.SCOPE_BASE, timeout=10)
        sid = self.decode_sid(res[0][1]['objectSid'][0])
        groupid = re.search('^(.*)-(.*?)$', sid).group(2)
        self.set_attribute(user_dn, 'primaryGroupID', groupid.encode('UTF-8'))

    def container_create(self, name, position=None, description=None):
        """
        Create the container `cn=<name>` below `position` (default: AD base).

        :returns: the DN of the created container.
        """
        if not position:
            position = self.adldapbase

        attrs = {
            'objectClass': [b'top', b'container'],
            'cn': to_bytes(name),
        }
        if description:
            attrs['description'] = to_bytes(description)

        container_dn = f'cn={ldap.dn.escape_dn_chars(name)},{position}'
        self.create(container_dn, attrs)
        return container_dn

    def createou(self, name, position=None, description=None):
        """
        Create the organizational unit `ou=<name>` below `position`
        (default: AD base).

        :returns: the DN of the created organizational unit.
        """
        if not position:
            position = self.adldapbase

        attrs = {
            'objectClass': [b'top', b'organizationalUnit'],
            'ou': to_bytes(name),
        }
        if description:
            attrs['description'] = to_bytes(description)

        dn = f'ou={ldap.dn.escape_dn_chars(name)},{position}'
        self.create(dn, attrs)
        return dn

    def verify_object(self, dn, expected_attributes):
        """
        Verify an object exists with the given `dn` and attributes in the
        AD-LDAP. Setting `expected_attributes` to `None` requires the object to
        not exist. `expected_attributes` is a dictionary of
        `attribute`:`list-of-values`.

        Values are compared case-insensitively and the expected values only
        have to be a subset of the actual ones.  If the plain comparison
        fails, both sides are compared again as normalized DNs.

        This will throw an `AssertionError` in case of a mismatch.
        """
        if expected_attributes is None:
            assert not self.exists(dn), f"AD object {dn} should not exist"
            return

        ad_object = self.get(dn)
        for key, value in expected_attributes.items():
            ad_value = {tcommon.to_unicode(x).lower() for x in ad_object.get(key, [])}
            if isinstance(value, (list, tuple)):
                expected = {tcommon.to_unicode(v).lower() for v in value}
            else:
                expected = {tcommon.to_unicode(value).lower()}

            if not expected.issubset(ad_value):
                # The values may be DNs which differ only in formatting.
                try:
                    ad_value = {tcommon.normalize_dn(val) for val in ad_value}
                    expected = {tcommon.normalize_dn(val) for val in expected}
                except ldap.DECODING_ERROR:
                    # Not DNs after all: keep whatever has been computed so far.
                    pass

            error_msg = f'{key}: {expected} not in {ad_value}, object {ad_object}'
            assert expected.issubset(ad_value), error_msg
if __name__ == '__main__':
    # Executed as a script: run the doctests embedded in this module.
    import doctest

    doctest.testmod()