Source code for univention.admin.handlers.nagios.service

#!/usr/bin/python3
# SPDX-FileCopyrightText: 2004-2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only

"""|UDM| module for nagios services"""

from __future__ import annotations

import re
from typing import Any

import ldap
from ldap.filter import filter_format

import univention.admin.filter
import univention.admin.handlers
import univention.admin.localization
import univention.admin.syntax
import univention.admin.uexceptions
from univention.admin import configRegistry
from univention.admin.layout import Group, Tab


translation = univention.admin.localization.translation('univention.admin.handlers.nagios')
_ = translation.translate

module = 'nagios/service'
help_link = _('https://docs.software-univention.de/manual/5.0/en/monitoring/nagios.html#nagios-general')
default_containers = ['cn=nagios']

childs = False
short_description = _('Nagios service')
object_name = _('Nagios service')
object_name_plural = _('Nagios services')
long_description = ''
operations = ['search', 'edit', 'add', 'remove']


# fmt: off
options = {
    'default': univention.admin.option(
        short_description=short_description,
        default=True,
        objectClasses=['top', 'univentionNagiosServiceClass'],
    ),
}

property_descriptions = {
    'name': univention.admin.property(
        short_description=_('Name'),
        long_description=_('Service name'),
        syntax=univention.admin.syntax.string_numbers_letters_dots,
        include_in_default_search=True,
        required=True,
        may_change=False,
        identifies=True,
    ),
    'description': univention.admin.property(
        short_description=_('Description'),
        long_description=_('Service description'),
        syntax=univention.admin.syntax.string,
        include_in_default_search=True,
    ),
    'checkCommand': univention.admin.property(
        short_description=_('Plugin command'),
        long_description=_('Command name of Nagios plugin'),
        syntax=univention.admin.syntax.string,
        required=True,
    ),
    'checkArgs': univention.admin.property(
        short_description=_('Plugin command arguments'),
        long_description=_('Arguments of used Nagios plugin'),
        syntax=univention.admin.syntax.string,
    ),
    'useNRPE': univention.admin.property(
        short_description=_('Use NRPE'),
        long_description=_('Use NRPE to check remote services'),
        syntax=univention.admin.syntax.boolean,
    ),
    'assignedHosts': univention.admin.property(
        short_description=_('Assigned hosts'),
        long_description=_('Check services on these hosts'),
        syntax=univention.admin.syntax.nagiosHostsEnabledDn,
        multivalue=True,
    ),
}


layout = [
    Tab(_('General'), _('Basic settings'), layout=[
        Group(_('General Nagios service settings'), layout=[
            ["name", "description"],
            ["checkCommand", "checkArgs"],
            "useNRPE",
        ]),
    ]),
    Tab(_('Hosts'), _('Assigned hosts'), layout=[
        Group(_('Assigned hosts'), layout=[
            "assignedHosts",
        ]),
    ]),
]


mapping = univention.admin.mapping.mapping()

mapping.register('name', 'cn', None, univention.admin.mapping.ListToString)
mapping.register('description', 'description', None, univention.admin.mapping.ListToString)
mapping.register('checkCommand', 'univentionNagiosCheckCommand', None, univention.admin.mapping.ListToString)
mapping.register('checkArgs', 'univentionNagiosCheckArgs', None, univention.admin.mapping.ListToString)
mapping.register('useNRPE', 'univentionNagiosUseNRPE', None, univention.admin.mapping.ListToString)
# fmt: on


[docs] class object(univention.admin.handlers.simpleLdap): module = module
[docs] def open(self) -> None: super().open() _re = re.compile(r'^([^.]+)\.(.+?)$') # convert host FQDN to host DN hostlist: list[str] = [] hosts = self.oldattr.get('univentionNagiosHostname', []) for host in [x.decode('UTF-8') for x in hosts]: # split into relDomainName and zoneName if host and _re.match(host) is not None: (relDomainName, zoneName) = _re.match(host).groups() # find correct dNSZone entry res = self.lo.authz_connection.search(filter=filter_format('(&(objectClass=dNSZone)(zoneName=%s)(relativeDomainName=%s)(aRecord=*))', (zoneName, relDomainName))) if not res: self.log.debug('open: could not find dNSZone', host=host) else: # found dNSZone filter = '(&(objectClass=univentionHost)' for aRecord in [x.decode('ASCII') for x in res[0][1]['aRecord']]: filter += filter_format('(aRecord=%s)', [aRecord]) filter += filter_format('(cn=%s))', [relDomainName]) # find dn of host that is related to given aRecords res = self.lo.authz_connection.search(filter=filter) if res: hostlist.append(res[0][0]) self['assignedHosts'] = hostlist self.save()
def _ldap_modlist(self) -> list[tuple[str, Any, Any]]: ml = univention.admin.handlers.simpleLdap._ldap_modlist(self) # save assigned hosts if self.hasChanged('assignedHosts'): hostlist = [] for hostdn in self.info.get('assignedHosts', []): try: host = self.lo.authz_connection.get(hostdn, ['associatedDomain', 'cn'], required=True) cn: bytes = host['cn'][0] except (univention.admin.uexceptions.noObject, ldap.NO_SUCH_OBJECT): raise univention.admin.uexceptions.valueError(_('The host "%s" does not exists.') % (hostdn,), property='assignedHosts') except KeyError: raise univention.admin.uexceptions.valueError(_('The host "%s" is invalid, it has no "cn" attribute.') % (hostdn,), property='assignedHosts') domain: bytes = host.get('associatedDomain', [configRegistry.get('domainname').encode('ASCII')])[0] hostlist.append(b'%s.%s' % (cn, domain)) ml.insert(0, ('univentionNagiosHostname', self.oldattr.get('univentionNagiosHostname', []), hostlist)) return ml
lookup = object.lookup lookup_filter = object.lookup_filter identify = object.identify