Source code for univention.admin.handlers.dns.ptr_record

# -*- coding: utf-8 -*-
#
# Copyright 2004-2022 Univention GmbH
#
# https://www.univention.de/
#
# All rights reserved.
#
# The source code of this program is made available
# under the terms of the GNU Affero General Public License version 3
# (GNU AGPL V3) as published by the Free Software Foundation.
#
# Binary versions of this program provided by Univention to you as
# well as other copyrighted, protected or trademarked materials like
# Logos, graphics, fonts, specific documentations and configurations,
# cryptographic keys etc. are subject to a license agreement between
# you and Univention and not subject to the GNU AGPL V3.
#
# In the case you use this program under the terms of the GNU AGPL V3,
# the program is provided in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public
# License with the Debian GNU/Linux or Univention distribution in file
# /usr/share/common-licenses/AGPL-3; if not, see
# <https://www.gnu.org/licenses/>.

"""
|UDM| module for |DNS| reverse pointer records (PTR)
"""

from univention.admin.layout import Tab, Group
import univention.admin
import univention.admin.handlers
import univention.admin.localization
from univention.admin.filter import (expression, conjunction)
from univention.admin.handlers.dns import ARPA_IP4, ARPA_IP6

import univention.debug as ud
import ipaddress

translation = univention.admin.localization.translation('univention.admin.handlers.dns')
_ = translation.translate


module = 'dns/ptr_record'
operations = ['add', 'edit', 'remove', 'search']
columns = ['ptr_record']
superordinate = 'dns/reverse_zone'
childs = False
short_description = _('DNS: Pointer record')
object_name = _('Pointer record')
object_name_plural = _('Pointer records')
long_description = _('Map IP addresses back to hostnames.')
options = {
	'default': univention.admin.option(
		short_description=short_description,
		default=True,
		objectClasses=['top', 'dNSZone'],
	),
}
property_descriptions = {
	'address': univention.admin.property(
		short_description=_('Reverse address'),
		long_description=_('The host part of the IP address in reverse notation (e.g. \"172.16.1.2/16\" -> \"2.1\" or \"2001:0db8:0100::0007:0008/96\" -> \"8.0.0.0.7.0.0.0\").'),
		syntax=univention.admin.syntax.dnsPTR,
		required=True,
		identifies=True,
	),
	'ip': univention.admin.property(
		short_description=_('IP Address'),
		long_description='',
		syntax=univention.admin.syntax.ipAddress,
		include_in_default_search=True,
	),
	'ptr_record': univention.admin.property(
		short_description=_('Pointer record'),
		long_description=_("FQDNs must end with a dot."),
		syntax=univention.admin.syntax.dnsName,
		multivalue=True,
		include_in_default_search=True,
		required=True,
	),
}

layout = [
	Tab(_('General'), _('Basic settings'), layout=[
		Group(_('General pointer record settings'), layout=[
			['ip', 'ptr_record'],
		]),
	]),
]

mapping = univention.admin.mapping.mapping()
mapping.register('address', 'relativeDomainName', None, univention.admin.mapping.ListToString, encoding='ASCII')
mapping.register('ptr_record', 'pTRRecord', encoding='ASCII')


