#!/usr/bin/python3
#
# Univention Management Console
# module: password reset service
#
# SPDX-FileCopyrightText: 2015-2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only
import datetime
import email.charset
import os.path
import random
import smtplib
import string
from email.mime.nonmultipart import MIMENonMultipart
from email.utils import formatdate
from functools import wraps
from subprocess import PIPE, STDOUT, Popen
from typing import Any
import atexit
import pylibmc
from ldap.filter import filter_format
import univention.admin.modules
import univention.admin.uexceptions
import univention.admin.uexceptions as udm_errors
import univention.admin.uldap
from univention.admin.uldap import getMachineConnection
from univention.lib.i18n import Translation
from univention.lib.umc import Client, ConnectionError, HTTPError, Unauthorized # noqa: A004
from univention.management.console.config import ucr
from univention.management.console.ldap import (
get_admin_connection, get_machine_connection, get_user_connection, machine_connection,
)
from univention.management.console.log import MODULE
from univention.management.console.modules import Base, UMC_Error
from univention.management.console.modules.decorators import sanitize, simple_response
from univention.management.console.modules.sanitizers import StringSanitizer
from .sending import get_plugins as get_sending_plugins
from .tokendb import MultipleTokensInDB, TokenDB
# Translation callable used for all user-facing messages of this module.
_ = Translation('univention-self-service-passwordreset-umc').translate
# memcached connection settings, read from UCR (socket path, user name, file containing the password).
MEMCACHED_SOCKET = ucr.get("umc/self-service/memcached/socket", "/var/lib/univention-self-service-passwordreset-umc/memcached.socket")
MEMCACHED_USERNAME = ucr.get("umc/self-service/memcached/username")
MEMCACHED_SECRET_FILE = ucr.get("umc/self-service/memcached/password-file")
# Upper bound for memcached key lengths; prevent_denial_of_service() checks user names against it.
MEMCACHED_MAX_KEY = 250
# Host that handles self-service requests; falls back to ldap/master if no backend server is configured.
SELFSERVICE_MASTER = ucr.get("self-service/backend-server", ucr.get("ldap/master"))
# True if the FQDN of this host equals SELFSERVICE_MASTER.
IS_SELFSERVICE_MASTER = '%(hostname)s.%(domainname)s' % ucr == SELFSERVICE_MASTER
# Host name used to build the UDM REST URL; defaults to the FQDN of this host.
UDM_REST_SERVER = ucr.get('self-service/udm-rest-server', '%(hostname)s.%(domainname)s' % ucr)
# True unless umc/self-service/allow-authenticated-use is enabled in UCR.
DISALLOW_AUTHENTICATION = not ucr.is_true('umc/self-service/allow-authenticated-use')
# strftime() format used for the DeregistrationTimestamp user property.
DEREGISTRATION_TIMESTAMP_FORMATTING = '%Y%m%d%H%M%SZ'
# The UDM helpers are only imported on the self-service backend server.
# An ImportError is logged but not re-raised, so these names stay undefined in that case.
if IS_SELFSERVICE_MASTER:
    try:
        from univention.admin.rest.client import UDM as UDMRest
        from univention.management.console.modules.udm.udm_ldap import UDM_Error, UDM_Module
        from univention.udm import UDM, NoObject
    except ImportError as exc:
        MODULE.error('Could not load udm module: %s', exc)
[docs]
def forward_to_master(func):
    """
    Decorator: run the handler locally on the self-service backend server, otherwise relay the request.

    On any other host the UMC command is sent to ``SELFSERVICE_MASTER`` using the
    machine account and the remote answer is passed to ``self.finished()``.

    :raises UMC_Error: (status 503) if the backend server cannot be reached or rejects the machine account.
    """
    @wraps(func)
    def _relay(self, request, *args, **kwargs):
        if IS_SELFSERVICE_MASTER:
            return func(self, request, *args, **kwargs)

        try:
            # "de_DE.UTF-8" -> "de-DE"
            lang_tag = str(self.locale).split('.')[0].replace('_', '-')
            master = Client(SELFSERVICE_MASTER, language=lang_tag)
            master.authenticate_with_machine_account()
            reply = master.umc_command(request.arguments[0], request.options)
        except (Unauthorized, ConnectionError) as exc:
            raise UMC_Error(_('The connection to the server could not be established. Please try again later. Error message was: %s') % (exc,), status=503)
        except HTTPError as http_error:
            # an error answer of the backend is handed through to the client unchanged
            reply = http_error.response
        self.finished(request.id, reply.result, message=reply.message, status=reply.status)
        return None

    return _relay
[docs]
def forward_to_master_if_authentication_disabled(func):
    """Wrap *func* with :func:`forward_to_master` only when ``DISALLOW_AUTHENTICATION`` is set; otherwise return it unchanged."""
    return forward_to_master(func) if DISALLOW_AUTHENTICATION else func
[docs]
def prevent_denial_of_service(func):
    """
    Decorator that rate-limits a request handler with counters kept in memcached.

    Two sets of counters are incremented per call: the global ones from
    ``self.total_limits`` and per-user ones (minute/hour/day) derived from the
    ``username`` argument. Requests whose ``X-Forwarded-Host`` header names a
    host listed in ``self.trusted_hosts`` bypass the check.

    :raises ServiceForbidden: if the user name is too long to be used as memcached key.
    :raises ConnectionLimitReached: if any active limit is exceeded.
    """
    def _pretty_time(sec):
        """Return a coarse, translated and rounded-up description of *sec* seconds."""
        if sec <= 60:
            return _("one minute")
        # The caller passes timedelta.total_seconds(), a float. Work on whole
        # seconds so the message reads "3 minutes" rather than "3.0 minutes".
        m, _s = divmod(int(sec), 60)
        if m < 60:
            return _("{} minutes").format(m + 1)
        elif m == 60:
            return _("one hour")  # and one minute, but nvm
        h, m = divmod(m, 60)
        return _("{} hours").format(h + 1)

    def _check_limits(memcache, limits):
        """
        Increment every counter in *limits* and report whether one of them is exceeded.

        :param limits: iterable of ``(key, decay_seconds, limit)``; a limit of 0 disables the entry.
        :return: tuple ``(limit_reached, latest_expiry_of_an_exceeded_counter)``
        """
        limit_reached = False
        _max_wait = datetime.datetime.utcnow()
        for key, decay, limit in limits:
            # Not really a "decay", as for that we'd have to store the date for
            # each request. Then a moving window could be implemented. But
            # my guess is that we won't need that, so this is simpler.
            # Continue even if a limit was reached, so that all counters are
            # incremented.
            if limit == 0:
                # limit deactivated by UCR
                continue
            try:
                count = memcache.incr(key)
            except pylibmc.NotFound:
                # first request in this window: create counter and expiry marker
                count = 1
                memcache.set_multi(
                    {
                        key: count,
                        f"{key}:exp": datetime.datetime.utcnow() + datetime.timedelta(seconds=decay),
                    },
                    decay,
                )
            if count > limit:
                limit_reached = True
                expires = memcache.get(f"{key}:exp")
                if expires is None:
                    # The expiry marker may have vanished between incr() and
                    # get(). Assume the full window instead of failing with a
                    # TypeError inside max().
                    expires = datetime.datetime.utcnow() + datetime.timedelta(seconds=decay)
                _max_wait = max(_max_wait, expires)
        return limit_reached, _max_wait

    @wraps(func)
    def _decorated(self, *args, **kwargs):
        request = self._current_request
        client_host_str = None
        # Determine the client host. Respect X-Forwarded-Host if configured and present.
        # NOTE(review): the header value comes from the request; confirm that the
        # front-end proxy overwrites it, otherwise a client could claim a trusted host.
        x_forwarded_host = request.headers.get('X-Forwarded-Host')
        if x_forwarded_host:
            # Take the first Host in the list (closest client)
            client_host_str = x_forwarded_host.split(',')[0].strip()
            MODULE.debug("Rate limit check: Using X-Forwarded-Host: %s", client_host_str)
        else:
            MODULE.debug("Rate limit check: Cannot determine remote Host from request object.")
        if client_host_str and client_host_str in self.trusted_hosts:
            MODULE.debug("Rate limit bypassed for trusted host %s in trusted hosts %r", client_host_str, self.trusted_hosts)
            return func(self, *args, **kwargs)

        # check total request limits
        total_limit_reached, total_max_wait = _check_limits(self.memcache, self.total_limits)

        # check user request limits
        try:
            if "username" in kwargs:
                username = kwargs["username"]
            else:
                username = args[0].options["username"]
        except (IndexError, AttributeError, KeyError, TypeError):
            # args[0] is not the expected 'request'
            MODULE.exception("prevent_denial_of_service() could not find username argument. self: %r args: %r kwargs: %r", self, args, kwargs)
            raise
            # TODO: return func(self, *args, **kwargs) here?!
        if len(username) > MEMCACHED_MAX_KEY - 9:  # "_hour:exp"
            raise ServiceForbidden()
        username = self.email2username(username)
        user_limits = [
            (f"{username}_minute", 60, self.limit_user_minute),
            (f"{username}_hour", 3600, self.limit_user_hour),
            (f"{username}_day", 86400, self.limit_user_day),
        ]
        user_limit_reached, user_max_wait = _check_limits(self.memcache, user_limits)
        if total_limit_reached or user_limit_reached:
            time_s = _pretty_time((max(total_max_wait, user_max_wait) - datetime.datetime.utcnow()).total_seconds())
            raise ConnectionLimitReached(time_s)
        return func(self, *args, **kwargs)

    return _decorated
