# SPDX-FileCopyrightText: 2004-2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only
"""|UDM| functions to parse, modify and create |LDAP| style search filters"""
from __future__ import annotations
import re
from typing import TYPE_CHECKING, TypeVar
from ldap.filter import filter_format
import univention.admin.uexceptions
if TYPE_CHECKING:
from collections.abc import Callable, Iterator, Sequence
from re import Match
# Type of the opaque extra argument that is handed through unchanged to the
# callbacks used by `walk` and `conjunction.append_unmapped_filter_string`.
T = TypeVar('T')
[docs]
class conjunction:
    """Inner node of a parsed LDAP filter: AND (`&`), OR (`|`) or NOT (`!`) over sub-filters."""

    OPS = frozenset({'&', '|', '!'})

    def __init__(self, type: str, expressions: list[conjunction | expression]) -> None:
        """
        Build a node that combines `expressions` with the operator `type`.

        :param type: One of the operators listed in :py:attr:`OPS`.
        :param expressions: The sub-filters; the list is stored by reference, not copied.

        >>> c = conjunction('&', ['(objectClass=*)'])
        >>> c = conjunction('|', ['(objectClass=*)'])
        """
        # NOTE: validation is an `assert`, so it is skipped when Python runs with -O.
        assert type in self.OPS
        self.expressions = expressions
        self.type = type

    @classmethod
    def _parse(cls, text: str) -> conjunction:
        """Parse `text` (operator character followed by parenthesised sub-filters, outer parentheses already removed)."""
        operator, remainder = text[0], text[1:]
        children = list(map(parse, cls._split(remainder)))
        return conjunction(operator, children)

    @staticmethod
    def _split(text: str) -> Iterator[str]:
        """
        Yield every top-level parenthesised group found in `text`.

        Characters outside of balanced top-level groups are skipped silently.
        """
        level = 0
        start = None  # index of the '(' opening the current top-level group
        for pos, char in enumerate(text):
            if char == '(':
                level += 1
                if level == 1:
                    start = pos
            elif char == ')':
                level -= 1
                if level == 0 and start is not None:
                    yield text[start:pos + 1]
                    start = None

    def __str__(self) -> str:
        """
        Render the node as LDAP filter string; a node without sub-filters renders as empty string.

        >>> str(conjunction('&', ['(objectClass=*)']))
        '(&(objectClass=*))'
        >>> str(conjunction('|', ['(objectClass=*)']))
        '(|(objectClass=*))'
        >>> str(conjunction('!', ['(objectClass=*)']))
        '(!(objectClass=*))'
        >>> str(conjunction('&', []))
        ''
        """
        if self.expressions:
            rendered = ''.join(str(item) for item in self.expressions)
            return '(%s%s)' % (self.type, rendered)
        return ''

    def __repr__(self) -> str:
        """
        Render the node in constructor-call notation.

        >>> conjunction('&', ['(objectClass=*)'])
        conjunction('&', ['(objectClass=*)'])
        >>> conjunction('|', ['(objectClass=*)'])
        conjunction('|', ['(objectClass=*)'])
        """
        return '%s(%r, %r)' % (type(self).__name__, self.type, self.expressions)

    def append_unmapped_filter_string(self, filter_s: str | None, rewrite_function: Callable[[expression, T | None], None], mapping: T) -> None:
        """
        Parse `filter_s`, rewrite each of its expressions and add the result as a sub-filter.

        :param filter_s: LDAP filter string; nothing happens if it is empty or `None`.
        :param rewrite_function: Callback invoked as `rewrite_function(expression, mapping)` for every expression of the parsed filter.
        :param mapping: Opaque argument handed to `rewrite_function`.
        """
        if not filter_s:
            return
        parsed = parse(filter_s)
        walk(parsed, rewrite_function, arg=mapping)
        self.expressions.append(parsed)
