#!/usr/bin/python3
# SPDX-FileCopyrightText: 2004-2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only
"""|UDM| module for nagios services"""
from __future__ import annotations
import re
from typing import Any
import ldap
from ldap.filter import filter_format
import univention.admin.filter
import univention.admin.handlers
import univention.admin.localization
import univention.admin.syntax
import univention.admin.uexceptions
from univention.admin import configRegistry
from univention.admin.layout import Group, Tab
# Translation object for the 'univention.admin.handlers.nagios' domain;
# ``_`` is bound to its ``translate`` method and wraps the user-visible
# strings below.
translation = univention.admin.localization.translation('univention.admin.handlers.nagios')
_ = translation.translate
# Module-level metadata of this handler.
# NOTE(review): these names look like the attributes the UDM framework reads
# from every handler module by convention -- confirm against the UDM docs
# before renaming or removing any of them.
module = 'nagios/service'
help_link = _('https://docs.software-univention.de/manual/5.0/en/monitoring/nagios.html#nagios-general')
default_containers = ['cn=nagios']
# NOTE(review): presumably states that objects of this type have no child
# objects -- confirm.
childs = False
short_description = _('Nagios service')
object_name = _('Nagios service')
object_name_plural = _('Nagios services')
long_description = ''
operations = ['search', 'edit', 'add', 'remove']
# fmt: off
options = {
    # Single option, enabled by default; it carries the LDAP object classes
    # declared for this object type.
    'default': univention.admin.option(
        short_description=short_description,
        default=True,
        objectClasses=['top', 'univentionNagiosServiceClass'],
    ),
}
property_descriptions = {
    # Identifying property: required and not changeable after creation.
    'name': univention.admin.property(
        short_description=_('Name'),
        long_description=_('Service name'),
        syntax=univention.admin.syntax.string_numbers_letters_dots,
        include_in_default_search=True,
        required=True,
        may_change=False,
        identifies=True,
    ),
    # Free-text description, included in the default search.
    'description': univention.admin.property(
        short_description=_('Description'),
        long_description=_('Service description'),
        syntax=univention.admin.syntax.string,
        include_in_default_search=True,
    ),
    # Name of the Nagios plugin command; required.
    'checkCommand': univention.admin.property(
        short_description=_('Plugin command'),
        long_description=_('Command name of Nagios plugin'),
        syntax=univention.admin.syntax.string,
        required=True,
    ),
    # Optional arguments for the plugin command.
    'checkArgs': univention.admin.property(
        short_description=_('Plugin command arguments'),
        long_description=_('Arguments of used Nagios plugin'),
        syntax=univention.admin.syntax.string,
    ),
    # Boolean flag: use NRPE to check remote services.
    'useNRPE': univention.admin.property(
        short_description=_('Use NRPE'),
        long_description=_('Use NRPE to check remote services'),
        syntax=univention.admin.syntax.boolean,
    ),
    # Multi-valued list of hosts this service is checked on.
    'assignedHosts': univention.admin.property(
        short_description=_('Assigned hosts'),
        long_description=_('Check services on these hosts'),
        syntax=univention.admin.syntax.nagiosHostsEnabledDn,
        multivalue=True,
    ),
}
# Two tabs: the general service settings, and the host assignment.
layout = [
    Tab(_('General'), _('Basic settings'), layout=[
        Group(_('General Nagios service settings'), layout=[
            ['name', 'description'],
            ['checkCommand', 'checkArgs'],
            'useNRPE',
        ]),
    ]),
    Tab(_('Hosts'), _('Assigned hosts'), layout=[
        Group(_('Assigned hosts'), layout=[
            'assignedHosts',
        ]),
    ]),
]
# Register the property <-> LDAP attribute pairs.  Every pair is registered
# the same way: ``None`` as third argument and ``ListToString`` as fourth.
# 'assignedHosts' is deliberately absent here; the handler class converts it
# to and from 'univentionNagiosHostname' itself.
mapping = univention.admin.mapping.mapping()
for _property_name, _ldap_attribute in (
    ('name', 'cn'),
    ('description', 'description'),
    ('checkCommand', 'univentionNagiosCheckCommand'),
    ('checkArgs', 'univentionNagiosCheckArgs'),
    ('useNRPE', 'univentionNagiosUseNRPE'),
):
    mapping.register(_property_name, _ldap_attribute, None, univention.admin.mapping.ListToString)