[docs]
class ConnectionLimitReached(UMC_Error):
    """Error (status 503) raised by the rate limiter when a request limit is exceeded."""

    status = 503

    def __init__(self, seconds):
        # *seconds* is inserted verbatim into the message; prevent_denial_of_service()
        # passes an already formatted duration text here.
        template = _("The allowed maximum number of connections to the server has been reached. Please retry in {}.")
        super().__init__(template.format(seconds))
[docs]
class ServiceForbidden(UMC_Error):
    """Error (status 403) with a deliberately unspecific message."""

    # protection against bruteforcing user names
    status = 403

    def __init__(self):
        message = _("Either username or password is incorrect or you are not allowed to use this service.")
        super().__init__(message)
[docs]
class TokenNotFound(UMC_Error):
    """Error (status 400) raised when a supplied token is rejected."""

    status = 400

    def __init__(self):
        message = _("The token you supplied is either expired or invalid. Please request a new one.")
        super().__init__(message)
[docs]
class NoMethodsAvailable(UMC_Error):
    """Error (status 403) raised when no password reset method can be offered."""

    status = 403

    def __init__(self):
        message = _('No contact information is stored for this user. Resetting the password is not possible.')
        super().__init__(message)
[docs]
class TokenSendMessage(UMC_Error):
    """Answer (status 200) transported as an exception: tells the client that a token was sent, if applicable."""

    status = 200

    def __init__(self):
        message = _("A message containing a token has been sent to the user (if the user exists and is allowed to use this service).")
        super().__init__(message)
[docs]
class Instance(Base):
[docs]
def init(self):
    """
    Prepare the module instance: read the UCR settings, open the token database and create the memcached client.

    :raises UMC_Error: (status 503) if the service is disabled via UCR or the memcached password file cannot be read.
    """
    if not ucr.is_true("umc/self-service/enabled"):
        raise UMC_Error(_('The password reset service is disabled via configuration registry.'), status=503)
    self._usersmod = None  # filled on first access of the ``usersmod`` property
    self.groupmod = None
    # Comma separated host list from UCR; prevent_denial_of_service() skips the rate limit for these hosts.
    self.trusted_hosts = [
        h.strip()
        for h in ucr.get('umc/self-service/rate-limit/trusted-hosts', '').split(',')
        if h.strip()
    ]
    MODULE.info("Added trusted host for rate limit bypass: %r", self.trusted_hosts)
    # Maximum token age in seconds, compared against the token timestamp in _check_token().
    self.token_validity_period = ucr.get_int("umc/self-service/passwordreset/token_validity_period", 3600)
    limit_total_minute = ucr.get_int("umc/self-service/passwordreset/limit/total/minute", 0)
    limit_total_hour = ucr.get_int("umc/self-service/passwordreset/limit/total/hour", 0)
    limit_total_day = ucr.get_int("umc/self-service/passwordreset/limit/total/day", 0)
    self.limit_user_minute = ucr.get_int("umc/self-service/passwordreset/limit/per_user/minute", 0)
    self.limit_user_hour = ucr.get_int("umc/self-service/passwordreset/limit/per_user/hour", 0)
    self.limit_user_day = ucr.get_int("umc/self-service/passwordreset/limit/per_user/day", 0)
    # Entries are (memcached key, window in seconds, allowed requests); a limit of 0 switches the entry off.
    self.total_limits = [
        ("t:c_minute", 60, limit_total_minute),
        ("t:c_hour", 3600, limit_total_hour),
        ("t:c_day", 86400, limit_total_day),
    ]
    # NOTE(review): the indentation of this copy was lost; only the token database
    # setup is assumed to belong to this branch - confirm against the original file.
    if IS_SELFSERVICE_MASTER:
        self.db = TokenDB(MODULE)
        atexit.register(self.db.close_db)
        if not self.db.table_exists():
            self.db.create_table()
    # The memcached password is taken from the environment first, then from the configured file.
    password = os.getenv("SELF_SERVICE_MEMCACHED_SECRET")
    if not password and MEMCACHED_SECRET_FILE:
        try:
            with open(MEMCACHED_SECRET_FILE) as pw_file:
                password = pw_file.readline().strip()
        except OSError:
            raise UMC_Error('The memcached password is not properly configured.', status=503)
    self.memcache = pylibmc.Client([MEMCACHED_SOCKET], binary=True, username=MEMCACHED_USERNAME, password=password)
    self.send_plugins = get_sending_plugins(MODULE.process)
    # Subset of the sending plugins whose message application is 'password_reset'.
    self.password_reset_plugins = {k: v for k, v in self.send_plugins.items() if v.message_application() == 'password_reset'}
@property
def usersmod(self):
    """Return the UDM ``users/user`` module; it is loaded (and initialised if necessary) on first access."""
    if self._usersmod:
        return self._usersmod

    univention.admin.modules.update()
    self._usersmod = univention.admin.modules.get('users/user')
    already_initialised = self._usersmod.initialized
    if not already_initialised:
        lo, po = get_machine_connection()
        univention.admin.modules.init(lo, po, self._usersmod)
    return self._usersmod
[docs]
@forward_to_master
@sanitize(
    username=StringSanitizer(required=True, minimum=1),
    password=StringSanitizer(required=True, minimum=1))
@simple_response
def get_service_specific_passwords(self, username, password):
    """
    Report which service specific passwords the authenticated user has.

    :return: list of dicts, one per service type, with the number of passwords set
    """
    backend_disabled = ucr.is_false('umc/self-service/service-specific-passwords/backend/enabled')
    if backend_disabled:
        msg = _('Service specific passwords were disabled via the Univention Configuration Registry.')
        MODULE.error("get_service_specific_passwords(): %s", msg)
        raise UMC_Error(msg)

    dn, username = self.auth(username, password)
    # we only have radius as a service specific password: setting
    # the backend to true means that the admin wants to allow
    # radius password. at some point we would need to know which
    # services should actually be managed
    ldap_connection, _ldap_position = getMachineConnection()
    user_attrs = ldap_connection.get(dn, attr=['univentionRadiusPassword'])
    radius_passwords = user_attrs.get('univentionRadiusPassword', [])
    return [{'type': 'radius', 'set': len(radius_passwords)}]
[docs]
@forward_to_master
@sanitize(
    username=StringSanitizer(required=True, minimum=1),
    password=StringSanitizer(required=True, minimum=1),
    password_type=StringSanitizer(required=True, minimum=1))
