6.4. Listener tasks and examples#

All changes trigger a call to the function handler(). For simplicity and readability it is advisable to delegate the different change types to different sub-functions.

6.4.1. Listener API example#

The following boilerplate code uses the newer listener API.

Source code: UCS source: management/univention-directory-listener/examples/listener_module_template.py

# SPDX-FileCopyrightText: 2017-2024 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only

from typing import Dict, Optional, List

from univention.listener import ListenerModuleHandler


class ListenerModuleTemplate(ListenerModuleHandler):

    class Configuration(object):
        name = 'unique_name'
        description = 'listener module description'
        ldap_filter = '(&(objectClass=inetOrgPerson)(uid=example))'
        attributes = ['sn', 'givenName']

    def create(self, dn: str, new: Dict[str, List[bytes]]) -> None:
        self.logger.debug('dn: %r', dn)

    def modify(
        self,
        dn: str,
        old: Dict[str, List[bytes]],
        new: Dict[str, List[bytes]],
        old_dn: Optional[str],
    ) -> None:
        self.logger.debug('dn: %r', dn)
        if old_dn:
            self.logger.debug('it is (also) a move! old_dn: %r', old_dn)
        self.logger.debug('changed attributes: %r', self.diff(old, new))

    def remove(self, dn: str, old: Dict[str, List[bytes]]) -> None:
        self.logger.debug('dn: %r', dn)

6.4.2. Basic example#

The following boilerplate code delegates each change type to a separate function. It does not handle renames and moves explicitly, but only as the removal of the object at the old dn and the following addition at the new dn.

Source code: UCS source: doc/developer-reference/listener/simple.py

from typing import Dict, List


def handler(
    dn: str,
    new: Dict[str, List[bytes]],
    old: Dict[str, List[bytes]],
) -> None:
    if new and not old:
        handler_add(dn, new)
    elif new and old:
        handler_modify(dn, old, new)
    elif not new and old:
        handler_remove(dn, old)
    else:
        pass  # ignore


def handler_add(dn: str, new: Dict[str, List[bytes]]) -> None:
    """Handle addition of object."""
    pass  # replace this


def handler_modify(
    dn: str,
    old: Dict[str, List[bytes]],
    new: Dict[str, List[bytes]],
) -> None:
    """Handle modification of object."""
    pass  # replace this


def handler_remove(dn: str, old: Dict[str, List[bytes]]) -> None:
    """Handle removal of object."""
    pass  # replace this

6.4.3. Rename and move#

In case rename and move actions should be handled separately, the following code may be used:

Source code: UCS source: doc/developer-reference/listener/modrdn.py

from typing import Dict, List

modrdn = "1"

_delay = None


def handler(
    dn: str,
    new: Dict[str, List[bytes]],
    old: Dict[str, List[bytes]],
    command: str = "",
) -> None:
    global _delay
    if _delay:
        old_dn, old = _delay
        _delay = None
        if "a" == command and old['entryUUID'] == new['entryUUID']:
            handler_move(old_dn, old, dn, new)
            return
        handler_remove(old_dn, old)

    if "n" == command and "cn=Subschema" == dn:
        handler_schema(old, new)
    elif new and not old:
        handler_add(dn, new)
    elif new and old:
        handler_modify(dn, old, new)
    elif not new and old:
        if "r" == command:
            _delay = (dn, old)
        else:
            handler_remove(dn, old)
    else:
        pass  # ignore, reserved for future use


def handler_add(dn: str, new: Dict[str, List[bytes]]) -> None:
    """Handle creation of object."""
    pass  # replace this


def handler_modify(
    dn: str,
    old: Dict[str, List[bytes]],
    new: Dict[str, List[bytes]],
) -> None:
    """Handle modification of object."""
    pass  # replace this


def handler_remove(dn: str, old: Dict[str, List[bytes]]) -> None:
    """Handle removal of object."""
    pass  # replace this


def handler_move(
    old_dn: str,
    old: Dict[str, List[bytes]],
    new_dn: str,
    new: Dict[str, List[bytes]],
) -> None:
    """Handle rename or move of object."""
    pass  # replace this


def handler_schema(
    old: Dict[str, List[bytes]],
    new: Dict[str, List[bytes]],
) -> None:
    """Handle change in LDAP schema."""
    pass  # replace this

Warning

Please be aware that tracking the two subsequent calls for modrdn in memory might cause duplicates, in case the Univention Directory Listener is terminated while such an operation is performed. If this is critical, the state should be stored persistently into a temporary file.

6.4.4. Full example with packaging#

The following example shows a listener module, which logs all changes to users into the file /root/UserList.txt.

Source code: UCS source: doc/developer-reference/listener/printusers/

"""
Example for a listener module, which logs changes to users.
"""

import errno
import os
from collections import namedtuple
from typing import Dict, List

import univention.debug as ud
from listener import SetUID

