# -*- coding: utf-8 -*-
#
# Copyright 2004-2022 Univention GmbH
#
# https://www.univention.de/
#
# All rights reserved.
#
# The source code of this program is made available
# under the terms of the GNU Affero General Public License version 3
# (GNU AGPL V3) as published by the Free Software Foundation.
#
# Binary versions of this program provided by Univention to you as
# well as other copyrighted, protected or trademarked materials like
# Logos, graphics, fonts, specific documentations and configurations,
# cryptographic keys etc. are subject to a license agreement between
# you and Univention and not subject to the GNU AGPL V3.
#
# In the case you use this program under the terms of the GNU AGPL V3,
# the program is provided in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public
# License with the Debian GNU/Linux or Univention distribution in file
# /usr/share/common-licenses/AGPL-3; if not, see
# <https://www.gnu.org/licenses/>.
"""
|UDM| module for network objects
"""
import ipaddress
import traceback
import ldap
from ldap.filter import filter_format
from univention.admin.layout import Tab, Group
import univention.admin.filter
import univention.admin.handlers
import univention.admin.modules
import univention.admin.uexceptions
import univention.admin.localization
import univention.debug as ud
# Translation object for the 'univention.admin.handlers.networks' domain;
# `_` is the shorthand used below to mark user-visible strings.
translation = univention.admin.localization.translation('univention.admin.handlers.networks')
_ = translation.translate
# Module-level metadata of this UDM module.
# NOTE(review): these names are presumably read by the UDM module loader — confirm before renaming.
module = 'networks/network'
operations = ['add', 'edit', 'remove', 'search']
childs = False
short_description = _('Networks: Network')
object_name = _('Network')
object_name_plural = _('Networks')
long_description = ''
# Options of this module: a single 'default' option which is enabled by
# default and lists the LDAP object classes 'top' and 'univentionNetworkClass'.
options = {
    'default': univention.admin.option(
        short_description=short_description,
        default=True,
        objectClasses=['top', 'univentionNetworkClass'],
    ),
}
# UDM properties of a network object, keyed by property name.
property_descriptions = {
    # Identifying, immutable properties.
    'name': univention.admin.property(
        short_description=_('Name'),
        long_description='',
        syntax=univention.admin.syntax.string,
        include_in_default_search=True,
        required=True,
        may_change=False,
        identifies=True,
    ),
    'network': univention.admin.property(
        short_description=_('Networks'),
        long_description='',
        syntax=univention.admin.syntax.ipAddress,
        include_in_default_search=True,
        required=True,
        may_change=False,
    ),
    'netmask': univention.admin.property(
        short_description=_('Netmask'),
        long_description='',
        syntax=univention.admin.syntax.netmask,
        include_in_default_search=True,
        required=True,
        may_change=False,
    ),

    # Address allocation state; neither property is searchable.
    'nextIp': univention.admin.property(
        short_description=_('Next IP address'),
        long_description='',
        syntax=univention.admin.syntax.string,
        dontsearch=True,
    ),
    'ipRange': univention.admin.property(
        short_description=_('IP address range'),
        long_description='',
        syntax=univention.admin.syntax.IP_AddressRange,
        multivalue=True,
        dontsearch=True,
    ),

    # References to DNS zones and the DHCP service; not searchable.
    'dnsEntryZoneForward': univention.admin.property(
        short_description=_('DNS forward lookup zone'),
        long_description='',
        syntax=univention.admin.syntax.DNS_ForwardZone,
        dontsearch=True,
    ),
    'dnsEntryZoneReverse': univention.admin.property(
        short_description=_('DNS reverse lookup zone'),
        long_description='',
        syntax=univention.admin.syntax.DNS_ReverseZone,
        dontsearch=True,
    ),
    'dhcpEntryZone': univention.admin.property(
        short_description=_('DHCP service'),
        long_description='',
        syntax=univention.admin.syntax.dhcpService,
        dontsearch=True,
    ),
}
# Form layout: one 'General' tab holding three groups of properties.
layout = [
    Tab(_('General'), _('Basic settings'), layout=[
        Group(_('General network settings'), layout=[
            'name',
            ['network', 'netmask'],  # nested list: both properties are grouped together
            'ipRange',
        ]),
        Group(_('DNS preferences'), layout=[
            'dnsEntryZoneForward',
            'dnsEntryZoneReverse',
        ]),
        Group(_('DHCP preferences'), layout=[
            'dhcpEntryZone',
        ]),
    ]),
]
def rangeMap(value, encoding=()):
    """Convert IP range values into their encoded, space separated form.

    Each range (a sequence of text strings, e.g. first and last address) is
    joined with a single space and encoded to bytes.

    :param value: iterable of ranges, each an iterable of text strings.
    :param encoding: positional arguments forwarded to ``encode()``,
        e.g. ``('ASCII',)``; the empty tuple selects the default encoding.
    :returns: list of byte strings, one per range.
    """
    return [u' '.join(addresses).encode(*encoding) for addresses in value]