[docs]
class expression:
    """Leaf of a parsed LDAP filter: a single `attribute <operator> value` assertion."""

    # RFC 4515 operators plus the UCS-specific extensions '>', '<' and '!=',
    # which `__str__` rewrites into negated standard assertions.
    OPS = frozenset({'=', '>=', '<=', '~=', '=*'} | {'>', '<', '!='})
    # Splits `attribute<op>value`; a `=` directly followed by a final `*` is the presence operator `=*`.
    RE_OP = re.compile(r'([<>]=?|[!~]=|=(?:[*]$)?)')

    def __init__(self, variable: str = '', value: str = '', operator: str = '=', escape: bool = False) -> None:
        """
        Create LDAP filter expression.

        :param variable: Attribute description (left-hand side).
        :param value: Assertion value; must be empty for the presence operator `=*`.
        :param operator: One of :py:attr:`OPS`. `=` combined with the value `*` is normalised to `=*` with an empty value.
        :param escape: If true, variable and value are escaped with :py:func:`ldap.filter.filter_format` when rendering.
        :raises univention.admin.uexceptions.valueInvalidSyntax: if the presence operator is combined with a value.

        >>> e = expression('objectClass', '*', escape=False)
        >>> e = expression('objectClass', '*', '!=', escape=False)
        >>> e = expression('uidNumber', '10', '<') # < <= > >=
        """
        # NOTE: validation is an `assert`, so it is skipped when Python runs with -O.
        assert operator in self.OPS
        if operator == '=' and value == '*':
            operator, value = '=*', ''
        if operator == '=*' and value:
            raise univention.admin.uexceptions.valueInvalidSyntax(value)
        self.variable = variable
        self.value = value
        self.operator = operator
        self._escape = escape

    @classmethod
    def _parse(cls, text: str) -> expression:
        """
        Parse `text` of the form `attribute<op>value` (outer parentheses already removed).

        :raises ValueError: if `text` contains no operator.
        """
        var, op, val = cls.RE_OP.split(text, 1)
        return expression(var, val, operator=op)

    def __str__(self) -> str:
        r"""
        Return string representation.

        The extension operators are expressed through negation:
        `<` becomes `!(>=)`, `>` becomes `!(<=)` and `!=` becomes `!(=)`.

        >>> str(expression('objectClass', '*', escape=False))
        '(objectClass=*)'
        >>> str(expression('objectClass', '*', '!=', escape=False))
        '(!(objectClass=*))'
        >>> str(expression('uidNumber', '10', '<'))
        '(!(uidNumber>=10))'
        >>> str(expression('cn', '', '=*'))
        '(cn=*)'
        >>> str(expression('cn', '*', '='))
        '(cn=*)'
        >>> str(expression('cn', r'*\2A*', '='))
        '(cn=*\\2A*)'
        """
        if self.operator == '<':
            return self.escape('(!(%s>=%s))', (self.variable, self.value))
        if self.operator == '>':
            return self.escape('(!(%s<=%s))', (self.variable, self.value))
        if self.operator == '!=':
            return self.escape('(!(%s=%s))', (self.variable, self.value))
        # Remaining operators are emitted verbatim between variable and value.
        return self.escape('(%%s%s%%s)' % (self.operator,), (self.variable, self.value))

    def escape(self, string: str, args: Sequence[str]) -> str:
        """
        Fill the printf-style template `string` with `args`.

        :param string: Template containing `%s` placeholders.
        :param args: Values for the placeholders, as tuple or list.
        :returns: The filled template; the values are escaped through
            :py:func:`ldap.filter.filter_format` only if the expression was created with `escape=True`.

        >>> expression().escape('(%s=%s)', ['cn', 'foo'])
        '(cn=foo)'
        """
        if self._escape:
            return filter_format(string, args)
        # The `%` operator only unpacks real tuples, while `filter_format`
        # accepts any sequence: convert lists so both branches behave alike.
        if isinstance(args, list):
            args = tuple(args)
        return string % args

    def __repr__(self) -> str:
        """
        Return canonical representation.

        >>> expression('objectClass', 'foo*', escape=False)
        expression('objectClass', 'foo*', '=')
        >>> expression('objectClass', '*', '!=', escape=False)
        expression('objectClass', '*', '!=')
        >>> expression('objectClass', '*', '=', escape=False)
        expression('objectClass', '', '=*')
        >>> expression('objectClass', '', '=*', escape=False)
        expression('objectClass', '', '=*')
        """
        return '%s(%r, %r, %r)' % (self.__class__.__name__, self.variable, self.value, self.operator)
