#!/usr/bin/python3
# -*- coding: utf-8 -*-
#
# Univention Management Console
# authentication mechanisms
#
# Copyright 2014-2022 Univention GmbH
#
# https://www.univention.de/
#
# All rights reserved.
#
# The source code of this program is made available
# under the terms of the GNU Affero General Public License version 3
# (GNU AGPL V3) as published by the Free Software Foundation.
#
# Binary versions of this program provided by Univention to you as
# well as other copyrighted, protected or trademarked materials like
# Logos, graphics, fonts, specific documentations and configurations,
# cryptographic keys etc. are subject to a license agreement between
# you and Univention and not subject to the GNU AGPL V3.
#
# In the case you use this program under the terms of the GNU AGPL V3,
# the program is provided in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public
# License with the Debian GNU/Linux or Univention distribution in file
# /usr/share/common-licenses/AGPL-3; if not, see
# <https://www.gnu.org/licenses/>.
from __future__ import absolute_import
import traceback
import ldap
from ldap.filter import filter_format
import notifier
import notifier.signals as signals
import notifier.threads as threads
import univention.admin.uexceptions as udm_errors
from univention.lib.i18n import Locale
from univention.management.console.log import AUTH
from univention.management.console.config import ucr
from univention.management.console.ldap import get_machine_connection, reset_cache
from univention.management.console.pam import PamAuth, AuthenticationError, AuthenticationFailed, AuthenticationInformationMissing, PasswordExpired, AccountExpired, PasswordChangeFailed
try:
from typing import Any, Dict, Optional, Tuple, Union # noqa: F401
from univention.management.console.protocol.meesage import Request # noqa: F401
except ImportError:
pass
[docs]class AuthenticationResult(object):
def __init__(self, result, locale): # type: (Union[BaseException, Dict[str, str]], Optional[str]) -> None
from univention.management.console.protocol.definitions import SUCCESS, BAD_REQUEST_UNAUTH
self.credentials = None
self.status = SUCCESS
self.authenticated = not isinstance(result, BaseException)
if self.authenticated:
self.credentials = result
self.message = None
self.result = None # type: Optional[Dict[str, Any]]
self.password_expired = False
if isinstance(result, AuthenticationError):
self.status = BAD_REQUEST_UNAUTH
self.message = str(result)
self.result = {}
if isinstance(result, PasswordExpired):
self.result['password_expired'] = True
elif isinstance(result, AccountExpired):
self.result['account_expired'] = True
elif isinstance(result, AuthenticationInformationMissing):
self.result['missing_prompts'] = result.missing_prompts
elif isinstance(result, PasswordChangeFailed):
self.result['password_change_failed'] = True
if isinstance(result, (PasswordExpired, PasswordChangeFailed)):
_locale = Locale(locale)
self.message += (' %s' % (ucr.get('umc/login/password-complexity-message/%s' % (_locale.language,), ucr.get('umc/login/password-complexity-message/en', '')),)).rstrip()
elif isinstance(result, BaseException):
self.status = 500
self.message = str(result)
else:
self.result = {'username': result['username']}
def __bool__(self): # type: () -> bool
return self.authenticated
__nonzero__ = __bool__ # Python 2
[docs]class AuthHandler(signals.Provider):
def __init__(self): # type: () -> None
signals.Provider.__init__(self)
self.signal_new('authenticated')
[docs] def authenticate(self, msg): # type: (Request) -> None
# PAM MUST be initialized outside of a thread. Otherwise it segfaults e.g. with pam_saml.so.
# See http://pam-python.sourceforge.net/doc/html/#bugs
args = msg.body.copy()
locale = args.pop('locale', None)
args.pop('pam', None)
args.setdefault('new_password', None)
args.setdefault('username', '')
args.setdefault('password', '')
pam = PamAuth(locale)
thread = threads.Simple('pam', notifier.Callback(self.__authenticate_thread, pam, **args), notifier.Callback(self.__authentication_result, pam, msg, locale))
thread.run()
def __authenticate_thread(self, pam, username, password, new_password, auth_type=None, **custom_prompts): # type: (PamAuth, str, str, Optional[str], Optional[str], **Optional[str]) -> Tuple[str, str]
AUTH.info('Trying to authenticate user %r (auth_type: %r)' % (username, auth_type))
username = self.__canonicalize_username(username)
try:
pam.authenticate(username, password, **custom_prompts)
except AuthenticationFailed as auth_failed:
AUTH.error(str(auth_failed))
raise
except PasswordExpired as pass_expired:
AUTH.info(str(pass_expired))
if new_password is None:
raise
try:
pam.change_password(username, password, new_password)
except PasswordChangeFailed as change_failed:
AUTH.error(str(change_failed))
raise
else:
AUTH.info('Password change for %r was successful' % (username,))
return (username, new_password)
else:
AUTH.info('Authentication for %r was successful' % (username,))
return (username, password)
def __canonicalize_username(self, username): # type: (str) -> str
try:
lo, po = get_machine_connection(write=False)
result = None
if lo:
attr = 'mailPrimaryAddress' if '@' in username else 'uid'
result = lo.search(filter_format('(&(%s=%s)(objectClass=person))', (attr, username)), attr=['uid'], unique=True)
if result and result[0][1].get('uid'):
username = result[0][1]['uid'][0].decode('utf-8')
AUTH.info('Canonicalized username: %r' % (username,))
except (ldap.LDAPError, udm_errors.ldapError) as exc:
# /etc/machine.secret missing or LDAP server not reachable
AUTH.warn('Canonicalization of username was not possible: %s' % (exc,))
reset_cache()
except Exception:
AUTH.error('Canonicalization of username failed: %s' % (traceback.format_exc(),))
return username
def __authentication_result(self, thread, result, pam, request, locale): # type: (threads.Simple, Union[BaseException, Tuple[str, str], Dict[str, str]], PamAuth, Request, Optional[str]) -> None
pam.end()
if isinstance(result, BaseException) and not isinstance(result, (AuthenticationFailed, AuthenticationInformationMissing, PasswordExpired, PasswordChangeFailed, AccountExpired)):
msg = ''.join(thread.trace + traceback.format_exception_only(*thread.exc_info[:2]))
AUTH.error(msg)
if isinstance(result, tuple):
username, password = result
result = {'username': username, 'password': password, 'auth_type': request.body.get('auth_type')}
auth_result = AuthenticationResult(result, locale)
self.signal_emit('authenticated', auth_result, request)