Source code for univention.admin.rest.client

#!/usr/bin/python3
# -*- coding: utf-8 -*-
#
# Univention Management Console
#  Univention Directory Manager Module
#
# Copyright 2019-2022 Univention GmbH
#
# https://www.univention.de/
#
# All rights reserved.
#
# The source code of this program is made available
# under the terms of the GNU Affero General Public License version 3
# (GNU AGPL V3) as published by the Free Software Foundation.
#
# Binary versions of this program provided by Univention to you as
# well as other copyrighted, protected or trademarked materials like
# Logos, graphics, fonts, specific documentations and configurations,
# cryptographic keys etc. are subject to a license agreement between
# you and Univention and not subject to the GNU AGPL V3.
#
# In the case you use this program under the terms of the GNU AGPL V3,
# the program is provided in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public
# License with the Debian GNU/Linux or Univention distribution in file
# /usr/share/common-licenses/AGPL-3; if not, see
# <https://www.gnu.org/licenses/>.
"""
Sample Client for the UDM REST API.

>>> from univention.admin.rest.client import UDM
>>> uri = 'http://localhost/univention/udm/'
>>> udm = UDM.http(uri, 'Administrator', 'univention')
>>> module = udm.get('users/user')
>>> print('Found {}'.format(module))
>>> obj = next(module.search())
>>> if obj:
... 	obj = obj.open()
>>> print('Object {}'.format(obj))
"""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

import sys
import time
import copy
import requests

import six
import uritemplate

# Override the HTTP client module's private ``_MAXHEADERS`` limit with 1000.
# The module is called ``http.client`` on Python 3 and ``httplib`` on Python 2,
# hence the version switch.
# NOTE(review): presumably required because responses of the UDM REST API can
# carry more headers than the standard library accepts by default — confirm.
if sys.version_info.major > 2:
	import http.client
	http.client._MAXHEADERS = 1000
else:
	import httplib
	httplib._MAXHEADERS = 1000


class HTTPError(Exception):
	"""Base class of all errors raised for an unsuccessful HTTP response.

	:param code: the HTTP status code of the response
	:param message: the text used as exception message
	:param response: the response object which caused the error
	"""

	def __init__(self, code, message, response):
		super(HTTPError, self).__init__(message)
		# keep status code and raw response available for error handlers
		self.response = response
		self.code = code
class BadRequest(HTTPError):
	"""The server answered with HTTP status 400."""


class Unauthorized(HTTPError):
	"""The server answered with HTTP status 401."""


class Forbidden(HTTPError):
	"""The server answered with HTTP status 403."""


class NotFound(HTTPError):
	"""The server answered with HTTP status 404."""


class PreconditionFailed(HTTPError):
	"""The server answered with HTTP status 412."""


class UnprocessableEntity(HTTPError):
	"""The server answered with HTTP status 422."""


class ServerError(HTTPError):
	"""The server answered with HTTP status 500."""


class ServiceUnavailable(HTTPError):
	"""The server answered with HTTP status 503."""
class ConnectionError(Exception):
	"""The HTTP request could not be delivered to the server.

	Note that this class intentionally carries the same name as the Python 3
	builtin but is a distinct exception type derived from :class:`Exception`.
	"""


class UnexpectedResponse(ConnectionError):
	"""A JSON response was expected but the server delivered something else."""


class _NoRelation(Exception):
	"""Internal: the requested link relation does not exist in the entry."""
class Response(object):
	"""Plain container tying a raw HTTP response to its decoded payload.

	:param response: the underlying HTTP response object
	:param data: the evaluated response body
	:param uri: the URI which was requested
	"""

	def __init__(self, response, data, uri):
		self.uri = uri
		self.data = data
		self.response = response