[docs]
def parse(filter_s: conjunction | expression | str, begin: int = 0, end: int = -1) -> conjunction | expression:
    r"""
    Parse LDAP filter string.

    :param filter_s: LDAP filter string. An already parsed filter is returned unchanged.
    :param begin: Index of the first character of `filter_s` to parse.
    :param end: Index of the last character to parse (inclusive); `-1` means the end of the string.
    :returns: The parsed filter tree.
    :raises univention.admin.uexceptions.valueInvalidSyntax: if the text cannot be parsed,
        including an empty string or indices outside of `filter_s`.

    >>> filter_s='(|(&(!(zone=univention.de))(soa=test))(nameserver=bar))'
    >>> parse(filter_s)
    conjunction('|', [conjunction('&', [conjunction('!', [expression('zone', 'univention.de', '=')]), expression('soa', 'test', '=')]), expression('nameserver', 'bar', '=')])
    >>> parse('(!(key>=29))')
    conjunction('!', [expression('key', '29', '>=')])
    >>> parse('(&(key=va\\\\28!\\\\29ue))')
    conjunction('&', [expression('key', 'va\\\\28!\\\\29ue', '=')])
    >>> parse('(cn=Babs Jensen)')
    expression('cn', 'Babs Jensen', '=')
    >>> parse('(!(cn=Tim Howes))')
    conjunction('!', [expression('cn', 'Tim Howes', '=')])
    >>> parse('(&(objectClass=Person)(|(sn=Jensen)(cn=Babs J*)))')
    conjunction('&', [expression('objectClass', 'Person', '='), conjunction('|', [expression('sn', 'Jensen', '='), expression('cn', 'Babs J*', '=')])])
    >>> parse('(o=univ*of*mich*)')
    expression('o', 'univ*of*mich*', '=')
    >>> parse('(seeAlso=)')
    expression('seeAlso', '', '=')
    >>> parse('(cn:caseExactMatch:=Fred Flintstone)')
    expression('cn:caseExactMatch:', 'Fred Flintstone', '=')
    >>> parse('(cn:=Betty Rubble)')
    expression('cn:', 'Betty Rubble', '=')
    >>> parse('(sn:dn:2.4.6.8.10:=Barney Rubble)')
    expression('sn:dn:2.4.6.8.10:', 'Barney Rubble', '=')
    >>> parse('(o:dn:=Ace Industry)')
    expression('o:dn:', 'Ace Industry', '=')
    >>> parse('(:1.2.3:=Wilma Flintstone)')
    expression(':1.2.3:', 'Wilma Flintstone', '=')
    >>> parse('(:DN:2.4.6.8.10:=Dino)')
    expression(':DN:2.4.6.8.10:', 'Dino', '=')
    >>> parse(r'(o=Parens R Us \28for all your parenthetical needs\29)')
    expression('o', 'Parens R Us \\28for all your parenthetical needs\\29', '=')
    >>> parse(r'(cn=*\2A*)')
    expression('cn', '*\\2A*', '=')
    >>> parse(r'(cn=*)')
    expression('cn', '', '=*')
    >>> parse(r'(filename=C:\5cMyFile)')
    expression('filename', 'C:\\5cMyFile', '=')
    >>> parse(r'(bin=\00\00\00\04)')
    expression('bin', '\\00\\00\\00\\04', '=')
    >>> parse(r'(sn=Lu\c4\8di\c4\87)')
    expression('sn', 'Lu\\c4\\8di\\c4\\87', '=')
    >>> parse(r'(1.3.6.1.4.1.1466.0=\04\02\48\69)')
    expression('1.3.6.1.4.1.1466.0', '\\04\\02\\48\\69', '=')
    """
    # filter is already parsed
    if not isinstance(filter_s, str):
        return filter_s

    if end == -1:
        end = len(filter_s) - 1

    try:
        # strip one pair of enclosing parentheses
        if filter_s[begin] == '(' and filter_s[end] == ')':
            begin += 1
            end -= 1
        first = filter_s[begin]
    except IndexError as exc:
        # empty string or indices outside of the string: report it like any other unparsable filter
        raise univention.admin.uexceptions.valueInvalidSyntax(filter_s) from exc

    part = filter_s[begin:end + 1]
    try:
        if first in conjunction.OPS:
            return conjunction._parse(part)
        return expression._parse(part)
    except (AssertionError, IndexError, ValueError) as exc:
        raise univention.admin.uexceptions.valueInvalidSyntax(part) from exc
