Source code for univention.radius.networkaccess

# -*- coding: utf-8 -*-
#
# Univention RADIUS
#
# Copyright (C) 2012-2022 Univention GmbH
#
# https://www.univention.de/
#
# All rights reserved.
#
# The source code of the software contained in this package
# as well as the source package itself are made available
# under the terms of the GNU Affero General Public License version 3
# (GNU AGPL V3) as published by the Free Software Foundation.
#
# Binary versions of this package provided by Univention to you as
# well as other copyrighted, protected or trademarked materials like
# Logos, graphics, fonts, specific documentations and configurations,
# cryptographic keys etc. are subject to a license agreement between
# you and Univention and not subject to the GNU AGPL V3.
#
# In the case you use the software under the terms of the GNU AGPL V3,
# the program is provided in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public
# License with the Debian GNU/Linux or Univention distribution in file
# /usr/share/common-licenses/AGPL-3; if not, see
# <https://www.gnu.org/licenses/>.

import codecs
import logging
import os
from typing import Dict, List, Optional, Tuple  # noqa: F401

from ldap import SERVER_DOWN
from ldap.filter import filter_format

import univention.uldap
import univention.config_registry

from .utils import decode_stationId, parse_username

SAMBA_ACCOUNT_FLAG_DISABLED = 'D'
SAMBA_ACCOUNT_FLAG_LOCKED = 'L'
DISALLOWED_SAMBA_ACCOUNT_FLAGS = frozenset((SAMBA_ACCOUNT_FLAG_DISABLED, SAMBA_ACCOUNT_FLAG_LOCKED, ))


