#!/usr/bin/python3
# -*- coding: utf-8 -*-
#
# Copyright 2004-2022 Univention GmbH
#
# https://www.univention.de/
#
# All rights reserved.
#
# The source code of this program is made available
# under the terms of the GNU Affero General Public License version 3
# (GNU AGPL V3) as published by the Free Software Foundation.
#
# Binary versions of this program provided by Univention to you as
# well as other copyrighted, protected or trademarked materials like
# Logos, graphics, fonts, specific documentations and configurations,
# cryptographic keys etc. are subject to a license agreement between
# you and Univention and not subject to the GNU AGPL V3.
#
# In the case you use this program under the terms of the GNU AGPL V3,
# the program is provided in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public
# License with the Debian GNU/Linux or Univention distribution in file
# /usr/share/common-licenses/AGPL-3; if not, see
# <https://www.gnu.org/licenses/>.
"""
|UDM| module for nagios services
"""
import re
import ldap
from ldap.filter import filter_format
from univention.admin.layout import Tab, Group
import univention.admin.filter
import univention.admin.handlers
import univention.admin.syntax
import univention.admin.localization
import univention.admin.uexceptions
from univention.admin import configRegistry
import univention.debug as ud
from typing import List # noqa: F401
translation = univention.admin.localization.translation('univention.admin.handlers.nagios')
_ = translation.translate
module = 'nagios/service'
default_containers = ['cn=nagios']
childs = False
short_description = _('Nagios service')
object_name = _('Nagios service')
object_name_plural = _('Nagios services')
long_description = ''
operations = ['search', 'edit', 'add', 'remove']
[docs]class NagiosTimePeriod(univention.admin.syntax.UDM_Attribute):
udm_module = 'nagios/timeperiod'
attribute = 'name'
options = {
'default': univention.admin.option(
short_description=short_description,
default=True,
objectClasses=['top', 'univentionNagiosServiceClass'],
),
}
property_descriptions = {
'name': univention.admin.property(
short_description=_('Name'),
long_description=_('Service name'),
syntax=univention.admin.syntax.string_numbers_letters_dots,
include_in_default_search=True,
required=True,
may_change=False,
identifies=True
),
'description': univention.admin.property(
short_description=_('Description'),
long_description=_('Service description'),
syntax=univention.admin.syntax.string,
include_in_default_search=True,
),
'checkCommand': univention.admin.property(
short_description=_('Plugin command'),
long_description=_('Command name of Nagios plugin'),
syntax=univention.admin.syntax.string,
required=True,
),
'checkArgs': univention.admin.property(
short_description=_('Plugin command arguments'),
long_description=_('Arguments of used Nagios plugin'),
syntax=univention.admin.syntax.string,
),
'useNRPE': univention.admin.property(
short_description=_('Use NRPE'),
long_description=_('Use NRPE to check remote services'),
syntax=univention.admin.syntax.boolean,
),
'checkPeriod': univention.admin.property(
short_description=_('Check period'),
long_description=_('Check services within check period'),
syntax=getattr(univention.admin.syntax, 'NagiosTimePeriod', NagiosTimePeriod),
required=True,
),
'maxCheckAttempts': univention.admin.property(
short_description=_('Maximum number of check attempts'),
long_description=_('Maximum number of check attempts with non-OK-result until contact will be notified'),
syntax=univention.admin.syntax.integer,
required=True,
default='10',
size='One',
),
'normalCheckInterval': univention.admin.property(
short_description=_('Check interval'),
long_description=_('Interval between checks'),
syntax=univention.admin.syntax.integer,
required=True,
default='10',
size='One',
),
'retryCheckInterval': univention.admin.property(
short_description=_('Retry check interval'),
long_description=_('Interval between re-checks if service is in non-OK-state'),
syntax=univention.admin.syntax.integer,
required=True,
default='1',
size='One',
),
'notificationInterval': univention.admin.property(
short_description=_('Notification interval'),
long_description=_('Interval between notifications'),
syntax=univention.admin.syntax.integer,
required=True,
default='180',
size='One',
),
'notificationPeriod': univention.admin.property(
short_description=_('Notification period'),
long_description=_('Send notifications during this period'),
syntax=getattr(univention.admin.syntax, 'NagiosTimePeriod', NagiosTimePeriod),
required=True,
),
'notificationOptionWarning': univention.admin.property(
short_description=_('Notify if service state changes to WARNING'),
long_description='',
syntax=univention.admin.syntax.boolean,
default='1',
),
'notificationOptionCritical': univention.admin.property(
short_description=_('Notify if service state changes to CRITICAL'),
long_description='',
syntax=univention.admin.syntax.boolean,
default='1',
),
'notificationOptionUnreachable': univention.admin.property(
short_description=_('Notify if service state changes to UNREACHABLE'),
long_description='',
syntax=univention.admin.syntax.boolean,
default='1',
),
'notificationOptionRecovered': univention.admin.property(
short_description=_('Notify if service state changes to RECOVERED'),
long_description='',
syntax=univention.admin.syntax.boolean,
default='1',
),
'assignedHosts': univention.admin.property(
short_description=_('Assigned hosts'),
long_description=_('Check services on these hosts'),
syntax=univention.admin.syntax.nagiosHostsEnabledDn,
multivalue=True,
)
}
layout = [
Tab(_('General'), _('Basic settings'), layout=[
Group(_('General Nagios service settings'), layout=[
["name", "description"],
["checkCommand", "checkArgs"],
"useNRPE"
]),
]),
Tab(_('Interval'), _('Check settings'), advanced=True, layout=[
["normalCheckInterval", "retryCheckInterval"],
["maxCheckAttempts", "checkPeriod"]
]),
Tab(_('Notification'), _('Notification settings'), advanced=True, layout=[
["notificationInterval", "notificationPeriod"],
"notificationOptionWarning",
"notificationOptionCritical",
"notificationOptionUnreachable",
"notificationOptionRecovered"
]),
Tab(_('Hosts'), _('Assigned hosts'), layout=[
Group(_('Assigned hosts'), layout=[
"assignedHosts"
]),
]),
]
mapping = univention.admin.mapping.mapping()
mapping.register('name', 'cn', None, univention.admin.mapping.ListToString)
mapping.register('description', 'description', None, univention.admin.mapping.ListToString)
mapping.register('checkCommand', 'univentionNagiosCheckCommand', None, univention.admin.mapping.ListToString)
mapping.register('checkArgs', 'univentionNagiosCheckArgs', None, univention.admin.mapping.ListToString)
mapping.register('useNRPE', 'univentionNagiosUseNRPE', None, univention.admin.mapping.ListToString)
mapping.register('normalCheckInterval', 'univentionNagiosNormalCheckInterval', None, univention.admin.mapping.ListToString)
mapping.register('retryCheckInterval', 'univentionNagiosRetryCheckInterval', None, univention.admin.mapping.ListToString)
mapping.register('maxCheckAttempts', 'univentionNagiosMaxCheckAttempts', None, univention.admin.mapping.ListToString)
mapping.register('checkPeriod', 'univentionNagiosCheckPeriod', None, univention.admin.mapping.ListToString)
mapping.register('notificationInterval', 'univentionNagiosNotificationInterval', None, univention.admin.mapping.ListToString)
mapping.register('notificationPeriod', 'univentionNagiosNotificationPeriod', None, univention.admin.mapping.ListToString)
[docs]class object(univention.admin.handlers.simpleLdap):
module = module
OPTION_BITS = {
'notificationOptionWarning': b'w',
'notificationOptionCritical': b'c',
'notificationOptionUnreachable': b'u',
'notificationOptionRecovered': b'r',
}
def _post_unmap(self, info, values):
value = values.get('univentionNagiosNotificationOptions', [b''])[0]
if value:
options = value.split(b',') # type: List[bytes]
for key, value in self.OPTION_BITS.items():
info[key] = '1' if value in options else '0'
return info
[docs] def open(self):
_re = re.compile(r'^([^.]+)\.(.+?)$')
# convert host FQDN to host DN
hostlist = []
hosts = self.oldattr.get('univentionNagiosHostname', [])
for host in [x.decode('UTF-8') for x in hosts]:
# split into relDomainName and zoneName
if host and _re.match(host) is not None:
(relDomainName, zoneName) = _re.match(host).groups()
# find correct dNSZone entry
res = self.lo.search(filter=filter_format('(&(objectClass=dNSZone)(zoneName=%s)(relativeDomainName=%s)(aRecord=*))', (zoneName, relDomainName)))
if not res:
ud.debug(ud.ADMIN, ud.INFO, 'service.py: open: could not find dNSZone of %s' % (host,))
else:
# found dNSZone
filter = '(&(objectClass=univentionHost)'
for aRecord in [x.decode('ASCII') for x in res[0][1]['aRecord']]:
filter += filter_format('(aRecord=%s)', [aRecord])
filter += filter_format('(cn=%s))', [relDomainName])
# find dn of host that is related to given aRecords
res = self.lo.search(filter=filter)
if res:
hostlist.append(res[0][0]) # type: List[str]
self['assignedHosts'] = hostlist
self.save()
def _ldap_modlist(self):
ml = univention.admin.handlers.simpleLdap._ldap_modlist(self)
options = []
for key, value in self.OPTION_BITS.items():
if self[key] == '1':
options.append(value) # type: List[bytes]
# univentionNagiosNotificationOptions is required in LDAP schema
if not options:
options.append(b'n')
newoptions = b','.join(options)
ml.append(('univentionNagiosNotificationOptions', self.oldattr.get('univentionNagiosNotificationOptions', []), newoptions))
# save assigned hosts
if self.hasChanged('assignedHosts'):
hostlist = []
for hostdn in self.info.get('assignedHosts', []):
try:
host = self.lo.get(hostdn, ['associatedDomain', 'cn'], required=True)
cn = host['cn'][0] # type: bytes
except (univention.admin.uexceptions.noObject, ldap.NO_SUCH_OBJECT):
raise univention.admin.uexceptions.valueError(_('The host "%s" does not exists.') % (hostdn,), property='assignedHosts')
except KeyError:
raise univention.admin.uexceptions.valueError(_('The host "%s" is invalid, it has no "cn" attribute.') % (hostdn,), property='assignedHosts')
domain = host.get('associatedDomain', [configRegistry.get("domainname").encode('ASCII')])[0] # type: bytes
hostlist.append(b"%s.%s" % (cn, domain))
ml.insert(0, ('univentionNagiosHostname', self.oldattr.get('univentionNagiosHostname', []), hostlist))
return ml
lookup = object.lookup
lookup_filter = object.lookup_filter
identify = object.identify