#!/usr/bin/python3
#
# Univention S4 Connector
# control the password sync communication with the s4 password service
#
# SPDX-FileCopyrightText: 2004-2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only
import binascii
import time
from datetime import datetime
from logging import getLogger
import heimdal
import ldap
from ldap.controls import LDAPControl
from samba.dcerpc import drsblobs
from samba.ndr import ndr_pack, ndr_print, ndr_unpack
import univention.s4connector.s4
from univention.admin import uexceptions
from univention.logging import Structured
from univention.s4connector.s4 import format_escaped
log = Structured(getLogger("LDAP").getChild(__name__))
[docs]
class Krb5Context:
def __init__(self):
self.ctx = heimdal.context()
self.etypes = self.ctx.get_permitted_enctypes()
self.etype_ids = [et.toint() for et in self.etypes]
krb5_context = Krb5Context()
[docs]
def calculate_krb5key(unicodePwd, supplementalCredentials, kvno=0):
up_blob = unicodePwd
sc_blob = supplementalCredentials
keys = []
keytypes = []
context = heimdal.context()
if up_blob:
# log.debug("calculate_krb5key: up_blob: %s" % binascii.b2a_base64(up_blob))
assert len(up_blob) == 16
key = heimdal.keyblock_raw(context, 23, up_blob)
keys.append(heimdal.asn1_encode_key(key, None, kvno))
if sc_blob:
# log.debug("calculate_krb5key: sc_blob: %s" % binascii.b2a_base64(sc_blob))
try:
sc = ndr_unpack(drsblobs.supplementalCredentialsBlob, sc_blob)
for p in sc.sub.packages:
krb = None
log.debug("calculate_krb5key: parsing %s blob", p.name)
if p.name == "Primary:Kerberos":
krb_blob = binascii.unhexlify(p.data)
krb = ndr_unpack(drsblobs.package_PrimaryKerberosBlob, krb_blob)
assert krb.version == 3
for k in krb.ctr.keys:
if k.keytype not in keytypes:
log.debug("calculate_krb5key: ctr3.key.keytype: %s", k.keytype)
try:
key = heimdal.keyblock_raw(context, k.keytype, k.value)
krb5SaltObject = heimdal.salt_raw(context, krb.ctr.salt.string)
keys.append(heimdal.asn1_encode_key(key, krb5SaltObject, kvno))
keytypes.append(k.keytype)
except Exception: # FIXME: which exception?
if k.keytype == 4294967156: # in all known cases W2k8 AD uses keytype 4294967156 (=-140L) for this
if k.value == up_blob: # the known case
log.debug("calculate_krb5key: ignoring arc4 NThash with special keytype %s in %s", k.keytype, p.name)
else: # unknown special case
log.debug("calculate_krb5key: ignoring unknown key with special keytype %s in %s", k.keytype, p.name)
else:
log.exception("calculate_krb5key: krb5Key with keytype %s could not be parsed in %s. Ignoring this keytype.", k.keytype, p.name)
elif p.name == "Primary:Kerberos-Newer-Keys":
krb_blob = binascii.unhexlify(p.data)
krb = ndr_unpack(drsblobs.package_PrimaryKerberosBlob, krb_blob)
assert krb.version == 4
for k in krb.ctr.keys:
if k.keytype not in keytypes:
log.debug("calculate_krb5key: ctr4.key.keytype: %s", k.keytype)
try:
key = heimdal.keyblock_raw(context, k.keytype, k.value)
krb5SaltObject = heimdal.salt_raw(context, krb.ctr.salt.string)
keys.append(heimdal.asn1_encode_key(key, krb5SaltObject, kvno))
keytypes.append(k.keytype)
except Exception: # FIXME: which exception?
if k.keytype == 4294967156: # in all known cases W2k8 AD uses keytype 4294967156 (=-140L) for this
if k.value == up_blob: # the known case
log.debug("calculate_krb5key: ignoring arc4 NThash with special keytype %s in %s", k.keytype, p.name)
else: # unknown special case
log.debug("calculate_krb5key: ignoring unknown key with special keytype %s in %s", k.keytype, p.name)
else:
log.exception("calculate_krb5key: krb5Key with keytype %s could not be parsed in %s. Ignoring this keytype.", k.keytype, p.name)
except Exception as exc:
if isinstance(exc, RuntimeError) and len(exc.args) == 2 and exc.args[1] == 'Buffer Size Error' or exc.args[0] == 11:
log.warning("calculate_krb5key: '%s' while unpacking supplementalCredentials:: %s", exc, binascii.b2a_base64(sc_blob))
log.warning("calculate_krb5key: the krb5Keys from the PrimaryKerberosBlob could not be parsed. Continuing anyway.")
else:
log.exception("calculate_krb5key: the krb5Keys from the PrimaryKerberosBlob could not be parsed. Continuing anyway.")
return keys
[docs]
def calculate_supplementalCredentials(ucs_krb5key, old_supplementalCredentials, nt_hash):
old_krb = {}
if old_supplementalCredentials:
sc = ndr_unpack(drsblobs.supplementalCredentialsBlob, old_supplementalCredentials)
for p in sc.sub.packages:
log.debug("calculate_supplementalCredentials: parsing %s blob", p.name)
if p.name == "Primary:Kerberos":
krb_blob = binascii.unhexlify(p.data)
try:
krb = ndr_unpack(drsblobs.package_PrimaryKerberosBlob, krb_blob)
assert krb.version == 3
old_krb['ctr3'] = krb.ctr
for k in krb.ctr.keys:
log.debug("calculate_supplementalCredentials: ctr3.key.keytype: %s", k.keytype)
except Exception: # FIXME: which exception?
log.exception("calculate_supplementalCredentials: ndr_unpack of S4 Primary:Kerberos blob failed.")
log.error("calculate_supplementalCredentials: Continuing anyway, Primary:Kerberos (DES keys) blob will be missing in supplementalCredentials ctr3.old_keys.")
elif p.name == "Primary:Kerberos-Newer-Keys":
krb_blob = binascii.unhexlify(p.data)
try:
krb = ndr_unpack(drsblobs.package_PrimaryKerberosBlob, krb_blob)
assert krb.version == 4
old_krb['ctr4'] = krb.ctr
for k in krb.ctr.keys:
log.debug("calculate_supplementalCredentials: ctr4.key.keytype: %s", k.keytype)
except Exception: # FIXME: which exception?
log.exception("calculate_supplementalCredentials: ndr_unpack of S4 Primary:Kerberos-Newer-Keys blob failed.")
log.error("calculate_supplementalCredentials: Continuing anyway, Primary:Kerberos-Newer-Keys (AES and DES keys) blob will be missing in supplementalCredentials ctr4.old_keys.")
krb5_aes256 = ''
krb5_aes128 = ''
krb5_des_md5 = ''
krb5_des_crc = ''
krb_ctr3_salt = ''
krb_ctr4_salt = ''
for k in ucs_krb5key:
(keyblock, salt, _kvno) = heimdal.asn1_decode_key(k)
key_data = keyblock.keyvalue()
saltstring = salt.saltvalue()
enctype = keyblock.keytype()
enctype_id = enctype.toint()
if enctype_id not in krb5_context.etype_ids:
log.warning("calculate_supplementalCredentials: ignoring unsupported krb5_keytype: (%d)", enctype_id)
continue
log.trace("calculate_supplementalCredentials: krb5_keytype: %s (%d)", enctype, enctype_id)
if enctype_id == 18:
krb5_aes256 = key_data
if not krb_ctr4_salt:
krb_ctr4_salt = saltstring
elif enctype_id == 17:
krb5_aes128 = key_data
if not krb_ctr4_salt:
krb_ctr4_salt = saltstring
elif enctype_id == 3:
krb5_des_md5 = key_data
if not krb_ctr3_salt:
krb_ctr3_salt = saltstring
elif enctype_id == 1:
krb5_des_crc = key_data
if not krb_ctr3_salt:
krb_ctr3_salt = saltstring
if not krb_ctr3_salt:
krb_ctr3_salt = saltstring
# build new drsblobs.supplementalCredentialsBlob
sc_blob = None
cred_List = []
package_names = []
# Primary:Kerberos-Newer-Keys : AES keys
if krb5_aes256 or krb5_aes128:
log.debug("calculate_supplementalCredentials: building Primary:Kerberos-Newer-Keys blob")
kerberosKey4list = []
if krb5_aes256:
assert len(krb5_aes256) == 32
next_key = drsblobs.package_PrimaryKerberosKey4()
next_key.keytype = 18
next_key.value = krb5_aes256
next_key.value_len = len(krb5_aes256)
kerberosKey4list.append(next_key)
if krb5_aes128:
assert len(krb5_aes128) == 16
next_key = drsblobs.package_PrimaryKerberosKey4()
next_key.keytype = 17
next_key.value = krb5_aes128
next_key.value_len = len(krb5_aes128)
kerberosKey4list.append(next_key)
if krb5_des_md5:
assert len(krb5_des_md5) == 8
next_key = drsblobs.package_PrimaryKerberosKey4()
next_key.keytype = 3
next_key.value = krb5_des_md5
next_key.value_len = len(krb5_des_md5)
kerberosKey4list.append(next_key)
if not krb5_des_md5:
next_key = drsblobs.package_PrimaryKerberosKey4()
next_key.keytype = 4294967156
next_key.value = nt_hash
if nt_hash:
next_key.value_len = len(nt_hash)
else:
next_key.value_len = 0
kerberosKey4list.append(next_key)
if krb5_des_crc:
assert len(krb5_des_crc) == 8
next_key = drsblobs.package_PrimaryKerberosKey4()
next_key.keytype = 1
next_key.value = krb5_des_crc
next_key.value_len = len(krb5_des_crc)
kerberosKey4list.append(next_key)
# Windows Server 2012 does not always send the des encryption types.
# Samba does not allow a key number != 4, which is why we add a "dummy" hash.
if not krb5_des_crc:
next_key = drsblobs.package_PrimaryKerberosKey4()
next_key.keytype = 4294967156
next_key.value = nt_hash
if nt_hash:
next_key.value_len = len(nt_hash)
else:
next_key.value_len = 0
kerberosKey4list.append(next_key)
salt4 = drsblobs.package_PrimaryKerberosString()
salt4.string = krb_ctr4_salt
ctr4 = drsblobs.package_PrimaryKerberosCtr4()
ctr4.salt = salt4
ctr4.num_keys = len(kerberosKey4list)
ctr4.keys = kerberosKey4list
if old_krb.get('ctr4'):
# Backup old_keys to s4_old_keys
s4_num_old_keys = old_krb['ctr4'].num_old_keys
s4_old_keys = []
for key in old_krb['ctr4'].old_keys:
s4_old_keys.append(key) # noqa: PERF402
# keys -> old_keys
if len(old_krb['ctr4'].keys) > ctr4.num_keys:
cleaned_old_keys = []
for key in old_krb['ctr4'].keys:
if key.keytype == 4294967156: # in all known cases W2k8 AD uses keytype 4294967156 (=-140L) to include the arc4 hash
log.debug("calculate_supplementalCredentials: Primary:Kerberos-Newer-Keys filtering keytype %s from old_keys", key.keytype)
continue
else: # TODO: can we do something better at this point to make old_keys == num_keys ?
cleaned_old_keys.append(key)
ctr4.old_keys = cleaned_old_keys
ctr4.num_old_keys = len(cleaned_old_keys)
else:
ctr4.old_keys = old_krb['ctr4'].keys
ctr4.num_old_keys = old_krb['ctr4'].num_keys
# s4_old_keys -> older_keys
if ctr4.num_old_keys > ctr4.num_older_keys:
cleaned_older_keys = []
for key in s4_old_keys:
if key.keytype == 4294967156: # in all known cases W2k8 AD uses keytype 4294967156 (=-140L) to include the arc4 hash
log.debug("calculate_supplementalCredentials: Primary:Kerberos-Newer-Keys filtering keytype %s from older_keys", key.keytype)
continue
else: # TODO: can we do something better at this point to make old_keys == num_keys ?
cleaned_older_keys.append(key)
ctr4.older_keys = cleaned_older_keys
ctr4.num_older_keys = len(cleaned_older_keys)
else:
ctr4.older_keys = s4_old_keys
ctr4.num_older_keys = s4_num_old_keys
if ctr4.num_old_keys not in (0, ctr4.num_keys):
# TODO: Recommended policy is to fill up old_keys to match num_keys, this will result in a traceback, can we do something better?
log.warning("calculate_supplementalCredentials: Primary:Kerberos-Newer-Keys num_keys = %s", ctr4.num_keys)
for k in ctr4.keys:
log.warning("calculate_supplementalCredentials: ctr4.key.keytype: %s", k.keytype)
log.warning("calculate_supplementalCredentials: Primary:Kerberos-Newer-Keys num_old_keys = %s", ctr4.num_old_keys)
for k in ctr4.old_keys:
log.warning("calculate_supplementalCredentials: ctr4.old_key.keytype: %s", k.keytype)
if ctr4.num_older_keys not in (0, ctr4.num_old_keys):
# TODO: Recommended policy is to fill up old_keys to match num_keys, this will result in a traceback, can we do something better?
log.warning("calculate_supplementalCredentials: Primary:Kerberos-Newer-Keys num_old_keys = %s", ctr4.num_old_keys)
for k in ctr4.old_keys:
log.warning("calculate_supplementalCredentials: ctr4.old_key.keytype: %s", k.keytype)
log.warning("calculate_supplementalCredentials: Primary:Kerberos-Newer-Keys num_older_keys = %s", ctr4.num_older_keys)
for k in ctr4.older_keys:
log.warning("calculate_supplementalCredentials: ctr4.older_key.keytype: %s", k.keytype)
krb_Primary_Kerberos_Newer = drsblobs.package_PrimaryKerberosBlob()
krb_Primary_Kerberos_Newer.version = 4
krb_Primary_Kerberos_Newer.ctr = ctr4
krb_blob_Primary_Kerberos_Newer = ndr_pack(krb_Primary_Kerberos_Newer)
creddata_Primary_Kerberos_Newer = binascii.hexlify(krb_blob_Primary_Kerberos_Newer)
credname_Primary_Kerberos_Newer = "Primary:Kerberos-Newer-Keys"
cred_Primary_Kerberos_Newer = drsblobs.supplementalCredentialsPackage()
cred_Primary_Kerberos_Newer.name = credname_Primary_Kerberos_Newer
cred_Primary_Kerberos_Newer.name_len = len(credname_Primary_Kerberos_Newer)
cred_Primary_Kerberos_Newer.data = creddata_Primary_Kerberos_Newer
cred_Primary_Kerberos_Newer.data_len = len(creddata_Primary_Kerberos_Newer)
cred_Primary_Kerberos_Newer.reserved = 1
cred_List.append(cred_Primary_Kerberos_Newer)
package_names.append('Kerberos-Newer-Keys')
# Primary:Kerberos : MD5 and CRC keys
log.debug("calculate_supplementalCredentials: building Primary:Kerberos blob")
kerberosKey3list = []
if krb5_aes256 or krb5_aes128 or krb5_des_md5 or krb5_des_crc:
if krb5_des_md5:
next_key = drsblobs.package_PrimaryKerberosKey3()
next_key.keytype = 3
next_key.value = krb5_des_md5
next_key.value_len = len(krb5_des_md5)
kerberosKey3list.append(next_key)
if not krb5_des_md5:
next_key = drsblobs.package_PrimaryKerberosKey3()
next_key.keytype = 4294967156
next_key.value = nt_hash
if nt_hash:
next_key.value_len = len(nt_hash)
else:
next_key.value_len = 0
kerberosKey3list.append(next_key)
if krb5_des_crc:
next_key = drsblobs.package_PrimaryKerberosKey3()
next_key.keytype = 1
next_key.value = krb5_des_crc
next_key.value_len = len(krb5_des_crc)
kerberosKey3list.append(next_key)
# Windows Server 2012 does not always send the des encryption types.
# Samba does not allow a key number != 2, which is why we add a "dummy" hash.
if not krb5_des_crc:
next_key = drsblobs.package_PrimaryKerberosKey3()
next_key.keytype = 4294967156
next_key.value = nt_hash
if nt_hash:
next_key.value_len = len(nt_hash)
else:
next_key.value_len = 0
kerberosKey3list.append(next_key)
salt = drsblobs.package_PrimaryKerberosString()
salt.string = krb_ctr3_salt
ctr3 = drsblobs.package_PrimaryKerberosCtr3()
ctr3.salt = salt
ctr3.num_keys = len(kerberosKey3list)
ctr3.keys = kerberosKey3list
if old_krb.get('ctr3'):
# keys -> old_keys
if len(old_krb['ctr3'].keys) > ctr3.num_keys:
cleaned_ctr3_old_keys = []
for key in old_krb['ctr3'].keys:
if key.keytype == 4294967156: # in all known cases W2k8 AD uses keytype 4294967156 (=-140L) to include the arc4 hash
log.debug("calculate_supplementalCredentials: Primary:Kerberos filtering keytype %s from old_keys", key.keytype)
continue
else: # TODO: can we do something better at this point to make old_keys == num_keys ?
cleaned_ctr3_old_keys.append(key)
ctr3.old_keys = cleaned_ctr3_old_keys
ctr3.num_old_keys = len(cleaned_ctr3_old_keys)
else:
ctr3.old_keys = old_krb['ctr3'].keys
ctr3.num_old_keys = old_krb['ctr3'].num_keys
if ctr3.num_old_keys not in (0, ctr3.num_keys):
# TODO: Recommended policy is to fill up old_keys to match num_keys, this will result in a traceback, can we do something better?
log.warning("calculate_supplementalCredentials: Primary:Kerberos num_keys = %s", ctr3.num_keys)
for k in ctr3.keys:
log.warning("calculate_supplementalCredentials: ctr3.key.keytype: %s", k.keytype)
log.warning("calculate_supplementalCredentials: Primary:Kerberos num_old_keys = %s", ctr3.num_old_keys)
for k in ctr3.old_keys:
log.warning("calculate_supplementalCredentials: ctr3.old_key.keytype: %s", k.keytype)
krb = drsblobs.package_PrimaryKerberosBlob()
krb.version = 3
krb.ctr = ctr3
krb3_blob = ndr_pack(krb)
creddata_Primary_Kerberos = binascii.hexlify(krb3_blob)
credname_Primary_Kerberos = "Primary:Kerberos"
cred_Primary_Kerberos = drsblobs.supplementalCredentialsPackage()
cred_Primary_Kerberos.name = credname_Primary_Kerberos
cred_Primary_Kerberos.name_len = len(credname_Primary_Kerberos)
cred_Primary_Kerberos.data = creddata_Primary_Kerberos
cred_Primary_Kerberos.data_len = len(creddata_Primary_Kerberos)
cred_Primary_Kerberos.reserved = 1
cred_List.append(cred_Primary_Kerberos)
package_names.append('Kerberos')
if package_names:
krb_blob_Packages = '\0'.join(package_names).encode('utf-16le')
cred_PackagesBlob_data = binascii.hexlify(krb_blob_Packages).upper()
cred_PackagesBlob_name = "Packages"
cred_PackagesBlob = drsblobs.supplementalCredentialsPackage()
cred_PackagesBlob.name = cred_PackagesBlob_name
cred_PackagesBlob.name_len = len(cred_PackagesBlob_name)
cred_PackagesBlob.data = cred_PackagesBlob_data
cred_PackagesBlob.data_len = len(cred_PackagesBlob_data)
cred_PackagesBlob.reserved = 2
cred_List.insert(-1, cred_PackagesBlob)
sub = drsblobs.supplementalCredentialsSubBlob()
sub.num_packages = len(cred_List)
sub.packages = cred_List
sub.signature = drsblobs.SUPPLEMENTAL_CREDENTIALS_SIGNATURE
sub.prefix = drsblobs.SUPPLEMENTAL_CREDENTIALS_PREFIX
sc = drsblobs.supplementalCredentialsBlob()
sc.sub = sub
sc_blob = ndr_pack(sc)
log.trace("calculate_supplementalCredentials: sc: %s", ndr_print(sc))
return sc_blob
[docs]
def password_sync_ucs_to_s4(s4connector, key, object):
log.debug("password_sync_ucs_to_s4 called")
modify = False
old_ucs_object = object.get('old_ucs_object', {})
new_ucs_object = object.get('new_ucs_object', {})
if old_ucs_object or new_ucs_object:
for attr in ['sambaLMPassword', 'sambaNTPassword', 'sambaPwdLastSet', 'sambaPwdMustChange', 'krb5PrincipalName', 'krb5Key', 'shadowLastChange', 'shadowMax', 'krb5PasswordEnd', 'univentionService']:
old_values = set(old_ucs_object.get(attr, []))
new_values = set(new_ucs_object.get(attr, []))
if old_values != new_values:
modify = True
break
else:
# add mode
modify = True
if not modify:
log.debug('password_sync_ucs_to_s4: the password for %s has not been changed. Skipping password sync.', object['dn'])
return
log.debug("Object DN=%r", object['dn'])
ucs_object = s4connector._object_mapping(key, object, 'con')
log.debug(" UCS DN = %r", ucs_object['dn'])
try:
ucs_object_attributes = s4connector.lo.get(ucs_object['dn'], ['sambaLMPassword', 'sambaNTPassword', 'sambaPwdLastSet', 'sambaPwdMustChange', 'krb5PrincipalName', 'krb5Key', 'shadowLastChange', 'shadowMax', 'krb5PasswordEnd', 'univentionService'], required=True)
except ldap.NO_SUCH_OBJECT:
log.process("password_sync_ucs_to_s4: The UCS object (%s) was not found. The object was removed.", ucs_object['dn'])
return
services = ucs_object_attributes.get('univentionService', [])
if b'Samba 4' in services:
log.debug("password_sync_ucs_to_s4: %s is a S4 server, skip password sync", ucs_object['dn'])
return
sambaPwdLastSet = None
if 'sambaPwdLastSet' in ucs_object_attributes:
sambaPwdLastSet = int(ucs_object_attributes['sambaPwdLastSet'][0])
log.debug("password_sync_ucs_to_s4: sambaPwdLastSet: %s", sambaPwdLastSet)
if 'sambaPwdMustChange' in ucs_object_attributes:
sambaPwdMustChange = int(ucs_object_attributes['sambaPwdMustChange'][0])
log.warning("password_sync_ucs_to_s4: Ignoring sambaPwdMustChange: %s", sambaPwdMustChange)
ucsLMhash = ucs_object_attributes.get('sambaLMPassword', [None])[0]
ucsNThash = ucs_object_attributes.get('sambaNTPassword', [None])[0]
krb5Principal = ucs_object_attributes.get('krb5PrincipalName', [None])[0]
krb5Key = ucs_object_attributes.get('krb5Key', [])
if not ucsNThash:
log.debug("password_sync_ucs_to_s4: sambaNTPassword missing in UCS LDAP, trying krb5Key")
ucsNThash = extract_NThash_from_krb5key(krb5Key)
if not ucsNThash:
log.warning("password_sync_ucs_to_s4: Failed to get NT Password-Hash from UCS LDAP")
# log.debug("password_sync_ucs_to_s4: Password-Hash from UCS: %s" % ucsNThash)
s4_object_attributes = s4connector.lo_s4.get(object['dn'], ['pwdLastSet', 'objectSid'])
pwdLastSet = None
if 'pwdLastSet' in s4_object_attributes:
pwdLastSet = int(s4_object_attributes['pwdLastSet'][0])
objectSid = univention.s4connector.s4.decode_sid(s4_object_attributes['objectSid'][0])
log.debug("password_sync_ucs_to_s4: pwdLastSet from S4 : %s", pwdLastSet)
pwd_set = False
filter_expr = format_escaped('(objectSid={0!e})', objectSid)
res = s4connector.lo_s4.search(filter=filter_expr, attr=['unicodePwd', 'userPrincipalName', 'supplementalCredentials', 'msDS-KeyVersionNumber', 'dBCSPwd', 'ntPwdHistory', 'msDS-ResultantPSO'])
s4_search_attributes = res[0][1]
unicodePwd_attr = s4_search_attributes.get('unicodePwd', [None])[0]
dBCSPwd_attr = s4_search_attributes.get('dBCSPwd', [None])[0]
userPrincipalName_attr = s4_search_attributes.get('userPrincipalName', [None])[0]
supplementalCredentials = s4_search_attributes.get('supplementalCredentials', [None])[0]
ntPwdHistory = s4_search_attributes.get('ntPwdHistory', [b''])[0]
msDSResultantPSO = s4_search_attributes.get('msDS-ResultantPSO', [None])[0]
# get pwdhistorylength
pwdHistoryLength = 0
if msDSResultantPSO:
res = s4connector.lo_s4.get(msDSResultantPSO.decode('UTF-8'), attr=['msDS-PasswordHistoryLength'])
pwdHistoryLength = int(res.get('msDS-PasswordHistoryLength', [0])[0])
else:
res = s4connector.lo_s4.search(filter='(objectClass=domain)', attr=['pwdHistoryLength'])
pwdHistoryLength = int(res[0][1].get('pwdHistoryLength', [0])[0])
s4NThash = None
if unicodePwd_attr:
s4NThash = binascii.b2a_hex(unicodePwd_attr).upper()
else:
log.warning("password_sync_ucs_to_s4: Failed to get NT Password-Hash from S4")
s4LMhash = None
if dBCSPwd_attr:
s4LMhash = binascii.b2a_hex(dBCSPwd_attr).upper()
else:
log.info("password_sync_ucs_to_s4: Failed to get LM Password-Hash from S4")
modlist = []
if krb5Principal != userPrincipalName_attr:
if krb5Principal:
if not userPrincipalName_attr: # new and not old
modlist.append((ldap.MOD_ADD, 'userPrincipalName', krb5Principal))
else: # new and old differ
if krb5Principal.lower() != userPrincipalName_attr.lower():
log.warning("password_sync_ucs_to_s4: userPrincipalName != krb5Principal: %r != %r", userPrincipalName_attr, krb5Principal)
modlist.append((ldap.MOD_REPLACE, 'userPrincipalName', krb5Principal))
else:
if userPrincipalName_attr: # old and not new
modlist.append((ldap.MOD_DELETE, 'userPrincipalName', userPrincipalName_attr))
unicodePwd_new = None
if ucsNThash != s4NThash:
log.debug("password_sync_ucs_to_s4: NT Hash S4: %r NT Hash UCS: %r", s4NThash, ucsNThash)
# Now if ucsNThash is empty there should at least some timestamp in UCS,
# otherwise it's probably not a good idea to remove the unicodePwd.
# Usecase: LDB module on ucs_3.0-0-ucsschool slaves creates XP computers/windows in UDM without password
if ucsNThash or sambaPwdLastSet:
pwd_set = True
if ucsNThash:
try:
unicodePwd_new = binascii.a2b_hex(ucsNThash)
except TypeError:
if ucsNThash.startswith(b"NO PASSWORD"):
pwd_set = False
else:
raise
if pwd_set and unicodePwd_new:
if pwdHistoryLength:
userobject = s4connector.get_ucs_object(key, ucs_object['dn'])
pwhistoryPolicy = userobject.loadPolicyObject('policies/pwhistory')
pwhistory_length = pwhistoryPolicy['length']
pwhistory_length = int(pwhistory_length) if pwhistory_length else 0
if pwhistory_length != pwdHistoryLength:
log.warning("password_sync_ucs_to_s4: Mismatch between UCS pwhistoryPolicy (%s) and S4 pwhistoryPolicy (%s). Using the larger one.", pwhistory_length, pwdHistoryLength)
des_len = max(pwdHistoryLength, pwhistory_length) * 16
ntPwdHistory_new = unicodePwd_new + ntPwdHistory
ntPwdHistory_new = ntPwdHistory_new[:des_len]
log.debug("password_sync_ucs_to_s4: Update ntPwdHistory.")
modlist.append((ldap.MOD_REPLACE, 'ntPwdHistory', ntPwdHistory_new))
else:
log.debug("password_sync_ucs_to_s4: PwdHistoryLength is 0, do not update history.")
modlist.append((ldap.MOD_REPLACE, 'unicodePwd', unicodePwd_new))
if ucsLMhash != s4LMhash:
log.debug("password_sync_ucs_to_s4: LM Hash S4: %r LM Hash UCS: %r", s4LMhash, ucsLMhash)
pwd_set = True
if ucsLMhash:
dBCSPwd_new = binascii.a2b_hex(ucsLMhash)
modlist.append((ldap.MOD_REPLACE, 'dBCSPwd', dBCSPwd_new))
else:
modlist.append((ldap.MOD_DELETE, 'dBCSPwd', None))
if pwd_set or not supplementalCredentials:
if krb5Principal:
# encoding of Samba4 supplementalCredentials
if krb5Key:
supplementalCredentials_new = calculate_supplementalCredentials(krb5Key, supplementalCredentials, unicodePwd_new)
if supplementalCredentials_new:
modlist.append((ldap.MOD_REPLACE, 'supplementalCredentials', supplementalCredentials_new))
else:
log.debug("password_sync_ucs_to_s4: no supplementalCredentials_new")
# if supplementalCredentials:
# modlist.append((ldap.MOD_REPLACE, 'msDS-KeyVersionNumber', krb5KeyVersionNumber))
# else:
# modlist.append((ldap.MOD_ADD, 'msDS-KeyVersionNumber', krb5KeyVersionNumber))
if sambaPwdLastSet is None:
sambaPwdLastSet = int(time.time())
newpwdlastset = str(univention.s4connector.s4.samba2s4_time(sambaPwdLastSet))
elif sambaPwdLastSet in [0, 1]:
log.debug("password_sync_ucs_to_s4: samba pwd expired, set newpwdLastSet to 0")
newpwdlastset = 0
else:
newpwdlastset = univention.s4connector.s4.samba2s4_time(sambaPwdLastSet)
log.debug("password_sync_ucs_to_s4: pwdLastSet in modlist: %r", newpwdlastset)
modlist.append((ldap.MOD_REPLACE, 'pwdLastSet', str(newpwdlastset).encode('ASCII')))
modlist.append((ldap.MOD_REPLACE, 'badPwdCount', b'0'))
modlist.append((ldap.MOD_REPLACE, 'badPasswordTime', b'0'))
modlist.append((ldap.MOD_REPLACE, 'lockoutTime', b'0'))
else:
log.debug("password_sync_ucs_to_s4: No password change to sync to S4 ")
# check pwdLastSet
if sambaPwdLastSet is not None:
if sambaPwdLastSet in [0, 1]:
newpwdlastset = 0
else:
newpwdlastset = univention.s4connector.s4.samba2s4_time(sambaPwdLastSet)
log.debug("password_sync_ucs_to_s4: sambaPwdLastSet: %d", sambaPwdLastSet)
log.debug("password_sync_ucs_to_s4: newpwdlastset : %r", newpwdlastset)
log.debug("password_sync_ucs_to_s4: pwdLastSet (AD): %r", pwdLastSet)
if newpwdlastset != pwdLastSet and abs(newpwdlastset - pwdLastSet) >= 10000000:
modlist.append((ldap.MOD_REPLACE, 'pwdLastSet', str(newpwdlastset).encode('ASCII')))
ctrl_bypass_password_hash = LDAPControl('1.3.6.1.4.1.7165.4.3.12', criticality=0)
log.trace("password_sync_ucs_to_s4: modlist: %r", modlist)
if modlist:
s4connector.lo_s4.lo.modify_ext_s(object['dn'], modlist, serverctrls=[ctrl_bypass_password_hash])
[docs]
def password_sync_s4_to_ucs(s4connector, key, ucs_object, modifyUserPassword=True):
log.debug("password_sync_s4_to_ucs called")
if ucs_object['modtype'] == 'modify' and 'pwdLastSet' not in ucs_object.get('changed_attributes', []):
log.debug('password_sync_s4_to_ucs: the password for %s has not been changed. Skipping password sync.', ucs_object['dn'])
return
object = s4connector._object_mapping(key, ucs_object, 'ucs')
s4_object_attributes = s4connector.lo_s4.get(object['dn'], ['objectSid', 'pwdLastSet'])
if s4connector.isInCreationList(object['dn']):
s4connector.removeFromCreationList(object['dn'])
log.debug("password_sync_s4_to_ucs: Synchronisation of password has been canceled. Object was just created.")
return
pwdLastSet = None
if 'pwdLastSet' in s4_object_attributes:
pwdLastSet = int(s4_object_attributes['pwdLastSet'][0])
log.debug("password_sync_s4_to_ucs: pwdLastSet from S4: %s (%s)", pwdLastSet, s4_object_attributes)
objectSid = univention.s4connector.s4.decode_sid(s4_object_attributes['objectSid'][0])
filter_expr = format_escaped('(objectSid={0!e})', objectSid)
res = s4connector.lo_s4.search(filter=filter_expr, attr=['unicodePwd', 'supplementalCredentials', 'msDS-KeyVersionNumber', 'dBCSPwd', 'msDS-ResultantPSO', 'ntPwdHistory'])
s4_search_attributes = res[0][1]
unicodePwd_attr = s4_search_attributes.get('unicodePwd', [None])[0]
if unicodePwd_attr:
ntPwd = binascii.b2a_hex(unicodePwd_attr).upper()
lmPwd = b''
dBCSPwd = s4_search_attributes.get('dBCSPwd', [None])[0]
if dBCSPwd:
lmPwd = binascii.b2a_hex(dBCSPwd).upper()
supplementalCredentials = s4_search_attributes.get('supplementalCredentials', [None])[0]
msDS_KeyVersionNumber = s4_search_attributes.get('msDS-KeyVersionNumber', [0])[0]
ntPwd_ucs = b''
lmPwd_ucs = b''
krb5Principal = b''
# userPassword = b''
modlist = []
ucs_object_attributes = s4connector.lo.get(ucs_object['dn'], ['sambaPwdMustChange', 'sambaPwdLastSet', 'sambaNTPassword', 'sambaLMPassword', 'krb5PrincipalName', 'krb5Key', 'krb5KeyVersionNumber', 'userPassword', 'shadowLastChange', 'shadowMax', 'krb5PasswordEnd', 'univentionService', 'pwhistory'])
pwhistory_ucs = ucs_object_attributes.get('pwhistory', [b''])[0]
services = ucs_object_attributes.get('univentionService', [])
if 'S4 SlavePDC' in services:
log.debug("password_sync_s4_to_ucs: %s is a S4 SlavePDC server, skip password sync", ucs_object['dn'])
return
if 'sambaNTPassword' in ucs_object_attributes:
ntPwd_ucs = ucs_object_attributes['sambaNTPassword'][0]
if 'sambaLMPassword' in ucs_object_attributes:
lmPwd_ucs = ucs_object_attributes['sambaLMPassword'][0]
if 'krb5PrincipalName' in ucs_object_attributes:
krb5Principal = ucs_object_attributes['krb5PrincipalName'][0]
# if 'userPassword' in ucs_object_attributes:
# userPassword = ucs_object_attributes['userPassword'][0]
sambaPwdLastSet = None
if 'sambaPwdLastSet' in ucs_object_attributes:
sambaPwdLastSet = ucs_object_attributes['sambaPwdLastSet'][0]
log.debug("password_sync_s4_to_ucs: sambaPwdLastSet: %r", sambaPwdLastSet)
sambaPwdMustChange = ''
if 'sambaPwdMustChange' in ucs_object_attributes:
sambaPwdMustChange = ucs_object_attributes['sambaPwdMustChange'][0]
log.debug("password_sync_s4_to_ucs: Found sambaPwdMustChange: %r", sambaPwdMustChange)
krb5Key_ucs = ucs_object_attributes.get('krb5Key', [])
userPassword_ucs = ucs_object_attributes.get('userPassword', [None])[0]
krb5KeyVersionNumber = ucs_object_attributes.get('krb5KeyVersionNumber', [None])[0]
pwd_changed = False
if ntPwd != ntPwd_ucs:
pwd_changed = True
modlist.append(('sambaNTPassword', ntPwd_ucs, ntPwd))
if lmPwd != lmPwd_ucs:
pwd_changed = True
modlist.append(('sambaLMPassword', lmPwd_ucs, lmPwd))
if pwd_changed:
if krb5Principal:
# decoding of Samba4 supplementalCredentials
krb5Key_new = calculate_krb5key(unicodePwd_attr, supplementalCredentials, int(msDS_KeyVersionNumber))
modlist.append(('krb5Key', krb5Key_ucs, krb5Key_new))
if int(msDS_KeyVersionNumber) != int(krb5KeyVersionNumber):
modlist.append(('krb5KeyVersionNumber', krb5KeyVersionNumber, msDS_KeyVersionNumber))
# Append modification as well to modlist, to apply in one transaction
if modifyUserPassword:
userobject = s4connector.get_ucs_object(key, ucs_object['dn'])
pwhistoryPolicy = None
if userobject:
pwhistoryPolicy = userobject.loadPolicyObject('policies/pwhistory')
pwhistory_length = pwhistoryPolicy['length']
pwhistory_length = int(pwhistory_length) if pwhistory_length else 0
if pwhistory_length > 0:
msDSResultantPSO = s4_search_attributes.get('msDS-ResultantPSO', [None])[0]
# get pwdhistorylength from s4 object
s4_pwhistory_length = 0
if msDSResultantPSO:
res = s4connector.lo_s4.get(msDSResultantPSO.decode(), attr=['msDS-PasswordHistoryLength'])
s4_pwhistory_length = int(res.get('msDS-PasswordHistoryLength', [0])[0])
else:
res = s4connector.lo_s4.search(filter='(objectClass=domain)', attr=['pwdHistoryLength'])
s4_pwhistory_length = int(res[0][1].get('pwdHistoryLength', [0])[0])
if pwhistory_length != s4_pwhistory_length:
log.warning("password_sync_s4_to_ucs: Mismatch between UCS pwhistoryPolicy (%s) and S4 pwhistoryPolicy (%s). Using the larger one.", pwhistory_length, s4_pwhistory_length)
pwhistory_length = max(pwhistory_length, s4_pwhistory_length)
ntPwdHistory = s4_search_attributes.get('ntPwdHistory', [b''])[0]
ntPwdHistory_hex = binascii.hexlify(ntPwdHistory).upper()
ntPwdHistory_len = len(ntPwdHistory_hex) // 32
pwhistory_list = pwhistory_ucs.decode('ASCII').strip().split(' ')
pwhistory_len = len(pwhistory_list)
pwhistory_new = None
if ntPwdHistory_len and pwhistory_len == 1 and object.get('old_s4_object', {}).get('pwdLastSet', [None])[0] is None:
# In the first synchronization from S4->UCS the password history from the S4 User
# can have more than one entry.
pwhistory_new = b''
hist = [ntPwdHistory_hex[i: i + 32] for i in range(0, len(ntPwdHistory_hex), 32)]
for nt_hash in reversed(hist):
pwhistory_new = univention.admin.password.get_password_history('{NT}$' + nt_hash.decode('ASCII'), pwhistory_new.decode('ASCII'), pwhistory_length).encode('ASCII')
else:
pwhistory_new = univention.admin.password.get_password_history('{NT}$' + ntPwd.decode('ASCII'), pwhistory_ucs.decode('ASCII'), pwhistory_length).encode('ASCII')
modlist.append(('pwhistory', pwhistory_ucs, pwhistory_new))
modlist.append(('userPassword', userPassword_ucs, b'{K5KEY}'))
else:
log.debug("password_sync_s4_to_ucs: No password change to sync to UCS")
try:
old_pwdLastSet = object['old_s4_object']['pwdLastSet'][0]
except (KeyError, IndexError):
old_pwdLastSet = None
if pwdLastSet != old_pwdLastSet:
log.trace("password_sync_s4_to_ucs: updating shadowLastChange")
old_shadowLastChange = ucs_object_attributes.get('shadowLastChange', [None])[0]
new_shadowLastChange = old_shadowLastChange
# shadowMax (set to value of (univentionPWExpiryInterval - 1), otherwise delete)
# krb5PasswordEnd (set to today + univentionPWExpiryInterval, otherwise delete)
old_shadowMax = ucs_object_attributes.get('shadowMax', [None])[0]
new_shadowMax = old_shadowMax
old_krb5end = ucs_object_attributes.get('krb5PasswordEnd', [None])[0]
new_krb5end = old_krb5end
pwdLastSet_unix = univention.s4connector.s4.s42samba_time(pwdLastSet)
newSambaPwdLastSet = str(pwdLastSet_unix).encode('ASCII')
if pwdLastSet == 0: # pwd change on next login
new_shadowMax = b'0'
expiry = int(time.time())
new_krb5end = time.strftime("%Y%m%d000000Z", time.gmtime(expiry)).encode('ASCII')
# we need to expire the password. Since shadowMax=0 is its minimum value, we need to set shadowLastChange = today-2days ## FIXME: -1day should be enough
two_days_ago = int(time.time()) - 86400 * 2
new_shadowLastChange = str(two_days_ago // 3600 // 24).encode('ASCII')
else: # not pwd change on next login
new_shadowLastChange = str(pwdLastSet_unix // 3600 // 24).encode('ASCII')
userobject = s4connector.get_ucs_object(key, ucs_object['dn'])
if not userobject:
log.error("password_sync_s4_to_ucs: couldn't get user-object from UCS")
return False
pwhistoryPolicy = userobject.loadPolicyObject('policies/pwhistory')
try:
expiryInterval = int(pwhistoryPolicy['expiryInterval'])
except (TypeError, ValueError):
# expiryInterval is empty or no legal int-string
pwhistoryPolicy['expiryInterval'] = ''
expiryInterval = -1
log.debug("password_sync_s4_to_ucs: password expiryInterval for %s is %s", ucs_object['dn'], expiryInterval)
if expiryInterval in (-1, 0):
new_shadowMax = b''
new_krb5end = b''
else:
new_shadowMax = str(expiryInterval - 1).encode('ASCII')
new_krb5end = time.strftime("%Y%m%d000000Z", time.gmtime(pwdLastSet_unix + (int(expiryInterval) * 3600 * 24))).encode('ASCII')
if new_shadowLastChange != old_shadowLastChange:
log.debug("password_sync_s4_to_ucs: update shadowLastChange to %s for %s", new_shadowLastChange, ucs_object['dn'])
modlist.append(('shadowLastChange', old_shadowLastChange, new_shadowLastChange))
if new_shadowMax != old_shadowMax:
log.debug("password_sync_s4_to_ucs: update shadowMax to %s for %s", new_shadowMax, ucs_object['dn'])
modlist.append(('shadowMax', old_shadowMax, new_shadowMax))
if new_krb5end != old_krb5end:
log.debug("password_sync_s4_to_ucs: update krb5PasswordEnd to %s for %s", new_krb5end, ucs_object['dn'])
modlist.append(('krb5PasswordEnd', old_krb5end, new_krb5end))
if sambaPwdLastSet:
if sambaPwdLastSet != newSambaPwdLastSet:
modlist.append(('sambaPwdLastSet', sambaPwdLastSet, newSambaPwdLastSet))
log.debug("password_sync_s4_to_ucs: sambaPwdLastSet in modlist (replace): %s", newSambaPwdLastSet)
else:
modlist.append(('sambaPwdLastSet', b'', newSambaPwdLastSet))
log.debug("password_sync_s4_to_ucs: sambaPwdLastSet in modlist (set): %s", newSambaPwdLastSet)
if sambaPwdMustChange:
modlist.append(('sambaPwdMustChange', sambaPwdMustChange, b''))
log.debug("password_sync_s4_to_ucs: Removing sambaPwdMustChange")
if len(modlist) > 0:
log.debug("password_sync_s4_to_ucs: modlist: %s", modlist)
s4connector.lo.lo.modify(ucs_object['dn'], modlist)
else:
log.warning("password_sync_ucs_s4_to_ucs: Failed to get Password-Hash from S4")
[docs]
def password_sync_s4_to_ucs_no_userpassword(s4connector, key, ucs_object):
# The userPassword should not synchronized for computer accounts
password_sync_s4_to_ucs(s4connector, key, ucs_object, modifyUserPassword=False)
[docs]
def lockout_sync_to_ucs(connector, key, obj):
"""
Sync account locking *state* from AD to UCS:
sync AD (lockoutTime != 0) -> UCS locked = 1 and lockedTime = lockoutTime
(lockoutTime == 0) -> UCS locked = 0 and lockedTime = 0 (lockedTime would be set automaticly by UCS)
"""
if obj['modtype'] not in ('modify', 'add'):
return
if 'lockoutTime' not in obj['changed_attributes']:
log.trace(
'No lockout attribute in changed attributes.',
changed_attributes=obj['changed_attributes'],
dn=obj['dn'],
)
return
try:
udm_object = univention.admin.objects.get(
connector.modules[key],
co=None,
lo=connector.lo,
position='',
dn=obj['dn'],
)
except uexceptions.noObject:
log.warning('Object with DN %s not found!', obj['dn'])
return
udm_object.open()
is_locked = udm_object['locked'] == '1'
lockout_time = obj['attributes'].get('lockoutTime', [b'0'])[0]
# Convert AD lockoutTime (Windows Filetime) to UCS lockedTime (GeneralizedTime)
lockout_time_unix_ts = int(lockout_time.decode('ascii')) / 10000000 - 11644473600
locked_time = datetime.fromtimestamp(lockout_time_unix_ts).strftime('%Y%m%d%H%M%SZ')
should_be_locked = lockout_time != b'0'
log.debug(
'Locking user account',
locked_time=locked_time,
lockout_time=lockout_time,
is_locked=is_locked,
should_be_locked=should_be_locked,
dn=obj['dn'],
)
if should_be_locked == is_locked:
return
old_states = (
udm_object.descriptions['locked'].editable,
udm_object.descriptions['locked'].may_change,
udm_object.descriptions['lockedTime'].editable,
udm_object.descriptions['lockedTime'].may_change,
)
(
udm_object.descriptions['locked'].editable,
udm_object.descriptions['locked'].may_change,
udm_object.descriptions['lockedTime'].editable,
udm_object.descriptions['lockedTime'].may_change,
) = (True, True, True, True)
try:
if should_be_locked:
log.process('Lock user in UCS.', dn=obj['dn'])
udm_object['locked'] = '1'
udm_object['lockedTime'] = locked_time
else:
log.process('Unlock user in UCS.', dn=obj['dn'])
udm_object['locked'] = '0'
udm_object.modify()
finally:
(
udm_object.descriptions['locked'].editable,
udm_object.descriptions['locked'].may_change,
udm_object.descriptions['lockedTime'].editable,
udm_object.descriptions['lockedTime'].may_change,
) = old_states
[docs]
def lockout_sync_from_ucs(connector, key, obj):
"""
Sync unlock *modification* from OpenLDAP to AD:
sync OpenLDAP ("L" not in sambaAcctFlags) -> AD lockoutTime = 0
sync OpenLDAP ("L" in sambaAcctFlags) -> AD lockoutTime = sambaBadPasswordTime
and OpenLDAP sambaBadPasswordTime -> AD badPasswordTime
"""
if obj['modtype'] not in ('modify', 'add'):
return
if obj.get('new_ucs_object') and obj.get('old_ucs_object'):
is_locked = b'L' in obj['new_ucs_object'].get('sambaAcctFlags', [b''])[0]
was_locked = b'L' in obj['old_ucs_object'].get('sambaAcctFlags', [b''])[0]
if is_locked == was_locked:
return
ucs_object = connector._object_mapping(key, obj, 'con')
try:
ucs_object_attributes = connector.lo.get(ucs_object['dn'], ['sambaAcctFlags', 'sambaBadPasswordTime'], required=True)
except ldap.NO_SUCH_OBJECT:
log.warning('The UCS object (%s) was not found. The object was removed.', ucs_object['dn'])
return
try:
ad_object_attributes = connector.lo_s4.get(obj['dn'], ['lockoutTime'], required=True)
except ldap.NO_SUCH_OBJECT:
log.warning('The AD object (%s) was not found. The object was removed.', obj['dn'])
return
samba_acct_flags = ucs_object_attributes.get('sambaAcctFlags', [b''])[0]
is_locked = b'L' in samba_acct_flags
samba_bad_password_time = ucs_object_attributes.get('sambaBadPasswordTime', [b'0'])[0]
lockout_time = ad_object_attributes.get('lockoutTime', [b'0'])[0]
should_be_locked = lockout_time != b'0'
log.debug(
'Lockout states.',
samba_acct_flags=samba_acct_flags,
samba_bad_password_time=samba_bad_password_time,
lockout_time=lockout_time,
is_locked=is_locked,
should_be_locked=should_be_locked,
ucs_object_dn=ucs_object['dn'],
ad_object_dn=obj['dn'],
)
if should_be_locked == is_locked:
return
modlist = []
if not is_locked:
if samba_bad_password_time and samba_bad_password_time != b'0':
log.error('The UCS object (%s) is unlocked, but sambaBadPasswordTime is set.', ucs_object['dn'])
return
# Ok here we have:
# 1. Account currently not locked in OpenLDAP but in AD
# 2. Lockout state has changed to unlocked at some pickled point in the past
modlist.append((ldap.MOD_REPLACE, 'lockoutTime', b'0'))
modlist.append((ldap.MOD_REPLACE, 'badPasswordTime', b'0'))
log.process('Unlock user in AD.', dn=obj['dn'])
else:
if not samba_bad_password_time:
log.error('The UCS object (%s) is locked, but sambaBadPasswordTime is missing.', ucs_object['dn'])
return
if samba_bad_password_time == b'0':
log.error('The UCS object (%s) is locked, but sambaBadPasswordTime is 0.', ucs_object['dn'])
return
if samba_bad_password_time == lockout_time:
# already locked
return
# Ok here we have:
# 1. Account currently locked in OpenLDAP but not in AD
# 2. Lockout state has changed to locked at some pickled point in the past
modlist.append((ldap.MOD_REPLACE, 'lockoutTime', samba_bad_password_time))
modlist.append((ldap.MOD_REPLACE, 'badPasswordTime', samba_bad_password_time))
log.process('Lock user in AD.', dn=obj['dn'])
log.debug('Setting lockoutTime to the value of sambaBadPasswordTime: %s', samba_bad_password_time)
if modlist:
log.trace('modlist: %s', modlist)
connector.lo_s4.lo.modify_ext_s(obj['dn'], modlist)