Source code for univention.appcenter.actions.credentials
#!/usr/bin/python3
#
# Univention App Center
# univention-app module for actions needing credentials
#
# SPDX-FileCopyrightText: 2015-2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only
#
import time
from argparse import SUPPRESS
from contextlib import contextmanager
from copy import deepcopy
from getpass import getpass, getuser
from tempfile import NamedTemporaryFile
import ldap
from univention.appcenter.actions import UniventionAppAction
from univention.appcenter.exceptions import (
ConnectionFailed, ConnectionFailedConnectError, ConnectionFailedInvalidAdminCredentials,
ConnectionFailedInvalidMachineCredentials, ConnectionFailedInvalidUserCredentials, ConnectionFailedSecretFile,
ConnectionFailedServerDown, CredentialsNoPasswordError, CredentialsNoUsernameError,
)
from univention.appcenter.ucr import ucr_get
from univention.appcenter.udm import get_admin_connection, get_connection, get_machine_connection, search_objects
[docs]
class CredentialsAction(UniventionAppAction):
def __init__(self):
super().__init__()
self._username = None
self._userdn = None
self._password = None
[docs]
def setup_parser(self, parser):
parser.add_argument('--noninteractive', action='store_true', help='Do not prompt for anything, just agree or skip')
parser.add_argument('--username', default='Administrator', help='The username used for registering the app. Default: %(default)s')
parser.add_argument('--pwdfile', help='Filename containing the password for registering the app. See --username')
parser.add_argument('--password', help=SUPPRESS)
[docs]
def check_user_credentials(self, args):
try:
_lo, _pos = self._get_ldap_connection(args, allow_machine_connection=False, allow_admin_connection=False)
except ConnectionFailed:
return False
else:
return True
def _get_username(self, args):
if self._username is not None:
return self._username
if args.username:
return args.username
if not args.noninteractive:
try:
username = input('Username [Administrator]: ') or 'Administrator'
except (EOFError, KeyboardInterrupt):
raise CredentialsNoUsernameError()
self._username = username
return username
def _get_password(self, args, ask=True):
username = self._get_username(args)
if not username:
return None
if self._password is not None:
return self._password
if args.password:
return args.password
if args.pwdfile:
password = open(args.pwdfile).read().rstrip('\n')
return password
if ask and not args.noninteractive:
self._password = self._get_password_for(username)
return self._password
def _get_password_for(self, username):
try:
return getpass('Password for %s: ' % username)
except (EOFError, KeyboardInterrupt):
raise CredentialsNoPasswordError()
@contextmanager
def _get_password_file(self, args=None, password=None):
if password is None:
password = self._get_password(args)
if not password:
yield None
else:
with NamedTemporaryFile('w') as password_file:
password_file.write(password)
password_file.flush()
yield password_file.name
def _get_userdn(self, args):
username = self._get_username(args)
if not username:
return None
lo, pos = self._get_ldap_connection(args=None, allow_machine_connection=True)
users = search_objects('users/user', lo, pos, uid=username)
if users:
return users[0].dn
else:
self.fatal('Cannot find user %s' % username)
def _get_machine_connection(self):
try:
return get_machine_connection()
except OSError:
raise ConnectionFailedSecretFile()
except ldap.INVALID_CREDENTIALS:
raise ConnectionFailedInvalidMachineCredentials()
except ldap.CONNECT_ERROR as exc:
raise ConnectionFailedConnectError(exc)
except ldap.SERVER_DOWN:
raise ConnectionFailedServerDown()
def _get_admin_connection(self):
try:
return get_admin_connection()
except OSError:
raise ConnectionFailedSecretFile()
except ldap.INVALID_CREDENTIALS:
raise ConnectionFailedInvalidAdminCredentials()
except ldap.CONNECT_ERROR as exc:
raise ConnectionFailedConnectError(exc)
except ldap.SERVER_DOWN:
raise ConnectionFailedServerDown()
def _get_ldap_connection(self, args, allow_machine_connection=False, allow_admin_connection=True):
if allow_admin_connection and ucr_get('server/role') == 'domaincontroller_master' and getuser() == 'root':
try:
return self._get_admin_connection()
except ConnectionFailed:
if allow_machine_connection or args is not None:
# try to get another connection
pass
else:
raise
if allow_machine_connection:
try:
return self._get_machine_connection()
except ConnectionFailed:
if args is not None:
# try to get another connection
pass
else:
raise
attempts = 0
if args is not None:
args = deepcopy(args)
while attempts < 3:
attempts += 1
userdn = self._get_userdn(args)
password = self._get_password(args)
try:
if not userdn or not password:
raise ldap.INVALID_CREDENTIALS()
return get_connection(userdn, password)
except ldap.CONNECT_ERROR as exc:
raise ConnectionFailedConnectError(exc)
except ldap.SERVER_DOWN:
raise ConnectionFailedServerDown()
except ldap.INVALID_CREDENTIALS:
time.sleep(0.1)
self.warn('Invalid credentials')
args.username = None
self._username = None
args.pwdfile = None
self._password = None
raise ConnectionFailedInvalidUserCredentials()
raise ConnectionFailed()