Source code for univention.admin.blocklist
# SPDX-FileCopyrightText: 2024-2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only
"""|UDM| functions to check and create blocklist entries"""
from __future__ import annotations
import hashlib
import re
from datetime import datetime
from typing import TYPE_CHECKING
import ldap
from dateutil.relativedelta import relativedelta
import univention.admin.localization
import univention.admin.uexceptions
import univention.admin.uldap
from univention.admin._ucr import configRegistry
if TYPE_CHECKING:
from collections.abc import Iterable
from typing import Any
# Translation catalog for the 'univention.admin.handlers' domain; `_` is the
# shorthand used below to translate user-visible messages.
translation = univention.admin.localization.translation('univention.admin.handlers')
_ = translation.translate
# LDAP base under which the blocklist objects are searched.
BLOCKLIST_BASE = 'cn=blocklists,cn=internal'
[docs]
def hash_blocklist_value(value: bytes) -> str:
    """
    Return the blocklist hash of an encoded property value.

    The value is lower-cased before hashing, so values which differ only in
    the case of their ASCII letters result in the same hash.

    :param value: the encoded property value.
    :returns: the hexadecimal SHA-256 digest, prefixed with ``sha256:``.
    """
    digest = hashlib.sha256(value.lower()).hexdigest()
    return 'sha256:%s' % digest
[docs]
def parse_timedelta(timedelta_string: str) -> relativedelta | None:
    """
    Convert a duration string such as ``1y10m340d`` into a relative delta.

    The string may give an (optionally negative) number of years (``y``),
    months (``m``) and days (``d``), in that order. Every part is optional and
    the pattern is only anchored at the start of the string, so text which
    does not begin with such a part yields an empty delta and any trailing
    text is ignored.

    >>> parse_timedelta("1y10m340d")
    relativedelta(years=+1, months=+10, days=+340)

    :param timedelta_string: the duration to parse.
    :returns: the parsed delta, or `None` if the pattern did not match.
    """
    found = re.match(r'((?P<years>-?\d+)y)?((?P<months>-?\d+)m)?((?P<days>-?\d+)d)?', timedelta_string)
    if not found:
        return None

    # only pass on the units which were actually given in the string
    units = {}
    for unit_name, number in found.groupdict().items():
        if number:
            units[unit_name] = int(number)
    return relativedelta(**units)
@univention.admin._ldap_cache(ttl=120)
def get_blocklist_config(lo: univention.admin.uldap.access) -> dict:
    """
    Collect the blocklist configuration from the ``blocklists/list`` objects.

    The returned dictionary holds two kinds of keys:

    * the DN of every blocklist, mapped to its ``retentionTime``
      (``30d`` if the blocklist does not define one)
    * every module named in ``blockingProperties``, mapped to a dictionary
      of property name to the DN of the blocklist responsible for it

    NOTE(review): the ``_ldap_cache(ttl=120)`` decorator presumably caches the
    result for a limited time -- confirm in ``univention.admin``.

    :param lo: LDAP connection used for the lookup.
    :returns: the configuration; if ``noObject`` is raised, whatever has been
        collected up to that point is returned.
    """
    result = {}
    try:
        list_module = univention.admin.modules.get('blocklists/list')
        blocklists = list_module.lookup(None, lo, 'entryUUID=*', base=BLOCKLIST_BASE, scope='one', authz=False)
        for blocklist in blocklists:
            result[blocklist.dn] = blocklist.get('retentionTime', '30d')
            for module_name, property_name in blocklist.get('blockingProperties', []):
                result.setdefault(module_name, {})[property_name] = blocklist.dn
    except univention.admin.uexceptions.noObject:
        # cn=internal is not (yet) available; without it there is no
        # blocklist configuration, so the result is returned as it is
        pass
    return result
[docs]
def get_blocking_udm_properties(udm_obj: univention.admin.handlers.simpleLdap) -> dict:
    """
    Return the blocking properties configured for the module of `udm_obj`.

    :param udm_obj: the UDM object; its ``lo_machine_primary`` connection is
        used to read the blocklist configuration.
    :returns: a mapping of property name to blocklist DN, or an empty
        dictionary if the configuration has no entry for the module.
    """
    blocklist_config = get_blocklist_config(udm_obj.lo_machine_primary)
    module_name = udm_obj.module
    if module_name in blocklist_config:
        return blocklist_config[module_name]
    return {}
[docs]
def get_blockeduntil(dn: str, lo: univention.admin.uldap.access) -> str:
    """
    Compute the expiry timestamp for a new entry of the blocklist `dn`.

    :param dn: DN of the blocklist; its retention time is taken from the
        blocklist configuration, falling back to ``30d``.
    :param lo: LDAP connection used to read the blocklist configuration.
    :returns: the current UTC time plus the retention time, formatted as
        ``YYYYmmddHHMMSSZ``.
    """
    # imported locally: the module itself only imports the `datetime` class
    from datetime import timezone

    retention = get_blocklist_config(lo).get(dn, '30d')
    blocking_duration = parse_timedelta(retention)
    # `datetime.utcnow()` is deprecated since Python 3.12; an aware UTC
    # timestamp is rendered identically by the format string below
    blocked_until = datetime.now(timezone.utc) + blocking_duration
    return blocked_until.strftime('%Y%m%d%H%M%SZ')
