#!/usr/bin/python3
#
# Univention Directory Manager
# REST API client
#
# SPDX-FileCopyrightText: 2019-2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only
"""
Sample Client for the UDM REST API.
>>> from univention.admin.rest.client import UDM
>>> uri = 'http://localhost/univention/udm/'
>>> udm = UDM.http(uri, 'Administrator', 'univention')
>>> module = udm.get('users/user')
>>> print('Found {}'.format(module))
>>> obj = next(module.search())
>>> if obj:
>>> obj = obj.open()
>>> print('Object {}'.format(obj))
"""
from __future__ import annotations
import copy
import http.client
import time
from typing import TYPE_CHECKING, Any, Self
import requests
import uritemplate
if TYPE_CHECKING:
from collections.abc import Callable, Iterator, Mapping
http.client._MAXHEADERS = 1000
[docs]
class HTTPError(Exception):
"""Generic HTTP Error."""
def __init__(self, code: int, message: str, response: requests.Response | None, error_details: dict | None = None) -> None:
self.code = code
self.response = response
self.error_details = error_details
super().__init__(message)
[docs]
class BadRequest(HTTPError):
"""A 400 Bad Request error."""
[docs]
class Unauthorized(HTTPError):
"""A 401 Unauthorized error."""
[docs]
class Forbidden(HTTPError):
"""A 403 Forbidden error."""
[docs]
class NotFound(HTTPError):
"""A 404 Not Found error."""
[docs]
class PreconditionFailed(HTTPError):
"""A 412 Precondition Failed error."""
[docs]
class UnprocessableEntity(HTTPError):
"""A 422 Unprocessable Entity error."""
[docs]
class ServerError(HTTPError):
"""A 500 Internal Server error."""
[docs]
class ServiceUnavailable(HTTPError):
"""A 503 Service Unavailable error."""
[docs]
class ConnectionError(Exception):
"""A HTTP Connection error."""
[docs]
class UnexpectedResponse(ConnectionError):
"""A unexpected response payload error (e.g. not JSON)."""
class _NoRelation(Exception):
pass
[docs]
class Response: # noqa: B903
"""Response wrapper."""
def __init__(self, response: requests.Response, data: Any, uri: str) -> None:
self.response = response
self.data = data
self.uri = uri
[docs]
class Session:
"""A session holding credentials and language settings for a client."""
def __init__(self, credentials: UDM, language: str = 'en-US', reconnect: bool = True, user_agent: str = 'univention.lib/1.0', enable_caching: bool = False) -> None:
self.language = language
self.credentials = credentials
self.reconnect = reconnect
self.user_agent = user_agent
self.enable_caching = enable_caching
self.default_headers = {
'Accept': 'application/hal+json; q=1, application/json; q=0.9; text/html; q=0.2, */*; q=0.1',
'Accept-Language': self.language,
'User-Agent': self.user_agent,
}
self.session = self.create_session()
[docs]
def create_session(self) -> requests.Session:
sess = requests.session()
if self.credentials.bearer_token:
sess.headers['Authorization'] = 'Bearer %s' % (self.credentials.bearer_token,)
else:
sess.auth = (self.credentials.username, self.credentials.password)
if not self.enable_caching:
return sess
try:
from cachecontrol import CacheControl
except ImportError:
pass
else:
sess = CacheControl(sess)
return sess
[docs]
def get_method(self, method: str) -> Callable[..., requests.Response]:
sess = self.session
return {
'GET': sess.get,
'POST': sess.post,
'PUT': sess.put,
'DELETE': sess.delete,
'PATCH': sess.patch,
'OPTIONS': sess.options,
}.get(method.upper(), sess.get)
[docs]
def request(self, method: str, uri: str, data: dict | None = None, expect_json: bool = False, **headers: str) -> Any:
return self.make_request(method, uri, data, expect_json=expect_json, **headers).data # type: ignore # <https://github.com/python/mypy/issues/10008>
[docs]
def make_request(self, method: str, uri: str, data: dict | None = None, expect_json: bool = False, allow_redirects: bool = True, custom_redirect_handling: bool = False, **headers: str) -> Response:
if method in ('GET', 'HEAD'):
params = data
json = None
else:
params = None
json = data
def doit() -> Response:
try:
response = self.get_method(method)(uri, params=params, json=json, headers=dict(self.default_headers, **headers), allow_redirects=allow_redirects)
except requests.exceptions.ConnectionError as exc:
raise ConnectionError(exc)
if custom_redirect_handling:
response = self._follow_redirection(response)
data = self.eval_response(response, expect_json=expect_json)
return Response(response, data, uri)
for _i in range(5):
try:
return doit()
except ServiceUnavailable as exc: # TODO: same for ConnectionError? python-request does it itself.
if not self.reconnect:
raise
try:
assert exc.response is not None
retry_after = min(5, int(exc.response.headers.get('Retry-After', 1)))
except ValueError:
retry_after = 1
time.sleep(retry_after)
return doit()
def _follow_redirection(self, response: Response) -> Response:
location = response.headers.get('Location')
# python-requests doesn't follow redirects for 202
if location and response.status_code in (201, 202):
response = self.make_request('GET', location, allow_redirects=False).response
# prevent allow_redirects because it does not wait Retry-After time causing a break up after 30 fast redirections
while 300 <= response.status_code <= 399 and 'Location' in response.headers:
location = response.headers['Location']
if response.headers.get('Retry-After', '').isdigit():
time.sleep(min(30, max(0, int(response.headers['Retry-After']))))
response = self.make_request(self._select_method(response), location, allow_redirects=False).response
return response
def _select_method(self, response: Response) -> str:
if response.status_code in (300, 301, 303) and response.request.method != 'HEAD':
return 'GET'
return response.request.method
[docs]
def eval_response(self, response: requests.Response, expect_json: bool = False) -> Any:
if response.status_code >= 399:
msg = f'{response.request.method} {response.url}: {response.status_code}'
error_details = None
try:
json = response.json()
except ValueError:
pass
else:
if isinstance(json, dict):
error_details = json.get('error', {})
try:
error_details['error'] = list(self.resolve_relations(json, 'udm:error'))
except _NoRelation:
pass
if error_details:
server_message = error_details.get('message')
# traceback = error_details.get('traceback')
if server_message:
msg += f'\n{server_message}'
errors = {400: BadRequest, 404: NotFound, 403: Forbidden, 401: Unauthorized, 412: PreconditionFailed, 422: UnprocessableEntity, 500: ServerError, 503: ServiceUnavailable}
cls = HTTPError
cls = errors.get(response.status_code, cls)
raise cls(response.status_code, msg, response, error_details=error_details)
if response.headers.get('Content-Type') in ('application/json', 'application/hal+json'):
return response.json()
elif expect_json:
raise UnexpectedResponse(response.text)
if response.status_code == 204:
return {}
return response.text
[docs]
def get_relations(self, entry: dict, relation: str, name: str | None = None, template: dict[str, Any] | None = None) -> Iterator[dict[str, str]]:
links = copy.deepcopy(entry.get('_links', {}))
links = links.get(relation, [None])
links = links if links and isinstance(links, list) else [links]
links = [link for link in links if isinstance(link, dict) and (not name or link.get('name') == name)]
for link in sorted(links, key=lambda x: not x.get('templated', False) if template else x.get('templated', False)):
if link.get('deprecation'):
pass # TODO: log warning
if link.get('templated'):
link['href'] = uritemplate.expand(link['href'], template)
yield link
[docs]
def get_relation(self, entry: dict, relation: str, name: str | None = None, template: dict[str, Any] | None = None) -> dict[str, str]:
rel = next(self.get_relations(entry, relation, name, template), None)
if rel is None:
raise _NoRelation(relation)
return rel
[docs]
def resolve_relations(self, entry: dict, relation: str, name: str | None = None, template: dict[str, Any] | None = None) -> Iterator[Any]:
embedded = entry.get('_embedded', {})
if isinstance(embedded, dict) and relation in embedded:
yield from embedded[relation]
return
for rel in self.get_relations(entry, relation, name, template):
yield self.make_request('GET', rel['href']).data
[docs]
def resolve_relation(self, entry: dict, relation: str, name: str | None = None, template: dict[str, Any] | None = None) -> Any:
rel = next(self.resolve_relations(entry, relation, name, template), None)
if rel is None:
raise _NoRelation(relation)
return rel
[docs]
class Client: # noqa: B903
"""Abstract client base class."""
def __init__(self, client: Session) -> None:
self.client = client
[docs]
class UDM(Client):
"""Univention Directory Manager client."""
[docs]
@classmethod
def http(cls, uri: str, username: str, password: str) -> Self:
return cls(uri, username, password)
[docs]
@classmethod
def bearer(cls, uri: str, bearer_token: str) -> Self:
return cls(uri, None, None, bearer_token=bearer_token)
def __init__(self, uri: str, username: str, password: str, *args: Any, **kwargs: Any) -> None:
self.uri = uri
self.username = username
self.password = password
self.bearer_token = kwargs.pop('bearer_token', None)
self._api_version: str | None = None
self.entry: Any = None
super().__init__(Session(self, *args, **kwargs))
[docs]
def load(self) -> None:
# FIXME: use HTTP caching instead of memory caching
if self.entry is None:
self.reload()
[docs]
def reload(self) -> None:
self.entry = self.client.request('GET', self.uri, expect_json=True)
[docs]
def get_ldap_base(self) -> str | None:
self.load()
return Object.from_data(self, self.client.resolve_relation(self.entry, 'udm:ldap-base')).dn
[docs]
def modules(self, name: str | None = None) -> Iterator[Module]:
self.load()
for module in self.client.resolve_relations(self.entry, 'udm:object-modules'):
for module_info in self.client.get_relations(module, 'udm:object-types', name):
yield Module(self, module_info['href'], module_info['name'], module_info['title'])
[docs]
def version(self, api_version: str) -> Self:
self._api_version = api_version
return self
[docs]
def obj_by_dn(self, dn: str) -> Object:
self.load()
return Object.from_data(self, self.client.resolve_relation(self.entry, 'udm:object/get-by-dn', template={'dn': dn}))
[docs]
def obj_by_uuid(self, uuid: str) -> Object:
self.load()
return Object.from_data(self, self.client.resolve_relation(self.entry, 'udm:object/get-by-uuid', template={'uuid': uuid}))
[docs]
def get(self, name: str) -> Module | None:
for module in self.modules(name):
return module
return None
[docs]
def get_object(self, object_type: str, dn: str) -> Object | None:
mod = self.get(object_type)
assert mod
obj = mod.get(dn)
return obj
def __repr__(self) -> str:
return f'UDM(uri={self.uri!r}, username={self.username!r}, password=***)'
[docs]
class Module(Client):
"""A UDM module representation."""
def __init__(self, udm: UDM, uri: str, name: str, title: str, *args: Any, **kwargs: Any) -> None:
super().__init__(udm.client, *args, **kwargs)
self.udm = udm
self.uri = uri
self.username = udm.username
self.password = udm.password
self.name = name
self.title = title
self.relations: dict = {}
[docs]
def load_relations(self) -> None:
if self.relations:
return
self.relations = self.client.request('GET', self.uri)
def __repr__(self) -> str:
return f'Module(uri={self.uri!r}, name={self.name!r})'
[docs]
def new(self, position: str | None = None, superordinate: str | None = None, template: dict[str, Any] | None = None) -> Object:
self.load_relations()
data = {'position': position, 'superordinate': superordinate, 'template': template}
resp = self.client.resolve_relation(self.relations, 'create-form', template=data)
return Object.from_data(self.udm, resp)
[docs]
def get(self, dn: str, properties: list[str] | None = None) -> Object | None:
# TODO: use a link relation instead of a search
for obj in self._search_closed(position=dn, scope='base', properties=properties):
return obj.open()
raise NotFound(404, 'Wrong object type!?', None) # FIXME: object exists but is of different module. should be fixed on the server.
[docs]
def get_by_entry_uuid(self, uuid: str, properties: list[str] | None = None) -> Object | None:
# TODO: use a link relation instead of a search
# return self.udm.get_by_uuid(uuid)
for obj in self._search_closed(filter={'entryUUID': uuid}, scope='base', properties=properties):
return obj.open()
raise NotFound(404, 'Wrong object type!?', None) # FIXME: object exists but is of different module. should be fixed on the server.
[docs]
def get_by_id(self, dn: str, properties: list[str] | None = None) -> Object | None:
# TODO: Needed?
raise NotImplementedError()
[docs]
def search(self, filter: dict[str, str] | str | bytes | None = None, position: str | None = None, scope: str | None = 'sub', hidden: bool = False, superordinate: str | None = None, opened: bool = False, properties: list[str] | None = None) -> Iterator[Any]:
if opened:
return self._search_opened(filter, position, scope, hidden, superordinate, properties)
else:
return self._search_closed(filter, position, scope, hidden, superordinate, properties)
def _search_opened(self, filter: dict[str, str] | str | bytes | None = None, position: str | None = None, scope: str | None = 'sub', hidden: bool = False, superordinate: str | None = None, properties: list[str] | None = None) -> Iterator[Object]:
for obj in self._search(filter, position, scope, hidden, superordinate, True, properties):
yield Object.from_data(self.udm, obj) # NOTE: this is missing last-modified, therefore no conditional request is done on modification!
def _search_closed(self, filter: dict[str, str] | str | bytes | None = None, position: str | None = None, scope: str | None = 'sub', hidden: bool = False, superordinate: str | None = None, properties: list[str] | None = None) -> Iterator[ShallowObject]:
for obj in self._search(filter, position, scope, hidden, superordinate, False, properties):
objself = self.client.get_relation(obj, 'self')
uri = objself['href']
dn = objself['name']
yield ShallowObject(self.udm, dn, uri)
def _search(self, filter: dict[str, str] | str | bytes | None = None, position: str | None = None, scope: str | None = 'sub', hidden: bool = False, superordinate: str | None = None, opened: bool = False, properties: list[str] | None = None) -> Iterator[Any]:
data = {
'position': position,
'scope': scope,
'hidden': '1' if hidden else '0',
}
if isinstance(filter, dict):
for prop, val in filter.items():
data.setdefault('query', {})[f'query[{prop}]'] = val
elif isinstance(filter, str):
data['filter'] = filter
if superordinate:
data['superordinate'] = superordinate
if not opened:
data['opened'] = '0'
data['properties'] = ['dn']
if properties:
data['properties'] = properties
self.load_relations()
entries = self.client.resolve_relation(self.relations, 'search', template=data)
yield from self.client.resolve_relations(entries, 'udm:object')
[docs]
def get_layout(self) -> Any | None:
self.load_relations()
return self.udm.client.resolve_relation(self.relations, 'udm:layout').get('layout')
[docs]
def get_properties(self) -> Any | None:
self.load_relations()
return self.udm.client.resolve_relation(self.relations, 'udm:properties').get('properties')
[docs]
def get_property_choices(self, property: str) -> Any | None:
self.load_relations()
relations = self.udm.client.resolve_relation(self.relations, 'udm:properties')
return self.udm.client.resolve_relation(relations, 'udm:property-choices', name=property).get('choices')
[docs]
def policy_result(self, policy_module: str, position: str, policy: str | None = None) -> dict:
self.load_relations()
policy_result = self.udm.client.resolve_relation(self.relations, 'udm:policy-result', name=policy_module, template={'position': position, 'policy': policy})
policy_result.pop('_links', None)
policy_result.pop('_embedded', None)
return policy_result
[docs]
def get_report_types(self) -> list[str]:
self.load_relations()
return [x['name'] for x in self.udm.client.get_relations(self.relations, 'udm:report', template={'dn': ''}) if x.get('name')]
[docs]
def create_report(self, report_type: str, object_dns: list[str]) -> Any:
self.load_relations()
return self.udm.client.resolve_relation(self.relations, 'udm:report', name=report_type, template={'dn': object_dns})
[docs]
class ShallowObject(Client):
"""A reference to an UDM object, which is not recevied from server yet."""
def __init__(self, udm: UDM, dn: str | None, uri: str, *args: Any, **kwargs: Any) -> None:
super().__init__(udm.client, *args, **kwargs)
self.dn = dn
self.udm = udm
self.uri = uri
[docs]
def open(self) -> Object:
return Object.from_response(self.udm, self.client.make_request('GET', self.uri))
def __repr__(self) -> str:
return f'ShallowObject(dn={self.dn!r})'
[docs]
class References:
# """Descriptor that provides access to related UDM objects."""
def __init__(self, obj: Object | None = None) -> None:
self.obj = obj
self.udm = self.obj.udm if self.obj is not None else None
def __getitem__(self, item: str) -> list[ShallowObject]:
assert self.obj
assert self.udm
return [
ShallowObject(self.obj.udm, x['name'], x['href'])
for x in self.udm.client.get_relations(self.obj.hal, f'udm:object/property/reference/{item}')
]
def __getattribute__(self, key):
try:
return super().__getattribute__(key)
except AttributeError:
return self[key]
def __get__(self, obj: Any, cls: type | None = None) -> References:
"""Return a new References bound to the given object."""
return type(self)(obj)
[docs]
class Object(Client):
"""A UDM object with related references."""
objects = References()
"""Descriptor that provides access to related UDM objects."""
@property
def module(self):
# FIXME: use "type" relation link
# object_type = self.udm.get_relation(self.hal, 'type')['href']
return self.udm.get(self.object_type)
@property
def object_type(self) -> str:
return self.representation['objectType']
@property
def dn(self) -> str | None:
return self.representation.get('dn')
@property
def properties(self):
return self.representation['properties']
@property
def options(self) -> dict:
return self.representation.get('options', {})
@property
def policies(self) -> dict:
return self.representation.get('policies', {})
@property
def superordinate(self) -> str | None:
return self.representation.get('superordinate')
@superordinate.setter
def superordinate(self, superordinate: str) -> None:
self.representation['superordinate'] = superordinate
@property
def position(self) -> str | None:
return self.representation.get('position')
@position.setter
def position(self, position: str) -> None:
self.representation['position'] = position
@property
def uri(self) -> str | None:
try:
uri = self.client.get_relation(self.hal, 'self')
except _NoRelation:
uri = None
if uri:
return uri['href']
return self.representation.get('uri')
[docs]
@classmethod
def from_response(cls, udm: UDM, response: Response) -> Object:
return cls.from_data(udm, response.data, response.response.headers)
[docs]
@classmethod
def from_data(cls, udm: UDM, entry: dict, headers: Mapping[str, str] | None = None) -> Object:
headers = headers or {}
return cls(udm, entry, etag=headers.get('Etag'), last_modified=headers.get('Last-Modified'))
def __init__(self, udm: UDM, representation: dict, etag: str | None = None, last_modified: str | None = None, *args: Any, **kwargs: Any) -> None:
super().__init__(udm.client, *args, **kwargs)
self.udm = udm
self.representation = representation
self.hal = {
'_links': representation.pop('_links', {}),
'_embedded': representation.pop('_embedded', {}),
}
self.etag = etag
self.last_modified = last_modified
def __repr__(self) -> str:
return f'Object(module={self.object_type!r}, dn={self.dn!r}, uri={self.uri!r})'
[docs]
def reload(self) -> None:
uri = self.client.get_relation(self.hal, 'self')
if uri:
obj = ShallowObject(self.udm, self.dn, uri['href']).open()
else:
obj = self.module.get(self.dn)
self._copy_from_obj(obj)
[docs]
def save(self, reload: bool = True) -> Response:
if self.dn:
return self._modify(reload)
else:
return self._create(reload)
[docs]
def json_patch(self, patch: dict, reload: bool = True) -> Response:
if self.dn:
return self._patch(patch, reload=reload)
else:
uri = self.client.get_relation(self.hal, 'create')
return self._request('POST', uri['href'], patch, {'Content-Type': 'application/json-patch+json'})
[docs]
def delete(self, remove_referring: bool = False) -> bytes:
assert self.uri
headers = {key: value for key, value in {
'If-Unmodified-Since': self.last_modified,
'If-Match': self.etag,
}.items() if value}
return self.client.request('DELETE', self.uri, **headers) # type: ignore # <https://github.com/python/mypy/issues/10008>
[docs]
def move(self, position: str, reload: bool = True) -> None:
self.position = position
self.save(reload=reload)
def _modify(self, reload: bool = True) -> Response:
assert self.uri
headers = {key: value for key, value in {
'If-Unmodified-Since': self.last_modified,
'If-Match': self.etag,
}.items() if value}
return self._request('PUT', self.uri, self.representation, headers, reload=reload)
def _patch(self, data: dict, reload: bool = True) -> Response:
assert self.uri
headers = {key: value for key, value in {
'If-Unmodified-Since': self.last_modified,
'If-Match': self.etag,
'Content-Type': 'application/json-patch+json',
}.items() if value}
return self._request('PATCH', self.uri, data, headers, reload=reload)
def _create(self, reload: bool = True) -> Response:
uri = self.client.get_relation(self.hal, 'create')
return self._request('POST', uri['href'], self.representation, {}, reload=reload)
def _request(self, method: str, uri: str, data: dict, headers: dict, reload: bool = True) -> Response:
response = self.client.make_request(method, uri, data=data, allow_redirects=False, custom_redirect_handling=True, **headers) # type: ignore # <https://github.com/python/mypy/issues/10008>
self._reload_from_response(response, reload)
return response
def _reload_from_response(self, response: Response, reload: bool) -> None:
if reload and 200 <= response.response.status_code <= 299 and 'Location' in response.response.headers:
uri = response.response.headers['Location']
obj = ShallowObject(self.udm, None, uri)
self._copy_from_obj(obj.open())
return
if response.response.status_code == 200:
# the response already contains a new representation
self._copy_from_obj(Object.from_response(self.udm, response))
return
if reload:
self.reload()
def _copy_from_obj(self, obj: Object) -> None:
self.udm = obj.udm
self.representation = copy.deepcopy(obj.representation)
self.hal = copy.deepcopy(obj.hal)
self.etag = obj.etag
self.last_modified = obj.last_modified
[docs]
def generate_service_specific_password(self, service: str) -> Any | None:
uri = self.client.get_relation(self.hal, 'udm:service-specific-password')['href']
response = self.client.make_request('POST', uri, data={"service": service})
return response.data.get('password', None)
[docs]
def get_layout(self) -> Any | None:
return self.udm.client.resolve_relation(self.hal, 'udm:layout').get('layout')
[docs]
def get_properties(self) -> Any | None:
return self.udm.client.resolve_relation(self.hal, 'udm:properties').get('properties')
[docs]
def get_property_choices(self, property: str) -> Any | None:
hal = self.udm.client.resolve_relation(self.hal, 'udm:properties')
return self.udm.client.resolve_relation(hal, 'udm:property-choices', name=property).get('choices')
[docs]
def policy_result(self, policy_module: str, policy: str | None = None) -> dict:
policy_result = self.udm.client.resolve_relation(self.hal, 'udm:policy-result', name=policy_module, template={'policy': policy})
policy_result.pop('_links', None)
policy_result.pop('_embedded', None)
return policy_result
[docs]
class PatchDocument:
"""application/json-patch+json representation"""
def __init__(self):
self.patch = []
[docs]
def add(self, path_segments, value):
self.patch.append({
'op': 'add',
'path': self.expand_path(path_segments),
'value': value,
})
[docs]
def replace(self, path_segments, value):
self.patch.append({
'op': 'replace',
'path': self.expand_path(path_segments),
'value': value,
})
[docs]
def remove(self, path_segments, value):
self.patch.append({
'op': 'remove',
'path': self.expand_path(path_segments),
'value': value, # TODO: not official
})
[docs]
def move(self, path_segments, from_segments):
self.patch.append({
'op': 'move',
'path': self.expand_path(path_segments),
'from': self.expand_path(from_segments),
})
[docs]
def copy(self, path_segments, from_segments):
self.patch.append({
'op': 'copy',
'path': self.expand_path(path_segments),
'from': self.expand_path(from_segments),
})
[docs]
def test(self, path_segments, value):
self.patch.append({
'op': 'test',
'path': self.expand_path(path_segments),
'value': value,
})
[docs]
def expand_path(self, path_segments):
return '/'.join(path.replace('~', '~0').replace('/', '~1') for path in ['', *path_segments])