Source code for univention.admin.authorization.config

#!/usr/bin/python3
# SPDX-FileCopyrightText: 2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only
"""
A domain specific language (DSL) for UDM access rules
inspired by LDAP ACLs
realized with extended BNF grammar and a LALR (Look-Ahead Left <- Right) Parser.
"""

import copy
import hashlib
import io
import logging
import sys
from pathlib import Path

import lark
import yaml
from lark import Lark, Transformer

import univention.admin.modules
from univention.authorization.config import AuthorizationConfig
from univention.config_registry import ucr


# Module logger: a child (named after this module) of the 'ACL' logger.
log = logging.getLogger('ACL').getChild(__name__)


# Lark grammar of the access-rule DSL.  A document is a sequence of named
# "condition" statements and "access" blocks; an access block consists of
# "by" lines followed by "to" lines, and every "to" line may carry "grant"
# lines.  The BY_KEY / TO_KEY / GRANT_KEY terminals are declared but not
# referenced by any rule: keys are matched by NAME and validated afterwards in
# _DSLTransformer.
UDM_DSL_GRAMMAR = r"""
start: statement+

statement: condition | access_block

condition: "condition" QUOTED_STRING condition_line param_line?

condition_line: "condition=" QUOTED_STRING
param_line: "parameters" kvpair+

access_block: "access" by_line+ to_line*

BY_KEY: "role" | "description"
by_line: "by" by_kvpair+
by_kvpair: NAME "=" value -> by_kvpair

TO_KEY: "objecttype" | "if" | "position" | "name" | "description"
to_line: "to" to_kvlistpair+ grant_line*
to_kvlistpair: NAME "=" valuelist -> to_kvlistpair

GRANT_KEY: "actions" | "properties" | "permission" | "values"
grant_line: "grant" grant_kvlistpair+
grant_kvlistpair: NAME "=" valuelist -> grant_kvlistpair

kvpair: NAME "=" value
value: QUOTED_STRING | NAME

valuelist: QUOTED_STRING | list | NAME

list: "[" [QUOTED_STRING ("," QUOTED_STRING)*] "]"

NAME: /[a-zA-Z_][\w\/\-.,\/]*/
%import common.ESCAPED_STRING -> QUOTED_STRING
%import common.WS
%ignore WS
%ignore /#.*/  // Kommentare
"""

# FIXME: the keyword terminals (BY_KEY, TO_KEY, GRANT_KEY) can't be referenced in the rules, as sketched below, due to the NAME terminal
"""
by_kvpair: BY_KEY "=" value -> by_kvpair
to_kvlistpair: TO_KEY "=" valuelist -> to_kvlistpair
grant_kvlistpair: GRANT_KEY "=" valuelist -> grant_kvlistpair
"""

# Maps the scope suffix of a ``position.<scope>=...`` key in the DSL to the
# scope name written into the generated position conditions.  A missing
# suffix ('') means 'base'.
_SCOPES = {
    '': 'base',
    'one': 'one',
    'onelevel': 'one',
    'sub': 'subtree',
    'subtree': 'subtree',
    'base': 'base',
    'children': 'children',
}

# Maps the operator suffix of a ``values.<operator>=...`` key in the DSL to
# the operator name written into the generated
# 'udm:conditions:target_property_value_compares' conditions.  A missing
# suffix ('') means '=='.
_VALUE_OPERATORS = {
    '': '==',
    'equals': '==',
    'not-equals': '!=',
    'equals,i': '==-i',
    'not-equals,i': '!=-i',
    'regex': 'regex-match',
    'not-regex': 'regex-nomatch',
    'regex,i': 'regex-match-i',
    'not-regex,i': 'regex-nomatch-i',
    'dn': 'dn',
    'dn,base': 'dn',
    'dn,subtree': 'dn-subtree',
    'dn,one': 'dn-one',
    'dn,children': 'dn-children',
}

