Source code for univention.testing.ldif

#!/usr/bin/python3
# coding: utf-8

"""
This program compares LDAP host entries with a local comparative ldif file.
All differences will be displayed at the console.
"""
from __future__ import print_function

import base64
import errno
import os
import re
import select
import signal
import subprocess
import sys
import time
import unicodedata
from optparse import SUPPRESS_HELP, OptionGroup, OptionParser, Values  # noqa: F401
from typing import Any, Dict, Iterable, Iterator, List, Set, Text, Tuple, NoReturn  # noqa: F401
try:
	from typing_extensions import Literal  # noqa: F401
except ImportError:
	pass

Entry = Dict[str, List[Text]]

USAGE = 'usage: %prog [option] <LDIF1> [[option] <LDIF2>]'
DESCRIPTION = '''
Compares the LDIF files.
LDIF can be wither a local LDIF file or
a hostname whose LDAP will be dumped using slapcat over ssh.
If LDIF2 is omitted, a local 'slapcat' is used.
'''.strip()


[docs]class LdifError(Exception): """ Error in input processing. """
[docs]class SlapError(Exception): """ Error in slapcat processing. """
[docs]class Ldif(object): """ Abstract class for LDIF source. """ # RFC2849: LDAP Data Interchange Format RE = re.compile(r''' ^ (?: ([0-9]+(?:\.[0-9]+)*) # ldap-oid |([A-Za-z][\-0-9A-Za-z]*) # AttributeType ) # AttributeDescription (;[\-0-9A-Za-z]+)* # OPTIONS : (?: $ # EMPTY |:[ ]*([+/0-9=A-Za-z]+) # BASE64-STRING |[ ]*([\x01-\x09\x0b-\x0c\x0e-\x1f\x21-\x39\x3b\x3d-\x7f][\x01-\x09\x0b-\x0c\x0e-\x7f]*) # SAFE-STRING ) # value-spec $ ''', re.VERBOSE) # Operational LDAP attributes OPERATIONAL = { "entryCSN", "modifiersName", "modifyTimestamp", "creatorsName", "entryUUID", "createTimestamp", 'structuralObjectClass', } def __init__(self, src, exclude=OPERATIONAL): # type: (Iterable[bytes], Set[str]) -> None self.src = src self.exclude = exclude self.lno = 0
[docs] def next_line(self): # type: () -> Iterator[str] """ Return line iterator. """ lines = [] for lno, chunk in enumerate(self.src, start=1): line = chunk.decode('utf-8', 'replace') line = line.rstrip('\r\n') if line[:1] in (' ', '\t'): lines.append(line[1:]) else: yield ''.join(lines) self.lno = lno lines[:] = [line] yield ''.join(lines)
[docs] def split(self, line): # type: (str) -> Tuple[str, Text] r""" Split attribute and value. Options are stripped. Base64 encoded values are decoded. :param str line: The line to split. :return: A tuple (name, value). >>> Ldif(b'').split('a:') == ('a', u'') True >>> Ldif(b'').split('a: b') == ('a', u'b') True >>> Ldif(b'').split('a:: YWFh') == ('a', u'aaa') True >>> Ldif(b'').split('a;b:c') == ('a', u'c') True >>> Ldif(b'').split('a;b;c::YWFh') == ('a', u'aaa') True >>> Ldif(b'').split('a:: ACB/') == ('a', u'\\u0000 \\u007f') True """ match = self.RE.match(line) if not match: raise LdifError('%d: %s' % (self.lno, line)) oid, attr, _opt, b64, plain = match.groups() key = attr or oid if plain: value = plain elif b64: value = base64.b64decode(b64).decode('utf-8', 'replace') value = self.printable(value) else: value = "" return (key, value)
def __iter__(self): # type: () -> Iterator[Entry] """ Return line iterator. """ obj = {} # type: Entry for line in self.next_line(): if line.startswith('#'): continue if line: key, value = self.split(line) if key in self.exclude: continue obj.setdefault(key, []).append(value) elif obj: yield obj obj = {}
[docs] @staticmethod def printable(value): # type: (Text) -> Text """ Convert binary data to printable string. """ # Py2 has no str.isprintable() return u''.join( u'\\u%04x' % (ord(c),) if c != u' ' and unicodedata.category(c)[0] in 'CZ' else c for c in value )
[docs]class LdifSource(object):
[docs] @classmethod def create(cls, arg, options): # type: (str, Values) -> LdifFile raise NotImplementedError()
[docs] def start_reading(self): # type: () -> Ldif """ Start reading the LDIF data. """ raise NotImplementedError()
[docs]class LdifFile(object): """ LDIF source from local file. """
[docs] @classmethod def create(cls, arg, options): # type: (str, Values) -> LdifFile return cls(arg)
def __init__(self, filename): # type: (str) -> None super(LdifFile, self).__init__() self.filename = filename
[docs] def start_reading(self): # type: () -> Ldif """ Start reading the LDIF data. """ try: return Ldif(open(self.filename, 'rb')) except IOError as ex: raise LdifError(ex)
[docs]class LdifSlapcat(object): """ LDIF source from local LDAP. """
[docs] @classmethod def create(cls, arg, options): # type: (Any, Values) -> LdifSlapcat return cls()
def __init__(self): # type: () -> None super(LdifSlapcat, self).__init__() self.command = ['slapcat', '-d0']
[docs] def start_reading(self): # type: () -> Ldif """ Start reading the LDIF data. """ try: proc = subprocess.Popen(self.command, stdout=subprocess.PIPE) assert proc.stdout self.wait_for_data(proc) return Ldif(proc.stdout) except OSError as ex: raise SlapError("Error executing", self.command, ex)
[docs] def wait_for_data(self, proc): # type: (subprocess.Popen) -> None """ Wait for the remote process to send data. >>> LdifSlapcat().wait_for_data(subprocess.Popen(('echo',), stdout=subprocess.PIPE)) >>> LdifSlapcat().wait_for_data(subprocess.Popen(('false',), stdout=subprocess.PIPE)) # doctest: +IGNORE_EXCEPTION_DETAIL Traceback (most recent call last): ... SlapError: ('Error executing', ['slapcat', '-d0'], 1) """ while True: rlist = [proc.stdout] wlist = [] # type: List[int] xlist = [] # type: List[int] try: rlist, wlist, xlist = select.select(rlist, wlist, xlist) break except (OSError, select.error) as ex: if ex.errno == errno.EINTR: continue else: raise time.sleep(0.5) ret = proc.poll() if ret is not None and ret != 0: raise SlapError("Error executing", self.command, ret)
[docs]class LdifSsh(LdifSlapcat): """ LDIF source from remote LDAP. """
[docs] @classmethod def create(cls, hostname, options): # type: (str, Values) -> LdifSsh return cls(hostname, options.ssh)
def __init__(self, hostname, ssh='ssh'): # type: (str, str) -> None super(LdifSsh, self).__init__() self.command = [ssh, hostname] + self.command
def __test(_option, _opt_str, _value, _parser): # type: (Values, str, None, OptionParser) -> NoReturn """ Run internal test suite. """ import doctest res = doctest.testmod() sys.exit(int(bool(res[0])))
[docs]def stream2object(ldif): # type: (Ldif) -> Dict[str, Entry] """ Convert LDIF stream to dictionary of objects. :param Ldif ldif: A LDIF stream. :return: A dictionary mapping distinguished names to a dictionary of key-values. >>> stream2object([{'dn': ['dc=test']}]) {'dc=test': {}} """ objects = {} # type: Dict[str, Entry] for obj in ldif: try: dname, = obj.pop('dn') objects[dname] = obj # type: ignore except KeyError: print('Missing dn: %r' % (obj,), file=sys.stderr) except ValueError: print('Multiple dn: %r' % (obj,), file=sys.stderr) return objects
[docs]def sort_dn(dname): # type: (str) -> Tuple[Tuple[str, ...], ...] """ Sort by reversed dn. :param str dname: distinguished name. :return: tuple of relative distinguised names. >>> sort_dn('a=1') (('a=1',),) >>> sort_dn('b=1,a=1') (('a=1',), ('b=1',)) >>> sort_dn('b=2+a=1') (('a=1', 'b=2'),) """ return tuple(reversed([tuple(sorted(_.split('+'))) for _ in dname.split(',')]))
[docs]def compare_ldif(lldif, rldif, options): # type: (Ldif, Ldif, Values) -> int """ Compare two LDIF files. :param ldif1: first LDIF to compare. :param ldif2: second LDIF to compare. :param options: command line options. """ lefts = stream2object(lldif) rights = stream2object(rldif) lkeys = sorted(lefts, key=sort_dn, reverse=True) rkeys = sorted(rights, key=sort_dn, reverse=True) ret = 0 ldn = rdn = "" while True: if not ldn and lkeys: ldn = lkeys.pop(0) if not rdn and rkeys: rdn = rkeys.pop(0) if not ldn and not rdn: break lk, rk = sort_dn(ldn), sort_dn(rdn) if lk < rk: diffs = list(compare_keys({}, rights[rdn])) print('+dn: %s' % (rdn,)) rdn = "" elif lk > rk: diffs = list(compare_keys(lefts[ldn], {})) print('-dn: %s' % (ldn,)) ldn = "" else: diffs = list(compare_keys(lefts[ldn], rights[rdn])) if not options.objects and all(diff == 0 for diff, key, val in diffs): ldn = rdn = "" continue print(' dn: %s' % (rdn,)) ldn = rdn = "" for diff, key, val in diffs: if options.attributes or diff: print('%s%s: %s' % (' +-'[diff], key, val)) print() ret = 1 return ret
[docs]def compare_keys(ldata, rdata): # type: (Entry, Entry) -> Iterator[Tuple[Literal[-1, 0, 1], str, Text]] """ Compare and return attributes of two LDAP objects. :param dict ldata: the first LDAP object. :param dict rdata: the second LDAP object. :return: an iterator of differences as 3-tuples (comparison, key, value). >>> list(compare_keys({}, {})) [] >>> list(compare_keys({'a': ['1']}, {})) [(-1, 'a', '1')] >>> list(compare_keys({}, {'a': ['1']})) [(1, 'a', '1')] >>> list(compare_keys({'a': ['1']}, {'a': ['1']})) [(0, 'a', '1')] >>> list(compare_keys({'a': ['1']}, {'a': ['2']})) [(1, 'a', '2'), (-1, 'a', '1')] """ lkeys = sorted(ldata, reverse=True) rkeys = sorted(rdata, reverse=True) lkey = rkey = "" while True: if not lkey and lkeys: lkey = lkeys.pop(0) if not rkey and rkeys: rkey = rkeys.pop(0) if not lkey and not rkey: break if lkey < rkey: for diff in compare_values(rkey, [], rdata[rkey]): yield diff rkey = "" elif lkey > rkey: for diff in compare_values(lkey, ldata[lkey], []): yield diff lkey = "" else: for diff in compare_values(lkey, ldata[lkey], rdata[rkey]): yield diff lkey = rkey = ""
[docs]def compare_values(attr, lvalues, rvalues): # type: (str, List[Text], List[Text]) -> Iterator[Tuple[Literal[-1, 0, 1], str, Text]] """ Compare and return values of two multi-valued LDAP attributes. :param list lvalues: the first values. :param list rvalues: the second values. :return: an iterator of differences as 3-tuples (comparison, key, value), where comparison<0 if key is missing in lvalues, comparison>0 if key is missing in rvalues, otherwise 0. >>> list(compare_values('attr', [], [])) [] >>> list(compare_values('attr', ['1', '2'], ['2', '3'])) [(1, 'attr', '3'), (0, 'attr', '2'), (-1, 'attr', '1')] """ lvalues.sort(reverse=True) rvalues.sort(reverse=True) lval = rval = "" while True: if not lval and lvalues: lval = lvalues.pop(0) if not rval and rvalues: rval = rvalues.pop(0) if not lval and not rval: break if lval < rval: yield (1, attr, rval) rval = "" elif lval > rval: yield (-1, attr, lval) lval = "" else: yield (0, attr, lval) lval = rval = ""
[docs]def parse_args(): # type: () -> Tuple[LdifSource, LdifSource, Values] """ Parse command line arguments. """ parser = OptionParser(usage=USAGE, description=DESCRIPTION) parser.disable_interspersed_args() parser.set_defaults(source=LdifFile, verbose=1) group = OptionGroup(parser, "Source", "Source for LDIF") group.add_option( "--file", "-f", action="store_const", dest="source", const=LdifFile, help="next arguments are LDIF files") group.add_option( "--host", "-H", action="store_const", dest="source", const=LdifSsh, help="next arguments are LDAP hosts") group.add_option( "--ssh", "-s", default="ssh", dest="ssh", help="specify the remote shell to use [%default]") parser.add_option_group(group) group = OptionGroup(parser, "Attributes", "Ignore attributes") group.add_option( "--operational", action="store_true", dest="operational", help="also compare operational attributes") group.add_option( "--exclude", "-x", action="append", dest="exclude", help="ignore attribute", default=[]) parser.add_option_group(group) group = OptionGroup(parser, "Output", "Control output") group.add_option( "--objects", "-o", action="store_true", dest="objects", help="show even unchanged objects") group.add_option( "--attributes", "-a", action="store_true", dest="attributes", help="show even unchanged attributes") parser.add_option_group(group) parser.add_option( '--test-internal', action='callback', callback=__test, help=SUPPRESS_HELP) try: options, args = parser.parse_args(args=sys.argv[1:]) try: ldif1 = options.source.create(args.pop(0), options) except IndexError: parser.error("No arguments were given") options, args = parser.parse_args(args=args, values=options) ldif2 = options.source.create(args.pop(0), options) if args else LdifSlapcat.create(None, options) if args: parser.error("More than two LDIFs given.") except LdifError as ex: parser.error("Failed to parse LDIF: %s" % (ex,)) return ldif1, ldif2, options
[docs]def main(): # type: () -> None """ A main()-method with options. """ src1, src2, options = parse_args() try: ldif1, ldif2 = (src.start_reading() for src in (src1, src2)) except (LdifError, SlapError) as ex: sys.exit("Failed to setup source: %s" % ex) exclude = set(options.exclude) if not options.operational: exclude |= Ldif.OPERATIONAL ldif1.exclude = ldif2.exclude = exclude run_compare(ldif1, ldif2, options)
[docs]def run_compare(ldif1, ldif2, options): # type: (Ldif, Ldif, Values) -> NoReturn """ UNIX correct error handling. Termination by signal is propagaed as signal. :param ldif1: first LDIF to compare. :param ldif2: second LDIF to compare. :param options: command line options. """ ret = 2 try: ret = compare_ldif(ldif1, ldif2, options) except KeyboardInterrupt: signal.signal(signal.SIGINT, signal.SIG_DFL) os.kill(os.getpid(), signal.SIGINT) except EnvironmentError as ex: if ex.errno == errno.EPIPE: signal.signal(signal.SIGPIPE, signal.SIG_DFL) os.kill(os.getpid(), signal.SIGPIPE) else: print('Error: %s' % (ex,), file=sys.stderr) except LdifError as ex: print('Invalid LDIF: %s' % (ex,), file=sys.stderr) sys.exit(ret)
if __name__ == '__main__': main()