Source code for univention.admin.nagios

# SPDX-FileCopyrightText: 2004-2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only

"""|UDM| methods and defines for Nagios related attributes."""

import copy

from ldap.filter import filter_format

import univention.admin
import univention.admin.localization
import univention.admin.syntax
from univention.admin import configRegistry
from univention.admin.layout import Tab


# Translation object for the 'univention.admin' domain.  ``_`` is bound to its
# ``translate`` method and wraps the user-visible strings defined below.
translation = univention.admin.localization.translation('univention.admin')
_ = translation.translate


# Property descriptions contributed by the Nagios support.  They are copied
# into a module's property table by addPropertiesMappingOptionsAndLayout().
nagios_properties = {
    # Multi-valued property using the ``nagiosServiceDn`` syntax; it is
    # associated with the 'nagios' option declared in ``nagios_options``.
    'nagiosServices': univention.admin.property(
        short_description=_('Assigned Nagios services'),
        long_description='',
        syntax=univention.admin.syntax.nagiosServiceDn,
        multivalue=True,
        options=['nagios'],
    ),
}


# Tab (flagged ``advanced=True``) whose layout consists of the single
# "nagiosServices" property.  It is appended to a module's layout by
# addPropertiesMappingOptionsAndLayout().
nagios_tab_A = Tab(_('Nagios services'), _('Nagios Service Settings'), advanced=True, layout=[
    "nagiosServices",
])  # fmt: skip


# Option definitions contributed by the Nagios support.  They are copied into
# a module's option table by addPropertiesMappingOptionsAndLayout().
nagios_options = {
    # Declared with ``default=0`` and ``editable=True``; lists
    # 'univentionNagiosHostClass' as its object class.
    'nagios': univention.admin.option(
        short_description=_('Nagios support'),
        default=0,
        editable=True,
        objectClasses=['univentionNagiosHostClass'],
    ),
}


def addPropertiesMappingOptionsAndLayout(new_property, new_mapping, new_options, new_layout):
    """
    Register the Nagios property, tab and option on a UDM module.

    All containers are modified in place; nothing is returned.

    :param new_property: property table that receives every entry of ``nagios_properties``.
    :param new_mapping: accepted as part of the signature but not touched here.
    :param new_options: option table that receives every entry of ``nagios_options``.
    :param new_layout: list of tabs to which ``nagios_tab_A`` is appended.
    """
    # FIXME: property_descriptions is not changed atomically during module initialization
    new_property.update(nagios_properties)

    # append tab with Nagios options
    new_layout.append(nagios_tab_A)

    new_options.update(nagios_options)