class Session(object):
	"""HTTP session which talks to the UDM REST API and understands HAL links.

	:param credentials: object providing ``username`` and ``password`` attributes
	:param language: value sent as ``Accept-Language`` header
	:param reconnect: whether requests answered with *503 Service Unavailable* are retried
	:param user_agent: value sent as ``User-Agent`` header
	:param enable_caching: wrap the session with ``cachecontrol`` if that package is importable
	"""

	def __init__(self, credentials, language='en-US', reconnect=True, user_agent='univention.lib/1.0', enable_caching=False):
		self.language = language
		self.credentials = credentials
		self.reconnect = reconnect
		self.user_agent = user_agent
		self.enable_caching = enable_caching
		self.default_headers = {
			# media ranges are separated by "," - only their parameters (q=...) by ";"
			'Accept': 'application/hal+json; q=1, application/json; q=0.9, text/html; q=0.2, */*; q=0.1',
			'Accept-Language': self.language,
			'User-Agent': self.user_agent,
		}
		self.session = self.create_session()

	def create_session(self):
		"""Create the underlying ``requests`` session using HTTP basic authentication.

		:returns: the session, wrapped by ``CacheControl`` if caching is enabled and available
		"""
		sess = requests.session()
		sess.auth = (self.credentials.username, self.credentials.password)
		if not self.enable_caching:
			return sess
		try:
			from cachecontrol import CacheControl
		except ImportError:
			# caching is a best-effort feature: continue without it
			pass
		else:
			sess = CacheControl(sess)
		return sess

	def get_method(self, method):
		"""Return the session callable for the HTTP `method` (case-insensitive).

		Methods which are not listed fall back to the ``GET`` callable.
		"""
		sess = self.session
		return {
			'GET': sess.get,
			'POST': sess.post,
			'PUT': sess.put,
			'DELETE': sess.delete,
			'PATCH': sess.patch,
			'OPTIONS': sess.options,
		}.get(method.upper(), sess.get)

	def request(self, method, uri, data=None, expect_json=False, **headers):
		"""Perform a request and return only the evaluated response body.

		See :meth:`make_request` for the parameters and raised exceptions.
		"""
		return self.make_request(method, uri, data, expect_json=expect_json, **headers).data

	def make_request(self, method, uri, data=None, expect_json=False, allow_redirects=True, **headers):
		"""Perform a HTTP request.

		:param method: the HTTP method name
		:param uri: the URI to request
		:param data: sent as query string for ``GET``/``HEAD``, as JSON body otherwise
		:param expect_json: raise :class:`UnexpectedResponse` if the response is not JSON
		:param allow_redirects: passed through to ``requests``
		:param headers: additional request headers, overriding the default headers
		:returns: a :class:`Response`
		:raises ConnectionError: if the connection could not be established
		:raises HTTPError: (or a subclass) if the response status signals an error
		"""
		# compare case-insensitively, consistent with get_method()
		if method.upper() in ('GET', 'HEAD'):
			params = data
			json = None
		else:
			params = None
			json = data

		def doit():
			try:
				response = self.get_method(method)(uri, params=params, json=json, headers=dict(self.default_headers, **headers), allow_redirects=allow_redirects)
			except requests.exceptions.ConnectionError as exc:
				raise ConnectionError(exc)
			payload = self.eval_response(response, expect_json=expect_json)
			return Response(response, payload, uri)

		for _ in range(5):
			try:
				return doit()
			except ServiceUnavailable as exc:  # TODO: same for ConnectionError? python-request does it itself.
				if not self.reconnect:
					raise
				try:
					retry_after = min(5, int(exc.response.headers.get('Retry-After', 1)))
				except ValueError:
					# e.g. a HTTP date instead of a number of seconds
					retry_after = 1
				# time.sleep() raises ValueError for negative values
				time.sleep(max(0, retry_after))

		# last attempt: let a ServiceUnavailable propagate to the caller
		return doit()

	def eval_response(self, response, expect_json=False):
		"""Turn a HTTP response into data or an exception.

		:param response: the HTTP response object
		:param expect_json: raise :class:`UnexpectedResponse` for non-JSON bodies
		:returns: the decoded JSON for (HAL-)JSON responses, the response text otherwise
		:raises HTTPError: (or the subclass matching the status code) for status codes >= 400
		:raises UnexpectedResponse: if JSON was expected but not received
		"""
		if response.status_code >= 400:
			msg = '{} {}: {}'.format(response.request.method, response.url, response.status_code)
			try:
				json = response.json()
			except ValueError:
				# no JSON error document: the status line has to suffice
				pass
			else:
				error = json.get('error') if isinstance(json, dict) else None
				if isinstance(error, dict):
					server_message = error.get('message')
					# traceback = error.get('traceback')
					if server_message:
						msg += '\n{}'.format(server_message)
			errors = {400: BadRequest, 404: NotFound, 403: Forbidden, 401: Unauthorized, 412: PreconditionFailed, 422: UnprocessableEntity, 500: ServerError, 503: ServiceUnavailable}
			cls = errors.get(response.status_code, HTTPError)
			raise cls(response.status_code, msg, response)
		# ignore media type parameters like "; charset=UTF-8"
		content_type = (response.headers.get('Content-Type') or '').split(';', 1)[0].strip().lower()
		if content_type in ('application/json', 'application/hal+json'):
			return response.json()
		elif expect_json:
			raise UnexpectedResponse(response.text)
		return response.text

	def get_relations(self, entry, relation, name=None, template=None):
		"""Yield the links of `relation` from the ``_links`` of a HAL `entry`.

		:param entry: the HAL document
		:param relation: the link relation name
		:param name: if given, only links carrying this ``name`` are yielded
		:param template: variables used to expand templated links
		:returns: generator of link dicts; templated links are yielded as expanded copies

		If `template` is given, templated links are yielded first, otherwise last.
		"""
		links = entry.get('_links', {})
		links = links.get(relation, [None])
		links = links if links and isinstance(links, list) else [links]
		links = [link for link in links if isinstance(link, dict) and (not name or link.get('name') == name)]
		for link in sorted(links, key=lambda x: not x.get('templated', False) if template else x.get('templated', False)):
			if link.get('deprecation'):
				pass  # TODO: log warning
			if link.get('templated'):
				# expand into a copy: modifying the link in place would destroy the
				# template inside the (cached) entry and every later expansion
				# with other variables would silently reuse the first result
				link = dict(link, href=uritemplate.expand(link['href'], template))
			yield link

	def get_relation(self, entry, relation, name=None, template=None):
		"""Return the first link of `relation`, see :meth:`get_relations`.

		:raises _NoRelation: if the entry has no such link
		"""
		try:
			return next(self.get_relations(entry, relation, name, template))
		except StopIteration:
			raise _NoRelation(relation)

	def resolve_relations(self, entry, relation, name=None, template=None):
		"""Yield the resources `relation` refers to.

		Resources embedded in the entry (``_embedded``) are used directly,
		otherwise every matching link is fetched via ``GET``.
		"""
		embedded = entry.get('_embedded', {})
		if isinstance(embedded, dict) and relation in embedded:
			for x in embedded[relation]:
				yield x
			return

		for link in self.get_relations(entry, relation, name, template):
			yield self.make_request('GET', link['href']).data

	def resolve_relation(self, entry, relation, name=None, template=None):
		"""Return the first resource of `relation`, see :meth:`resolve_relations`.

		:raises _NoRelation: if the relation cannot be resolved
		"""
		try:
			return next(self.resolve_relations(entry, relation, name, template))
		except StopIteration:
			# a leaking StopIteration would turn into a RuntimeError when this
			# method is called from within a generator
			raise _NoRelation(relation)
