Source code for univention.admin.handlers.users.user

# -*- coding: utf-8 -*-
#
# Copyright 2004-2022 Univention GmbH
#
# https://www.univention.de/
#
# All rights reserved.
#
# The source code of this program is made available
# under the terms of the GNU Affero General Public License version 3
# (GNU AGPL V3) as published by the Free Software Foundation.
#
# Binary versions of this program provided by Univention to you as
# well as other copyrighted, protected or trademarked materials like
# Logos, graphics, fonts, specific documentations and configurations,
# cryptographic keys etc. are subject to a license agreement between
# you and Univention and not subject to the GNU AGPL V3.
#
# In the case you use this program under the terms of the GNU AGPL V3,
# the program is provided in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public
# License with the Debian GNU/Linux or Univention distribution in file
# /usr/share/common-licenses/AGPL-3; if not, see
# <https://www.gnu.org/licenses/>.

"""
|UDM| module for the user objects
"""

from __future__ import absolute_import

import base64
import calendar
import codecs
import copy
import hashlib
import os
import re
import time
from datetime import datetime

import ldap
import pytz
import six
from ldap.filter import filter_format
import tzlocal
import passlib.hash
from M2Crypto import X509

import univention.admin
import univention.admin.allocators
import univention.admin.filter
import univention.admin.handlers
import univention.admin.handlers.settings.prohibited_username
import univention.admin.localization
import univention.admin.mapping
import univention.admin.modules
import univention.admin.password
import univention.admin.samba
import univention.admin.syntax
import univention.admin.uexceptions
import univention.admin.uldap
import univention.debug as ud
import univention.password
from univention.admin import configRegistry
from univention.admin.layout import Group, Tab
from univention.lib.s4 import rids_for_well_known_security_identifiers

from typing import List  # noqa: F401

try:
	from univention.admin.syntax import ActivationDateTimeTimezone
except ImportError:
	# workaround for errors during errata-updates. should be removable with UCS 5.0-1
	class ActivationDateTimeTimezone(univention.admin.syntax.complex):
		"""
		Syntax for YYYY-mm-dd HH:MM TZNAME
		"""
		delimiter = ' '
		subsyntaxes = [('Date', univention.admin.syntax.iso8601Date), ('Time', univention.admin.syntax.TimeString), ('Timezone', univention.admin.syntax.string)]
		subsyntax_names = ('date', 'time', 'timezone')
		subsyntax_names = ('activation-date', 'activation-time', 'activation-timezone')
		size = ('TwoThirds', 'TwoThirds', 'TwoThirds')
		all_required = False
		min_elements = 0
	univention.admin.syntax.ActivationDateTimeTimezone = ActivationDateTimeTimezone

if not six.PY2:
	long = int

translation = univention.admin.localization.translation('univention.admin.handlers.users')
_ = translation.translate

module = 'users/user'
operations = ['add', 'edit', 'remove', 'search', 'move', 'copy']
template = 'settings/usertemplate'

childs = False
short_description = _('User')
object_name = _('User')
object_name_plural = _('Users')
long_description = _('POSIX, Samba, Kerberos and mail account')