[docs]def convert_network_access_attr(attributes): # type: (Dict[str, List[bytes]]) -> bool return b'1' in attributes.get('univentionNetworkAccess', [])
[docs]def convert_ucs_debuglevel(ucs_debuglevel): # type: (int) -> int logging_debuglevel = [logging.ERROR, logging.WARN, logging.INFO, logging.INFO, logging.DEBUG][max(0, min(4, ucs_debuglevel))] return logging_debuglevel
[docs]def get_ldapConnection(): # type: () -> univention.uldap.access try: # try ldap/server/name, then each of ldap/server/addition return univention.uldap.getMachineConnection(ldap_master=False, reconnect=False) except SERVER_DOWN: # then primary directory node return univention.uldap.getMachineConnection()
[docs]class NetworkAccessError(Exception): def __init__(self, msg): # type: (str) -> None self.msg = msg
[docs]class UserNotAllowedError(NetworkAccessError): pass
[docs]class MacNotAllowedError(NetworkAccessError): pass
[docs]class NoHashError(NetworkAccessError): pass
[docs]class UserDeactivatedError(NetworkAccessError): pass
[docs]class NetworkAccess(object): def __init__(self, username, stationId, loglevel=None, logfile=None): # type: (str, str, Optional[int], Optional[str]) -> None self.username = parse_username(username) self.mac_address = decode_stationId(stationId) self.ldapConnection = get_ldapConnection() self.configRegistry = univention.config_registry.ConfigRegistry() self.configRegistry.load() self.use_ssp = self.configRegistry.is_true('radius/use-service-specific-password') self.whitelisting = self.configRegistry.is_true('radius/mac/whitelisting') self._setup_logger(loglevel, logfile) self.logger.debug('Given username: %r', username) self.logger.debug('Given stationId: %r', stationId) def _setup_logger(self, loglevel, logfile): # type: (Optional[int], Optional[str]) -> None if loglevel is not None: ucs_debuglevel = loglevel else: try: ucs_debuglevel = int(self.configRegistry.get('freeradius/auth/helper/ntlm/debug', '2')) except ValueError: ucs_debuglevel = 2 debuglevel = convert_ucs_debuglevel(ucs_debuglevel) self.logger = logging.getLogger('radius-ntlm') self.logger.setLevel(debuglevel) if logfile is not None: log_handler = logging.FileHandler(logfile) # type: logging.Handler log_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)10s: [pid={}; user={}; mac={}] %(message)s'.format(os.getpid(), self.username, self.mac_address)) else: log_handler = logging.StreamHandler() log_formatter = logging.Formatter('%(levelname)10s: [user={}; mac={}] %(message)s'.format(self.username, self.mac_address)) log_handler.setFormatter(log_formatter) self.logger.addHandler(log_handler) # self.logger.info("Loglevel set to: %s", ucs_debuglevel)
[docs] def build_access_dict(self, ldap_result): # type: (List[Tuple[str, Dict[str, List[bytes]]]]) -> Dict[str, bool] access_dict = { dn: convert_network_access_attr(attributes) for (dn, attributes) in ldap_result } return access_dict
[docs] def get_user_network_access(self, uid): # type: (str) -> Dict[str, bool] users = self.ldapConnection.search(filter=filter_format('(uid=%s)', (uid, )), attr=['univentionNetworkAccess']) return self.build_access_dict(users)
[docs] def get_station_network_access(self, mac_address): # type: (str) -> Dict[str, bool] stations = self.ldapConnection.search(filter=filter_format('(macAddress=%s)', (mac_address, )), attr=['univentionNetworkAccess']) return self.build_access_dict(stations)
[docs] def get_groups_network_access(self, dn): # type: (str) -> Dict[str, bool] groups = self.ldapConnection.search(filter=filter_format('(uniqueMember=%s)', (dn, )), attr=['univentionNetworkAccess']) return self.build_access_dict(groups)
[docs] def evaluate_ldap_network_access(self, access, level=''): # type: (Dict[str, bool], str) -> bool short_circuit = not self.logger.isEnabledFor(logging.DEBUG) policy = any(access.values()) if short_circuit and policy: return policy for dn, pol in access.items(): self.logger.debug("%s%s %r", level, 'ALLOW' if pol else 'DENY', dn) parents_access = self.get_groups_network_access(dn) if self.evaluate_ldap_network_access(parents_access, level=level + '-> '): policy = True if short_circuit: break return policy
[docs] def check_proxy_filter_policy(self): # type: () -> bool '''Dummy function for UCS@school''' self.logger.debug('UCS@school RADIUS support is not installed') return False
[docs] def check_network_access(self): # type: () -> bool result = self.get_user_network_access(self.username) if not result: self.logger.info('Login attempt with unknown username') return False self.logger.debug('Checking LDAP settings for user') policy = self.evaluate_ldap_network_access(result) if policy: self.logger.info('Login attempt permitted by LDAP settings') else: self.logger.info('Login attempt denied by LDAP settings') return policy
[docs] def check_station_whitelist(self): # type: () -> bool if not self.whitelisting: self.logger.debug('MAC filtering is disabled by radius/mac/whitelisting.') return True self.logger.debug('Checking LDAP settings for stationId') if not self.mac_address: self.logger.info('Login attempt without MAC address, but MAC filtering is enabled.') return False result = self.get_station_network_access(self.mac_address) if not result: self.logger.info('Login attempt with unknown MAC address') return False policy = self.evaluate_ldap_network_access(result) if policy: self.logger.info('Login attempt permitted by LDAP settings') else: self.logger.info('Login attempt denied by LDAP settings') return policy
[docs] def getNTPasswordHash(self): # type: () -> bytes 'stationId may be not supplied to the program' if not (self.check_proxy_filter_policy() or self.check_network_access()): raise UserNotAllowedError('User is not allowed to authenticate via RADIUS') if not self.check_station_whitelist(): raise MacNotAllowedError('stationId is denied, because it is not whitelisted') # user is authorized to authenticate via RADIUS, retrieve NT-password-hash from LDAP and return it self.logger.info('User is allowed to use RADIUS') pwd_attr = 'univentionRadiusPassword' if self.use_ssp else 'sambaNTPassword' result = self.ldapConnection.search(filter=filter_format('(uid=%s)', (self.username, )), attr=[pwd_attr, 'sambaAcctFlags']) try: nt_password_hash = codecs.decode(result[0][1][pwd_attr][0], 'hex') except (IndexError, KeyError, TypeError): raise NoHashError('No valid NT-password-hash found. Check the "%s" attribute of the user.' % (pwd_attr,)) sambaAccountFlags = frozenset(result[0][1]['sambaAcctFlags'][0].decode('UTF-8')) if sambaAccountFlags & DISALLOWED_SAMBA_ACCOUNT_FLAGS: raise UserDeactivatedError('Account is deactivated') return nt_password_hash