Source code for univention.connector.ad.mapping

#!/usr/bin/python3
#
# Univention AD Connector
#  this file defines the mapping between AD and UCS
#
# SPDX-FileCopyrightText: 2004-2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only


import importlib.util

import univention.connector.ad
import univention.connector.ad.password
from univention.config_registry import ConfigRegistry
from univention.connector.ad import proxyAddresses


configRegistry = ConfigRegistry()
configRegistry.load()


[docs] def ignore_filter_from_tmpl(template, ucr_key, default=''): """ Construct an `ignore_filter` from a `ucr_key` (`connector/ad/mapping/*/ignorelist`, a comma delimited list of values), as specified by `template` while correctly escaping the filter-expression. `template` must be formatted as required by `format_escaped`. >>> ignore_filter_from_tmpl('(cn={0!e})', ... 'connector/ad/mapping/nonexistend/ignorelist', ... 'one,two,three') '(|(cn=one)(cn=two)(cn=three))' """ from univention.connector.ad import format_escaped variables = [v for v in configRegistry.get(ucr_key, default).split(',') if v] filter_parts = [format_escaped(template, v) for v in variables] if filter_parts: return '(|{})'.format(''.join(filter_parts)) return ''
[docs] def ignore_filter_from_attr(attribute, ucr_key, default=''): """ Convenience-wrapper around `ignore_filter_from_tmpl()`. This expects a single `attribute` instead of a `template` argument. >>> ignore_filter_from_attr('cn', ... 'connector/ad/mapping/nonexistend/ignorelist', ... 'one,two,three') '(|(cn=one)(cn=two)(cn=three))' """ template = f'({attribute}={{0!e}})' return ignore_filter_from_tmpl(template, ucr_key, default)
[docs] def get_mapping(configbasename='connector'): ad_mapping = create_mapping(configbasename) return load_localmapping(ad_mapping, f'/etc/univention/{configbasename}/ad/localmapping.py')
[docs] def create_mapping(configbasename='connector'): def connector(string): return string % (configbasename,) global_allow_subtree_ad = [] global_allow_subtree_ucs = [] for key in configRegistry: if key.startswith(connector('%s/ad/mapping/allowsubtree')): if key.endswith('/ad'): global_allow_subtree_ad.append(configRegistry[key]) elif key.endswith('/ucs'): global_allow_subtree_ucs.append(configRegistry[key]) global_ignore_subtree = [ 'cn=univention,{ldap/base}'.format(**configRegistry), 'cn=policies,{ldap/base}'.format(**configRegistry), 'cn=shares,{ldap/base}'.format(**configRegistry), 'cn=printers,{ldap/base}'.format(**configRegistry), 'cn=networks,{ldap/base}'.format(**configRegistry), 'cn=kerberos,{ldap/base}'.format(**configRegistry), 'cn=dhcp,{ldap/base}'.format(**configRegistry), 'cn=dns,{ldap/base}'.format(**configRegistry), 'cn=mail,{ldap/base}'.format(**configRegistry), 'cn=samba,{ldap/base}'.format(**configRegistry), 'cn=nagios,{ldap/base}'.format(**configRegistry), connector('cn=System,%%(%s/ad/ldap/base)s') % configRegistry, connector('ou=Grp Policy Users,%%(%s/ad/ldap/base)s') % configRegistry, connector('cn=Builtin,%%(%s/ad/ldap/base)s') % configRegistry, connector('cn=ForeignSecurityPrincipals,%%(%s/ad/ldap/base)s') % configRegistry, connector('ou=Domain Controllers,%%(%s/ad/ldap/base)s') % configRegistry, connector('cn=Program Data,%%(%s/ad/ldap/base)s') % configRegistry, connector('cn=Configuration,%%(%s/ad/ldap/base)s') % configRegistry, 'cn=opsi,{ldap/base}'.format(**configRegistry), connector('cn=Microsoft Exchange System Objects,%%(%s/ad/ldap/base)s') % configRegistry, ] for key in configRegistry: if key.startswith(connector('%s/ad/mapping/ignoresubtree/')): global_ignore_subtree.append(configRegistry[key]) user_ignore_list = ignore_filter_from_tmpl('(uid={0!e})(CN={0!e})', connector('%s/ad/mapping/user/ignorelist')) user_ignore_filter = configRegistry.get(connector('%s/ad/mapping/user/ignorefilter'), '') if user_ignore_filter and not user_ignore_filter.startswith('('): user_ignore_filter = f'({user_ignore_filter})' user_ignore_filter = '(|{}{}{})'.format('(userAccountControl=2080)', user_ignore_filter, user_ignore_list) ignore_filter_parts = '(groupType=-2147483643)(groupType=4)(univentionGroupType=-2147483643)(univentionGroupType=4)' group_ignore_filter = configRegistry.get(connector('%s/ad/mapping/group/ignorefilter'), '') if group_ignore_filter and not group_ignore_filter.startswith('('): group_ignore_filter = f'({group_ignore_filter})' if configRegistry.is_false(connector('%s/ad/mapping/group/grouptype'), False): ignore_filter_parts += '(sambaGroupType=5)(groupType=5)' ignore_filter_parts += ignore_filter_from_attr('cn', connector('%s/ad/mapping/group/ignorelist')) group_ignore_filter = f'(|{group_ignore_filter}{ignore_filter_parts})' computer_ignore_filter = configRegistry.get(connector('%s/ad/mapping/windowscomputer/ignorefilter'), '') if computer_ignore_filter and not computer_ignore_filter.startswith('('): computer_ignore_filter = f'({computer_ignore_filter})' computer_ignore_parts = ignore_filter_from_attr('cn', connector('%s/ad/mapping/windowscomputer/ignorelist')) computer_ignore_filter = f'(|{computer_ignore_filter}{computer_ignore_parts})' if computer_ignore_filter or computer_ignore_parts else '' container_ignore_attrs = ignore_filter_from_attr('cn', connector('%s/ad/mapping/container/ignorelist'), 'mail,kerberos') container_ignore_filter = configRegistry.get(connector('%s/ad/mapping/container/ignorefilter'), '') if container_ignore_filter and not container_ignore_filter.startswith('('): container_ignore_filter = f'({container_ignore_filter})' container_ignore_filter = f'(|{container_ignore_filter}{container_ignore_attrs})' if container_ignore_filter or container_ignore_attrs else '' ou_ignore_attrs = ignore_filter_from_attr('ou', connector('%s/ad/mapping/ou/ignorelist')) ou_ignore_filter = configRegistry.get(connector('%s/ad/mapping/ou/ignorefilter'), '') if ou_ignore_filter and not ou_ignore_filter.startswith('('): ou_ignore_filter = f'({ou_ignore_filter})' ou_ignore_filter = f'(|{ou_ignore_filter}{ou_ignore_attrs})' if ou_ignore_filter or ou_ignore_attrs else '' ad_mapping = { 'user': univention.connector.property( ucs_default_dn='cn=users,{ldap/base}'.format(**configRegistry), con_default_dn=connector('cn=users,%%(%s/ad/ldap/base)s') % configRegistry, ucs_module='users/user', # read, write, sync, none sync_mode=configRegistry.get(connector('%s/ad/mapping/user/syncmode'), configRegistry.get(connector('%s/ad/mapping/syncmode'))), scope='sub', con_search_filter='(&(objectClass=user)(!objectClass=computer))', match_filter='(|(&(objectClass=posixAccount)(objectClass=sambaSamAccount))(objectClass=user))', allow_subtree=global_allow_subtree_ucs + global_allow_subtree_ad, ignore_filter=user_ignore_filter or None, ignore_subtree=global_ignore_subtree, con_create_objectclass=['top', 'user', 'person', 'organizationalPerson'], dn_mapping_function=[univention.connector.ad.user_dn_mapping], attributes={ # from UCS Module 'samAccountName': univention.connector.attribute( ucs_attribute='username', ldap_attribute='uid', con_attribute='sAMAccountName', required=1, compare_function=univention.connector.compare_lowercase, ), 'givenName': univention.connector.attribute( ucs_attribute='firstname', ldap_attribute='givenName', con_attribute='givenName', ), 'sn': univention.connector.attribute( ucs_attribute='lastname', ldap_attribute='sn', con_attribute='sn', ), }, ucs_create_functions=[ univention.connector.set_ucs_passwd_user, univention.connector.check_ucs_lastname_user, univention.connector.set_primary_group_user, ], post_con_modify_functions=list(filter(None, [ univention.connector.ad.set_userPrincipalName_from_ucr, univention.connector.ad.password.password_sync_ucs if configRegistry.is_false(connector('%s/ad/mapping/user/password/disabled'), True) else None, univention.connector.ad.password.lockout_sync_from_ucs, univention.connector.ad.primary_group_sync_from_ucs, univention.connector.ad.object_memberships_sync_from_ucs, univention.connector.ad.disable_user_from_ucs, ])), post_ucs_modify_functions=list(filter(None, [ univention.connector.ad.password.password_sync_kinit if configRegistry.is_false(connector('%s/ad/mapping/user/password/disabled'), True) and configRegistry.is_true(connector('%s/ad/mapping/user/password/kinit'), False) else None, univention.connector.ad.password.password_sync if configRegistry.is_false(connector('%s/ad/mapping/user/password/disabled'), True) and not configRegistry.is_true(connector('%s/ad/mapping/user/password/kinit'), False) else None, univention.connector.ad.password.lockout_sync_to_ucs, univention.connector.ad.set_univentionObjectFlag_to_synced, univention.connector.ad.primary_group_sync_to_ucs, univention.connector.ad.object_memberships_sync_to_ucs, univention.connector.ad.disable_user_to_ucs, ])), post_attributes={ 'organisation': univention.connector.attribute( ucs_attribute='organisation', ldap_attribute='o', con_attribute=configRegistry.get(connector('%s/ad/mapping/organisation'), 'company'), ), 'Exchange-Homeserver': univention.connector.attribute( ucs_attribute='Exchange-Homeserver', ldap_attribute='univentionADmsExchHomeServerName', con_attribute='msExchHomeServerName', ), 'Exchange-homeMDB': univention.connector.attribute( ucs_attribute='Exchange-homeMDB', ldap_attribute='univentionADhomeMDB', con_attribute='homeMDB', ), 'Exchange-Nickname': univention.connector.attribute( ucs_attribute='Exchange-Nickname', ldap_attribute='univentionADmailNickname', con_attribute='mailNickname', ), 'mailPrimaryAddress': univention.connector.attribute( ucs_attribute='mailPrimaryAddress', ldap_attribute='mailPrimaryAddress', con_attribute='proxyAddresses', mapping=( proxyAddresses.to_proxyAddresses, proxyAddresses.to_mailPrimaryAddress, ), compare_function=proxyAddresses.equal, ), 'mailPrimaryAddress_to_mail': univention.connector.attribute( sync_mode='write', ucs_attribute='mailPrimaryAddress', ldap_attribute='mailPrimaryAddress', con_attribute='mail', ), 'mailAlternativeAddress': univention.connector.attribute( sync_mode='read' if configRegistry.is_true(connector('%s/ad/mapping/user/primarymail')) else 'sync', # proxyAddresses.to_mailPrimaryAddress does the write ucs_attribute='mailAlternativeAddress', ldap_attribute='mailAlternativeAddress', con_attribute='proxyAddresses', mapping=( None, proxyAddresses.to_mailAlternativeAddress, ), compare_function=proxyAddresses.equal, ), 'description': univention.connector.attribute( ucs_attribute='description', ldap_attribute='description', con_attribute='description', ), 'street': univention.connector.attribute( ucs_attribute='street', ldap_attribute='street', con_attribute='streetAddress', ), 'city': univention.connector.attribute( ucs_attribute='city', ldap_attribute='l', con_attribute='l', ), 'postcode': univention.connector.attribute( ucs_attribute='postcode', ldap_attribute='postalCode', con_attribute='postalCode', ), 'sambaWorkstations': univention.connector.attribute( ucs_attribute='sambaUserWorkstations', ldap_attribute='sambaUserWorkstations', con_attribute='userWorkstations', ), # 'sambaLogonHours': univention.connector.attribute( # ucs_attribute='sambaLogonHours', # ldap_attribute='sambaLogonHours', # con_attribute='logonHours', # ), 'profilepath': univention.connector.attribute( ucs_attribute='profilepath', ldap_attribute='sambaProfilePath', con_attribute='profilePath', ), 'scriptpath': univention.connector.attribute( ucs_attribute='scriptpath', ldap_attribute='sambaLogonScript', con_attribute='scriptPath', ), 'telephoneNumber': univention.connector.attribute( ucs_attribute='phone', ldap_attribute='telephoneNumber', con_attribute='telephoneNumber', con_other_attribute='otherTelephone', ), 'homePhone': univention.connector.attribute( ucs_attribute='homeTelephoneNumber', ldap_attribute='homePhone', con_attribute='homePhone', con_other_attribute='otherHomePhone', ), 'mobilePhone': univention.connector.attribute( ucs_attribute='mobileTelephoneNumber', ldap_attribute='mobile', con_attribute='mobile', con_other_attribute='otherMobile', ), 'pager': univention.connector.attribute( ucs_attribute='pagerTelephoneNumber', ldap_attribute='pager', con_attribute='pager', con_other_attribute='otherPager', ), 'displayName': univention.connector.attribute( ucs_attribute='displayName', ldap_attribute='displayName', con_attribute='displayName', ), }, ), 'group': univention.connector.property( ucs_default_dn='cn=groups,{ldap/base}'.format(**configRegistry), con_default_dn=connector('cn=Users,%%(%s/ad/ldap/base)s') % configRegistry, ucs_module='groups/group', sync_mode=configRegistry.get(connector('%s/ad/mapping/group/syncmode'), configRegistry.get(connector('%s/ad/mapping/syncmode'))), scope='sub', allow_subtree=global_allow_subtree_ucs + global_allow_subtree_ad, ignore_filter=group_ignore_filter or None, ignore_subtree=global_ignore_subtree, con_search_filter='objectClass=group', con_create_objectclass=['top', 'group'], post_con_modify_functions=[ univention.connector.ad.group_members_sync_from_ucs, univention.connector.ad.object_memberships_sync_from_ucs, ], post_ucs_modify_functions=[ univention.connector.ad.set_univentionObjectFlag_to_synced, univention.connector.ad.group_members_sync_to_ucs, univention.connector.ad.object_memberships_sync_to_ucs, ], dn_mapping_function=[univention.connector.ad.group_dn_mapping], attributes={ 'cn': univention.connector.attribute( ucs_attribute='name', ldap_attribute='cn', con_attribute='sAMAccountName', required=True, compare_function=univention.connector.compare_lowercase, ), 'groupType': univention.connector.attribute( ucs_attribute='adGroupType', ldap_attribute='univentionGroupType', con_attribute='groupType', ), 'description': univention.connector.attribute( ucs_attribute='description', ldap_attribute='description', con_attribute='description', ), 'mailAddress': univention.connector.attribute( sync_mode='read', ucs_attribute='mailAddress', ldap_attribute='mailPrimaryAddress', con_attribute='proxyAddresses', mapping=( proxyAddresses.to_proxyAddresses, proxyAddresses.to_mailPrimaryAddress, ), compare_function=proxyAddresses.equal, ), 'mailPrimaryAddress_to_mail': univention.connector.attribute( sync_mode='write', ucs_attribute='mailAddress', ldap_attribute='mailPrimaryAddress', con_attribute='mail', ), 'mailAlternativeAddress': univention.connector.attribute( sync_mode='read' if configRegistry.is_true(connector('%s/ad/mapping/group/primarymail')) else 'sync', # proxyAddresses.to_mailPrimaryAddress does the write ucs_attribute='mailAlternativeAddress', ldap_attribute='mailAlternativeAddress', con_attribute='proxyAddresses', mapping=( None, proxyAddresses.to_mailAlternativeAddress, ), compare_function=proxyAddresses.equal, ), 'Exchange-Nickname': univention.connector.attribute( ucs_attribute='Exchange-Nickname', ldap_attribute='univentionADmailNickname', con_attribute='mailNickname', ), }, mapping_table={ 'cn': [ ('Domain Users', 'Domänen-Benutzer'), ('Domain Admins', 'Domänen-Admins'), ('Windows Hosts', 'Domänencomputer'), ('Domain Guests', 'Domänen-Gäste'), ], }, ), 'windowscomputer': univention.connector.property( ucs_default_dn='cn=computers,{ldap/base}'.format(**configRegistry), con_default_dn=connector('cn=computers,%%(%s/ad/ldap/base)s') % configRegistry, ucs_module='computers/windows', ucs_module_others=['computers/memberserver', 'computers/linux', 'computers/ubuntu', 'computers/macos'], sync_mode=configRegistry.get(connector('%s/ad/mapping/computer/syncmode'), configRegistry.get(connector('%s/ad/mapping/syncmode'))), post_ucs_modify_functions=[ univention.connector.ad.set_univentionObjectFlag_to_synced, ], scope='sub', dn_mapping_function=[univention.connector.ad.windowscomputer_dn_mapping], con_search_filter='(&(objectClass=computer)(userAccountControl:1.2.840.113556.1.4.803:=4096))', # ignore_filter='userAccountControl=4096', match_filter='(|(&(objectClass=univentionWindows)(!(univentionServerRole=windows_domaincontroller)))(objectClass=computer)(objectClass=univentionMemberServer)(objectClass=univentionUbuntuClient)(objectClass=univentionLinuxClient)(objectClass=univentionMacOSClient))', allow_subtree=global_allow_subtree_ucs + global_allow_subtree_ad, ignore_subtree=global_ignore_subtree, ignore_filter=computer_ignore_filter or None, con_create_objectclass=['top', 'computer'], con_create_attributes=[('userAccountControl', [b'4096'])], attributes={ 'cn': univention.connector.attribute( ucs_attribute='name', ldap_attribute='cn', con_attribute='cn', required=True, compare_function=univention.connector.compare_lowercase, ), 'samAccountName': univention.connector.attribute( ldap_attribute='uid', con_attribute='sAMAccountName', compare_function=univention.connector.compare_lowercase, sync_mode='write', ), 'description': univention.connector.attribute( ucs_attribute='description', ldap_attribute='description', con_attribute='description', ), 'operatingSystem': univention.connector.attribute( ucs_attribute='operatingSystem', ldap_attribute='univentionOperatingSystem', con_attribute='operatingSystem', ), 'operatingSystemVersion': univention.connector.attribute( ucs_attribute='operatingSystemVersion', ldap_attribute='univentionOperatingSystemVersion', con_attribute='operatingSystemVersion', ), }, ), 'container': univention.connector.property( ucs_module='container/cn', sync_mode=configRegistry.get(connector('%s/ad/mapping/container/syncmode'), configRegistry.get(connector('%s/ad/mapping/syncmode'))), scope='sub', con_search_filter='(|(objectClass=container)(objectClass=builtinDomain))', # builtinDomain is cn=builtin (with group cn=Administrators) allow_subtree=global_allow_subtree_ucs + global_allow_subtree_ad, ignore_filter=container_ignore_filter or None, ignore_subtree=global_ignore_subtree, post_ucs_modify_functions=[ univention.connector.ad.set_univentionObjectFlag_to_synced, ], con_create_objectclass=['top', 'container'], attributes={ 'cn': univention.connector.attribute( ucs_attribute='name', ldap_attribute='cn', con_attribute='cn', required=1, compare_function=univention.connector.compare_lowercase, ), 'description': univention.connector.attribute( ucs_attribute='description', ldap_attribute='description', con_attribute='description', ), }, ), 'ou': univention.connector.property( ucs_module='container/ou', sync_mode=configRegistry.get(connector('%s/ad/mapping/ou/syncmode'), configRegistry.get(connector('%s/ad/mapping/syncmode'))), scope='sub', con_search_filter='objectClass=organizationalUnit', allow_subtree=global_allow_subtree_ucs + global_allow_subtree_ad, ignore_filter=ou_ignore_filter or None, ignore_subtree=global_ignore_subtree, post_ucs_modify_functions=[ univention.connector.ad.set_univentionObjectFlag_to_synced, ], con_create_objectclass=['top', 'organizationalUnit'], attributes={ 'ou': univention.connector.attribute( ucs_attribute='name', ldap_attribute='ou', con_attribute='ou', required=True, compare_function=univention.connector.compare_lowercase, ), 'description': univention.connector.attribute( ucs_attribute='description', ldap_attribute='description', con_attribute='description', ), }, ), } # fmt: skip # allow filter for obj_type in ad_mapping.keys(): # noqa: PLC0206 allow_filter = configRegistry.get(connector(f'%s/ad/mapping/{obj_type}/allowfilter'), '') if allow_filter: if not allow_filter.startswith('('): allow_filter = f'({allow_filter})' ad_mapping[obj_type].allow_filter = allow_filter # users if configRegistry.is_false(connector('%s/ad/mapping/user/exchange'), True): ad_mapping['user'].post_attributes.pop('Exchange-Homeserver') ad_mapping['user'].post_attributes.pop('Exchange-homeMDB') ad_mapping['user'].post_attributes.pop('Exchange-Nickname') if not configRegistry.is_true(connector('%s/ad/mapping/user/primarymail')): ad_mapping['user'].post_attributes.pop('mailPrimaryAddress') ad_mapping['user'].post_attributes.pop('mailPrimaryAddress_to_mail') if not configRegistry.is_true(connector('%s/ad/mapping/user/alternativemail')): ad_mapping['user'].post_attributes.pop('mailAlternativeAddress') # groups if not configRegistry.is_true(connector('%s/ad/mapping/group/grouptype'), True): ad_mapping['group'].attributes.pop('groupType') if not configRegistry.is_true(connector('%s/ad/mapping/group/primarymail')): ad_mapping['group'].attributes.pop('mailAddress') ad_mapping['group'].attributes.pop('mailPrimaryAddress_to_mail') if not configRegistry.is_true(connector('%s/ad/mapping/group/alternativemail')): ad_mapping['group'].attributes.pop('mailAlternativeAddress') if configRegistry.is_false(connector('%s/ad/mapping/group/exchange'), True): ad_mapping['group'].attributes.pop('Exchange-Nickname') if configRegistry.get(connector('%s/ad/mapping/group/language')) not in ['de', 'DE']: ad_mapping['group'].mapping_table.pop('cn') return ad_mapping
[docs] def load_localmapping(ad_mapping, filename='/etc/univention/connector/ad/localmapping.py'): try: spec = importlib.util.spec_from_file_location('localmapping', filename) mapping = importlib.util.module_from_spec(spec) spec.loader.exec_module(mapping) mapping_hook = mapping.mapping_hook except (OSError, AttributeError): return ad_mapping else: return mapping_hook(ad_mapping)