#
# Univention RADIUS
#
# SPDX-FileCopyrightText: 2012-2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only
import codecs
import logging
import os
from ldap import SERVER_DOWN
from ldap.filter import filter_format
import univention.config_registry
import univention.uldap
from .utils import decode_stationId, parse_username
SAMBA_ACCOUNT_FLAG_DISABLED = 'D'
SAMBA_ACCOUNT_FLAG_LOCKED = 'L'
DISALLOWED_SAMBA_ACCOUNT_FLAGS = frozenset((SAMBA_ACCOUNT_FLAG_DISABLED, SAMBA_ACCOUNT_FLAG_LOCKED))
[docs]
def convert_network_access_attr(attributes: dict[str, list[bytes]]) -> bool:
return b'1' in attributes.get('univentionNetworkAccess', [])
[docs]
def convert_ucs_debuglevel(ucs_debuglevel: int) -> int:
logging_debuglevel = [logging.ERROR, logging.WARNING, logging.INFO, logging.INFO, logging.DEBUG][max(0, min(4, ucs_debuglevel))]
return logging_debuglevel
[docs]
def get_ldapConnection(logger: logging.Logger | None = None) -> univention.uldap.access:
secret_file = '/etc/freeradius.secret'
if logger is None:
logger = logging.getLogger('radius-ntlm')
# check if file is readable
try:
with open(secret_file) as fd:
fd.read()
except OSError as exc:
logger.critical('Unable to read %r: %s', secret_file, exc)
raise
try:
# try ldap/server/name, then each of ldap/server/addition
return univention.uldap.getMachineConnection(ldap_master=False, reconnect=False, secret_file=secret_file)
except SERVER_DOWN:
# then primary directory node
return univention.uldap.getMachineConnection(secret_file=secret_file)
[docs]
class NetworkAccessError(Exception):
def __init__(self, msg: str) -> None:
self.msg = msg
[docs]
class UserNotAllowedError(NetworkAccessError):
pass
[docs]
class MacNotAllowedError(NetworkAccessError):
pass
[docs]
class NoHashError(NetworkAccessError):
pass
[docs]
class UserDeactivatedError(NetworkAccessError):
pass
[docs]
class NetworkAccess:
def __init__(self, username: str, stationId: str, loglevel: int | None = None, logfile: str | None = None) -> None:
self.username = parse_username(username)
self.mac_address = decode_stationId(stationId)
self.configRegistry = univention.config_registry.ConfigRegistry()
self.configRegistry.load()
self.use_ssp = self.configRegistry.is_true('radius/use-service-specific-password')
self.whitelisting = self.configRegistry.is_true('radius/mac/whitelisting')
self._setup_logger(loglevel, logfile)
self.logger.debug('Given username: %r', username)
self.logger.debug('Given stationId: %r', stationId)
self._ldapConnection = None
@property
def ldapConnection(self):
if self._ldapConnection is None:
self._ldapConnection = get_ldapConnection(logger=self.logger)
return self._ldapConnection
def _setup_logger(self, loglevel: int | None, logfile: str | None) -> None:
if loglevel is not None:
ucs_debuglevel = loglevel
else:
try:
ucs_debuglevel = int(self.configRegistry.get('freeradius/auth/helper/ntlm/debug', '2'))
except ValueError:
ucs_debuglevel = 2
debuglevel = convert_ucs_debuglevel(ucs_debuglevel)
self.logger = logging.getLogger('radius-ntlm')
self.logger.setLevel(debuglevel)
if logfile is not None:
log_handler: logging.Handler = logging.FileHandler(logfile)
log_formatter = logging.Formatter(f'%(asctime)s - %(name)s - %(levelname)10s: [pid={os.getpid()}; user={self.username}; mac={self.mac_address}] %(message)s')
else:
log_handler = logging.StreamHandler()
log_formatter = logging.Formatter(f'%(levelname)10s: [user={self.username}; mac={self.mac_address}] %(message)s')
log_handler.setFormatter(log_formatter)
self.logger.addHandler(log_handler)
# self.logger.info("Loglevel set to: %s", ucs_debuglevel)
[docs]
def build_access_dict(self, ldap_result: list[tuple[str, dict[str, list[bytes]]]]) -> dict[str, bool]:
access_dict = {
dn: convert_network_access_attr(attributes)
for (dn, attributes) in ldap_result
}
return access_dict
[docs]
def get_user_network_access(self, uid: str) -> dict[str, bool]:
users = self.ldapConnection.search(filter=filter_format('(uid=%s)', (uid, )), attr=['univentionNetworkAccess'])
if not users:
users = self.ldapConnection.search(filter=filter_format('(mailPrimaryAddress=%s)', (uid, )), attr=['univentionNetworkAccess'])
if not users:
users = self.ldapConnection.search(filter=filter_format('(macAddress=%s)', (uid,)), attr=['univentionNetworkAccess'])
return self.build_access_dict(users)
[docs]
def get_station_network_access(self, mac_address: str) -> dict[str, bool]:
stations = self.ldapConnection.search(filter=filter_format('(macAddress=%s)', (mac_address, )), attr=['univentionNetworkAccess'])
return self.build_access_dict(stations)
[docs]
def get_groups_network_access(self, dn: str) -> dict[str, bool]:
groups = self.ldapConnection.search(filter=filter_format('(uniqueMember=%s)', (dn, )), attr=['univentionNetworkAccess'])
return self.build_access_dict(groups)
[docs]
def evaluate_ldap_network_access(self, access: dict[str, bool], level: str = '') -> bool:
short_circuit = not self.logger.isEnabledFor(logging.DEBUG)
policy = any(access.values())
if short_circuit and policy:
return policy
for dn, pol in access.items():
self.logger.debug("%s%s %r", level, 'ALLOW' if pol else 'DENY', dn)
parents_access = self.get_groups_network_access(dn)
if self.evaluate_ldap_network_access(parents_access, level=level + '-> '):
policy = True
if short_circuit:
break
return policy
[docs]
def check_proxy_filter_policy(self) -> bool:
"""Dummy function for UCS@school"""
self.logger.debug('UCS@school RADIUS support is not installed')
return False
[docs]
def check_network_access(self) -> bool:
result = self.get_user_network_access(self.username)
if not result:
self.logger.info('Login attempt with unknown username')
return False
self.logger.debug('Checking LDAP settings for user')
policy = self.evaluate_ldap_network_access(result)
if policy:
self.logger.info('Login attempt permitted by LDAP settings')
else:
self.logger.info('Login attempt denied by LDAP settings')
return policy
[docs]
def check_station_whitelist(self) -> bool:
if not self.whitelisting:
self.logger.debug('MAC filtering is disabled by radius/mac/whitelisting.')
return True
self.logger.debug('Checking LDAP settings for stationId')
if not self.mac_address:
self.logger.info('Login attempt without MAC address, but MAC filtering is enabled.')
return False
result = self.get_station_network_access(self.mac_address)
if not result:
self.logger.info('Login attempt with unknown MAC address')
return False
policy = self.evaluate_ldap_network_access(result)
if policy:
self.logger.info('Login attempt permitted by LDAP settings')
else:
self.logger.info('Login attempt denied by LDAP settings')
return policy
[docs]
def getNTPasswordHash(self) -> bytes:
"stationId may be not supplied to the program"
if not (self.check_proxy_filter_policy() or self.check_network_access()):
raise UserNotAllowedError('User is not allowed to authenticate via RADIUS')
if not self.check_station_whitelist():
raise MacNotAllowedError('stationId is denied, because it is not whitelisted')
# user is authorized to authenticate via RADIUS, retrieve NT-password-hash from LDAP and return it
self.logger.info('User is allowed to use RADIUS')
pwd_attr = 'univentionRadiusPassword' if self.use_ssp else 'sambaNTPassword'
if '@' in self.username:
result = self.ldapConnection.search(filter=filter_format('(mailPrimaryAddress=%s)', (self.username, )), attr=[pwd_attr, 'sambaAcctFlags'])
else:
result = self.ldapConnection.search(filter=filter_format('(|(uid=%s)(macAddress=%s))', (self.username, self.username)), attr=[pwd_attr, 'sambaAcctFlags'])
try:
nt_password_hash: bytes = codecs.decode(result[0][1][pwd_attr][0], 'hex')
except (IndexError, KeyError, TypeError):
raise NoHashError('No valid NT-password-hash found. Check the "%s" attribute of the user.' % (pwd_attr,))
sambaAccountFlags = frozenset(result[0][1]['sambaAcctFlags'][0].decode('UTF-8'))
if sambaAccountFlags & DISALLOWED_SAMBA_ACCOUNT_FLAGS:
raise UserDeactivatedError('Account is deactivated')
return nt_password_hash