class Client(object):
	"""Common base of all API objects: gives access to the shared HTTP session.

	:param client: the session object used to perform requests
	"""

	def __init__(self, client):
		self.client = client
class UDM(Client):
	"""Entry point of the client: represents the root resource of the UDM REST API.

	The instance itself serves as credentials object for the :class:`Session`
	it creates (it provides ``username`` and ``password``).

	:param uri: the URI of the API root
	:param username: the user name to authenticate with
	:param password: the password to authenticate with
	:param args: additional positional arguments for :class:`Session`
	:param kwargs: additional keyword arguments for :class:`Session`
	"""

	@classmethod
	def http(cls, uri, username, password):
		"""Alternative constructor, equivalent to ``cls(uri, username, password)``."""
		return cls(uri, username, password)

	def __init__(self, uri, username, password, *args, **kwargs):
		self.uri = uri
		self.username = username
		self.password = password
		self._api_version = None
		self.entry = None  # the root resource, fetched lazily by load()
		session = Session(self, *args, **kwargs)
		super(UDM, self).__init__(session)

	def load(self):
		"""Fetch the root resource unless that has already been done."""
		# FIXME: use HTTP caching instead of memory caching
		if self.entry is not None:
			return
		self.reload()

	def reload(self):
		"""Fetch the root resource (again) from the server."""
		self.entry = self.client.request('GET', self.uri, expect_json=True)

	def get_ldap_base(self):
		"""Return the DN of the object behind the ``udm:ldap-base`` relation."""
		self.load()
		base = self.client.resolve_relation(self.entry, 'udm:ldap-base')
		return Object.from_data(self, base).dn

	def modules(self, name=None):
		"""Yield a :class:`Module` for every object type, optionally only those called `name`."""
		self.load()
		for group in self.client.resolve_relations(self.entry, 'udm:object-modules'):
			for info in self.client.get_relations(group, 'udm:object-types', name):
				yield Module(self, info['href'], info['name'], info['title'])

	def version(self, api_version):
		"""Remember `api_version` and return ``self`` to allow call chaining."""
		self._api_version = api_version
		return self

	def obj_by_dn(self, dn):
		"""Return the :class:`Object` with the given `dn`."""
		self.load()
		found = self.client.resolve_relation(self.entry, 'udm:object/get-by-dn', template={'dn': dn})
		return Object.from_data(self, found)

	def obj_by_uuid(self, uuid):
		"""Return the :class:`Object` with the given `uuid`."""
		self.load()
		found = self.client.resolve_relation(self.entry, 'udm:object/get-by-uuid', template={'uuid': uuid})
		return Object.from_data(self, found)

	def get(self, name):
		"""Return the first :class:`Module` called `name` or ``None`` if there is none."""
		return next(self.modules(name), None)

	def get_object(self, object_type, dn):
		"""Return the opened object `dn` from the module `object_type`."""
		module = self.get(object_type)
		return module.get(dn)

	def __repr__(self):
		# the password is masked on purpose
		template = 'UDM(uri={}, username={}, password=****, version={})'
		return template.format(self.uri, self.username, self._api_version)
