Source code for univention.connector.ad

#!/usr/bin/python3
#
# Univention AD Connector
#  Basic class for the AD connector part
#
# SPDX-FileCopyrightText: 2004-2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only

import base64
import calendar
import copy
import os
import re
import string
import subprocess
import sys
import time
from logging import getLogger
from tempfile import NamedTemporaryFile

import ldap
import samba.dcerpc.samr
from ldap.controls import LDAPControl, SimplePagedResultsControl
from ldap.filter import escape_filter_chars
from samba import drs_utils
from samba.credentials import DONT_USE_KERBEROS, Credentials
from samba.dcerpc import drsuapi, lsa, nbt, security
from samba.ndr import ndr_unpack
from samba.net import Net
from samba.param import LoadParm

import univention.connector
import univention.connector.ad.mapping
import univention.uldap
from univention.config_registry import ConfigRegistry
from univention.logging import Structured


log = Structured(getLogger("LDAP").getChild(__name__))

LDAP_SERVER_SHOW_DELETED_OID = "1.2.840.113556.1.4.417"
LDB_CONTROL_DOMAIN_SCOPE_OID = "1.2.840.113556.1.4.1339"

# page results
PAGE_SIZE = 1000


[docs] class netbiosDomainnameNotFound(Exception): pass
[docs] class kerberosAuthenticationFailed(Exception): pass
[docs] def set_univentionObjectFlag_to_synced(connector, key, ucs_object): if connector.configRegistry.is_true('ad/member', False): connector._object_mapping(key, ucs_object, 'ucs') ucs_result = connector.lo.search(base=ucs_object['dn'], attr=['univentionObjectFlag']) flags = ucs_result[0][1].get('univentionObjectFlag', []) if b'synced' not in flags: connector.lo.lo.lo.modify_s(ucs_object['dn'], [(ldap.MOD_ADD, 'univentionObjectFlag', b'synced')])
[docs] def group_members_sync_from_ucs(connector, key, object): return connector.group_members_sync_from_ucs(key, object)
[docs] def object_memberships_sync_from_ucs(connector, key, object): return connector.object_memberships_sync_from_ucs(key, object)
[docs] def group_members_sync_to_ucs(connector, key, object): return connector.group_members_sync_to_ucs(key, object)
[docs] def object_memberships_sync_to_ucs(connector, key, object): return connector.object_memberships_sync_to_ucs(key, object)
[docs] def primary_group_sync_from_ucs(connector, key, object): return connector.primary_group_sync_from_ucs(key, object)
[docs] def primary_group_sync_to_ucs(connector, key, object): return connector.primary_group_sync_to_ucs(key, object)
[docs] def disable_user_from_ucs(connector, key, object): return connector.disable_user_from_ucs(key, object)
[docs] def set_userPrincipalName_from_ucr(connector, key, object): return connector.set_userPrincipalName_from_ucr(key, object)
[docs] def disable_user_to_ucs(connector, key, object): return connector.disable_user_to_ucs(key, object)
[docs] def fix_dn(dn): # Samba LDAP returns broken DN, which cannot be parsed: ldap.dn.str2dn('cn=foo\\?,dc=base') return dn.replace('\\?', '?') if dn is not None else dn
[docs] def str2dn(dn): try: return ldap.dn.str2dn(dn) except ldap.DECODING_ERROR: return ldap.dn.str2dn(fix_dn(dn))
[docs] def unix2ad_time(ltime): d = 116444736000000000 # difference between 1601 and 1970 return int(calendar.timegm(time.strptime(ltime, "%Y-%m-%d")) - 86400) * 10000000 + d # AD stores end of day in accountExpires
[docs] def ad2unix_time(ltime): d = 116444736000000000 # difference between 1601 and 1970 return time.strftime("%Y-%m-%d", time.gmtime((ltime - d) / 10000000 + 86400)) # shadowExpire treats day of expiry as exclusive
[docs] def samba2ad_time(ltime): d = 116444736000000000 # difference between 1601 and 1970 return int(time.mktime(time.localtime(ltime))) * 10000000 + d
[docs] def ad2samba_time(ltime): if ltime == 0: return ltime d = 116444736000000000 # difference between 1601 and 1970 return int((ltime - d) / 10000000)
[docs] def samaccountname_dn_mapping(connector, given_object, dn_mapping_stored, ucsobject, propertyname, propertyattrib, ocucs, ucsattrib, ocad, dn_attr=None): """ map dn of given object (which must have an samaccountname in AD) ocucs and ocad are objectclasses in UCS and AD """ object = copy.deepcopy(given_object) samaccountname = '' dn_attr_val = '' if object['dn'] is not None: if 'sAMAccountName' in object['attributes']: samaccountname = object['attributes']['sAMAccountName'][0].decode('UTF-8') if dn_attr: try: dn_attr_vals = [value for key, value in object['attributes'].items() if dn_attr.lower() == key.lower()][0] # noqa: RUF015 except IndexError: pass else: dn_attr_val = dn_attr_vals[0].decode('UTF-8') def dn_premapped(object, dn_key, dn_mapping_stored): if (dn_key not in dn_mapping_stored) or (not object[dn_key]): log.trace("samaccount_dn_mapping: not premapped (in first instance)") return False if ucsobject: if connector.get_object(object[dn_key]) is not None: log.trace("samaccount_dn_mapping: premapped AD object found") return True else: log.trace("samaccount_dn_mapping: premapped AD object not found") return False else: if connector.get_ucs_ldap_object(object[dn_key]) is not None: log.trace("samaccount_dn_mapping: premapped UCS object found") return True else: log.trace("samaccount_dn_mapping: premapped UCS object not found") return False for dn_key in ['dn', 'olddn']: log.trace("samaccount_dn_mapping: check newdn for key %s: %s", dn_key, object.get(dn_key)) if dn_key in object and not dn_premapped(object, dn_key, dn_mapping_stored): dn = object[dn_key] # Skip Configuration objects with empty DNs if dn is None: break exploded_dn = str2dn(dn) (_fst_rdn_attribute, fst_rdn_value, _flags) = exploded_dn[0][0] value = fst_rdn_value if ucsobject: # lookup the cn as sAMAccountName in AD to get corresponding DN, if not found create new log.trace("samaccount_dn_mapping: got an UCS-Object") for ucsval, conval in connector.property[propertyname].mapping_table.get(propertyattrib, []): if value.lower() == ucsval.lower(): value = conval log.trace("samaccount_dn_mapping: map %s according to mapping-table", propertyattrib) break else: if propertyattrib in connector.property[propertyname].mapping_table: log.trace("samaccount_dn_mapping: %s not in mapping-table", propertyattrib) log.debug("samaccount_dn_mapping: search in ad samaccountname=%s", value) search_filter = format_escaped('(&(objectclass={0!e})(samaccountname={1!e}))', ocad, value) result = connector.lo_ad.search(filter=search_filter) if result and len(result) > 0 and result[0] and len(result[0]) > 0 and result[0][0]: # no referral, so we've got a valid result if dn_key == 'olddn' or (dn_key == 'dn' and 'olddn' not in object): newdn = result[0][0] else: # move # return a kind of frankenstein DN here, sync_from_ucs replaces the UCS LDAP base # with the AD LDAP base at a later stage, see Bug #48440 newdn = ldap.dn.dn2str([str2dn(result[0][0])[0], *exploded_dn[1:]]) else: newdn = ldap.dn.dn2str([[('cn', fst_rdn_value, ldap.AVA_STRING)], *exploded_dn[1:]]) # new object, don't need to change log.trace("samaccount_dn_mapping: newdn: %s", newdn) else: # get the object to read the sAMAccountName in AD and use it as name # we have no fallback here, the given dn must be found in AD or we've got an error log.trace("samaccount_dn_mapping: got an AD-Object") i = 0 while not samaccountname: # in case of olddn this is already set i = i + 1 search_dn = dn if 'deleted_dn' in object: search_dn = object['deleted_dn'] try: samaccountname_filter = format_escaped('(objectClass={0!e})', ocad) samaccountname_search_result = connector.ad_search_ext_s(search_dn, ldap.SCOPE_BASE, samaccountname_filter, ['sAMAccountName']) samaccountname = samaccountname_search_result[0][1]['sAMAccountName'][0].decode('UTF-8') log.trace("samaccount_dn_mapping: got samaccountname from AD") except ldap.NO_SUCH_OBJECT: # AD may need time if i > 5: raise time.sleep(1) # AD may need some time... for ucsval, conval in connector.property[propertyname].mapping_table.get(propertyattrib, []): if samaccountname.lower() == conval.lower(): samaccountname = ucsval log.trace("samaccount_dn_mapping: map samaccountanme according to mapping-table") break else: if propertyattrib in connector.property[propertyname].mapping_table: log.trace("samaccount_dn_mapping: samaccountname not in mapping-table") # search for object with this dn in ucs, needed if it lies in a different container ucsdn = '' log.trace("samaccount_dn_mapping: samaccountname is: %r", samaccountname) ucsdn_filter = format_escaped('(&(objectclass={0!e})({1}={2!e}))', ocucs, ucsattrib, samaccountname) ucsdn_result = connector.search_ucs(filter=ucsdn_filter, base=connector.lo.base, scope='sub', attr=['objectClass']) if ucsdn_result and len(ucsdn_result) > 0 and ucsdn_result[0] and len(ucsdn_result[0]) > 0: ucsdn = ucsdn_result[0][0] if ucsdn and (dn_key == 'olddn' or (dn_key == 'dn' and 'olddn' not in object)): newdn = ucsdn log.trace("samaccount_dn_mapping: newdn is ucsdn") else: if dn_attr: newdn_rdn = [(dn_attr, dn_attr_val, ldap.AVA_STRING)] else: newdn_rdn = [(ucsattrib, samaccountname, ldap.AVA_STRING)] newdn = ldap.dn.dn2str([newdn_rdn, *exploded_dn[1:]]) # guess the old dn log.debug("samaccount_dn_mapping: newdn for key %r: olddn=%r newdn=%r", dn_key, dn, newdn) object[dn_key] = newdn return object
[docs] def user_dn_mapping(connector, given_object, dn_mapping_stored, isUCSobject): """ map dn of given user using the samaccountname/uid connector is an instance of univention.connector.ad, given_object an object-dict, dn_mapping_stored a list of dn-types which are already mapped because they were stored in the config-file """ return samaccountname_dn_mapping(connector, given_object, dn_mapping_stored, isUCSobject, 'user', 'samAccountName', 'posixAccount', 'uid', 'user')
[docs] def group_dn_mapping(connector, given_object, dn_mapping_stored, isUCSobject): """ map dn of given group using the samaccountname/cn connector is an instance of univention.connector.ad, given_object an object-dict, dn_mapping_stored a list of dn-types which are already mapped because they were stored in the config-file """ return samaccountname_dn_mapping(connector, given_object, dn_mapping_stored, isUCSobject, 'group', 'cn', 'posixGroup', 'cn', 'group')
[docs] def windowscomputer_dn_mapping(connector, given_object, dn_mapping_stored, isUCSobject): """ map dn of given windows computer using the samaccountname/uid connector is an instance of univention.connector.ad, given_object an object-dict, dn_mapping_stored a list of dn-types which are already mapped because they were stored in the config-file """ return samaccountname_dn_mapping(connector, given_object, dn_mapping_stored, isUCSobject, 'windowscomputer', 'samAccountName', 'posixAccount', 'uid', 'computer', 'cn')
[docs] def decode_sid(value): return str(ndr_unpack(security.dom_sid, value))
[docs] class LDAPEscapeFormatter(string.Formatter): """ A custom string formatter that supports a special `e` conversion, to employ the function `ldap.filter.escape_filter_chars()` on the given value. >>> LDAPEscapeFormatter().format("{0}", "*") '*' >>> LDAPEscapeFormatter().format("{0!e}", "*") '\\2a' Unfortunately this does not support the key/index-less variant (see http://bugs.python.org/issue13598). >>> LDAPEscapeFormatter().format("{!e}", "*") Traceback (most recent call last): KeyError: '' """
[docs] def convert_field(self, value, conversion): if conversion == 'e': if isinstance(value, str): return escape_filter_chars(value) if isinstance(value, bytes): raise TypeError(f'Filter must be string, not bytes: {value!r}') return escape_filter_chars(str(value)) return super().convert_field(value, conversion)
[docs] def format_escaped(format_string, *args, **kwargs): """ Convenience-wrapper around `LDAPEscapeFormatter`. Use `!e` do denote format-field that should be escaped using `ldap.filter.escape_filter_chars()`' >>> format_escaped("{0!e}", "*") '\\2a' """ return LDAPEscapeFormatter().format(format_string, *args, **kwargs)