@simple_response
def set_service_specific_passwords(self, username, password, password_type):
    """
    Set a new service specific password.

    The user is authenticated first; the password itself is generated through
    the UDM REST API. Only the type ``radius`` is supported.

    :return: dict with the key ``password`` holding the password in cleartext
    :raises UMC_Error: if the feature is disabled via UCR or *password_type* is not supported.
    """
    if ucr.is_false('umc/self-service/service-specific-passwords/backend/enabled'):
        msg = _('Service specific passwords were disabled via the Univention Configuration Registry.')
        MODULE.error("set_service_specific_passwords(): %s", msg)
        raise UMC_Error(msg)
    dn, username = self.auth(username, password)
    MODULE.error("set_service_specific_passwords(): Setting %s password for %s", password_type, username)
    if password_type == 'radius':
        # close the secret file deterministically instead of relying on garbage collection
        with open('/etc/ldap.secret') as secret_file:
            bind_password = secret_file.read()
        udm = UDMRest.http('https://%s/univention/udm/' % UDM_REST_SERVER, 'cn=admin', bind_password)
        user_obj = udm.get('users/user').get(dn)
        service_specific_password = user_obj.generate_service_specific_password('radius')
    else:
        msg = _('Service specific passwords were disabled for "%s".') % password_type
        MODULE.error("set_service_specific_passwords(): %s", msg)
        raise UMC_Error(msg)
    return {'password': service_specific_password}
[docs]
@forward_to_master
@sanitize(
    username=StringSanitizer(required=DISALLOW_AUTHENTICATION, minimum=1),
    password=StringSanitizer(required=DISALLOW_AUTHENTICATION, minimum=1))
@simple_response
def get_user_attributes(self, username=None, password=None):  # MUST be supported until UCS 4.4-7 is out of maintenance
    """
    Return widget descriptions (including current values) and layout for the user's profile attributes.

    :raises ServiceForbidden: if the user is blacklisted for 'profiledata'.
    """
    dn, username = self.authenticate_user(username, password)
    if self.is_blacklisted(username, 'profiledata'):
        raise ServiceForbidden()

    user = self.get_udm_user_by_dn(dn)
    user.set_defaults = True
    user.set_default_values()
    properties = user.info.copy()

    widget_descriptions = []
    for description in self._get_user_attributes_descriptions():
        if user.has_property(description['id']):
            widget_descriptions.append(dict(description, value=properties.get(description['id'])))

    # TODO: make layout configurable via ucr ?
    return {
        'widget_descriptions': widget_descriptions,
        'layout': [description['id'] for description in widget_descriptions],
    }
[docs]
@forward_to_master_if_authentication_disabled
@sanitize(
    username=StringSanitizer(required=DISALLOW_AUTHENTICATION, minimum=1),
    password=StringSanitizer(required=DISALLOW_AUTHENTICATION, minimum=1))
@simple_response
def get_user_attributes_values(self, attributes, username=None, password=None):
    """
    Return the current values of the requested profile attributes.

    Only attributes listed in the UCR variable ``self-service/udm_attributes``
    that exist on the user object are included.

    :raises ServiceForbidden: if the user is blacklisted for 'profiledata'.
    """
    dn, username = self.authenticate_user(username, password)
    if self.is_blacklisted(username, 'profiledata'):
        raise ServiceForbidden()

    configured = ucr.get('self-service/udm_attributes', '')
    user_attributes = [name.strip() for name in configured.split(',')]

    user = self.get_udm_user_by_dn(dn)
    user.set_defaults = True
    user.set_default_values()
    properties = user.info.copy()

    values = {}
    for prop in attributes:
        if user.has_property(prop) and prop in user_attributes:
            values[prop] = properties.get(prop)
    return values
[docs]
@forward_to_master
@simple_response
def get_user_attributes_descriptions(self):
    """UMC command wrapper around :meth:`_get_user_attributes_descriptions`."""
    descriptions = self._get_user_attributes_descriptions()
    return descriptions
def _get_user_attributes_descriptions(self):
    """
    Build the widget descriptions for the attributes configured in ``self-service/udm_attributes``.

    The password property, unknown properties and properties whose widget is
    UDM specific or uses dynamic values are left out.

    :return: list of widget description dicts
    """
    configured = ucr.get('self-service/udm_attributes', '').split(',')
    configured_read_only = ucr.get('self-service/udm_attributes/read-only', '').split(',')
    user_attributes = [name.strip() for name in configured]
    read_only_attributes = [name.strip() for name in configured_read_only]
    label_overwrites = {
        'jpegPhoto': _('Your picture'),
    }

    collected = []
    for propname in user_attributes:
        if propname == 'password':
            continue
        prop = self.usersmod.property_descriptions.get(propname)
        if not prop:
            continue
        is_read_only = not bool(prop.editable) or propname in read_only_attributes
        widget = {
            'id': propname,
            'label': label_overwrites.get(propname, prop.short_description),
            'description': prop.long_description,
            'syntax': prop.syntax.name,
            'size': prop.size or prop.syntax.size,
            'required': bool(prop.required),
            'editable': bool(prop.may_change),
            'readonly': is_read_only,
            'multivalue': bool(prop.multivalue),
        }
        widget.update(prop.syntax.get_widget_options(prop))
        # skip widgets the self-service frontend does not handle
        if 'udm' in widget['type'] or 'dynamicValues' in widget:
            continue
        collected.append(widget)
    return collected
[docs]
@forward_to_master
@simple_response
def get_registration_attributes(self):
    """
    Return widget descriptions and layout for the account registration form.

    'PasswordRecoveryEmail' and 'password' are always requested, followed by the
    attributes from ``umc/self-service/account-registration/udm_attributes``.
    Unknown and unsupported attributes are logged as warnings and left out.
    """
    ucr.load()
    property_ids = ['PasswordRecoveryEmail', 'password']
    configured = ucr.get('umc/self-service/account-registration/udm_attributes', '')
    for raw_id in configured.split(','):
        id_ = raw_id.strip()
        if id_ and id_ not in property_ids:
            property_ids.append(id_)

    lo, po = get_machine_connection()
    users_mod = UDM_Module('users/user', True, lo, po)
    properties = {prop['id']: prop for prop in users_mod.properties(None)}
    requested = set(property_ids)
    not_existing = requested - set(properties.keys())
    # filter out not supported props
    properties = {
        key: prop
        for (key, prop) in properties.items()
        if 'dynamicValues' not in prop and 'udm' not in prop['type']
    }
    not_supported = requested - set(properties.keys()) - not_existing

    if 'PasswordRecoveryEmail' in properties:
        recovery_email = properties['PasswordRecoveryEmail']
        recovery_email['label'] = _('Email')
        recovery_email['description'] = ''
    self._update_required_attr_of_props_for_registration(properties)
    widget_descriptions = [properties[id_] for id_ in property_ids if id_ in properties]

    if not_existing:
        MODULE.warning("get_registration_attributes(): the following attributes defined by umc/self-service/account-registration/udm_attributes do not exist on users/user: %s", ", ".join(not_existing))
    if not_supported:
        MODULE.warning("get_registration_attributes(): the following attributes defined by umc/self-service/account-registration/udm_attributes are not supported: %s", ", ".join(not_supported))
    return {
        'widget_descriptions': widget_descriptions,
        'layout': [prop['id'] for prop in widget_descriptions],
    }
def _update_required_attr_of_props_for_registration(self, properties):
    """
    Adjust the ``required`` flag of all *properties* in place for the registration form.

    Every property is first marked as not required. Afterwards
    'PasswordRecoveryEmail', 'password' and the attributes listed in
    ``umc/self-service/account-registration/udm_attributes/required`` are marked
    as required. Works on dict style and on attribute style property objects.
    """
    def _mark(prop, flag):
        if isinstance(prop, dict):
            prop['required'] = flag
        else:
            prop.required = flag

    for prop in properties.values():
        _mark(prop, False)

    configured = ucr.get('umc/self-service/account-registration/udm_attributes/required', '')
    required_ids = {'PasswordRecoveryEmail', 'password'}
    required_ids.update(attr.strip() for attr in configured.split(',') if attr.strip())
    for id_ in required_ids:
        if id_ in properties:
            _mark(properties[id_], True)
