Source code for univention.radius.networkaccess

#
# Univention RADIUS
#
# SPDX-FileCopyrightText: 2012-2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only

import codecs
import logging
import os

from ldap import SERVER_DOWN
from ldap.filter import filter_format

import univention.config_registry
import univention.uldap

from .utils import decode_stationId, parse_username


# Single-character flags as they appear in the ``sambaAcctFlags`` LDAP attribute.
SAMBA_ACCOUNT_FLAG_DISABLED = 'D'
SAMBA_ACCOUNT_FLAG_LOCKED = 'L'
# An account carrying any of these flags is rejected by NetworkAccess.getNTPasswordHash().
DISALLOWED_SAMBA_ACCOUNT_FLAGS = frozenset({SAMBA_ACCOUNT_FLAG_DISABLED, SAMBA_ACCOUNT_FLAG_LOCKED})


def convert_network_access_attr(attributes: dict[str, list[bytes]]) -> bool:
    """Tell whether an LDAP attribute mapping grants network access.

    :param attributes: attribute name -> list of raw values, as found in an LDAP search result entry.
    :returns: ``True`` if ``univentionNetworkAccess`` is present and contains ``b'1'``, else ``False``.
    """
    if 'univentionNetworkAccess' not in attributes:
        return False
    return b'1' in attributes['univentionNetworkAccess']
def convert_ucs_debuglevel(ucs_debuglevel: int) -> int:
    """Translate a UCS debug level (0-4) into a :mod:`logging` level.

    Values below 0 are treated as 0 and values above 4 as 4.

    :param ucs_debuglevel: UCS style debug level.
    :returns: the matching ``logging`` level constant.
    """
    levels_by_ucs_level = (
        logging.ERROR,    # 0
        logging.WARNING,  # 1
        logging.INFO,     # 2
        logging.INFO,     # 3
        logging.DEBUG,    # 4
    )
    if ucs_debuglevel < 0:
        index = 0
    elif ucs_debuglevel > 4:
        index = 4
    else:
        index = ucs_debuglevel
    return levels_by_ucs_level[index]
def get_ldapConnection(logger: logging.Logger | None = None) -> univention.uldap.access:
    """Open a machine LDAP connection authenticated with ``/etc/freeradius.secret``.

    :param logger: logger for error reporting; defaults to the ``radius-ntlm`` logger.
    :returns: the connection returned by ``univention.uldap.getMachineConnection``.
    :raises OSError: if the secret file cannot be read (logged as critical, then re-raised).
    """
    log = logging.getLogger('radius-ntlm') if logger is None else logger
    secret_path = '/etc/freeradius.secret'

    # Probe readability up front so that a permission problem is logged explicitly.
    try:
        with open(secret_path) as secret:
            secret.read()
    except OSError as err:
        log.critical('Unable to read %r: %s', secret_path, err)
        raise

    try:
        # try ldap/server/name, then each of ldap/server/addition
        return univention.uldap.getMachineConnection(ldap_master=False, reconnect=False, secret_file=secret_path)
    except SERVER_DOWN:
        # then primary directory node
        return univention.uldap.getMachineConnection(secret_file=secret_path)
class NetworkAccessError(Exception):
    """Base class of the errors raised while evaluating RADIUS network access.

    :param msg: human readable reason; also available as the ``msg`` attribute.
    """

    def __init__(self, msg: str) -> None:
        # Initialise the Exception base explicitly so ``args`` / ``str()`` do not
        # depend on BaseException.__new__ having recorded the arguments.
        super().__init__(msg)
        self.msg = msg
class UserNotAllowedError(NetworkAccessError):
    """Raised by ``NetworkAccess.getNTPasswordHash`` when the user is not allowed to authenticate via RADIUS."""
class MacNotAllowedError(NetworkAccessError):
    """Raised by ``NetworkAccess.getNTPasswordHash`` when the station whitelist check fails."""
class NoHashError(NetworkAccessError):
    """Raised by ``NetworkAccess.getNTPasswordHash`` when no valid NT password hash is found in LDAP."""
class UserDeactivatedError(NetworkAccessError):
    """Raised by ``NetworkAccess.getNTPasswordHash`` when the account carries a disallowed Samba account flag."""
