# Source code for univention.admin.allocators

# SPDX-FileCopyrightText: 2004-2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only

"""|UDM| allocators to allocate and lock resources for |LDAP| object creation."""

from __future__ import annotations

from typing import TYPE_CHECKING, overload

import ldap
from ldap.filter import filter_format

import univention.admin.localization
import univention.admin.locking
import univention.admin.uexceptions
from univention.admin._ucr import configRegistry
from univention.admin.log import log


if TYPE_CHECKING:
    from collections.abc import Sequence


# Type aliases used only in annotations below. They are defined inside a
# try/except so that the module still imports when ``typing.Literal`` is not
# available; because of ``from __future__ import annotations`` the annotations
# referencing these names are not evaluated at runtime.
try:
    from typing import Literal

    # The two numeric ID types that are allocated from a range.
    _TypesUidGid = Literal['uidNumber', 'gidNumber']
    # Every allocator type name; these are the keys of the lookup tables
    # ``_type2attr`` and ``_type2scope``.
    _Types = Literal[
        'uidNumber',
        'gidNumber',
        'uid',
        'gid',
        'sid',
        'domainSid',
        'mailPrimaryAddress',
        'mailAlternativeAddress',
        'aRecord',
        'mac',
        'groupName',
        'cn-uid-position',
        'univentionObjectIdentifier',
    ]
    # Scope names that are handed through to the locking functions.
    _Scopes = Literal['base', 'one', 'sub', 'domain']
except ImportError:
    # Deliberately ignored: the aliases are only needed for type checking.
    pass


# Rebind the imported logger to a child logger named 'ALLOCATE' for this module.
log = log.getChild('ALLOCATE')
# Translation object for the 'univention/admin' domain; ``_`` is used below to
# translate the messages of the raised ``noLock`` exceptions.
translation = univention.admin.localization.translation('univention/admin')
_ = translation.translate

# Maps an allocator type name to the LDAP attribute name that is passed as
# ``attr`` to acquireRange()/acquireUnique() by request().
_type2attr: dict[_Types, str] = {
    'uidNumber': 'uidNumber',
    'gidNumber': 'gidNumber',
    'uid': 'uid',
    'gid': 'gid',
    'sid': 'sambaSID',
    'domainSid': 'sambaSID',
    'mailPrimaryAddress': 'mailPrimaryAddress',
    'mailAlternativeAddress': 'mailAlternativeAddress',
    'aRecord': 'aRecord',
    'mac': 'macAddress',
    'groupName': 'cn',
    'cn-uid-position': 'cn',  # ['cn', 'uid', 'ou'],
    'univentionObjectIdentifier': 'univentionObjectIdentifier',
}
# Maps an allocator type name to the scope that request(), confirm() and
# release() pass on to the allocation and locking functions.
# In acquireUnique() the scope 'domain' selects position.getDomain() as the
# search base; every other scope uses position.getBase().
_type2scope: dict[_Types, _Scopes] = {
    'uidNumber': 'base',
    'gidNumber': 'base',
    'uid': 'domain',
    'gid': 'domain',
    'sid': 'base',
    'domainSid': 'base',
    'mailPrimaryAddress': 'domain',
    'mailAlternativeAddress': 'domain',
    'aRecord': 'domain',
    'mac': 'domain',
    'groupName': 'domain',
    'cn-uid-position': 'one',
    'univentionObjectIdentifier': 'domain',
}


def requestUserSid(
    lo: univention.admin.uldap.access,
    position: univention.admin.uldap.position,
    uid_s: str,
) -> str:
    """
    Build the Samba SID for a user from its numeric user ID and request a lock for it.

    The RID is ``uid * 2 + 1000``; it is appended to the ``sambaSID`` of the
    first ``sambaDomain`` object returned by the LDAP search.

    :param lo: LDAP connection.
    :param position: LDAP position handed through to :func:`request`.
    :param uid_s: The numeric user ID as a string.
    :returns: The result of :func:`request` for the type ``sid``.
    :raises ValueError: if `uid_s` cannot be converted to an integer.
    """
    rid_base = 1000
    rid = str(int(uid_s) * 2 + rid_base)

    domains = lo.authz_connection.search(filter='objectClass=sambaDomain', attr=['sambaSID'])
    domainsid = domains[0][1]['sambaSID'][0].decode('ASCII')

    sid = f'{domainsid}-{rid}'
    log.trace('request user sid', SID=sid)
    return request(lo, position, 'sid', sid)
