#!/usr/bin/python3
# -*- coding: utf-8 -*-
#
# Univention AD Connector
# this file defines the mapping between AD and UCS
#
# Copyright 2004-2022 Univention GmbH
#
# https://www.univention.de/
#
# All rights reserved.
#
# The source code of this program is made available
# under the terms of the GNU Affero General Public License version 3
# (GNU AGPL V3) as published by the Free Software Foundation.
#
# Binary versions of this program provided by Univention to you as
# well as other copyrighted, protected or trademarked materials like
# Logos, graphics, fonts, specific documentations and configurations,
# cryptographic keys etc. are subject to a license agreement between
# you and Univention and not subject to the GNU AGPL V3.
#
# In the case you use this program under the terms of the GNU AGPL V3,
# the program is provided in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public
# License with the Debian GNU/Linux or Univention distribution in file
# /usr/share/common-licenses/AGPL-3; if not, see
# <https://www.gnu.org/licenses/>.
from __future__ import print_function
import six
import univention.connector.ad
import univention.connector.ad.password
import univention.connector.ad.proxyAddresses as proxyAddresses
from univention.connector.ad import format_escaped
from univention.config_registry import ConfigRegistry
configRegistry = ConfigRegistry()
configRegistry.load()
[docs]def ignore_filter_from_tmpl(template, ucr_key, default=''):
"""
Construct an `ignore_filter` from a `ucr_key`
(`connector/ad/mapping/*/ignorelist`, a comma delimited list of values), as
specified by `template` while correctly escaping the filter-expression.
`template` must be formatted as required by `format_escaped`.
>>> ignore_filter_from_tmpl('(cn={0!e})',
... 'connector/ad/mapping/nonexistend/ignorelist',
... 'one,two,three')
'(|(cn=one)(cn=two)(cn=three))'
"""
variables = [v for v in configRegistry.get(ucr_key, default).split(',') if v]
filter_parts = [format_escaped(template, v) for v in variables]
if filter_parts:
return '(|{})'.format(''.join(filter_parts))
return ''
[docs]def ignore_filter_from_attr(attribute, ucr_key, default=''):
"""
Convenience-wrapper around `ignore_filter_from_tmpl()`.
This expects a single `attribute` instead of a `template` argument.
>>> ignore_filter_from_attr('cn',
... 'connector/ad/mapping/nonexistend/ignorelist',
... 'one,two,three')
'(|(cn=one)(cn=two)(cn=three))'
"""
template = '({}={{0!e}})'.format(attribute)
return ignore_filter_from_tmpl(template, ucr_key, default)
global_ignore_subtree = [
'cn=univention,%(ldap/base)s' % configRegistry,
'cn=policies,%(ldap/base)s' % configRegistry,
'cn=shares,%(ldap/base)s' % configRegistry,
'cn=printers,%(ldap/base)s' % configRegistry,
'cn=networks,%(ldap/base)s' % configRegistry,
'cn=kerberos,%(ldap/base)s' % configRegistry,
'cn=dhcp,%(ldap/base)s' % configRegistry,
'cn=dns,%(ldap/base)s' % configRegistry,
'cn=mail,%(ldap/base)s' % configRegistry,
'cn=samba,%(ldap/base)s' % configRegistry,
'cn=nagios,%(ldap/base)s' % configRegistry,
'cn=System,%(connector/ad/ldap/base)s' % configRegistry,
'ou=Grp Policy Users,%(connector/ad/ldap/base)s' % configRegistry,
'cn=Builtin,%(connector/ad/ldap/base)s' % configRegistry,
'cn=ForeignSecurityPrincipals,%(connector/ad/ldap/base)s' % configRegistry,
'ou=Domain Controllers,%(connector/ad/ldap/base)s' % configRegistry,
'cn=Program Data,%(connector/ad/ldap/base)s' % configRegistry,
'cn=Configuration,%(connector/ad/ldap/base)s' % configRegistry,
'cn=opsi,%(ldap/base)s' % configRegistry,
'cn=Microsoft Exchange System Objects,%(connector/ad/ldap/base)s' % configRegistry
]
for k in configRegistry.keys():
if k.startswith('connector/ad/mapping/ignoresubtree/'):
global_ignore_subtree.append(configRegistry.get(k))
user_ignore_list = ignore_filter_from_tmpl('(uid={0!e})(CN={0!e})', 'connector/ad/mapping/user/ignorelist')
user_ignore_filter = configRegistry.get('connector/ad/mapping/user/ignorefilter', '')
if user_ignore_filter and not user_ignore_filter.startswith('('):
user_ignore_filter = '({})'.format(user_ignore_filter)
user_ignore_filter = '(|{}{}{})'.format('(userAccountControl=2080)', user_ignore_filter, user_ignore_list)
ignore_filter_parts = '(groupType=-2147483643)(groupType=4)(univentionGroupType=-2147483643)(univentionGroupType=4)'
if configRegistry.is_false('connector/ad/mapping/group/grouptype', False):
ignore_filter_parts += '(sambaGroupType=5)(groupType=5)'
ignore_filter_parts += ignore_filter_from_attr('cn', 'connector/ad/mapping/group/ignorelist')
group_ignore_filter = '(|{})'.format(ignore_filter_parts)
ad_mapping = {
'user': univention.connector.property(
ucs_default_dn='cn=users,%(ldap/base)s' % configRegistry,
con_default_dn='cn=users,%(connector/ad/ldap/base)s' % configRegistry,
ucs_module='users/user',
# read, write, sync, none
sync_mode=configRegistry.get('connector/ad/mapping/user/syncmode', configRegistry.get('connector/ad/mapping/syncmode')),
scope='sub',
con_search_filter='(&(objectClass=user)(!objectClass=computer))',
match_filter='(|(&(objectClass=posixAccount)(objectClass=sambaSamAccount))(objectClass=user))',
ignore_filter=user_ignore_filter or None,
ignore_subtree=global_ignore_subtree,
con_create_objectclass=['top', 'user', 'person', 'organizationalPerson'],
dn_mapping_function=[univention.connector.ad.user_dn_mapping],
attributes={ # from UCS Module
'samAccountName': univention.connector.attribute(
ucs_attribute='username',
ldap_attribute='uid',
con_attribute='sAMAccountName',
required=1,
compare_function=univention.connector.compare_lowercase,
),
'givenName': univention.connector.attribute(
ucs_attribute='firstname',
ldap_attribute='givenName',
con_attribute='givenName',
),
'sn': univention.connector.attribute(
ucs_attribute='lastname',
ldap_attribute='sn',
con_attribute='sn',
),
},
ucs_create_functions=[
univention.connector.set_ucs_passwd_user,
univention.connector.check_ucs_lastname_user,
univention.connector.set_primary_group_user
],
post_con_modify_functions=list(filter(None, [
univention.connector.ad.set_userPrincipalName_from_ucr,
univention.connector.ad.password.password_sync_ucs if configRegistry.is_false('connector/ad/mapping/user/password/disabled', True) else None,
univention.connector.ad.primary_group_sync_from_ucs,
univention.connector.ad.object_memberships_sync_from_ucs,
univention.connector.ad.disable_user_from_ucs,
])),
post_ucs_modify_functions=list(filter(None, [
univention.connector.ad.password.password_sync_kinit if configRegistry.is_false('connector/ad/mapping/user/password/disabled', True) and configRegistry.is_true('connector/ad/mapping/user/password/kinit', False) else None,
univention.connector.ad.password.password_sync if configRegistry.is_false('connector/ad/mapping/user/password/disabled', True) and not configRegistry.is_true('connector/ad/mapping/user/password/kinit', False) else None,
univention.connector.ad.set_univentionObjectFlag_to_synced,
univention.connector.ad.primary_group_sync_to_ucs,
univention.connector.ad.object_memberships_sync_to_ucs,
univention.connector.ad.disable_user_to_ucs,
])),
post_attributes={
'organisation': univention.connector.attribute(
ucs_attribute='organisation',
ldap_attribute='o',
con_attribute=configRegistry.get('connector/ad/mapping/organisation', 'company'),
),
'Exchange-Homeserver': univention.connector.attribute(
ucs_attribute='Exchange-Homeserver',
ldap_attribute='univentionADmsExchHomeServerName',
con_attribute='msExchHomeServerName',
),
'Exchange-homeMDB': univention.connector.attribute(
ucs_attribute='Exchange-homeMDB',
ldap_attribute='univentionADhomeMDB',
con_attribute='homeMDB',
),
'Exchange-Nickname': univention.connector.attribute(
ucs_attribute='Exchange-Nickname',
ldap_attribute='univentionADmailNickname',
con_attribute='mailNickname',
),
'mailPrimaryAddress': univention.connector.attribute(
ucs_attribute='mailPrimaryAddress',
ldap_attribute='mailPrimaryAddress',
con_attribute='proxyAddresses',
mapping=(
proxyAddresses.to_proxyAddresses,
proxyAddresses.to_mailPrimaryAddress
),
compare_function=proxyAddresses.equal,
),
'mailPrimaryAddress_to_mail': univention.connector.attribute(
sync_mode='write',
ucs_attribute='mailPrimaryAddress',
ldap_attribute='mailPrimaryAddress',
con_attribute='mail',
),
'mailAlternativeAddress': univention.connector.attribute(
sync_mode='read' if configRegistry.is_true('connector/ad/mapping/user/primarymail') else 'sync', # proxyAddresses.to_mailPrimaryAddress does the write
ucs_attribute='mailAlternativeAddress',
ldap_attribute='mailAlternativeAddress',
con_attribute='proxyAddresses',
mapping=(
None,
proxyAddresses.to_mailAlternativeAddress
),
compare_function=proxyAddresses.equal,
),
'description': univention.connector.attribute(
ucs_attribute='description',
ldap_attribute='description',
con_attribute='description',
),
'street': univention.connector.attribute(
ucs_attribute='street',
ldap_attribute='street',
con_attribute='streetAddress',
),
'city': univention.connector.attribute(
ucs_attribute='city',
ldap_attribute='l',
con_attribute='l',
),
'postcode': univention.connector.attribute(
ucs_attribute='postcode',
ldap_attribute='postalCode',
con_attribute='postalCode',
),
'sambaWorkstations': univention.connector.attribute(
ucs_attribute='sambaUserWorkstations',
ldap_attribute='sambaUserWorkstations',
con_attribute='userWorkstations',
),
#'sambaLogonHours': univention.connector.attribute(
# ucs_attribute='sambaLogonHours',
# ldap_attribute='sambaLogonHours',
# con_attribute='logonHours',
#),
'profilepath': univention.connector.attribute(
ucs_attribute='profilepath',
ldap_attribute='sambaProfilePath',
con_attribute='profilePath',
),
'scriptpath': univention.connector.attribute(
ucs_attribute='scriptpath',
ldap_attribute='sambaLogonScript',
con_attribute='scriptPath',
),
'telephoneNumber': univention.connector.attribute(
ucs_attribute='phone',
ldap_attribute='telephoneNumber',
con_attribute='telephoneNumber',
con_other_attribute='otherTelephone',
),
'homePhone': univention.connector.attribute(
ucs_attribute='homeTelephoneNumber',
ldap_attribute='homePhone',
con_attribute='homePhone',
con_other_attribute='otherHomePhone',
),
'mobilePhone': univention.connector.attribute(
ucs_attribute='mobileTelephoneNumber',
ldap_attribute='mobile',
con_attribute='mobile',
con_other_attribute='otherMobile',
),
'pager': univention.connector.attribute(
ucs_attribute='pagerTelephoneNumber',
ldap_attribute='pager',
con_attribute='pager',
con_other_attribute='otherPager',
),
'displayName': univention.connector.attribute(
ucs_attribute='displayName',
ldap_attribute='displayName',
con_attribute='displayName',
),
},
),
'group': univention.connector.property(
ucs_default_dn='cn=groups,%(ldap/base)s' % configRegistry,
con_default_dn='cn=Users,%(connector/ad/ldap/base)s' % configRegistry,
ucs_module='groups/group',
sync_mode=configRegistry.get('connector/ad/mapping/group/syncmode', configRegistry.get('connector/ad/mapping/syncmode')),
scope='sub',
ignore_filter=group_ignore_filter or None,
ignore_subtree=global_ignore_subtree,
con_search_filter='objectClass=group',
con_create_objectclass=['top', 'group'],
post_con_modify_functions=[
univention.connector.ad.group_members_sync_from_ucs,
univention.connector.ad.object_memberships_sync_from_ucs
],
post_ucs_modify_functions=[
univention.connector.ad.set_univentionObjectFlag_to_synced,
univention.connector.ad.group_members_sync_to_ucs,
univention.connector.ad.object_memberships_sync_to_ucs
],
dn_mapping_function=[univention.connector.ad.group_dn_mapping],
attributes={
'cn': univention.connector.attribute(
ucs_attribute='name',
ldap_attribute='cn',
con_attribute='sAMAccountName',
required=True,
compare_function=univention.connector.compare_lowercase,
),
'groupType': univention.connector.attribute(
ucs_attribute='adGroupType',
ldap_attribute='univentionGroupType',
con_attribute='groupType',
),
'description': univention.connector.attribute(
ucs_attribute='description',
ldap_attribute='description',
con_attribute='description',
),
'mailAddress': univention.connector.attribute(
sync_mode='read',
ucs_attribute='mailAddress',
ldap_attribute='mailPrimaryAddress',
con_attribute='proxyAddresses',
mapping=(
proxyAddresses.to_proxyAddresses,
proxyAddresses.to_mailPrimaryAddress
),
compare_function=proxyAddresses.equal,
),
'mailPrimaryAddress_to_mail': univention.connector.attribute(
sync_mode='write',
ucs_attribute='mailAddress',
ldap_attribute='mailPrimaryAddress',
con_attribute='mail',
),
'mailAlternativeAddress': univention.connector.attribute(
sync_mode='read' if configRegistry.is_true('connector/ad/mapping/group/primarymail') else 'sync', # proxyAddresses.to_mailPrimaryAddress does the write
ucs_attribute='mailAlternativeAddress',
ldap_attribute='mailAlternativeAddress',
con_attribute='proxyAddresses',
mapping=(
None,
proxyAddresses.to_mailAlternativeAddress
),
compare_function=proxyAddresses.equal,
),
'Exchange-Nickname': univention.connector.attribute(
ucs_attribute='Exchange-Nickname',
ldap_attribute='univentionADmailNickname',
con_attribute='mailNickname',
),
},
mapping_table={
'cn': [
(u'Domain Users', u'Domänen-Benutzer'),
(u'Domain Admins', u'Domänen-Admins'),
(u'Windows Hosts', u'Domänencomputer'),
(u'Domain Guests', u'Domänen-Gäste'),
]
},
),
'windowscomputer': univention.connector.property(
ucs_default_dn='cn=computers,%(ldap/base)s' % configRegistry,
con_default_dn='cn=computers,%(connector/ad/ldap/base)s' % configRegistry,
ucs_module='computers/windows',
ucs_module_others=['computers/memberserver', 'computers/linux', 'computers/ubuntu', 'computers/macos'],
sync_mode=configRegistry.get('connector/ad/mapping/computer/syncmode', configRegistry.get('connector/ad/mapping/syncmode')),
post_ucs_modify_functions=[
univention.connector.ad.set_univentionObjectFlag_to_synced,
],
scope='sub',
dn_mapping_function=[univention.connector.ad.windowscomputer_dn_mapping],
con_search_filter='(&(objectClass=computer)(userAccountControl:1.2.840.113556.1.4.803:=4096))',
# ignore_filter='userAccountControl=4096',
match_filter='(|(&(objectClass=univentionWindows)(!(univentionServerRole=windows_domaincontroller)))(objectClass=computer)(objectClass=univentionMemberServer)(objectClass=univentionUbuntuClient)(objectClass=univentionLinuxClient)(objectClass=univentionMacOSClient))',
ignore_subtree=global_ignore_subtree,
ignore_filter=ignore_filter_from_attr('cn', 'connector/ad/mapping/windowscomputer/ignorelist') or None,
con_create_objectclass=['top', 'computer'],
con_create_attributes=[('userAccountControl', [b'4096'])],
attributes={
'cn': univention.connector.attribute(
ucs_attribute='name',
ldap_attribute='cn',
con_attribute='cn',
required=True,
compare_function=univention.connector.compare_lowercase,
),
'samAccountName': univention.connector.attribute(
ldap_attribute='uid',
con_attribute='sAMAccountName',
compare_function=univention.connector.compare_lowercase,
sync_mode='write',
),
'description': univention.connector.attribute(
ucs_attribute='description',
ldap_attribute='description',
con_attribute='description',
),
'operatingSystem': univention.connector.attribute(
ucs_attribute='operatingSystem',
ldap_attribute='univentionOperatingSystem',
con_attribute='operatingSystem',
),
'operatingSystemVersion': univention.connector.attribute(
ucs_attribute='operatingSystemVersion',
ldap_attribute='univentionOperatingSystemVersion',
con_attribute='operatingSystemVersion',
),
},
),
'container': univention.connector.property(
ucs_module='container/cn',
sync_mode=configRegistry.get('connector/ad/mapping/container/syncmode', configRegistry.get('connector/ad/mapping/syncmode')),
scope='sub',
con_search_filter='(|(objectClass=container)(objectClass=builtinDomain))', # builtinDomain is cn=builtin (with group cn=Administrators)
ignore_filter=ignore_filter_from_attr('cn', 'connector/ad/mapping/container/ignorelist', 'mail,kerberos') or None,
ignore_subtree=global_ignore_subtree,
post_ucs_modify_functions=[
univention.connector.ad.set_univentionObjectFlag_to_synced,
],
con_create_objectclass=['top', 'container'],
attributes={
'cn': univention.connector.attribute(
ucs_attribute='name',
ldap_attribute='cn',
con_attribute='cn',
required=1,
compare_function=univention.connector.compare_lowercase,
),
'description': univention.connector.attribute(
ucs_attribute='description',
ldap_attribute='description',
con_attribute='description',
),
},
),
'ou': univention.connector.property(
ucs_module='container/ou',
sync_mode=configRegistry.get('connector/ad/mapping/ou/syncmode', configRegistry.get('connector/ad/mapping/syncmode')),
scope='sub',
con_search_filter='objectClass=organizationalUnit',
ignore_filter=ignore_filter_from_attr('ou', 'connector/ad/mapping/ou/ignorelist') or None,
ignore_subtree=global_ignore_subtree,
post_ucs_modify_functions=[
univention.connector.ad.set_univentionObjectFlag_to_synced,
],
con_create_objectclass=['top', 'organizationalUnit'],
attributes={
'ou': univention.connector.attribute(
ucs_attribute='name',
ldap_attribute='ou',
con_attribute='ou',
required=True,
compare_function=univention.connector.compare_lowercase,
),
'description': univention.connector.attribute(
ucs_attribute='description',
ldap_attribute='description',
con_attribute='description',
),
},
),
}
# users
if configRegistry.is_false('connector/ad/mapping/user/exchange', True):
ad_mapping['user'].post_attributes.pop('Exchange-Homeserver')
ad_mapping['user'].post_attributes.pop('Exchange-homeMDB')
ad_mapping['user'].post_attributes.pop('Exchange-Nickname')
if not configRegistry.is_true('connector/ad/mapping/user/primarymail'):
ad_mapping['user'].post_attributes.pop('mailPrimaryAddress')
ad_mapping['user'].post_attributes.pop('mailPrimaryAddress_to_mail')
if not configRegistry.is_true('connector/ad/mapping/user/alternativemail'):
ad_mapping['user'].post_attributes.pop('mailAlternativeAddress')
# groups
if not configRegistry.is_true('connector/ad/mapping/group/grouptype', True):
ad_mapping['group'].attributes.pop('groupType')
if not configRegistry.is_true('connector/ad/mapping/group/primarymail'):
ad_mapping['group'].attributes.pop('mailAddress')
ad_mapping['group'].attributes.pop('mailPrimaryAddress_to_mail')
if not configRegistry.is_true('connector/ad/mapping/group/alternativemail'):
ad_mapping['group'].attributes.pop('mailAlternativeAddress')
if configRegistry.is_false('connector/ad/mapping/group/exchange', True):
ad_mapping['group'].attributes.pop('Exchange-Nickname')
if configRegistry.get('connector/ad/mapping/group/language') not in ['de', 'DE']:
ad_mapping['group'].mapping_table.pop('cn')
[docs]def load_localmapping(filename='/etc/univention/connector/ad/localmapping.py'):
try:
if six.PY2:
import imp
mapping_hook = imp.load_source('localmapping', filename).mapping_hook
else:
import importlib.util
spec = importlib.util.spec_from_file_location('localmapping', filename)
mapping = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mapping)
mapping_hook = mapping.mapping_hook
except (IOError, AttributeError):
return ad_mapping
else:
return mapping_hook(ad_mapping)