[docs]
@forward_to_master_if_authentication_disabled
@sanitize(
    username=StringSanitizer(required=DISALLOW_AUTHENTICATION, minimum=1),
    password=StringSanitizer(required=DISALLOW_AUTHENTICATION, minimum=1))
@simple_response
def validate_user_attributes(self, username, password, attributes):
    """
    Authenticate the user and validate *attributes* against the UDM property syntaxes.

    :raises ServiceForbidden: if the user is blacklisted for 'profiledata'.
    """
    _dn, username = self.authenticate_user(username, password)
    if not self.is_blacklisted(username, 'profiledata'):
        return self._validate_user_attributes(attributes)
    raise ServiceForbidden()
def _validate_user_attributes(self, attributes, map_properties_func=None):
res = {}
properties = self.usersmod.property_descriptions
if map_properties_func:
properties = properties.copy()
map_properties_func(properties)
for propname, value in attributes.items():
prop = properties.get(propname)
if not prop:
continue
isValid = True
message = ''
if prop.multivalue and isinstance(value, tuple | list):
isValid = []
message = []
for ival in value:
_isValid = True
_message = ''
try:
prop.syntax.parse(ival)
except (udm_errors.valueError, udm_errors.valueInvalidSyntax) as e:
_isValid = False
_message = str(e)
finally:
isValid.append(_isValid)
message.append(_message)
else:
try:
prop.syntax.parse(value)
except (udm_errors.valueError, udm_errors.valueInvalidSyntax) as e:
isValid = False
message = str(e)
if prop.required and not value:
isValid = False
message = _('This value is required')
res[propname] = {
'isValid': isValid,
'message': message,
}
return res
[docs]
@forward_to_master_if_authentication_disabled
@sanitize(
    username=StringSanitizer(required=DISALLOW_AUTHENTICATION, minimum=1),
    password=StringSanitizer(required=DISALLOW_AUTHENTICATION, minimum=1))
@simple_response(with_request=True)
def set_user_attributes(self, request, attributes, username=None, password=None):
    """
    Write the given profile attributes to the user object.

    With a password the LDAP connection is bound as that user, otherwise the
    connection of the current request is used. Attributes that are not listed
    in ``self-service/udm_attributes`` are silently ignored.

    :raises ServiceForbidden: if the user is blacklisted for 'profiledata'.
    :raises UMC_Error: if a read-only attribute is submitted or the modification fails.
    """
    dn, username = self.authenticate_user(username, password)
    username = username or request.username
    if password:
        dn, username = self.auth(username, password)
        lo, po = get_user_connection(binddn=dn, bindpw=password)
    else:
        lo = request.get_user_ldap_connection(write=True)
        po = univention.admin.uldap.position(lo.base)

    if self.is_blacklisted(username, 'profiledata'):
        raise ServiceForbidden()

    writable = [name.strip() for name in ucr.get('self-service/udm_attributes', '').split(',')]
    read_only = [name.strip() for name in ucr.get('self-service/udm_attributes/read-only', '').split(',')]
    for attr in attributes:
        if attr not in read_only:
            continue
        MODULE.error('set_user_attributes(): attribute %s is read-only', attr)
        raise UMC_Error(_('The attribute %s is read-only.') % (attr,))

    user = self.usersmod.object(None, lo, po, dn)
    user.open()
    for propname, value in attributes.items():
        if propname in writable and user.has_property(propname):
            user[propname] = value
    try:
        user.modify()
    except udm_errors.base as exc:
        MODULE.exception('set_user_attributes(): modifying the user failed:')
        raise UMC_Error(_('The attributes could not be saved: %s') % (UDM_Error(exc)))
    return _("Successfully changed your profile data.")
def _get_password_complexity_message(self):
    """Return the password complexity hint from UCR for the current locale, falling back to the English text or ''."""
    localized_key = 'umc/login/password-complexity-message/%s' % (self.locale.language,)
    english_text = ucr.get('umc/login/password-complexity-message/en', '')
    return ucr.get(localized_key, english_text)
[docs]
@forward_to_master
@simple_response
def create_self_registered_account(self, attributes):
    """
    Create a user account from the self-registration form and send a verification token.

    :param attributes: mapping of UDM property names to values; names that are not
        allowed for registration are dropped.
    :return: dict with ``success``; on failure additionally ``failType``
        ('INVALID_ATTRIBUTES' or 'CREATION_FAILED') and ``data``, on success
        ``verifyTokenSuccessfullySend`` and ``data`` with username and email.
    :raises UMC_Error: if registration is disabled, required attributes are missing
        or the configured user template / container does not exist.
    """
    # Never write the cleartext password to the log file.
    loggable_attributes = {key: ('*****' if key == 'password' else value) for key, value in attributes.items()}
    MODULE.info("create_self_registered_account(): attributes: %s", loggable_attributes)
    ucr.load()
    if ucr.is_false('umc/self-service/account-registration/backend/enabled', True):
        msg = _('The account registration was disabled via the Univention Configuration Registry.')
        MODULE.error("create_self_registered_account(): %s", msg)
        raise UMC_Error(msg)

    # filter out attributes that are not valid to set
    allowed_to_set = set(['PasswordRecoveryEmail', 'password'] + [attr.strip() for attr in ucr.get('umc/self-service/account-registration/udm_attributes', '').split(',') if attr.strip()])
    attributes = {k: v for (k, v) in attributes.items() if k in allowed_to_set}

    # validate attributes
    res = self._validate_user_attributes(attributes, self._update_required_attr_of_props_for_registration)

    # check username taken
    if 'username' in attributes:
        try:
            UDM.machine().version(2).get('users/user').get_by_id(attributes['username'])
        except NoObject:
            pass
        else:
            res['username'] = {
                'isValid': False,
                'message': _('The username is already taken'),
            }
    invalid = {k: v for (k, v) in res.items() if not (all(v['isValid']) if isinstance(v['isValid'], list) else v['isValid'])}
    if invalid:
        return {
            'success': False,
            'failType': 'INVALID_ATTRIBUTES',
            'data': invalid,
        }

    # check for missing required attributes from umc/self-service/account-registration/udm_attributes/required
    required_attrs = [attr.strip() for attr in ucr.get('umc/self-service/account-registration/udm_attributes/required', '').split(',') if attr.strip()]
    not_found = [attr for attr in required_attrs if attr not in attributes]
    if not_found:
        msg = _('The account could not be created:\nInformation provided is not sufficient. The following properties are missing:\n%s') % ('\n'.join(not_found),)
        MODULE.error("create_self_registered_account(): %s", msg)
        raise UMC_Error(msg)

    univention.admin.modules.update()
    lo, po = get_admin_connection()

    # get usertemplate
    template_dn = ucr.get('umc/self-service/account-registration/usertemplate', '')
    usertemplate = None
    if template_dn:
        usertemplate_mod = univention.admin.modules.get('settings/usertemplate')
        univention.admin.modules.init(lo, po, usertemplate_mod, None, True)
        try:
            usertemplate = usertemplate_mod.object(None, lo, None, template_dn)
        except udm_errors.noObject:
            msg = _('The user template "{template_dn}" set by the "umc/self-service/account-registration/usertemplate" UCR variable does not exist. A user account can not be created. Please contact your system administrator.').format(template_dn=template_dn)
            MODULE.error("create_self_registered_account(): %s", msg)
            raise UMC_Error(msg)

    # init user module with template
    usersmod = univention.admin.modules.get('users/user')
    univention.admin.modules.init(lo, po, usersmod, usertemplate, True)

    # get user container
    udm = UDM.machine().version(2)
    user_position = univention.admin.uldap.position(po.getBase())
    container_dn = ucr.get('umc/self-service/account-registration/usercontainer', None)
    if container_dn:
        try:
            container = udm.obj_by_dn(container_dn)
        except NoObject:
            msg = _('The container "{container_dn}" set by the "umc/self-service/account-registration/usercontainer" UCR variable does not exist. A user account can not be created. Please contact your system administrator.').format(container_dn=container_dn)
            MODULE.error("create_self_registered_account(): %s", msg)
            raise UMC_Error(msg)
        else:
            user_position.setDn(container.dn)
    else:
        # use the first default user container that actually exists
        for dn in usersmod.object.get_default_containers(lo):
            try:
                container = udm.obj_by_dn(dn)
            except NoObject:
                pass
            else:
                user_position.setDn(container.dn)
                break

    # create user
    attributes['PasswordRecoveryEmailVerified'] = 'FALSE'
    attributes['RegisteredThroughSelfService'] = 'TRUE'
    new_user = usersmod.object(None, lo, user_position)
    new_user.open()
    for key, value in attributes.items():
        if key in new_user and value:
            new_user[key] = value
    try:
        new_user.create()
    except univention.admin.uexceptions.base as exc:
        password_complexity_message = self._get_password_complexity_message() if isinstance(exc, udm_errors.pwToShort | udm_errors.pwQuality) else ''
        MODULE.error('create_self_registered_account(): could not create user: %s', exc)
        return {
            'success': False,
            'failType': 'CREATION_FAILED',
            'data': (_('The account could not be created:\n%s\n%s') % (UDM_Error(exc), password_complexity_message)).rstrip(),
        }
    finally:
        # TODO: cleanup
        # reinit user module without template.
        # This has to be done since the modules are singletons?
        univention.admin.modules.update()
        self._usersmod = None
        # univention.admin.modules.init(lo, po, usersmod, None, True)

    # A failure while sending the verification mail does not undo the account creation.
    try:
        # in all SS cases we need more than the previously default fields
        user_info = self._extract_user_properties(new_user)
        self.send_message(
            new_user['username'],
            'verify_email',
            new_user['PasswordRecoveryEmail'],
            user_info,
        )
    except Exception:
        MODULE.exception('could not send message')
        verify_token_successfully_send = False
    else:
        verify_token_successfully_send = True
    return {
        'success': True,
        'verifyTokenSuccessfullySend': verify_token_successfully_send,
        'data': {
            'username': new_user['username'],
            'email': new_user['PasswordRecoveryEmail'],
        },
    }