def requestGroupSid(
    lo: univention.admin.uldap.access,
    position: univention.admin.uldap.position,
    gid_s: str,
    generateDomainLocalSid: bool = False,
) -> str:
    """
    Build the Samba SID for a group from its numeric group ID and request a lock for it.

    The RID is ``gid * 2 + 1000 + 1``. It is appended either to the fixed
    prefix ``S-1-5-32`` or to the ``sambaSID`` of the first ``sambaDomain``
    object returned by the LDAP search.

    :param lo: LDAP connection.
    :param position: LDAP position handed through to :func:`request`.
    :param gid_s: The numeric group ID as a string.
    :param generateDomainLocalSid: Use the ``S-1-5-32-`` prefix instead of the domain SID.
    :returns: The result of :func:`request` for the type ``sid``.
    :raises ValueError: if `gid_s` cannot be converted to an integer.
    """
    rid_base = 1000
    rid = str(int(gid_s) * 2 + rid_base + 1)

    if generateDomainLocalSid:
        # No LDAP lookup is needed for the fixed prefix.
        return request(lo, position, 'sid', 'S-1-5-32-' + rid)

    domains = lo.authz_connection.search(filter='objectClass=sambaDomain', attr=['sambaSID'])
    domainsid = domains[0][1]['sambaSID'][0].decode('ASCII')
    return request(lo, position, 'sid', domainsid + '-' + rid)
def acquireRange(
    lo: univention.admin.uldap.access,
    position: univention.admin.uldap.position,
    atype: _Types,
    attr: str,
    ranges: Sequence[dict[str, int]],
    scope: _Scopes = 'base',
) -> str:
    """
    Find and lock the next unused numeric ID of type `atype` within `ranges`.

    The search starts from the ``univentionLastUsedValue`` stored at
    ``cn=<atype>,cn=temporary,cn=univention,<base>``, or from the ``first``
    value of the first range when nothing is stored. A candidate is always
    incremented before it is tried. For ``uidNumber`` and ``gidNumber`` the
    same number is additionally locked for the respective other type while
    LDAP is searched for an entry using that number in either attribute.

    On success the lock for `atype` is kept; the auxiliary lock for the other
    type is released again.

    :param lo: LDAP connection.
    :param position: LDAP position providing the search base.
    :param atype: The allocator type to lock.
    :param attr: The LDAP attribute searched for types other than ``uidNumber`` and ``gidNumber``.
    :param ranges: Dictionaries with the keys ``first`` and ``last``.
    :param scope: Scope handed through to the locking functions.
    :returns: The locked ID as a string.
    :raises univention.admin.uexceptions.noLock: if no ID could be locked in any range.
    """
    locking = univention.admin.locking
    counter_dn = 'cn=%s,cn=temporary,cn=univention,%s' % (ldap.dn.escape_dn_chars(atype), position.getBase())
    start_id = lo.authz_connection.getAttr(counter_dn, 'univentionLastUsedValue')
    log.trace('Start allocating range', type=atype, start=start_id)
    if start_id:
        candidate = int(start_id[0])
    else:
        candidate = ranges[0]['first']
        log.trace('Restart Start ID', start=candidate)

    is_posix_id = atype in ('uidNumber', 'gidNumber')

    for id_range in ranges:
        # Never continue below the start of the current range.
        candidate = max(candidate, id_range['first'])
        upper = id_range['last'] + 1
        other = None

        while candidate < upper:
            candidate += 1
            encoded = str(candidate).encode('utf-8')
            log.trace('Set Start ID', start=candidate)
            try:
                if other:
                    # The previous round failed while locking `other`, so `atype`
                    # is still locked for the previous candidate: release it.
                    locking.unlock(lo, position, atype, str(candidate - 1).encode('utf-8'), scope=scope)
                    other = None
                log.trace('Get lock', value=candidate, type=atype)
                locking.lock(lo, position, atype, encoded, scope=scope)
                if is_posix_id:
                    # Reserve the same number for the counterpart type as well.
                    other = 'gidNumber' if atype == 'uidNumber' else 'uidNumber'
                    log.trace('Get lock', value=candidate, type=other)
                    locking.lock(lo, position, other, encoded, scope=scope)
            except univention.admin.uexceptions.noLock:
                log.trace('Cannot get lock', value=candidate)
                continue
            except univention.admin.uexceptions.objectExists:
                log.trace('Cannot get lock (already existing)', value=candidate)
                continue

            if is_posix_id:
                text = str(candidate)
                _filter = filter_format('(|(uidNumber=%s)(gidNumber=%s))', (text, text))
            else:
                _filter = '(%s=%d)' % (attr, candidate)
            log.trace('lock searchfor', filter=_filter)

            if lo.authz_connection.searchDn(base=position.getBase(), filter=_filter):
                # The number is already stored in LDAP: drop all locks and try the next one.
                log.trace('lock already used ID', value=candidate)
                locking.unlock(lo, position, atype, encoded, scope=scope)
                if other:
                    locking.unlock(lo, position, other, encoded, scope=scope)
                    other = None
                continue

            log.trace('Got lock', value=candidate)
            if other:
                # Only the lock for `atype` is kept for the caller.
                locking.unlock(lo, position, other, encoded, scope=scope)
            return str(candidate)

    raise univention.admin.uexceptions.noLock(_('The attribute %r could not get locked.') % (atype,))