[docs]
def blocklist_enabled(udm_obj: univention.admin.handlers.simpleLdap) -> bool:
    """
    Tell whether blocklist handling applies to `udm_obj`.

    :param udm_obj: the UDM object to check.
    :returns: `False` for objects of the ``blocklists/`` modules themselves,
        otherwise the truth value of the UCR variable
        ``directory/manager/blocklist/enabled`` (default: disabled).
    """
    if udm_obj.module.startswith('blocklists/'):
        # the blocklist objects themselves are never checked against blocklists
        return False
    return configRegistry.is_true('directory/manager/blocklist/enabled', False)
[docs]
def get_blocklist_values_from_udm_property(udm_property_value: Any, udm_property_name: str) -> list[Any]:
    """
    Normalize a UDM property value to a list of strings.

    :param udm_property_value: a single string or a list of strings.
    :param udm_property_name: name of the property, used in the error message.
    :returns: a one-element list for a single string; a list of strings is
        returned unchanged.
    :raises RuntimeError: if the value is neither a string nor a list which
        consists of strings only.
    """
    if isinstance(udm_property_value, str):
        return [udm_property_value]

    is_string_list = isinstance(udm_property_value, list) and all(isinstance(item, str) for item in udm_property_value)
    if is_string_list:
        return udm_property_value

    raise RuntimeError('The property %r uses a complex syntax. This is not supported for blocklist objects.' % udm_property_name)
[docs]
def create_blocklistentry(udm_obj: univention.admin.handlers.simpleLdap) -> list:
    """
    Create blocklist entries for the previous values of blocking properties.

    For every blocking property of the object's module, the old value(s) are
    added to the responsible blocklist if the object does not exist or if the
    property has been changed. Entries which exist already are left untouched.

    :param udm_obj: the UDM object whose old property values are blocked.
    :returns: the DNs of the entries created by this call; an empty list if
        blocklists are not enabled for the object.
    """
    if not blocklist_enabled(udm_obj):
        return []

    created_dns = []
    for prop_name, blocklist_dn in get_blocking_udm_properties(udm_obj).items():
        # only a previously set value which is no longer in use has to be blocked
        value_released = (not udm_obj.exists() and udm_obj.oldinfo.get(prop_name)) or (udm_obj.hasChanged(prop_name) and udm_obj.oldinfo.get(prop_name))
        if not value_released:
            continue

        position = univention.admin.uldap.position(blocklist_dn)
        for old_value in get_blocklist_values_from_udm_property(udm_obj.oldinfo[prop_name], prop_name):
            entry = univention.admin.modules.get('blocklists/entry').object(None, udm_obj.lo_machine_primary, position)
            entry.open()
            entry['value'] = old_value
            entry['originUniventionObjectIdentifier'] = udm_obj.entry_uuid
            entry['blockedUntil'] = get_blockeduntil(blocklist_dn, udm_obj.lo_machine_primary)
            try:
                entry.create(ignore_license=True)
            except univention.admin.uexceptions.objectExists:
                # the value is on the blocklist already; it is not reported as created
                continue
            created_dns.append(entry.dn)
    return created_dns
[docs]
def check_blocklistentry(udm_obj: univention.admin.handlers.simpleLdap) -> None:
    """
    Refuse new values of blocking properties which are on a blocklist.

    Only properties which have been changed and hold a value are checked.
    An entry whose ``originUniventionObjectIdentifier`` equals the
    ``entry_uuid`` of `udm_obj` does not block the value.

    :param udm_obj: the UDM object whose new property values are checked.
    :raises univention.admin.uexceptions.valueError: if a value is blocked by
        an entry with a different origin.
    """
    if not blocklist_enabled(udm_obj):
        return

    for prop_name, blocklist_dn in get_blocking_udm_properties(udm_obj).items():
        if not (udm_obj.hasChanged(prop_name) and udm_obj.info.get(prop_name)):
            continue

        for new_value in get_blocklist_values_from_udm_property(udm_obj.info[prop_name], prop_name):
            # the entry is looked up below the blocklist by the hash of the encoded value
            encoded_value = new_value.encode(*udm_obj.mapping.getEncoding(prop_name))
            hashed_value = ldap.dn.escape_dn_chars(hash_blocklist_value(encoded_value))
            entry_dn = 'cn=%s,%s' % (hashed_value, blocklist_dn)
            entry_attrs = udm_obj.lo_machine_primary.get(entry_dn)
            if not entry_attrs:
                continue

            origin = entry_attrs['originUniventionObjectIdentifier'][0].decode('utf-8')
            if origin != udm_obj.entry_uuid:
                raise univention.admin.uexceptions.valueError(
                    _('The value "%(value)s" is blocked for the property "%(prop)s".') % {'value': new_value, 'prop': prop_name},
                    property=prop_name,
                )
[docs]
def cleanup_blocklistentry(blocklist_entries: Iterable, udm_obj: univention.admin.handlers.simpleLdap) -> None:
    """
    Delete the given blocklist entries again.

    :param blocklist_entries: the DNs of the entries to delete.
    :param udm_obj: the UDM object whose ``lo_machine_primary`` connection is
        used for the deletion.
    """
    for entry_dn in blocklist_entries:
        try:
            udm_obj.lo_machine_primary.delete(entry_dn)
        except univention.admin.uexceptions.noObject:
            # the entry is gone already, nothing left to clean up
            continue