[docs] class NetworkAccess: def __init__(self, username: str, stationId: str, loglevel: int | None = None, logfile: str | None = None) -> None: self.username = parse_username(username) self.mac_address = decode_stationId(stationId) self.configRegistry = univention.config_registry.ConfigRegistry() self.configRegistry.load() self.use_ssp = self.configRegistry.is_true('radius/use-service-specific-password') self.whitelisting = self.configRegistry.is_true('radius/mac/whitelisting') self._setup_logger(loglevel, logfile) self.logger.debug('Given username: %r', username) self.logger.debug('Given stationId: %r', stationId) self._ldapConnection = None @property def ldapConnection(self): if self._ldapConnection is None: self._ldapConnection = get_ldapConnection(logger=self.logger) return self._ldapConnection def _setup_logger(self, loglevel: int | None, logfile: str | None) -> None: if loglevel is not None: ucs_debuglevel = loglevel else: try: ucs_debuglevel = int(self.configRegistry.get('freeradius/auth/helper/ntlm/debug', '2')) except ValueError: ucs_debuglevel = 2 debuglevel = convert_ucs_debuglevel(ucs_debuglevel) self.logger = logging.getLogger('radius-ntlm') self.logger.setLevel(debuglevel) if logfile is not None: log_handler: logging.Handler = logging.FileHandler(logfile) log_formatter = logging.Formatter(f'%(asctime)s - %(name)s - %(levelname)10s: [pid={os.getpid()}; user={self.username}; mac={self.mac_address}] %(message)s') else: log_handler = logging.StreamHandler() log_formatter = logging.Formatter(f'%(levelname)10s: [user={self.username}; mac={self.mac_address}] %(message)s') log_handler.setFormatter(log_formatter) self.logger.addHandler(log_handler) # self.logger.info("Loglevel set to: %s", ucs_debuglevel)
[docs] def build_access_dict(self, ldap_result: list[tuple[str, dict[str, list[bytes]]]]) -> dict[str, bool]: access_dict = { dn: convert_network_access_attr(attributes) for (dn, attributes) in ldap_result } return access_dict
[docs] def get_user_network_access(self, uid: str) -> dict[str, bool]: users = self.ldapConnection.search(filter=filter_format('(uid=%s)', (uid, )), attr=['univentionNetworkAccess']) if not users: users = self.ldapConnection.search(filter=filter_format('(mailPrimaryAddress=%s)', (uid, )), attr=['univentionNetworkAccess']) if not users: users = self.ldapConnection.search(filter=filter_format('(macAddress=%s)', (uid,)), attr=['univentionNetworkAccess']) return self.build_access_dict(users)
[docs] def get_station_network_access(self, mac_address: str) -> dict[str, bool]: stations = self.ldapConnection.search(filter=filter_format('(macAddress=%s)', (mac_address, )), attr=['univentionNetworkAccess']) return self.build_access_dict(stations)
[docs] def get_groups_network_access(self, dn: str) -> dict[str, bool]: groups = self.ldapConnection.search(filter=filter_format('(uniqueMember=%s)', (dn, )), attr=['univentionNetworkAccess']) return self.build_access_dict(groups)
[docs] def evaluate_ldap_network_access(self, access: dict[str, bool], level: str = '') -> bool: short_circuit = not self.logger.isEnabledFor(logging.DEBUG) policy = any(access.values()) if short_circuit and policy: return policy for dn, pol in access.items(): self.logger.debug("%s%s %r", level, 'ALLOW' if pol else 'DENY', dn) parents_access = self.get_groups_network_access(dn) if self.evaluate_ldap_network_access(parents_access, level=level + '-> '): policy = True if short_circuit: break return policy
[docs] def check_proxy_filter_policy(self) -> bool: """Dummy function for UCS@school""" self.logger.debug('UCS@school RADIUS support is not installed') return False
[docs] def check_network_access(self) -> bool: result = self.get_user_network_access(self.username) if not result: self.logger.info('Login attempt with unknown username') return False self.logger.debug('Checking LDAP settings for user') policy = self.evaluate_ldap_network_access(result) if policy: self.logger.info('Login attempt permitted by LDAP settings') else: self.logger.info('Login attempt denied by LDAP settings') return policy
[docs] def check_station_whitelist(self) -> bool: if not self.whitelisting: self.logger.debug('MAC filtering is disabled by radius/mac/whitelisting.') return True self.logger.debug('Checking LDAP settings for stationId') if not self.mac_address: self.logger.info('Login attempt without MAC address, but MAC filtering is enabled.') return False result = self.get_station_network_access(self.mac_address) if not result: self.logger.info('Login attempt with unknown MAC address') return False policy = self.evaluate_ldap_network_access(result) if policy: self.logger.info('Login attempt permitted by LDAP settings') else: self.logger.info('Login attempt denied by LDAP settings') return policy
[docs] def getNTPasswordHash(self) -> bytes: "stationId may be not supplied to the program" if not (self.check_proxy_filter_policy() or self.check_network_access()): raise UserNotAllowedError('User is not allowed to authenticate via RADIUS') if not self.check_station_whitelist(): raise MacNotAllowedError('stationId is denied, because it is not whitelisted') # user is authorized to authenticate via RADIUS, retrieve NT-password-hash from LDAP and return it self.logger.info('User is allowed to use RADIUS') pwd_attr = 'univentionRadiusPassword' if self.use_ssp else 'sambaNTPassword' if '@' in self.username: result = self.ldapConnection.search(filter=filter_format('(mailPrimaryAddress=%s)', (self.username, )), attr=[pwd_attr, 'sambaAcctFlags']) else: result = self.ldapConnection.search(filter=filter_format('(|(uid=%s)(macAddress=%s))', (self.username, self.username)), attr=[pwd_attr, 'sambaAcctFlags']) try: nt_password_hash: bytes = codecs.decode(result[0][1][pwd_attr][0], 'hex') except (IndexError, KeyError, TypeError): raise NoHashError('No valid NT-password-hash found. Check the "%s" attribute of the user.' % (pwd_attr,)) sambaAccountFlags = frozenset(result[0][1]['sambaAcctFlags'][0].decode('UTF-8')) if sambaAccountFlags & DISALLOWED_SAMBA_ACCOUNT_FLAGS: raise UserDeactivatedError('Account is deactivated') return nt_password_hash