options = {
	'default': univention.admin.option(
		short_description=_('POSIX, Samba, Kerberos and mail account'),
		default=True,
		objectClasses=['top', 'person', 'univentionPWHistory', 'posixAccount', 'shadowAccount', 'sambaSamAccount', 'krb5Principal', 'krb5KDCEntry', 'univentionMail', 'organizationalPerson', 'inetOrgPerson']
	),
	'pki': univention.admin.option(
		short_description=_('Public key infrastructure account'),
		default=False,
		editable=True,
		objectClasses=['pkiUser'],
	),
}
property_descriptions = {
	'username': univention.admin.property(
		short_description=_('User name'),
		long_description='',
		syntax=univention.admin.syntax.uid_umlauts,
		include_in_default_search=True,
		required=True,
		identifies=True,
		readonly_when_synced=True,
	),
	'uidNumber': univention.admin.property(
		short_description=_('User ID'),
		long_description='',
		syntax=univention.admin.syntax.integer,
		may_change=False,
		dontsearch=True,
	),
	'gidNumber': univention.admin.property(
		short_description=_('Group ID of the primary group'),
		long_description='',
		syntax=univention.admin.syntax.integer,
		may_change=False,
		editable=False,
		dontsearch=True,
		readonly_when_synced=True,
	),
	'firstname': univention.admin.property(
		short_description=_('First name'),
		long_description='',
		syntax=univention.admin.syntax.TwoThirdsString,
		include_in_default_search=True,
		readonly_when_synced=True,
	),
	'lastname': univention.admin.property(
		short_description=_('Last name'),
		long_description='',
		syntax=univention.admin.syntax.string,
		include_in_default_search=True,
		required=True,
		readonly_when_synced=True,
	),
	'gecos': univention.admin.property(
		short_description=_('GECOS'),
		long_description='',
		syntax=univention.admin.syntax.IA5string,
		default='<firstname> <lastname><:umlauts,strip>',
		dontsearch=True,
	),
	'displayName': univention.admin.property(
		short_description=_('Display name'),
		long_description='',
		syntax=univention.admin.syntax.string,
		default='<firstname> <lastname><:strip>',
		readonly_when_synced=True,
	),
	'title': univention.admin.property(
		short_description=_('Title'),
		long_description='',
		syntax=univention.admin.syntax.OneThirdString,
		readonly_when_synced=True,
	),
	'initials': univention.admin.property(
		short_description=_('Initials'),
		long_description='',
		syntax=univention.admin.syntax.string6,
	),
	'preferredDeliveryMethod': univention.admin.property(
		short_description=_('Preferred delivery method'),
		long_description='',
		syntax=univention.admin.syntax.string,
	),
	'sambaPrivileges': univention.admin.property(
		short_description=_('Samba privilege'),
		long_description=_('Manage Samba privileges'),
		syntax=univention.admin.syntax.SambaPrivileges,
		multivalue=True,
		readonly_when_synced=True,
		copyable=True,
	),
	'description': univention.admin.property(
		short_description=_('Description'),
		long_description='',
		syntax=univention.admin.syntax.string,
		include_in_default_search=True,
		readonly_when_synced=True,
		copyable=True,
	),
	'organisation': univention.admin.property(
		short_description=_('Organisation'),
		long_description='',
		syntax=univention.admin.syntax.string64,
		readonly_when_synced=True,
		copyable=True,
	),
	'userexpiry': univention.admin.property(
		short_description=_('Account expiry date'),
		long_description=_('Specifies the date from when the user is not allowed to login anymore. Enter date as "day.month.year".'),
		syntax=univention.admin.syntax.date2,
		dontsearch=True,
		copyable=True,
	),
	'passwordexpiry': univention.admin.property(
		short_description=_('Password expiry date'),
		long_description=_('Specified the date from when the user must change his password. Enter date as "day.month.year".'),
		syntax=univention.admin.syntax.date,
		editable=False,
		dontsearch=True,
		readonly_when_synced=True,
	),
	'pwdChangeNextLogin': univention.admin.property(
		short_description=_('User has to change password on next login'),
		long_description=_('If enabled, the user has to change his password the next time when he logs in.'),
		syntax=univention.admin.syntax.boolean,
		dontsearch=True,
		readonly_when_synced=True,
		size='Two',
	),
	'preferredLanguage': univention.admin.property(
		short_description=_('Preferred language'),
		long_description=_('Preferred written or spoken language for the person.'),
		syntax=univention.admin.syntax.string,
		copyable=True,
	),
	'disabled': univention.admin.property(
		short_description=_('Account is deactivated'),
		long_description=_('Disable the user account for Windows, Kerberos and POSIX.'),
		syntax=univention.admin.syntax.disabled,
		show_in_lists=True,
		copyable=True,
		default='0',
		size='Two',
	),
	'accountActivationDate': univention.admin.property(
		short_description=_('Activate user account starting from'),
		long_description=_('This disables the account until the specified time.'),
		syntax=univention.admin.syntax.ActivationDateTimeTimezone,
		dontsearch=True,
		default=[[None, None, tzlocal.get_localzone().zone], []],
	),
	'locked': univention.admin.property(  # This property only serves two purposes: 1) filtering 2) artificial simulation of lockout
		short_description=_('Locked state of account'),
		long_description=_('This indicates if the account is locked out due to too many authentication failures.'),
		syntax=univention.admin.syntax.locked,
		show_in_lists=True,
		default='0',
	),
	'lockedTime': univention.admin.property(
		short_description=_('Lockout time'),
		long_description=_('Timestamp when account lockout happened.'),
		syntax=univention.admin.syntax.string,
		default=0,
		may_change=False,  # caution! this gets overwritten by some scripts
		editable=False,  # caution! this gets overwritten by some scripts
		dontsearch=True,
	),
	'unlock': univention.admin.property(  # Just a trigger to reset self['locked']
		short_description=_('Unlock account'),
		long_description=_('If the account is locked out due to too many login failures, this checkbox allows unlocking.'),
		syntax=univention.admin.syntax.boolean,
		show_in_lists=True,
		default='0',
		prevent_umc_default_popup=True,
	),
	'unlockTime': univention.admin.property(
		short_description=_('Lockout till'),
		long_description=_('Shows the time when the account gets unlocked again according to policy.'),
		syntax=univention.admin.syntax.string,  # see posixSecondsToLocaltimeDate
		may_change=False,
		editable=False,
		show_in_lists=True,
		dontsearch=True,
	),
	'password': univention.admin.property(
		short_description=_('Password'),
		long_description='',
		syntax=univention.admin.syntax.userPasswd,
		required=True,
		dontsearch=True,
		readonly_when_synced=True,
	),
	'street': univention.admin.property(
		short_description=_('Street'),
		long_description='',
		syntax=univention.admin.syntax.string,
		readonly_when_synced=True,
		copyable=True,
	),
	'e-mail': univention.admin.property(
		short_description=_('E-mail address'),
		long_description=_('This e-mail address serves only as contact information. This address has no effect on the UCS mail stack and is not related to a local mailbox.'),
		syntax=univention.admin.syntax.emailAddress,
		multivalue=True,
	),
	'postcode': univention.admin.property(
		short_description=_('Postal code'),
		long_description='',
		syntax=univention.admin.syntax.OneThirdString,
		readonly_when_synced=True,
		copyable=True,
	),
	'postOfficeBox': univention.admin.property(
		short_description=_('Post office box'),
		long_description='',
		syntax=univention.admin.syntax.string,
		multivalue=True,
		copyable=True,
	),
	'city': univention.admin.property(
		short_description=_('City'),
		long_description='',
		syntax=univention.admin.syntax.TwoThirdsString,
		readonly_when_synced=True,
		copyable=True,
	),
	'country': univention.admin.property(
		short_description=_('Country'),
		long_description='',
		syntax=univention.admin.syntax.Country,
		readonly_when_synced=True,
		copyable=True,
	),
	'phone': univention.admin.property(
		short_description=_('Telephone number'),
		long_description='',
		syntax=univention.admin.syntax.phone,
		multivalue=True,
		readonly_when_synced=True,
		copyable=True,
	),
	'employeeNumber': univention.admin.property(
		short_description=_('Employee number'),
		long_description='',
		syntax=univention.admin.syntax.string,
		include_in_default_search=True,
	),
	'roomNumber': univention.admin.property(
		short_description=_('Room number'),
		long_description='',
		syntax=univention.admin.syntax.OneThirdString,
		multivalue=True,
		copyable=True,
	),
	'secretary': univention.admin.property(
		short_description=_('Superior'),
		long_description='',
		syntax=univention.admin.syntax.UserDN,
		multivalue=True,
		copyable=True,
	),
	'departmentNumber': univention.admin.property(
		short_description=_('Department number'),
		long_description='',
		syntax=univention.admin.syntax.OneThirdString,
		multivalue=True,
		copyable=True,
	),
	'employeeType': univention.admin.property(
		short_description=_('Employee type'),
		long_description='',
		syntax=univention.admin.syntax.string,
		copyable=True,
	),
	'homePostalAddress': univention.admin.property(
		short_description=_('Private postal address'),
		long_description='',
		syntax=univention.admin.syntax.postalAddress,
		multivalue=True,
	),
	'physicalDeliveryOfficeName': univention.admin.property(
		short_description=_('Delivery office name'),
		long_description='',
		syntax=univention.admin.syntax.string,
		copyable=True,
	),
	'homeTelephoneNumber': univention.admin.property(
		short_description=_('Private telephone number'),
		long_description='',
		syntax=univention.admin.syntax.phone,
		multivalue=True,
		readonly_when_synced=True,
	),
	'mobileTelephoneNumber': univention.admin.property(
		short_description=_('Mobile phone number'),
		long_description='',
		syntax=univention.admin.syntax.phone,
		multivalue=True,
		readonly_when_synced=True,
	),
	'pagerTelephoneNumber': univention.admin.property(
		short_description=_('Pager telephone number'),
		long_description='',
		syntax=univention.admin.syntax.phone,
		multivalue=True,
		readonly_when_synced=True,
	),
	'birthday': univention.admin.property(
		short_description=_('Birthdate'),
		long_description=_('Date of birth'),
		syntax=univention.admin.syntax.iso8601Date,
	),
	'unixhome': univention.admin.property(
		short_description=_('Unix home directory'),
		long_description='',
		syntax=univention.admin.syntax.absolutePath,
		required=True,
		default='/home/<username>'
	),

	'shell': univention.admin.property(
		short_description=_('Login shell'),
		long_description='',
		syntax=univention.admin.syntax.OneThirdString,
		default='/bin/bash',
		copyable=True,
	),
	'sambahome': univention.admin.property(
		short_description=_('Windows home path'),
		long_description=_('The directory path which is used as the user\'s Windows home directory, e.g. \\\\ucs-file-server\\smith.'),
		syntax=univention.admin.syntax.string,
		readonly_when_synced=True,
		copyable=True,
	),
	'scriptpath': univention.admin.property(
		short_description=_('Windows logon script'),
		long_description=_('The user-specific logon script relative to the NETLOGON share, e.g. user.bat.'),
		syntax=univention.admin.syntax.string,
		readonly_when_synced=True,
		copyable=True,
	),
	'profilepath': univention.admin.property(
		short_description=_('Windows profile directory'),
		long_description=_('The directory path (resolvable by windows clients) e.g. %LOGONSERVER%\\%USERNAME%\\windows-profiles\\default which is used to configure a roaming profile.'),
		syntax=univention.admin.syntax.string,
		readonly_when_synced=True,
		copyable=True,
	),
	'homedrive': univention.admin.property(
		short_description=_('Windows home drive'),
		long_description=_('The drive letter (with trailing colon) where the Windows home directory of this user lies, e.g. M:. Needs only be specified if it is different to the Samba configuration.'),
		syntax=univention.admin.syntax.string,
		readonly_when_synced=True,
		copyable=True,
	),
	'sambaRID': univention.admin.property(
		short_description=_('Relative ID'),
		long_description=_('The relative ID (RID) is the local part of the SID and will be assigned automatically to next available RID. It can not be subsequently changed. Valid values are numbers upwards 1000. RIDs below 1000 are reserved to standard groups and other special objects.'),
		syntax=univention.admin.syntax.integer,
		dontsearch=True,
		readonly_when_synced=True,
	),
	'groups': univention.admin.property(
		short_description=_('Groups'),
		long_description='',
		syntax=univention.admin.syntax.GroupDN,
		multivalue=True,
		readonly_when_synced=True,
		copyable=True,
	),
	'primaryGroup': univention.admin.property(
		short_description=_('Primary group'),
		long_description='',
		syntax=univention.admin.syntax.GroupDN,
		required=True,
		dontsearch=True,
		readonly_when_synced=True,
		copyable=True,
	),
	'mailHomeServer': univention.admin.property(
		short_description=_('Mail home server'),
		long_description='',
		syntax=univention.admin.syntax.MailHomeServer,
		nonempty_is_default=True,
		copyable=True,
	),
	'mailPrimaryAddress': univention.admin.property(
		short_description=_('Primary e-mail address (mailbox)'),
		long_description=_('E-mail address that will be used to create the IMAP/POP3 mailbox and that can be used as login for SMTP/IMAP/POP3 connections. The domain must be one of the UCS hosted e-mail domains.'),
		syntax=univention.admin.syntax.primaryEmailAddressValidDomain,
		include_in_default_search=True,
		readonly_when_synced=True,
	),
	'mailAlternativeAddress': univention.admin.property(
		short_description=_('E-mail alias address'),
		long_description=_('Additional e-mail addresses for which e-mails will be delivered to the "Primary e-mail address". The domain must be one of the UCS hosted e-mail domains.'),
		syntax=univention.admin.syntax.emailAddressValidDomain,
		multivalue=True,
		copyable=True,
	),
	'mailForwardAddress': univention.admin.property(
		short_description=_('Forward e-mail address'),
		long_description=_("Incoming e-mails for this user are copied/redirected to the specified forward e-mail addresses. Depending on the forwarding setting, a local copy of each e-mail is kept. If no forwarding e-mail addresses are specified, the e-mails are always kept in the user's mailbox."),
		syntax=univention.admin.syntax.emailAddress,
		multivalue=True,
		copyable=True,
	),
	'mailForwardCopyToSelf': univention.admin.property(
		short_description=_('Forwarding setting'),
		long_description=_("Specifies if a local copy of each incoming e-mail is kept for this user. If no forwarding e-mail addresses are specified, the e-mails are always kept in the user's mailbox."),
		syntax=univention.admin.syntax.emailForwardSetting,
		dontsearch=True,
		copyable=True,
		default='0',
		prevent_umc_default_popup=True,
	),
	'overridePWHistory': univention.admin.property(
		short_description=_('Override password history'),
		long_description=_('No check if the password was already used is performed.'),
		syntax=univention.admin.syntax.boolean,
		dontsearch=True,
		readonly_when_synced=True,
		copyable=True,
	),
	'overridePWLength': univention.admin.property(
		short_description=_('Override password check'),
		long_description=_('No check for password quality and minimum length is performed.'),
		syntax=univention.admin.syntax.boolean,
		dontsearch=True,
		readonly_when_synced=True,
		copyable=True,
	),
	'homeShare': univention.admin.property(
		short_description=_('Home share'),
		long_description=_('Share, the user\'s home directory resides on'),
		syntax=univention.admin.syntax.WritableShare,
		dontsearch=True,
		copyable=True,
	),
	'homeSharePath': univention.admin.property(
		short_description=_('Home share path'),
		long_description=_('Path to the home directory on the home share'),
		syntax=univention.admin.syntax.HalfString,
		dontsearch=True,
		default='<username>',
		prevent_umc_default_popup=True,
	),
	'sambaUserWorkstations': univention.admin.property(
		short_description=_('Allow the authentication only on this Microsoft Windows host'),
		long_description=(''),
		syntax=univention.admin.syntax.string,
		multivalue=True,
		readonly_when_synced=True,
		copyable=True,
	),
	'sambaLogonHours': univention.admin.property(
		short_description=_('Permitted times for Windows logins'),
		long_description=(""),
		syntax=univention.admin.syntax.SambaLogonHours,
		dontsearch=True,
		readonly_when_synced=True,
		copyable=True,
	),
	'jpegPhoto': univention.admin.property(
		short_description=_("Picture of the user (JPEG format)"),
		long_description=_('Picture for user account in JPEG format'),
		syntax=univention.admin.syntax.jpegPhoto,
		dontsearch=True,
	),
	'userCertificate': univention.admin.property(
		short_description=_("PKI user certificate (DER format)"),
		long_description=_('Public key infrastructure - user certificate '),
		syntax=univention.admin.syntax.Base64Upload,
		dontsearch=True,
		options=['pki'],
	),
	'certificateIssuerCountry': univention.admin.property(
		short_description=_('Issuer Country'),
		long_description=_('Certificate Issuer Country'),
		syntax=univention.admin.syntax.string,
		dontsearch=True,
		editable=False,
		options=['pki'],
	),
	'certificateIssuerState': univention.admin.property(
		short_description=_('Issuer State'),
		long_description=_('Certificate Issuer State'),
		syntax=univention.admin.syntax.string,
		dontsearch=True,
		editable=False,
		options=['pki'],
	),
	'certificateIssuerLocation': univention.admin.property(
		short_description=_('Issuer Location'),
		long_description=_('Certificate Issuer Location'),
		syntax=univention.admin.syntax.string,
		dontsearch=True,
		editable=False,
		options=['pki'],
	),
	'certificateIssuerOrganisation': univention.admin.property(
		short_description=_('Issuer Organisation'),
		long_description=_('Certificate Issuer Organisation'),
		syntax=univention.admin.syntax.string,
		dontsearch=True,
		editable=False,
		options=['pki'],
	),
	'certificateIssuerOrganisationalUnit': univention.admin.property(
		short_description=_('Issuer Organisational Unit'),
		long_description=_('Certificate Issuer Organisational Unit'),
		syntax=univention.admin.syntax.string,
		dontsearch=True,
		editable=False,
		options=['pki'],
	),
	'certificateIssuerCommonName': univention.admin.property(
		short_description=_('Issuer Common Name'),
		long_description=_('Certificate Issuer Common Name'),
		syntax=univention.admin.syntax.string,
		dontsearch=True,
		editable=False,
		options=['pki'],
	),
	'certificateIssuerMail': univention.admin.property(
		short_description=_('Issuer Mail'),
		long_description=_('Certificate Issuer Mail'),
		syntax=univention.admin.syntax.string,
		dontsearch=True,
		editable=False,
		options=['pki'],
	),
	'certificateSubjectCountry': univention.admin.property(
		short_description=_('Subject Country'),
		long_description=_('Certificate Subject Country'),
		syntax=univention.admin.syntax.string,
		dontsearch=True,
		editable=False,
		options=['pki'],
	),
	'certificateSubjectState': univention.admin.property(
		short_description=_('Subject State'),
		long_description=_('Certificate Subject State'),
		syntax=univention.admin.syntax.string,
		dontsearch=True,
		editable=False,
		options=['pki'],
	),
	'certificateSubjectLocation': univention.admin.property(
		short_description=_('Subject Location'),
		long_description=_('Certificate Subject Location'),
		syntax=univention.admin.syntax.string,
		dontsearch=True,
		editable=False,
		options=['pki'],
	),
	'certificateSubjectOrganisation': univention.admin.property(
		short_description=_('Subject Organisation'),
		long_description=_('Certificate Subject Organisation'),
		syntax=univention.admin.syntax.string,
		dontsearch=True,
		editable=False,
		options=['pki'],
	),
	'certificateSubjectOrganisationalUnit': univention.admin.property(
		short_description=_('Subject Organisational Unit'),
		long_description=_('Certificate Subject Organisational Unit'),
		syntax=univention.admin.syntax.string,
		dontsearch=True,
		editable=False,
		options=['pki'],
	),
	'certificateSubjectCommonName': univention.admin.property(
		short_description=_('Subject Common Name'),
		long_description=_('Certificate Subject Common Name'),
		syntax=univention.admin.syntax.string,
		dontsearch=True,
		editable=False,
		options=['pki'],
	),
	'certificateSubjectMail': univention.admin.property(
		short_description=_('Subject Mail'),
		long_description=_('Certificate Subject Mail'),
		syntax=univention.admin.syntax.string,
		dontsearch=True,
		editable=False,
		options=['pki'],
	),
	'certificateDateNotBefore': univention.admin.property(
		short_description=_('Valid from'),
		long_description=_('Certificate valid from'),
		syntax=univention.admin.syntax.date,
		dontsearch=True,
		editable=False,
		options=['pki'],
	),
	'certificateDateNotAfter': univention.admin.property(
		short_description=_('Valid until'),
		long_description=_('Certificate valid until'),
		syntax=univention.admin.syntax.date,
		dontsearch=True,
		editable=False,
		options=['pki'],
	),
	'certificateVersion': univention.admin.property(
		short_description=_('Version'),
		long_description=_('Certificate Version'),
		syntax=univention.admin.syntax.string,
		dontsearch=True,
		editable=False,
		options=['pki'],
	),
	'certificateSerial': univention.admin.property(
		short_description=_('Serial'),
		long_description=_('Certificate Serial'),
		syntax=univention.admin.syntax.string,
		dontsearch=True,
		editable=False,
		options=['pki'],
	),
	'umcProperty': univention.admin.property(
		short_description=_('UMC user preferences'),
		long_description=_('Key value pairs storing user preferences for UMC'),
		syntax=univention.admin.syntax.keyAndValue,
		dontsearch=True,
		multivalue=True,
		copyable=True,
	),
	'serviceSpecificPassword': univention.admin.property(
		short_description=_('Service Specific Password'),
		long_description=_('Virtual attribute to set different Service Specific Passwords via UDM'),
		syntax=univention.admin.syntax.string,
		dontsearch=True,
		show_in_lists=False,
		cli_enabled=False,
	),
}

