# -*- coding: utf-8 -*-
#
# Copyright 2004-2022 Univention GmbH
#
# https://www.univention.de/
#
# All rights reserved.
#
# The source code of this program is made available
# under the terms of the GNU Affero General Public License version 3
# (GNU AGPL V3) as published by the Free Software Foundation.
#
# Binary versions of this program provided by Univention to you as
# well as other copyrighted, protected or trademarked materials like
# Logos, graphics, fonts, specific documentations and configurations,
# cryptographic keys etc. are subject to a license agreement between
# you and Univention and not subject to the GNU AGPL V3.
#
# In the case you use this program under the terms of the GNU AGPL V3,
# the program is provided in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public
# License with the Debian GNU/Linux or Univention distribution in file
# /usr/share/common-licenses/AGPL-3; if not, see
# <https://www.gnu.org/licenses/>.
"""
|UDM| methods and defines for Nagios related attributes.
"""
import re
import copy
from ldap.filter import filter_format
from univention.admin.layout import Tab
import univention.admin
import univention.admin.localization
import univention.admin.syntax
from univention.admin import configRegistry
import univention.debug as ud
translation = univention.admin.localization.translation('univention.admin')
_ = translation.translate
nagios_properties = {
'nagiosContactEmail': univention.admin.property(
short_description=_('Email address of Nagios contacts'),
long_description='',
syntax=univention.admin.syntax.emailAddress,
multivalue=True,
options=['nagios'],
),
'nagiosParents': univention.admin.property(
short_description=_('Parent hosts'),
long_description='',
syntax=univention.admin.syntax.nagiosHostsEnabledDn,
multivalue=True,
options=['nagios'],
),
'nagiosServices': univention.admin.property(
short_description=_('Assigned Nagios services'),
long_description='',
syntax=univention.admin.syntax.nagiosServiceDn,
multivalue=True,
options=['nagios'],
)
}
nagios_tab_A = Tab(_('Nagios services'), _('Nagios Service Settings'), advanced=True, layout=[
"nagiosServices",
])
nagios_tab_B = Tab(_('Nagios notification'), _('Nagios Notification Settings'), advanced=True, layout=[
"nagiosContactEmail",
"nagiosParents",
])
nagios_mapping = [
['nagiosContactEmail', 'univentionNagiosEmail', None, None],
]
nagios_options = {
'nagios': univention.admin.option(
short_description=_('Nagios support'),
default=0,
editable=True,
objectClasses=['univentionNagiosHostClass'],
)
}
[docs]def addPropertiesMappingOptionsAndLayout(new_property, new_mapping, new_options, new_layout):
"""
Add Nagios properties.
"""
for key, value in nagios_properties.items():
new_property[key] = value
# append tab with Nagios options
new_layout.append(nagios_tab_A)
new_layout.append(nagios_tab_B)
# append nagios attribute mapping
for (ucskey, ldapkey, mapto, mapfrom) in nagios_mapping:
new_mapping.register(ucskey, ldapkey, mapto, mapfrom)
for key, value in nagios_options.items():
new_options[key] = value
[docs]class Support(object):
def __init__(self):
self.nagiosRemoveFromServices = False
def __getFQDN(self):
hostname = self.oldattr.get("cn", [b''])[0].decode('UTF-8')
domain = self.oldattr.get("associatedDomain", [b''])[0].decode('UTF-8')
if not domain:
domain = configRegistry.get("domainname", None)
if domain and hostname:
return hostname + "." + domain
return None
[docs] def nagiosGetAssignedServices(self):
fqdn = self.__getFQDN()
if fqdn:
return self.lo.searchDn(filter=filter_format('(&(objectClass=univentionNagiosServiceClass)(univentionNagiosHostname=%s))', [fqdn]), base=self.position.getDomain())
return []
[docs] def nagiosGetParentHosts(self):
# univentionNagiosParent
_re = re.compile(r'^([^.]+)\.(.+?)$')
parentlist = []
parents = self.oldattr.get('univentionNagiosParent', [])
for parent in [x.decode('UTF-8') for x in parents]:
if parent and _re.match(parent) is not None:
(relDomainName, zoneName) = _re.match(parent).groups()
res = self.lo.search(filter_format('(&(objectClass=dNSZone)(zoneName=%s)(relativeDomainName=%s)(aRecord=*))', (zoneName, relDomainName)))
if not res:
ud.debug(ud.ADMIN, ud.INFO, 'nagios.py: NGPH: couldn''t find dNSZone of %s' % parent)
else:
# found dNSZone
filter = '(&(objectClass=univentionHost)'
for aRecord in res[0][1]['aRecord']:
filter += filter_format('(aRecord=%s)', [aRecord.decode('ASCII')])
filter += filter_format('(cn=%s))', [relDomainName])
res = self.lo.search(filter)
if res:
parentlist.append(res[0][0])
return parentlist
[docs] def nagios_open(self):
if 'nagios' in self.options:
self['nagiosServices'] = self.nagiosGetAssignedServices()
self['nagiosParents'] = self.nagiosGetParentHosts()
[docs] def nagiosSaveParentHostList(self, ml):
if self.hasChanged('nagiosParents'):
parentlist = []
for parentdn in self.info.get('nagiosParents', []):
domain = self.lo.getAttr(parentdn, 'associatedDomain')
cn = self.lo.getAttr(parentdn, 'cn')
if not domain:
domain = [configRegistry["domainname"].encode('UTF-8')]
if cn and domain:
parentlist.append(b'.'.join((cn[0], domain[0])))
ml.insert(0, ('univentionNagiosParent', self.oldattr.get('univentionNagiosParent', []), parentlist))
[docs] def nagios_ldap_modlist(self, ml):
if 'nagios' in self.options:
if ('ip' not in self.info) or (not self.info['ip']) or (len(self.info['ip']) == 1 and self.info['ip'][0] == ''):
raise univention.admin.uexceptions.nagiosARecordRequired()
if not self.info.get('domain', None):
if ('dnsEntryZoneForward' not in self.info) or (not self.info['dnsEntryZoneForward']) or (len(self.info['dnsEntryZoneForward']) == 1 and self.info['dnsEntryZoneForward'][0] == ''):
raise univention.admin.uexceptions.nagiosDNSForwardZoneEntryRequired()
# add nagios option
if self.option_toggled('nagios') and 'nagios' in self.options:
ud.debug(ud.ADMIN, ud.INFO, 'added nagios option')
if b'univentionNagiosHostClass' not in self.oldattr.get('objectClass', []):
ml.insert(0, ('univentionNagiosEnabled', b'', b'1'))
# remove nagios option
if self.option_toggled('nagios') and 'nagios' not in self.options:
ud.debug(ud.ADMIN, ud.INFO, 'remove nagios option')
for key in ['univentionNagiosParent', 'univentionNagiosEmail', 'univentionNagiosEnabled']:
if self.oldattr.get(key, []):
ml.insert(0, (key, self.oldattr.get(key, []), b''))
# trigger deletion from services
self.nagiosRemoveFromServices = True
if 'nagios' in self.options:
self.nagiosSaveParentHostList(ml)
[docs] def nagios_ldap_pre_modify(self):
pass
[docs] def nagios_ldap_pre_create(self):
pass
def __change_fqdn(self, oldfqdn, newfqdn):
oldfqdn = oldfqdn.encode('utf-8')
newfqdn = newfqdn.encode('utf-8')
for servicedn in self.oldinfo['nagiosServices']:
oldmembers = self.lo.getAttr(servicedn, 'univentionNagiosHostname')
if oldfqdn in oldmembers:
newmembers = copy.deepcopy(oldmembers)
newmembers.remove(oldfqdn)
newmembers.append(newfqdn)
self.lo.modify(servicedn, [('univentionNagiosHostname', oldmembers, newmembers)]) # TODO: why not simply ('univentionNagiosHostname', oldfqdn, newfqdn) ?
[docs] def nagiosModifyServiceList(self):
fqdn = ''
if 'nagios' in self.old_options:
if self.hasChanged('name') and self.hasChanged('domain'):
oldfqdn = u'%s.%s' % (self.oldinfo['name'], self.oldinfo['domain'])
newfqdn = u'%s.%s' % (self['name'], self['domain'])
self.__change_fqdn(oldfqdn, newfqdn)
elif self.hasChanged('name'):
oldfqdn = u'%s.%s' % (self.oldinfo['name'], self['domain'])
newfqdn = u'%s.%s' % (self['name'], self['domain'])
self.__change_fqdn(oldfqdn, newfqdn)
elif self.hasChanged('domain'):
oldfqdn = u'%s.%s' % (self.oldinfo['name'], self.oldinfo['domain'])
newfqdn = u'%s.%s' % (self['name'], self['domain'])
self.__change_fqdn(oldfqdn, newfqdn)
fqdn = '%s.%s' % (self['name'], configRegistry.get("domainname"))
if self.has_property('domain') and self['domain']:
fqdn = '%s.%s' % (self['name'], self['domain'])
# remove host from services
if 'nagios' in self.old_options:
for servicedn in self.oldinfo.get('nagiosServices', []):
if servicedn not in self.info.get('nagiosServices', []):
oldmembers = self.lo.getAttr(servicedn, 'univentionNagiosHostname')
newmembers = [x for x in oldmembers if x.decode('UTF-8').lower() != fqdn.lower()]
self.lo.modify(servicedn, [('univentionNagiosHostname', oldmembers, newmembers)])
if 'nagios' in self.options:
# add host to new services
ud.debug(ud.ADMIN, ud.INFO, 'nagios.py: NMSL: nagios in options')
for servicedn in self.info.get('nagiosServices', []):
if not servicedn:
continue
ud.debug(ud.ADMIN, ud.INFO, 'nagios.py: NMSL: servicedn %s' % servicedn)
if 'nagios' not in self.old_options or servicedn not in self.oldinfo['nagiosServices']:
ud.debug(ud.ADMIN, ud.INFO, 'nagios.py: NMSL: add')
# option nagios was freshly enabled or service has been enabled just now
oldmembers = self.lo.getAttr(servicedn, 'univentionNagiosHostname')
newmembers = copy.deepcopy(oldmembers)
newmembers.append(fqdn.encode('UTF-8'))
ud.debug(ud.ADMIN, ud.WARN, 'nagios.py: NMSL: oldmembers: %s' % oldmembers)
ud.debug(ud.ADMIN, ud.WARN, 'nagios.py: NMSL: newmembers: %s' % newmembers)
self.lo.modify(servicedn, [('univentionNagiosHostname', oldmembers, newmembers)])
[docs] def nagiosRemoveHostFromServices(self):
self.nagiosRemoveFromServices = False
fqdn = self.__getFQDN()
if fqdn:
searchResult = self.lo.search(
filter=filter_format('(&(objectClass=univentionNagiosServiceClass)(univentionNagiosHostname=%s))', [fqdn]),
base=self.position.getDomain(), attr=['univentionNagiosHostname'])
for (dn, attrs) in searchResult:
newattrs = [x for x in attrs['univentionNagiosHostname'] if x.decode('UTF-8').lower() != fqdn.lower()]
self.lo.modify(dn, [('univentionNagiosHostname', attrs['univentionNagiosHostname'], newattrs)])
[docs] def nagiosRemoveHostFromParent(self):
self.nagiosRemoveFromParent = False
fqdn = self.__getFQDN()
if fqdn:
searchResult = self.lo.search(
filter=filter_format('(&(objectClass=univentionNagiosHostClass)(univentionNagiosParent=%s))', [fqdn]),
base=self.position.getDomain(), attr=['univentionNagiosParent'])
for (dn, attrs) in searchResult:
newattrs = [x for x in attrs['univentionNagiosParent'] if x.decode('UTF-8').lower() != fqdn.lower()]
self.lo.modify(dn, [('univentionNagiosParent', attrs['univentionNagiosParent'], newattrs)])
[docs] def nagios_ldap_post_modify(self):
if self.nagiosRemoveFromServices:
# nagios support has been disabled
self.nagiosRemoveHostFromServices()
self.nagiosRemoveHostFromParent()
else:
# modify service objects if needed
if 'nagios' in self.options:
self.nagiosModifyServiceList()
[docs] def nagios_ldap_post_create(self):
if 'nagios' in self.options:
self.nagiosModifyServiceList()
[docs] def nagios_ldap_post_remove(self):
self.nagiosRemoveHostFromServices()
self.nagiosRemoveHostFromParent()
[docs] def nagios_cleanup(self):
self.nagiosRemoveHostFromServices()
self.nagiosRemoveHostFromParent()