Source code for univention.uldap

# -*- coding: utf-8 -*-
#
# Univention Python
#  LDAP access
#
# Copyright 2002-2022 Univention GmbH
#
# https://www.univention.de/
#
# All rights reserved.
#
# The source code of this program is made available
# under the terms of the GNU Affero General Public License version 3
# (GNU AGPL V3) as published by the Free Software Foundation.
#
# Binary versions of this program provided by Univention to you as
# well as other copyrighted, protected or trademarked materials like
# Logos, graphics, fonts, specific documentations and configurations,
# cryptographic keys etc. are subject to a license agreement between
# you and Univention and not subject to the GNU AGPL V3.
#
# In the case you use this program under the terms of the GNU AGPL V3,
# the program is provided in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public
# License with the Debian GNU/Linux or Univention distribution in file
# /usr/share/common-licenses/AGPL-3; if not, see
# <https://www.gnu.org/licenses/>.

import re
from functools import wraps
import random

import six
import ldap
import ldap.schema
import ldap.sasl
from ldapurl import LDAPUrl
from ldapurl import isLDAPUrl

import univention.debug
from univention.config_registry import ConfigRegistry
try:
	from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union  # noqa: F401
except ImportError:
	pass


[docs]def parentDn(dn, base=''): # type: (str, str) -> Optional[str] """ Return the parent container of a distinguished name. :param str dn: The distinguished name. :param str base: distinguished name where to stop. :return: The parent distinguished name or None. :rtype: str or None """ if dn.lower() == base.lower(): return None dn = ldap.dn.str2dn(dn) return ldap.dn.dn2str(dn[1:])
[docs]def explodeDn(dn, notypes=0): # type: (str, int) -> List[str] """ Break up a DN into its component parts. :param str dn: The distinguished name. :param int notypes: Return only the component's attribute values if True. Also the attribute types if False. :return: A list of relative distinguished names. :rtype: list[str] """ return ldap.dn.explode_dn(dn, notypes)
[docs]def getRootDnConnection(start_tls=2, decode_ignorelist=[], reconnect=True): # type: (int, List[str], bool) -> access """ Open a LDAP connection to the local LDAP server with the LDAP root account. :param int start_tls: Negotiate TLS with server. If `2` is given, the command will require the operation to be successful. :param decode_ignorelist: List of LDAP attribute names which shall be handled as binary attributes. :type decode_ignorelist: list[str] :param bool reconnect: Automatically reconect if the connection fails. :return: A LDAP access object. :rtype: univention.uldap.access """ ucr = ConfigRegistry() ucr.load() port = int(ucr.get('slapd/port', '7389').split(',')[0]) host = ucr['hostname'] + '.' + ucr['domainname'] if ucr.get('ldap/server/type', 'dummy') == 'master': bindpw = open('/etc/ldap.secret').read().rstrip('\n') binddn = 'cn=admin,{0}'.format(ucr['ldap/base']) else: bindpw = open('/etc/ldap/rootpw.conf').read().rstrip('\n').replace('rootpw "', '', 1)[:-1] binddn = 'cn=update,{0}'.format(ucr['ldap/base']) return access(host=host, port=port, base=ucr['ldap/base'], binddn=binddn, bindpw=bindpw, start_tls=start_tls, decode_ignorelist=decode_ignorelist, reconnect=reconnect)
[docs]def getAdminConnection(start_tls=2, decode_ignorelist=[], reconnect=True): # type: (int, List[str], bool) -> access """ Open a LDAP connection to the Primary Directory Node LDAP server using the admin credentials. :param int start_tls: Negotiate TLS with server. If `2` is given, the command will require the operation to be successful. :param decode_ignorelist: List of LDAP attribute names which shall be handled as binary attributes. :type decode_ignorelist: list[str] :param bool reconnect: Automatically reconect if the connection fails. :return: A LDAP access object. :rtype: univention.uldap.access """ ucr = ConfigRegistry() ucr.load() bindpw = open('/etc/ldap.secret').read().rstrip('\n') port = int(ucr.get('ldap/master/port', '7389')) return access(host=ucr['ldap/master'], port=port, base=ucr['ldap/base'], binddn='cn=admin,' + ucr['ldap/base'], bindpw=bindpw, start_tls=start_tls, decode_ignorelist=decode_ignorelist, reconnect=reconnect)
[docs]def getBackupConnection(start_tls=2, decode_ignorelist=[], reconnect=True): # type: (int, List[str], bool) -> access """ Open a LDAP connection to a Backup Directory Node LDAP server using the admin credentials. :param int start_tls: Negotiate TLS with server. If `2` is given, the command will require the operation to be successful. :param decode_ignorelist: List of LDAP attribute names which shall be handled as binary attributes. :type decode_ignorelist: list[str] :param bool reconnect: Automatically reconect if the connection fails. :return: A LDAP access object. :rtype: univention.uldap.access """ ucr = ConfigRegistry() ucr.load() bindpw = open('/etc/ldap-backup.secret').read().rstrip('\n') port = int(ucr.get('ldap/master/port', '7389')) try: return access(host=ucr['ldap/master'], port=port, base=ucr['ldap/base'], binddn='cn=backup,' + ucr['ldap/base'], bindpw=bindpw, start_tls=start_tls, decode_ignorelist=decode_ignorelist, reconnect=reconnect) except ldap.SERVER_DOWN: if not ucr['ldap/backup']: raise backup = ucr['ldap/backup'].split(' ')[0] return access(host=backup, port=port, base=ucr['ldap/base'], binddn='cn=backup,' + ucr['ldap/base'], bindpw=bindpw, start_tls=start_tls, decode_ignorelist=decode_ignorelist, reconnect=reconnect)
[docs]def getMachineConnection(start_tls=2, decode_ignorelist=[], ldap_master=True, secret_file="/etc/machine.secret", reconnect=True, random_server=False): # type: (int, List[str], bool, str, bool, bool) -> access """ Open a LDAP connection using the machine credentials. :param int start_tls: Negotiate TLS with server. If `2` is given, the command will require the operation to be successful. :param decode_ignorelist: List of LDAP attribute names which shall be handled as binary attributes. :type decode_ignorelist: list[str] :param bool ldap_master: Open a connection to the Master if True, to the preferred LDAP server otherwise. :param str secret_file: The name of a file containing the password credentials. :param bool reconnect: Automatically reconnect if the connection fails. :param bool random_server: Choose a random LDAP server from ldap/server/name and ldap/server/addition. :return: A LDAP access object. :rtype: univention.uldap.access """ ucr = ConfigRegistry() ucr.load() bindpw = open(secret_file).read().rstrip('\n') if ldap_master: # Connect to Primary Directory Node port = int(ucr.get('ldap/master/port', '7389')) return access(host=ucr['ldap/master'], port=port, base=ucr['ldap/base'], binddn=ucr['ldap/hostdn'], bindpw=bindpw, start_tls=start_tls, decode_ignorelist=decode_ignorelist, reconnect=reconnect) else: # Connect to ldap/server/name port = int(ucr.get('ldap/server/port', '7389')) servers = [ucr.get('ldap/server/name')] additional_servers = ucr.get('ldap/server/addition', '').split() if random_server: if ucr.get('server/role') in ('memberserver',) and random_server: # shuffle all servers on Managed Nodes if random_server==True servers += additional_servers random.shuffle(servers) else: # shuffle only additional server random.shuffle(additional_servers) servers += additional_servers else: servers += additional_servers for server in servers: try: return access(host=server, port=port, base=ucr['ldap/base'], binddn=ucr['ldap/hostdn'], bindpw=bindpw, start_tls=start_tls, decode_ignorelist=decode_ignorelist, reconnect=reconnect) # LDAP server down, try next server except ldap.SERVER_DOWN as oexc: exc = oexc raise exc
def _fix_reconnect_handling(func): # Bug #47926: python ldap does not reconnect on ldap.UNAVAILABLE # We need this until https://github.com/python-ldap/python-ldap/pull/267 is fixed @wraps(func) def _decorated(self, *args, **kwargs): if not self.reconnect: return func(self, *args, **kwargs) try: return func(self, *args, **kwargs) except ldap.INSUFFICIENT_ACCESS: if self.whoami(): # the connection is still bound and valid raise self._reconnect() return func(self, *args, **kwargs) except (ldap.UNAVAILABLE, ldap.CONNECT_ERROR, ldap.TIMEOUT): # ldap.TIMELIMIT_EXCEEDED ? self._reconnect() return func(self, *args, **kwargs) return _decorated
[docs]class access(object): """ The low-level class to access a LDAP server. :param str host: host name of the LDAP server. :param int port: TCP port of the LDAP server. Defaults to 7389 or 7636. :param str base: LDAP base distinguished name. :param str binddn: Distinguished name for simple authentication. :param str bindpw: Password for simple authentication. :param int start_tls: 0=no, 1=try StartTLS, 2=require StartTLS. :param str ca_certfile: File name to CA certificate. :param decode_ignorelist: List of LDAP attribute names which shall be handled as binary attributes. :param bool use_ldaps: Connect to SSL port. :param str uri: LDAP connection string. :param bool follow_referral: Follow referrals and return result from other servers instead of returning the referral itself. :param bool reconnect: Automatically re-establish connection to LDAP server if connection breaks. """ def __init__(self, host='localhost', port=None, base='', binddn='', bindpw='', start_tls=2, ca_certfile=None, decode_ignorelist=[], use_ldaps=False, uri=None, follow_referral=False, reconnect=True): # type: (str, int, str, Optional[str], str, int, str, List, bool, str, bool, bool) -> None self.host = host self.base = base self.binddn = binddn if six.PY2 and isinstance(self.binddn, bytes): self.binddn = self.binddn.decode('UTF-8') self.bindpw = bindpw if six.PY2 and isinstance(self.bindpw, bytes): self.bindpw = self.bindpw.decode('UTF-8') self.start_tls = start_tls self.ca_certfile = ca_certfile self.reconnect = reconnect self.port = int(port) if port else None ucr = ConfigRegistry() ucr.load() if not self.port: # if no explicit port is given self.port = int(ucr.get('ldap/server/port', 7389)) # take UCR value if use_ldaps and self.port == 7389: # adjust the standard port for ssl self.port = 7636 # http://www.openldap.org/faq/data/cache/605.html self.protocol = 'ldap' if use_ldaps: self.protocol = 'ldaps' self.uri = 'ldaps://%s:%d' % (self.host, self.port) elif uri: self.uri = uri else: self.uri = "ldap://%s:%d" % (self.host, self.port) self.decode_ignorelist = decode_ignorelist or ucr.get('ldap/binaryattributes', 'krb5Key,userCertificate;binary').split(',') # python-ldap does not cache the credentials, so we override the # referral handling if follow_referral is set to true # https://forge.univention.org/bugzilla/show_bug.cgi?id=9139 self.follow_referral = follow_referral try: client_retry_count = int(ucr.get('ldap/client/retry/count', 10)) except ValueError: univention.debug.debug(univention.debug.LDAP, univention.debug.ERROR, "Unable to read ldap/client/retry/count, please reset to an integer value") client_retry_count = 10 self.client_connection_attempt = client_retry_count + 1 self.__open(ca_certfile)
[docs] @_fix_reconnect_handling def bind(self, binddn, bindpw): # type: (str, str) -> None """ Do simple LDAP bind using DN and password. :param str binddn: The distinguished name of the account. :param str bindpw: The user password for simple authentication. """ self.binddn = binddn self.bindpw = bindpw univention.debug.debug(univention.debug.LDAP, univention.debug.INFO, 'bind binddn=%s' % self.binddn) self.lo.simple_bind_s(self.binddn, self.bindpw)
[docs] @_fix_reconnect_handling def bind_saml(self, bindpw): # type: (str) -> None """ Do LDAP bind using SAML message. :param str bindpw: The SAML authentication cookie. """ self.binddn = None self.bindpw = bindpw saml = ldap.sasl.sasl({ ldap.sasl.CB_AUTHNAME: None, ldap.sasl.CB_PASS: bindpw, }, 'SAML') self.lo.sasl_interactive_bind_s('', saml) self.binddn = self.whoami() univention.debug.debug(univention.debug.LDAP, univention.debug.INFO, 'SAML bind binddn=%s' % self.binddn)
[docs] def unbind(self): # type: () -> None """ Unauthenticate. """ self.lo.unbind_s()
[docs] def whoami(self): # type: () -> str """ Return the distinguished name of the authenticated user. :returns: The distinguished name. :rtype: str """ dn = self.lo.whoami_s() return re.sub(u'^dn:', u'', dn)
def _reconnect(self): # type: () -> None """Reconnect.""" self.lo.reconnect(self.lo._uri, retry_max=self.lo._retry_max, retry_delay=self.lo._retry_delay) def __open(self, ca_certfile): # type: (Optional[str]) -> None if self.reconnect: univention.debug.debug(univention.debug.LDAP, univention.debug.INFO, 'establishing new connection with retry_max=%d' % self. client_connection_attempt) self.lo = ldap.ldapobject.ReconnectLDAPObject(self.uri, trace_stack_limit=None, retry_max=self.client_connection_attempt, retry_delay=1) else: univention.debug.debug(univention.debug.LDAP, univention.debug.INFO, 'establishing new connection') self.lo = ldap.initialize(self.uri, trace_stack_limit=None) if ca_certfile: self.lo.set_option(ldap.OPT_X_TLS_CACERTFILE, ca_certfile) self.lo.set_option(ldap.OPT_X_TLS_NEWCTX, 0) if self.protocol.lower() != 'ldaps': if self.start_tls == 1: try: self.__starttls() except Exception: univention.debug.debug(univention.debug.LDAP, univention.debug.WARN, 'Could not start TLS') elif self.start_tls == 2: self.__starttls() if self.binddn and not self.uri.startswith('ldapi://'): self.bind(self.binddn, self.bindpw) # Override referral handling if self.follow_referral: self.lo.set_option(ldap.OPT_REFERRALS, 0) self.__schema = None self.__reconnects_done = 0 @_fix_reconnect_handling def __starttls(self): self.lo.start_tls_s() def __encode(self, value): if six.PY3: return value if value is None: return value elif six.PY2 and isinstance(value, unicode): # noqa: F821 return str(value) elif isinstance(value, (list, tuple)): return map(self.__encode, value) else: return value def __recode_attribute(self, attr, val): if six.PY3: return val if attr in self.decode_ignorelist: return val return self.__encode(val) def __recode_entry(self, entry): if six.PY3: return entry if isinstance(entry, tuple) and len(entry) == 3: return (entry[0], entry[1], self.__recode_attribute(entry[1], entry[2])) elif isinstance(entry, tuple) and len(entry) == 2: return (entry[0], self.__recode_attribute(entry[0], entry[1])) elif isinstance(entry, (list, tuple)): return map(self.__recode_entry, entry) elif isinstance(entry, dict): return dict(map(lambda k_v: (k_v[0], self.__recode_attribute(k_v[0], k_v[1])), entry.items())) else: return entry def __encode_entry(self, entry): return self.__recode_entry(entry) def __encode_attribute(self, attr, val): return self.__recode_attribute(attr, val) def __decode_entry(self, entry): return self.__recode_entry(entry) def __decode_attribute(self, attr, val): return self.__recode_attribute(attr, val)
[docs] @_fix_reconnect_handling def get(self, dn, attr=[], required=False): # type: (str, List[str], bool) -> Dict[str, List[bytes]] """ Return multiple attributes of a single LDAP object. :param str dn: The distinguished name of the object to lookup. :param attr: The list of attributes to fetch. :type attr: list[str] :param bool required: Raise an exception instead of returning an empty dictionary. :returns: A dictionary mapping the requested attributes to a list of their values. :rtype: dict[str, list[bytes]] :raises ldap.NO_SUCH_OBJECT: If the LDAP object is not accessible. """ if dn: try: result = self.lo.search_s(dn, ldap.SCOPE_BASE, '(objectClass=*)', attr) return self.__decode_entry(result[0][1]) except (ldap.NO_SUCH_OBJECT, LookupError): pass if required: raise ldap.NO_SUCH_OBJECT({'desc': 'no object'}) return {}
[docs] @_fix_reconnect_handling def getAttr(self, dn, attr, required=False): # type: (str, str, bool) -> List[bytes] """ Return a single attribute of a single LDAP object. :param str dn: The distinguished name of the object to lookup. :param str attr: The attribute to fetch. :param bool required: Raise an exception instead of returning an empty dictionary. :returns: A list of values. :rtype: list[bytes] :raises ldap.NO_SUCH_OBJECT: If the LDAP object is not accessible. .. warning:: the attribute name is currently case sensitive and must be given as in the LDAP schema .. warning:: when `required=True` it raises `ldap.NO_SUCH_OBJECT` even if the object exists but the attribute is not set """ if dn: try: result = self.lo.search_s(dn, ldap.SCOPE_BASE, '(objectClass=*)', [attr]) return result[0][1][attr] except (ldap.NO_SUCH_OBJECT, LookupError): pass if required: raise ldap.NO_SUCH_OBJECT({'desc': 'no object'}) return []
[docs] @_fix_reconnect_handling def search(self, filter='(objectClass=*)', base='', scope='sub', attr=[], unique=False, required=False, timeout=-1, sizelimit=0, serverctrls=None, response=None): # type: (str, str, str, List[str], bool, bool, int, int, Optional[List[ldap.controls.LDAPControl]], Optional[Dict[str, ldap.controls.LDAPControl]]) -> List[Tuple[str, Dict[str, List[bytes]]]] """ Perform LDAP search and return values. :param str filter: LDAP search filter. :param str base: the starting point for the search. :param str scope: Specify the scope of the search to be one of `base`, `base+one`, `one`, `sub`, or `domain` to specify a base object, base plus one-level, one-level, subtree, or children search. :param attr: The list of attributes to fetch. :type attr: list[str] :param bool unique: Raise an exception if more than one object matches. :param bool required: Raise an exception instead of returning an empty dictionary. :param int timeout: wait at most timeout seconds for a search to complete. `-1` for no limit. :param int sizelimit: retrieve at most sizelimit entries for a search. `0` for no limit. :param serverctrls: a list of :py:class:`ldap.controls.LDAPControl` instances sent to the server along with the LDAP request. :type serverctrls: list[ldap.controls.LDAPControl] :param dict response: An optional dictionary to receive the server controls of the result. :returns: A list of 2-tuples (dn, values) for each LDAP object, where values is a dictionary mapping attribute names to a list of values. :rtype: list[tuple[str, dict[str, list[bytes]]]] :raises ldap.NO_SUCH_OBJECT: Indicates the target object cannot be found. :raises ldap.INAPPROPRIATE_MATCHING: Indicates that the matching rule specified in the search filter does not match a rule defined for the attribute's syntax. """ univention.debug.debug(univention.debug.LDAP, univention.debug.INFO, 'uldap.search filter=%s base=%s scope=%s attr=%s unique=%d required=%d timeout=%d sizelimit=%d' % ( filter, base, scope, attr, unique, required, timeout, sizelimit)) if not base: base = self.base if scope == 'base+one': res = self.lo.search_ext_s(base, ldap.SCOPE_BASE, filter, attr, serverctrls=serverctrls, clientctrls=None, timeout=timeout, sizelimit=sizelimit) + \ self.lo.search_ext_s(base, ldap.SCOPE_ONELEVEL, filter, attr, serverctrls=serverctrls, clientctrls=None, timeout=timeout, sizelimit=sizelimit) else: if scope == 'sub' or scope == 'domain': ldap_scope = ldap.SCOPE_SUBTREE elif scope == 'one': ldap_scope = ldap.SCOPE_ONELEVEL else: ldap_scope = ldap.SCOPE_BASE res = self.lo.search_ext_s(base, ldap_scope, filter, attr, serverctrls=serverctrls, clientctrls=None, timeout=timeout, sizelimit=sizelimit) if unique and len(res) > 1: raise ldap.INAPPROPRIATE_MATCHING({'desc': 'more than one object'}) if required and len(res) < 1: raise ldap.NO_SUCH_OBJECT({'desc': 'no object'}) return res
[docs] def searchDn(self, filter='(objectClass=*)', base='', scope='sub', unique=False, required=False, timeout=-1, sizelimit=0, serverctrls=None, response=None): # type: (str, str, str, bool, bool, int, int, Optional[List[ldap.controls.LDAPControl]], Optional[Dict[str, ldap.controls.LDAPControl]]) -> List[str] """ Perform LDAP search and return distinguished names only. :param str filter: LDAP search filter. :param str base: the starting point for the search. :param str scope: Specify the scope of the search to be one of `base`, `base+one`, `one`, `sub`, or `domain` to specify a base object, base plus one-level, one-level, subtree, or children search. :param bool unique: Raise an exception if more than one object matches. :param bool required: Raise an exception instead of returning an empty dictionary. :param int timeout: wait at most timeout seconds for a search to complete. `-1` for no limit. :param int sizelimit: retrieve at most sizelimit entries for a search. `0` for no limit. :param serverctrls: a list of :py:class:`ldap.controls.LDAPControl` instances sent to the server along with the LDAP request. :type serverctrls: list[ldap.controls.LDAPControl] :param dict response: An optional dictionary to receive the server controls of the result. :returns: A list of distinguished names. :rtype: list[str] :raises ldap.NO_SUCH_OBJECT: Indicates the target object cannot be found. :raises ldap.INAPPROPRIATE_MATCHING: Indicates that the matching rule specified in the search filter does not match a rule defined for the attribute's syntax. """ return [x[0] for x in self.search(filter, base, scope, ['dn'], unique, required, timeout, sizelimit, serverctrls, response)]
[docs] @_fix_reconnect_handling def getPolicies(self, dn, policies=None, attrs=None, result=None, fixedattrs=None): # type: (str, List[str], Dict[str, List[Any]], Any, Any) -> Dict[str, Dict[str, Any]] """ Return |UCS| policies for |LDAP| entry. :param str dn: The distinguished name of the |LDAP| entry. :param list policies: List of policy object classes... :param dict attrs: |LDAP| attributes. If not given, the data is fetched from LDAP. :param result: UNUSED! :param fixedattrs: UNUSED! :returns: A mapping of policy names to """ if attrs is None: attrs = {} if policies is None: policies = [] if not dn and not policies: # if policies is set apply a fictionally referenced list of policies return {} # get current dn if 'objectClass' in attrs and 'univentionPolicyReference' in attrs: oattrs = attrs else: oattrs = self.get(dn, ['univentionPolicyReference', 'objectClass']) if 'univentionPolicyReference' in attrs: policies = [x.decode('utf-8') for x in attrs['univentionPolicyReference']] elif not policies and not attrs: policies = [x.decode('utf-8') for x in oattrs.get('univentionPolicyReference', [])] object_classes = set(oc.lower() for oc in oattrs.get('objectClass', [])) merged = {} # type: Dict[str, Dict[str, Any]] if dn: obj_dn = dn while True: for policy_dn in policies or []: self._merge_policy(policy_dn, obj_dn, object_classes, merged) dn = self.parentDn(dn) or '' if not dn: break try: parent = self.get(dn, attr=['univentionPolicyReference'], required=True) except ldap.NO_SUCH_OBJECT: break policies = [x.decode('utf-8') for x in parent.get('univentionPolicyReference', [])] univention.debug.debug( univention.debug.LDAP, univention.debug.INFO, "getPolicies: result: %s" % merged) return merged
def _merge_policy(self, policy_dn, obj_dn, object_classes, result): # type: (str, str, Set[bytes], Dict[str, Dict[str, Any]]) -> None """ Merge policies into result. :param str policy_dn: Distinguished name of the policy object. :param obj_dn: Distinguished name of the LDAP object. :param set object_classes: the set of object classes of the LDAP object. :param list result: A mapping, into which the policy is merged. """ pattrs = self.get(policy_dn) if not pattrs: return try: classes = set(pattrs['objectClass']) - {b'top', b'univentionPolicy', b'univentionObject'} ptype = classes.pop().decode('utf-8') except KeyError: return if pattrs.get('ldapFilter'): try: self.search(pattrs['ldapFilter'][0].decode('utf-8'), base=obj_dn, scope='base', unique=True, required=True) except ldap.NO_SUCH_OBJECT: return if not all(oc.lower() in object_classes for oc in pattrs.get('requiredObjectClasses', [])): return if any(oc.lower() in object_classes for oc in pattrs.get('prohibitedObjectClasses', [])): return fixed = set(x.decode('utf-8') for x in pattrs.get('fixedAttributes', ())) empty = set(x.decode('utf-8') for x in pattrs.get('emptyAttributes', ())) values = result.setdefault(ptype, {}) SKIP = {'requiredObjectClasses', 'prohibitedObjectClasses', 'fixedAttributes', 'emptyAttributes', 'objectClass', 'cn', 'univentionObjectType', 'ldapFilter'} for key in (empty | set(pattrs) | fixed) - SKIP: if key not in values or key in fixed: value = [] if key in empty else pattrs.get(key, []) univention.debug.debug( univention.debug.LDAP, univention.debug.INFO, "getPolicies: %s sets: %s=%r" % (policy_dn, key, value)) values[key] = { 'policy': policy_dn, 'value': value, 'fixed': key in fixed, }
[docs] @_fix_reconnect_handling def get_schema(self): # type: () -> ldap.schema.subentry.SubSchema """ Retrieve |LDAP| schema information from |LDAP| server. :returns: The |LDAP| schema. :rtype: ldap.schema.subentry.SubSchema """ if self.reconnect and self.lo._reconnects_done > self.__reconnects_done: # the schema might differ after reconnecting (e.g. slapd restart) self.__schema = None self.__reconnects_done = self.lo._reconnects_done if not self.__schema: self.__schema = ldap.schema.SubSchema(self.lo.read_subschemasubentry_s(self.lo.search_subschemasubentry_s()), 0) return self.__schema
[docs] @_fix_reconnect_handling def add(self, dn, al, serverctrls=None, response=None): # type: (str, List[Tuple], Optional[List[ldap.controls.LDAPControl]], Optional[dict]) -> None """ Add LDAP entry at distinguished name and attributes in add_list=(attribute-name, old-values. new-values) or (attribute-name, new-values). :param str dn: The distinguished name of the object to add. :param al: The add-list of 2-tuples (attribute-name, new-values). :param serverctrls: a list of ldap.controls.LDAPControl instances sent to the server along with the LDAP request :type serverctrls: list[ldap.controls.LDAPControl] :param dict response: An optional dictionary to receive the server controls of the result. """ if not serverctrls: serverctrls = [] univention.debug.debug(univention.debug.LDAP, univention.debug.INFO, 'uldap.add dn=%s' % dn) nal = {} # type: Dict[str, Any] for i in al: key, val = i[0], i[-1] if not val: continue if isinstance(val, (bytes, six.text_type)): val = [val] vals = nal.setdefault(key, set()) vals |= set(val) nal = self.__encode_entry([(k, list(v)) for k, v in nal.items()]) try: rtype, rdata, rmsgid, resp_ctrls = self.lo.add_ext_s(dn, nal, serverctrls=serverctrls) except ldap.REFERRAL as exc: if not self.follow_referral: raise lo_ref = self._handle_referral(exc) rtype, rdata, rmsgid, resp_ctrls = lo_ref.add_ext_s(dn, nal, serverctrls=serverctrls) if serverctrls and isinstance(response, dict): response['ctrls'] = resp_ctrls
[docs] @_fix_reconnect_handling def modify(self, dn, changes, serverctrls=None, response=None, rename_callback=None): # type: (str, List[Tuple[str, Any, Any]], Optional[List[ldap.controls.LDAPControl]], Optional[dict], Optional[Callable]) -> str """ Modify LDAP entry DN with attributes in changes=(attribute-name, old-values, new-values). :param str dn: The distinguished name of the object to modify. :param changes: The modify-list of 3-tuples (attribute-name, old-values, new-values). :param serverctrls: a list of ldap.controls.LDAPControl instances sent to the server along with the LDAP request :type serverctrls: list[ldap.controls.LDAPControl] :param dict response: An optional dictionary to receive the server controls of the result. :returns: The distinguished name. :rtype: str """ univention.debug.debug(univention.debug.LDAP, univention.debug.INFO, 'uldap.modify %s' % dn) if not serverctrls: serverctrls = [] ml = [] for key, oldvalue, newvalue in changes: if oldvalue and newvalue: if oldvalue == newvalue or (not isinstance(oldvalue, (bytes, six.text_type)) and not isinstance(newvalue, (bytes, six.text_type)) and set(oldvalue) == set(newvalue)): continue # equal values op = ldap.MOD_REPLACE val = newvalue elif not oldvalue and newvalue: op = ldap.MOD_ADD val = newvalue elif oldvalue and not newvalue: op = ldap.MOD_DELETE val = oldvalue # These attributes don't have a matching rule: # https://forge.univention.org/bugzilla/show_bug.cgi?id=15171 # https://forge.univention.org/bugzilla/show_bug.cgi?id=44019 if key in ['preferredDeliveryMethod', 'jpegPhoto', 'univentionPortalBackground', 'univentionPortalLogo', 'univentionPortalEntryIcon', 'univentionUMCIcon']: val = None else: continue ml.append((op, key, val)) ml = self.__encode_entry(ml) # check if we need to rename the object new_dn, new_rdn = self.__get_new_dn(dn, ml) if not self.compare_dn(dn, new_dn): if rename_callback: rename_callback(dn, new_dn, ml) univention.debug.debug(univention.debug.LDAP, univention.debug.WARN, 'rename %s' % (new_rdn,)) self.rename_ext_s(dn, new_rdn, serverctrls=serverctrls, response=response) dn = new_dn if ml: self.modify_ext_s(dn, ml, serverctrls=serverctrls, response=response) return dn
@classmethod def __get_new_dn(self, dn, ml): """ >>> get_dn = access._access__get_new_dn >>> get_dn('univentionAppID=foo,dc=bar', [(ldap.MOD_REPLACE, 'univentionAppID', 'foo')])[0] 'univentionAppID=foo,dc=bar' >>> get_dn('univentionAppID=foo,dc=bar', [(ldap.MOD_REPLACE, 'univentionAppID', 'föo')])[0] == u'univentionAppID=föo,dc=bar' True >>> get_dn('univentionAppID=foo,dc=bar', [(ldap.MOD_REPLACE, 'univentionAppID', 'bar')])[0] 'univentionAppID=bar,dc=bar' """ rdn = ldap.dn.str2dn(dn)[0] dn_vals = dict((x[0].lower(), x[1]) for x in rdn) new_vals = dict((key.lower(), val if isinstance(val, (bytes, six.text_type)) else val[0]) for op, key, val in ml if val and op not in (ldap.MOD_DELETE,)) new_rdn_ava = [(x, new_vals.get(x.lower(), dn_vals[x.lower()]), ldap.AVA_STRING) for x in [y[0] for y in rdn]] new_rdn_unicode = [(key, val.decode('UTF-8'), ava_type) if isinstance(val, bytes) else (key, val, ava_type) for key, val, ava_type in new_rdn_ava] new_rdn = ldap.dn.dn2str([new_rdn_unicode]) rdn = ldap.dn.dn2str([rdn]) if rdn != new_rdn: return ldap.dn.dn2str([ldap.dn.str2dn(new_rdn)[0]] + ldap.dn.str2dn(dn)[1:]), new_rdn return dn, rdn
[docs] @_fix_reconnect_handling def modify_s(self, dn, ml): # type: (str, List[Tuple[str, Optional[List[str]], List[str]]]) -> None """ Redirect `modify_s` directly to :py:attr:`lo`. :param str dn: The distinguished name of the object to modify. :param ml: The modify-list of 3-tuples (attribute-name, old-values, new-values). """ try: self.lo.modify_ext_s(dn, ml) except ldap.REFERRAL as exc: if not self.follow_referral: raise lo_ref = self._handle_referral(exc) lo_ref.modify_ext_s(dn, ml)
[docs] @_fix_reconnect_handling def modify_ext_s(self, dn, ml, serverctrls=None, response=None): # type: (str, List[Tuple[str, Any, Any]], Optional[List[ldap.controls.LDAPControl]], Optional[dict]) -> None """ Redirect `modify_ext_s` directly to :py:attr:`lo`. :param str dn: The distinguished name of the object to modify. :param ml: The modify-list of 3-tuples (attribute-name, old-values, new-values). :param serverctrls: a list of ldap.controls.LDAPControl instances sent to the server along with the LDAP request :type serverctrls: list[ldap.controls.LDAPControl] :param dict response: An optional dictionary to receive the server controls of the result. """ if not serverctrls: serverctrls = [] try: rtype, rdata, rmsgid, resp_ctrls = self.lo.modify_ext_s(dn, ml, serverctrls=serverctrls) except ldap.REFERRAL as exc: if not self.follow_referral: raise lo_ref = self._handle_referral(exc) rtype, rdata, rmsgid, resp_ctrls = lo_ref.modify_ext_s(dn, ml, serverctrls=serverctrls) if serverctrls and isinstance(response, dict): response['ctrls'] = resp_ctrls
[docs] def rename(self, dn, newdn, serverctrls=None, response=None): # type: (str, str, Optional[List[ldap.controls.LDAPControl]], Optional[dict]) -> None """ Rename a LDAP object. :param str dn: The old distinguished name of the object to rename. :param str newdn: The new distinguished name of the object to rename. :param serverctrls: a list of ldap.controls.LDAPControl instances sent to the server along with the LDAP request :type serverctrls: list[ldap.controls.LDAPControl] :param dict response: An optional dictionary to receive the server controls of the result. """ univention.debug.debug(univention.debug.LDAP, univention.debug.INFO, 'uldap.rename %s -> %s' % (dn, newdn)) oldsdn = self.parentDn(dn) newrdn = ldap.dn.dn2str([ldap.dn.str2dn(newdn)[0]]) newsdn = ldap.dn.dn2str(ldap.dn.str2dn(newdn)[1:]) if not serverctrls: serverctrls = [] if oldsdn and newsdn.lower() == oldsdn.lower(): univention.debug.debug(univention.debug.LDAP, univention.debug.INFO, 'uldap.rename: modrdn %s to %s' % (dn, newrdn)) self.rename_ext_s(dn, newrdn, serverctrls=serverctrls, response=response) else: univention.debug.debug(univention.debug.LDAP, univention.debug.INFO, 'uldap.rename: move %s to %s in %s' % (dn, newrdn, newsdn)) self.rename_ext_s(dn, newrdn, newsdn, serverctrls=serverctrls, response=response)
[docs] @_fix_reconnect_handling def rename_ext_s(self, dn, newrdn, newsuperior=None, serverctrls=None, response=None): # type: (str, str, Optional[str], Optional[List[ldap.controls.LDAPControl]], Optional[dict]) -> None """ Redirect `rename_ext_s` directly to :py:attr:`lo`. :param str dn: The old distinguished name of the object to rename. :param str newdn: The new distinguished name of the object to rename. :param str newsuperior: The distinguished name of the new container. :param serverctrls: a list of ldap.controls.LDAPControl instances sent to the server along with the LDAP request :type serverctrls: list[ldap.controls.LDAPControl] :param dict response: An optional dictionary to receive the server controls of the result. """ if not serverctrls: serverctrls = [] try: rtype, rdata, rmsgid, resp_ctrls = self.lo.rename_s(dn, newrdn, newsuperior, serverctrls=serverctrls) except ldap.REFERRAL as exc: if not self.follow_referral: raise lo_ref = self._handle_referral(exc) rtype, rdata, rmsgid, resp_ctrls = lo_ref.rename_s(dn, newrdn, newsuperior, serverctrls=serverctrls) if serverctrls and isinstance(response, dict): response['ctrls'] = resp_ctrls
[docs] @_fix_reconnect_handling def delete(self, dn): # type: (str) -> None """ Delete a LDAP object. :param str dn: The distinguished name of the object to remove. """ univention.debug.debug(univention.debug.LDAP, univention.debug.INFO, 'uldap.delete %s' % dn) if dn: univention.debug.debug(univention.debug.LDAP, univention.debug.INFO, 'delete') try: self.lo.delete_s(dn) except ldap.REFERRAL as exc: if not self.follow_referral: raise lo_ref = self._handle_referral(exc) lo_ref.delete_s(dn)
[docs] def parentDn(self, dn): # type: (str) -> Optional[str] """ Return the parent container of a distinguished name. :param str dn: The distinguished name. :return: The parent distinguished name or None if the LDAP base is reached. :rtype: str or None """ return parentDn(dn, self.base)
[docs] def explodeDn(self, dn, notypes=False): # type: (str, Union[bool, int]) -> List[str] """ Break up a DN into its component parts. :param str dn: The distinguished name. :param bool notypes: Return only the component's attribute values if True. Also the attribute types if False. :return: A list of relative distinguished names. :rtype: list[str] """ return explodeDn(dn, notypes)
[docs] @classmethod def compare_dn(cls, a, b): # type: (str, str) -> bool r"""Test DNs are same :param str a: The first distinguished name. :param str b: A second distinguished name. :returns: True if the DNs are the same, False otherwise. :rtype: bool >>> compare_dn = access.compare_dn >>> compare_dn('foo=1', 'foo=1') True >>> compare_dn('foo=1', 'foo=2') False >>> compare_dn('Foo=1', 'foo=1') True >>> compare_dn('Foo=1', 'foo=2') False >>> compare_dn('foo=1,bar=2', 'foo=1,bar=2') True >>> compare_dn('bar=2,foo=1', 'foo=1,bar=2') False >>> compare_dn('foo=1+bar=2', 'foo=1+bar=2') True >>> compare_dn('bar=2+foo=1', 'foo=1+bar=2') True >>> compare_dn('bar=2+Foo=1', 'foo=1+Bar=2') True >>> compare_dn(r'foo=\31', r'foo=1') True """ return [sorted((x.lower(), y, z) for x, y, z in rdn) for rdn in ldap.dn.str2dn(a)] == [sorted((x.lower(), y, z) for x, y, z in rdn) for rdn in ldap.dn.str2dn(b)]
def __getstate__(self): """ Return state for pickling. """ odict = self.__dict__.copy() del odict['lo'] return odict def __setstate__(self, dict): """ Set state for pickling. """ self.__dict__.update(dict) self.__open(self.ca_certfile) def _handle_referral(self, exception): # type: (ldap.REFERRAL) -> ldap.ldapobject.ReconnectLDAPObject """ Follow LDAP rederral. :param ldap.REFERRAL exception: The LDAP referral exception. :returns: LDAP connection object for the referred LDAP server. :rtype: ldap.ldapobject.ReconnectLDAPObject """ univention.debug.debug(univention.debug.LDAP, univention.debug.INFO, 'Following LDAP referral') exc = exception.args[0] info = exc.get('info') ldap_url = info[info.find('ldap'):] if isLDAPUrl(ldap_url): conn_str = LDAPUrl(ldap_url).initializeUrl() # FIXME?: this upgrades a access(reconnect=False) connection to a reconnect=True connection lo_ref = ldap.ldapobject.ReconnectLDAPObject(conn_str, trace_stack_limit=None) if self.ca_certfile: lo_ref.set_option(ldap.OPT_X_TLS_CACERTFILE, self.ca_certfile) lo_ref.set_option(ldap.OPT_X_TLS_NEWCTX, 0) if self.start_tls == 1: try: lo_ref.start_tls_s() except Exception: univention.debug.debug(univention.debug.LDAP, univention.debug.WARN, 'Could not start TLS') elif self.start_tls == 2: lo_ref.start_tls_s() lo_ref.simple_bind_s(self.binddn, self.bindpw) return lo_ref else: raise ldap.CONNECT_ERROR('Bad referral "%s"' % (exc,))
if __name__ == '__main__': import doctest doctest.testmod()