def _extract_user_properties(self, user_obj):
message_fields = [
'username',
'title',
'initials',
'displayName',
'organisation',
'employeeNumber',
'firstname',
'lastname',
'mailPrimaryAddress',
]
info_out = {field: user_obj.info.get(field, '') for field in message_fields}
return info_out
[docs]
@forward_to_master
@prevent_denial_of_service
@sanitize(
    username=StringSanitizer(required=True))
@simple_response
def send_verification_token(self, username):
    """
    Send an email verification token to the recovery address of *username*.

    :return: ``{'success': True, 'data': {'username': ...}}`` or a dict with
        ``failType`` 'INVALID_INFORMATION' if the user or its recovery email is unknown.
    :raises UMC_Error: if account verification is disabled via UCR.
    """
    MODULE.info("send_verification_token(): username: %s", username)
    ucr.load()
    verification_disabled = ucr.is_false('umc/self-service/account-verification/backend/enabled', True)
    if verification_disabled:
        msg = _('The account verification was disabled via the Univention Configuration Registry.')
        MODULE.error("send_verification_token(): %s", msg)
        raise UMC_Error(msg)

    invalid_information = {
        'success': False,
        'failType': 'INVALID_INFORMATION',
    }
    users_mod = UDM.machine().version(2).get('users/user')
    try:
        user = users_mod.get_by_id(username)
    except NoObject:
        return invalid_information
    try:
        email = user.props.PasswordRecoveryEmail
    except AttributeError:
        return invalid_information
    if not email:
        return invalid_information

    user_info = self._extract_user_properties(user._orig_udm_object)
    self.send_message(username, 'verify_email', email, user_info)
    return {
        'success': True,
        'data': {
            'username': username,
        },
    }
[docs]
@forward_to_master
@prevent_denial_of_service
@sanitize(
    username=StringSanitizer(required=True),
    method=StringSanitizer(required=True))
@simple_response
def send_token(self, username, method):
    """
    Send a password reset token to the user via the given method.

    The call always ends by raising :class:`TokenSendMessage` (status 200), no
    matter whether a token was sent, so the answer does not reveal whether the
    user exists or has contact information.

    :raises UMC_Error: if the password reset is disabled via UCR or *method* is unknown.
    :raises TokenSendMessage: always, as the regular answer.
    """
    if ucr.is_false('umc/self-service/passwordreset/backend/enabled'):
        msg = _('The password reset was disabled via the Univention Configuration Registry.')
        MODULE.error("send_token(): %s", msg)
        raise UMC_Error(msg)
    MODULE.info("send_token(): username: '%s' method: '%s'.", username, method)
    try:
        plugin = self.password_reset_plugins[method]
    except KeyError:
        MODULE.error("send_token() method '%s' not in %s.", method, self.password_reset_plugins.keys())
        raise UMC_Error(_("Unknown recovery method '{}'.").format(method))
    if self.is_blacklisted(username, 'passwordreset'):
        raise TokenSendMessage()

    # check if the user has the required attribute set
    user = self.get_udm_user(username=username)
    username = user["username"]
    contact = user[plugin.udm_property]
    # Truthiness instead of len(): an unset (None) property must not end in a
    # TypeError, which would answer differently than for users with contact data.
    if contact:
        # found contact info
        user_info = self._extract_user_properties(user)
        self.send_message(username, method, contact, user_info)
    raise TokenSendMessage()
[docs]
@forward_to_master
@prevent_denial_of_service
@sanitize(
    username=StringSanitizer(required=True),
    password=StringSanitizer(required=True),
)
@simple_response
def deregister_account(self, username, password):
    """
    Deregister the account of the authenticated user.

    :raises ServiceForbidden: if the user is blacklisted for 'account-deregistration'.
    :raises UMC_Error: if deregistration is disabled via UCR, or (status 500) if it fails.
    """
    MODULE.info("deregister_account(): username: %s password: *****", username)
    ucr.load()
    deregistration_disabled = ucr.is_false('umc/self-service/account-deregistration/enabled', True)
    if deregistration_disabled:
        msg = _('The account deregistration was disabled via the Univention Configuration Registry.')
        MODULE.error("deregister_account(): %s", msg)
        raise UMC_Error(msg)

    _dn, username = self.auth(username, password)
    if self.is_blacklisted(username, 'account-deregistration'):
        raise ServiceForbidden()
    try:
        outcome = self._deregister_account(username)
    except Exception:
        raise UMC_Error(_('Account could not be deleted'), status=500)
    return outcome
def _deregister_account(self, username):
    """
    Flag the user as deregistered, disable the account and try to notify the user by email.

    A failing notification is only logged. Any other error is logged and re-raised.
    """
    try:
        users_module = UDM.admin().version(2).get('users/user')
        user = users_module.get_by_id(username)
        timestamp = datetime.datetime.utcnow().strftime(DEREGISTRATION_TIMESTAMP_FORMATTING)
        user.props.DeregisteredThroughSelfService = 'TRUE'
        user.props.DeregistrationTimestamp = timestamp
        user.props.disabled = True
        user.save()
        try:
            self._notify_about_account_deregistration(user.props.username, user.props.PasswordRecoveryEmail)
        except Exception:
            MODULE.exception("_deregister_account(): sending of email failed")
        return None
    except Exception:
        MODULE.exception("_deregister_account()")
        raise
