#!/usr/bin/python3
# -*- coding: utf-8 -*-
#
# Univention UCS@school
#
# Copyright 2017-2025 Univention GmbH
#
# https://www.univention.de/
#
# All rights reserved.
#
# The source code of this program is made available
# under the terms of the GNU Affero General Public License version 3
# (GNU AGPL V3) as published by the Free Software Foundation.
#
# Binary versions of this program provided by Univention to you as
# well as other copyrighted, protected or trademarked materials like
# Logos, graphics, fonts, specific documentations and configurations,
# cryptographic keys etc. are subject to a license agreement between
# you and Univention and not subject to the GNU AGPL V3.
#
# In the case you use this program under the terms of the GNU AGPL V3,
# the program is provided in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public
# License with the Debian GNU/Linux or Univention distribution in file
# /usr/share/common-licenses/AGPL-3; if not, see
# <http://www.gnu.org/licenses/>.
"""HTTP API Client"""
from __future__ import absolute_import, unicode_literals
import copy
import inspect
import logging
import os.path
from io import IOBase
from typing import Any, Callable, Dict, List # noqa: F401
import dateutil.parser
import magic
import requests
from six import string_types, with_metaclass
from six.moves.urllib_parse import parse_qs, quote as url_quote, urljoin, urlparse
from ucsschool.lib.models.utils import get_stream_handler
from univention.config_registry import ConfigRegistry
ucr = ConfigRegistry()
ucr.load()
MIME_TYPE = magic.open(magic.MAGIC_MIME_TYPE)
MIME_TYPE.load()
__resource_client_class_registry = [] # type: List[Client._ResourceClient]
__resource_representation_class_registry = (
{}
) # type: Dict[str, ResourceRepresentation._ResourceReprBase]
[docs]
def register_resource_client_class(cls):
if cls not in __resource_client_class_registry:
__resource_client_class_registry.append(cls)
[docs]
def get_resource_client_classes():
return __resource_client_class_registry
[docs]
def register_resource_representation_class(resource_name, cls):
__resource_representation_class_registry[resource_name] = cls
[docs]
def get_resource_representation_classes(resource_name):
return __resource_representation_class_registry[resource_name]
[docs]
def invalidate_resource_representation_classes_cache():
for cls in [
get_resource_representation_classes(class_name)
for class_name in __resource_representation_class_registry
]:
cls.invalidate_cache()
[docs]
class ApiError(Exception):
def __init__(self, msg, status_code=None):
super(ApiError, self).__init__(msg)
self.status_code = status_code
[docs]
class BadRequest(ApiError):
"""HTTP 400"""
pass
[docs]
class PermissionError(ApiError):
"""HTTP 401|403"""
pass
[docs]
class ObjectNotFound(ApiError):
"""HTTP 404"""
pass
[docs]
class ServerError(ApiError):
"""HTTP 5xx"""
pass
[docs]
class ConnectionError(ApiError):
"""Cannot establish / lost connection to server."""
pass
[docs]
class IllegalURLError(ApiError):
"""URLs returned from API root do not meet expectation."""
pass
class _ResourceClientMetaClass(type):
"""Meta class for resource client classes. Registers them."""
def __new__(cls, clsname, bases, attrs):
kls = super(_ResourceClientMetaClass, cls).__new__(cls, clsname, bases, attrs)
register_resource_client_class(kls)
return kls
class _ResourceRepresentationMetaClass(type):
"""Meta class for resource representation classes. Registers them."""
def __new__(cls, clsname, bases, attrs):
kls = super(_ResourceRepresentationMetaClass, cls).__new__(cls, clsname, bases, attrs)
register_resource_representation_class(kls.resource_name, kls)
return kls
[docs]
class ResourceRepresentationIterator(object):
"""Iterator for paginated query results."""
def __init__(self, resource_client, paginated_resource_list):
self._resource_client = resource_client
self._paginated_resource_list = paginated_resource_list
self.index = 0
def __iter__(self):
return self
def __next__(self):
try:
resource = self._paginated_resource_list["results"][self.index]
except IndexError:
if self._paginated_resource_list["next"] is None:
raise StopIteration()
parse_result = urlparse(self._paginated_resource_list["next"])
url = parse_result._replace(query=None).geturl()
params = parse_qs(parse_result.query)
self._paginated_resource_list = self._resource_client._resource_from_url(url, **params)
self.index = 0
resource = self._paginated_resource_list["results"][self.index]
self.index += 1
return ResourceRepresentation.get_repr(self._resource_client, resource)
next = __next__ # py2
[docs]
class ResourceRepresentation(object):
"""
Python representations of HTTP-API resources.
To add resources to the Python API create inner classes that
1. subclass _ResourceReprBase
2. use as meta class _ResourceRepresentationMetaClass
3. set a class attribute `resource_name` that matches the resource path
in the HTTP-API
The meta class will register the resource representation class and make it
available through Client objects.
client = Client(<username>, <password>, [log level])
client.school.list() # <-- from SchoolResource
client.userimportjob.create() # <-- from UserImportJobResource
"""
class _ResourceReprBase(object):
"""Base class of resource representation classes."""
resource_name = ""
resource_cache = {}
_attribute_repr = {} # type: Dict[str, Callable[[str], Any]]
def __init__(self, resource_client, resource):
self._resource_client = resource_client
self._resource = resource
self._set_attrs(self._resource)
@classmethod
def get(cls, resource_client, resource_url):
cached_resource = cls.resource_cache.get(resource_url, None)
if cached_resource is not None:
return cached_resource
resource = cls(resource_client, resource_client._resource_from_url(resource_url))
cls.resource_cache[resource_url] = resource
return resource
@classmethod
def invalidate_cache(cls):
cls.resource_cache = {}
def __repr__(self):
return "{}({})".format(self.__class__.__name__, getattr(self, self._resource_client.pk_name))
def _set_attrs(self, resource):
for k, v in resource.items():
if k == "url":
continue
if k not in dir(self):
try:
val = self._attribute_repr[k](v)
except KeyError:
val = v
setattr(self, k, val)
def update(self):
self._cache = {}
self._resource = self._resource_client._resource_from_url(self._resource["url"])
self._set_attrs(self._resource)
[docs]
class SchoolResource(with_metaclass(_ResourceRepresentationMetaClass, _ResourceReprBase)):
resource_name = "schools"
@property
def roles(self):
"""
Roles the connected user has in this school.
:return: RoleResource objects
:rtype: ResourceRepresentationIterator
"""
url = urljoin(self._resource["url"], "roles")
return self._resource_client.client.roles.list(resource_url=url)
@property
def user_imports(self):
"""
UserImportJobs that ran for this school.
:return: UserImportJobResource objects
:rtype: ResourceRepresentationIterator
"""
return self._resource_client.client.userimportjob.list(school=self.name)
[docs]
class RoleResource(with_metaclass(_ResourceRepresentationMetaClass, _ResourceReprBase)):
resource_name = "roles"
[docs]
class ResultResource(with_metaclass(_ResourceRepresentationMetaClass, _ResourceReprBase)):
resource_name = "result"
_attribute_repr = {"date_done": lambda x: dateutil.parser.parse(x)}
def __repr__(self):
return "{}(status={!r})".format(self.__class__.__name__, self.status)
[docs]
class UserImportJobResource(with_metaclass(_ResourceRepresentationMetaClass, _ResourceReprBase)):
"""
Representation of an import job resource.
`job = client.userimportjob.get(job_id)`
* job.status
* job.result
* job.log_file
* job.password_file
* job.school
* job.summary_file
"""
resource_name = "imports/users"
_attribute_repr = {"date_created": lambda x: dateutil.parser.parse(x)}
def __repr__(self):
return "{}({}, {}, {}, {}, {})".format(
self.__class__.__name__,
getattr(self, self._resource_client.pk_name),
self.school.name, # side effect: this will create a request (the first time) to get
# the school resource
self.user_role,
self.principal,
self.status,
)
@property
def log_file(self):
try:
return self._resource_client._resource_from_url(self._resource["log_file"]).get("text")
except ObjectNotFound:
return None
@property
def password_file(self):
try:
return self._resource_client._resource_from_url(self._resource["password_file"]).get(
"text"
)
except ObjectNotFound:
return None
@property
def school(self):
return ResourceRepresentation.SchoolResource.get(
self._resource_client, self._resource["school"]
)
@property
def summary_file(self):
try:
return self._resource_client._resource_from_url(self._resource["summary_file"]).get(
"text"
)
except ObjectNotFound:
return None
@property
def result(self):
if self._resource["result"]:
return ResourceRepresentation.ResultResource(
self._resource_client, self._resource["result"]
)
else:
return None
[docs]
@classmethod
def get_repr(cls, resource_client, resource):
return get_resource_representation_classes(resource_client.resource_name)(
resource_client, resource
)
[docs]
class Client(object):
"""
HTTP-API import client.
client = Client(username, password)
my_schools = client.school.list()
my_roles_at_school1 = client.school.get('school1').roles
job_id = client.userimportjob.create()
client.userimportjob.get(job_id)
"""
LOG_REQUEST = 5
LOG_RESPONSE = 4
def __init__(
self,
name,
password,
server=None,
version=1,
log_level=logging.INFO,
ssl_verify=True,
*args,
**kwargs
):
"""
UCS@school HTTP API client.
:param str name: username for connecting to HTTP-API
:param str password: password to use for connecting to HTTP-API
:param str server: FQDN of server running the HTTP-API
:param str version: HTTP-API version, omit to use latest version
:param int log_level: log level, use `logging.{INFO,DEBUG,..}` or
`Client.LOG_REQUEST` to log API requests, `Client.LOG_RESPONSE` to
log both requests and responses
"""
self.session = requests.Session()
self.username = name
self.password = password
self.server = server or "{}.{}".format(ucr["hostname"], ucr["domainname"])
self.version = version
self.ssl_verify = ssl_verify
self.base_url = "https://{}/api/v{}/".format(self.server, self.version)
self.logger = self._setup_logging(log_level)
self._resource_urls = None
self.logger.debug("Registering resources and methods:")
for kls in get_resource_client_classes():
cls_name = kls.__name__.lower().strip("_")
setattr(self, cls_name, kls(self))
self.logger.debug(
" %s: %s",
cls_name,
", ".join(
[
m[0]
for m in inspect.getmembers(kls, predicate=inspect.ismethod)
if not m[0].startswith("_")
]
),
)
[docs]
def close(self):
self.session.close()
[docs]
def invalidate_caches(self):
invalidate_resource_representation_classes_cache()
@property
def resource_urls(self):
if not self._resource_urls:
self._resource_urls = self.call_api("get", ".")
for resource, url in self._resource_urls.items():
if not url.lower().startswith(self.base_url.lower()):
raise IllegalURLError(
"URL {!r} for resource {!r} from API root does not start with {!r}.".format(
url, resource, self.base_url
)
)
return self._resource_urls
@classmethod
def _setup_logging(cls, log_level):
if not hasattr(logging, "LOG_REQUEST"):
logging.addLevelName(cls.LOG_REQUEST, "REQUEST")
if not hasattr(logging, "LOG_RESPONSE"):
logging.addLevelName(cls.LOG_RESPONSE, "RESPONSE")
logger = logging.getLogger(__name__)
logger.request = lambda msg, *args, **kwargs: logger.log(cls.LOG_REQUEST, msg, *args, **kwargs)
logger.response = lambda msg, *args, **kwargs: logger.log(cls.LOG_RESPONSE, msg, *args, **kwargs)
if not logger.handlers:
logger.addHandler(get_stream_handler(log_level))
if log_level > logger.level:
logger.setLevel(log_level)
return logger
[docs]
def call_api(self, method, url_end, data=None, files=None, params=None, **kwargs):
"""
Call HTTP-API.
:param str method: `get`, `post` etc
:param str url_end: URL path after base URL (https://<server>/api/<version>/<url_end>)
:param dict data: payload
:param dict files: {'<key>': (<filename>, <open file>, <mime type>)}
:param dict params: URL parameters
:param dict kwargs: additional arguments to pass to request
:return: server response
:rtype: dict
:raises: ApiError
"""
if not url_end.endswith("/"):
url_end += "/"
url = urljoin(self.base_url, url_end)
request_kwargs = dict(
url=url,
data=data,
files=files,
params=params,
auth=(self.username, self.password),
headers={"Accept": "application/json"},
**kwargs
)
# TODO: add language to request for translated displayNames. something like:
# request_kwargs['headers']['Accept-Language'] ='de_DE'
if not self.ssl_verify:
request_kwargs["verify"] = False
log_request_kwargs = copy.deepcopy(
dict(request_kwargs, files={k: v[0] for k, v in (request_kwargs["files"] or {}).items()})
)
log_request_kwargs["auth"] = (
log_request_kwargs["auth"][0],
"*" * len(log_request_kwargs["auth"][1]),
)
self.logger.request(
"%s(%s)", method, ", ".join("{}={!r}".format(k, v) for k, v in log_request_kwargs.items())
)
meth = getattr(self.session, method)
try:
response = meth(**request_kwargs)
except requests.ConnectionError as exc:
raise ConnectionError(str(exc))
self.logger.response(
"%s -> %s (%r) headers:%r content:%r",
response.url,
response.reason,
response.status_code,
response.headers,
response.content,
)
if not response.ok:
msg = "Received status_code={!r} with reason={!r} for requests.{}(**{}).".format(
response.status_code,
response.reason,
method,
", ".join("{}={!r}".format(k, v) for k, v in log_request_kwargs.items()),
)
if response.status_code == 400:
exc = BadRequest
elif response.status_code in (401, 403):
exc = PermissionError
elif response.status_code == 404:
exc = ObjectNotFound
elif 499 < response.status_code < 600:
exc = ServerError
else:
exc = ApiError
raise exc(msg, status_code=response.status_code)
return response.json()
class _ResourceClient(object):
resource_name = ""
pk_name = ""
def __init__(self, client):
self.client = client
self.resource_url = self.client.resource_urls[self.resource_name]
def _to_python(self, resource):
if resource is None:
return None
elif all(key in resource for key in ("count", "next", "previous", "results")):
return ResourceRepresentationIterator(self, resource)
return ResourceRepresentation.get_repr(self, resource)
def _resource_from_url(self, url, **params):
return self.client.call_api("get", url, params=params)
def _get_resource(self, pk, **params):
url = urljoin(self.resource_url, url_quote(str(pk)))
return self._resource_from_url(url, **params)
def _list_resource(self, **params):
resource_url = params.pop("resource_url", self.resource_url)
return self._resource_from_url(resource_url, **params)
def get(self, pk):
"""
Read Resource.
:param str pk: primary key (name, id, ..)
:return: Resource object
:rtype: _ResourceReprBase
"""
assert isinstance(pk, string_types) or isinstance(pk, int)
return self._to_python(self._get_resource(pk))
def latest(self, **params):
"""
Get newest Resource this user has access to.
All arguments will be passed as parameters to the request. Example:
latest(dryrun=True)
:param params: arguments to pass as parameters to the request
:return: Resource object
:rtype: ResourceRepresentation
"""
list_kwargs = {"ordering": "-{}".format(self.pk_name), "limit": "1"}
list_kwargs.update(params)
for ioj in self.list(**list_kwargs):
return ioj
raise ObjectNotFound("No {!r} resources exist.".format(self.resource_name))
def list(self, **params):
"""
List all Resource this user has access to.
All arguments will be passed as parameters to the request. Example:
list(status=['Aborted', 'Finished'], dryrun=False, ordering='id', limit=1)
:param params: arguments to pass as parameters to the request
:return: list of Resource objects
:rtype: ResourceRepresentationIterator
"""
return self._to_python(self._list_resource(**params))
class _School(with_metaclass(_ResourceClientMetaClass, _ResourceClient)):
resource_name = "schools"
pk_name = "name"
class _Roles(with_metaclass(_ResourceClientMetaClass, _ResourceClient)):
resource_name = "roles"
pk_name = "name"
class _UserImportJob(with_metaclass(_ResourceClientMetaClass, _ResourceClient)):
resource_name = "imports/users"
pk_name = "id"
def create(
self, filename, source_uid=None, school=None, user_role=None, dryrun=True, file_obj=None
):
"""
Create a UserImportJob.
:param str filename: path to a CSV file, or just a filename and read from 'file_obj'
:param str source_uid: optional unique ID of school management software database
:param str school: optional name of a School
:param str user_role: optional role of user, one of staff, student, teacher,
teacher_and_staff
:param bool dryrun: False to start a real import
:param file file_obj: optional file like object to read CSV data from, instead of opening
'filename'
:return: the created UserImportJob resource
:rtype: _ResourceReprBase
"""
assert isinstance(filename, string_types)
assert isinstance(source_uid, string_types) or source_uid is None
assert isinstance(school, string_types) or school is None
assert isinstance(user_role, string_types) or user_role is None
assert isinstance(dryrun, bool)
assert isinstance(file_obj, IOBase) or file_obj is None
data = {
"dryrun": dryrun,
}
if school:
try:
school_obj = self.client.school.get(school)
except ObjectNotFound:
raise ObjectNotFound("School {!r} is unknown.".format(school))
data["school"] = school_obj._resource["url"]
if source_uid:
data["source_uid"] = source_uid
if user_role:
data["user_role"] = user_role
filename = filename or "noname"
if not file_obj:
file_obj = open(filename, "rb")
file_data = file_obj.read(32)
mime_type = self._get_mime_type(file_data)
file_obj.seek(os.SEEK_SET)
files = {"input_file": (os.path.basename(filename), file_obj, mime_type)}
return self._to_python(
self.client.call_api("post", self.resource_url, data=data, files=files)
)
@staticmethod
def _get_mime_type(data):
return MIME_TYPE.buffer(data)