class Module(Client):
	"""One UDM object type (e.g. ``users/user``).

	:param udm: the :class:`UDM` instance this module belongs to
	:param uri: the URI of the module resource
	:param name: the name of the module
	:param title: the title of the module
	"""

	def __init__(self, udm, uri, name, title, *args, **kwargs):
		super(Module, self).__init__(udm.client, *args, **kwargs)
		self.udm = udm
		self.uri = uri
		self.name = name
		self.title = title
		self.username = udm.username
		self.password = udm.password
		self.relations = {}  # the module resource, fetched lazily by load_relations()

	def load_relations(self):
		"""Fetch the module resource unless a non-empty one is already cached."""
		if not self.relations:
			self.relations = self.client.request('GET', self.uri)

	def __repr__(self):
		return 'Module(uri={}, name={})'.format(self.uri, self.name)

	def new(self, position=None, superordinate=None, template=None):
		"""Return a new, not yet saved :class:`Object` built from the ``create-form`` relation."""
		self.load_relations()
		variables = {'position': position, 'superordinate': superordinate, 'template': template}
		form = self.client.resolve_relation(self.relations, 'create-form', template=variables)
		return Object.from_data(self.udm, form)

	def get(self, dn):
		"""Return the opened object with the given `dn`.

		:raises NotFound: if the base search at `dn` yields nothing
		"""
		# TODO: use a link relation instead of a search
		found = next(self.search(position=dn, scope='base'), None)
		if found is None:
			# FIXME: object exists but is of different module. should be fixed on the server.
			raise NotFound(404, 'Wrong object type!?', None)
		return found.open()

	def get_by_entry_uuid(self, uuid):
		"""Return the opened object whose ``entryUUID`` is `uuid`.

		:raises NotFound: if the search yields nothing
		"""
		# TODO: use a link relation instead of a search
		# return self.udm.get_by_uuid(uuid)
		found = next(self.search(filter={'entryUUID': uuid}, scope='base'), None)
		if found is None:
			# FIXME: object exists but is of different module. should be fixed on the server.
			raise NotFound(404, 'Wrong object type!?', None)
		return found.open()

	def get_by_id(self, dn):
		"""Not implemented."""
		# TODO: Needed?
		raise NotImplementedError()

	def search(self, filter=None, position=None, scope='sub', hidden=False, superordinate=None, opened=False):
		"""Search for objects of this module.

		:param filter: either a dict of property/value pairs or a filter string
		:param position: sent as ``position`` search parameter
		:param scope: sent as ``scope`` search parameter
		:param hidden: sent as ``hidden`` search parameter (``'1'`` or ``'0'``)
		:param superordinate: sent as ``superordinate`` search parameter if set
		:param opened: yield :class:`Object` instead of :class:`ShallowObject`;
			if false only the ``dn`` property is requested
		:returns: generator of the found objects
		"""
		query = {}
		if isinstance(filter, dict):
			query.update(('query[%s]' % (prop,), val) for prop, val in filter.items())
		elif isinstance(filter, six.string_types):
			query['filter'] = filter

		if superordinate:
			query['superordinate'] = superordinate
		query['position'] = position
		query['scope'] = scope
		if hidden:
			query['hidden'] = '1'
		else:
			query['hidden'] = '0'
		if not opened:
			query['properties'] = 'dn'

		self.load_relations()
		result = self.client.resolve_relation(self.relations, 'search', template=query)
		for item in self.client.resolve_relations(result, 'udm:object'):
			if not opened:
				link = self.client.get_relation(item, 'self')
				uri, dn = link['href'], link['name']
				yield ShallowObject(self.udm, dn, uri)
				continue
			# NOTE: this is missing last-modified, therefore no conditional request is done on modification!
			yield Object.from_data(self.udm, item)

	def get_layout(self):
		"""Return the ``layout`` of the ``udm:layout`` relation."""
		self.load_relations()
		resource = self.udm.client.resolve_relation(self.relations, 'udm:layout')
		return resource.get('layout')

	def get_properties(self):
		"""Return the ``properties`` of the ``udm:properties`` relation."""
		self.load_relations()
		resource = self.udm.client.resolve_relation(self.relations, 'udm:properties')
		return resource.get('properties')

	def get_property_choices(self, property):
		"""Return the ``choices`` of the given `property`."""
		self.load_relations()
		session = self.udm.client
		properties = session.resolve_relation(self.relations, 'udm:properties')
		resource = session.resolve_relation(properties, 'udm:property-choices', name=property)
		return resource.get('choices')

	def policy_result(self, policy_module, position, policy=None):
		"""Return the ``udm:policy-result`` resource of `policy_module` without its HAL metadata."""
		self.load_relations()
		variables = {'position': position, 'policy': policy}
		result = self.udm.client.resolve_relation(self.relations, 'udm:policy-result', name=policy_module, template=variables)
		for hal_key in ('_links', '_embedded'):
			result.pop(hal_key, None)
		return result

	def get_report_types(self):
		"""Return the names of all named ``udm:report`` links."""
		self.load_relations()
		links = self.udm.client.get_relations(self.relations, 'udm:report', template={'dn': ''})
		return [link['name'] for link in links if link.get('name')]

	def create_report(self, report_type, object_dns):
		"""Resolve the ``udm:report`` link named `report_type` for `object_dns` and return the result."""
		self.load_relations()
		variables = {'dn': object_dns}
		return self.udm.client.resolve_relation(self.relations, 'udm:report', name=report_type, template=variables)
