#!/usr/bin/python3
# SPDX-FileCopyrightText: 2024-2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only
"""
This program compares LDAP host entries with a local comparative ldif file.
All differences will be displayed at the console.
"""
import base64
import errno
import os
import re
import select
import signal
import subprocess
import sys
import time
import unicodedata
from collections.abc import Iterable, Iterator
from optparse import SUPPRESS_HELP, OptionGroup, OptionParser, Values
from typing import Any, Literal, NoReturn, Self
Entry = dict[str, list[str]]
USAGE = 'usage: %prog [option] <LDIF1> [[option] <LDIF2>]'
DESCRIPTION = '''
Compares the LDIF files.
LDIF can be wither a local LDIF file or
a hostname whose LDAP will be dumped using slapcat over ssh.
If LDIF2 is omitted, a local 'slapcat' is used.
'''.strip()
[docs]
class LdifError(Exception):
"""Error in input processing."""
[docs]
class SlapError(Exception):
"""Error in slapcat processing."""
[docs]
class Ldif:
"""Abstract class for LDIF source."""
# RFC2849: LDAP Data Interchange Format
RE = re.compile(r'''
^
(?:
([0-9]+(?:\.[0-9]+)*) # ldap-oid
|([A-Za-z][\-0-9A-Za-z]*) # AttributeType
) # AttributeDescription
(;[\-0-9A-Za-z]+)* # OPTIONS
:
(?:
$ # EMPTY
|:[ ]*([+/0-9=A-Za-z]+) # BASE64-STRING
|[ ]*([\x01-\x09\x0b-\x0c\x0e-\x1f\x21-\x39\x3b\x3d-\x7f][\x01-\x09\x0b-\x0c\x0e-\x7f]*) # SAFE-STRING
) # value-spec
$
''', re.VERBOSE)
# Operational LDAP attributes
OPERATIONAL = {
"entryCSN",
"modifiersName",
"modifyTimestamp",
"creatorsName",
"entryUUID",
"createTimestamp",
'structuralObjectClass',
}
def __init__(self, src: Iterable[bytes], exclude: set[str] = OPERATIONAL) -> None:
self.src = src
self.exclude = exclude
self.lno = 0
[docs]
def next_line(self) -> Iterator[str]:
"""Return line iterator."""
lines = []
for lno, chunk in enumerate(self.src, start=1):
line = chunk.decode('utf-8', 'replace')
line = line.rstrip('\r\n')
if line[:1] in (' ', '\t'):
lines.append(line[1:])
else:
yield ''.join(lines)
self.lno = lno
lines[:] = [line]
yield ''.join(lines)
[docs]
def split(self, line: str) -> tuple[str, str]:
r"""
Split attribute and value.
Options are stripped.
Base64 encoded values are decoded.
:param str line: The line to split.
:return: A tuple (name, value).
>>> Ldif(b'').split('a:') == ('a', u'')
True
>>> Ldif(b'').split('a: b') == ('a', u'b')
True
>>> Ldif(b'').split('a:: YWFh') == ('a', u'aaa')
True
>>> Ldif(b'').split('a;b:c') == ('a', u'c')
True
>>> Ldif(b'').split('a;b;c::YWFh') == ('a', u'aaa')
True
>>> Ldif(b'').split('a:: ACB/') == ('a', u'\\u0000 \\u007f')
True
"""
match = self.RE.match(line)
if not match:
raise LdifError('%d: %s' % (self.lno, line))
oid, attr, _opt, b64, plain = match.groups()
key = attr or oid
if plain:
value = plain
elif b64:
value = base64.b64decode(b64).decode('utf-8', 'replace')
value = self.printable(value)
else:
value = ""
return (key, value)
def __iter__(self) -> Iterator[Entry]:
"""Return line iterator."""
obj: Entry = {}
for line in self.next_line():
if line.startswith('#'):
continue
if line:
key, value = self.split(line)
if key in self.exclude:
continue
obj.setdefault(key, []).append(value)
elif obj:
yield obj
obj = {}
[docs]
@staticmethod
def printable(value: str) -> str:
"""Convert binary data to printable string."""
# Py2 has no str.isprintable()
return ''.join(
f'\\u{ord(c):04x}' if c != ' ' and unicodedata.category(c)[0] in 'CZ' else c
for c in value
)
[docs]
class LdifSource:
[docs]
@classmethod
def create(cls, arg: str, options: Values) -> 'LdifFile':
raise NotImplementedError()
[docs]
def start_reading(self) -> Ldif:
"""Start reading the LDIF data."""
raise NotImplementedError()
[docs]
class LdifFile:
"""LDIF source from local file."""
[docs]
@classmethod
def create(cls, arg: str, options: Values) -> Self:
return cls(arg)
def __init__(self, filename: str) -> None:
super().__init__()
self.filename = filename
[docs]
def start_reading(self) -> Ldif:
"""Start reading the LDIF data."""
try:
return Ldif(open(self.filename, 'rb'))
except OSError as ex:
raise LdifError(ex)
[docs]
class LdifSlapcat:
"""LDIF source from local LDAP."""
[docs]
@classmethod
def create(cls, arg: Any, options: Values) -> Self:
return cls()
def __init__(self) -> None:
super().__init__()
self.command = ['slapcat', '-f', '/etc/ldap/slapd.conf', '-d0']
[docs]
def start_reading(self) -> Ldif:
"""Start reading the LDIF data."""
try:
proc = subprocess.Popen(self.command, stdout=subprocess.PIPE)
assert proc.stdout
self.wait_for_data(proc)
return Ldif(proc.stdout)
except OSError as ex:
raise SlapError("Error executing", self.command, ex)
[docs]
def wait_for_data(self, proc: subprocess.Popen) -> None:
"""
Wait for the remote process to send data.
>>> LdifSlapcat().wait_for_data(subprocess.Popen(('echo',), stdout=subprocess.PIPE))
>>> LdifSlapcat().wait_for_data(subprocess.Popen(('false',), stdout=subprocess.PIPE)) # doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
...
SlapError: ('Error executing', ['slapcat', '-d0'], 1)
"""
while True:
rlist = [proc.stdout]
wlist: list[int] = []
xlist: list[int] = []
try:
rlist, wlist, xlist = select.select(rlist, wlist, xlist)
break
except OSError as ex:
if ex.errno == errno.EINTR:
continue
else:
raise
time.sleep(0.5)
ret = proc.poll()
if ret is not None and ret != 0:
raise SlapError("Error executing", self.command, ret)
[docs]
class LdifSsh(LdifSlapcat):
"""LDIF source from remote LDAP."""
[docs]
@classmethod
def create(cls, hostname: str, options: Values) -> Self:
return cls(hostname, options.ssh)
def __init__(self, hostname: str, ssh: str = 'ssh') -> None:
super().__init__()
self.command = [ssh, hostname, *self.command]
def __test(_option: Values, _opt_str: str, _value: None, _parser: OptionParser) -> NoReturn:
"""Run internal test suite."""
import doctest
res = doctest.testmod()
sys.exit(int(bool(res[0])))
[docs]
def stream2object(ldif: Ldif) -> dict[str, Entry]:
"""
Convert LDIF stream to dictionary of objects.
:param Ldif ldif: A LDIF stream.
:return: A dictionary mapping distinguished names to a dictionary of key-values.
>>> stream2object([{'dn': ['dc=test']}])
{'dc=test': {}}
"""
objects: dict[str, Entry] = {}
for obj in ldif:
try:
dname, = obj.pop('dn')
objects[dname] = obj # type: ignore
except KeyError:
print(f'Missing dn: {obj!r}', file=sys.stderr)
except ValueError:
print(f'Multiple dn: {obj!r}', file=sys.stderr)
return objects
[docs]
def sort_dn(dname: str) -> tuple[tuple[str, ...], ...]:
"""
Sort by reversed dn.
:param str dname: distinguished name.
:return: tuple of relative distinguised names.
>>> sort_dn('a=1')
(('a=1',),)
>>> sort_dn('b=1,a=1')
(('a=1',), ('b=1',))
>>> sort_dn('b=2+a=1')
(('a=1', 'b=2'),)
"""
return tuple(reversed([tuple(sorted(_.split('+'))) for _ in dname.split(',')]))
[docs]
def compare_ldif(lldif: Ldif, rldif: Ldif, options: Values) -> int:
"""
Compare two LDIF files.
:param ldif1: first LDIF to compare.
:param ldif2: second LDIF to compare.
:param options: command line options.
"""
lefts = stream2object(lldif)
rights = stream2object(rldif)
lkeys = sorted(lefts, key=sort_dn, reverse=True)
rkeys = sorted(rights, key=sort_dn, reverse=True)
ret = 0
ldn = rdn = ""
while True:
if not ldn and lkeys:
ldn = lkeys.pop(0)
if not rdn and rkeys:
rdn = rkeys.pop(0)
if not ldn and not rdn:
break
lk, rk = sort_dn(ldn), sort_dn(rdn)
if lk < rk:
diffs = list(compare_keys({}, rights[rdn]))
print(f'+dn: {rdn}')
rdn = ""
elif lk > rk:
diffs = list(compare_keys(lefts[ldn], {}))
print(f'-dn: {ldn}')
ldn = ""
else:
diffs = list(compare_keys(lefts[ldn], rights[rdn]))
if not options.objects and all(diff == 0 for diff, key, val in diffs):
ldn = rdn = ""
continue
print(f' dn: {rdn}')
ldn = rdn = ""
for diff, key, val in diffs:
if options.attributes or diff:
print('%s%s: %s' % (' +-'[diff], key, val))
print()
ret = 1
return ret
[docs]
def compare_keys(ldata: Entry, rdata: Entry) -> Iterator[tuple[Literal[-1, 0, 1], str, str]]:
"""
Compare and return attributes of two LDAP objects.
:param dict ldata: the first LDAP object.
:param dict rdata: the second LDAP object.
:return: an iterator of differences as 3-tuples (comparison, key, value).
>>> list(compare_keys({}, {}))
[]
>>> list(compare_keys({'a': ['1']}, {}))
[(-1, 'a', '1')]
>>> list(compare_keys({}, {'a': ['1']}))
[(1, 'a', '1')]
>>> list(compare_keys({'a': ['1']}, {'a': ['1']}))
[(0, 'a', '1')]
>>> list(compare_keys({'a': ['1']}, {'a': ['2']}))
[(1, 'a', '2'), (-1, 'a', '1')]
"""
lkeys = sorted(ldata, reverse=True)
rkeys = sorted(rdata, reverse=True)
lkey = rkey = ""
while True:
if not lkey and lkeys:
lkey = lkeys.pop(0)
if not rkey and rkeys:
rkey = rkeys.pop(0)
if not lkey and not rkey:
break
if lkey < rkey:
yield from compare_values(rkey, [], rdata[rkey])
rkey = ""
elif lkey > rkey:
yield from compare_values(lkey, ldata[lkey], [])
lkey = ""
else:
yield from compare_values(lkey, ldata[lkey], rdata[rkey])
lkey = rkey = ""
[docs]
def compare_values(attr: str, lvalues: list[str], rvalues: list[str]) -> Iterator[tuple[Literal[-1, 0, 1], str, str]]:
"""
Compare and return values of two multi-valued LDAP attributes.
:param list lvalues: the first values.
:param list rvalues: the second values.
:return: an iterator of differences as 3-tuples (comparison, key, value), where comparison<0 if key is missing in lvalues, comparison>0 if key is missing in rvalues, otherwise 0.
>>> list(compare_values('attr', [], []))
[]
>>> list(compare_values('attr', ['1', '2'], ['2', '3']))
[(1, 'attr', '3'), (0, 'attr', '2'), (-1, 'attr', '1')]
"""
lvalues.sort(reverse=True)
rvalues.sort(reverse=True)
lval = rval = ""
while True:
if not lval and lvalues:
lval = lvalues.pop(0)
if not rval and rvalues:
rval = rvalues.pop(0)
if not lval and not rval:
break
if lval < rval:
yield (1, attr, rval)
rval = ""
elif lval > rval:
yield (-1, attr, lval)
lval = ""
else:
yield (0, attr, lval)
lval = rval = ""
[docs]
def parse_args() -> tuple[LdifSource, LdifSource, Values]:
"""Parse command line arguments."""
parser = OptionParser(usage=USAGE, description=DESCRIPTION)
parser.disable_interspersed_args()
parser.set_defaults(source=LdifFile, verbose=1)
group = OptionGroup(parser, "Source", "Source for LDIF")
group.add_option(
"--file", "-f",
action="store_const", dest="source", const=LdifFile,
help="next arguments are LDIF files")
group.add_option(
"--host", "-H",
action="store_const", dest="source", const=LdifSsh,
help="next arguments are LDAP hosts")
group.add_option(
"--ssh", "-s", default="ssh",
dest="ssh",
help="specify the remote shell to use [%default]")
parser.add_option_group(group)
group = OptionGroup(parser, "Attributes", "Ignore attributes")
group.add_option(
"--operational",
action="store_true", dest="operational",
help="also compare operational attributes")
group.add_option(
"--exclude", "-x",
action="append", dest="exclude",
help="ignore attribute", default=[])
parser.add_option_group(group)
group = OptionGroup(parser, "Output", "Control output")
group.add_option(
"--objects", "-o",
action="store_true", dest="objects",
help="show even unchanged objects")
group.add_option(
"--attributes", "-a",
action="store_true", dest="attributes",
help="show even unchanged attributes")
parser.add_option_group(group)
parser.add_option(
'--test-internal',
action='callback', callback=__test,
help=SUPPRESS_HELP)
try:
options, args = parser.parse_args(args=sys.argv[1:])
try:
ldif1 = options.source.create(args.pop(0), options)
except IndexError:
parser.error("No arguments were given")
options, args = parser.parse_args(args=args, values=options)
ldif2 = options.source.create(args.pop(0), options) if args else LdifSlapcat.create(None, options)
if args:
parser.error("More than two LDIFs given.")
except LdifError as ex:
parser.error("Failed to parse LDIF: %s", ex)
return ldif1, ldif2, options
[docs]
def main() -> None:
"""A main()-method with options."""
src1, src2, options = parse_args()
try:
ldif1, ldif2 = (src.start_reading() for src in (src1, src2))
except (LdifError, SlapError) as ex:
sys.exit("Failed to setup source: %s" % ex)
exclude = set(options.exclude)
if not options.operational:
exclude |= Ldif.OPERATIONAL
ldif1.exclude = ldif2.exclude = exclude
run_compare(ldif1, ldif2, options)
[docs]
def run_compare(ldif1: Ldif, ldif2: Ldif, options: Values) -> NoReturn:
"""
UNIX correct error handling.
Termination by signal is propagaed as signal.
:param ldif1: first LDIF to compare.
:param ldif2: second LDIF to compare.
:param options: command line options.
"""
ret = 2
try:
ret = compare_ldif(ldif1, ldif2, options)
except KeyboardInterrupt:
signal.signal(signal.SIGINT, signal.SIG_DFL)
os.kill(os.getpid(), signal.SIGINT)
except OSError as ex:
if ex.errno == errno.EPIPE:
signal.signal(signal.SIGPIPE, signal.SIG_DFL)
os.kill(os.getpid(), signal.SIGPIPE)
else:
print(f'Error: {ex}', file=sys.stderr)
except LdifError as ex:
print(f'Invalid LDIF: {ex}', file=sys.stderr)
sys.exit(ret)
if __name__ == '__main__':
main()