Source code for univention.s4connector.s4.dc

#!/usr/bin/python3
#
# Univention S4 Connector
#  dc sync
#
# SPDX-FileCopyrightText: 2004-2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only

from logging import getLogger

import ldap

import univention.admin.handlers.settings.sambadomain
from univention.logging import Structured
from univention.s4connector.s4 import decode_sid, format_escaped


log = Structured(getLogger("LDAP").getChild(__name__))


def _unixTimeInverval2seconds(unixTime):
    if not isinstance(unixTime, list):
        return unixTime

    if len(unixTime) != 2:
        log.warning('dc _unixTimeInverval2seconds: Not a valid time format: %s', unixTime)
        return 0

    if int(unixTime[0]) < 0:
        return 0

    if unixTime[1] == 'seconds':
        return int(unixTime[0])
    elif unixTime[1] == 'minutes':
        return int(unixTime[0]) * 60
    elif unixTime[1] == 'hours':
        return int(unixTime[0]) * 3600  # 60 * 60
    elif unixTime[1] == 'days':
        return int(unixTime[0]) * 86400  # 60 * 60 * 24
    else:
        log.warning('dc _unixTimeInverval2seconds: Not a valid time unit: %s', unixTime)
        return 0


# Time interval in S4 / AD is often 100-nanosecond intervals:
# http://msdn.microsoft.com/en-us/library/windows/desktop/ms676863%28v=vs.85%29.aspx


def _s2nano(seconds):
    return seconds * 10000000


def _nano2s(nanoseconds):
    return int(nanoseconds / 10000000)


[docs] def ucs2con(s4connector, key, object): log.debug('dc ucs2con: Object (%s): %s', object['dn'], object) s4base_dn, s4base_attr = s4connector.lo_s4.lo.search_s(s4connector.s4_ldap_base, ldap.SCOPE_BASE, '(objectClass=*)')[0] log.debug("dc ucs2con: S4 object: %r", s4base_dn) log.debug("dc ucs2con: S4 object: %r", s4base_attr) if b'univentionBase' in object['attributes'].get('objectClass'): # DC object → sync GPO if s4connector.configRegistry.is_true('connector/s4/mapping/gpo', True): ucs_val = object['attributes'].get('msGPOLink', [None])[0] # msGPOLink is a single value s4_val = s4base_attr.get('msGPOLink') if ucs_val != s4_val: s4connector.lo_s4.lo.modify_s(s4connector.s4_ldap_base, [(ldap.MOD_REPLACE, 'gPLink', ucs_val)]) elif b'sambaDomain' in object['attributes'].get('objectClass'): # Samba Domain object ml = [] sync_times = [('sambaMaxPwdAge', 'maxPwdAge'), ('sambaMinPwdAge', 'minPwdAge'), ('sambaLockoutDuration', 'lockoutDuration')] for ucs_attr, s4_attr in sync_times: ucs_time = int(object['attributes'].get(ucs_attr, [0])[0]) s4_time = _nano2s(int(s4base_attr.get(s4_attr, [0])[0]) * -1) log.debug('dc ucs2con: ucs_time (%s): %s', ucs_attr, ucs_time) log.debug('dc ucs2con: s4-time (%s): %s', s4_attr, s4_time) if ucs_time != s4_time: s4_time = str(_s2nano(ucs_time) * -1) if ucs_time > 0 else '0' ml.append((ldap.MOD_REPLACE, s4_attr, [s4_time.encode('ASCII')])) sync_integers = [('sambaPwdHistoryLength', 'pwdHistoryLength'), ('sambaMinPwdLength', 'minPwdLength'), ('univentionSamba4pwdProperties', 'pwdProperties')] for ucs_attr, s4_attr in sync_integers: ucs_val = object['attributes'].get(ucs_attr, b'0') s4_val = s4base_attr.get(s4_attr, [b'0'])[0] if ucs_val != s4_val: ml.append((ldap.MOD_REPLACE, s4_attr, ucs_val)) if ml: log.debug('dc ucs2con: S4 object modlist: %s', ml) s4connector.lo_s4.lo.modify_s(s4connector.s4_ldap_base, ml) return True
[docs] def con2ucs(s4connector, key, object): log.debug('dc con2ucs: Object (%s): %s', object['dn'], object) # Search sambaDomainname object via sambaSID object_sid = decode_sid(object['attributes']['objectSid'][0]) sambadomainnameObject = univention.admin.handlers.settings.sambadomain.lookup(None, s4connector.lo, format_escaped('sambaSID={0!e}', object_sid)) if len(sambadomainnameObject) > 1: log.warning("dc con2ucs: Found more than one sambaDomainname object with sambaSID %r", object_sid) elif len(sambadomainnameObject) == 1: # Use the first sambaDomain sambadomainnameObject = sambadomainnameObject[0] # Do we modify this UCS object modify = False sync_times = [('maxPasswordAge', 'maxPwdAge'), ('minPasswordAge', 'minPwdAge'), ('lockoutDuration', 'lockoutDuration')] for ucs_attr, s4_attr in sync_times: ucs_time = _unixTimeInverval2seconds(sambadomainnameObject.get(ucs_attr, 0)) s4_time = _nano2s(int(object['attributes'].get(s4_attr, [0])[0]) * -1) if ucs_time != s4_time: sambadomainnameObject[ucs_attr] = [str(s4_time), 'seconds'] modify = True sync_integers = [('passwordHistory', 'pwdHistoryLength'), ('passwordLength', 'minPwdLength'), ('domainPwdProperties', 'pwdProperties')] for ucs_attr, s4_attr in sync_integers: ucs_val = sambadomainnameObject.get(ucs_attr, 0) s4_val = object['attributes'].get(s4_attr, [None])[0] if ucs_val != s4_val: sambadomainnameObject[ucs_attr] = s4_val.decode('UTF-8') modify = True if modify: sambadomainnameObject.modify() if s4connector.configRegistry.is_true('connector/s4/mapping/gpo', True): # Search DC object via ldap search dn, attr = s4connector.lo.search('objectClass=*', scope='base')[0] ml = [] ucs_val = attr.get('msGPOLink') s4_val = object['attributes'].get('gPLink') if ucs_val != s4_val: if b'msGPO' not in attr.get('objectClass', []): ml.append(('objectClass', b'', b'msGPO')) ml.append(('msGPOLink', ucs_val, s4_val)) if ml: s4connector.lo.modify(dn, ml) return True
[docs] def identify(dn, attr, canonical=0): return bool({b'univentionBase', b'sambaDomain'} & set(attr.get('objectClass', [])))