#!/usr/bin/python3
#
# Univention S4 Connector
# dns helper functions
#
# SPDX-FileCopyrightText: 2004-2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only
import copy
import time
from logging import getLogger
import ldap
from dns import rdataclass, rdatatype
from dns.rdtypes.ANY.TXT import TXT
from dns.tokenizer import Tokenizer
from samba.dcerpc import dnsp
from samba.ndr import ndr_pack, ndr_unpack
from samba.provision.sambadns import AAAARecord, ARecord, CNAMERecord, NSRecord, SOARecord, SRVRecord, TXTRecord
import univention.admin.handlers
import univention.admin.handlers.dns.alias
import univention.admin.handlers.dns.forward_zone
import univention.admin.handlers.dns.host_record
import univention.admin.handlers.dns.ptr_record
import univention.admin.handlers.dns.reverse_zone
import univention.admin.handlers.dns.srv_record
import univention.admin.uldap
from univention.admin.mapping import unmapUNIX_TimeInterval
from univention.logging import Structured
from univention.s4connector.s4 import format_escaped, str2dn
from univention.s4connector.s4.dc import _unixTimeInverval2seconds
log = Structured(getLogger("LDAP").getChild(__name__))
[docs]
class PTRRecord(dnsp.DnssrvRpcRecord):
def __init__(self, ptr, serial=1, ttl=900, rank=dnsp.DNS_RANK_ZONE):
super().__init__()
self.wType = dnsp.DNS_TYPE_PTR
self.rank = rank
self.dwSerial = serial
self.dwTtlSeconds = ttl
self.data = ptr
[docs]
class MXRecord(dnsp.DnssrvRpcRecord):
def __init__(self, name, priority, serial=1, ttl=900, rank=dnsp.DNS_RANK_ZONE):
super().__init__()
self.wType = dnsp.DNS_TYPE_MX
self.rank = rank
self.dwSerial = serial
self.dwTtlSeconds = ttl
self.data.wPriority = priority
self.data.nameTarget = name
# mapping functions
[docs]
def dns_dn_mapping(s4connector, given_object, dn_mapping_stored, isUCSobject):
"""
map dn of given object (which must have an s4_RR_attr in S4)
ol_oc_filter and s4_RR_filter are objectclass filters in UCS and S4
Code is based on univention.s4connector.s4.samaccountname_dn_mapping
"""
obj = copy.deepcopy(given_object)
propertyname = 'dns'
propertyattrib = 'relativeDomainName' # using LDAP name here, for simplicity
ol_oc_filter = '(objectClass=dNSZone)' # all OpenLDAP DNS records match
ol_RR_attr = 'relativeDomainName'
s4_RR_filter = '(objectClass=dnsNode)' # This also matches the DC=@ SOA object
s4_RR_attr = 'dc' # Note: the S4 attribute itself is lowercase
if obj['dn'] is not None:
try:
s4_RR_val = [_value for _key, _value in obj['attributes'].items() if s4_RR_attr.lower() == _key.lower()][0][0].decode('UTf-8') # noqa: RUF015
except (KeyError, IndexError):
s4_RR_val = ''
def dn_premapped(given_object, dn_key, dn_mapping_stored):
if (dn_key not in dn_mapping_stored) or (not given_object[dn_key]):
log.debug("dns_dn_mapping: not premapped (in first instance)")
return False
else: # check if DN exists
if isUCSobject:
premapped_dn = s4connector.get_object_dn(given_object[dn_key])
if premapped_dn is not None:
# log.debug("dns_dn_mapping: premapped S4 object found")
log.debug("dns_dn_mapping: premapped S4 object: %s", premapped_dn)
return True
else:
log.debug("dns_dn_mapping: premapped S4 object not found")
return False
else:
premapped_dn = s4connector.get_ucs_ldap_object_dn(given_object[dn_key])
if premapped_dn is not None:
# log.debug("dns_dn_mapping: premapped UCS object found")
log.debug("dns_dn_mapping: premapped UCS object: %s", premapped_dn)
return True
else:
log.debug("dns_dn_mapping: premapped UCS object not found")
return False
for dn_key in ['dn', 'olddn']:
log.debug("dns_dn_mapping: check newdn for key '%s'", dn_key)
if dn_key in obj and not dn_premapped(obj, dn_key, dn_mapping_stored):
dn = obj[dn_key]
log.debug("dns_dn_mapping: dn: %s", dn)
# Skip Configuration objects with empty DNs
if dn is None:
break
exploded_dn = str2dn(dn)
(fst_rdn_attribute_utf8, fst_rdn_value_utf8, _flags) = exploded_dn[0][0]
if isUCSobject:
log.debug("dns_dn_mapping: got an UCS-Object")
# lookup the relativeDomainName as DC/dnsNode in S4 to get corresponding DN, if not found create new
# Case move with rename
if dn_key == 'olddn' and fst_rdn_attribute_utf8 == 'relativeDomainName':
relativeDomainName = fst_rdn_value_utf8
else:
try:
relativeDomainName = obj['attributes'][ol_RR_attr][0].decode('UTF-8')
except (KeyError, IndexError):
# Safety fallback for the unexpected case, where relativeDomainName would not be set
if fst_rdn_attribute_utf8 == 'zoneName':
relativeDomainName = '@'
else:
raise # can't determine relativeDomainName
for ucsval, conval in s4connector.property[propertyname].mapping_table.get(propertyattrib, []):
if relativeDomainName.lower() == ucsval.lower():
relativeDomainName = conval
log.debug("dns_dn_mapping: map relativeDomainName according to mapping-table")
continue
try:
ol_zone_name = obj['attributes']['zoneName'][0].decode('UTF-8')
except (KeyError, IndexError):
# Safety fallback for the unexpected case, where zoneName would not be set
if ol_RR_attr == fst_rdn_attribute_utf8:
(snd_rdn_attribute_utf8, snd_rdn_value_utf8, _flags) = exploded_dn[1][0]
if snd_rdn_attribute_utf8 == 'zoneName':
ol_zone_name = snd_rdn_value_utf8
else:
raise # can't determine zoneName for this relativeDomainName
target_RR_val = relativeDomainName
target_zone_name = ol_zone_name
s4dn_utf16_le = None
s4_zone_dn = None
if relativeDomainName == '@': # or dn starts with 'zoneName='
s4_filter = format_escaped('(&(objectClass=dnsZone)({0}={1!e}))', s4_RR_attr, ol_zone_name)
log.debug("dns_dn_mapping: search in S4")
for base in s4connector.s4_ldap_partitions:
result = s4connector._s4__search_s4(
base,
ldap.SCOPE_SUBTREE,
s4_filter,
attrlist=(s4_RR_attr,),
show_deleted=False)
if result:
# We only need the SOA dn here
s4dn_utf16_le = ldap.dn.dn2str([[('DC', '@', ldap.AVA_STRING)], *str2dn(result[0][0])])
break
else:
# identify position by parent zone name
target_zone_dn = s4connector.lo.parentDn(dn)
if s4connector.configRegistry.get('connector/s4/mapping/dns/position') != 'legacy' and relativeDomainName.endswith('._msdcs'):
target_zone_name = '_msdcs.' + ol_zone_name
target_RR_val = relativeDomainName[:-7]
target_zone_dn = ldap.dn.dn2str([[(s4_RR_attr.upper(), target_zone_name, ldap.AVA_STRING)], *exploded_dn[2:]])
log.debug("dns_dn_mapping: get dns_dn_mapping for target zone %s", target_zone_dn)
fake_ol_zone_object = {
'dn': target_zone_dn,
'attributes': {
'objectClass': [b'top', b'dNSZone'],
'relativeDomainName': [b'@'],
'zoneName': [target_zone_name.encode('UTF-8')],
},
}
s4_soa_object = dns_dn_mapping(s4connector, fake_ol_zone_object, dn_mapping_stored, isUCSobject)
# and use its parent as the search base
if s4_soa_object['dn'].startswith('DC=@,'):
s4_zone_dn = s4connector.lo_s4.parentDn(s4_soa_object['dn'])
else:
# There is the corner case, where con2ucsc
# syncs the objectClass=dnsZone container and
# stores it's DN in the premapping.
# After that, we don't get the DC=@ dnsNode
# object DN here, but directly the parent.
# So, actually it's not the SOA object DN:
s4_zone_dn = s4_soa_object['dn']
log.debug("dns_dn_mapping: search in S4 base %s", s4_zone_dn)
s4_filter = format_escaped('(&{0}({1}={2!e}))', s4_RR_filter, s4_RR_attr, target_RR_val)
result = s4connector._s4__search_s4(
s4_zone_dn,
ldap.SCOPE_SUBTREE,
s4_filter,
attrlist=('dn',),
show_deleted=False)
if result:
s4dn_utf16_le = result[0][0]
if s4dn_utf16_le: # no referral, so we've got a valid result
s4dn = s4dn_utf16_le
log.debug("dns_dn_mapping: got s4dn %s", s4dn)
if dn_key == 'olddn' or (dn_key == 'dn' and 'olddn' not in obj):
# Cases: ("delete") or ("add" but exists already)
newdn = s4dn
else:
# Case: "moved" (?)
raw_new_dn = ldap.dn.dn2str([str2dn(s4dn)[0], *exploded_dn[1:]])
# The next line looks wrong to me: the source DN is a UCS dn here..
# But this is just like samaccountname_dn_mapping does it:
newdn = raw_new_dn.lower().replace(s4connector.lo_s4.base.lower(), s4connector.lo.base.lower())
log.debug("dns_dn_mapping: move case newdn=%s", newdn)
else:
log.debug("dns_dn_mapping: target object not found")
if s4_zone_dn:
# At least we found the zone
zone_dn = s4_zone_dn
relativeDomainName = target_RR_val
else:
# Ok, it's a new object without existing parent zone in S4 (probably this object itself is a soa/zone), so propose an S4 DN for it:
default_dn = s4connector.property['dns'].con_default_dn
zone_dn = ldap.dn.dn2str([[('DC', ol_zone_name, ldap.AVA_STRING)], *str2dn(default_dn)])
newdn = ldap.dn.dn2str([[('DC', relativeDomainName, ldap.AVA_STRING)], *str2dn(zone_dn)])
else:
# get the object to read the s4_RR_attr in S4 and use it as name
# we have no fallback here, the given dn must be found in S4 or we've got an error
log.debug("dns_dn_mapping: got an S4-Object")
i = 0
while not s4_RR_val: # in case of olddn this is already set
i = i + 1
search_base_dn = obj.get('deleted_dn', dn)
try:
search_result = s4connector.lo_s4.search(filter=s4_RR_filter, base=search_base_dn, scope='base', attr=[s4_RR_attr], required=True)
except ldap.NO_SUCH_OBJECT: # S4 may need time
if i > 5:
raise
time.sleep(1) # S4 may need some time...
else:
(_search_result_dn, search_result_attributes) = search_result[0]
search_result_attributes = {k.lower(): v for k, v in search_result_attributes}
s4_RR_val = search_result_attributes[s4_RR_attr.lower()][0].decode('UTF-8')
log.debug("dns_dn_mapping: got %s from S4", s4_RR_attr)
for ucsval, conval in s4connector.property[propertyname].mapping_table.get(propertyattrib, []):
if s4_RR_val.lower() == conval.lower():
s4_RR_val = ucsval
log.debug("dns_dn_mapping: map %s according to mapping-table", s4_RR_attr)
continue
# search for object with this dn in ucs, needed if it is located in a different container
try:
s4_ocs = obj['attributes']['objectClass']
except (KeyError, TypeError):
s4_ocs = []
target_RR_val = s4_RR_val
ol_zone_dn = None
if b'dnsZone' in s4_ocs:
if s4connector.configRegistry.get('connector/s4/mapping/dns/position') != 'legacy' and s4_RR_val.startswith('_msdcs.'):
target_RR_val = s4_RR_val[7:]
target_zone_name = target_RR_val
base = s4connector.lo.base
ol_search_attr = 'zoneName'
# could use a specific LDAP filter here, but not necessary:
# ol_oc_filter = '(&(objectClass=dNSZone)(|(univentionObjectType=dns/forward_zone)(univentionObjectType=dns/reverse_zone)))'
elif b'dnsNode' in s4_ocs:
# identify position of the parent zone
(snd_rdn_attribute_utf8, snd_rdn_value_utf8, _flags) = exploded_dn[1][0]
target_zone_name = snd_rdn_value_utf8
target_zone_dn = s4connector.lo_s4.parentDn(dn)
log.debug("dns_dn_mapping: get dns_dn_mapping for %s", target_zone_dn)
if s4connector.configRegistry.get('connector/s4/mapping/dns/position') != 'legacy' and target_zone_name.startswith('_msdcs.'):
target_zone_name = target_zone_name[7:]
target_RR_val += '._msdcs'
target_zone_dn = ldap.dn.dn2str([[(snd_rdn_attribute_utf8, target_zone_name, ldap.AVA_STRING)], *exploded_dn[2:]])
fake_s4_zone_object = {
'dn': target_zone_dn,
'attributes': {
'objectClass': [b'top', b'dnsZone'],
'dc': [target_zone_name.encode('UTF-8')],
},
}
ol_zone_object = dns_dn_mapping(s4connector, fake_s4_zone_object, dn_mapping_stored, isUCSobject)
# and use that as the search base
ol_zone_dn = ol_zone_object['dn']
base = ol_zone_dn
ol_search_attr = ol_RR_attr
# could use a specific LDAP filter here, but not necessary:
# ol_oc_filter = '(&(objectClass=dNSZone)(!(|(univentionObjectType=dns/forward_zone)(univentionObjectType=dns/reverse_zone))))'
s4_filter = format_escaped('(&{0}({1}={2!e}))', ol_oc_filter, ol_search_attr, target_RR_val)
log.debug("dns_dn_mapping: UCS filter: %s", s4_filter)
log.debug("dns_dn_mapping: UCS base: %s", base)
try:
ucsdn_result = s4connector.search_ucs(filter=s4_filter, base=base, scope='sub', attr=('1.1',))
except univention.admin.uexceptions.noObject:
ucsdn_result = None
try:
ucsdn = ucsdn_result[0][0]
except (IndexError, TypeError):
ucsdn = None
log.trace("dns_dn_mapping: Found ucsdn: %s", ucsdn)
if ucsdn and (dn_key == 'olddn' or (dn_key == 'dn' and 'olddn' not in obj)):
# Cases: ("delete") or ("add" but exists already)
newdn = ucsdn
log.debug("dns_dn_mapping: newdn is ucsdn")
else:
# Cases: (Target not found) or/and ("moved" (?))
# Ok, it's a new object, so propose a S4 DN for it:
if ol_zone_dn:
# At least we found the zone
zone_dn = ol_zone_dn
s4_RR_val = target_RR_val
else:
# Fallback, e.g. for new zones
zone_dn = __get_zone_dn(s4connector, target_zone_name)
if s4_RR_val == '@':
newdn = zone_dn
elif b'dnsZone' in s4_ocs:
# Hmm, is it ok to map it to the same as '@'?
newdn = zone_dn
else:
newdn = ldap.dn.dn2str([[('relativeDomainName', s4_RR_val, ldap.AVA_STRING)], *str2dn(zone_dn)])
if not (dn_key == 'olddn' or (dn_key == 'dn' and 'olddn' not in obj)):
# Case: "moved" (?)
log.debug("dns_dn_mapping: move case newdn=%s", newdn)
log.debug("dns_dn_mapping: mapping for key %r:", dn_key)
log.debug("dns_dn_mapping: source DN: %r", dn)
log.debug("dns_dn_mapping: mapped DN: %r", newdn)
obj[dn_key] = newdn
return obj
''' HELPER functions '''
def __get_zone_dn(s4connector, zone_name):
default_dn = s4connector.property['dns'].ucs_default_dn
return ldap.dn.dn2str([[('zoneName', zone_name, ldap.AVA_STRING)], *str2dn(default_dn)])
def __append_dot(string):
if not string.endswith('.'):
string += '.'
return string
def __remove_dot(string):
if string.endswith(b'.'):
string = string[:-1]
return string
def __split_s4_dnsNode_dn(dn):
exploded_dn = str2dn(dn) # TODO: fix encoding
# split the DC= from the zoneName
(_, zoneName, _) = exploded_dn[1][0]
(_, relativeDomainName, _) = exploded_dn[0][0]
return (zoneName, relativeDomainName)
def __split_ol_dNSZone_dn(dn, objectclasses):
exploded_dn = str2dn(dn)
(fst_rdn_attribute_utf8, fst_rdn_value_utf8, _flags) = exploded_dn[0][0]
(snd_rdn_attribute_utf8, snd_rdn_value_utf8, _flags) = exploded_dn[1][0]
if fst_rdn_attribute_utf8.lower() == 'zonename':
zoneName = fst_rdn_value_utf8
if b'dnsNode' in objectclasses:
relativeDomainName = '@'
elif b'dnsZone' in objectclasses:
# make S4 dnsZone containers distinguishable from SOA records
relativeDomainName = zoneName
else:
relativeDomainName = None
elif snd_rdn_attribute_utf8.lower() == 'zonename':
zoneName = snd_rdn_value_utf8
relativeDomainName = fst_rdn_value_utf8
else:
zoneName = None
relativeDomainName = None
log.warning("Failed to get zone name for object %r", dn)
return (zoneName, relativeDomainName)
def __create_s4_forward_zone(s4connector, zone_dn):
al = []
al.append(('objectClass', [b'top', b'dnsZone']))
log.debug('_dns_zone_forward_con_create: dn: %s', zone_dn)
log.debug('_dns_zone_forward_con_create: al: %s', al)
s4connector.lo_s4.lo.add_s(zone_dn, al)
def __create_s4_forward_zone_soa(s4connector, soa_dn):
al = []
al.append(('objectClass', [b'top', b'dnsNode']))
al.append(('dc', [b'@']))
s4connector.lo_s4.lo.add_s(soa_dn, al)
def __create_s4_dns_node(s4connector, dnsNodeDn, relativeDomainNames, dnsRecords):
al = []
al.append(('objectClass', [b'top', b'dnsNode']))
al.append(('dc', relativeDomainNames))
if dnsRecords:
al.append(('dnsRecord', dnsRecords))
log.debug('__create_s4_dns_node: dn: %s', dnsNodeDn)
log.debug('__create_s4_dns_node: al: %s', al)
s4connector.lo_s4.lo.add_s(dnsNodeDn, al)
''' Pack and unpack DNS records by using the
Samba NDR functions
'''
def __pack_aRecord(object, dnsRecords):
# add aRecords
# IPv4
for a in object['attributes'].get('aRecord', []):
a_record = ARecord(a)
dnsRecords.append(ndr_pack(a_record))
# IPv6
for a in object['attributes'].get('aAAARecord', []):
a_record = AAAARecord(a)
dnsRecords.append(ndr_pack(a_record))
def __unpack_aRecord(object):
a = []
dnsRecords = object['attributes'].get('dnsRecord', [])
for dnsRecord in dnsRecords:
ndrRecord = ndr_unpack(dnsp.DnssrvRpcRecord, dnsRecord)
if ndrRecord.wType in (dnsp.DNS_TYPE_A, dnsp.DNS_TYPE_AAAA):
a.append(ndrRecord.data)
return a
def __pack_soaRecord(object, dnsRecords):
soaRecord = object['attributes'].get('sOARecord', [None])[0]
if soaRecord:
soa = soaRecord.split(b' ')
mname = soa[0]
rname = soa[1]
serial = int(soa[2])
refresh = int(soa[3])
retry = int(soa[4])
expire = int(soa[5])
ttl = int(soa[6])
soa_record = SOARecord(mname=mname, rname=rname, serial=serial, refresh=refresh, retry=retry, expire=expire, minimum=3600, ttl=ttl)
dnsRecords.append(ndr_pack(soa_record))
def __unpack_soaRecord(object):
soa = {}
dnsRecords = object['attributes'].get('dnsRecord', [])
for dnsRecord in dnsRecords:
ndrRecord = ndr_unpack(dnsp.DnssrvRpcRecord, dnsRecord)
if ndrRecord.wType == dnsp.DNS_TYPE_SOA:
soa['mname'] = ndrRecord.data.mname
soa['rname'] = ndrRecord.data.rname
soa['serial'] = str(ndrRecord.data.serial)
soa['refresh'] = str(ndrRecord.data.refresh)
soa['retry'] = str(ndrRecord.data.retry)
soa['expire'] = str(ndrRecord.data.expire)
soa['minimum'] = str(ndrRecord.data.minimum)
soa['ttl'] = str(ndrRecord.dwTtlSeconds)
return soa
def __pack_nsRecord(object, dnsRecords):
for nSRecord in object['attributes'].get('nSRecord', []):
a_record = NSRecord(nSRecord)
dnsRecords.append(ndr_pack(a_record))
def __unpack_nsRecord(object):
ns = []
dnsRecords = object['attributes'].get('dnsRecord', [])
for dnsRecord in dnsRecords:
ndrRecord = ndr_unpack(dnsp.DnssrvRpcRecord, dnsRecord)
if ndrRecord.wType == dnsp.DNS_TYPE_NS:
ns.append(__append_dot(ndrRecord.data))
return ns
def __pack_mxRecord(object, dnsRecords):
for mXRecord in object['attributes'].get('mXRecord', []):
if mXRecord:
log.debug('__pack_mxRecord: %s', mXRecord)
mx = mXRecord.split(b' ')
priority = mx[0]
name = mx[1]
mx_record = MXRecord(name, int(priority))
dnsRecords.append(ndr_pack(mx_record))
log.debug('__pack_mxRecord: %s', ndr_pack(mx_record))
def __unpack_mxRecord(object):
mx = []
dnsRecords = object['attributes'].get('dnsRecord', [])
for dnsRecord in dnsRecords:
ndrRecord = ndr_unpack(dnsp.DnssrvRpcRecord, dnsRecord)
if ndrRecord.wType == dnsp.DNS_TYPE_MX:
mx.append([str(ndrRecord.data.wPriority), __append_dot(ndrRecord.data.nameTarget)])
return mx
def __pack_txtRecord(object, dnsRecords):
for txtRecord in object['attributes'].get('tXTRecord', []):
if txtRecord:
log.debug('__pack_txtRecord: %s', txtRecord)
token_list = TXT.from_text(rdataclass.IN, rdatatype.TXT, Tokenizer(txtRecord)).strings
ndr_txt_record = ndr_pack(TXTRecord(list(token_list)))
dnsRecords.append(ndr_txt_record)
log.debug('__pack_txtRecord: %s', ndr_txt_record)
def __unpack_txtRecord(object):
txt = []
dnsRecords = object['attributes'].get('dnsRecord', [])
for dnsRecord in dnsRecords:
ndrRecord = ndr_unpack(dnsp.DnssrvRpcRecord, dnsRecord)
if ndrRecord.wType == dnsp.DNS_TYPE_TXT:
txt.append(str(TXT(rdataclass.IN, rdatatype.TXT, ndrRecord.data.str)))
# or: txt.append(' '.join(['"%s"' % token for token in ndrRecord.data.str]))
return txt
def __pack_cName(object, dnsRecords):
for c in object['attributes'].get('cNAMERecord', []):
c = __remove_dot(c)
c_record = CNAMERecord(c)
dnsRecords.append(ndr_pack(c_record))
def __unpack_cName(object):
c = []
dnsRecords = object['attributes'].get('dnsRecord', [])
for dnsRecord in dnsRecords:
ndrRecord = ndr_unpack(dnsp.DnssrvRpcRecord, dnsRecord)
if ndrRecord.wType == dnsp.DNS_TYPE_CNAME:
if "." in ndrRecord.data:
c.append(__append_dot(ndrRecord.data))
else:
c.append(ndrRecord.data)
return c
def __pack_sRVrecord(object, dnsRecords):
for srvRecord in object['attributes'].get('sRVRecord', []):
srv = srvRecord.split(b' ')
priority = int(srv[0])
weight = int(srv[1])
port = int(srv[2])
target = __remove_dot(srv[3])
s = SRVRecord(target, port, priority, weight)
dnsRecords.append(ndr_pack(s))
def __unpack_sRVrecord(object):
srv = []
dnsRecords = object['attributes'].get('dnsRecord', [])
for dnsRecord in dnsRecords:
ndrRecord = ndr_unpack(dnsp.DnssrvRpcRecord, dnsRecord)
if ndrRecord.wType == dnsp.DNS_TYPE_SRV:
srv.append([str(ndrRecord.data.wPriority), str(ndrRecord.data.wWeight), str(ndrRecord.data.wPort), __append_dot(ndrRecord.data.nameTarget)])
return srv
def __pack_ptrRecord(object, dnsRecords):
for ptr in object['attributes'].get('pTRRecord', []):
ptr = __remove_dot(ptr)
ptr_record = PTRRecord(ptr)
dnsRecords.append(ndr_pack(ptr_record))
def __unpack_ptrRecord(object):
ptr = []
dnsRecords = object['attributes'].get('dnsRecord', [])
for dnsRecord in dnsRecords:
ndrRecord = ndr_unpack(dnsp.DnssrvRpcRecord, dnsRecord)
if ndrRecord.wType == dnsp.DNS_TYPE_PTR:
ptr.append(__append_dot(ndrRecord.data))
return ptr
def __get_s4_msdcs_soa(s4connector, zoneName):
"""Required to keep the SOA serial numbers in sync"""
msdcs_obj = {}
msdcs_zonename = f'_msdcs.{zoneName}'
s4_filter = format_escaped('(&(objectClass=dnsZone)(DC={0!e}))', msdcs_zonename)
log.debug("__get_s4_msdcs_soa: search _msdcs in S4")
msdcs_obj = {}
for base in s4connector.s4_ldap_partitions:
resultlist = s4connector._s4__search_s4(
base,
ldap.SCOPE_SUBTREE,
s4_filter,
show_deleted=False)
if resultlist:
break
else:
log.warning("__get_s4_msdcs_soa: _msdcs sub-zone for %s not found in S4", zoneName)
return
# We need the SOA here
msdcs_soa_dn = ldap.dn.dn2str([[('DC', '@', ldap.AVA_STRING)], *str2dn(resultlist[0][0])])
log.debug("__get_s4_msdcs_soa: search DC=@ for _msdcs in S4")
resultlist = s4connector._s4__search_s4(
msdcs_soa_dn,
ldap.SCOPE_BASE,
'(objectClass=dnsNode)',
show_deleted=False)
if resultlist:
# __object_from_element not required here
msdcs_obj = s4connector._s4__object_from_element(resultlist[0])
return msdcs_obj
''' Create/modify a DNS zone in Samba 4 '''
[docs]
def s4_zone_create(s4connector, object):
soa_dn = object['dn']
zone_dn = s4connector.lo.parentDn(soa_dn)
zoneName = object['attributes']['zoneName'][0].decode('UTF-8')
# Create the forward zone in S4 if it does not exist
try:
s4connector.lo_s4.get(zone_dn, attr=[''], required=True)
except ldap.NO_SUCH_OBJECT:
__create_s4_forward_zone(s4connector, zone_dn)
# Create SOA DC=@ object
old_dnsRecords = []
try:
old_dnsRecords = s4connector.lo_s4.get(soa_dn, attr=['dnsRecord'], required=True).get('dnsRecord')
except ldap.NO_SUCH_OBJECT:
__create_s4_forward_zone_soa(s4connector, soa_dn)
dnsRecords = []
__pack_nsRecord(object, dnsRecords)
__pack_soaRecord(object, dnsRecords)
# The IP address of the DNS forward zone will be used to determine the
# sysvol share. On a selective replicated DC only a short list of DCs
# should be returned
aRecords = s4connector.configRegistry.get(f'connector/s4/mapping/dns/forward_zone/{zoneName.lower()}/static/ipv4')
aAAARecords = s4connector.configRegistry.get(f'connector/s4/mapping/dns/forward_zone/{zoneName.lower()}/static/ipv6')
if aRecords or aAAARecords:
# IPv4
if aRecords:
for a in aRecords.split(' '):
a_record = ARecord(a)
dnsRecords.append(ndr_pack(a_record))
# IPv6
if aAAARecords:
for a in aAAARecords.split(' '):
a_record = AAAARecord(a)
dnsRecords.append(ndr_pack(a_record))
else:
__pack_aRecord(object, dnsRecords)
__pack_mxRecord(object, dnsRecords)
__pack_txtRecord(object, dnsRecords)
s4connector.lo_s4.modify(soa_dn, [('dnsRecord', old_dnsRecords, dnsRecords)])
return True
[docs]
def s4_zone_msdcs_sync(s4connector, object):
# Get the current serial number of the OpenLDAP domainname zone
domainZoneName = object['attributes']['zoneName'][0].decode('UTF-8')
soaRecord = object['attributes'].get('sOARecord', [None])[0]
if not soaRecord:
log.warning('s4_zone_msdcs_sync: OL zone %s has no SOA info', domainZoneName)
return
soa = soaRecord.split(b' ')
serial = int(soa[2])
# lookup the SOA record of the _msdcs sub-zone for the domainname zone
msdcs_soa_obj = __get_s4_msdcs_soa(s4connector, domainZoneName)
if not msdcs_soa_obj:
return
msdcs_soa_dn = msdcs_soa_obj['dn']
dnsRecords = []
old_dnsRecords = msdcs_soa_obj['attributes'].get('dnsRecord', [])
found = False
for dnsRecord in old_dnsRecords:
ndrRecord = ndr_unpack(dnsp.DnssrvRpcRecord, dnsRecord)
if ndrRecord.wType == dnsp.DNS_TYPE_SOA:
if ndrRecord.data.serial >= serial:
log.warning('s4_zone_msdcs_sync: SOA serial OpenLDAP zone %s is higher than corresponding value of %s', domainZoneName, msdcs_soa_dn)
return
ndrRecord.data.serial = serial
dnsRecords.append(ndr_pack(ndrRecord))
found = True
else:
dnsRecords.append(dnsRecord)
if not found:
log.warning('s4_zone_msdcs_sync: object %s has no SOA info', msdcs_soa_dn)
return
s4connector.lo_s4.modify(msdcs_soa_dn, [('dnsRecord', old_dnsRecords, dnsRecords)])
return True
''' Create/modify a DNS zone and possibly _msdcs in Samba 4 '''
[docs]
def s4_zone_create_wrapper(s4connector, object):
"""
Handle s4_zone_create to additionally sync to _msdcs.$domainname
Required to keep the SOA serial numbers in sync
"""
result = s4_zone_create(s4connector, object)
zoneName = object['attributes']['zoneName'][0].decode('UTF-8')
if (
zoneName == s4connector.configRegistry.get('domainname')
and s4connector.configRegistry.get('connector/s4/mapping/dns/position') != 'legacy'
and object['modtype'] == 'modify'
):
# Additionally sync serialNumber to _msdcs zone
result = result and s4_zone_msdcs_sync(s4connector, object)
return result
''' Delete a forward zone in Samaba 4 '''
[docs]
def s4_zone_delete(s4connector, object):
soa_dn = object['dn']
zone_dn = s4connector.lo.parentDn(soa_dn)
try:
s4connector.lo_s4.lo.delete_s(soa_dn)
except ldap.NO_SUCH_OBJECT:
pass # the object was already removed
try:
s4connector.lo_s4.lo.delete_s(zone_dn)
except ldap.NO_SUCH_OBJECT:
pass # the object was already removed
return True
[docs]
def s4_dns_node_base_create(s4connector, object, dnsRecords):
relativeDomainNames = object['attributes'].get('relativeDomainName')
old_dnsRecords = []
# Create dnsNode object
dnsNodeDn = object['dn']
try:
old_dnsRecords = s4connector.lo_s4.get(dnsNodeDn, attr=['dnsRecord'], required=True).get('dnsRecord')
except ldap.NO_SUCH_OBJECT:
__create_s4_dns_node(s4connector, dnsNodeDn, relativeDomainNames, dnsRecords)
else:
s4connector.lo_s4.modify(dnsNodeDn, [('dnsRecord', old_dnsRecords, dnsRecords)])
return dnsNodeDn
[docs]
def s4_dns_node_base_delete(s4connector, object):
dnsNodeDn = object['dn']
try:
s4connector.lo_s4.lo.delete_s(dnsNodeDn)
except ldap.NO_SUCH_OBJECT:
pass # the object was already removed
return True
''' Create a host record in Samaba 4 '''
[docs]
def s4_host_record_create(s4connector, object):
dnsRecords = []
zoneName = object['attributes']['zoneName'][0].decode('UTF-8')
relativeDomainName = object['attributes']['relativeDomainName'][0].decode('UTF-8')
aRecords = s4connector.configRegistry.get(f'connector/s4/mapping/dns/host_record/{relativeDomainName.lower()}.{zoneName.lower()}/static/ipv4')
aAAARecords = s4connector.configRegistry.get(f'connector/s4/mapping/dns/host_record/{relativeDomainName.lower()}.{zoneName.lower()}/static/ipv6')
if aRecords or aAAARecords:
# IPv4
if aRecords:
for a in aRecords.split(' '):
a_record = ARecord(a)
dnsRecords.append(ndr_pack(a_record))
# IPv6
if aAAARecords:
for a in aAAARecords.split(' '):
a_record = AAAARecord(a)
dnsRecords.append(ndr_pack(a_record))
else:
__pack_aRecord(object, dnsRecords)
__pack_mxRecord(object, dnsRecords)
__pack_txtRecord(object, dnsRecords)
s4_dns_node_base_create(s4connector, object, dnsRecords)
return True
[docs]
def ucs_host_record_create(s4connector, object):
log.debug('ucs_host_record_create: object: %s', object)
zoneName = object['attributes']['zoneName'][0].decode('UTF-8')
relativeDomainName = object['attributes']['relativeDomainName'][0].decode('UTF-8')
aRecords = s4connector.configRegistry.get(f'connector/s4/mapping/dns/host_record/{relativeDomainName.lower()}.{zoneName.lower()}/static/ipv4')
if aRecords:
log.debug('ucs_host_record_create: do not write host record back from S4 to UCS because location of A record has been overwritten by UCR')
return
aAAARecords = s4connector.configRegistry.get(f'connector/s4/mapping/dns/host_record/{relativeDomainName.lower()}.{zoneName.lower()}/static/ipv6')
if aAAARecords:
log.debug('ucs_host_record_create: do not write host record back from S4 to UCS because location of AAAA record has been overwritten by UCR')
return
# unpack the host record
a = __unpack_aRecord(object)
# Does a host record for this zone already exist?
ol_filter = format_escaped('(&(relativeDomainName={0!e})(zoneName={1!e}))', relativeDomainName, zoneName)
searchResult = s4connector.lo.search(filter=ol_filter, unique=True)
if len(searchResult) > 0:
newRecord = univention.admin.handlers.dns.host_record.object(None, s4connector.lo, position=None, dn=searchResult[0][0], update_zone=False)
newRecord.open()
if set(newRecord['a']) != set(a):
newRecord['a'] = a
newRecord.modify()
else:
log.debug('ucs_host_record_create: do not modify host record')
else:
zoneDN = __get_zone_dn(s4connector, zoneName)
log.debug('ucs_host_record_create: zoneDN: %s', zoneDN)
position = univention.admin.uldap.position(zoneDN)
newRecord = univention.admin.handlers.dns.host_record.object(None, s4connector.lo, position, dn=None, update_zone=False)
newRecord.open()
newRecord['name'] = relativeDomainName
newRecord['a'] = a
newRecord.create()
[docs]
def ucs_host_record_delete(s4connector, object):
log.debug('ucs_host_record_delete: object: %s', object)
zoneName = object['attributes']['zoneName'][0].decode('UTF-8')
relativeDomainName = object['attributes']['relativeDomainName'][0].decode('UTF-8')
ol_filter = format_escaped('(&(relativeDomainName={0!e})(zoneName={1!e}))', relativeDomainName, zoneName)
searchResult = s4connector.lo.search(filter=ol_filter, unique=True)
if len(searchResult) > 0:
newRecord = univention.admin.handlers.dns.host_record.object(None, s4connector.lo, position=None, dn=searchResult[0][0], update_zone=False)
newRecord.open()
newRecord.delete()
else:
log.debug('ucs_host_record_delete: Object was not found, filter was: %s', ol_filter)
return True
[docs]
def s4_ptr_record_create(s4connector, object):
dnsRecords = []
__pack_ptrRecord(object, dnsRecords)
s4_dns_node_base_create(s4connector, object, dnsRecords)
return True
[docs]
def ucs_ptr_record_create(s4connector, object):
log.debug('ucs_ptr_record_create: object: %s', object)
zoneName = object['attributes']['zoneName'][0].decode('UTF-8')
relativeDomainName = object['attributes']['relativeDomainName'][0].decode('UTF-8')
# unpack the host record
ptr = __unpack_ptrRecord(object)
# Does a host record for this zone already exist?
ol_filter = format_escaped('(&(relativeDomainName={0!e})(zoneName={1!e}))', relativeDomainName, zoneName)
searchResult = s4connector.lo.search(filter=ol_filter, unique=True)
if len(searchResult) > 0:
newRecord = univention.admin.handlers.dns.ptr_record.object(None, s4connector.lo, position=None, dn=searchResult[0][0], update_zone=False)
newRecord.open()
if set(newRecord['ptr_record']) != set(ptr):
newRecord['ptr_record'] = ptr[0]
newRecord.modify()
else:
log.debug('ucs_ptr_record_create: do not modify ptr record')
else:
zoneDN = __get_zone_dn(s4connector, zoneName)
position = univention.admin.uldap.position(zoneDN)
newRecord = univention.admin.handlers.dns.ptr_record.object(None, s4connector.lo, position, dn=None, update_zone=False)
newRecord.open()
newRecord['address'] = relativeDomainName
newRecord['ptr_record'] = ptr[0]
newRecord.create()
[docs]
def ucs_ptr_record_delete(s4connector, object):
log.debug('ucs_ptr_record_delete: object: %s', object)
zoneName = object['attributes']['zoneName'][0].decode('UTF-8')
relativeDomainName = object['attributes']['relativeDomainName'][0].decode('UTF-8')
ol_filter = format_escaped('(&(relativeDomainName={0!e})(zoneName={1!e}))', relativeDomainName, zoneName)
searchResult = s4connector.lo.search(filter=ol_filter, unique=True)
if len(searchResult) > 0:
newRecord = univention.admin.handlers.dns.ptr_record.object(None, s4connector.lo, position=None, dn=searchResult[0][0], update_zone=False)
newRecord.open()
newRecord.delete()
else:
log.debug('ucs_ptr_record_delete: Object was not found, filter was: %s', ol_filter)
return True
[docs]
def ucs_cname_create(s4connector, object):
log.debug('ucs_cname_create: object: %s', object)
zoneName = object['attributes']['zoneName'][0].decode('UTF-8')
relativeDomainName = object['attributes']['relativeDomainName'][0].decode('UTF-8')
# unpack the host record
c = __unpack_cName(object)
# Does a host record for this zone already exist?
ol_filter = format_escaped('(&(relativeDomainName={0!e})(zoneName={1!e}))', relativeDomainName, zoneName)
searchResult = s4connector.lo.search(filter=ol_filter, unique=True)
if len(searchResult) > 0:
newRecord = univention.admin.handlers.dns.alias.object(None, s4connector.lo, position=None, dn=searchResult[0][0], update_zone=False)
newRecord.open()
if set(newRecord['cname']) != set(c):
newRecord['cname'] = c[0]
newRecord.modify()
else:
log.debug('ucs_cname_create: do not modify cname record')
else:
zoneDN = __get_zone_dn(s4connector, zoneName)
position = univention.admin.uldap.position(zoneDN)
newRecord = univention.admin.handlers.dns.alias.object(None, s4connector.lo, position, dn=None, update_zone=False)
newRecord.open()
newRecord['name'] = relativeDomainName
newRecord['cname'] = c[0]
newRecord.create()
[docs]
def ucs_cname_delete(s4connector, object):
log.debug('ucs_cname_delete: object: %s', object)
zoneName = object['attributes']['zoneName'][0].decode('UTF-8')
relativeDomainName = object['attributes']['relativeDomainName'][0].decode('UTF-8')
ol_filter = format_escaped('(&(relativeDomainName={0!e})(zoneName={1!e}))', relativeDomainName, zoneName)
searchResult = s4connector.lo.search(filter=ol_filter, unique=True)
if len(searchResult) > 0:
newRecord = univention.admin.handlers.dns.alias.object(None, s4connector.lo, position=None, dn=searchResult[0][0], update_zone=False)
newRecord.open()
newRecord.delete()
else:
log.debug('ucs_cname_delete: Object was not found, filter was: %s', ol_filter)
return True
[docs]
def s4_cname_create(s4connector, object):
dnsRecords = []
__pack_cName(object, dnsRecords)
s4_dns_node_base_create(s4connector, object, dnsRecords)
[docs]
def ucs_srv_record_create(s4connector, object):
log.debug('ucs_srv_record_create: object: %s', object)
zoneName = object['attributes']['zoneName'][0].decode('UTF-8')
relativeDomainName = object['attributes']['relativeDomainName'][0].decode('UTF-8')
# unpack the host record
srv = __unpack_sRVrecord(object)
# ucr set connector/s4/mapping/dns/srv_record/_ldap._tcp.test.example/location='100 0 389 foobar.test.example. 100 0 389 foobar2.test.example.'
ucr_locations = s4connector.configRegistry.get(f'connector/s4/mapping/dns/srv_record/{relativeDomainName.lower()}.{zoneName.lower()}/location')
log.debug('ucs_srv_record_create: ucr_locations for connector/s4/mapping/dns/srv_record/%s.%s/location: %s', relativeDomainName.lower(), zoneName.lower(), ucr_locations)
if ucr_locations and ucr_locations.lower() == 'ignore':
return
# Does a host record for this zone already exist?
ol_filter = format_escaped('(&(relativeDomainName={0!e})(zoneName={1!e}))', relativeDomainName, zoneName)
searchResult = s4connector.lo.search(filter=ol_filter, unique=True)
if len(searchResult) > 0:
newRecord = univention.admin.handlers.dns.srv_record.object(None, s4connector.lo, position=None, dn=searchResult[0][0], update_zone=False)
newRecord.open()
if ucr_locations:
log.debug('ucs_srv_record_create: do not write SRV record back from S4 to UCS because location of SRV record have been overwritten by UCR')
else:
log.debug('ucs_srv_record_create: location: %s', newRecord['location'])
log.debug('ucs_srv_record_create: srv : %s', srv)
srv.sort()
newRecord['location'].sort()
if srv != newRecord['location']:
newRecord['location'] = srv
newRecord.modify()
else:
log.debug('ucs_srv_record_create: do not modify srv record')
else:
zoneDN = __get_zone_dn(s4connector, zoneName)
position = univention.admin.uldap.position(zoneDN)
newRecord = univention.admin.handlers.dns.srv_record.object(None, s4connector.lo, position, dn=None, update_zone=False)
newRecord.open()
# Make syntax UDM compatible
parts = univention.admin.handlers.dns.srv_record.unmapName([relativeDomainName.encode('UTF-8')])
if len(parts) == 3 and parts[2]:
msg = f'SRV create: service="{parts[0]}" protocol="{parts[1]}" extension="{parts[2]}"'
if len(parts) == 2:
msg = f'SRV create: service="{parts[0]}" protocol="{parts[1]}"'
else:
msg = f'SRV create: unexpected format, parts: {parts}'
log.debug(msg)
newRecord['name'] = parts
newRecord['location'] = srv
newRecord.create()
[docs]
def ucs_srv_record_delete(s4connector, object):
log.debug('ucs_srv_record_delete: object: %s', object)
zoneName = object['attributes']['zoneName'][0].decode('UTF-8')
relativeDomainName = object['attributes']['relativeDomainName'][0].decode('UTF-8')
ol_filter = format_escaped('(&(relativeDomainName={0!e})(zoneName={1!e}))', relativeDomainName, zoneName)
searchResult = s4connector.lo.search(filter=ol_filter, unique=True)
if len(searchResult) > 0:
newRecord = univention.admin.handlers.dns.srv_record.object(None, s4connector.lo, position=None, dn=searchResult[0][0], update_zone=False)
newRecord.open()
newRecord.delete()
else:
log.debug('ucs_srv_record_delete: Object was not found, filter was: %s', ol_filter)
return True
[docs]
def s4_srv_record_create(s4connector, object):
dnsRecords = []
zoneName = object['attributes']['zoneName'][0].decode('UTF-8')
relativeDomainName = object['attributes']['relativeDomainName'][0].decode('UTF-8')
# ucr set connector/s4/mapping/dns/srv_record/_ldap._tcp.test.example/location='100 0 389 foobar.test.example.'
# ucr set connector/s4/mapping/dns/srv_record/_ldap._tcp.test.example/location='100 0 389 foobar.test.example. 100 0 389 foobar2.test.example.'
ucr_locations = s4connector.configRegistry.get(f'connector/s4/mapping/dns/srv_record/{relativeDomainName.lower()}.{zoneName.lower()}/location')
log.debug('s4_srv_record_create: ucr_locations for connector/s4/mapping/dns/srv_record/%s.%s/location: %s', relativeDomainName.lower(), zoneName.lower(), ucr_locations)
if ucr_locations:
if ucr_locations.lower() == 'ignore':
return
# Convert ucr variable
priority = None
weight = None
port = None
target = None
for v in ucr_locations.split(' '):
# Check explicit for None, because the int values may be 0
if priority is None:
priority = int(v)
elif weight is None:
weight = int(v)
elif port is None:
port = int(v)
elif not target:
target = __remove_dot(v.encode('UTF-8')).decode('UTF-8')
if priority is not None and weight is not None and port is not None and target:
log.debug('priority=%d weight=%d port=%d target=%s', priority, weight, port, target)
s = SRVRecord(target, port, priority, weight)
dnsRecords.append(ndr_pack(s))
priority = None
weight = None
port = None
target = None
else:
__pack_sRVrecord(object, dnsRecords)
s4_dns_node_base_create(s4connector, object, dnsRecords)
[docs]
def ucs_txt_record_create(s4connector, object):
log.debug('ucs_txt_record_create: object: %s', object)
udm_property = 'txt'
zoneName = object['attributes']['zoneName'][0].decode('UTF-8')
relativeDomainName = object['attributes']['relativeDomainName'][0].decode('UTF-8')
# unpack the record
c = __unpack_txtRecord(object)
# Does a host record for this zone already exist?
ol_filter = format_escaped('(&(relativeDomainName={0!e})(zoneName={1!e}))', relativeDomainName, zoneName)
searchResult = s4connector.lo.search(filter=ol_filter, unique=True)
if len(searchResult) > 0:
foundRecord = univention.admin.handlers.dns.txt_record.object(None, s4connector.lo, position=None, dn=searchResult[0][0], update_zone=False)
foundRecord.open()
# use normalized TXT records for comparison
normalized_txtRecord_list = []
for txtRecord in foundRecord['txt']:
normalized_txtRecord = str(TXT.from_text(rdataclass.IN, rdatatype.TXT, Tokenizer(txtRecord)))
normalized_txtRecord_list.append(normalized_txtRecord)
if set(normalized_txtRecord_list) != set(c):
foundRecord[udm_property] = c
foundRecord.modify()
else:
log.debug('ucs_txt_record_create: do not modify txt record')
else:
zoneDN = __get_zone_dn(s4connector, zoneName)
position = univention.admin.uldap.position(zoneDN)
newRecord = univention.admin.handlers.dns.txt_record.object(None, s4connector.lo, position, dn=None, update_zone=False)
newRecord.open()
newRecord['name'] = relativeDomainName
newRecord[udm_property] = c
newRecord.create()
[docs]
def ucs_txt_record_delete(s4connector, object):
log.debug('ucs_txt_record_delete: object: %s', object)
zoneName = object['attributes']['zoneName'][0].decode('UTF-8')
relativeDomainName = object['attributes']['relativeDomainName'][0].decode('UTF-8')
ol_filter = format_escaped('(&(relativeDomainName={0!e})(zoneName={1!e}))', relativeDomainName, zoneName)
searchResult = s4connector.lo.search(filter=ol_filter, unique=True)
if len(searchResult) > 0:
newRecord = univention.admin.handlers.dns.txt_record.object(None, s4connector.lo, position=None, dn=searchResult[0][0], update_zone=False)
newRecord.open()
newRecord.delete()
else:
log.debug('ucs_txt_record_delete: Object was not found, filter was: %s', ol_filter)
return True
[docs]
def s4_txt_record_create(s4connector, object):
dnsRecords = []
__pack_txtRecord(object, dnsRecords)
s4_dns_node_base_create(s4connector, object, dnsRecords)
[docs]
def ucs_ns_record_create(s4connector, object):
log.debug('ucs_ns_record_create: object: %s', object)
udm_property = 'nameserver'
zoneName = object['attributes']['zoneName'][0].decode('UTF-8')
relativeDomainName = object['attributes']['relativeDomainName'][0].decode('UTF-8')
# unpack the record
c = __unpack_nsRecord(object)
# Does a host record for this zone already exist?
ol_filter = format_escaped('(&(relativeDomainName={0!e})(zoneName={1!e}))', relativeDomainName, zoneName)
searchResult = s4connector.lo.search(filter=ol_filter, unique=True)
if len(searchResult) > 0:
foundRecord = univention.admin.handlers.dns.ns_record.object(None, s4connector.lo, position=None, dn=searchResult[0][0], update_zone=False)
foundRecord.open()
if set(foundRecord[udm_property]) != set(c):
foundRecord[udm_property] = c
foundRecord.modify()
else:
log.debug('ucs_ns_record_create: do not modify ns record')
else:
zoneDN = __get_zone_dn(s4connector, zoneName)
position = univention.admin.uldap.position(zoneDN)
newRecord = univention.admin.handlers.dns.ns_record.object(None, s4connector.lo, position, dn=None, update_zone=False)
newRecord.open()
newRecord['zone'] = relativeDomainName
newRecord[udm_property] = c
newRecord.create()
[docs]
def ucs_ns_record_delete(s4connector, object):
log.debug('ucs_ns_record_delete: object: %s', object)
zoneName = object['attributes']['zoneName'][0].decode('UTF-8')
relativeDomainName = object['attributes']['relativeDomainName'][0].decode('UTF-8')
ol_filter = format_escaped('(&(relativeDomainName={0!e})(zoneName={1!e}))', relativeDomainName, zoneName)
searchResult = s4connector.lo.search(filter=ol_filter, unique=True)
if len(searchResult) > 0:
newRecord = univention.admin.handlers.dns.ns_record.object(None, s4connector.lo, position=None, dn=searchResult[0][0], update_zone=False)
newRecord.open()
newRecord.delete()
else:
log.debug('ucs_ns_record_delete: Object was not found, filter was: %s', ol_filter)
return True
[docs]
def s4_ns_record_create(s4connector, object):
dnsRecords = []
__pack_nsRecord(object, dnsRecords)
s4_dns_node_base_create(s4connector, object, dnsRecords)
[docs]
def ucs_zone_create(s4connector, object, dns_type):
zoneName = object['attributes']['zoneName'][0].decode('UTF-8')
relativeDomainName = object['attributes']['relativeDomainName'][0].decode('UTF-8')
# create the zone when the dc=@ object has been created
if relativeDomainName != '@':
log.debug("ucs_zone_create: ignoring DC=%s object", relativeDomainName)
return
ns = __unpack_nsRecord(object)
soa = __unpack_soaRecord(object)
a = __unpack_aRecord(object)
mx = __unpack_mxRecord(object)
if (
zoneName == s4connector.configRegistry.get('domainname')
and s4connector.configRegistry.get('connector/s4/mapping/dns/position') != 'legacy'
and object['modtype'] == 'modify'
):
# Determine max of serialNumber from _msdcs zone
msdcs_soa_obj = __get_s4_msdcs_soa(s4connector, zoneName)
if msdcs_soa_obj:
msdcs_soa = __unpack_soaRecord(msdcs_soa_obj)
soa['serial'] = str(max(int(soa['serial']), int(msdcs_soa['serial'])))
mname = soa['mname']
if mname and not mname.endswith("."):
mname = f"{mname}."
ns_lower = [x.lower() for x in ns]
mname_lower = mname.lower()
if mname_lower not in ns_lower:
ns.insert(0, mname)
ns_lower.insert(0, mname_lower)
# Does a zone already exist?
modify = False
ol_filter = format_escaped('(&(relativeDomainName={0!e})(zoneName={1!e}))', relativeDomainName, zoneName)
searchResult = s4connector.lo.search(filter=ol_filter, unique=True)
if len(searchResult) > 0:
if dns_type == 'forward_zone':
zone = univention.admin.handlers.dns.forward_zone.object(None, s4connector.lo, position=None, dn=searchResult[0][0])
elif dns_type == 'reverse_zone':
zone = univention.admin.handlers.dns.reverse_zone.object(None, s4connector.lo, position=None, dn=searchResult[0][0])
zone.open()
udm_zone_nameservers_lower = [x.lower() for x in zone['nameserver']]
if set(ns_lower) != set(udm_zone_nameservers_lower):
zone['nameserver'] = ns
modify = True
if soa['rname'].replace('.', '@', 1) != zone['contact'].rstrip('.'):
zone['contact'] = soa['rname'].replace('.', '@', 1)
modify = True
if int(soa['serial']) != int(zone['serial']):
zone['serial'] = soa['serial']
modify = True
for k in ['refresh', 'retry', 'expire', 'ttl']:
if int(soa[k]) != _unixTimeInverval2seconds(zone[k]):
zone[k] = unmapUNIX_TimeInterval(soa[k])
modify = True
if dns_type == 'forward_zone':
# The IP address of the DNS forward zone will be used to determine the
# sysvol share. On a selective replicated DC only a short list of DCs
# should be returned
aRecords = s4connector.configRegistry.get(f'connector/s4/mapping/dns/forward_zone/{zoneName.lower()}/static/ipv4')
aAAARecords = s4connector.configRegistry.get(f'connector/s4/mapping/dns/forward_zone/{zoneName.lower()}/static/ipv6')
if not aRecords and not aAAARecords and set(a) != set(zone['a']):
zone['a'] = a
modify = True
if mx:
def mapMX(m):
return f'{m[0]} {m[1]}'
if set(map(mapMX, mx)) != set(map(mapMX, zone['mx'])):
zone['mx'] = mx
modify = True
if modify:
zone.modify()
else:
position = univention.admin.uldap.position(s4connector.property['dns'].ucs_default_dn)
if dns_type == 'forward_zone':
zone = univention.admin.handlers.dns.forward_zone.object(None, s4connector.lo, position, dn=None)
name_key = 'zone'
elif dns_type == 'reverse_zone':
zone = univention.admin.handlers.dns.reverse_zone.object(None, s4connector.lo, position, dn=None)
name_key = 'subnet'
zoneName = univention.admin.handlers.dns.reverse_zone.unmapSubnet(zoneName.encode('ASCII'))
zone.open()
zone[name_key] = zoneName
zone['nameserver'] = ns
zone['contact'] = soa['rname'].replace('.', '@', 1)
zone['serial'] = soa['serial']
zone['refresh'] = [soa['refresh']] # complex UDM syntax
zone['retry'] = [soa['retry']] # complex UDM syntax
zone['expire'] = [soa['expire']] # complex UDM syntax
zone['ttl'] = [soa['ttl']] # complex UDM syntax
if dns_type == 'forward_zone':
zone['a'] = a
zone['mx'] = mx
zone.create()
[docs]
def ucs_zone_delete(s4connector, object, dns_type):
zoneName = object['attributes']['zoneName'][0].decode('UTF-8')
relativeDomainName = object['attributes']['relativeDomainName'][0].decode('UTF-8')
if relativeDomainName != '@':
log.debug("ucs_zone_delete: ignoring DC=%s object", relativeDomainName)
return
ol_filter = format_escaped('(&(relativeDomainName={0!e})(zoneName={1!e}))', relativeDomainName, zoneName)
searchResult = s4connector.lo.search(filter=ol_filter, unique=True)
if len(searchResult) > 0:
if dns_type == 'forward_zone':
zone = univention.admin.handlers.dns.forward_zone.object(None, s4connector.lo, position=None, dn=searchResult[0][0], update_zone=False)
elif dns_type == 'reverse_zone':
zone = univention.admin.handlers.dns.reverse_zone.object(None, s4connector.lo, position=None, dn=searchResult[0][0], update_zone=False)
zone.open()
zone.delete()
def _identify_dns_ucs_object(s4connector, object):
# At this point dn_mapping_function already has converted object['dn'] from ucs to con
# But since there is no attribute mapping defined for DNS, the object attributes still
# are the ones from UCS. Passing the Samba4 object['dn'] is irrelevant here:
if object.get('attributes'):
if univention.admin.handlers.dns.forward_zone.identify(object['dn'], object['attributes']):
return 'forward_zone'
if univention.admin.handlers.dns.reverse_zone.identify(object['dn'], object['attributes']):
return 'reverse_zone'
if univention.admin.handlers.dns.alias.identify(object['dn'], object['attributes']):
return 'alias'
if univention.admin.handlers.dns.host_record.identify(object['dn'], object['attributes']):
return 'host_record'
if univention.admin.handlers.dns.srv_record.identify(object['dn'], object['attributes']):
return 'srv_record'
if univention.admin.handlers.dns.ptr_record.identify(object['dn'], object['attributes']):
return 'ptr_record'
if univention.admin.handlers.dns.txt_record.identify(object['dn'], object['attributes']):
return 'txt_record'
if univention.admin.handlers.dns.ns_record.identify(object['dn'], object['attributes']):
return 'ns_record'
return None
def _identify_dns_con_object(s4connector, object):
# At this point dn_mapping_function already has converted object['dn'] from con to ucs
# But since there is no attribute mapping defined for DNS, the object attributes still
# are the ones from Samba.
if object.get('attributes'):
oc = object['attributes'].get('objectClass')
dc = object['attributes'].get('dc') or object['attributes'].get('DC')
if oc and b'dnsZone' in oc:
# forward or reverse zone
if dc and (dc[0].lower().endswith(b'.in-addr.arpa') or dc[0].lower().endswith(b'.ip6.arpa')):
return 'reverse_zone'
else:
return 'forward_zone'
if oc and b'dnsNode' in oc:
if dc and dc[0] == b'@':
zone_type = 'forward_zone'
exploded_dn = str2dn(object['dn'])
for multi_rdn in exploded_dn:
(attribute, value, _flags) = multi_rdn[0]
if attribute.lower() == 'zonename' and (value.lower().endswith('.in-addr.arpa') or value.lower().endswith('.ip6.arpa')):
zone_type = 'reverse_zone'
break
return zone_type
else:
dnsRecords = object['attributes'].get('dnsRecord')
if not dnsRecords:
return None
dns_types = set()
for dnsRecord in dnsRecords:
dnsRecord_DnssrvRpcRecord = ndr_unpack(dnsp.DnssrvRpcRecord, dnsRecord)
dns_types.add(dnsRecord_DnssrvRpcRecord.wType)
if dnsp.DNS_TYPE_PTR in dns_types:
return 'ptr_record'
elif dnsp.DNS_TYPE_CNAME in dns_types:
return 'alias'
elif dnsp.DNS_TYPE_SRV in dns_types:
return 'srv_record'
elif {dnsp.DNS_TYPE_A, dnsp.DNS_TYPE_AAAA} & dns_types:
return 'host_record'
elif dnsp.DNS_TYPE_TXT in dns_types:
return 'txt_record'
elif dnsp.DNS_TYPE_NS in dns_types:
return 'ns_record'
return None
[docs]
def ucs2con(s4connector, key, object):
# At this point dn_mapping_function already has converted object['dn'] from ucs to con
# But since there is no attribute mapping defined for DNS, the object attributes still
# are the ones from UCS.
dns_type = _identify_dns_ucs_object(s4connector, object)
if not dns_type:
# unknown object -> ignore
log.debug('dns ucs2con: Ignore unknown dns object: %s', object['dn'])
return True
log.debug('dns ucs2con: Object (%s) is of type %s', object['dn'], dns_type)
# We can only get the mapped zone_name from the DN here (see comment above):
# (In the case of _msdcs the zoneName would be wrong here otherwise)
(zoneName, relativeDomainName) = __split_s4_dnsNode_dn(object['dn'])
object['attributes']['zoneName'] = [zoneName.encode('UTF-8')]
object['attributes']['relativeDomainName'] = [relativeDomainName.encode('UTF-8')]
if dns_type in ('forward_zone', 'reverse_zone'):
if object['modtype'] in ['add', 'modify']:
s4_zone_create_wrapper(s4connector, object)
elif object['modtype'] in ['delete']:
s4_zone_delete(s4connector, object)
# ignore move
elif dns_type == 'host_record':
if object['modtype'] in ['add', 'modify']:
s4_host_record_create(s4connector, object)
elif object['modtype'] in ['delete']:
s4_dns_node_base_delete(s4connector, object)
# ignore move
elif dns_type == 'alias':
if object['modtype'] in ['add', 'modify']:
s4_cname_create(s4connector, object)
elif object['modtype'] in ['delete']:
s4_dns_node_base_delete(s4connector, object)
# ignore move
elif dns_type == 'srv_record':
if object['modtype'] in ['add', 'modify']:
s4_srv_record_create(s4connector, object)
elif object['modtype'] in ['delete']:
s4_dns_node_base_delete(s4connector, object)
# ignore move
elif dns_type == 'ptr_record':
if object['modtype'] in ['add', 'modify']:
s4_ptr_record_create(s4connector, object)
elif object['modtype'] in ['delete']:
s4_dns_node_base_delete(s4connector, object)
# ignore move
elif dns_type == 'txt_record':
if object['modtype'] in ['add', 'modify']:
s4_txt_record_create(s4connector, object)
elif object['modtype'] in ['delete']:
s4_dns_node_base_delete(s4connector, object)
# ignore move
elif dns_type == 'ns_record':
if object['modtype'] in ['add', 'modify']:
s4_ns_record_create(s4connector, object)
elif object['modtype'] in ['delete']:
s4_dns_node_base_delete(s4connector, object)
# ignore move
return True
[docs]
def con2ucs(s4connector, key, object):
log.debug('dns con2ucs: Object (%s): %s', object['dn'], object)
# At this point dn_mapping_function already has converted object['dn'] from con to ucs
# But since there is no attribute mapping defined for DNS, the object attributes still
# are the ones from Samba.
dns_type = _identify_dns_con_object(s4connector, object)
if not dns_type:
# unknown object -> ignore
log.debug('dns con2ucs: Ignore unknown dns object: %s', object['dn'])
return True
log.debug('dns con2ucs: Object (%s) is of type %s', object['dn'], dns_type)
# We can only get the mapped zone_name from the DN here (see comment above):
(zoneName, relativeDomainName) = __split_ol_dNSZone_dn(object['dn'], object['attributes']['objectClass'])
# Inject the zoneName and relativeDomainName to simplify things below
object['attributes']['zoneName'] = [zoneName.encode('UTF-8')]
object['attributes']['relativeDomainName'] = [relativeDomainName.encode('UTF-8')]
if dns_type == 'host_record':
if object['modtype'] in ['add', 'modify']:
ucs_host_record_create(s4connector, object)
elif object['modtype'] in ['delete']:
ucs_host_record_delete(s4connector, object)
# ignore move
elif dns_type == 'ptr_record':
if object['modtype'] in ['add', 'modify']:
ucs_ptr_record_create(s4connector, object)
elif object['modtype'] in ['delete']:
ucs_ptr_record_delete(s4connector, object)
# ignore move
elif dns_type == 'alias':
if object['modtype'] in ['add', 'modify']:
ucs_cname_create(s4connector, object)
elif object['modtype'] in ['delete']:
ucs_cname_delete(s4connector, object)
# ignore move
elif dns_type == 'srv_record':
if object['modtype'] in ['add', 'modify']:
ucs_srv_record_create(s4connector, object)
elif object['modtype'] in ['delete']:
ucs_srv_record_delete(s4connector, object)
# ignore move
elif dns_type == 'txt_record':
if object['modtype'] in ['add', 'modify']:
ucs_txt_record_create(s4connector, object)
elif object['modtype'] in ['delete']:
ucs_txt_record_delete(s4connector, object)
# ignore move
elif dns_type == 'ns_record':
if object['modtype'] in ['add', 'modify']:
ucs_ns_record_create(s4connector, object)
elif object['modtype'] in ['delete']:
ucs_ns_record_delete(s4connector, object)
# ignore move
if dns_type in ['forward_zone', 'reverse_zone']:
if object['modtype'] in ['add', 'modify']:
ucs_zone_create(s4connector, object, dns_type)
elif object['modtype'] in ['delete']:
ucs_zone_delete(s4connector, object, dns_type)
# ignore move
return True