def _notify_about_account_deregistration(self, username, mail):
    """
    Send the deregistration notification email to *mail*.

    The mail body is read from the file configured in
    ``umc/self-service/account-deregistration/email/text_file`` (or a shipped
    default) and formatted with ``username``. Nothing is sent if *mail* is empty.
    """
    if not mail:
        return
    ucr.load()
    path_ucr = ucr.get("umc/self-service/account-deregistration/email/text_file")
    if path_ucr and os.path.exists(path_ucr):
        path = path_ucr
    else:
        path = "/usr/share/univention-self-service/email_bodies/deregistration_notification_email_body.txt"
    with open(path) as fp:
        txt = fp.read()
    txt = txt.format(username=username)

    msg = MIMENonMultipart('text', 'plain', charset='utf-8')
    msg["Subject"] = ucr.get("umc/self-service/account-deregistration/email/subject", "Account deletion")
    msg["Date"] = formatdate(localtime=True)
    msg["From"] = ucr.get("umc/self-service/account-deregistration/email/sender_address", "Password Reset Service <noreply@{}>".format(".".join([ucr["hostname"], ucr["domainname"]])))
    msg["To"] = mail
    # encode the body as quoted-printable UTF-8
    cs = email.charset.Charset("utf-8")
    cs.body_encoding = email.charset.QP
    msg.set_payload(txt, charset=cs)

    # The context manager sends QUIT and closes the connection even if
    # sendmail() raises, so no socket is leaked on errors.
    with smtplib.SMTP(ucr.get("umc/self-service/account-deregistration/email/server", "localhost")) as smtp:
        smtp.sendmail(msg["From"], msg["To"], msg.as_string())
[docs]
@forward_to_master
@prevent_denial_of_service
@sanitize(
    token=StringSanitizer(required=True),
    username=StringSanitizer(required=True),
    password=StringSanitizer(required=True))  # new_password(!)
@simple_response
def set_password(self, token, username, password):
    """
    Set a new password for *username* after validating the reset *token*.

    The result is always reported by raising :class:`UMC_Error`: status 200 on
    success, status 500 if the password could not be changed.

    :raises TokenNotFound: if the token is missing, expired or not meant for password reset.
    :raises ServiceForbidden: if the user is blacklisted for 'passwordreset'.
    """
    MODULE.info("set_password(): username: '%s'.", username)
    username = self.email2username(username)
    token_from_db = self._check_token(username, token)

    # token is correct and valid
    MODULE.info("Receive valid token for '%s'.", username)
    if self.is_blacklisted(username, 'passwordreset'):
        # this should not happen
        MODULE.error("Found token in DB for blacklisted user '%s'.", username)
        self.db.delete_tokens(token=token, username=username)
        raise ServiceForbidden()  # TokenNotFound() ?

    send_plugin = self._get_send_plugin(token_from_db['method'])
    email_verified = send_plugin.password_reset_verified_recovery_email()
    changed = self.udm_set_password(username, password, email_verified=email_verified)
    # the token is single use, regardless of the outcome
    self.db.delete_tokens(token=token, username=username)
    if not changed:
        raise UMC_Error(_('Failed to change password.'), status=500)
    raise UMC_Error(_("Successfully changed your password."), status=200)
def _check_token(self, username, token, token_application='password_reset'):
try:
token_from_db = self.db.get_one(token=token, username=username)
except MultipleTokensInDB as e:
# this should not happen, delete all tokens, raise Exception
# regardless of correctness of token
MODULE.error("set_password(): %s", e)
self.db.delete_tokens(token=token, username=username)
raise TokenNotFound()
if not token_from_db:
# no token in DB
MODULE.info("Token not found in DB for user '%s'.", username)
raise TokenNotFound()
if (datetime.datetime.utcnow() - token_from_db["timestamp"]).total_seconds() >= self.token_validity_period:
# token is correct but expired
MODULE.info("Receive correct but expired token for '%s'.", username)
self.db.delete_tokens(token=token, username=username)
raise TokenNotFound()
if self._get_send_plugin(token_from_db['method']).message_application() != token_application:
# token is correct but should not be used for this application
MODULE.info("Receive correct token for '%s' but it should be used for another application.", username)
self.db.delete_tokens(token=token, username=username)
raise TokenNotFound()
return token_from_db
[docs]
@forward_to_master
@simple_response
def get_reset_methods(self) -> list[dict[str, Any]]:
    """
    List the available password reset methods.

    :returns: one ``{"id": ..., "label": ...}`` dict per entry of
        ``self.password_reset_plugins``.
    :raises UMC_Error: if the UCR variable
        ``umc/self-service/passwordreset/backend/enabled`` is set to false.
    :raises NoMethodsAvailable: if there is no plugin to report.
    """
    if ucr.is_false('umc/self-service/passwordreset/backend/enabled'):
        msg = _('The password reset was disabled via the Univention Configuration Registry.')
        MODULE.error("get_reset_methods(): %s", msg)
        raise UMC_Error(msg)

    reset_methods = []
    for plugin in self.password_reset_plugins.values():
        reset_methods.append({
            "id": plugin.send_method(),
            "label": plugin.send_method_label(),
        })

    if reset_methods:
        return reset_methods
    raise NoMethodsAvailable()
[docs]
@staticmethod
def create_token(length):
    """
    Create a random token.

    The token is drawn from the ASCII letters and digits without the easily
    confusable characters ``0``, ``O``, ``1``, ``I`` and ``l``.

    :param length: number of characters of the token.
    :returns: the token as a string of ``length`` characters.
    """
    confusable = {"0", "O", "1", "I", "l"}
    # Build the union first and subtract afterwards: ``a | b - c`` is parsed
    # as ``a | (b - c)``, which would only strip the digits and leave the
    # confusable letters in the alphabet.
    chars = ''.join(sorted((set(string.ascii_letters) | set(string.digits)) - confusable))
    # SystemRandom draws from the operating system's randomness source
    rand = random.SystemRandom()
    return ''.join(rand.choice(chars) for _ in range(length))
[docs]
def send_message(self, username, method, address, user_properties):
    """
    Create a fresh token for a user, store it and hand it to the send plugin.

    :param username: user the token is created for.
    :param method: key of the send plugin to use.
    :param address: address passed on to the plugin.
    :param user_properties: passed on to the plugin unchanged.
    :returns: ``True`` once the plugin call returned without an exception.
    :raises UMC_Error: from ``_get_send_plugin()`` for an unusable method;
        any exception of the sending step is re-raised after the stored
        tokens of the user have been deleted.
    """
    plugin = self._get_send_plugin(method)

    try:
        existing = self.db.get_one(username=username)
    except MultipleTokensInDB as exc:
        # this should not happen, delete all tokens and start over
        MODULE.error("send_token(): %s", exc)
        self.db.delete_tokens(username=username)
        existing = None

    token = self.create_token(plugin.token_length)
    if not existing:
        # nothing stored yet: insert
        MODULE.info("send_token(): Adding new token for user '%s'...", username)
        self.db.insert_token(username, method, token)
    else:
        # overwrite the stored token with the fresh one
        MODULE.info("send_token(): Updating token for user '%s'...", username)
        self.db.update_token(username, method, token)

    try:
        self._call_send_msg_plugin(username, method, address, token, user_properties)
    except Exception:
        # do not keep a token around that never reached the user
        MODULE.error("send_token(): Error sending token with via '%s' to '%s'.", method, username)
        self.db.delete_tokens(username=username)
        raise
    return True
def _get_send_plugin(self, method):
    """
    Return the enabled send plugin registered under ``method``.

    :param method: key into ``self.send_plugins``.
    :returns: the plugin object.
    :raises UMC_Error: (status 500) if no plugin is registered for ``method``
        or its ``is_enabled`` attribute is false.
    """
    try:
        candidate = self.send_plugins[method]
        if candidate.is_enabled:
            return candidate
        # a disabled plugin is reported exactly like an unknown one
        raise KeyError
    except KeyError:
        raise UMC_Error("Unknown send message method", status=500)
def _call_send_msg_plugin(self, username, method, address, token, user_properties):
    """
    Pass the message data to the send plugin of ``method`` and run it.

    :param username: stored under ``"username"`` in the plugin data.
    :param method: key of the send plugin to use.
    :param address: stored under ``"address"`` in the plugin data.
    :param token: stored under ``"token"`` in the plugin data.
    :param user_properties: stored under ``"user_properties"`` in the plugin data.
    :returns: ``True`` if ``plugin.send()`` did not raise.
    :raises UMC_Error: (status 500) wrapping any exception raised by ``plugin.send()``.
    """
    MODULE.info("send_message(): username: %s method: %s address: %s", username, method, address)
    sender = self._get_send_plugin(method)
    payload = {
        "username": username,
        "address": address,
        "token": token,
        "user_properties": user_properties,
    }
    sender.set_data(payload)
    MODULE.info("send_message(): Running plugin of class %s...", sender.__class__.__name__)
    try:
        sender.send()
    except Exception as exc:
        # log the traceback, report the error text to the client
        MODULE.exception('Unknown error')
        raise UMC_Error(_("Error sending token: {}").format(exc), status=500)
    return True