[docs]def ipv6(string): """ >>> ipv6('0123456789abcdef0123456789abcdef') '0123:4567:89ab:cdef:0123:4567:89ab:cdef' """ assert len(string) == 32, string return ':'.join(string[i:i + 4] for i in range(0, 32, 4))
[docs]def calc_ip(rev, subnet): """ >>> calc_ip(rev='8.0.0.0.7.0.0.0.6.0.0.0.5.0.0.0.4.0.0', subnet='0001:0002:0003:0').exploded '0001:0002:0003:0004:0005:0006:0007:0008' >>> calc_ip(rev='4.3', subnet='1.2').exploded '1.2.3.4' """ parts = rev.split('.') parts.reverse() if ':' in subnet: string = ''.join(subnet.split(':') + parts) ip = ipaddress.IPv6Address(u'%s' % (ipv6(string),)) else: octets = subnet.split('.') + parts assert len(octets) == 4, octets addr = '.'.join(octets) ip = ipaddress.IPv4Address(u'%s' % (addr,)) return ip
[docs]def calc_rev(ip, subnet): """ >>> calc_rev(ip='1.2.3.4', subnet='1.2') '4.3' >>> calc_rev(ip='0001:0002:0003:0004:0005:0006:0007:0008', subnet='0001:0002:0003:0') '8.0.0.0.7.0.0.0.6.0.0.0.5.0.0.0.4.0.0' >>> calc_rev(ip='1:2:3:4:5:6:7:8', subnet='0001:0002:0003:0') '8.0.0.0.7.0.0.0.6.0.0.0.5.0.0.0.4.0.0' """ if ':' in subnet: string = ''.join(subnet.split(':')) prefix = len(string) assert 1 <= prefix < 32 string += '0' * (32 - prefix) net = ipaddress.IPv6Network(u'%s/%d' % (ipv6(string), 4 * prefix), strict=False) addr = ipaddress.IPv6Address(u'%s' % (ip,)) host = ''.join(addr.exploded.split(':')) else: octets = subnet.split('.') prefix = len(octets) assert 1 <= prefix < 4 octets += ['0'] * (4 - prefix) net = ipaddress.IPv4Network(u'%s/%d' % ('.'.join(octets), 8 * prefix), strict=False) addr = ipaddress.IPv4Address(u'%s' % (ip,)) host = addr.exploded.split('.') if addr not in net: raise ValueError() return '.'.join(reversed(host[prefix:]))
[docs]class object(univention.admin.handlers.simpleLdap): module = module
[docs] def description(self): try: return calc_ip(self.info['address'], self.superordinate.info['subnet']).compressed except (LookupError, ValueError, AssertionError) as ex: ud.debug(ud.ADMIN, ud.WARN, 'Failed to parse dn=%s: (%s)' % (self.dn, ex)) return super(object, self).description()
[docs] def open(self): super(object, self).open() try: self.info['ip'] = calc_ip(self.info['address'], self.superordinate.info['subnet']).compressed self.save() except (LookupError, ValueError, AssertionError) as ex: ud.debug(ud.ADMIN, ud.WARN, 'Failed to parse dn=%s: (%s)' % (self.dn, ex))
[docs] def ready(self): old_ip = self.oldinfo.get('ip') new_ip = self.info.get('ip') if old_ip != new_ip: try: self.info['address'] = calc_rev(new_ip, self.superordinate.info['subnet']) except (LookupError, ValueError, AssertionError) as ex: ud.debug(ud.ADMIN, ud.WARN, 'Failed to handle address: dn=%s addr=%r (%s)' % (self.dn, new_ip, ex)) raise univention.admin.uexceptions.InvalidDNS_Information(_('Reverse zone and IP address are incompatible.')) super(object, self).ready()
def _updateZone(self): if self.update_zone: self.superordinate.open() self.superordinate.modify() def __init__(self, co, lo, position, dn='', superordinate=None, attributes=[], update_zone=True): self.update_zone = update_zone univention.admin.handlers.simpleLdap.__init__(self, co, lo, position, dn, superordinate, attributes=attributes) def _ldap_addlist(self): return super(object, self)._ldap_addlist() + [ (self.superordinate.mapping.mapName('subnet'), self.superordinate.mapping.mapValue('subnet', self.superordinate['subnet'])), ] def _ldap_post_modify(self): super(object, self)._ldap_post_modify() if self.hasChanged(self.descriptions.keys()): self._updateZone() def _ldap_post_create(self): super(object, self)._ldap_post_create() self._updateZone() def _ldap_post_remove(self): super(object, self)._ldap_post_remove() self._updateZone()
[docs] @classmethod def lookup_filter_superordinate(cls, filter, superordinate): filter.expressions.append(univention.admin.filter.expression('zoneName', superordinate.mapping.mapValueDecoded('subnet', superordinate['subnet']), escape=True)) filter = rewrite_rev(filter, superordinate.info['subnet']) return filter
[docs] @classmethod def unmapped_lookup_filter(cls): return univention.admin.filter.conjunction('&', [ univention.admin.filter.expression('objectClass', 'dNSZone'), univention.admin.filter.conjunction('!', [univention.admin.filter.expression('relativeDomainName', '@')]), univention.admin.filter.conjunction('|', [ univention.admin.filter.expression('zoneName', '*.in-addr.arpa', escape=False), univention.admin.filter.expression('zoneName', '*.ip6.arpa', escape=False), ]), ])
[docs]def rewrite_rev(filter, subnet): """Rewrite LDAP filter expression and convert (ip) -> (zone,reversed) >>> rewrite_rev(expression('ip', '1.2.3.4'), subnet='1.2') conjunction('&', [expression('zoneName', '2.1.in-addr.arpa', '='), expression('relativeDomainName', '4.3', '=')]) >>> rewrite_rev(expression('ip', '1.2.3.*', escape=False), subnet='1.2') conjunction('&', [expression('zoneName', '2.1.in-addr.arpa', '='), expression('relativeDomainName', '*.3', '=')]) >>> rewrite_rev(expression('ip', '1.2.*.*', escape=False), subnet='1.2') conjunction('&', [expression('zoneName', '2.1.in-addr.arpa', '='), expression('relativeDomainName', '*.*', '=')]) >>> rewrite_rev(expression('ip', '1.2.*.4', escape=False), subnet='1.2') conjunction('&', [expression('zoneName', '2.1.in-addr.arpa', '='), expression('relativeDomainName', '4.*', '=')]) >>> rewrite_rev(expression('ip', '1.2.*', escape=False), subnet='1.2') conjunction('&', [expression('zoneName', '2.1.in-addr.arpa', '='), expression('relativeDomainName', '*', '=')]) >>> rewrite_rev(expression('ip', '1:2:3:4:5:6:7:8'), subnet='0001:0002') conjunction('&', [expression('zoneName', '2.0.0.0.1.0.0.0.ip6.arpa', '='), expression('relativeDomainName', '8.0.0.0.7.0.0.0.6.0.0.0.5.0.0.0.4.0.0.0.3.0.0.0', '=')]) >>> rewrite_rev(expression('ip', '1:2:3:4:5:6:7:*', escape=False), subnet='0001:0002') conjunction('&', [expression('zoneName', '2.0.0.0.1.0.0.0.ip6.arpa', '='), expression('relativeDomainName', '*.7.0.0.0.6.0.0.0.5.0.0.0.4.0.0.0.3.0.0.0', '=')]) >>> rewrite_rev(expression('ip', '1:2:3:4:5:6:*:8', escape=False), subnet='0001:0002') conjunction('&', [expression('zoneName', '2.0.0.0.1.0.0.0.ip6.arpa', '='), expression('relativeDomainName', '8.0.0.0.*.6.0.0.0.5.0.0.0.4.0.0.0.3.0.0.0', '=')]) >>> rewrite_rev(expression('ip', '1:2:3:*', escape=False), subnet='0001:0002') conjunction('&', [expression('zoneName', '2.0.0.0.1.0.0.0.ip6.arpa', '='), expression('relativeDomainName', '*.3.0.0.0', '=')]) """ if isinstance(filter, conjunction): filter.expressions = [rewrite_rev(expr, subnet) for expr in filter.expressions] if isinstance(filter, expression) and filter.variable == 'ip': if ':' in subnet: string = ''.join(subnet.split(':')) prefix = len(string) assert 1 <= prefix < 32 addr = ''.join( part if '*' in part else part.rjust(4, '0')[-4:] for part in filter.value.split(':') ) suffix = '.ip6.arpa' else: octets = subnet.split('.') prefix = len(octets) assert 1 <= prefix < 4 addr = filter.value.split('.') suffix = '.in-addr.arpa' addr_net, addr_host = ['.'.join(reversed(_)) for _ in (addr[:prefix], addr[prefix:])] filter = conjunction('&', [ expression('zoneName', addr_net + suffix), expression('relativeDomainName', addr_host or '*', escape=False), ]) return filter
lookup = object.lookup lookup_filter = object.lookup_filter
[docs]def identify(dn, attr): return all([ b'dNSZone' in attr.get('objectClass', []), b'@' not in attr.get('relativeDomainName', []), (attr.get('zoneName', [b''])[0].decode('ASCII').endswith(ARPA_IP4) or attr.get('zoneName', [b''])[0].decode('ASCII').endswith(ARPA_IP6)) ])