Source code for univention.s4connector.s4.ntsecurity_descriptor

#!/usr/bin/python3
#
# Univention S4 Connector
#  nTSecurityDescriptor sync
#
# SPDX-FileCopyrightText: 2014-2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only


from logging import getLogger

import ldap
from ldap.controls.readentry import PostReadControl
from samba.dcerpc import security
from samba.ndr import ndr_pack, ndr_unpack

from univention.logging import Structured


log = Structured(getLogger("LDAP").getChild(__name__))


[docs] def encode_sddl_to_sd_in_ndr(domain_sid, ntsd_sddl): ntsd = security.descriptor.from_sddl(ntsd_sddl, domain_sid) ntsd_ndr = ndr_pack(ntsd) return ntsd_ndr
[docs] def decode_sd_in_ndr_to_sddl(domain_sid, value): ntsd = ndr_unpack(security.descriptor, value) return ntsd.as_sddl(domain_sid)
# Post-create/modify functions
[docs] def ntsd_to_s4(s4connector, key, object): log.debug("ntsd_to_s4 object: %s", object) # object dn was already mapped to the s4 DN: s4_dn = object['dn'] modlist = [] # search the ucs object via if 'msNTSecurityDescriptor' not in object['attributes']: log.debug('ntsd_to_s4: UCS object does not have a msNTSecurityDescriptor') return ucs_ntsd_sddl = object['attributes']['msNTSecurityDescriptor'][0].decode('ASCII') s4_attributes = s4connector.lo_s4.get(s4_dn, attr=['nTSecurityDescriptor']) ntsd_ndr = s4_attributes.get('nTSecurityDescriptor') if ntsd_ndr: domain_sid = security.dom_sid(s4connector.s4_sid) s4_ntsd_sddl = decode_sd_in_ndr_to_sddl(domain_sid, ntsd_ndr[0]) if s4_ntsd_sddl == ucs_ntsd_sddl: log.debug('ntsd_to_s4: nTSecurityDescriptors are equal') return log.debug('ntsd_to_s4: changing nTSecurityDescriptor from %s to %s', s4_ntsd_sddl, ucs_ntsd_sddl) ucs_ntsd_ndr = encode_sddl_to_sd_in_ndr(domain_sid, ucs_ntsd_sddl) modlist.append((ldap.MOD_REPLACE, 'nTSecurityDescriptor', ucs_ntsd_ndr)) s4connector.lo_s4.lo.modify_ext_s(s4_dn, modlist)
[docs] def ntsd_to_ucs(s4connector, key, s4_object): log.debug("ntsd_to_ucs S4 object: %s", s4_object) log.debug("ntsd_to_ucs S4 key: %s", key) # modlist ml = [] # search Samba DS expicitly for hidden attribute # object dn is already mapped to the UCS DN: s4_dn = s4_object.get('dn') if not s4_dn: return # ignore try: s4_attributes = s4connector.lo_s4.get(s4_dn, attr=['nTSecurityDescriptor'], required=True) except ldap.NO_SUCH_OBJECT: log.warning('ntsd_to_ucs: S4 object (%s) not found', s4_dn) return ntsd_ndr = s4_attributes.get('nTSecurityDescriptor') if not ntsd_ndr: log.debug('ntsd_to_ucs: nTSecurityDescriptor not found in attributes!') return # search in UCS/OpenLDAP DS to determine modify/add ucs_dn = s4_dn try: ucs_attributes = s4connector.lo.get(ucs_dn, attr=['msNTSecurityDescriptor']) except ldap.NO_SUCH_OBJECT: log.warning('sid_to_ucs: UCS object (%s) not found', ucs_dn) return domain_sid = security.dom_sid(s4connector.s4_sid) s4_ntsd_sddl = decode_sd_in_ndr_to_sddl(domain_sid, ntsd_ndr[0]).encode('ASCII') ucs_ntsd_sddl = ucs_attributes.get('msNTSecurityDescriptor', [None])[0] if not ucs_ntsd_sddl or ucs_ntsd_sddl != s4_ntsd_sddl: ml.append(('msNTSecurityDescriptor', ucs_ntsd_sddl, s4_ntsd_sddl)) if ml: log.debug('ntsd_to_ucs: modlist = %s', ml) serverctrls = [PostReadControl(True, ['entryUUID', 'entryCSN'])] response = {} s4connector.lo.lo.modify(ucs_dn, ml, serverctrls=serverctrls, response=response) for c in response.get('ctrls', []): # If the modify actually did something if c.controlType == PostReadControl.controlType: entryUUID = c.entry['entryUUID'][0].decode('ASCII') entryCSN = c.entry['entryCSN'][0].decode('ASCII') s4connector._remember_entryCSN_commited_by_connector(entryUUID, entryCSN)