[docs]
@staticmethod
@machine_connection
def auth(username, password, ldap_connection=None, ldap_position=None):
    """
    Check the credentials of a user given by uid or primary mail address.

    The first LDAP entry matching ``uid`` or ``mailPrimaryAddress`` is used
    and ``get_user_connection()`` is called with its DN and the password.

    :param username: uid or primary mail address of the user.
    :param password: password to check.
    :param ldap_connection: LDAP connection used for the search
        (expected to be supplied by the ``machine_connection`` decorator).
    :param ldap_position: unused here.
    :returns: tuple of the user's DN and its decoded first ``uid`` value.
    :raises ServiceForbidden: if no entry matches or the bind fails with ``authFail``.
    """
    lookup = filter_format("(|(uid=%s)(mailPrimaryAddress=%s))", (username, username))
    matches = ldap_connection.search(filter=lookup)
    try:
        binddn, userdict = matches[0]
        get_user_connection(binddn=binddn, bindpw=password)
    except (udm_errors.authFail, IndexError):
        # unknown user and wrong password are answered identically
        raise ServiceForbidden()
    uid = userdict["uid"][0].decode('utf-8')
    return binddn, uid
[docs]
def authenticate_user(self, username=None, password=None):
    """
    Determine the user a request acts for.

    Credentials given via the form take precedence; otherwise the user of the
    current (UMC/SAML) session is used unless ``DISALLOW_AUTHENTICATION`` is set.

    :param username: username from the form, if any.
    :param password: password from the form, if any.
    :returns: tuple ``(dn, username)``.
    :raises UMC_Error: if neither credentials nor a usable session are available.
    """
    # credentials provided, use them
    if username and password:
        dn, username = self.auth(username, password)
        return (dn, username)

    # logged in via SAML/UMC
    if self.user_dn and self.username and not DISALLOW_AUTHENTICATION:
        return (self.user_dn, self.username)

    # malformed request, cannot really happen
    raise UMC_Error('Please provide username and password.')
def _notify_about_email_change(self, username, old_email, new_email):
    """
    Mail a notification about a changed recovery address to the old address.

    The body is read from the file named by the UCR variable
    ``umc/self-service/email-change-notification/email/text_file`` (if that
    file exists) or from the default template, and is filled in via
    ``str.format()`` with ``username``, ``old_email`` and ``new_email``.

    :param username: name inserted into the message body.
    :param old_email: recipient of the notification; nothing is sent if it is empty.
    :param new_email: new address inserted into the body (``None`` becomes ``''``).
    """
    if not old_email:
        # no previous address, nobody to notify
        return
    new_email = new_email or ''

    ucr.load()
    path_ucr = ucr.get("umc/self-service/email-change-notification/email/text_file")
    if path_ucr and os.path.exists(path_ucr):
        path = path_ucr
    else:
        path = "/usr/share/univention-self-service/email_bodies/email_change_notification_email_body.txt"
    with open(path) as fp:
        txt = fp.read()
    txt = txt.format(username=username, old_email=old_email, new_email=new_email)

    msg = MIMENonMultipart('text', 'plain', charset='utf-8')
    msg["Subject"] = ucr.get("umc/self-service/email-change-notification/email/subject", "Account recovery email changed")
    msg["Date"] = formatdate(localtime=True)
    msg["From"] = ucr.get("umc/self-service/passwordreset/email/sender_address", "Password Reset Service <noreply@{}>".format(".".join([ucr["hostname"], ucr["domainname"]])))
    msg["To"] = old_email
    # send the UTF-8 body quoted-printable encoded
    cs = email.charset.Charset("utf-8")
    cs.body_encoding = email.charset.QP
    msg.set_payload(txt, charset=cs)

    # The context manager issues QUIT and closes the socket even when
    # sendmail() raises, so a failed delivery does not leak the connection.
    with smtplib.SMTP(ucr.get("umc/self-service/passwordreset/email/server", "localhost")) as smtp:
        smtp.sendmail(msg["From"], msg["To"], msg.as_string())
[docs]
def admember_set_password(self, username, password):
    """
    Set the password of a user by running ``samba-tool user setpassword``.

    The target URL is built from the UCR variables ``connector/ad/ldap/host``
    and ``connector/ad/ldap/ldaps``; the account used for the reset is read
    from ``ad/reset/username`` and the first line of the file named by
    ``ad/reset/password``.

    :param username: value matched against ``samaccountname``.
    :param password: the new password.
    :returns: ``True`` if ``samba-tool`` exited with return code 0, else ``False``.
    :raises UMC_Error: (status 500) if one of the ``ad/reset/*`` variables is
        unset or the password file cannot be read.
    """
    ad_host = ucr.get('connector/ad/ldap/host')
    scheme = 'ldaps' if ucr.is_true('connector/ad/ldap/ldaps') else 'ldap'
    ldb_url = '%s://%s' % (scheme, ad_host)

    try:
        # a plain dict raises KeyError for unset variables
        settings = dict(ucr)
        reset_username = settings['ad/reset/username']
        with open(settings['ad/reset/password']) as fd:
            reset_password = fd.readline().strip()
    except (OSError, KeyError):
        raise UMC_Error(_('The configuration of the password reset service is not complete. The UCR variables "ad/reset/username" and "ad/reset/password" need to be set properly. Please inform an administrator.'), status=500)

    cmd = [
        'samba-tool', 'user', 'setpassword',
        '--username', reset_username,
        '--password', reset_password,
        '--filter', filter_format('samaccountname=%s', (username,)),
        '--newpassword', password,
        '-H', ldb_url,
    ]
    process = Popen(cmd, stdout=PIPE, stderr=STDOUT)
    # stderr is merged into stdout, so one stream carries all output
    output = process.communicate()[0].decode('utf-8', 'replace')
    if output:
        MODULE.process('samba-tool user setpassword: %s', output)

    if not process.returncode:
        return True
    MODULE.error("admember_set_password(): failed to set password. Return code: %s", process.returncode)
    return False
[docs]
def udm_set_password(self, username, password, email_verified):
    """
    Set a new password for a user.

    If UCR ``ad/member`` is true and the user object carries the ``synced``
    object flag, the password is set through ``admember_set_password()``;
    otherwise it is written to the UDM object together with
    ``pwdChangeNextLogin = 0``. The UDM object is modified in both cases.

    :param username: user to change (looked up with an admin connection).
    :param password: the new password.
    :param email_verified: if true, ``PasswordRecoveryEmailVerified`` is set to ``'TRUE'``.
    :returns: result of ``admember_set_password()`` in the AD member case, else ``True``.
    :raises UMC_Error: if the password is too short, of insufficient quality
        or was already used. Any other exception of ``modify()`` is logged
        and re-raised.
    """
    user = self.get_udm_user(username=username, admin=True)

    handled_by_ad = ucr.is_true('ad/member') and 'synced' in user.get('objectFlag', [])
    if handled_by_ad:
        success = self.admember_set_password(username, password)
    else:
        user["password"] = password
        user["pwdChangeNextLogin"] = 0
        success = True

    if email_verified:
        user["PasswordRecoveryEmailVerified"] = 'TRUE'

    try:
        user.modify()
    except (udm_errors.pwToShort, udm_errors.pwQuality) as exc:
        # append the configured complexity hint: session language first, English as fallback
        localized_key = 'umc/login/password-complexity-message/%s' % (self.locale.language,)
        fallback = ucr.get('umc/login/password-complexity-message/en', '')
        password_complexity_message = ucr.get(localized_key, fallback)
        raise UMC_Error(("%s %s" % (exc, password_complexity_message)).rstrip())
    except udm_errors.pwalreadyused as exc:
        raise UMC_Error(exc.message)
    except Exception:
        MODULE.exception("udm_set_password(): failed to set password")
        raise
    return success