default_property_descriptions = copy.deepcopy(property_descriptions)  # for later reset of descriptions

layout = [
	Tab(_('General'), _('Basic settings'), layout=[
		Group(_('User account'), layout=[
			['title', 'firstname', 'lastname'],
			['username', 'description'],
			'password',
			['overridePWHistory', 'overridePWLength'],
			'mailPrimaryAddress',
		]),
		Group(_('Personal information'), layout=[
			'displayName',
			'birthday',
			'jpegPhoto',
		]),
		Group(_('Organisation'), layout=[
			'organisation',
			['employeeNumber', 'employeeType'],
			'secretary',
		]),
	]),
	Tab(_('Groups'), _('Groups'), layout=[
		Group(_('Primary group'), layout=[
			'primaryGroup',
		]),
		Group(_('Additional groups'), layout=[
			'groups',
		]),
	]),
	Tab(_('Account'), _('Account settings'), layout=[
		Group(_('Deactivation'), layout=[
			['disabled'],
			['userexpiry'],
		]),
		Group(_('Locked login'), layout=[
			['pwdChangeNextLogin'],
			['passwordexpiry'],
			['unlock'],
			['unlockTime'],
		]),
		Group(_('Activation'), layout=[
			['accountActivationDate'],
		]),
		Group(_('Windows'), layout=[
			['homedrive', 'sambahome'],
			['scriptpath', 'profilepath'],
			'sambaRID',
			'sambaPrivileges',
			'sambaLogonHours',
			'sambaUserWorkstations'
		]),
		Group(_('POSIX (Linux/UNIX)'), layout=[
			['unixhome', 'shell'],
			['uidNumber', 'gidNumber'],
			['homeShare', 'homeSharePath'],
		]),
	]),
	Tab(_('Mail'), _('Mail preferences'), advanced=True, layout=[
		Group(_('Advanced settings'), layout=[
			'mailAlternativeAddress',
			'mailHomeServer',
		], ),
		Group(_('Mail forwarding'), layout=[
			'mailForwardCopyToSelf',
			'mailForwardAddress',
		], ),
	]),
	Tab(_('Contact'), _('Contact information'), layout=[
		Group(_('Business'), layout=[
			'e-mail',
			'phone',
			['roomNumber', 'departmentNumber'],
			['street', 'postcode', 'city'],
			['country']
		]),
		Group(_('Private'), layout=[
			'homeTelephoneNumber',
			'mobileTelephoneNumber',
			'pagerTelephoneNumber',
			'homePostalAddress'
		]),
	]),
	Tab('Apps'),  # not translated!
	Tab(_('UMC preferences'), _('UMC preferences'), advanced=True, layout=[
		Group(_('UMC preferences'), layout=[
			'umcProperty',
		]),
	]),
	Tab(_('Certificate'), _('Certificate'), advanced=True, layout=[
		Group(_('General'), '', [
			'userCertificate',
		]),
		Group(_('Subject'), '', [
			['certificateSubjectCommonName', 'certificateSubjectMail'],
			['certificateSubjectOrganisation', 'certificateSubjectOrganisationalUnit'],
			'certificateSubjectLocation',
			['certificateSubjectState', 'certificateSubjectCountry'],
		]),
		Group(_('Issuer'), '', [
			['certificateIssuerCommonName', 'certificateIssuerMail'],
			['certificateIssuerOrganisation', 'certificateIssuerOrganisationalUnit'],
			'certificateIssuerLocation',
			['certificateIssuerState', 'certificateIssuerCountry'],
		]),
		Group(_('Validity'), '', [
			['certificateDateNotBefore', 'certificateDateNotAfter']
		]),
		Group(_('Misc'), '', [
			['certificateVersion', 'certificateSerial']
		])
	])
]