# Namespace under which the generated capability bundles are stored.
BUNDLE_NAMESPACE = 'udm:bundles'
# Names accepted for "actions=" and "permission=" in grant lines ('*' is
# accepted in addition to both).
ACTIONS = ('search', 'read', 'create', 'modify', 'rename', 'remove', 'move', 'report-create')
PERMISSIONS = ('search', 'read', 'write', 'readonly', 'writeonly', 'none')
# Sort position of every name, in declaration order, with the wildcard last.
SORT_PRIO = {
    'actions': {name: prio for prio, name in enumerate((*ACTIONS, '*'))},
    'permissions': {name: prio for prio, name in enumerate((*PERMISSIONS, '*'))},
}


class DSLSyntaxError(SyntaxError):
    """Raised when a UDM access-rule DSL document is invalid.

    Raised with the usual :class:`SyntaxError` arguments: a message and,
    optionally, a ``(filename, lineno, offset, text)`` tuple.
    """
class _DSLTransformer(Transformer):
    """Transformer for the UDM DSL.

    Every method named after a grammar rule or terminal receives the already
    transformed children of that rule and returns their plain Python
    representation.  The ``start`` rule finally yields a dictionary with the
    keys ``conditions`` and ``rules``.

    :param filename: name of the parsed file; only used in error messages.
    :param strict: raise :class:`DSLSyntaxError` for unknown object types and
        properties instead of printing a warning to stderr.
    """

    def __init__(self, filename, *args, strict=False, **kwargs):
        self.__filename = filename
        self.__strict = strict
        # names of all "to" clauses seen so far, to detect duplicates
        self.__unique_names = set()
        super().__init__(*args, **kwargs)

    def start(self, items):
        """Sort all statements into ``conditions`` and ``rules``.

        :raises DSLSyntaxError: for a statement of unknown type.
        """
        data = {'conditions': [], 'rules': []}
        targets = {'condition': data['conditions'], 'access': data['rules']}
        for statement in items:
            for item in statement:
                kind = item['type']
                if kind not in targets:
                    raise DSLSyntaxError('unknown type', (self.__filename, 0, 0, kind))
                targets[kind].append(item)
                # the type marker is only needed for the dispatch above
                item.pop('type')
        # Names only have to be unique within one document; reset the
        # bookkeeping so that the same transformer can parse again.
        self.__unique_names.clear()
        return data

    def statement(self, items):
        return items

    def condition(self, items):
        """Build a condition from its name, condition string and optional parameters."""
        name, cond, *rest = items
        return {
            'type': 'condition',
            'name': name,
            'condition': cond,
            'parameters': rest[0] if rest else {},
        }

    def condition_line(self, items):
        return items[0]

    def param_line(self, items):
        return dict(items)

    def access_block(self, items):
        """Collect the "by" and "to" lines of one access block.

        The meta data (e.g. ``description``) of the last "by" line is merged
        into the returned rule.

        :raises DSLSyntaxError: for a child of unknown type.
        """
        by_blocks = []
        to_blocks = []
        meta = {}
        for item in items:
            kind = item.get('type')
            if kind == 'by':
                by_blocks.append(item['by'])
                meta = item['meta']
            elif kind == 'to':
                to_blocks.append(item['to'])
            else:
                raise DSLSyntaxError('unknown type', (self.__filename, 0, 0, kind))
        return {
            'type': 'access',
            'by': by_blocks,
            'to': to_blocks,
            **meta,
        }

    def by_line(self, items):
        """Validate one "by" line and split it into the role and its meta data.

        :raises DSLSyntaxError: if ``role`` is missing or malformed or if an
            unknown key is given.
        """
        meta = dict(items)
        if 'role' not in meta:
            # report a missing role as DSL error instead of a bare KeyError
            raise DSLSyntaxError('role required', (self.__filename, 0, 0, repr(items)))
        by = {'role': meta.pop('role')}
        self._assert_names('by', meta, {'description'})
        if by['role'].count(':') != 2:
            raise DSLSyntaxError('role: must contain two ":"', (self.__filename, 0, 0, by['role']))
        return {
            'type': 'by',
            'by': by,
            'meta': meta,
        }

    def to_line(self, items):
        """Validate one "to" line including its "grant" lines.

        Unknown object types and properties raise in strict mode and are
        otherwise only reported on stderr.

        :raises DSLSyntaxError: for unknown keys, a missing or invalid
            ``objecttype`` or a duplicated ``name``.
        """
        current_to = {'grant': []}
        for item in items:
            if isinstance(item, tuple):
                key, value = item
                if key == 'grant':
                    # 'grant' is reserved for the collected grant lines and
                    # must not be overwritten by a key of the "to" line
                    raise DSLSyntaxError("unknown 'to': 'grant'", (self.__filename, 0, 0, 'grant'))
                current_to[key] = value
            elif isinstance(item, dict):  # grant_line
                current_to['grant'].append(item)
        self._assert_names('to', current_to, {'grant', 'objecttype', 'if', 'position', 'name', 'description'})

        object_type = current_to.get('objecttype')
        if not object_type:
            raise DSLSyntaxError('objecttype required', (self.__filename, 0, 0, repr(items)))
        if '/' not in object_type and object_type != '*':
            raise DSLSyntaxError('invalid objecttype', (self.__filename, 0, 0, object_type))

        if name := current_to.get('name'):
            if name in self.__unique_names:
                raise DSLSyntaxError('duplicated name', (self.__filename, 0, 0, current_to['name']))
            self.__unique_names.add(name)

        mod = univention.admin.modules.get(object_type)
        if object_type != '*' and not mod:
            if self.__strict:
                raise DSLSyntaxError(f'Object type {object_type} unknown!', (self.__filename, 0, 0, object_type))
            print(f'Warning: No object type {object_type!r} exists.', file=sys.stderr)

        for grant in current_to.get('grant', []):
            for prop in grant.get('properties', []):
                if prop == '*' or (mod and mod.property_descriptions.get(prop)):
                    continue
                if self.__strict:
                    raise DSLSyntaxError(f'Property {object_type}:{prop} unknown!', (self.__filename, 0, 0, f'{object_type}:{prop}'))
                print(f'Warning: No property {prop!r} for {object_type!r} exists. Assuming it is an extended attribute.', file=sys.stderr)

        return {
            'type': 'to',
            'to': current_to,
        }

    def grant_line(self, items):
        """Validate one "grant" line.

        Exactly one of ``actions`` and ``permission`` must be given.

        :raises DSLSyntaxError: for unknown keys or permission names or if
            not exactly one of ``actions`` / ``permission`` is given.
        """
        grant = dict(items)
        self._assert_names('grant', grant, {'actions', 'properties', 'permission', 'values'})
        if ('permission' in grant) == ('actions' in grant):
            raise DSLSyntaxError('invalid "grant": requires only one of actions or permission', (self.__filename, 0, 0, ''))
        if 'permission' in grant:
            permission = grant['permission']
            # the grammar allows both permission="a,b" and permission=["a", "b"]
            names = permission.split(',') if isinstance(permission, str) else permission
            self._assert_names('permission', set(names), {*PERMISSIONS, '*'})
        return grant

    def by_kvpair(self, items):
        return self.kvpair(items)

    def to_kvlistpair(self, items):
        return self.kvlistpair(items)

    def grant_kvlistpair(self, items):
        return self.kvlistpair(items)

    def kvpair(self, items):
        key, value = items
        return (str(key), value)

    def kvlistpair(self, items):
        """Return ``(key, value)`` for a ``key[.suffix]=value`` pair.

        For ``position`` and ``values`` the suffix (scope / operator) is
        validated and kept as ``(suffix, value)``; for all other keys it is
        dropped.  String values of ``actions`` and ``properties`` are split
        at commas.

        :raises DSLSyntaxError: for an unknown scope, operator or action.
        """
        key, (value,) = items
        key, _, operator = str(key).partition('.')
        if key == 'position':
            value = (operator, value)
            self._assert_names('position.scope', {operator}, set(_SCOPES))
        elif key == 'values':
            value = (operator, value)
            self._assert_names('values.operator', {operator}, set(_VALUE_OPERATORS))
        if key in {'actions', 'properties'} and isinstance(value, str):
            value = [part.strip() for part in value.split(',')]
        if key == 'actions':
            self._assert_names('actions', set(value), {*ACTIONS, '*'})
        return (key, value)

    def value(self, items):
        return items[0]

    def valuelist(self, items):
        return items

    def list(self, items):
        # An empty list ("[]") may be delivered by Lark as None placeholder
        # for the unmatched optional part; it is not an element of the list.
        return [item for item in items if item is not None]

    def QUOTED_STRING(self, s):
        return s[1:-1]  # remove quotes

    def NAME(self, s):
        return str(s)

    def __default__(self, data, children, meta):  # noqa: PLW3201
        # every rule of the grammar is expected to have its own method
        if not data.startswith('__'):
            log.error('UNHANDLED RULE: %s', data)
        return super().__default__(data, children, meta)

    def _assert_names(self, name, obj, names):
        """Raise :class:`DSLSyntaxError` if `obj` contains names not in `names`."""
        unknown = set(obj) - names
        if unknown:
            # sorted, so that the message is deterministic
            invalid = ','.join(sorted(unknown))
            raise DSLSyntaxError(f'unknown {name!r}: {invalid!r}', (self.__filename, 0, 0, invalid))

    @staticmethod
    def compose(parsed):
        """Return the DSL text for a structure as produced by :meth:`start`.

        .. warning:: `parsed` is modified in place; pass a copy if it is
            still needed afterwards.
        """
        result = io.StringIO()
        to_items = {'grant', 'objecttype', 'if', 'position'}

        def _v(k, v):
            if isinstance(v, list):
                # NOTE(review): SORT_PRIO is keyed by 'permissions' while
                # grant lines use the key 'permission', so only 'actions'
                # are sorted by priority here - confirm that this is intended.
                prio = SORT_PRIO.get(k, {})
                joined = ','.join(sorted(v, key=lambda x: prio.get(x, x)))
                return f'{k}="{joined}"'
            if isinstance(v, str):
                return f'{k}="{v}"'
            # (suffix, value) tuple of a "position" or "values" key
            suffix, val = v[0], v[1]
            if suffix:
                return f'{k}.{suffix}="{val}"'
            return f'{k}="{val}"'

        def _kv(items, restricted=None):
            return ' '.join(
                _v(k, v)
                for k, v in items.items()
                if v is not None and (not restricted or k in restricted)
            )

        for cond in parsed['conditions']:
            params = ' parameters %s' % _kv(cond['parameters']) if cond.get('parameters') else ''
            print(f'condition "{cond["name"]}"\n condition="{cond["condition"]}"\n{params}\n', file=result)

        for rule in parsed['rules']:
            by = rule.pop('by')
            to = rule.pop('to')
            if len(by) == 1:
                roles = ' by %s' % _kv(by[0])
            else:
                roles = '\n ' + '\n '.join('by %s' % _kv(r) for r in sorted(by, key=lambda x: x['role']))
            # everything left in the rule is meta data
            params = '\n %s' % _kv(rule) if rule else ''
            print(file=result)
            print(f'access{roles}{params}', file=result)
            for to_clause in to:
                print(file=result)
                grants = to_clause.pop('grant')
                print(' to %s' % _kv(to_clause, to_items), file=result)
                extra = set(to_clause) - to_items
                if extra:
                    print(' %s\n' % _kv(to_clause, extra), file=result)
                for grant in grants:
                    print(' grant %s' % _kv(grant), file=result)

        return result.getvalue().strip()