# TODO: decoratorize
[docs]
@machine_connection
def is_blacklisted(self, username, feature, ldap_connection=None, ldap_position=None):
    """
    Decide whether a user is excluded from a self-service feature.

    The comma separated UCR lists
    ``umc/self-service/<feature>/{blacklist,whitelist}/{users,groups}`` are
    evaluated case-insensitively in this order: user blacklist, group
    blacklist, user whitelist, group whitelist. Groups include the primary
    group and all nested groups of the user.

    :param username: uid or mail address of the user.
    :param feature: feature name inserted into the UCR variable names.
    :param ldap_connection: LDAP connection used to find the user's DN
        (expected to be supplied by the ``machine_connection`` decorator).
    :param ldap_position: unused here.
    :returns: ``True`` if the user is blacklisted, cannot be found, or is on
        no list while a whitelist is configured; ``False`` otherwise.
    """
    def configured(list_kind, scope):
        # read one list from UCR, normalised to stripped lower-case entries
        raw = ucr.get(f"umc/self-service/{feature}/{list_kind}/{scope}", "")
        return [entry.strip().lower() for entry in raw.split(",") if entry.strip()]

    bl_users = configured("blacklist", "users")
    bl_groups = configured("blacklist", "groups")
    wh_users = configured("whitelist", "users")
    wh_groups = configured("whitelist", "groups")

    username = self.email2username(username)
    uid = username.lower()

    # user blacklist
    if uid in bl_users:
        MODULE.info("is_blacklisted(username: %s, feature: %s): match in blacklisted users", username, feature)
        return True

    # collect the names of all direct and nested groups
    try:
        user_filter = filter_format("(|(uid=%s)(mailPrimaryAddress=%s))", (username, username))
        userdn = ldap_connection.searchDn(filter=user_filter)[0]
        member_dns = self.get_groups(userdn)
        for direct_dn in tuple(member_dns):
            member_dns += self.get_nested_groups(direct_dn)
        unique_dns = list(set(member_dns))
        gr_names = [name.lower() for name in self.dns_to_groupname(unique_dns)]
    except IndexError:
        # no user or no group found
        return True

    # group blacklist
    if set(gr_names).intersection(bl_groups):
        MODULE.info("is_blacklisted(username: %s, feature: %s): match in blacklisted groups", username, feature)
        return True

    # if not on blacklist, check whitelists
    # user whitelist
    if uid in wh_users:
        MODULE.info("is_blacklisted(username: %s, feature: %s): match in whitelisted users", username, feature)
        return False

    # group whitelist
    if set(gr_names).intersection(wh_groups):
        MODULE.info("is_blacklisted(username: %s, feature: %s): match in whitelisted groups", username, feature)
        return False

    # not on either black or white list -> not allowed if whitelist exists, else OK
    MODULE.info("is_blacklisted(username: %s, feature: %s): neither black nor white listed", username, feature)
    return bool(wh_users or wh_groups)
[docs]
def get_groups(self, userdn):
    """
    Return the groups of a user including the primary group.

    :param userdn: DN of the user.
    :returns: the user's ``groups`` list; the ``primaryGroup`` value is
        appended to that same list if it is not already contained.
    """
    udm_user = self.get_udm_user_by_dn(userdn=userdn)
    memberships, primary = udm_user["groups"], udm_user["primaryGroup"]
    if primary in memberships:
        return memberships
    memberships.append(primary)
    return memberships
[docs]
def get_nested_groups(self, groupdn):
    """
    Return the DNs of all groups a group is directly or indirectly a member of.

    The ``memberOf`` chain is followed level by level. Every DN is visited
    and reported once, so circular group nesting terminates instead of
    recursing endlessly, and shared ancestors are not read repeatedly.
    The ``memberOf`` lists of the group objects are not modified.

    :param groupdn: DN of the group to start from (never part of the result).
    :returns: list of group DNs, direct parents first.
    """
    seen = {groupdn}
    found = []
    current_level = [groupdn]
    while current_level:
        next_level = []
        for dn in current_level:
            # "memberOf" may be empty/None for groups without parents
            for parent_dn in self.get_udm_group(dn)["memberOf"] or []:
                if parent_dn in seen:
                    continue
                seen.add(parent_dn)
                found.append(parent_dn)
                next_level.append(parent_dn)
        current_level = next_level
    return found
[docs]
def dns_to_groupname(self, dns):
    """
    Translate group DNs into group names.

    :param dns: iterable of group DNs.
    :returns: list with the ``name`` of each group, in the order of ``dns``.
    """
    return [self.get_udm_group(dn)["name"] for dn in dns]
[docs]
def get_udm_user_by_dn(self, userdn, admin=False):
    """
    Return the opened UDM user object for a DN.

    :param userdn: DN of the user.
    :param admin: use ``get_admin_connection()`` instead of
        ``get_machine_connection()`` for the object.
    :returns: the user object created via ``self.usersmod`` after ``open()``.
    """
    connect = get_admin_connection if admin else get_machine_connection
    lo, po = connect()
    udm_user = self.usersmod.object(None, lo, po, userdn)
    udm_user.open()
    return udm_user
[docs]
def get_udm_user(self, username, admin=False):
    """
    Return the opened UDM user object for a uid or primary mail address.

    The DN is searched with the machine connection; the first hit is used.

    :param username: uid or primary mail address of the user.
    :param admin: passed on to ``get_udm_user_by_dn()``.
    :returns: the UDM user object.
    :raises IndexError: if the search yields no entry.
    """
    lookup = filter_format('(|(uid=%s)(mailPrimaryAddress=%s))', (username, username))
    lo, _po = get_machine_connection()
    matches = lo.searchDn(filter=lookup)
    return self.get_udm_user_by_dn(matches[0], admin=admin)
[docs]
@machine_connection
def get_udm_group(self, groupdn, ldap_connection=None, ldap_position=None):
    """
    Return the opened UDM group object for a DN.

    The ``groups/group`` module is loaded and initialised on first use and
    kept in ``self.groupmod`` afterwards.

    :param groupdn: DN of the group.
    :param ldap_connection: LDAP connection for the object
        (expected to be supplied by the ``machine_connection`` decorator).
    :param ldap_position: LDAP position for the object.
    :returns: the group object after ``open()``.
    """
    groupmod = self.groupmod
    if not groupmod:
        # load once; recursive lookups by get_nested_groups() reuse the module
        univention.admin.modules.update()
        groupmod = self.groupmod = univention.admin.modules.get("groups/group")
        univention.admin.modules.init(ldap_connection, ldap_position, groupmod)
    udm_group = groupmod.object(None, ldap_connection, ldap_position, groupdn)
    udm_group.open()
    return udm_group
[docs]
@machine_connection  # TODO: overwrite StringSanitizer and do it there
def email2username(self, email, ldap_connection=None, ldap_position=None):
    """
    Map a primary mail address to the uid of its user.

    Values without ``@`` are returned unchanged. Successful lookups are
    stored in ``self.memcache`` under ``e2u:<email>`` with a time of 300.

    :param email: mail address or username.
    :param ldap_connection: LDAP connection used for the search
        (expected to be supplied by the ``machine_connection`` decorator).
    :param ldap_position: unused here.
    :returns: the decoded first ``uid`` value of the first matching entry,
        or ``email`` itself if nothing matches.
    """
    if "@" not in email:
        return email

    cache_key = f"e2u:{email}"
    cached = self.memcache.get(cache_key)
    if cached:
        return cached

    mail_filter = filter_format("(mailPrimaryAddress=%s)", (email,))
    matches = ldap_connection.search(filter=mail_filter)
    try:
        _, userdict = matches[0]
    except IndexError:
        # unknown address: hand the input back untouched
        return email
    username = userdict["uid"][0].decode('utf-8')
    self.memcache.set(cache_key, username, 300)
    return username