name = 'printusers'
description = 'print all names/users/uidNumbers into a file'
filter = ''.join("""\
(&
    (|
        (&
            (objectClass=posixAccount)
            (objectClass=shadowAccount)
        )
        (objectClass=univentionMail)
        (objectClass=sambaSamAccount)
        (objectClass=simpleSecurityObject)
        (objectClass=inetOrgPerson)
    )
    (!(objectClass=univentionHost))
    (!(uidNumber=0))
    (!(uid=*$))
)""".split())
attributes = ['uid', 'uidNumber', 'cn']
_Rec = namedtuple('_Rec', 'uid uidNumber cn')

USER_LIST = '/root/UserList.txt'


def handler(dn: str, new: Dict[str, List[bytes]], old: Dict[str, List[bytes]]) -> None:
    """
    Write all changes into a text file.
    This function is called on each change.
    """
    if new and old:
        _handle_change(dn, new, old)
    elif new and not old:
        _handle_add(dn, new)
    elif old and not new:
        _handle_remove(dn, old)


def _handle_change(dn: str, new: Dict[str, List[bytes]], old: Dict[str, List[bytes]]) -> None:
    """
    Called when an object is modified.
    """
    o_rec = _rec(old)
    n_rec = _rec(new)
    ud.debug(ud.LISTENER, ud.INFO, 'Edited user "%s"' % (o_rec.uid,))
    _writeit(o_rec, u'edited. Is now:')
    _writeit(n_rec, u'')


def _handle_add(dn: str, new: Dict[str, List[bytes]]) -> None:
    """
    Called when an object is newly created.
    """
    n_rec = _rec(new)
    ud.debug(ud.LISTENER, ud.INFO, 'Added user "%s"' % (n_rec.uid,))
    _writeit(n_rec, u'added')


def _handle_remove(dn: str, old: Dict[str, List[bytes]]) -> None:
    """
    Called when an previously existing object is removed.
    """
    o_rec = _rec(old)
    ud.debug(ud.LISTENER, ud.INFO, 'Removed user "%s"' % (o_rec.uid,))
    _writeit(o_rec, u'removed')


def _rec(data):
    # type: (Dict[str, List[str]]) -> _Rec
    """
    Retrieve symbolic, numeric ID and name from user data.
    """
    return _Rec(*(data.get(attr, (None,))[0] for attr in attributes))


def _writeit(rec, comment):
    # type: (_Rec, str) -> None
    """
    Append CommonName, symbolic and numeric User-IDentifier, and comment to file.
    """
    nuid = u'*****' if rec.uid in ('root', 'spam') else rec.uidNumber
    indent = '\t' if comment is None else ''
    try:
        with SetUID():
            with open(USER_LIST, 'a') as out:
                print(u'%sName: "%s"' % (indent, rec.cn), file=out)
                print(u'%sUser: "%s"' % (indent, rec.uid), file=out)
                print(u'%sUID: "%s"' % (indent, nuid), file=out)
                if comment:
                    print(u'%s%s' % (indent, comment,), file=out)
    except IOError as ex:
        ud.debug(
            ud.LISTENER, ud.ERROR,
            'Failed to write "%s": %s' % (USER_LIST, ex))


def initialize():
    # type: () -> None
    """
    Remove the log file.
    This function is called when the module is forcefully reset.
    """
    try:
        with SetUID():
            os.remove(USER_LIST)
        ud.debug(
            ud.LISTENER, ud.INFO,
            'Successfully deleted "%s"' % (USER_LIST,))
    except OSError as ex:
        if errno.ENOENT == ex.errno:
            ud.debug(
                ud.LISTENER, ud.INFO,
                'File "%s" does not exist, will be created' % (USER_LIST,))
        else:
            ud.debug(
                ud.LISTENER, ud.WARN,
                'Failed to delete file "%s": %s' % (USER_LIST, ex))

Some comments on the code:

  • The LDAP filter is specifically chosen to only match user objects, but not computer objects, which have a uid characteristically terminated by a $-sign.

  • The attribute filter further restricts the module to only trigger on changes to the numeric and symbolic user identifier and the last name of the user.

  • To test this run a command like tail -f /root/UserList.txt &. Then create a new user or modify the lastname of an existing one to trigger the module.

For packaging the following files are required:

debian/printusers.install

The module should be installed into the directory /usr/lib/univention-directory-listener/system/.

printusers.py usr/lib/univention-directory-listener/system/
debian/printusers.postinst

The Univention Directory Listener must be restarted after package installation and removal:

#!/bin/sh
set -e

case "$1" in
configure)
    deb-systemd-invoke restart univention-directory-listener
    ;;
abort-upgrade|abort-remove|abort-deconfigure)
    ;;
*)
    echo "postinst called with unknown argument \`$1'" >&2
    exit 1
    ;;
esac

#DEBHELPER#

exit 0

debian/printusers.postrm

#!/bin/sh
set -e

case "$1" in
remove)
    deb-systemd-invoke restart univention-directory-listener
    ;;
purge|upgrade|failed-upgrade|abort-install|abort-upgrade|disappear)
    ;;
*)
    echo "postrm called with unknown argument \`$1'" >&2
    exit 1
    ;;
esac

#DEBHELPER#

exit 0

6.4.5. A little bit more object oriented#

