#!/usr/bin/python3
# SPDX-FileCopyrightText: 2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only
"""Interface to Guardian"""
import functools
import json
import pathlib
import re
from univention.dn import DN
[docs]
class LocalGuardianAuthorizationClient:
def __init__(self, base_path):
self.base_path = base_path
[docs]
def reload(self):
self._get_capabilities.cache_clear()
self.load_local_roles.cache_clear()
[docs]
@staticmethod
@functools.lru_cache(maxsize=1)
def load_local_roles(base_path):
capabilities = (pathlib.Path(base_path) / 'capabilities').glob('**/*.json')
permissions = (pathlib.Path(base_path) / 'permissions').glob('**/*.json')
roles = (pathlib.Path(base_path) / 'roles').glob('**/*.json')
def _cap(x, d):
return {
'name': d['name'],
'fullname': _rol(x, d),
'conditions': [(f'{c["app_name"]}:{c["namespace_name"]}:{c["name"]}', {item['name']: item['value'] for item in c['parameters']}) for c in d['conditions']],
'permissions': [f'{p["app_name"]}:{p["namespace_name"]}:{p["name"]}' for p in d['permissions']],
'relation': {'AND': all, 'OR': any}[d['relation']],
'role': f'{d["role"]["app_name"]}:{d["role"]["namespace_name"]}:{d["role"]["name"]}',
}
def _rol(x, d):
return f'{x.parent.parent.name}:{x.parent.name}:{d["name"]}'
return [
[_cap(p, json.loads(p.read_bytes())) for p in capabilities],
[_rol(p, json.loads(p.read_bytes())) for p in permissions],
[_rol(p, json.loads(p.read_bytes())) for p in roles],
]
@staticmethod
@functools.lru_cache(maxsize=20)
def _get_capabilities(base_path, actor_roles: tuple[str], namespaces):
all_capabilities = LocalGuardianAuthorizationClient.load_local_roles(base_path)[0]
return [
cap
for cap in all_capabilities
if cap['role'] in actor_roles and (not namespaces or any(cap['fullname'].startswith(ns + ':') for ns in namespaces))
]
[docs]
def check_permissions(self, actor, targets, contexts, namespaces, extra_request_data=None, targeted_permissions_to_check=None, general_permissions_to_check=None):
return self.get_and_check_permissions(actor, targets, contexts, namespaces, extra_request_data, targeted_permissions_to_check, general_permissions_to_check)
[docs]
def get_and_check_permissions(self, actor, targets, contexts, namespaces, extra_request_data=None, targeted_permissions_to_check=None, general_permissions_to_check=None):
general_permissions, target_permissions = self.get_permissions(actor, targets, contexts, namespaces, extra_request_data, include_general_permissions=bool(general_permissions_to_check))
actor_has_all_targeted_permissions = False
actor_has_all_general_permissions = False
permissions_check_results = []
if targeted_permissions_to_check:
actor_has_all_targeted_permissions = True
for i, target in enumerate(targets):
target_perms = target_permissions[i]
assert target_perms['target_id'] in (target['old_target']['id'], target['new_target']['id']), (target['old_target']['id'], target['new_target']['id'], target_perms['target_id'])
target_check_result = {
'target_id': target_perms['target_id'],
'actor_has_permissions': set(targeted_permissions_to_check).issubset(target_perms['permissions']),
}
if not target_check_result['actor_has_permissions']:
actor_has_all_targeted_permissions = False
permissions_check_results.append(target_check_result)
if general_permissions_to_check:
actor_has_all_general_permissions = set(general_permissions_to_check).issubset(general_permissions)
return {
'actor_id': actor['id'],
'permissions_check_results': permissions_check_results,
'actor_has_all_general_permissions': actor_has_all_general_permissions,
'actor_has_all_targeted_permissions': actor_has_all_targeted_permissions,
'general_permissions': general_permissions,
'target_permissions': target_permissions,
}
[docs]
def get_permissions(self, actor, targets, contexts, namespaces, extra_request_data=None, include_general_permissions=False):
caps = self._get_capabilities(self.base_path, self._extract_roles(actor['roles']), tuple(namespaces))
general_permissions = set()
if include_general_permissions:
general_permissions = self._get_permissions(actor, [{'old_target': None, 'new_target': None}], contexts, namespaces, extra_request_data, caps)[0]['permissions']
target_permissions = self._get_permissions(actor, targets, contexts, namespaces, extra_request_data, caps)
return general_permissions, target_permissions
def _get_permissions(self, actor, targets, contexts, namespaces, extra_request_data, caps):
EMPTY_TARGET = {'new_target': {'id': '', 'attributes': {}, 'roles': []}, 'old_target': {'id': '', 'attributes': {}, 'roles': []}}
target_permissions = []
for target in targets:
permissions = set()
if not target.get('new_target') and not target.get('old_target'):
target = EMPTY_TARGET
for cap in caps:
if not cap['conditions'] or cap['relation'](self._evaluate_condition(cond, actor, [r.split('&', 1) for r in actor['roles']], target, contexts, namespaces, extra_request_data) for cond in cap['conditions']):
permissions |= set(cap['permissions'])
target_permissions.append({'target_id': target['new_target']['id'] or target['old_target']['id'], 'permissions': permissions})
return target_permissions
def _extract_roles(self, roles):
return tuple(role.split('&', 1)[0] for role in roles)
def _evaluate_condition(self, condition, actor, roles, target, contexts, namespaces, extra_request_data):
cond, params = condition
func = {
'udm:conditions:target_position_from_context': self.udm_conditions_target_position_from_context,
'udm:conditions:target_position_in': self.udm_conditions_target_position_in,
'udm:conditions:target_object_type_equals': self.udm_conditions_target_object_type_equals,
'guardian:builtin:target_is_self': self.target_is_self,
}[cond]
return func(params, {'actor': actor, 'actor_role': roles, 'target': target, 'contexts': contexts, 'namespaces': namespaces, 'extra_args': extra_request_data})
[docs]
def udm_conditions_target_position_from_context(self, params, condition_data):
context_name = params['context']
positions = [
c[1].split(context_name + '=', 1)[-1]
for c in condition_data['actor_role']
if len(c) > 1 and c[1].startswith(context_name)
]
params = {
'position': positions,
'scope': params['scope'],
}
return self.udm_conditions_target_position_in(params, condition_data)
[docs]
def udm_conditions_target_position_in(self, params, condition_data):
"""Checks if the position matches the condition."""
result = []
for target in (condition_data['target']['new_target']['attributes'], condition_data['target']['old_target']['attributes']):
target_dn = target.get('dn')
if target_dn is None:
result.append(False)
continue
scope = params.get('scope', 'base')
pos = params['position']
try:
func = {
"subtree": _check_scope_subtree,
"base": _check_scope_base,
"one": _check_scope_one,
}[scope]
except KeyError:
pass
else:
if not func(target_dn, pos if isinstance(pos, list) else [pos]):
return False
result.append(True)
continue
raise NotImplementedError(f"Scope {scope} not implemented")
return any(result)
[docs]
def udm_conditions_target_object_type_equals(self, params, condition_data):
"""Checks the object type of the target object"""
oc = (condition_data['target']['new_target']['attributes'] or condition_data['target']['old_target']['attributes']).get('objectType')
return oc == params.get('objectType')
[docs]
def udm_conditions_target_property_values_compares(self, params, condition_data):
"""Checks a property matches any certain value in the target object properties"""
def check(operator, value, data):
if operator in ('==-i', '!=-i'):
data, value = data.lower(), value.lower()
operator = operator[:-2]
if operator == '==':
return value == data
if operator == '!=':
return value != data
if operator.startswith('regex'):
operator, flags = (operator[:-2], re.I) if operator.endswith('-i') else (operator, 0)
matched = re.match(value, data, flags) is not None
return matched if operator == 'regex-match' else not matched
if operator.startswith('dn'):
_, _, scope = operator.partition('-')
func = {
"": _check_scope_base,
"subtree": _check_scope_subtree,
"base": _check_scope_base,
"one": _check_scope_one,
}[scope]
return func(value, [data])
prop = params['property']
operator = params['operator']
values = params['values']
for target in (condition_data['target']['new_target']['attributes'], condition_data['target']['old_target']['attributes']):
if not target.get('properties', {}).get(prop):
continue
propval = target['properties'][prop] # FIXME: multivalue
if any(check(operator, values, propval) for value in values):
return True
return False
[docs]
def target_is_self(self, params, condition_data):
field = params.get('field')
if field:
target_attributes = (condition_data['target']['new_target']['attributes'] or condition_data['target']['old_target']['attributes'])
try:
return condition_data['actor']['attributes'][field] == target_attributes[field]
except KeyError:
return False
target_id = (condition_data['target']['new_target']['id'] or condition_data['target']['old_target']['id'])
try:
return condition_data['actor']['id'] and condition_data['actor']['id'] == target_id
except KeyError:
return False
def _check_scope_subtree(position: str, condition_positions: list[str]) -> bool:
"""
Checks if the position is in the subtree of the condition.
>>> _check_scope_subtree('cn=users,dc=base', ['cn=users,dc=base'])
True
>>> _check_scope_subtree('uid=fbest,cn=users,dc=base', ['cn=users,dc=base'])
True
>>> _check_scope_subtree('uid=fbest,cn=foo,cn=users,dc=base', ['cn=users,dc=base'])
True
>>> _check_scope_subtree('dc=base', ['cn=users,dc=base'])
False
>>> _check_scope_subtree('uid=fbest,cn=userz,dc=base', ['cn=users,dc=base'])
False
"""
position = DN(position)
condition_positions = [DN(condition_position) for condition_position in condition_positions]
return any(
position.endswith(condition_position)
for condition_position in condition_positions
)
def _check_scope_base(position: str, condition_positions: list[str]) -> bool:
"""
Checks if the position is in the base of the condition.
>>> _check_scope_base('cn=users,dc=base', ['dc=base'])
False
>>> _check_scope_base('cn=users,dc=base', ['cn=userz,dc=base'])
False
>>> _check_scope_base('cn=users,dc=base', ['cn = users,dc=base'])
True
"""
position = DN(position)
condition_positions = [DN(condition_position) for condition_position in condition_positions]
return position in condition_positions
def _check_scope_one(position: str, condition_positions: list[str]) -> bool:
"""
Checks if the position is in the scope onelevel of the condition.
>>> _check_scope_one('uid=foo,cn=users,dc=base', ['dc=base'])
False
>>> _check_scope_one('uid=foo,cn=users,dc=base', ['cn=userz,dc=base'])
False
>>> _check_scope_one('uid=foo,cn=users,dc=base', ['cn = users,dc=base'])
True
"""
position = DN(position)
condition_positions = [DN(condition_position) for condition_position in condition_positions]
return position.parent in condition_positions
[docs]
class GuardianAuthorizationClient:
[docs]
def check_permissions(self, actor, targets, contexts, namespaces, extra_request_data=None, targeted_permissions_to_check=None, general_permissions_to_check=None):
return {}
[docs]
def get_and_check_permissions(self, actor, targets, contexts, namespaces, extra_request_data=None, targeted_permissions_to_check=None, general_permissions_to_check=None):
permissions = self.get_permissions(actor, targets, contexts, namespaces, extra_request_data)
check = self.check_permissions(actor, targets, contexts, namespaces, extra_request_data=extra_request_data, targeted_permissions_to_check=targeted_permissions_to_check, general_permissions_to_check=general_permissions_to_check)
permissions.update(check)
return permissions
[docs]
def get_permissions(self, actor, targets, contexts, namespaces, extra_request_data=None, include_general_permissions=False):
return {}