[docs]
def walk(
    filter_p: conjunction | expression,
    expression_walk_function: Callable[[expression, T | None], None] | None = None,
    conjunction_walk_function: Callable[[conjunction, T | None], None] | None = None,
    arg: T | None = None,
) -> None:
    """
    Visit every node of a parsed LDAP filter tree.

    The sub-filters of a conjunction are visited before the conjunction itself.

    :param filter_p: expression tree.
    :param expression_walk_function: Called as `f(expression, arg)` for each expression; skipped if unset.
    :param conjunction_walk_function: Called as `f(conjunction, arg)` for each conjunction; skipped if unset.
    :param arg: Argument to the callback functions.
    :raises TypeError: if a node is neither a conjunction nor an expression.

    >>> filter_s = '(|(&(!(zone=univention.de))(soa=test))(nameserver=bar))'
    >>> filter_p = parse(filter_s)
    >>> def trace(e, a): print((a, str(e)))
    >>> walk(filter_p, trace, None, 'e')
    ('e', '(zone=univention.de)')
    ('e', '(soa=test)')
    ('e', '(nameserver=bar)')
    >>> walk(filter_p, None, trace, 'c')
    ('c', '(!(zone=univention.de))')
    ('c', '(&(!(zone=univention.de))(soa=test))')
    ('c', '(|(&(!(zone=univention.de))(soa=test))(nameserver=bar))')
    """
    if isinstance(filter_p, conjunction):
        # descend first, so the callback sees a node after all of its children
        for child in filter_p.expressions:
            walk(child, expression_walk_function, conjunction_walk_function, arg)
        if conjunction_walk_function:
            conjunction_walk_function(filter_p, arg)
        return

    if not isinstance(filter_p, expression):
        raise TypeError(type(filter_p))

    if expression_walk_function:
        expression_walk_function(filter_p, arg)
# Matches one `fqdn=<value>` assertion that starts at the beginning of the
# string or after `(` and ends at `)` or the end of the string.
# Group 1 captures the value, i.e. all characters up to the next `)`.
FQDN_REGEX = re.compile(r'(?:^|\()fqdn=([^)]+)(?:\)|$)')
[docs]
def replace_fqdn_filter(filter_s: str) -> str:
    """
    Rewrite every assertion on the read-only attribute `fqdn` into an
    equivalent filter on `cn` and `associatedDomain`.

    :param filter_s: LDAP filter string; any value which is not a string is returned as is.
    :returns: The rewritten filter, or the unmodified filter if it contains no `fqdn` assertion.

    >>> replace_fqdn_filter('fqdn=host.domain.tld')
    '(&(cn=host)(associatedDomain=domain.tld))'
    >>> replace_fqdn_filter('(fqdn=host.domain.tld)')
    '(&(cn=host)(associatedDomain=domain.tld))'
    >>> replace_fqdn_filter('fqdn=domain')
    '(|(cn=domain)(associatedDomain=domain))'
    >>> replace_fqdn_filter('(|(fqdn=host.domain.tld)(fqdn=other.domain.tld2))')
    '(|(&(cn=host)(associatedDomain=domain.tld))(&(cn=other)(associatedDomain=domain.tld2)))'
    """
    if isinstance(filter_s, str):
        return FQDN_REGEX.sub(_replace_fqdn_filter, filter_s)
    return filter_s
def _replace_fqdn_filter(match: Match[str]) -> str:
(value,) = match.groups()
try:
host, domain = value.split('.', 1)
operator = '&'
except ValueError:
host = domain = value
operator = '|'
return '(%s(cn=%s)(associatedDomain=%s))' % (operator, host, domain)