class ShallowObject(Client):
	"""Reference to an object of which only DN and URI are known.

	:param udm: the :class:`UDM` instance
	:param dn: the DN of the object (may be ``None``)
	:param uri: the URI of the object resource
	"""

	def __init__(self, udm, dn, uri, *args, **kwargs):
		super(ShallowObject, self).__init__(udm.client, *args, **kwargs)
		self.udm = udm
		self.uri = uri
		self.dn = dn

	def open(self):
		"""Fetch the resource at :attr:`uri` and return it as :class:`Object`."""
		response = self.client.make_request('GET', self.uri)
		return Object.from_response(self.udm, response)

	def __repr__(self):
		return 'ShallowObject(dn={})'.format(self.dn)
class References(object):
	"""Descriptor giving access to the objects referenced by an object's properties.

	Accessed through an instance, a new :class:`References` bound to that
	instance is returned. Item access and access to otherwise unknown
	attributes both look up the links of the relation
	``udm:object/property/reference/<name>``.

	:param obj: the object whose references are looked up (or ``None``)
	"""

	def __init__(self, obj=None):
		self.obj = obj
		if self.obj is None:
			self.udm = None
		else:
			self.udm = self.obj.udm

	def __getitem__(self, item):
		relation = 'udm:object/property/reference/%s' % (item,)
		links = self.udm.client.get_relations(self.obj.hal, relation)
		return [ShallowObject(self.obj.udm, link['name'], link['href']) for link in links]

	def __getattr__(self, key):
		# only invoked when the regular attribute lookup raised AttributeError
		return self[key]

	def __get__(self, obj, cls=None):
		return type(self)(obj)
