# SPDX-FileCopyrightText: 2004-2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only
"""|UDM| module for network objects"""
import ipaddress
import ldap
from ldap.filter import filter_format
import univention.admin.filter
import univention.admin.handlers
import univention.admin.localization
import univention.admin.modules
import univention.admin.uexceptions
from univention.admin.layout import Group, Tab
translation = univention.admin.localization.translation('univention.admin.handlers.networks')
_ = translation.translate
module = 'networks/network'
operations = ['add', 'edit', 'remove', 'search']
childs = False
short_description = _('Networks: Network')
object_name = _('Network')
object_name_plural = _('Networks')
long_description = ''
# fmt: off
options = {
'default': univention.admin.option(
short_description=short_description,
default=True,
objectClasses=['top', 'univentionNetworkClass'],
),
}
property_descriptions = {
'name': univention.admin.property(
short_description=_('Name'),
long_description='',
syntax=univention.admin.syntax.string,
include_in_default_search=True,
required=True,
may_change=False,
identifies=True,
),
'network': univention.admin.property(
short_description=_('Networks'),
long_description='',
syntax=univention.admin.syntax.ipAddress,
include_in_default_search=True,
required=True,
may_change=False,
),
'netmask': univention.admin.property(
short_description=_('Netmask'),
long_description='',
syntax=univention.admin.syntax.netmask,
include_in_default_search=True,
required=True,
may_change=False,
),
'nextIp': univention.admin.property(
short_description=_('Next IP address'),
long_description='',
syntax=univention.admin.syntax.string,
dontsearch=True,
),
'ipRange': univention.admin.property(
short_description=_('IP address range'),
long_description='',
syntax=univention.admin.syntax.IP_AddressRange,
multivalue=True,
dontsearch=True,
),
'dnsEntryZoneForward': univention.admin.property(
short_description=_('DNS forward lookup zone'),
long_description='',
syntax=univention.admin.syntax.DNS_ForwardZone,
dontsearch=True,
),
'dnsEntryZoneReverse': univention.admin.property(
short_description=_('DNS reverse lookup zone'),
long_description='',
syntax=univention.admin.syntax.DNS_ReverseZone,
dontsearch=True,
),
'dhcpEntryZone': univention.admin.property(
short_description=_('DHCP service'),
long_description='',
syntax=univention.admin.syntax.dhcpService,
dontsearch=True,
),
}
layout = [
Tab(_('General'), _('Basic settings'), layout=[
Group(_('General network settings'), layout=[
'name',
['network', 'netmask'],
'ipRange',
]),
Group(_('DNS preferences'), layout=[
'dnsEntryZoneForward',
'dnsEntryZoneReverse',
]),
Group(_('DHCP preferences'), layout=[
'dhcpEntryZone',
]),
]),
]
# fmt: on
[docs]
def rangeMap(value, encoding=()):
return [' '.join(x).encode(*encoding) for x in value]
[docs]
def rangeUnmap(value, encoding=()):
return [x.decode(*encoding).split(' ') for x in value]
# fmt: off
mapping = univention.admin.mapping.mapping()
mapping.register('name', 'cn', None, univention.admin.mapping.ListToString)
mapping.register('network', 'univentionNetwork', None, univention.admin.mapping.ListToString, encoding='ASCII')
mapping.register('netmask', 'univentionNetmask', None, univention.admin.mapping.ListToString, encoding='ASCII')
mapping.register('nextIp', 'univentionNextIp', None, univention.admin.mapping.ListToString, encoding='ASCII')
mapping.register('dnsEntryZoneForward', 'univentionDnsForwardZone', univention.admin.mapping.IgnoreNone, univention.admin.mapping.ListToString, encoding='ASCII')
mapping.register('dnsEntryZoneReverse', 'univentionDnsReverseZone', univention.admin.mapping.IgnoreNone, univention.admin.mapping.ListToString, encoding='ASCII')
mapping.register('dhcpEntryZone', 'univentionDhcpEntry', univention.admin.mapping.IgnoreNone, univention.admin.mapping.ListToString, encoding='ASCII')
mapping.register('ipRange', 'univentionIpRange', rangeMap, rangeUnmap, encoding='ASCII')
# fmt: on
[docs]
class object(univention.admin.handlers.simpleLdap):
module = module
[docs]
def stepIp(self):
try:
network = ipaddress.ip_network('%s/%s' % (self['network'], self['netmask']), strict=False)
except ValueError as exc:
raise univention.admin.uexceptions.valueError(str(exc), property='nextIp')
if self['nextIp']:
# nextIP is already set:
# - check range for actual ip
# - inc ip
# - check for range
currentIp = ipaddress.ip_address('%s' % self['nextIp'])
newIp = ipaddress.ip_address('%s' % self['nextIp']) + 1
for ipRange in self['ipRange']:
if not ipRange: # ignore bad default value self['ipRange'] = ['']
continue
firstIP = ipaddress.ip_address('%s' % ipRange[0])
lastIP = ipaddress.ip_address('%s' % ipRange[1])
if firstIP <= currentIp <= lastIP:
if firstIP <= newIp <= lastIP:
self['nextIp'] = str(newIp)
else:
position = (self['ipRange'].index(ipRange) + 1) % len(self['ipRange']) # find "next" ipRange
self['nextIp'] = self['ipRange'][position][0] # select first IP of that range
if ipaddress.ip_address('%s' % self['nextIp']) == network.network_address: # do not give out all hostbits zero
self['nextIp'] = str(ipaddress.ip_address('%s' % self['nextIp']) + 1)
break
else: # currentIp is not in any ipRange
if self['ipRange'] and self['ipRange'][0]: # ignore bad default value self['ipRange'] = ['']
self['nextIp'] = self['ipRange'][0][0]
if ipaddress.ip_address('%s' % self['nextIp']) == network.network_address: # do not give out all hostbits zero
self['nextIp'] = str(ipaddress.ip_address('%s' % self['nextIp']) + 1)
else: # did not find nextIp in ipRanges because ipRanges are empty
if newIp in network:
self['nextIp'] = str(newIp)
else:
self['nextIp'] = str(network.network_address + 1) # first usable host address in network
elif self['ipRange']:
# nextIP is not set
# - use first ip range entry
self['nextIp'] = self['ipRange'][0][0]
if ipaddress.ip_address('%s' % self['nextIp']) == network.network_address: # do not give out all hostbits zero
self['nextIp'] = str(ipaddress.ip_address('%s' % self['nextIp']) + 1)
elif self['network']:
# nextIP is not set, no IPrange, then we use the first ip of the network
self['nextIp'] = str(network.network_address + 1) # first usable host address in network
[docs]
def refreshNextIp(self):
start_ip = self['nextIp']
while self.lo.authz_connection.search(scope='domain', attr=['aRecord'], filter=filter_format('(&(aRecord=%s))', [self['nextIp']])) or self['nextIp'].split('.')[-1] in ['0', '1', '254']:
self.stepIp()
if self['nextIp'] == start_ip:
raise univention.admin.uexceptions.nextFreeIp()
self.modify(ignore_license=True)
def _ldap_post_remove(self):
super()._ldap_post_remove()
filter_ = univention.admin.filter.expression('univentionNetworkLink', self.dn, escape=True)
for computer in univention.admin.modules._get('computers/computer').lookup(None, self.lo, filter_s=filter_):
try:
self.lo.authz_connection.modify(computer.dn, [('univentionNetworkLink', self.dn.encode('UTF-8'), b'')])
except (univention.admin.uexceptions.base, ldap.LDAPError):
self.log.exception('Failed to remove network from computer', dn=self.dn, computer=computer.dn)
def _ldap_addlist(self):
if not self['nextIp']:
self.stepIp()
return super()._ldap_addlist()
def _ldap_modlist(self):
ml = univention.admin.handlers.simpleLdap._ldap_modlist(self)
next_ip_changed = False
if self.hasChanged('ipRange'):
try:
network = ipaddress.ip_network('%s/%s' % (self['network'], self['netmask']), strict=False)
ipaddress.ip_address('%s' % self['nextIp'])
except ValueError as exc:
raise univention.admin.uexceptions.valueError(str(exc), property='nextIp')
if self['ipRange']:
try:
self.sort_ipranges()
except TypeError as exc:
raise univention.admin.uexceptions.valueError(str(exc))
self['nextIp'] = self['ipRange'][0][0]
else:
self['nextIp'] = str(network.network_address + 1)
if self['nextIp'] != self.oldattr.get('univentionNextIp', [b''])[0].decode('UTF-8'):
next_ip_changed = True
ipRange = []
for i in self['ipRange']:
firstIP = ipaddress.ip_address('%s' % i[0])
lastIP = ipaddress.ip_address('%s' % i[1])
for j in self['ipRange']:
if i != j:
otherFirstIP = ipaddress.ip_address('%s' % j[0])
otherLastIP = ipaddress.ip_address('%s' % j[1])
if firstIP < otherFirstIP < lastIP or firstIP < otherLastIP < lastIP or otherFirstIP < firstIP < otherLastIP or otherFirstIP < lastIP < otherLastIP:
raise univention.admin.uexceptions.rangesOverlapping('%s-%s; %s-%s' % (i[0], i[1], j[0], j[1]))
if firstIP not in network or lastIP not in network:
raise univention.admin.uexceptions.rangeNotInNetwork('%s-%s' % (firstIP, lastIP))
if network.network_address in (firstIP, lastIP):
raise univention.admin.uexceptions.rangeInNetworkAddress('%s-%s' % (firstIP, lastIP))
if network.broadcast_address in (firstIP, lastIP):
raise univention.admin.uexceptions.rangeInBroadcastAddress('%s-%s' % (firstIP, lastIP))
ipRange.append(' '.join(i).encode('ASCII'))
self.log.debug('old Range: %s', self.oldinfo.get('ipRange'))
ml = [x for x in ml if x[0] != 'univentionIpRange']
ml.append(('univentionIpRange', self.oldattr.get('univentionIpRange', [b'']), ipRange))
if next_ip_changed:
ml = [x for x in ml if x[0] != 'univentionNextIp']
ml.append(('univentionNextIp', self.oldattr.get('univentionNextIp', b''), self['nextIp'].encode('ASCII')))
return ml
[docs]
def sort_ipranges(self):
# make sure that the ipRanges are ordered by their start address (smallest first)
for i in range(1, len(self['ipRange'])):
if ipaddress.ip_address('%s' % self['ipRange'][i][0]) < ipaddress.ip_address('%s' % self['ipRange'][i - 1][0]):
self['ipRange'].insert(i - 1, self['ipRange'].pop(i))
for i in range(1, len(self['ipRange'])):
if ipaddress.ip_address('%s' % self['ipRange'][i][0]) < ipaddress.ip_address('%s' % self['ipRange'][i - 1][0]):
self.sort_ipranges()
lookup = object.lookup
lookup_filter = object.lookup_filter
identify = object.identify