class Support:
    """
    Mixin base class indicating Nagios support for computers/* UDM modules.

    The mixin is not usable on its own.  Its methods rely on attributes and
    methods supplied by the class it is mixed into: ``lo``, ``position``,
    ``info``, ``oldinfo``, ``oldattr``, ``options``, ``old_options``, ``log``,
    ``hasChanged()``, ``has_property()``, ``option_toggled()`` and item access
    (``self[...]``).
    """

    def __init__(self):
        # Set by nagios_ldap_modlist() when the 'nagios' option is switched off
        # and evaluated by nagios_ldap_post_modify().
        self.nagiosRemoveFromServices = False
        # Reset by nagiosRemoveHostFromParent(); initialised here so that the
        # attribute exists on every instance.
        self.nagiosRemoveFromParent = False

    @staticmethod
    def __is_blank(values):
        """Return True if *values* is empty/None or holds exactly one empty string."""
        return not values or (len(values) == 1 and values[0] == '')

    def __getFQDN(self):
        """
        Build the host's FQDN from ``self.oldattr``.

        The host name is the first ``cn`` value, the domain the first
        ``associatedDomain`` value or, if that is empty, the ``domainname``
        entry of the configuration registry.

        :returns: ``"<hostname>.<domain>"`` or ``None`` if either part is missing.
        """
        hostname = self.oldattr.get('cn', [b''])[0].decode('UTF-8')
        domain = self.oldattr.get('associatedDomain', [b''])[0].decode('UTF-8')
        if not domain:
            domain = configRegistry.get('domainname', None)
        if domain and hostname:
            return hostname + '.' + domain
        return None

    def __change_fqdn(self, oldfqdn, newfqdn):
        """
        Replace *oldfqdn* by *newfqdn* in every previously assigned service.

        Only the services listed in ``self.oldinfo['nagiosServices']`` are
        visited; a missing key is treated as "no services".  The membership
        test against ``univentionNagiosHostname`` is an exact byte comparison.

        :param oldfqdn: previous FQDN (str).
        :param newfqdn: new FQDN (str).
        """
        oldfqdn = oldfqdn.encode('utf-8')
        newfqdn = newfqdn.encode('utf-8')
        for servicedn in self.oldinfo.get('nagiosServices', []):
            oldmembers = self.lo.authz_connection.getAttr(servicedn, 'univentionNagiosHostname')
            if oldfqdn in oldmembers:
                newmembers = copy.deepcopy(oldmembers)
                newmembers.remove(oldfqdn)
                newmembers.append(newfqdn)
                self.lo.authz_connection.modify(
                    servicedn,
                    [('univentionNagiosHostname', oldmembers, newmembers)],
                )  # TODO: why not simply ('univentionNagiosHostname', oldfqdn, newfqdn) ?

    def __remove_fqdn_from(self, filter_template, attribute):
        """
        Strip this host's FQDN from *attribute* of all objects matching *filter_template*.

        :param filter_template: LDAP filter with one ``%s`` placeholder for the FQDN.
        :param attribute: name of the attribute to clean up.

        Values are compared case-insensitively.  Nothing happens if no FQDN
        can be determined.
        """
        fqdn = self.__getFQDN()
        if not fqdn:
            return
        needle = fqdn.lower()
        searchResult = self.lo.search(
            filter=filter_format(filter_template, [fqdn]),
            base=self.position.getDomain(),
            attr=[attribute],
        )
        for dn, attrs in searchResult:
            oldvalues = attrs[attribute]
            newvalues = [x for x in oldvalues if x.decode('UTF-8').lower() != needle]
            self.lo.authz_connection.modify(dn, [(attribute, oldvalues, newvalues)])

    def nagiosGetAssignedServices(self):
        """
        Search for the Nagios services that list this host.

        :returns: DNs of all ``univentionNagiosServiceClass`` objects below the
            domain base whose ``univentionNagiosHostname`` matches the host's
            FQDN; an empty list if no FQDN can be determined.
        """
        fqdn = self.__getFQDN()
        if fqdn:
            return self.lo.searchDn(
                filter=filter_format('(&(objectClass=univentionNagiosServiceClass)(univentionNagiosHostname=%s))', [fqdn]),
                base=self.position.getDomain(),
            )
        return []

    def nagios_open(self):
        """Fill the ``nagiosServices`` property if the 'nagios' option is set."""
        if 'nagios' in self.options:
            self['nagiosServices'] = self.nagiosGetAssignedServices()

    def nagios_ldap_modlist(self, ml):
        """
        Validate the Nagios prerequisites and extend the modlist *ml* in place.

        :param ml: list of ``(attribute, old, new)`` tuples; new entries are
            inserted at the front.
        :raises univention.admin.uexceptions.nagiosARecordRequired: if the
            'nagios' option is set but no ``ip`` is given.
        :raises univention.admin.uexceptions.nagiosDNSForwardZoneEntryRequired:
            if the 'nagios' option is set and neither ``domain`` nor
            ``dnsEntryZoneForward`` is given.
        """
        nagios_enabled = 'nagios' in self.options

        if nagios_enabled:
            if self.__is_blank(self.info.get('ip')):
                raise univention.admin.uexceptions.nagiosARecordRequired()
            if not self.info.get('domain', None) and self.__is_blank(self.info.get('dnsEntryZoneForward')):
                raise univention.admin.uexceptions.nagiosDNSForwardZoneEntryRequired()

        if not self.option_toggled('nagios'):
            return

        if nagios_enabled:
            # add nagios option
            self.log.debug('added nagios option')
            if b'univentionNagiosHostClass' not in self.oldattr.get('objectClass', []):
                ml.insert(0, ('univentionNagiosEnabled', b'', b'1'))
        else:
            # remove nagios option
            self.log.debug('remove nagios option')
            for key in ['univentionNagiosParent', 'univentionNagiosEmail', 'univentionNagiosEnabled']:
                oldvalue = self.oldattr.get(key, [])
                if oldvalue:
                    ml.insert(0, (key, oldvalue, b''))

            # trigger deletion from services
            self.nagiosRemoveFromServices = True

    def nagios_ldap_pre_modify(self):
        """Hook called before a modification; intentionally does nothing."""

    def nagios_ldap_pre_create(self):
        """Hook called before a creation; intentionally does nothing."""

    def nagiosModifyServiceList(self):
        """
        Synchronise the ``univentionNagiosHostname`` lists of the service objects.

        Three steps are performed in this order:

        1. If the 'nagios' option was already set and ``name`` and/or
           ``domain`` changed, the old FQDN is replaced by the new one in all
           previously assigned services.
        2. The host is removed from services that are no longer assigned.
        3. If the 'nagios' option is set, the host is added to services that
           are newly assigned (or to all assigned services if the option has
           just been enabled).
        """
        had_nagios = 'nagios' in self.old_options

        if had_nagios:
            name_changed = self.hasChanged('name')
            domain_changed = self.hasChanged('domain')
            if name_changed or domain_changed:
                # With an unchanged domain the current value is also the old one.
                old_domain = self.oldinfo['domain'] if domain_changed else self['domain']
                oldfqdn = '%s.%s' % (self.oldinfo['name'], old_domain)
                newfqdn = '%s.%s' % (self['name'], self['domain'])
                self.__change_fqdn(oldfqdn, newfqdn)

        # The object's own domain wins; the configured domain name is the fallback.
        if self.has_property('domain') and self['domain']:
            fqdn = '%s.%s' % (self['name'], self['domain'])
        else:
            fqdn = '%s.%s' % (self['name'], configRegistry.get('domainname'))

        # A missing key means "no services" on either side.
        old_services = self.oldinfo.get('nagiosServices', [])
        new_services = self.info.get('nagiosServices', [])

        # remove host from services
        if had_nagios:
            for servicedn in old_services:
                if servicedn in new_services:
                    continue
                oldmembers = self.lo.authz_connection.getAttr(servicedn, 'univentionNagiosHostname')
                newmembers = [x for x in oldmembers if x.decode('UTF-8').lower() != fqdn.lower()]
                self.lo.authz_connection.modify(servicedn, [('univentionNagiosHostname', oldmembers, newmembers)])

        if 'nagios' in self.options:
            # add host to new services
            self.log.debug('NMSL: nagios in options')
            for servicedn in new_services:
                if not servicedn:
                    continue
                self.log.trace('NMSL: servicedn', service=servicedn)
                if not had_nagios or servicedn not in old_services:
                    self.log.trace('NMSL: add')
                    # option nagios was freshly enabled or service has been enabled just now
                    oldmembers = self.lo.authz_connection.getAttr(servicedn, 'univentionNagiosHostname')
                    newmembers = copy.deepcopy(oldmembers)
                    newmembers.append(fqdn.encode('UTF-8'))
                    self.log.trace('NMSL: members', old=oldmembers, new=newmembers)
                    self.lo.authz_connection.modify(servicedn, [('univentionNagiosHostname', oldmembers, newmembers)])

    def nagiosRemoveHostFromServices(self):
        """Remove the host's FQDN from ``univentionNagiosHostname`` of all services listing it."""
        self.nagiosRemoveFromServices = False
        self.__remove_fqdn_from(
            '(&(objectClass=univentionNagiosServiceClass)(univentionNagiosHostname=%s))',
            'univentionNagiosHostname',
        )

    def nagiosRemoveHostFromParent(self):
        """Remove the host's FQDN from ``univentionNagiosParent`` of all hosts listing it."""
        self.nagiosRemoveFromParent = False
        self.__remove_fqdn_from(
            '(&(objectClass=univentionNagiosHostClass)(univentionNagiosParent=%s))',
            'univentionNagiosParent',
        )

    def nagios_ldap_post_modify(self):
        """Clean up after a disabled 'nagios' option, otherwise update the service lists."""
        if self.nagiosRemoveFromServices:
            # nagios support has been disabled
            self.nagiosRemoveHostFromServices()
            self.nagiosRemoveHostFromParent()
        elif 'nagios' in self.options:
            # modify service objects if needed
            self.nagiosModifyServiceList()

    def nagios_ldap_post_create(self):
        """Add the new host to its assigned services if the 'nagios' option is set."""
        if 'nagios' in self.options:
            self.nagiosModifyServiceList()

    def nagios_ldap_post_remove(self):
        """Drop all service and parent references to the host if the 'nagios' option is set."""
        if 'nagios' not in self.options:
            return
        self.nagiosRemoveHostFromServices()
        self.nagiosRemoveHostFromParent()

    def nagios_cleanup(self):
        """Drop all service and parent references to the host if the 'nagios' option is set."""
        if 'nagios' not in self.options:
            return
        self.nagiosRemoveHostFromServices()
        self.nagiosRemoveHostFromParent()