[docs]def check_prohibited_username(lo, username): """check if the username is allowed""" module = univention.admin.modules.get('settings/prohibited_username') for prohibited_object in (module.lookup(None, lo, u'') or []): if username in prohibited_object['usernames']: raise univention.admin.uexceptions.prohibitedUsername(username)
[docs]def case_insensitive_in_list(dn, list): assert isinstance(dn, six.text_type) for element in list: assert isinstance(element, six.text_type) if dn.lower() == element.lower(): return True return False
[docs]def posixSecondsToLocaltimeDate(seconds): return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(seconds))
[docs]def posixDaysToDate(days): return time.strftime("%Y-%m-%d", time.gmtime(long(days) * 3600 * 24))
[docs]def sambaWorkstationsMap(workstations, encoding=()): return u','.join(workstations).encode(*encoding)
[docs]def sambaWorkstationsUnmap(workstations, encoding=()): return workstations[0].decode(*encoding).split(u',')
[docs]def logonHoursMap(logontimes): "converts the bitfield 001110010110...100 to the respective hex string" # convert list of bit numbers to bit-string # bitstring = '0' * 168 if logontimes == '': # if unsetting it, see Bug #33703 return None bitstring = ''.join(map(lambda x: x in logontimes and '1' or '0', range(168))) # for idx in logontimes: # bitstring[ idx ] = '1' logontimes = bitstring # the order of the bits of each byte has to be reversed. The reason for this is that # consecutive bytes mean consecutive 8-hrs-intervals, but the MSB stands for # the last hour in that interval, the 2nd but leftmost bit for the second-to-last # hour and so on. We want to hide this from anybody using this feature. # See <http://ma.ph-freiburg.de/tng/tng-technical/2003-04/msg00015.html> for details. newtimes = "" for i in range(0, 21): bitlist = list(logontimes[(i * 8):(i * 8) + 8]) bitlist.reverse() newtimes += "".join(bitlist) logontimes = newtimes # create a hexnumber from each 8-bit-segment ret = "" for i in range(0, 21): val = 0 exp = 7 for j in range((i * 8), (i * 8) + 8): if not (logontimes[j] == "0"): val += 2 ** exp exp -= 1 # we now have: 0<=val<=255 hx = hex(val)[2:4] if len(hx) == 1: hx = "0" + hx ret += hx return ret.encode('ASCII')
[docs]def logonHoursUnmap(logontimes): """Converts hex-string to an array of bits set.""" times = logontimes[0][:42] while len(times) < 42: times = times ret = "" for i in range(0, 42, 2): val = int(times[i:i + 2], 16) ret += intToBinary(val) # reverse order of the bits in each byte. See above for details newtime = "" for i in range(0, 21): bitlist = list(ret[(i * 8):(i * 8) + 8]) bitlist.reverse() newtime += "".join(bitlist) # convert bit-string to list return [i for i in range(168) if newtime[i] == '1']
[docs]def intToBinary(val): ret = "" while val > 0: ret = str(val & 1) + ret val = val >> 1 # pad with leading 0s until length is n*8 if ret == "": ret = "0" while not (len(ret) % 8 == 0): ret = "0" + ret return ret
[docs]def GMTOffset(): # returns the difference in hours between local time and GMT (is -1 for CET and CEST) return time.timezone // 3600
[docs]def load_certificate(user_certificate): """Import a certificate in DER format""" if not user_certificate: return {} try: certificate = base64.b64decode(user_certificate) except base64.binascii.Error: return {} try: x509 = X509.load_cert_string(certificate, X509.FORMAT_DER) values = { 'certificateDateNotBefore': x509.get_not_before().get_datetime().date().isoformat(), 'certificateDateNotAfter': x509.get_not_after().get_datetime().date().isoformat(), 'certificateVersion': str(x509.get_version()), 'certificateSerial': str(x509.get_serial_number()), } X509.m2.XN_FLAG_SEP_MULTILINE & ~X509.m2.ASN1_STRFLGS_ESC_MSB | X509.m2.ASN1_STRFLGS_UTF8_CONVERT for entity, prefix in ( (x509.get_issuer(), "certificateIssuer"), (x509.get_subject(), "certificateSubject"), ): for key, attr in load_certificate.ATTR.items(): try: value = getattr(entity, key) except TypeError: # not expecting type '<class 'NoneType'>' value = None values[prefix + attr] = value except (X509.X509Error, AttributeError): return {} ud.debug(ud.ADMIN, ud.INFO, 'value=%s' % values) return values
load_certificate.ATTR = { "C": "Country", "ST": "State", "L": "Location", "O": "Organisation", "OU": "OrganisationalUnit", "CN": "CommonName", "emailAddress": "Mail", }
[docs]def mapHomePostalAddress(old, encoding=()): """Map address to LDAP encoding. >>> mapHomePostalAddress([["a", "b", "c"]]) [b'a$b$c'] """ new = [] for i in old: new.append(u'$'.join(i).encode(*encoding)) return new
[docs]def unmapHomePostalAddress(old, encoding=()): """Expand LDAP encoded address. >>> unmapHomePostalAddress([b'foo']) [['foo', ' ', ' ']] >>> unmapHomePostalAddress([b'foo$bar$baz']) [['foo', 'bar', 'baz']] """ new = [] for i in old: if b'$' in i: new.append(i.decode(*encoding).split(u'$')) else: new.append([i.decode(*encoding), u" ", u" "]) return new
[docs]def unmapUserExpiry(oldattr): return unmapKrb5ValidEndToUserexpiry(oldattr) or unmapSambaKickoffTimeToUserexpiry(oldattr) or unmapShadowExpireToUserexpiry(oldattr)
[docs]def unmapShadowExpireToUserexpiry(oldattr): # The shadowLastChange attribute is the amount of days between 1/1/1970 up to the day that password was modified, # shadowMax is the number of days a password is valid. So the password expires on 1/1/1970 + shadowLastChange + shadowMax. # shadowExpire contains the absolute date to expire the account. if 'shadowExpire' in oldattr and len(oldattr['shadowExpire']) > 0: ud.debug(ud.ADMIN, ud.INFO, 'userexpiry: %s' % posixDaysToDate(oldattr['shadowExpire'][0])) if oldattr['shadowExpire'][0] != b'1': return posixDaysToDate(oldattr['shadowExpire'][0])
[docs]def unmapKrb5ValidEndToUserexpiry(oldattr): if 'krb5ValidEnd' in oldattr: krb5validend = oldattr['krb5ValidEnd'][0].decode('ASCII') ud.debug(ud.ADMIN, ud.INFO, 'krb5validend is: %s' % krb5validend) return "%s-%s-%s" % (krb5validend[0:4], krb5validend[4:6], krb5validend[6:8])
[docs]def unmapSambaKickoffTimeToUserexpiry(oldattr): if 'sambaKickoffTime' in oldattr: ud.debug(ud.ADMIN, ud.INFO, 'sambaKickoffTime is: %s' % oldattr['sambaKickoffTime'][0].decode('ASCII')) return time.strftime("%Y-%m-%d", time.gmtime(long(oldattr['sambaKickoffTime'][0]) + (3600 * 24)))
def _mapUserExpiryToShadowExpire(userexpiry): return u"%d" % long(time.mktime(time.strptime(userexpiry, "%Y-%m-%d")) / 3600 / 24 + 1) def _mapUserExpiryToKrb5ValidEnd(userexpiry): return u"%s%s%s000000Z" % (userexpiry[0:4], userexpiry[5:7], userexpiry[8:10]) def _mapUserExpiryToSambaKickoffTime(userexpiry): return u"%d" % long(time.mktime(time.strptime(userexpiry, "%Y-%m-%d")))
[docs]def unmapPasswordExpiry(oldattr): if oldattr.get('shadowLastChange') and oldattr.get('shadowMax'): shadow_max = int(oldattr['shadowMax'][0]) shadow_last_change = 0 try: shadow_last_change = int(oldattr['shadowLastChange'][0]) except ValueError: ud.debug(ud.ADMIN, ud.WARN, 'users/user: failed to calculate password expiration correctly, use only shadowMax instead') return posixDaysToDate(shadow_last_change + shadow_max)
[docs]def unmapDisabled(oldattr): if all([ unmapSambaDisabled(oldattr), unmapKerberosDisabled(oldattr), unmapPosixDisabled(oldattr) or isPosixLocked(oldattr), ]): return '1' return '0'
[docs]def inconsistentDisabledState(oldattr): disabled = [ unmapSambaDisabled(oldattr), unmapKerberosDisabled(oldattr), unmapPosixDisabled(oldattr), isPosixLocked(oldattr), ] return len(set(map(bool, disabled))) > 1
[docs]def unmapSambaDisabled(oldattr): flags = oldattr.get('sambaAcctFlags', None) if flags: acctFlags = univention.admin.samba.acctFlags(flags[0].decode('ASCII')) try: return acctFlags['D'] == 1 except KeyError: pass return False
[docs]def unmapKerberosDisabled(oldattr): try: kdcflags = int(oldattr.get('krb5KDCFlags', [b'0'])[0]) except ValueError: kdcflags = 0 return kdcflags & (1 << 7) == (1 << 7)
[docs]def unmapPosixDisabled(oldattr): try: shadowExpire = int(oldattr['shadowExpire'][0]) except (KeyError, ValueError): return False return shadowExpire == 1 or shadowExpire < int(time.time() / 3600 / 24)
[docs]def unmapLocked(oldattr): if isSambaLocked(oldattr) or isKerberosLocked(oldattr): # or isLDAPLocked(oldattr) return '1' return '0'
[docs]def inconsistentLockedState(oldattr): return isSambaLocked(oldattr) ^ isKerberosLocked(oldattr)
[docs]def isPosixLocked(oldattr): userPassword = oldattr.get('userPassword', [b''])[0].decode('ASCII') return userPassword and univention.admin.password.is_locked(userPassword)
[docs]def isSambaLocked(oldattr): flags = oldattr.get('sambaAcctFlags', None) if flags: acctFlags = univention.admin.samba.acctFlags(flags[0].decode('ASCII')) try: return acctFlags['L'] == 1 except KeyError: pass return False
[docs]def isKerberosLocked(oldattr): flags = oldattr.get('krb5KDCFlags', [b'0'])[0] try: state = 1 << 17 return int(flags) & state == state except ValueError: return False
[docs]def isLDAPLocked(oldattr): return bool(oldattr.get('pwdAccountLockedTime', [b''])[0])
[docs]def unmapSambaRid(oldattr): sid = oldattr.get('sambaSID', [b''])[0] pos = sid.rfind(b'-') return sid[pos + 1:].decode('ASCII')
[docs]def mapKeyAndValue(old, encoding=()): """Map (key, value) list to key=value list. >>> mapKeyAndValue([("a", "b")]) [b'a=b'] """ return [u'='.join(entry).encode(*encoding) for entry in old]
[docs]def unmapKeyAndValue(old, encoding=()): """Map (key=value) list to (key, value) list. >>> unmapKeyAndValue([b"a=b"]) [['a', 'b']] """ return [entry.decode(*encoding).split(u'=', 1) for entry in old]
[docs]def mapWindowsFiletime(old, encoding=()): # type: (str) -> List[bytes] if old: if old == "0": return [old.encode(*encoding)] unixtime = time.strptime(old, '%Y%m%d%H%M%SZ') d = long(116444736000000000) # difference between 1601 and 1970 windows_filetime = long(calendar.timegm(unixtime)) * 10000000 + d return [str(int(windows_filetime)).encode('ASCII')] return []
[docs]def unmapWindowsFiletime(old, encoding=()): # type: (List[bytes]) -> str if old and old[0]: password_time = int(old[0].decode(*encoding)) if password_time == 0: return u'%d' % (password_time,) d = long(116444736000000000) # difference between 1601 and 1970 unixtime = (password_time - d) // 10000000 try: return time.strftime('%Y%m%d%H%M%SZ', time.gmtime(unixtime)) except ValueError: # already unixtime, happens in environments with Samba3 ud.debug(ud.ADMIN, ud.INFO, 'Value of sambaBadPasswordTime is not set to a Windows Filetime (100 nanoseconds since January 1, 1601.)\nInstead its set to %s' % (password_time,)) return time.strftime('%Y%m%d%H%M%SZ', time.gmtime(password_time)) return u''
[docs]def datetime_from_local_datetimetimezone_tuple(local_datetimetimezone_tuple): # type: (List[str]) -> datetime.datetime d, t, tz = local_datetimetimezone_tuple # dttz_str = module.property_descriptions[key].syntax.tostring(local_datetimetimezone_tuple) naive_dt = datetime.strptime("%s %s" % (d, t), "%Y-%m-%d %H:%M") return pytz.timezone(tz).localize(naive_dt)
[docs]def mapDateTimeTimezoneTupleToUTCDateTimeString(local_datetimetimezone_tuple, encoding=()): # type: (List[str]) -> List[bytes] if local_datetimetimezone_tuple and all(local_datetimetimezone_tuple): dt = datetime_from_local_datetimetimezone_tuple(local_datetimetimezone_tuple) return [dt.astimezone(pytz.utc).strftime("%Y%m%d%H%M%SZ").encode(*encoding)] return []
[docs]def unmapUTCDateTimeToLocaltime(attribute_value, encoding=()): # type: (List[bytes]) -> List[str] if attribute_value and attribute_value[0]: generalizedtime = attribute_value[0].decode(*encoding) try: utc_datetime = datetime.strptime(generalizedtime, "%Y%m%d%H%M%SZ") except ValueError: ud.debug(ud.ADMIN, ud.ERROR, 'Value of krb5ValidStart is not in generalizedTime format: %s' % (generalizedtime,)) raise local_datetimetimezone_tuple = datetime.strftime(utc_datetime, "%Y-%m-%d %H:%M UTC").split() return local_datetimetimezone_tuple return []
mapping = univention.admin.mapping.mapping() mapping.register('username', 'uid', None, univention.admin.mapping.ListToString) mapping.register('uidNumber', 'uidNumber', None, univention.admin.mapping.ListToString) mapping.register('gidNumber', 'gidNumber', None, univention.admin.mapping.ListToString) mapping.register('title', 'title', None, univention.admin.mapping.ListToString) mapping.register('initials', 'initials', None, univention.admin.mapping.ListToString) mapping.register('description', 'description', None, univention.admin.mapping.ListToString) mapping.register('organisation', 'o', None, univention.admin.mapping.ListToString) mapping.register('mailPrimaryAddress', 'mailPrimaryAddress', None, univention.admin.mapping.ListToLowerString, encoding='ASCII') mapping.register('mailAlternativeAddress', 'mailAlternativeAddress', encoding='ASCII') mapping.register('mailHomeServer', 'univentionMailHomeServer', None, univention.admin.mapping.ListToString) mapping.register('mailForwardAddress', 'mailForwardAddress') if configRegistry.is_true('directory/manager/user/activate_ldap_attribute_mailForwardCopyToSelf', False): mapping.register('mailForwardCopyToSelf', 'mailForwardCopyToSelf', None, univention.admin.mapping.ListToString) mapping.register('preferredLanguage', 'preferredLanguage', None, univention.admin.mapping.ListToString) mapping.register('street', 'street', None, univention.admin.mapping.ListToString) mapping.register('e-mail', 'mail', encoding='ASCII') mapping.register('postcode', 'postalCode', None, univention.admin.mapping.ListToString) mapping.register('postOfficeBox', 'postOfficeBox') mapping.register('city', 'l', None, univention.admin.mapping.ListToString) mapping.register('country', 'st', None, univention.admin.mapping.ListToString) mapping.register('phone', 'telephoneNumber') mapping.register('roomNumber', 'roomNumber') mapping.register('employeeNumber', 'employeeNumber', None, univention.admin.mapping.ListToString) mapping.register('employeeType', 'employeeType', None, univention.admin.mapping.ListToString) mapping.register('secretary', 'secretary') mapping.register('departmentNumber', 'departmentNumber') mapping.register('mobileTelephoneNumber', 'mobile') mapping.register('pagerTelephoneNumber', 'pager') mapping.register('homeTelephoneNumber', 'homePhone') mapping.register('homePostalAddress', 'homePostalAddress', mapHomePostalAddress, unmapHomePostalAddress) mapping.register('physicalDeliveryOfficeName', 'physicalDeliveryOfficeName', None, univention.admin.mapping.ListToString) mapping.register('preferredDeliveryMethod', 'preferredDeliveryMethod', None, univention.admin.mapping.ListToString) mapping.register('unixhome', 'homeDirectory', None, univention.admin.mapping.ListToString) mapping.register('shell', 'loginShell', None, univention.admin.mapping.ListToString, encoding='ASCII') mapping.register('sambahome', 'sambaHomePath', None, univention.admin.mapping.ListToString) mapping.register('sambaUserWorkstations', 'sambaUserWorkstations', sambaWorkstationsMap, sambaWorkstationsUnmap) mapping.register('sambaLogonHours', 'sambaLogonHours', logonHoursMap, logonHoursUnmap, encoding='ASCII') mapping.register('sambaPrivileges', 'univentionSambaPrivilegeList', encoding='ASCII') mapping.register('scriptpath', 'sambaLogonScript', None, univention.admin.mapping.ListToString) mapping.register('profilepath', 'sambaProfilePath', None, univention.admin.mapping.ListToString) mapping.register('homedrive', 'sambaHomeDrive', None, univention.admin.mapping.ListToString, encoding='ASCII') mapping.register('gecos', 'gecos', None, univention.admin.mapping.ListToString, encoding='ASCII') mapping.register('displayName', 'displayName', None, univention.admin.mapping.ListToString) mapping.register('birthday', 'univentionBirthday', None, univention.admin.mapping.ListToString) mapping.register('lastname', 'sn', None, univention.admin.mapping.ListToString) mapping.register('firstname', 'givenName', None, univention.admin.mapping.ListToString) mapping.register('userCertificate', 'userCertificate;binary', univention.admin.mapping.mapBase64, univention.admin.mapping.unmapBase64) mapping.register('jpegPhoto', 'jpegPhoto', univention.admin.mapping.mapBase64, univention.admin.mapping.unmapBase64) mapping.register('umcProperty', 'univentionUMCProperty', mapKeyAndValue, unmapKeyAndValue) mapping.register('lockedTime', 'sambaBadPasswordTime', mapWindowsFiletime, unmapWindowsFiletime) mapping.register('accountActivationDate', 'krb5ValidStart', mapDateTimeTimezoneTupleToUTCDateTimeString, unmapUTCDateTimeToLocaltime, encoding='ASCII') mapping.registerUnmapping('sambaRID', unmapSambaRid) mapping.registerUnmapping('passwordexpiry', unmapPasswordExpiry) mapping.registerUnmapping('userexpiry', unmapUserExpiry) mapping.registerUnmapping('disabled', unmapDisabled) mapping.registerUnmapping('locked', unmapLocked) mapping.register('password', 'userPassword', univention.admin.mapping.dontMap(), univention.admin.mapping.ListToString)
[docs]class object(univention.admin.handlers.simpleLdap): module = module use_performant_ldap_search_filter = True def __init__(self, co, lo, position, dn=u'', superordinate=None, attributes=None): self.groupsLoaded = True self.password_length = 8 univention.admin.handlers.simpleLdap.__init__(self, co, lo, position, dn, superordinate, attributes=attributes) def _simulate_legacy_options(self): '''simulate old options behavior to provide backward compatibility for udm extensions''' options = dict( posix=b'posixAccount', samba=b'sambaSamAccount', kerberos=b'krb5Principal', mail=b'univentionMail', person=b'person', ) for opt, oc in options.items(): # existing object if self.oldattr: if oc in self.oldattr.get('objectClass', []): self.options.append(opt) # new object else: self.options.append(opt)
[docs] def open(self, loadGroups=True): univention.admin.handlers.simpleLdap.open(self) if self.exists(): self._unmap_mail_forward() self._unmap_pwd_change_next_login() self._unmap_automount_information() self._unmapUnlockTime() self.reload_certificate() self._load_groups(loadGroups) self.save() if not self.exists(): # TODO: move this block into _ldap_pre_create! self._set_default_group()
def _load_groups(self, loadGroups): if self.exists(): if loadGroups: # this is optional because it can take much time on larger installations, default is true self['groups'] = [x.decode('UTF-8') if six.PY2 else x for x in self.lo.searchDn(filter=filter_format(u'(&(cn=*)(|(objectClass=univentionGroup)(objectClass=sambaGroupMapping))(uniqueMember=%s))', [self.dn]))] else: ud.debug(ud.ADMIN, ud.INFO, 'user: open with loadGroups=false for user %s' % self['username']) self.groupsLoaded = loadGroups primaryGroupNumber = self.oldattr.get('gidNumber', [b''])[0].decode('ASCII') if primaryGroupNumber: primaryGroupResult = self.lo.searchDn(filter=filter_format(u'(&(cn=*)(|(objectClass=posixGroup)(objectClass=sambaGroupMapping))(gidNumber=%s))', [primaryGroupNumber])) if primaryGroupResult: self['primaryGroup'] = primaryGroupResult[0] else: try: primaryGroup = self.lo.search(filter='(objectClass=univentionDefault)', base='cn=univention,' + self.position.getDomain(), attr=['univentionDefaultGroup']) try: primaryGroup = primaryGroup[0][1]["univentionDefaultGroup"][0].decode('UTF-8') except Exception: primaryGroup = None except Exception: primaryGroup = None ud.debug(ud.ADMIN, ud.INFO, 'user: could not find primaryGroup, setting primaryGroup to %s' % primaryGroup) if not primaryGroup: raise univention.admin.uexceptions.primaryGroup(self.dn) self.info['primaryGroup'] = primaryGroup self.__primary_group() else: self.info['primaryGroup'] = None self.save() raise univention.admin.uexceptions.primaryGroup(self.dn) def _set_default_group(self): primary_group_from_template = self['primaryGroup'] if not primary_group_from_template: searchResult = self.lo.search(filter=u'(objectClass=univentionDefault)', base=u'cn=univention,' + self.position.getDomain(), attr=['univentionDefaultGroup']) if not searchResult or not searchResult[0][1]: self.info['primaryGroup'] = None self.save() raise univention.admin.uexceptions.primaryGroup(self.dn) for tmp, number in searchResult: primaryGroupResult = self.lo.searchDn(filter=filter_format(u'(&(objectClass=posixGroup)(cn=%s))', (univention.admin.uldap.explodeDn(number['univentionDefaultGroup'][0].decode('UTF-8'), 1)[0],)), base=self.position.getDomain(), scope='domain') if primaryGroupResult: self['primaryGroup'] = primaryGroupResult[0] # self.save() must not be called after this point in self.open() # otherwise self.__primary_group doesn't add a new user to the # univentionDefaultGroup because "not self.hasChanged('primaryGroup')" def _unmap_pwd_change_next_login(self): if self.oldattr.get('shadowLastChange', [b''])[0] == b'0': self['pwdChangeNextLogin'] = '1' elif self['passwordexpiry']: today = time.strftime('%Y-%m-%d').split('-') expiry = self['passwordexpiry'].split('-') # expiry.reverse() # today.reverse() if int(''.join(today)) >= int(''.join(expiry)): self['pwdChangeNextLogin'] = '1' def _unmap_mail_forward(self): if configRegistry.is_true('directory/manager/user/activate_ldap_attribute_mailForwardCopyToSelf', False): return # mailForwardCopyToSelf is a "virtual" property. The boolean value is set to True, if # the LDAP attribute mailForwardAddress contains the mailPrimaryAddress. The mailPrimaryAddress # is removed from oldattr for correct display in CLI/UMC and for proper detection of changes. # Remark: By setting the ucr-v the attribute is saved directly to LDAP. if self.get('mailPrimaryAddress') in self.get('mailForwardAddress', []): self.oldattr['mailForwardAddress'] = self.oldattr.get('mailForwardAddress', [])[:] self['mailForwardAddress'].remove(self['mailPrimaryAddress']) self['mailForwardCopyToSelf'] = '1' else: self['mailForwardCopyToSelf'] = '0' def _unmap_automount_information(self): if 'automountInformation' not in self.oldattr: return try: flags, unc = re.split(b' +', self.oldattr['automountInformation'][0], 1) host, path = unc.split(b':', 1) except ValueError: return host, path = host.decode('ASCII'), path.decode('ASCII') sharepath = path while len(sharepath) > 1: filter_ = univention.admin.filter.conjunction('&', [ univention.admin.filter.expression('univentionShareHost', host, escape=True), univention.admin.filter.conjunction('|', [ univention.admin.filter.expression('univentionSharePath', sharepath.rstrip(u'/'), escape=True), univention.admin.filter.expression('univentionSharePath', u'%s/' % (sharepath.rstrip(u'/')), escape=True), ]) ]) res = univention.admin.modules.lookup(univention.admin.modules.get('shares/share'), None, self.lo, filter=filter_, scope='domain') if len(res) == 1: self['homeShare'] = res[0].dn relpath = path.replace(sharepath, u'') if len(relpath) > 0 and relpath[0] == u'/': relpath = relpath[1:] self['homeSharePath'] = relpath break elif len(res) > 1: break elif len(res) < 1: sharepath = os.path.split(sharepath)[0] def _unmapUnlockTime(self): self.info['unlockTime'] = '' locked_timestamp = self['lockedTime'] if locked_timestamp and locked_timestamp != "0": try: locked_unixtime = long(calendar.timegm(time.strptime(locked_timestamp, '%Y%m%d%H%M%SZ'))) lockout_duration = int(self.lo.search(filter='objectClass=sambaDomain', attr=['sambaLockoutDuration'])[0][1].get('sambaLockoutDuration', [0])[0]) except (ValueError, KeyError, IndexError, AttributeError): return if lockout_duration == 0: self.info['unlockTime'] = _("unlimited") else: self.info['unlockTime'] = posixSecondsToLocaltimeDate(lockout_duration + locked_unixtime)
[docs] def modify(self, *args, **kwargs): try: return super(object, self).modify(*args, **kwargs) except univention.admin.uexceptions.licenseDisableModify: # it has to be possible to deactivate an user account when the license is exceeded if '1' != self['disabled'] or not self.hasChanged('disabled'): raise kwargs['ignore_license'] = True return super(object, self).modify(*args, **kwargs)
[docs] def reload_certificate(self): """Reload user certificate.""" if 'pki' not in self.options: return self.info['certificateSubjectCountry'] = '' self.info['certificateSubjectState'] = '' self.info['certificateSubjectLocation'] = '' self.info['certificateSubjectOrganisation'] = '' self.info['certificateSubjectOrganisationalUnit'] = '' self.info['certificateSubjectCommonName'] = '' self.info['certificateSubjectMail'] = '' self.info['certificateIssuerCountry'] = '' self.info['certificateIssuerState'] = '' self.info['certificateIssuerLocation'] = '' self.info['certificateIssuerOrganisation'] = '' self.info['certificateIssuerOrganisationalUnit'] = '' self.info['certificateIssuerCommonName'] = '' self.info['certificateIssuerMail'] = '' self.info['certificateDateNotBefore'] = '' self.info['certificateDateNotAfter'] = '' self.info['certificateVersion'] = '' self.info['certificateSerial'] = '' _certificate = self.info.get('userCertificate') certificate = _certificate[0] if isinstance(_certificate, list) else _certificate values = load_certificate(certificate) if values: self.info.update(values) else: self.info['userCertificate'] = ''
[docs] def hasChanged(self, key): if key == 'disabled' and inconsistentDisabledState(self.oldattr): return True if key == 'locked' and inconsistentLockedState(self.oldattr): return True return super(object, self).hasChanged(key)
# if key == 'disabled': # acctFlags = univention.admin.samba.acctFlags(self.oldattr.get("sambaAcctFlags", [b''])[0].decode('ASCII')).decode() # krb5Flags = self.oldattr.get('krb5KDCFlags', []) # shadowExpire = self.oldattr.get('shadowExpire', []) # # if not acctFlags and not krb5Flags and not shadowExpire: # return False # if self['disabled'] == 'all': # return 'D' not in acctFlags or b'126' in krb5Flags or b'1' not in shadowExpire # elif self['disabled'] == 'windows': # return 'D' not in acctFlags or b'254' in krb5Flags or b'1' in shadowExpire # elif self['disabled'] == 'kerberos': # return 'D' in acctFlags or b'126' in krb5Flags or b'1' in shadowExpire # elif self['disabled'] == 'posix': # return 'D' in acctFlags or b'254' in krb5Flags or b'1' not in shadowExpire # elif self['disabled'] == 'windows_kerberos': # return 'D' not in acctFlags or b'126' in krb5Flags or b'1' in shadowExpire # elif self['disabled'] == 'windows_posix': # return 'D' not in acctFlags or b'254' in krb5Flags or b'1' not in shadowExpire # elif self['disabled'] == 'posix_kerberos': # return 'D' in acctFlags or b'126' in krb5Flags or b'1' not in shadowExpire # else: # enabled # return 'D' in acctFlags or b'254' in krb5Flags or b'1' in shadowExpire # elif key == 'locked': # password = self['password'] # acctFlags = univention.admin.samba.acctFlags(self.oldattr.get("sambaAcctFlags", [b''])[0].decode('ASCII')).decode() # if not password and not acctFlags: # return False # if self['locked'] == 'all': # return not univention.admin.password.is_locked(password) or 'L' not in acctFlags # elif self['locked'] == 'windows': # return univention.admin.password.is_locked(password) or 'L' not in acctFlags # elif self['locked'] == 'posix': # return not univention.admin.password.is_locked(password) or 'L' in acctFlags # else: # return univention.admin.password.is_locked(password) or 'L' in acctFlags # # return super(object, self).hasChanged(key) def __update_groups(self): if not self.groupsLoaded: return if self.exists(): old_groups = self.oldinfo.get('groups', []) old_uid = self.oldinfo.get('username', '') else: old_groups = [] old_uid = "" new_uid = self.info.get('username', '') new_groups = self.info.get('groups', []) # change memberUid if we have a new username if old_uid and old_uid != new_uid and self.exists(): ud.debug(ud.ADMIN, ud.INFO, 'users/user: rewrite memberuid after rename') for group in new_groups: self.__rewrite_member_uid(group) group_mod = univention.admin.modules.get('groups/group') ud.debug(ud.ADMIN, ud.INFO, 'users/user: check groups in old_groups') for group in old_groups: if group and not case_insensitive_in_list(group, self.info.get('groups', [])) and group.lower() != self['primaryGroup'].lower(): grpobj = group_mod.object(None, self.lo, self.position, group) grpobj.fast_member_remove([self.old_dn], [old_uid]) if self.dn != self.old_dn: # we change our DN _before_ removing it from the group # so if we changed it and if we use refint overlay, it already updated the uniqueMember of the group and we will not catch it with old_dn grpobj.fast_member_remove([self.dn], [old_uid]) ud.debug(ud.ADMIN, ud.INFO, 'users/user: check groups in info[groups]') for group in self.info.get('groups', []): if group and not case_insensitive_in_list(group, old_groups): grpobj = group_mod.object(None, self.lo, self.position, group) grpobj.fast_member_add([self.dn], [new_uid]) if configRegistry.is_true("directory/manager/user/primarygroup/update", True): ud.debug(ud.ADMIN, ud.INFO, 'users/user: check primaryGroup') if not self.exists() and self.info.get('primaryGroup'): grpobj = group_mod.object(None, self.lo, self.position, self.info.get('primaryGroup')) grpobj.fast_member_add([self.dn], [new_uid]) def __rewrite_member_uid(self, group, members=[]): uids = self.lo.getAttr(group, 'memberUid') if not members: members = self.lo.getAttr(group, 'uniqueMember') new_uids = [] for memberDNstr in members: memberDN = ldap.dn.str2dn(memberDNstr) if memberDN[0][0][0] == 'uid': # UID is stored in DN --> use UID directly new_uids.append(memberDN[0][0][1].encode('UTF-8')) else: UIDs = self.lo.getAttr(memberDNstr.decode('UTF-8'), 'uid') if UIDs: new_uids.append(UIDs[0]) if len(UIDs) > 1: ud.debug(ud.ADMIN, ud.WARN, 'users/user: A groupmember has multiple UIDs (%s %r)' % (memberDNstr, UIDs)) self.lo.modify(group, [('memberUid', uids, new_uids)]) # TODO: check if encoding is correct def __primary_group(self): if not self.hasChanged('primaryGroup'): return if configRegistry.is_true("directory/manager/user/primarygroup/update", True): new_uid = self.info.get('username') group_mod = univention.admin.modules.get('groups/group') grpobj = group_mod.object(None, self.lo, self.position, self['primaryGroup']) grpobj.fast_member_add([self.dn], [new_uid]) ud.debug(ud.ADMIN, ud.INFO, 'users/user: adding to new primaryGroup %s (uid=%s)' % (self['primaryGroup'], new_uid))
[docs] def krb5_principal(self): domain = univention.admin.uldap.domain(self.lo, self.position) realm = domain.getKerberosRealm() if not realm: raise univention.admin.uexceptions.noKerberosRealm() return self['username'] + '@' + realm
def _check_uid_gid_uniqueness(self): if not configRegistry.is_true("directory/manager/uid_gid/uniqueness", True): return # POSIX, Samba fg = univention.admin.filter.expression('gidNumber', self['uidNumber']) group_objects = univention.admin.handlers.groups.group.lookup(self.co, self.lo, filter_s=fg) if group_objects: raise univention.admin.uexceptions.uidNumberAlreadyUsedAsGidNumber(repr(self["uidNumber"])) def _ldap_pre_create(self): super(object, self)._ldap_pre_create() ud.debug(ud.ADMIN, ud.INFO, 'users/user: dn was set to %s' % (self.dn,)) # request a new uidNumber or get lock for manually set uidNumber if self['uidNumber']: univention.admin.allocators.acquireUnique(self.lo, self.position, 'uidNumber', self['uidNumber'], 'uidNumber', scope='base') # "False" ==> do not update univentionLastUsedValue in LDAP if a specific value has been specified self.alloc.append(('uidNumber', self['uidNumber'], False)) else: self['uidNumber'] = self.request_lock('uidNumber') self._check_uid_gid_uniqueness() def _ldap_pre_ready(self): super(object, self)._ldap_pre_ready() if self.exists() and not self.oldinfo.get('password') and not self['password']: # password property is required but LDAP ACL's disallow reading them self.info['password'] = self.oldinfo['password'] = u'*' self.info['disabled'] = self.oldinfo['disabled'] if not self.exists() or self.hasChanged('primaryGroup'): # Ensure the primary Group has the samba option enabled if self['primaryGroup'] and not self.lo.getAttr(self['primaryGroup'], 'sambaSID'): raise univention.admin.uexceptions.primaryGroupWithoutSamba(self['primaryGroup']) if not self.exists() or self.hasChanged('username') and self['username'].lower() != self.oldinfo['username'].lower(): check_prohibited_username(self.lo, self['username']) # get lock for username try: if self['username']: # might not be set when using CLI without --set username= self.request_lock('uid', self['username']) except univention.admin.uexceptions.noLock: raise univention.admin.uexceptions.uidAlreadyUsed(self['username']) # get lock for mailPrimaryAddress if not self.exists() or self.hasChanged('mailPrimaryAddress'): if self['mailPrimaryAddress']: self['mailPrimaryAddress'] = self['mailPrimaryAddress'].lower() # ignore case in change of mailPrimaryAddress, we only store the lowercase address anyway if self['mailPrimaryAddress'] and self['mailPrimaryAddress'].lower() != (self.oldinfo.get('mailPrimaryAddress', None) or '').lower(): try: self.request_lock('mailPrimaryAddress', self['mailPrimaryAddress']) except univention.admin.uexceptions.noLock: raise univention.admin.uexceptions.mailAddressUsed(self['mailPrimaryAddress']) if self['unlock'] == '1': self['locked'] = u'0' if self.hasChanged('disabled') and self['disabled'] == '0' and not self.hasChanged('accountActivationDate'): self['accountActivationDate'] = self.descriptions['accountActivationDate'].default(self) if self['accountActivationDate'] and all(self['accountActivationDate']) and datetime.now(tz=pytz.utc) < datetime_from_local_datetimetimezone_tuple(self['accountActivationDate']): self['disabled'] = '1' if self['disabled'] == '1': self['locked'] = u'0' # Samba/AD behavior # legacy options to make old hooks happy (46539) self._simulate_legacy_options() def _ldap_addlist(self): al = super(object, self)._ldap_addlist() # Kerberos al.append((u'krb5MaxLife', b'86400')) al.append((u'krb5MaxRenew', b'604800')) return al def _ldap_post_create(self): super(object, self)._ldap_post_create() self.__update_groups() self.__primary_group() def _ldap_post_modify(self): super(object, self)._ldap_post_modify() # POSIX self.__update_groups() self.__primary_group() def _ldap_pre_rename(self, newdn): super(object, self)._ldap_pre_rename(newdn) try: self.move(newdn) finally: univention.admin.allocators.release(self.lo, self.position, 'uid', self['username']) def _ldap_pre_modify(self): super(object, self)._ldap_pre_modify() if not self.oldattr.get('mailForwardCopyToSelf') and self['mailForwardCopyToSelf'] == '0' and not self['mailForwardAddress']: self['mailForwardCopyToSelf'] = None if self.hasChanged("uidNumber"): # this should never happen, as uidNumber is marked as unchangeable self._check_uid_gid_uniqueness() def _ldap_modlist(self): ml = univention.admin.handlers.simpleLdap._ldap_modlist(self) ml = self._modlist_pwd_account_locked_time(ml) ml = self._modlist_samba_privileges(ml) ml = self._modlist_cn(ml) ml = self._modlist_gecos(ml) ml = self._modlist_display_name(ml) ml = self._modlist_krb_principal(ml) ml = self._modlist_krb5kdc_flags(ml) ml = self._modlist_posix_password(ml) ml = self._modlist_kerberos_password(ml) if not self.exists() or self.hasChanged(['password', 'pwdChangeNextLogin']): pwhistoryPolicy = univention.admin.password.PasswortHistoryPolicy(self.loadPolicyObject('policies/pwhistory')) ml = self._check_password_history(ml, pwhistoryPolicy) self._check_password_complexity(pwhistoryPolicy) ml = self._modlist_samba_password(ml, pwhistoryPolicy) ml = self._modlist_password_expiry(ml, pwhistoryPolicy) ml = self._modlist_samba_bad_pw_count(ml) ml = self._modlist_sambaAcctFlags(ml) ml = self._modlist_samba_kickoff_time(ml) ml = self._modlist_krb5_valid_end(ml) ml = self._modlist_shadow_expire(ml) ml = self._modlist_mail_forward(ml) ml = self._modlist_home_share(ml) ml = self._modlist_samba_sid(ml) ml = self._modlist_primary_group(ml) ml = self._modlist_service_specific_password(ml) ml = self._modlist_univention_person(ml) return ml def _modlist_samba_privileges(self, ml): if self.hasChanged('sambaPrivileges'): # add univentionSambaPrivileges objectclass if self['sambaPrivileges'] and b'univentionSambaPrivileges' not in self.oldattr.get('objectClass', []): ml.append(('objectClass', b'', b'univentionSambaPrivileges')) return ml def _modlist_cn(self, ml): cnAtts = configRegistry.get('directory/manager/usercn/attributes', "<firstname> <lastname>") prop = univention.admin.property() old_cn = self.oldattr.get('cn', [b''])[0] cn = prop._replace(cnAtts, self) # TODO: prop._replace() must return unicode cn = cn.strip() or cn cn = cn.encode('UTF-8') if cn != old_cn: ml.append(('cn', old_cn, cn)) return ml def _modlist_gecos(self, ml): if self.hasChanged(['firstname', 'lastname']): prop = self.descriptions['gecos'] old_gecos = self.oldattr.get('gecos', [b''])[0] gecos = prop._replace(prop.base_default, self) if old_gecos: current_gecos = prop._replace(prop.base_default, self.oldinfo) current_gecos = current_gecos.encode('utf-8') if current_gecos == old_gecos: ml.append(('gecos', old_gecos, [gecos.encode('utf-8')])) return ml def _modlist_display_name(self, ml): # update displayName automatically if no custom value has been entered by the user and the name changed if self.info.get('displayName') == self.oldinfo.get('displayName') and (self.info.get('firstname') != self.oldinfo.get('firstname') or self.info.get('lastname') != self.oldinfo.get('lastname')): prop_displayName = self.descriptions['displayName'] old_default_displayName = prop_displayName._replace(prop_displayName.base_default, self.oldinfo) # does old displayName match with old default displayName? if self.oldinfo.get('displayName', '') == old_default_displayName: # yes ==> update displayName automatically new_displayName = prop_displayName._replace(prop_displayName.base_default, self) ml.append(('displayName', self.oldattr.get('displayName', [b''])[0], new_displayName.encode('utf-8'))) return ml def _modlist_krb_principal(self, ml): if not self.exists() or self.hasChanged('username'): ml.append(('krb5PrincipalName', self.oldattr.get('krb5PrincipalName', []), [self.krb5_principal().encode('utf-8')])) # TODO: decide to let krb5_principal return bytestring?! return ml # If you change anything here, please also check users/ldap.py def _check_password_history(self, ml, pwhistoryPolicy): if self.exists() and not self.hasChanged('password'): return ml if self['overridePWHistory'] == '1': return ml pwhistory = self.oldattr.get('pwhistory', [b''])[0].decode('ASCII') if univention.admin.password.password_already_used(self['password'], pwhistory): raise univention.admin.uexceptions.pwalreadyused() if pwhistoryPolicy.pwhistoryLength is not None: newPWHistory = univention.admin.password.get_password_history(self['password'], pwhistory, pwhistoryPolicy.pwhistoryLength) ml.append(('pwhistory', self.oldattr.get('pwhistory', [b''])[0], newPWHistory.encode('ASCII'))) return ml # If you change anything here, please also check users/ldap.py def _check_password_complexity(self, pwhistoryPolicy): if self.exists() and not self.hasChanged('password'): return if self['overridePWLength'] == '1': return password_minlength = max(0, pwhistoryPolicy.pwhistoryPasswordLength) or self.password_length if len(self['password']) < password_minlength: raise univention.admin.uexceptions.pwToShort(_('The password is too short, at least %d characters needed!') % (password_minlength,)) if pwhistoryPolicy.pwhistoryPasswordCheck: pwdCheck = univention.password.Check(self.lo) pwdCheck.enableQualityCheck = True try: pwdCheck.check(self['password'], username=self['username'], displayname=self['displayName']) except univention.password.CheckFailed as exc: raise univention.admin.uexceptions.pwQuality(str(exc)) def _modlist_samba_password(self, ml, pwhistoryPolicy): if self.exists() and not self.hasChanged('password'): return ml password_nt, password_lm = univention.admin.password.ntlm(self['password']) # TODO: decide to let ntlm() return bytestring?! password_nt, password_lm = password_nt.encode('ASCII'), password_lm.encode('ASCII') ml.append(('sambaNTPassword', self.oldattr.get('sambaNTPassword', [b''])[0], password_nt)) ml.append(('sambaLMPassword', self.oldattr.get('sambaLMPassword', [b''])[0], password_lm)) if pwhistoryPolicy.pwhistoryLength is not None: smbpwhistory = self.oldattr.get('sambaPasswordHistory', [b''])[0].decode('ASCII') newsmbPWHistory = self._get_samba_password_history(password_nt, smbpwhistory, pwhistoryPolicy.pwhistoryLength) ml.append(('sambaPasswordHistory', self.oldattr.get('sambaPasswordHistory', [b''])[0], newsmbPWHistory.encode('ASCII'))) return ml def _modlist_kerberos_password(self, ml): if self.exists() and not self.hasChanged('password'): return ml krb_keys = univention.admin.password.krb5_asn1(self.krb5_principal(), self['password']) krb_key_version = str(int(self.oldattr.get('krb5KeyVersionNumber', ['0'])[0]) + 1).encode('ASCII') ml.append(('krb5Key', self.oldattr.get('krb5Key', []), krb_keys)) ml.append(('krb5KeyVersionNumber', self.oldattr.get('krb5KeyVersionNumber', []), krb_key_version)) return ml def _modlist_password_expiry(self, ml, pwhistoryPolicy): pwd_change_next_login = self.hasChanged('pwdChangeNextLogin') and self['pwdChangeNextLogin'] == '1' unset_pwd_change_next_login = self.hasChanged('pwdChangeNextLogin') and self['pwdChangeNextLogin'] == '0' now = (long(time.time()) / 3600 / 24) shadowLastChange = str(int(now)) shadowMax = str(pwhistoryPolicy.expiryInterval or u'') # FIXME: is pwhistoryPolicy.expiryInterval a unicode or bytestring? if pwd_change_next_login: # force user to change password on next login shadowMax = shadowMax or '1' shadowLastChange = str(int(now) - int(shadowMax) - 1) elif unset_pwd_change_next_login: shadowMax = u'' if not pwhistoryPolicy.expiryInterval and not self.hasChanged('pwdChangeNextLogin'): # An empty field means that password aging features are disabled. shadowLastChange = u'' shadowMax = shadowMax.encode('ASCII') old_shadowMax = self.oldattr.get('shadowMax', [b''])[0] if old_shadowMax != shadowMax: ml.append(('shadowMax', old_shadowMax, shadowMax)) shadowLastChange = shadowLastChange.encode('ASCII') if shadowLastChange: # FIXME: this check causes, that the value is not unset. Is this correct? ml.append(('shadowLastChange', self.oldattr.get('shadowLastChange', [b''])[0], shadowLastChange)) # if pwdChangeNextLogin has been set, set sambaPwdLastSet to 0 (see UCS Bug #17890) # OLD behavior was: set sambaPwdLastSet to 1 (see UCS Bug #8292 and Samba Bug #4313) sambaPwdLastSetValue = u'0' if pwd_change_next_login else str(long(time.time())) ud.debug(ud.ADMIN, ud.INFO, 'sambaPwdLastSetValue: %s' % sambaPwdLastSetValue) sambaPwdLastSetValue = sambaPwdLastSetValue.encode('UTF-8') ml.append(('sambaPwdLastSet', self.oldattr.get('sambaPwdLastSet', [b''])[0], sambaPwdLastSetValue)) krb5PasswordEnd = u'' if pwhistoryPolicy.expiryInterval or pwd_change_next_login: expiry = long(time.time()) if not pwd_change_next_login: expiry = expiry + (pwhistoryPolicy.expiryInterval * 3600 * 24) krb5PasswordEnd = time.strftime("%Y%m%d000000Z", time.gmtime(expiry)) ud.debug(ud.ADMIN, ud.INFO, 'krb5PasswordEnd: %s' % krb5PasswordEnd) old_krb5PasswordEnd = self.oldattr.get('krb5PasswordEnd', [b''])[0] krb5PasswordEnd = krb5PasswordEnd.encode('ASCII') if old_krb5PasswordEnd != krb5PasswordEnd: ml.append(('krb5PasswordEnd', old_krb5PasswordEnd, krb5PasswordEnd)) return ml def _modlist_krb5kdc_flags(self, ml): """Set the krb5KDCFlags. default = 1 << 6 | 1 << 5 | 1 << 4 | 1 << 3 | 1 << 2 | 1 << 1 = 126 initial(0), -- require as-req forwardable(1), -- may issue forwardable proxiable(2), -- may issue proxiable renewable(3), -- may issue renewable postdate(4),-- may issue postdatable server(5),-- may be server client(6),-- may be client invalid(7), -- entry is invalid require-preauth(8), -- must use preauth change-pw(9), -- change password service require-hwauth(10), -- must use hwauth ok-as-delegate(11), -- as in TicketFlags user-to-user(12), -- may use user-to-user auth immutable(13),-- may not be deleted trusted-for-delegation(14), -- Trusted to print forwardabled tickets allow-kerberos4(15),-- Allow Kerberos 4 requests allow-digest(16), -- Allow digest requests locked-out(17), -- Account is locked out, authentication will be denied require-pwchange(18), -- require a passwd change do-not-store(31)-- Not to be modified and stored in HDB """ if not self.exists() or self.hasChanged(['disabled', 'locked']): try: old_kdcflags = int(self.oldattr.get('krb5KDCFlags', [b'0'])[0]) except ValueError: old_kdcflags = 0 krb_kdcflags = old_kdcflags if not self.exists(): krb_kdcflags |= 126 if self['disabled'] == '1': krb_kdcflags |= (1 << 7) else: # enable kerberos account krb_kdcflags &= ~(1 << 7) if self['locked'] == '0': # unlock kerberos password krb_kdcflags &= ~(1 << 17) # elif self['locked'] == '1': # lock kerberos password # krb_kdcflags |= (1 << 17) ml.append(('krb5KDCFlags', self.oldattr.get('krb5KDCFlags', []), str(krb_kdcflags).encode('ASCII'))) return ml # If you change anything here, please also check users/ldap.py def _modlist_posix_password(self, ml): if not self.exists() or self.hasChanged(['disabled', 'password']): old_password = self.oldattr.get('userPassword', [b''])[0].decode('ASCII') password = self['password'] if self.hasChanged('password') and univention.admin.password.RE_PASSWORD_SCHEME.match(password): # hacking attempt. user tries to change the password to e.g. {KINIT} or {crypt}$6$... raise univention.admin.uexceptions.valueError(_('Invalid password.'), property='password') if univention.admin.password.password_is_auth_saslpassthrough(old_password): # do not change {SASL} password, but lock it if necessary password = old_password password_hash = univention.admin.password.lock_password(password) # TODO: decode to let lock_password() and unlock_passowrd() return bytestring?! if self['disabled'] != '1': password_hash = univention.admin.password.unlock_password(password_hash) ml.append(('userPassword', old_password.encode('ASCII'), password_hash.encode('ASCII'))) return ml def _modlist_pwd_account_locked_time(self, ml): # remove pwdAccountLockedTime during unlocking if self.hasChanged('locked') and self['locked'] == '0': pwdAccountLockedTime = self.oldattr.get('pwdAccountLockedTime', [b''])[0] if pwdAccountLockedTime: ml.append(('pwdAccountLockedTime', pwdAccountLockedTime, b'')) return ml def _modlist_samba_bad_pw_count(self, ml): if self.hasChanged('locked') and self['locked'] == '0': # reset bad pw count ml.append(('sambaBadPasswordCount', self.oldattr.get('sambaBadPasswordCount', [b''])[0], b"0")) ml.append(('sambaBadPasswordTime', self.oldattr.get('sambaBadPasswordTime', [b''])[0], b'0')) return ml def _modlist_samba_kickoff_time(self, ml): if self.hasChanged('userexpiry'): sambaKickoffTime = b'' if self['userexpiry']: sambaKickoffTime = _mapUserExpiryToSambaKickoffTime(self['userexpiry']).encode("ASCII") ud.debug(ud.ADMIN, ud.INFO, 'sambaKickoffTime: %s' % sambaKickoffTime) old_sambaKickoffTime = self.oldattr.get('sambaKickoffTime', [b''])[0] if old_sambaKickoffTime != sambaKickoffTime: ml.append(('sambaKickoffTime', self.oldattr.get('sambaKickoffTime', [b''])[0], sambaKickoffTime)) return ml def _modlist_krb5_valid_end(self, ml): if self.hasChanged('userexpiry'): krb5ValidEnd = u'' if self['userexpiry']: krb5ValidEnd = _mapUserExpiryToKrb5ValidEnd(self['userexpiry']) ud.debug(ud.ADMIN, ud.INFO, 'krb5ValidEnd: %s' % krb5ValidEnd) krb5ValidEnd = krb5ValidEnd.encode('ASCII') old_krb5ValidEnd = self.oldattr.get('krb5ValidEnd', [b''])[0] if old_krb5ValidEnd != krb5ValidEnd: if not self['userexpiry']: ml.append(('krb5ValidEnd', old_krb5ValidEnd, None)) else: ml.append(('krb5ValidEnd', self.oldattr.get('krb5ValidEnd', [b''])[0], krb5ValidEnd)) return ml def _modlist_shadow_expire(self, ml): if self.hasChanged('disabled') or self.hasChanged('userexpiry'): if self['disabled'] == '1' and self.hasChanged('disabled') and not self.hasChanged('userexpiry'): shadowExpire = u'1' elif self['userexpiry']: shadowExpire = _mapUserExpiryToShadowExpire(self['userexpiry']) elif self['disabled'] == '1': shadowExpire = u'1' else: shadowExpire = u'' old_shadowExpire = self.oldattr.get('shadowExpire', [b''])[0] shadowExpire = shadowExpire.encode('ASCII') if old_shadowExpire != shadowExpire: ml.append(('shadowExpire', old_shadowExpire, shadowExpire)) return ml def _modlist_mail_forward(self, ml): if self['mailForwardAddress'] and not self['mailPrimaryAddress']: raise univention.admin.uexceptions.missingInformation(_('Primary e-mail address must be set, if messages should be forwarded for it.')) if self.get('mailForwardCopyToSelf') == '1' and not self['mailPrimaryAddress']: raise univention.admin.uexceptions.missingInformation(_('Primary e-mail address must be set, if a copy of forwarded messages should be stored in its mailbox.')) if configRegistry.is_true('directory/manager/user/activate_ldap_attribute_mailForwardCopyToSelf', False): return ml try: new = [x[2] if isinstance(x[2], (list, tuple)) else [x[2]] for x in ml if x[0] == 'mailForwardAddress' and x[2]][0] except IndexError: # mailForwardAddress was not changed, nevertheless we might need to change it new = self.mapping.mapValue('mailForwardAddress', self['mailForwardAddress']) or [] # FIXME: mapValue returns b'' instead of [b''] if self.hasChanged('mailPrimaryAddress') and self.oldattr.get('mailPrimaryAddress'): try: new.remove(self.oldattr['mailPrimaryAddress'][0]) except ValueError: pass if self['mailPrimaryAddress']: mail_primary_address = self.mapping.mapValue('mailPrimaryAddress', self['mailPrimaryAddress']) if self.get('mailForwardCopyToSelf') == '1' and self['mailForwardAddress']: new.append(mail_primary_address) elif mail_primary_address in new: new.remove(mail_primary_address) ml = [(key_, old_, new_) for (key_, old_, new_) in ml if key_ != u'mailForwardAddress'] if self.oldattr.get('mailForwardAddress', []) != new: ml.append(('mailForwardAddress', self.oldattr.get('mailForwardAddress'), new)) return ml def _modlist_univention_person(self, ml): # make sure that univentionPerson is set as objectClass when needed if any(self.hasChanged(ikey) and self[ikey] for ikey in ('umcProperty', 'birthday', 'serviceSpecificPassword')) and b'univentionPerson' not in self.oldattr.get('objectClass', []): ml.append(('objectClass', b'', b'univentionPerson')) # TODO: check if exists already return ml def _modlist_home_share(self, ml): if self.hasChanged('homeShare') or self.hasChanged('homeSharePath'): if self['homeShare']: share_mod = univention.admin.modules.get('shares/share') try: share = share_mod.object(None, self.lo, self.position, self['homeShare']) share.open() except Exception: # FIXME: specify correct exception raise univention.admin.uexceptions.noObject(_('DN given as share is not valid.')) if share['host'] and share['path']: if b'automount' not in self.oldattr.get('objectClass', []): ml.append(('objectClass', b'', b'automount')) am_host = share['host'] if not self['homeSharePath'] or not isinstance(self['homeSharePath'], six.string_types): raise univention.admin.uexceptions.missingInformation(_('%(homeSharePath)s must be given if %(homeShare)s is given.') % {'homeSharePath': _('Home share path'), 'homeShare': _('Home share')}) else: am_path = os.path.abspath(os.path.join(share['path'], self['homeSharePath'])) if not am_path.startswith(share['path']): raise univention.admin.uexceptions.valueError(_('%s: Invalid path') % _('Home share path'), property='homeShare') am_old = self.oldattr.get('automountInformation', [b''])[0] am_new = b'-rw %s:%s' % (am_host.encode('UTF-8'), am_path.encode('UTF-8')) # TODO: check if automountInformation is really UTF-8 ml.append(('automountInformation', am_old, am_new)) else: raise univention.admin.uexceptions.noObject(_('Given DN is no share.')) if not self['homeShare'] or not share['host'] or not share['path']: if b'automount' not in self.oldattr.get('objectClass', []): ml.append(('objectClass', b'', b'automount')) am_old = self.oldattr.get('automountInformation', [b''])[0] if am_old: ml.append(('automountInformation', am_old, b'')) return ml def _modlist_samba_sid(self, ml): if not self.exists() or self.hasChanged('sambaRID'): sid = self.__generate_user_sid(self['uidNumber']) sid = sid.encode('ASCII') ml.append(('sambaSID', self.oldattr.get('sambaSID', [b'']), [sid])) return ml def _modlist_primary_group(self, ml): if not self.exists() or self.hasChanged('primaryGroup'): # Posix ml.append(('gidNumber', self.oldattr.get('gidNumber', [b'']), [self.get_gid_for_primary_group().encode('ASCII')])) # Samba ml.append(('sambaPrimaryGroupSID', self.oldattr.get('sambaPrimaryGroupSID', [b'']), [self.get_sid_for_primary_group().encode('ASCII')])) return ml def _modlist_sambaAcctFlags(self, ml): if self.exists() and not self.hasChanged(['disabled', 'locked']): return ml old_flags = self.oldattr.get('sambaAcctFlags', [b''])[0] acctFlags = univention.admin.samba.acctFlags(old_flags.decode('ASCII')) if self['disabled'] == '1': # disable samba account acctFlags.set('D') else: # enable samba account acctFlags.unset('D') if self['locked'] == '1': # lock samba account acctFlags.set('L') else: acctFlags.unset('L') new_flags = acctFlags.decode().encode('ASCII') if old_flags != new_flags: ml.append(('sambaAcctFlags', old_flags, new_flags)) return ml def _modlist_service_specific_password(self, ml): new_password = self.info.get('serviceSpecificPassword', None) if new_password: service = new_password.get('service', None) password = new_password.get('password', None) if service != 'radius': raise univention.admin.uexceptions.valueError(_('Service does not support service specific passwords'), property='serviceSpecificPassword') if service: nt = passlib.hash.nthash.hash(password).upper().encode('ASCII') ml.append(('univentionRadiusPassword', self.oldattr.get('univentionRadiusPassword', [b'']), [nt])) return ml def _ldap_post_remove(self): self.alloc.append(('sid', self.oldattr['sambaSID'][0].decode('ASCII'))) self.alloc.append(('uid', self.oldattr['uid'][0].decode('UTF-8'))) self.alloc.append(('uidNumber', self.oldattr['uidNumber'][0].decode('ASCII'))) if self['mailPrimaryAddress']: self.alloc.append(('mailPrimaryAddress', self['mailPrimaryAddress'])) super(object, self)._ldap_post_remove() for group in self.oldinfo.get('groups', []): groupObject = univention.admin.objects.get(univention.admin.modules.get('groups/group'), self.co, self.lo, self.position, group) groupObject.fast_member_remove([self.dn], [x.decode('UTF-8') for x in self.oldattr.get('uid', [])], ignore_license=True) def _move(self, newdn, modify_childs=True, ignore_license=False): olddn = self.dn tmpdn = u'cn=%s-subtree,cn=temporary,cn=univention,%s' % (ldap.dn.escape_dn_chars(self['username']), self.lo.base) al = [('objectClass', [b'top', b'organizationalRole']), ('cn', [b'%s-subtree' % (self['username'].encode('UTF-8'),)])] subelements = self.lo.search(base=self.dn, scope='one', attr=['objectClass']) # FIXME: identify may fail, but users will raise decode-exception if subelements: try: self.lo.add(tmpdn, al) except ldap.LDAPError: # real errors will be caught later pass moved = dict(self.move_subelements(olddn, tmpdn, subelements, ignore_license)) subelements = [(moved[subdn], subattrs) for (subdn, subattrs) in subelements] try: dn = super(object, self)._move(newdn, modify_childs, ignore_license) except BaseException: # self couldn't be moved # move back subelements and reraise self.move_subelements(tmpdn, olddn, subelements, ignore_license) raise if subelements: try: moved = dict(self.move_subelements(tmpdn, newdn, subelements, ignore_license)) subelements = [(moved[subdn], subattrs) for (subdn, subattrs) in subelements] except BaseException: # subelements couldn't be moved to self # subelements were already moved back to temporary position # move back self, move back subelements to self and reraise super(object, self)._move(olddn, modify_childs, ignore_license) self.move_subelements(tmpdn, olddn, subelements, ignore_license) raise return dn @classmethod def _get_samba_password_history(cls, newpassword, smbpwhistory, smbpwhlen): """Get history of previously used passwords. # >>> object._get_samba_password_history('186CB09181E2C2ECAAC768C47C729904', 'A047EE4A9DB8BC8B4F3F8A03D72DEB80', 0) # ... # >>> object._get_samba_password_history('186CB09181E2C2ECAAC768C47C729904', '', 1) # ... # >>> object._get_samba_password_history('186CB09181E2C2ECAAC768C47C729904', 'A047EE4A9DB8BC8B4F3F8A03D72DEB80', 1) # ... # >>> object._get_samba_password_history('186CB09181E2C2ECAAC768C47C729904', 'A047EE4A9DB8BC8B4F3F8A03D72DEB80', 2) # ... """ # calculate the password hash & salt # in binary for calculating the md5: salt = os.urandom(16) # we have to have that in hex: hexsalt = codecs.encode(salt, 'hex').upper().decode('ASCII') # we need the ntpwd binary data to pwd = codecs.decode(newpassword, 'hex') # calculating hash. stored as a 32byte hex in sambaPasswordHistory, # syntax like that: [Salt][MD5(Salt+Hash)] # First 16bytes ^ ^ last 16bytes. pwdhash = hashlib.md5(salt + pwd).hexdigest().upper() smbpwhash = hexsalt + pwdhash # split the history pwlist = smbpwhistory.strip().split(' ') # append new hash pwlist.append(smbpwhash) # strip old hashes pwlist = pwlist[-smbpwhlen:] # build history smbpwhistory = ''.join(pwlist) return smbpwhistory def __allocate_rid(self, rid): searchResult = self.lo.search(filter='objectClass=sambaDomain', attr=['sambaSID']) domainsid = searchResult[0][1]['sambaSID'][0] sid = domainsid.decode('ASCII') + u'-' + rid try: return self.request_lock('sid', sid) except univention.admin.uexceptions.noLock: raise univention.admin.uexceptions.sidAlreadyUsed(rid) def __generate_user_sid(self, uidNum): if self['sambaRID']: return self.__allocate_rid(self['sambaRID']) elif self.s4connector_present: # In this case Samba 4 must create the SID, the s4 connector will sync the # new sambaSID back from Samba 4. return 'S-1-4-%s' % (uidNum,) rid = rids_for_well_known_security_identifiers.get(self['username'].lower()) if rid: return self.__allocate_rid(rid) while True: try: return self.request_lock('sid+user', uidNum) except univention.admin.uexceptions.noLock: uidNum = str(int(uidNum) + 1)
[docs] @classmethod def unmapped_lookup_filter(cls): filter_p = super(object, cls).unmapped_lookup_filter() filter_p.expressions.extend([ univention.admin.filter.conjunction(u'!', [univention.admin.filter.expression(u'uidNumber', u'0')]), univention.admin.filter.conjunction(u'!', [univention.admin.filter.expression(u'univentionObjectFlag', u'functional')]), ]) return filter_p
@classmethod def _ldap_attributes(cls): return [u'*', u'pwdAccountLockedTime']
[docs] @classmethod def rewrite_filter(cls, filter, mapping): if filter.variable == u'primaryGroup': filter.variable = u'gidNumber' elif filter.variable == u'groups': filter.variable = u'memberOf' elif filter.variable == u'disabled': # substring match for userPassword is not possible if filter.value == u'1': filter.transform_to_conjunction(univention.admin.filter.parse(u'(&(shadowExpire=1)(krb5KDCFlags:1.2.840.113556.1.4.803:=128)(|(sambaAcctFlags=[UD ])(sambaAcctFlags=[ULD ])))')) elif filter.value == u'0': filter.transform_to_conjunction(univention.admin.filter.parse(u'(&(!(shadowExpire=1))(!(krb5KDCFlags:1.2.840.113556.1.4.803:=128))(!(|(sambaAcctFlags=[UD ])(sambaAcctFlags=[ULD ]))))')) elif filter.value == u'none': filter.transform_to_conjunction(univention.admin.filter.parse(u'(&(!(shadowExpire=1))(!(krb5KDCFlags:1.2.840.113556.1.4.803:=128))(!(|(sambaAcctFlags=[UD ])(sambaAcctFlags=[ULD ]))))')) elif filter.value == u'all': filter.transform_to_conjunction(univention.admin.filter.parse(u'(&(shadowExpire=1)(krb5KDCFlags:1.2.840.113556.1.4.803:=128)(|(sambaAcctFlags=[UD ])(sambaAcctFlags=[ULD ])))')) elif filter.value == u'posix': filter.variable = u'shadowExpire' filter.value = u'1' elif filter.value == u'kerberos': filter.transform_to_conjunction(univention.admin.filter.parse(u'(&(krb5KDCFlags:1.2.840.113556.1.4.803:=128))')) elif filter.value == u'windows': filter.transform_to_conjunction(univention.admin.filter.parse(u'(|(sambaAcctFlags=[UD ])(sambaAcctFlags==[ULD ]))')) elif filter.value == u'windows_kerberos': filter.transform_to_conjunction(univention.admin.filter.parse(u'(&(krb5KDCFlags:1.2.840.113556.1.4.803:=128)(|(sambaAcctFlags=[UD ])(sambaAcctFlags==[ULD ])))')) elif filter.value == u'windows_posix': filter.transform_to_conjunction(univention.admin.filter.parse(u'(&(shadowExpire=1)(|(sambaAcctFlags=[UD ])(sambaAcctFlags==[ULD ])))')) elif filter.value == u'posix_kerberos': filter.transform_to_conjunction(univention.admin.filter.parse(u'(&(shadowExpire=1)(krb5KDCFlags=254))')) elif filter.value == u'*': filter.variable = u'uid' elif filter.variable == 'userexpiry': try: userexpiry = property_descriptions['userexpiry'].syntax.parse(filter.value) except univention.admin.uexceptions.valueError: # allow to search for userexpiry=* # TODO: should we allow to search for e.g. userexpiry=2021-* ? userexpiry_filter = filter_format(u'(|(shadowExpire=%s)(krb5ValidEnd=%s)(sambaKickoffTime=%s))', [filter.value or '*', filter.value or '*', filter.value or '*']) userexpiry_filter = userexpiry_filter.replace(filter_format('%s', ['*']), '*') else: userexpiry_filter = filter_format(u'(|(shadowExpire=%s)(krb5ValidEnd=%s)(sambaKickoffTime=%s))', [ _mapUserExpiryToShadowExpire(userexpiry), _mapUserExpiryToKrb5ValidEnd(userexpiry), _mapUserExpiryToSambaKickoffTime(userexpiry), ]) filter.transform_to_conjunction(univention.admin.filter.parse(userexpiry_filter)) elif filter.variable == u'locked': if filter.value == u'1': filter.transform_to_conjunction(univention.admin.filter.parse(u'(|(krb5KDCFlags:1.2.840.113556.1.4.803:=131072)(sambaAcctFlags=[UL ])(sambaAcctFlags=[ULD ]))')) elif filter.value == u'0': filter.transform_to_conjunction(univention.admin.filter.parse(u'(&(!(krb5KDCFlags:1.2.840.113556.1.4.803:=131072))(!(sambaAcctFlags=[UL ]))(!(sambaAcctFlags=[ULD ])))')) elif filter.value in [u'posix', u'windows', u'all', u'none']: if filter.value == 'all': filter.transform_to_conjunction(univention.admin.filter.parse(u'(|(sambaAcctFlags=[UL ])(sambaAcctFlags=[ULD ]))')) # filter.transform_to_conjunction(univention.admin.filter.parse(u'(|(sambaAcctFlags=[UL ])(sambaAcctFlags=[ULD ])(userPassword={crypt}!*))')) elif filter.value == u'windows': filter.transform_to_conjunction(univention.admin.filter.parse(u'(|(sambaAcctFlags=[UL ])(sambaAcctFlags=[ULD ]))')) # elif filter.value == u'posix': # filter.variable = u'userPassword' # filter.value = u'{crypt}!*' elif filter.value == u'none': # filter.transform_to_conjunction(univention.admin.filter.parse(u'(&(!(sambaAcctFlags=[UL ]))(!(sambaAcctFlags=[ULD ]))(!(userPassword={crypt}!*)))')) filter.transform_to_conjunction(univention.admin.filter.parse(u'(&(!(sambaAcctFlags=[UL ]))(!(sambaAcctFlags=[ULD ])))')) elif filter.value == u'*': filter.variable = u'uid' else: super(object, cls).rewrite_filter(filter, mapping)
[docs] @classmethod def identify(cls, dn, attr, canonical=False): if b'0' in attr.get('uidNumber', []) or b'$' in attr.get('uid', [b''])[0] or b'univentionHost' in attr.get('objectClass', []) or b'functional' in attr.get('univentionObjectFlag', []): return False required_ocs = {b'posixAccount', b'shadowAccount', b'sambaSamAccount', b'person', b'krb5KDCEntry', b'krb5Principal'} ocs = set(attr.get('objectClass', [])) return ocs & required_ocs == required_ocs
lookup = object.lookup lookup_filter = object.lookup_filter identify = object.identify