class Object(Client):
	"""A UDM object together with its HAL metadata.

	:param udm: the :class:`UDM` instance
	:param representation: the JSON representation of the object; the keys
		``_links`` and ``_embedded`` are removed from it and kept in :attr:`hal`
	:param etag: the ``Etag`` of the representation, used for conditional requests
	:param last_modified: the ``Last-Modified`` value, used for conditional requests
	"""

	# access to referenced objects, e.g. obj.objects['groups']
	objects = References()

	@property
	def module(self):
		# FIXME: use "type" relation link
		# object_type = self.udm.get_relation(self.hal, 'type')['href']
		return self.udm.get(self.object_type)

	@property
	def object_type(self):
		return self.representation['objectType']

	@property
	def dn(self):
		return self.representation.get('dn')

	@property
	def properties(self):
		return self.representation['properties']

	@property
	def options(self):
		return self.representation.get('options', {})

	@property
	def policies(self):
		return self.representation.get('policies', {})

	@property
	def superordinate(self):
		return self.representation.get('superordinate')

	@superordinate.setter
	def superordinate(self, superordinate):
		self.representation['superordinate'] = superordinate

	@property
	def position(self):
		return self.representation.get('position')

	@position.setter
	def position(self, position):
		self.representation['position'] = position

	@property
	def uri(self):
		# prefer the "self" link, fall back to the "uri" of the representation
		try:
			uri = self.client.get_relation(self.hal, 'self')
		except _NoRelation:
			uri = None
		if uri:
			return uri['href']
		return self.representation.get('uri')

	@classmethod
	def from_response(cls, udm, response):
		"""Create an instance from a :class:`Response`, including its caching headers."""
		return cls.from_data(udm, response.data, response.response.headers)

	@classmethod
	def from_data(cls, udm, entry, headers=None):
		"""Create an instance from the representation `entry` and optional response `headers`."""
		headers = headers or {}
		return cls(udm, entry, etag=headers.get('Etag'), last_modified=headers.get('Last-Modified'))

	def __init__(self, udm, representation, etag=None, last_modified=None, *args, **kwargs):
		super(Object, self).__init__(udm.client, *args, **kwargs)
		self.udm = udm
		self.representation = representation
		# separate the HAL metadata from the data which is sent back on save()
		self.hal = {}
		self.hal['_links'] = representation.pop('_links', {})
		self.hal['_embedded'] = representation.pop('_embedded', {})
		self.etag = etag
		self.last_modified = last_modified

	def __repr__(self):
		return 'Object(module={}, dn={}, uri={})'.format(self.object_type, self.dn, self.uri)

	def reload(self):
		"""Replace the state of this object with a freshly fetched one.

		The ``self`` link is used if present, otherwise the object is looked
		up by its DN via its module.
		"""
		# get_relation() raises instead of returning a falsy value: without
		# catching the exception the DN based fallback could never be reached
		try:
			uri = self.client.get_relation(self.hal, 'self')
		except _NoRelation:
			uri = None
		if uri:
			obj = ShallowObject(self.udm, self.dn, uri['href']).open()
		else:
			obj = self.module.get(self.dn)
		self._copy_from_obj(obj)

	def save(self, reload=True):
		"""Modify the object if it has a DN, create it otherwise.

		:param reload: whether the object is reloaded afterwards
		:returns: the last :class:`Response`
		"""
		if self.dn:
			return self._modify(reload)
		else:
			return self._create(reload)

	def delete(self, remove_referring=False):
		"""Delete the object via a ``DELETE`` request to its URI.

		NOTE(review): `remove_referring` is accepted but not evaluated here.
		"""
		return self.client.request('DELETE', self.uri)

	def move(self, position):
		"""Set `position` and save the object."""
		self.position = position
		self.save()

	def _modify(self, reload=True):
		# send conditional request headers only for the values which are known
		headers = dict((key, value) for key, value in {
			'If-Unmodified-Since': self.last_modified,
			'If-Match': self.etag,
		}.items() if value)

		response = self.client.make_request('PUT', self.uri, data=self.representation, allow_redirects=False, **headers)
		response = self._follow_redirection(response, reload)  # move() causes multiple redirections!
		return response

	def _create(self, reload=True):
		uri = self.client.get_relation(self.hal, 'create')
		response = self.client.make_request('POST', uri['href'], data=self.representation, allow_redirects=False)
		response = self._follow_redirection(response, reload)
		return response

	def _reload_from_response(self, response, reload):
		if 200 <= response.response.status_code <= 299 and 'Location' in response.response.headers:
			uri = response.response.headers['Location']
			obj = ShallowObject(self.udm, None, uri)
			if reload:
				self._copy_from_obj(obj.open())
		elif reload:
			self.reload()

	def _follow_redirection(self, response, reload=True):
		location = None
		# python-requests doesn't follow redirects for 201
		if response.response.status_code == 201 and 'Location' in response.response.headers:
			location = response.response.headers['Location']
			response = self.client.make_request('GET', location, allow_redirects=False)

		# prevent allow_redirects because it does not wait Retry-After time causing a break up after 30 fast redirections
		while 300 <= response.response.status_code <= 399 and 'Location' in response.response.headers:
			location = response.response.headers['Location']
			if response.response.headers.get('Retry-After', '').isdigit():
				# wait as requested by the server, but never longer than 30 seconds
				time.sleep(min(30, max(0, int(response.response.headers['Retry-After']))))
			response = self.client.make_request('GET', location, allow_redirects=False)

		if location and response.response.status_code == 200:
			# the response already contains a new representation
			self._copy_from_obj(Object.from_response(self.udm, response))
		elif reload:
			self._reload_from_response(response, reload)
		return response

	def _copy_from_obj(self, obj):
		# deep copies keep both objects independent of each other
		self.udm = obj.udm
		self.representation = copy.deepcopy(obj.representation)
		self.hal = copy.deepcopy(obj.hal)
		self.etag = obj.etag
		self.last_modified = obj.last_modified

	def generate_service_specific_password(self, service):
		"""Request a password for `service` via the ``udm:service-specific-password`` link.

		:returns: the ``password`` of the response or ``None``
		"""
		uri = self.client.get_relation(self.hal, 'udm:service-specific-password')['href']
		response = self.client.make_request('POST', uri, data={"service": service})
		return response.data.get('password', None)

	def get_layout(self):
		"""Return the ``layout`` of the ``udm:layout`` relation."""
		return self.udm.client.resolve_relation(self.hal, 'udm:layout').get('layout')

	def get_properties(self):
		"""Return the ``properties`` of the ``udm:properties`` relation."""
		return self.udm.client.resolve_relation(self.hal, 'udm:properties').get('properties')

	def get_property_choices(self, property):
		"""Return the ``choices`` of the given `property`."""
		hal = self.udm.client.resolve_relation(self.hal, 'udm:properties')
		return self.udm.client.resolve_relation(hal, 'udm:property-choices', name=property).get('choices')

	def policy_result(self, policy_module, policy=None):
		"""Return the ``udm:policy-result`` resource of `policy_module` without its HAL metadata."""
		policy_result = self.udm.client.resolve_relation(self.hal, 'udm:policy-result', name=policy_module, template={'policy': policy})
		policy_result.pop('_links', None)
		policy_result.pop('_embedded', None)
		return policy_result