def acquireUnique(
    lo: univention.admin.uldap.access,
    position: univention.admin.uldap.position,
    type: _Types,
    value: str,
    attr: str,
    scope: _Scopes = 'base',
) -> str:
    """
    Lock `value` for the allocator `type` and verify that it is not used in LDAP yet.

    Most types first create a lock and then search LDAP for an existing entry.
    If an entry is found, the lock for `type` is released again and
    ``noLock`` is raised. The type ``cn-uid-position`` creates no lock and only
    checks sibling entries below the parent of the given DN.

    :param lo: LDAP connection.
    :param position: LDAP position providing the search base.
    :param type: The allocator type.
    :param value: The value to lock; for ``cn-uid-position`` a DN.
    :param attr: The LDAP attribute used in the uniqueness search.
    :param scope: Scope handed through to the locking functions; ``domain`` searches from ``position.getDomain()``, everything else from ``position.getBase()``.
    :returns: The locked value; for ``cn-uid-position`` the value of the first RDN.
    :raises univention.admin.uexceptions.noLock: if the value is already in use.
    :raises univention.admin.uexceptions.alreadyUsedInSubtree: if a ``cn-uid-position`` name conflicts with a sibling entry.
    """
    log.trace('acquire unique lock', scope=scope)
    if scope == 'domain':
        searchBase = position.getDomain()
    else:
        searchBase = position.getBase()

    def _lock_then_search(lock_type, template, args):
        # Create the lock first, then look for entries already using the value.
        univention.admin.locking.lock(lo, position, lock_type, value.encode('utf-8'), scope=scope)
        return lo.authz_connection.searchDn(base=searchBase, filter=filter_format(template, args))

    if type == 'aRecord':
        # Uniqueness only matters among hosts: DNS entries may share the aRecord of a host.
        if not _lock_then_search(type, '(&(objectClass=univentionHost)(%s=%s))', (attr, value)):
            return value
    elif type in ['groupName', 'uid'] and configRegistry.is_true('directory/manager/user_group/uniqueness', True):
        # User names and group names share one namespace when the UCR variable is enabled.
        if not _lock_then_search(type, '(|(&(cn=%s)(|(objectClass=univentionGroup)(objectClass=sambaGroupMapping)(objectClass=posixGroup)))(uid=%s))', (value, value)):
            log.trace('aquired unique', value=value)
            return value
    elif type == 'groupName':
        # The search filter is more complex than in the general case.
        if not _lock_then_search(type, '(&(%s=%s)(|(objectClass=univentionGroup)(objectClass=sambaGroupMapping)(objectClass=posixGroup)))', (attr, value)):
            log.trace('aquired unique', value=value)
            return value
    elif type == 'cn-uid-position':
        base = lo.parentDn(value)
        rdn_attr, rdn_value, __ = ldap.dn.str2dn(value)[0][0]
        # RDN attributes that must not carry the same value next to an entry named by `rdn_attr`.
        conflicting = {'cn': ['uid'], 'uid': ['cn', 'ou'], 'ou': ['uid']}.get(rdn_attr)
        if conflicting is None:
            return rdn_value
        assert base is not None
        sibling_filter = '(|%s)' % ''.join(filter_format('(%s=%s)', (name, rdn_value)) for name in conflicting)
        siblings = lo.authz_connection.searchDn(base=base, filter=sibling_filter, scope=scope)
        if not any(ldap.dn.str2dn(dn)[0][0][0] in conflicting for dn in siblings):
            log.trace('aquired unique', value=rdn_value)
            return rdn_value
        raise univention.admin.uexceptions.alreadyUsedInSubtree('name=%r position=%r' % (rdn_value, base))
    elif type in ('mailPrimaryAddress', 'mailAlternativeAddress') and configRegistry.is_true('directory/manager/mail-address/uniqueness'):
        # Both mail types are locked under 'mailPrimaryAddress' and checked against both attributes.
        if type == 'mailAlternativeAddress':
            other = 'mailPrimaryAddress'
        else:
            other = 'mailAlternativeAddress'
        if not _lock_then_search('mailPrimaryAddress', '(|(%s=%s)(%s=%s))', (attr, value, other, value)):
            log.trace('aquired unique', value=value)
            return value
    elif type == 'mailAlternativeAddress':
        # A lock for mailAlternativeAddress exists only if the UCR variable above is enabled.
        return value
    else:
        if not _lock_then_search(type, '%s=%s', (attr, value)):
            log.trace('aquired unique', value=value)
            return value

    # Reached when the uniqueness search found an entry that already uses the value.
    # The lock object created before the search is not needed anymore because we abort,
    # and other processes will fail at the same search, so they do not need it either.
    univention.admin.locking.unlock(lo, position, type, value.encode('utf-8'), scope=scope)
    raise univention.admin.uexceptions.noLock(_('The attribute %r could not get locked. The value is already in use.') % (type,))