For larger modules it might be preferable to use a more object oriented design like the following example, which logs referential integrity violations into a file.

Source code: UCS source: doc/developer-reference/listener/obj.py

import os
from pwd import getpwnam
from typing import Dict, List, Optional, Tuple

import ldap
import univention.debug as ud
from listener import SetUID

name = "refcheck"
description = "Check referential integrity of uniqueMember relations"
filter = "(uniqueMember=*)"
attribute = ["uniqueMember"]
modrdn = "1"


class LocalLdap(object):
    PORT = 7636

    def __init__(self) -> None:
        self.data: Dict[str, str] = {}
        self.con: Optional[ldap.ldapobject.LDAPObject] = None

    def setdata(self, key: str, value: str):
        self.data[key] = value

    def prerun(self) -> None:
        try:
            self.con = ldap.initialize('ldaps://%s:%d' % (self.data["ldapserver"], self.PORT))
            self.con.simple_bind_s(self.data["binddn"], self.data["bindpw"])
        except ldap.LDAPError as ex:
            ud.debug(ud.LISTENER, ud.ERROR, str(ex))

    def postrun(self) -> None:
        if not self.con:
            return
        try:
            self.con.unbind()
            self.con = None
        except ldap.LDAPError as ex:
            ud.debug(ud.LISTENER, ud.ERROR, str(ex))


class LocalFile(object):
    USER = "listener"
    LOG = "/var/log/univention/refcheck.log"

    def initialize(self) -> None:
        try:
            ent = getpwnam(self.USER)
            with SetUID():
                with open(self.LOG, "w"):
                    pass
                os.chown(self.LOG, ent.pw_uid, -1)
        except OSError as ex:
            ud.debug(ud.LISTENER, ud.ERROR, str(ex))

    def log(self, msg) -> None:
        with open(self.LOG, 'a') as log:
            print(msg, file=log)

    def clean(self) -> None:
        try:
            with SetUID():
                os.remove(self.LOG)
        except OSError as ex:
            ud.debug(ud.LISTENER, ud.ERROR, str(ex))


class ReferentialIntegrityCheck(LocalLdap, LocalFile):
    MESSAGES = {
        (False, False): "Still invalid: ",
        (False, True): "Now valid: ",
        (True, False): "Now invalid: ",
        (True, True): "Still valid: ",
    }

    def __init__(self) -> None:
        super(ReferentialIntegrityCheck, self).__init__()
        self._delay: Optional[Tuple[str, Dict[str, List[bytes]]]] = None

    def handler(
        self,
        dn: str,
        new: Dict[str, List[bytes]],
        old: Dict[str, List[bytes]],
        command: str = '',
    ) -> None:
        if self._delay:
            old_dn, old = self._delay
            self._delay = None
            if "a" == command and old['entryUUID'] == new['entryUUID']:
                self.handler_move(old_dn, old, dn, new)
                return
            self.handler_remove(old_dn, old)

        if "n" == command and "cn=Subschema" == dn:
            self.handler_schema(old, new)
        elif new and not old:
            self.handler_add(dn, new)
        elif new and old:
            self.handler_modify(dn, old, new)
        elif not new and old:
            if "r" == command:
                self._delay = (dn, old)
            else:
                self.handler_remove(dn, old)
        else:
            pass  # ignore, reserved for future use

    def handler_add(self, dn: str, new: Dict[str, List[bytes]]) -> None:
        if not self._validate(new):
            self.log("New invalid object: " + dn)

    def handler_modify(
        self,
        dn: str,
        old: Dict[str, List[bytes]],
        new: Dict[str, List[bytes]],
    ) -> None:
        valid = (self._validate(old), self._validate(new))
        msg = self.MESSAGES[valid]
        self.log(msg + dn)

    def handler_remove(self, dn: str, old: Dict[str, List[bytes]]) -> None:
        if not self._validate(old):
            self.log("Removed invalid: " + dn)

    def handler_move(
        self,
        old_dn: str,
        old: Dict[str, List[bytes]],
        new_dn: str,
        new: Dict[str, List[bytes]],
    ) -> None:
        valid = (self._validate(old), self._validate(new))
        msg = self.MESSAGES[valid]
        self.log("%s %s -> %s" % (msg, old_dn, new_dn))

    def handler_schema(
        self,
        old: Dict[str, List[bytes]],
        new: Dict[str, List[bytes]],
    ) -> None:
        self.log("Schema change")

    def _validate(self, data: Dict[str, List[bytes]]) -> bool:
        assert self.con
        try:
            for dn in data["uniqueMember"]:
                self.con.search_ext_s(dn, ldap.SCOPE_BASE, attrlist=[], attrsonly=1)
            return True
        except ldap.NO_SUCH_OBJECT:
            return False
        except ldap.LDAPError as ex:
            ud.debug(ud.LISTENER, ud.ERROR, str(ex))
            return False


_instance = ReferentialIntegrityCheck()
initialize = _instance.initialize
handler = _instance.handler
clean = _instance.clean
prerun = _instance.prerun
postrun = _instance.postrun
setdata = _instance.setdata