# Do not leave the loop helpers behind as module attributes.
del _property_name, _ldap_attribute
# fmt: on
class object(univention.admin.handlers.simpleLdap):
    """UDM handler for ``nagios/service`` objects.

    The LDAP attribute ``univentionNagiosHostname`` holds host FQDNs, while
    the ``assignedHosts`` property holds host DNs.  :meth:`open` converts the
    stored FQDNs into DNs and :meth:`_ldap_modlist` converts DNs back into
    FQDNs when the property has changed.
    """

    module = module

    def open(self) -> None:
        """Open the object and fill ``assignedHosts`` from the stored FQDNs.

        Each value of ``univentionNagiosHostname`` is split at the first dot
        into a relative domain name and a zone name.  The matching
        ``dNSZone`` entry supplies the A records, which are then used (all of
        them, together with the ``cn``) to find the ``univentionHost`` entry.
        FQDNs that cannot be resolved this way are skipped.
        """
        super().open()

        # "<relative name>.<zone name>", split at the first dot.
        fqdn_pattern = re.compile(r'^([^.]+)\.(.+?)$')

        # convert host FQDN to host DN
        hostlist: list[str] = []
        hosts = self.oldattr.get('univentionNagiosHostname', [])
        for host in [x.decode('UTF-8') for x in hosts]:
            # Match only once; empty or dot-less values never match.
            match = fqdn_pattern.match(host)
            if match is None:
                continue
            rel_domain_name, zone_name = match.groups()

            # find correct dNSZone entry
            zones = self.lo.authz_connection.search(filter=filter_format(
                '(&(objectClass=dNSZone)(zoneName=%s)(relativeDomainName=%s)(aRecord=*))',
                (zone_name, rel_domain_name),
            ))
            if not zones:
                self.log.debug('open: could not find dNSZone', host=host)
                continue

            # Build "(&(objectClass=univentionHost)(aRecord=..)...(cn=..))"
            # from the A records of the first matching zone entry.
            a_record_clauses = ''.join(
                filter_format('(aRecord=%s)', [a_record.decode('ASCII')])
                for a_record in zones[0][1]['aRecord']
            )
            host_filter = ''.join((
                '(&(objectClass=univentionHost)',
                a_record_clauses,
                filter_format('(cn=%s))', [rel_domain_name]),
            ))

            # find dn of host that is related to given aRecords
            found = self.lo.authz_connection.search(filter=host_filter)
            if found:
                hostlist.append(found[0][0])

        self['assignedHosts'] = hostlist
        self.save()

    def _ldap_modlist(self) -> list[tuple[str, Any, Any]]:
        """Return the modlist, adding the host FQDNs when ``assignedHosts`` changed.

        For every assigned host DN the ``cn`` and ``associatedDomain`` are
        read and joined as ``<cn>.<domain>``.  When a host has no
        ``associatedDomain`` the UCR variable ``domainname`` is used.

        :raises univention.admin.uexceptions.valueError: if an assigned host
            does not exist or has no ``cn`` attribute.
        """
        ml = super()._ldap_modlist()

        if not self.hasChanged('assignedHosts'):
            return ml

        # save assigned hosts
        hostlist: list[bytes] = []
        for hostdn in self.info.get('assignedHosts', []):
            try:
                host = self.lo.authz_connection.get(hostdn, ['associatedDomain', 'cn'], required=True)
                cn: bytes = host['cn'][0]
            except (univention.admin.uexceptions.noObject, ldap.NO_SUCH_OBJECT) as exc:
                raise univention.admin.uexceptions.valueError(_('The host "%s" does not exists.') % (hostdn,), property='assignedHosts') from exc
            except KeyError as exc:
                raise univention.admin.uexceptions.valueError(_('The host "%s" is invalid, it has no "cn" attribute.') % (hostdn,), property='assignedHosts') from exc

            # Only consult UCR when the host itself has no domain, so an
            # unset 'domainname' cannot break hosts that do not need it.
            domains = host.get('associatedDomain')
            if domains:
                domain: bytes = domains[0]
            else:
                domain = configRegistry.get('domainname').encode('ASCII')
            hostlist.append(b'%s.%s' % (cn, domain))

        # The host names go first in the modlist.
        ml.insert(0, ('univentionNagiosHostname', self.oldattr.get('univentionNagiosHostname', []), hostlist))
        return ml
# Re-export the handler's lookup helpers as module-level names.
lookup, lookup_filter, identify = (
    object.lookup,
    object.lookup_filter,
    object.identify,
)