def rangeUnmap(value, encoding=()):
    """Convert encoded, space separated IP ranges back into lists of strings.

    Each byte string is decoded and split at single spaces.

    :param value: iterable of byte strings, one per range.
    :param encoding: positional arguments forwarded to ``decode()``,
        e.g. ``('ASCII',)``; the empty tuple selects the default encoding.
    :returns: list of lists of text strings, one inner list per range.
    """
    return [encoded.decode(*encoding).split(u' ') for encoded in value]
# Registration of the UDM properties with their LDAP attributes.
# Positional arguments of register() as used here: property name, LDAP
# attribute name, map function, unmap function.
# NOTE(review): map is presumably UDM -> LDAP and unmap LDAP -> UDM, matching
# rangeMap (encodes to bytes) and rangeUnmap (decodes from bytes) — confirm.
mapping = univention.admin.mapping.mapping()
mapping.register('name', 'cn', None, univention.admin.mapping.ListToString)
mapping.register('network', 'univentionNetwork', None, univention.admin.mapping.ListToString, encoding='ASCII')
mapping.register('netmask', 'univentionNetmask', None, univention.admin.mapping.ListToString, encoding='ASCII')
mapping.register('nextIp', 'univentionNextIp', None, univention.admin.mapping.ListToString, encoding='ASCII')
# The zone/service references additionally use IgnoreNone as map function.
mapping.register('dnsEntryZoneForward', 'univentionDnsForwardZone', univention.admin.mapping.IgnoreNone, univention.admin.mapping.ListToString, encoding='ASCII')
mapping.register('dnsEntryZoneReverse', 'univentionDnsReverseZone', univention.admin.mapping.IgnoreNone, univention.admin.mapping.ListToString, encoding='ASCII')
mapping.register('dhcpEntryZone', 'univentionDhcpEntry', univention.admin.mapping.IgnoreNone, univention.admin.mapping.ListToString, encoding='ASCII')
# IP ranges are stored as one "first last" string per range.
mapping.register('ipRange', 'univentionIpRange', rangeMap, rangeUnmap, encoding='ASCII')
class object(univention.admin.handlers.simpleLdap):
    """Handler for ``networks/network`` objects.

    On top of the behaviour inherited from ``simpleLdap`` this class

    * maintains the ``nextIp`` property (:py:meth:`stepIp`,
      :py:meth:`refreshNextIp`),
    * validates and sorts ``ipRange`` when it is modified
      (:py:meth:`_ldap_modlist`, :py:meth:`sort_ipranges`),
    * removes the ``univentionNetworkLink`` reference from computer objects
      after the network has been removed (:py:meth:`_ldap_post_remove`).

    The class name shadows the builtin ``object``; it is kept because the
    module-level aliases at the end of the file refer to it by that name.
    """
    module = module

    def _use_range_start(self, ip_range, network):
        """Set ``nextIp`` to the first address of *ip_range*.

        If that address is the network address of *network* the following
        address is used instead, so the all-host-bits-zero address is never
        handed out.

        :param ip_range: a range as stored in ``ipRange`` (first address at index 0).
        :param network: the :py:mod:`ipaddress` network object of this network.
        """
        self['nextIp'] = ip_range[0]
        if ipaddress.ip_address(u'%s' % self['nextIp']) == network.network_address:  # do not give out all hostbits zero
            self['nextIp'] = str(ipaddress.ip_address(u'%s' % self['nextIp']) + 1)

    def stepIp(self):
        """Advance ``nextIp`` to the next candidate address.

        * ``nextIp`` set and inside an IP range: increment it; when the end of
          that range is passed, continue with the first address of the next
          range (wrapping around to the first range).
        * ``nextIp`` set but in no range: use the start of the first range, or,
          without ranges, the incremented address if it is still inside the
          network, else the first host address of the network.
        * ``nextIp`` unset: use the start of the first range, or, without
          ranges, the first host address of the network.

        :raises univention.admin.uexceptions.valueError: if ``network`` and
            ``netmask`` do not form a valid network.
        """
        try:
            network = ipaddress.ip_network(u'%s/%s' % (self['network'], self['netmask']), strict=False)
        except ValueError as exc:
            raise univention.admin.uexceptions.valueError(str(exc), property='nextIp')

        ranges = self['ipRange'] or []  # tolerate an unset property
        if self['nextIp']:
            currentIp = ipaddress.ip_address(u'%s' % self['nextIp'])
            newIp = currentIp + 1
            # The first range containing currentIp wins, therefore the
            # enumerate() position equals the index of that range in the list.
            for position, ipRange in enumerate(ranges):
                if not ipRange:  # ignore bad default value self['ipRange'] = ['']
                    continue
                firstIP = ipaddress.ip_address(u'%s' % ipRange[0])
                lastIP = ipaddress.ip_address(u'%s' % ipRange[1])
                if firstIP <= currentIp <= lastIP:
                    if firstIP <= newIp <= lastIP:
                        self['nextIp'] = str(newIp)
                    else:
                        # range is used up: continue with the "next" ipRange
                        self._use_range_start(ranges[(position + 1) % len(ranges)], network)
                    break
            else:  # currentIp is not in any ipRange
                if ranges and ranges[0]:  # ignore bad default value self['ipRange'] = ['']
                    self._use_range_start(ranges[0], network)
                elif newIp in network:  # no usable ipRange at all
                    self['nextIp'] = str(newIp)
                else:
                    self['nextIp'] = str(network.network_address + 1)  # first usable host address in network
        elif ranges and ranges[0]:
            # nextIp is not set: use the first ip range entry. The bad default
            # value [''] is ignored here exactly as in the branch above instead
            # of failing with an IndexError.
            self._use_range_start(ranges[0], network)
        elif self['network']:
            # nextIp is not set, no usable ipRange: use the first ip of the network
            self['nextIp'] = str(network.network_address + 1)  # first usable host address in network

    def refreshNextIp(self):
        """Step ``nextIp`` forward until it is a free address and save the object.

        An address is skipped when an LDAP object with a matching ``aRecord``
        exists or when its last dot separated component is ``0``, ``1`` or
        ``254``.

        :raises univention.admin.uexceptions.nextFreeIp: if stepping reaches an
            address that has already been tried, i.e. no free address exists.
        """
        # stepIp() depends only on the current nextIp, so reaching any already
        # visited address means the search is cycling. Remembering all visited
        # addresses (not just the start address) also terminates when the
        # start address itself is not part of that cycle.
        visited = set()
        while self.lo.search(scope='domain', attr=['aRecord'], filter=filter_format('(&(aRecord=%s))', [self['nextIp']])) or self['nextIp'].split('.')[-1] in ['0', '1', '254']:
            visited.add(self['nextIp'])
            self.stepIp()
            if self['nextIp'] in visited:
                raise univention.admin.uexceptions.nextFreeIp()
        self.modify(ignore_license=True)

    def _ldap_post_remove(self):
        """Remove the link to this network from all computers referencing it.

        Failures for individual computers are logged and do not abort the
        clean-up of the remaining ones.
        """
        super(object, self)._ldap_post_remove()
        filter_ = univention.admin.filter.expression('univentionNetworkLink', self.dn, escape=True)
        for computer in univention.admin.modules.get('computers/computer').lookup(None, self.lo, filter_s=filter_):
            try:
                self.lo.modify(computer.dn, [('univentionNetworkLink', self.dn.encode('UTF-8'), b'')])
            except (univention.admin.uexceptions.base, ldap.LDAPError):
                ud.debug(ud.ADMIN, ud.ERROR, 'Failed to remove network %s from %s: %s' % (self.dn, computer.dn, traceback.format_exc()))

    def _ldap_addlist(self):
        """Make sure ``nextIp`` is initialised before the object is created."""
        if not self['nextIp']:
            self.stepIp()
        return super(object, self)._ldap_addlist()

    def _ldap_modlist(self):
        """Build the modification list, validating changed IP ranges.

        When ``ipRange`` changed, the ranges are sorted, ``nextIp`` is reset to
        the start of the first range (or the first host address of the network
        if no range is left) and every range is checked.

        :returns: the modification list.
        :raises univention.admin.uexceptions.valueError: for an invalid
            network or ``nextIp``, or ranges that cannot be compared.
        :raises univention.admin.uexceptions.rangesOverlapping: if two ranges overlap.
        :raises univention.admin.uexceptions.rangeNotInNetwork: if a range is
            not completely inside the network.
        :raises univention.admin.uexceptions.rangeInNetworkAddress: if a range
            starts or ends at the network address.
        :raises univention.admin.uexceptions.rangeInBroadcastAddress: if a
            range starts or ends at the broadcast address.
        """
        ml = univention.admin.handlers.simpleLdap._ldap_modlist(self)

        next_ip_changed = False

        if self.hasChanged('ipRange'):
            try:
                network = ipaddress.ip_network(u'%s/%s' % (self['network'], self['netmask']), strict=False)
                ipaddress.ip_address(u'%s' % self['nextIp'])  # only validates the current value
            except ValueError as exc:
                raise univention.admin.uexceptions.valueError(str(exc), property='nextIp')

            if self['ipRange']:
                try:
                    self.sort_ipranges()
                except TypeError as exc:  # e.g. addresses of different IP versions
                    raise univention.admin.uexceptions.valueError(str(exc))
                self['nextIp'] = self['ipRange'][0][0]
            else:
                self['nextIp'] = str(network.network_address + 1)

            if self['nextIp'] != self.oldattr.get('univentionNextIp', [b''])[0].decode('UTF-8'):
                next_ip_changed = True

            ranges = self['ipRange'] or []  # tolerate an unset property
            # Parse every range once instead of once per pair in the loop below.
            parsed = [
                (ipaddress.ip_address(u'%s' % entry[0]), ipaddress.ip_address(u'%s' % entry[1]))
                for entry in ranges
            ]

            encoded_ranges = []
            for i, (firstIP, lastIP) in zip(ranges, parsed):
                for j, (otherFirstIP, otherLastIP) in zip(ranges, parsed):
                    if i == j:
                        continue
                    if firstIP < otherFirstIP < lastIP or \
                            firstIP < otherLastIP < lastIP or \
                            otherFirstIP < firstIP < otherLastIP or \
                            otherFirstIP < lastIP < otherLastIP:
                        raise univention.admin.uexceptions.rangesOverlapping('%s-%s; %s-%s' % (i[0], i[1], j[0], j[1]))

                if firstIP not in network or lastIP not in network:
                    raise univention.admin.uexceptions.rangeNotInNetwork('%s-%s' % (firstIP, lastIP, ))

                if firstIP == network.network_address or lastIP == network.network_address:
                    raise univention.admin.uexceptions.rangeInNetworkAddress('%s-%s' % (firstIP, lastIP, ))

                if firstIP == network.broadcast_address or lastIP == network.broadcast_address:
                    raise univention.admin.uexceptions.rangeInBroadcastAddress('%s-%s' % (firstIP, lastIP, ))

                encoded_ranges.append(u' '.join(i).encode('ASCII'))

            ud.debug(ud.ADMIN, ud.INFO, 'old Range: %s' % self.oldinfo.get('ipRange'))
            # Replace whatever the generic modlist produced for the ranges by
            # the sorted and validated values.
            ml = [x for x in ml if x[0] != 'univentionIpRange']
            ml.append(('univentionIpRange', self.oldattr.get('univentionIpRange', [b'']), encoded_ranges))

        if next_ip_changed:
            ml = [x for x in ml if x[0] != 'univentionNextIp']
            ml.append(('univentionNextIp', self.oldattr.get('univentionNextIp', b''), self['nextIp'].encode('ASCII')))

        return ml

    def sort_ipranges(self):
        """Sort ``ipRange`` in place by start address, smallest first.

        The sort is stable, so ranges with the same start address keep their
        relative order. Fewer than two ranges are left untouched and are not
        parsed.

        :raises TypeError: if start addresses cannot be compared with each
            other (e.g. IPv4 mixed with IPv6).
        """
        ranges = self['ipRange']
        if not ranges or len(ranges) < 2:
            return
        ranges.sort(key=lambda ip_range: ipaddress.ip_address(u'%s' % ip_range[0]))
# Module-level aliases for the corresponding attributes of the handler class.
# NOTE(review): presumably looked up on the module by the UDM framework — confirm.
lookup = object.lookup
lookup_filter = object.lookup_filter
identify = object.identify