# Typing overload: for 'uidNumber' and 'gidNumber' the value may be omitted.
@overload
def request(
    lo: univention.admin.uldap.access,
    position: univention.admin.uldap.position,
    type: _TypesUidGid,
    value: str | None = None,
) -> str: ...


# Typing overload: every other type requires a value.
@overload
def request(
    lo: univention.admin.uldap.access,
    position: univention.admin.uldap.position,
    type: _Types,
    value: str,
) -> str: ...
def request(
    lo: univention.admin.uldap.access,
    position: univention.admin.uldap.position,
    type: _Types,
    value: str | None = None,
) -> str:
    """
    Request a lock for a value of the given allocator type.

    ``uidNumber`` and ``gidNumber`` are allocated from the ranges 1000-55000
    and 65536-1000000 via :func:`acquireRange`; `value` is not used for them.
    All other types lock the given `value` via :func:`acquireUnique`.

    :param lo: LDAP connection.
    :param position: LDAP position.
    :param type: The allocator type.
    :param value: The value to lock; required for all types except ``uidNumber`` and ``gidNumber``.
    :returns: The allocated or locked value.
    """
    if type not in ('uidNumber', 'gidNumber'):
        assert value is not None
        return acquireUnique(lo, position, type, value, _type2attr[type], scope=_type2scope[type])

    id_ranges = [{'first': 1000, 'last': 55000}, {'first': 65536, 'last': 1000000}]
    return acquireRange(lo, position, type, _type2attr[type], id_ranges, scope=_type2scope[type])
def confirm(
    lo: univention.admin.uldap.access,
    position: univention.admin.uldap.position,
    type: _Types,
    value: str,
    updateLastUsedValue: bool = True,
) -> None:
    """
    Confirm a previously requested value and remove its lock.

    For ``uidNumber`` and ``gidNumber`` the value is additionally written to
    ``univentionLastUsedValue`` of the counter object unless
    `updateLastUsedValue` is false. Nothing is done for ``cn-uid-position``.

    :param lo: LDAP connection.
    :param position: LDAP position.
    :param type: The allocator type.
    :param value: The value that was requested.
    :param updateLastUsedValue: Whether to store `value` as the last used ID.
    """
    if type == 'cn-uid-position':
        # acquireUnique() creates no lock for this type, so there is nothing to remove.
        return

    if type in ('uidNumber', 'gidNumber') and updateLastUsedValue:
        counter_dn = 'cn=%s,cn=temporary,cn=univention,%s' % (ldap.dn.escape_dn_chars(type), position.getBase())
        # NOTE(review): the tuple presumably is (attribute, old value, new value) - confirm against lo.modify().
        changes = [('univentionLastUsedValue', b'1', value.encode('utf-8'))]
        lo.authz_connection.modify(counter_dn, changes)

    univention.admin.locking.unlock(lo, position, type, value.encode('utf-8'), _type2scope[type])
def release(
    lo: univention.admin.uldap.access,
    position: univention.admin.uldap.position,
    type: _Types,
    value: str,
) -> None:
    """
    Remove the lock for `value` of the given allocator type.

    :param lo: LDAP connection.
    :param position: LDAP position.
    :param type: The allocator type.
    :param value: The value whose lock is removed.
    """
    encoded = value.encode('utf-8')
    lock_scope = _type2scope[type]
    univention.admin.locking.unlock(lo, position, type, encoded, lock_scope)