class UDMAuthorizationConfig:
    """UDM specific DSL

    Parses an access-rule file written in the UDM DSL and converts the result
    either back to DSL text (:meth:`compose`) or to the YAML representation
    of an authorization configuration (:meth:`to_yaml`).

    :param filename: path of the DSL file.
    :param strict: passed on to the transformer: unknown object types and
        properties are errors instead of warnings.
    """

    def __init__(self, filename, *, strict=False):
        self.filename = Path(filename)
        self.parser = Lark(
            UDM_DSL_GRAMMAR,
            parser='lalr',
            transformer=_DSLTransformer(str(self.filename), strict=strict),
        )

    def parse(self):
        """Read and parse the file; the result is stored in ``self.parsed``.

        :raises DSLSyntaxError: if the document is invalid.
        """
        univention.admin.modules.update()
        try:
            self.parsed = self.parser.parse(self.filename.read_text())
        except lark.exceptions.LarkError as exc:
            raise DSLSyntaxError(str(exc)) from exc

    def compose(self):
        """Return the parsed rules as DSL text.  Requires :meth:`parse`."""
        # the transformer's compose() consumes its argument, hence the copy
        return _DSLTransformer.compose(copy.deepcopy(self.parsed))

    def to_yaml(self):
        """Return the parsed rules as YAML authorization configuration.

        Requires :meth:`parse`.  For every access rule one capability bundle
        is created and assigned to all roles of the rule.  Every "to" clause
        (a ``*`` object type is expanded to all known modules) becomes one
        capability with one permission set and the conditions restricting it.

        :raises RuntimeError: if a value based check is combined with other
            grants, several properties or write permissions.
        """
        univention.admin.modules.update()
        all_modules = list(univention.admin.modules.modules)
        conf = AuthorizationConfig(self.filename.with_suffix('.yaml'))

        for cond in self.parsed['conditions']:
            conf.conditions[cond['name']] = {cond['condition']: cond['parameters']}

        for rule in self.parsed['rules']:
            by = rule.get('by')
            to = rule.get('to')

            bundles = conf.capability_bundles.setdefault(BUNDLE_NAMESPACE, {})
            bundle_name = '--'.join(sorted(role['role'].rsplit(':', 1)[-1] for role in by))
            bundle_name = self._unique(bundles, bundle_name)
            cap_bundle_string = f'{BUNDLE_NAMESPACE}:{bundle_name}'
            bundle = bundles.setdefault(bundle_name, [])

            for role in by:
                role_namespace, _, role_name = role['role'].rpartition(':')
                # create one role capability mapping for each role and assign one capability bundle, where all capabilities are added
                role_cap_map = conf.role_capability_mapping.setdefault(role_namespace, {}).setdefault(role_name, {
                    'permissions': [],
                    'capabilities': [],
                    'capability-bundles': [],
                })
                role_cap_map['displayname'] = rule.get('description', '')
                if cap_bundle_string not in role_cap_map['capability-bundles']:
                    role_cap_map['capability-bundles'].append(cap_bundle_string)

            # expand clauses for all object types ('*') to one clause per module
            wildcard_object_type = False
            expanded_to_clauses = []
            for to_clause in to:
                if to_clause['objecttype'] != '*':
                    expanded_to_clauses.append(to_clause)
                    continue
                wildcard_object_type = True
                for module_name in all_modules:
                    new_to_clause = copy.deepcopy(to_clause)
                    new_to_clause['objecttype'] = module_name
                    expanded_to_clauses.append(new_to_clause)

            for to_clause in expanded_to_clauses:
                object_type = to_clause['objecttype']
                type_slug = object_type.replace('/', '-')
                grants = to_clause.get('grant', [])

                # create a capability and assign it to the capability bundle
                capability_namespace = f'udm:{object_type}'
                capability_name = self._unique(conf.capabilities.get(capability_namespace, {}), bundle_name)
                capability_string = f'{capability_namespace}:{capability_name}'
                bundle.append(capability_string)
                cap = conf.capabilities.setdefault(capability_namespace, {}).setdefault(capability_name, {
                    # "to" lines only know the key 'description' (there is no
                    # 'displayname'), so that is what is used as display name
                    'displayname': to_clause.get('description', ''),
                    'grants-permissions': [],
                    'conditions': {'AND': []},
                })
                conditions = cap['conditions']['AND']

                # create a permission set for each capability and assign it to the capability
                psetname = to_clause.get('name')
                if psetname and wildcard_object_type:
                    psetname = f'{psetname}-{type_slug}'
                psetname = psetname or f'{type_slug}-{capability_name}-all'
                psetname = self._unique(conf.permission_sets, psetname)
                permissions = set()
                cap['grants-permissions'].append(psetname)

                for prop in grants:
                    # grant given actions
                    if 'properties' not in prop:
                        actions = set(prop.get('actions', []))
                        if '*' in actions:
                            actions.update(set(ACTIONS))
                            actions.remove('*')
                        elif 'read' in actions:
                            actions.add('search')
                        permissions.update({
                            f'udm:{object_type}:{action}'
                            for action in sorted(actions)
                        })
                        continue

                    # grant given properties
                    perms = set(prop['permission'].split(',')) if isinstance(prop['permission'], str) else set(prop['permission'])
                    if '*' in perms:
                        perms.update(set(PERMISSIONS))
                        perms.remove('*')
                    if 'read' in perms:
                        perms.add('search')
                    if 'write' in perms:
                        perms.add('read')
                        perms.add('write')
                        perms.add('search')
                    permissions.update({
                        f'udm:{object_type}:{perm}-property-{propname}'
                        for propname in prop['properties']
                        for perm in perms
                    })

                    for propname in sorted(prop['properties']):
                        value_operator, values = prop.get('values', [None, None])
                        if not values:
                            continue
                        if len(grants) != 1:
                            raise RuntimeError('Security warning: Value based checks must create exactly only one capability (to block)!')
                        if len(prop['properties']) != 1:
                            raise RuntimeError('Security warning: Value based checks must check only one property!')
                        if prop['permission'] == 'write' or 'write' in perms:
                            raise RuntimeError('Security warning: Value based checks most likely should not add write permissions, design it the opposite way!')
                        value_operator = _VALUE_OPERATORS.get(value_operator, '==')
                        val_condition = self._unique(conf.conditions, f'{type_slug}-{propname}-values-{value_operator}', values='||'.join(sorted(values)))
                        conditions.append(val_condition)
                        conf.conditions[val_condition] = {
                            'udm:conditions:target_property_value_compares': {
                                'property': propname,
                                'operator': value_operator,
                                'values': values,
                            },
                        }

                conf.permission_sets.setdefault(psetname, []).extend(sorted(permissions))

                # restrict capability to conditions
                if object_type != '*':
                    ot_condition = f'object-type-is-{type_slug}'
                    conditions.append(ot_condition)
                    conf.conditions[ot_condition] = {
                        'udm:conditions:target_object_type_equals': {
                            'objectType': object_type,
                        },
                    }

                scope, position = to_clause.get('position', [None, None])
                scope = _SCOPES.get(scope, 'base')
                if position and position.startswith('context='):
                    _, _, context = position.partition('context=')
                    if context == 'udm:contexts:position':
                        pos_condition = self._unique(conf.conditions, 'position-from-context', scope=scope, context=context)
                        conditions.append(pos_condition)
                        conf.conditions[pos_condition] = {
                            'udm:conditions:target_position_from_context': {
                                'context': context,
                                'scope': scope,
                            },
                        }
                elif position:
                    position = position.format(**ucr)
                    pos_condition = self._unique(conf.conditions, 'position', scope=scope, position=position)
                    conditions.append(pos_condition)
                    conf.conditions[pos_condition] = {
                        'udm:conditions:target_position_in': {
                            'position': position,
                            'scope': scope,
                        },
                    }

                # The explicit "if" condition has to be added before the
                # emptiness check: afterwards the list may no longer be part
                # of the capability and the condition would be lost.
                if to_clause.get('if'):
                    conditions.append(to_clause['if'])
                if not conditions:
                    cap['conditions'].pop('AND')

        return yaml.dump(conf.compose())

    def _unique(self, parent, string, **unique):
        """Return a name derived from `string`.

        With keyword arguments the name is `string` plus a hash of their
        values, so that equal values always give the same name.  Without
        them `string` is returned unchanged if it is not yet a key of
        `parent`; otherwise a counter is appended which makes the name
        unused in `parent`.
        """
        if unique:
            digest = hashlib.sha1('-'.join(sorted(unique.values())).encode()).hexdigest()[:8]
            return f'{string}-{digest}'
        if string not in parent:
            return string
        counter = len(parent)
        candidate = f'{string}-{counter}'
        # the counter alone does not guarantee an unused name
        while candidate in parent:
            counter += 1
            candidate = f'{string}-{counter}'
        return candidate
if __name__ == '__main__':
    # Command line interface: parse a DSL file and print it as normalized
    # DSL text (--compose) and/or as YAML configuration (--convert).
    import argparse

    parser = argparse.ArgumentParser(description=__doc__)
    # the file is read unconditionally, so the option must be given
    parser.add_argument('--config', required=True)
    parser.add_argument('--compose', action='store_true')
    parser.add_argument('--convert', action='store_true')
    parser.add_argument('--unstrict', action='store_true')
    args = parser.parse_args()

    conf = UDMAuthorizationConfig(args.config, strict=not args.unstrict)
    conf.parse()
    if args.compose:
        print(conf.compose())
    if args.convert:
        print(conf.to_yaml())