#!/usr/bin/python3
#
# Univention Directory Manager
# UDM REST API
#
# SPDX-FileCopyrightText: 2017-2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only
import base64
import binascii
import contextvars
import copy
import functools
import io
import json
import logging
import operator
import os
import re
import socket
import traceback
import uuid
import xml.etree.ElementTree as ET # noqa: S405
import zlib
from http.client import responses
from urllib.parse import parse_qs, quote, unquote, urlencode, urljoin, urlparse, urlunparse
import ldap
import tornado.gen
import tornado.httpclient
import tornado.web
from concurrent.futures import ThreadPoolExecutor
from ldap.controls import SimplePagedResultsControl
from ldap.controls.readentry import PostReadControl
from ldap.controls.sss import SSSRequestControl
from ldap.dn import explode_rdn
from ldap.filter import filter_format
from tornado.concurrent import run_on_executor
from tornado.web import Finish, HTTPError, RequestHandler
import univention
import univention.admin.authorization as udm_auth
import univention.admin.modules as udm_modules
import univention.admin.objects as udm_objects
import univention.admin.types as udm_types
import univention.admin.uexceptions as udm_errors
import univention.directory.reports as udr
from univention.admin.rest.hal import HAL
from univention.admin.rest.html_ui import HTML
from univention.admin.rest.http_conditional import ConditionalResource, last_modified
from univention.admin.rest.ldap_connection import (
get_admin_ldap_write_connection, get_machine_ldap_read_connection, get_user_ldap_read_connection,
get_user_ldap_write_connection, reset_cache,
)
from univention.admin.rest.openapi import OpenAPIBase, RelationsBase, _OpenAPIBase
from univention.admin.rest.sanitizer import (
Base64EncodingSanitizer, Body, BooleanSanitizer, BoolSanitizer, ChoicesSanitizer, DictSanitizer, DNSanitizer,
EmailSanitizer, IntegerSanitizer, JSONPayload, LDAPFilterSanitizer, LDAPSearchSanitizer, ListSanitizer,
MultiValidationError, ObjectPropertySanitizer, PatchRepresentation, PropertiesSanitizer, Query, Sanitizer,
SanitizerBase, SearchSanitizer, StringSanitizer, ValidationError, sanitize,
)
from univention.admin.rest.shared_memory import JsonEncoder, shared_memory
from univention.admin.rest.utils import (
RE_UUID, NotFound, _get_post_read_entry_uuid, _map_normalized_dn, decode_properties, parse_content_type, quote_dn,
superordinate_names, unquote_dn,
)
from univention.config_registry import handler_set
from univention.lib.i18n import Translation
from univention.management.console.config import ucr
from univention.management.console.error import LDAP_ConnectionFailed, LDAP_ServerDown, UMC_Error, UnprocessableEntity
from univention.management.console.modules.udm.tools import (
LicenseError, LicenseImport as LicenseImporter, check_license, dump_license,
)
from univention.management.console.modules.udm.udm_ldap import (
NoIpLeft, ObjectDoesNotExist, SuperordinateDoesNotExist, UDM_Error, UDM_Module, container_modules, get_module,
ldap_dn2path, list_objects,
)
from univention.password import generate_password, password_config
# FIXME: it seems request.path contains the un-urlencoded path, could be security issue!
# TODO: 0f77c317e03844e8a16c484dde69abbcd2d2c7e3 is not integrated
# TODO: loading the policies probably unnecessarily slows down things
_ = Translation('univention-directory-manager-rest').translate
MAX_WORKERS = ucr.get('directory/manager/rest/max-worker-threads', 35)
request_context = contextvars.ContextVar("request_context")
log = univention.logging.Structured(logging.getLogger('MODULE'))
[docs]
class ResourceBase(SanitizerBase, HAL, HTML):
"""Abstract base class for all HTTP resources"""
pool = ThreadPoolExecutor(max_workers=MAX_WORKERS)
[docs]
@tornado.gen.coroutine
def pool_submit(self, cb, *args, **kwargs):
future = self.pool.submit(self.pool_wrapper, cb, *args, **kwargs)
return (yield future)
[docs]
def pool_wrapper(self, func, *a, **kw):
self._set_request_context()
return func(*a, **kw)
def _set_request_context(self):
request_context.set({
"request_id": self.request.x_request_id,
"requester_dn": self.request.user_dn,
"requester_ip": self.request.client_ip,
"requester_hostname": self.request.client_host,
})
requires_authentication = True
@property
def debug_mode_enabled(self):
return ucr.is_true('directory/manager/rest/debug-mode-enabled')
[docs]
def force_authorization(self, auth_type=None, error_information=None):
self.add_header('WWW-Authenticate', 'Basic realm="Univention Directory Manager"')
if auth_type == 'Bearer':
if error_information:
self.set_header('WWW-Authenticate', 'Bearer realm="Univention Directory Manager" scope="openid" error="invalid_token" error_description="%s"' % (error_information.encode('unicode_escape').decode('ISO8859-1').replace('"', '').replace('\\', '\\\\'),))
else:
self.add_header('WWW-Authenticate', 'Bearer realm="Univention Directory Manager"')
self.set_status(401)
self.finish() # TODO: add response body
[docs]
def prepare(self):
self.request.x_request_id = RE_UUID.sub('', self.request.headers.get('X-Request-Id', str(uuid.uuid4())))[:36]
self.set_header('X-Request-Id', self.request.x_request_id)
self.request.content_negotiation_lang = 'html'
self.request.path_decoded = unquote(self.request.path)
self.request.decoded_query_arguments = self.request.query_arguments.copy()
self.request.client_ip = self.request.headers.get('X-Forwarded-For', self.request.remote_ip).rsplit(',', 1).pop().strip()
try:
self.request.client_host = socket.gethostbyaddr(self.request.client_ip)[0]
except OSError:
self.request.client_host = ''
self.request.user_dn = None
self._set_request_context()
authorization = self.request.headers.get('Authorization')
if not authorization and self.requires_authentication:
return self.force_authorization(None)
try:
if authorization:
self.parse_authorization(authorization)
finally:
self.request.content_negotiation_lang = self.check_acceptable()
self.decode_request_arguments()
[docs]
def parse_authorization(self, authorization):
if authorization in shared_memory.authenticated: # cache for the userdn, which eliminates a search / request
auth_type, username, userdn, password = shared_memory.authenticated[authorization]
already_authenticated = True
else:
already_authenticated = False
username = userdn = password = None
auth_type = None
if authorization.lower().startswith('basic '):
try:
username, password = base64.b64decode(authorization.split(' ', 1)[1].encode('ISO8859-1')).decode('ISO8859-1').split(':', 1)
except (ValueError, IndexError, binascii.Error):
pass
if not username or not password:
raise HTTPError(400, 'The basic auth credentials are malformed.')
elif authorization.lower().startswith('bearer '):
auth_type = 'Bearer'
username, password = None, authorization.split(' ', 1)[1]
else:
return self.force_authorization()
if not auth_type:
userdn = self._auth_get_userdn(username)
if not userdn:
return self.force_authorization(auth_type)
try:
self.ldap_connection, self.ldap_position = get_user_ldap_read_connection(auth_type, userdn, password)
if already_authenticated and not self.ldap_connection.whoami(): # the ldap connection is not bound anymore
reset_cache(self.ldap_connection)
self.ldap_connection, self.ldap_position = get_user_ldap_read_connection(auth_type, userdn, password)
self.ldap_connection = self._authz_ldap_connection(userdn, self.ldap_connection)
if auth_type == 'Bearer':
userdn = self.ldap_connection.whoami()
username = '+'.join(explode_rdn(userdn, True))
except (ldap.LDAPError, udm_errors.base) as exc:
log.debug('Authentication failed.', error=exc)
return self.force_authorization(auth_type) # TODO: parse ldap.OTHER / etc for SASL error details?
except Exception:
log.exception('Unknown error during authentication:')
return self.force_authorization(auth_type)
else:
self.request.user_dn = userdn
self.request.username = username
self._set_request_context()
if not already_authenticated:
self._auth_check_allowed_groups()
shared_memory.authenticated[authorization] = (auth_type, username, userdn, password)
@property
def ldap_write_connection(self):
auth_type, _username, userdn, password = shared_memory.authenticated[self.request.headers.get('Authorization')]
conn = get_user_ldap_write_connection(auth_type, userdn, password)[0]
conn = self._authz_ldap_connection(userdn, conn)
return conn
def _authz_ldap_connection(self, userdn, conn):
non_authz_users = {value for key, value in ucr.items() if key.startswith('directory/manager/rest/delegative-administration/excluded-users/')} | {'cn=admin,%(ldap/base)s' % ucr}
if userdn in non_authz_users:
return conn
return udm_auth.Authorization.inject_ldap_connection(conn)
def _auth_check_allowed_groups(self):
if self.request.username in ('cn=admin',):
return
allowed_groups = [value for key, value in ucr.items() if key.startswith('directory/manager/rest/authorized-groups/')]
memberof = self.ldap_connection.authz_connection.getAttr(self.request.user_dn, 'memberOf')
if not set(_map_normalized_dn(memberof)) & set(_map_normalized_dn(allowed_groups)):
raise HTTPError(403, 'Not in allowed groups.')
def _auth_get_userdn(self, username):
if username in ('cn=admin',):
return 'cn=admin,%(ldap/base)s' % ucr
lo, _po = get_machine_ldap_read_connection()
dns = lo.searchDn(filter_format('(&(objectClass=person)(uid=%s))', [username]), unique=True)
return dns[0] if dns else None
[docs]
def get_module(self, object_type, ldap_connection=None):
module = UDM_Module(object_type, ldap_connection=ldap_connection or self.ldap_connection, ldap_position=self.ldap_position)
if not module or not module.module:
raise NotFound(object_type)
return module
[docs]
def get_module_object(self, object_type, dn, ldap_connection=None):
module = self.get_module(object_type, ldap_connection=ldap_connection)
try:
obj = module.get(dn)
except UDM_Error as exc:
if not isinstance(exc.exc, udm_errors.noObject):
raise
obj = None
if not obj:
raise NotFound(object_type, dn)
return module, obj
[docs]
def get_object_by_dn(self, dn, ldap_connection=None):
object_type = get_module(None, dn, self.ldap_connection).module
return self.get_object(object_type, dn, ldap_connection=ldap_connection)
[docs]
def get_object(self, object_type, dn, ldap_connection=None):
return self.get_module_object(object_type, dn, ldap_connection=ldap_connection)[1]
[docs]
def check_acceptable(self):
accept = self.request.headers.get('Accept', 'text/html').split(',')
langs = []
for language in accept:
score = 1.0
parts = language.strip().split(";")
for part in (x for x in parts[1:] if x.strip().startswith("q=")):
try:
score = float(part.strip()[2:])
break
except (ValueError, TypeError):
raise
score = 0.0
langs.append((parts[0].strip(), score))
langs.sort(key=lambda pair: pair[1], reverse=True)
lang = None
for name, q in langs:
if q <= 0:
continue
if name in ('text/html', 'text/xml', 'application/xml', 'text/*', '*/*'):
lang = 'html'
break
elif name in ('application/hal+json', 'application/*'):
lang = 'hal_json'
break
elif name in ('application/json',):
lang = 'json'
break
if not lang:
raise HTTPError(406, 'The requested Content-Type does not exists. Specify a valid Accept header.')
if lang == 'html' and not ucr.is_true('directory/manager/rest/html-view-enabled'):
raise HTTPError(406, 'The unsupported HTML view of the UDM REST API is disabled. Please use the JSON interface via the "Accept: application/json" HTTP header or enable it via the UCR variable "directory/manager/rest/html-view-enabled". To get a developer overview the OpenAPI schema interface can be reached at /univention/udm/schema/.')
return lang
[docs]
def decode_request_arguments(self):
content_type = parse_content_type(self.request.headers.get('Content-Type', ''))
if self.request.method in ('HEAD', 'GET', 'OPTIONS', 'DELETE'):
if self.request.body:
raise HTTPError(400, 'Safe HTTP method should not contain request body/Content-Type header.')
return
if content_type in ('application/json', 'application/json-patch+json'):
try:
self.request.body_arguments = json.loads(self.request.body)
except ValueError as exc:
raise HTTPError(400, _('Invalid JSON document: %r') % (exc,))
elif content_type in ('application/x-www-form-urlencoded', 'multipart/form-data'):
self.decode_form_arguments()
[docs]
def get_body_argument(self, name, *args):
if parse_content_type(self.request.headers.get('Content-Type', '')) in ('application/json',):
return self.request.body_arguments.get(name)
return super().get_body_argument(name, *args)
[docs]
def get_body_arguments(self, name, *args):
if parse_content_type(self.request.headers.get('Content-Type', '')) in ('application/json',):
return self.request.body_arguments.get(name)
return super().get_body_arguments(name, *args)
[docs]
def content_negotiation(self, response):
self.add_header('Vary', ', '.join(self.vary()))
lang = self.request.content_negotiation_lang
formatter = getattr(self, f'{self.request.method.lower()}_{lang}', getattr(self, f'get_{lang}'))
codec = getattr(self, f'content_negotiation_{lang}')
self.finish(codec(formatter(response), response))
[docs]
def content_negotiation_json(self, response, data):
self.set_header('Content-Type', 'application/json')
try:
return json.dumps(response, cls=JsonEncoder)
except TypeError:
log.error('Cannot JSON serialize response.', response=repr(response))
raise
[docs]
def get_json(self, response):
self.add_link(response, 'curies', self.abspath('relation/') + '{rel}', name='udm', templated=True)
response.get('_embedded', {}).pop('udm:form', None) # no public API, just to render html
response.get('_embedded', {}).pop('udm:button', None) # no public API, just to render html
response.get('_embedded', {}).pop('udm:layout', None) # save traffic, just to render html
response.get('_embedded', {}).pop('udm:properties', None) # save traffic, just to render html
return response
[docs]
def urljoin(self, *args, **query):
base = urlparse(self.request.full_url())
query_string = ''
if query:
qs = parse_qs(base.query)
qs.update({key: val if isinstance(val, list | tuple) else [val] for key, val in query.items()})
query_string = f'?{urlencode(qs, True)}'
scheme = base.scheme
for _scheme in self.request.headers.get_list('X-Forwarded-Proto'):
if _scheme == 'https':
scheme = 'https'
break
if _scheme == 'http':
scheme = 'http'
return urljoin(urljoin(urlunparse((scheme, base.netloc, 'univention/' if self.request.headers.get('X-Forwarded-Host') else '/', '', '', '')), quote(self.request.path_decoded.lstrip('/'))), '/'.join(args)) + query_string
[docs]
def abspath(self, *args):
return urljoin(self.urljoin('/univention/udm/' if self.request.headers.get('X-Forwarded-Host') else '/udm/'), '/'.join(args))
[docs]
def log_exception(self, typ, value, tb):
if isinstance(value, UMC_Error):
return
super().log_exception(typ, value, tb)
[docs]
def write_error(self, status_code, exc_info=None, **kwargs):
method = {'head': 'get'}.get(self.request.method.lower(), self.request.method.lower())
setattr(self, f'{method}_template', 'error.html')
setattr(self, f'{method}_html', self.get_error_html)
self.set_header('X-Request-Id', self.request.x_request_id)
self.set_header('HX-Reselect', 'div.error')
self.set_header('HX-Reswap', 'innerHTML')
if not exc_info: # or isinstance(exc_info[1], HTTPError):
return super().write_error(status_code, exc_info=exc_info, **kwargs)
etype, exc, etraceback = exc_info
if isinstance(exc, udm_errors.ldapError) and isinstance(getattr(exc, 'original_exception', None), ldap.SERVER_DOWN | ldap.CONNECT_ERROR | ldap.INVALID_CREDENTIALS):
exc = exc.original_exception
if isinstance(exc, ldap.SERVER_DOWN):
exc = LDAP_ServerDown()
if isinstance(exc, ldap.CONNECT_ERROR):
exc = LDAP_ConnectionFailed(exc)
message = str(exc)
title = ''
error = {}
response = {
'error': {},
}
if isinstance(exc, UDM_Error):
status_code = 400
if isinstance(exc, UMC_Error):
status_code = exc.status
title = exc.msg
if status_code == 503:
self.add_header('Retry-After', '15')
if title == message:
title = responses.get(status_code)
if isinstance(exc.result, dict):
error = exc.result
if isinstance(exc, UnprocessableEntity) and error.get('body', {}).get('properties'):
error = error['body']['properties']
elif isinstance(exc, UnprocessableEntity) and error.get('query', {}):
error = error['query']
if isinstance(exc, UnprocessableEntity):
error_summary = ''
def _append_error(key, message, location, formatter):
error_summary = ''
if isinstance(message, dict):
for k, v in message.items():
error_summary += _append_error(k, v, [*list(location), k], formatter)
else:
error_summary += formatter % (key, message)
self.add_resource(response, 'udm:error', {
'location': location,
'message': message,
'type': 'value_error',
})
return error_summary
for key, value in exc.result.items():
formatter = _('Request argument "%s" %s\n')
location = key
if key == 'query_string':
formatter = _('Query string "%s": %s\n')
location = 'query'
elif key == 'body_arguments':
formatter = _('Body data "%s": %s\n')
location = 'body'
error_summary += _append_error(key, value, (location,), formatter)
message = f'{message}:\n{error_summary}'
if status_code >= 500:
_traceback = None
if not isinstance(exc, UDM_Error | UMC_Error):
_traceback = ''.join(traceback.format_exception(etype, exc, etraceback))
response['error']['traceback'] = _traceback if self.application.settings.get("serve_traceback", True) else None
# backwards compatibility :'-(
response['error'].update({
'code': status_code,
'title': title,
'message': message,
'error': error, # deprecated, use embedded udm:error instead
})
self.add_link(response, 'self', self.urljoin(''), title=_('HTTP-Error %d: %s') % (status_code, title))
self.set_status(status_code)
self.add_caching(public=False, no_store=True, no_cache=True, must_revalidate=True)
self.content_negotiation(response)
[docs]
def add_caching(self, expires=None, public=False, must_revalidate=False, no_cache=False, no_store=False, no_transform=False, max_age=None, shared_max_age=None, proxy_revalidate=False):
control = [
'public' if public else 'private',
'must-revalidate' if must_revalidate else '',
'no-cache' if no_cache else '',
'no-store' if no_store else '',
'no-transform' if no_transform else '',
'max-age=%d' % (max_age,) if max_age else '',
's-maxage=%d' % (shared_max_age,) if shared_max_age else '',
'proxy-revalidate' if proxy_revalidate else '',
]
cache_control = ', '.join(x for x in control if x)
if cache_control:
self.set_header('Cache-Control', cache_control)
if expires:
self.set_header('Expires', expires)
[docs]
def vary(self):
return ['Accept', 'Accept-Language', 'Accept-Encoding', 'Authorization']
[docs]
def get_parent_object_type(self, module):
flavor = module.flavor
if '/' not in flavor:
return module
return UDM_Module(flavor, ldap_connection=self.ldap_connection, ldap_position=self.ldap_position)
[docs]
class Resource(ResourceBase, RequestHandler):
"""Base class for all HTTP resources"""
[docs]
def options(self, *args, **kwargs):
"""Display API descriptions."""
result = self._options(*args, **kwargs)
result.update(self.get_openapi_schema(args and args[0]))
self.add_caching(public=False, must_revalidate=True)
self.content_negotiation(result)
def _options(self, *args, **kwargs):
return {}
[docs]
def get_openapi_schema(self, object_type=None):
return {}
[docs]
def options_json(self, response):
response = super().get_json(response)
response.pop('_links', None)
response.pop('_embedded', None)
return response
[docs]
def handle_udm_errors(self, action, *args, **kwargs):
try:
exists_msg = None
error = None
try:
try:
return action(*args, **kwargs)
except UDM_Error as udm_exc:
raise udm_exc.exc
except udm_errors.objectExists as exc:
exists_msg = f'dn: {exc.args[0]}'
error = exc
except udm_errors.uidAlreadyUsed as exc:
exists_msg = '(uid)'
error = exc
except udm_errors.groupNameAlreadyUsed as exc:
exists_msg = '(group)'
error = exc
except udm_errors.dhcpServerAlreadyUsed as exc:
exists_msg = '(dhcpserver)'
error = exc
except udm_errors.macAlreadyUsed as exc:
exists_msg = '(mac)'
error = exc
except udm_errors.noLock as exc:
exists_msg = '(nolock)'
error = exc
if exists_msg and error:
self.raise_sanitization_error('dn', _('Object exists: %s: %s') % (exists_msg, UDM_Error(error)))
except (udm_errors.pwQuality, udm_errors.pwToShort, udm_errors.pwalreadyused) as exc:
self.raise_sanitization_error(('properties', 'password'), str(exc))
except udm_errors.invalidOptions as exc:
self.raise_sanitization_error('options', str(exc))
except udm_errors.insufficientInformation as exc:
if exc.missing_properties:
self.raise_sanitization_errors([('properties', property_name), _('The property "%(name)s" is required.') % {'name': property_name}] for property_name in exc.missing_properties)
self.raise_sanitization_error('dn', str(exc))
except (udm_errors.invalidOperation, udm_errors.invalidChild) as exc:
self.raise_sanitization_error('dn', str(exc)) # TODO: invalidOperation and invalidChild should be 403 Forbidden
except udm_errors.alreadyUsedInSubtree as exc:
self.raise_sanitization_error('position', str(exc))
except udm_errors.invalidDhcpEntry as exc:
self.raise_sanitization_error(('properties', 'dhcpEntryZone'), str(exc))
except udm_errors.circularGroupDependency as exc:
self.raise_sanitization_error(('properties', 'memberOf'), str(exc)) # or "nestedGroup"
except (udm_errors.valueError) as exc: # valueInvalidSyntax, valueRequired, etc.
self.raise_sanitization_error(('properties', getattr(exc, 'property', 'properties')), str(exc))
except udm_errors.prohibitedUsername as exc:
self.raise_sanitization_error(('properties', 'username'), str(exc))
except udm_errors.uidNumberAlreadyUsedAsGidNumber as exc:
self.raise_sanitization_error(('properties', 'uidNumber'), str(exc))
except udm_errors.gidNumberAlreadyUsedAsUidNumber as exc:
self.raise_sanitization_error(('properties', 'gidNumber'), str(exc))
except udm_errors.mailAddressUsed as exc:
self.raise_sanitization_error(('properties', 'mailPrimaryAddress'), str(exc))
except (udm_errors.adGroupTypeChangeLocalToAny, udm_errors.adGroupTypeChangeDomainLocalToUniversal, udm_errors.adGroupTypeChangeToLocal, udm_errors.adGroupTypeChangeUniversalToGlobal, udm_errors.adGroupTypeChangeGlobalToUniversal, udm_errors.adGroupTypeChangeDomainLocalToGlobal, udm_errors.adGroupTypeChangeGlobalToDomainLocal) as exc:
self.raise_sanitization_error(('properties', 'adGroupType'), str(exc))
except udm_errors.restoreFailed as exc:
self.raise_sanitization_error('dn', str(exc))
except udm_errors.permissionDenied as exc:
raise HTTPError(403, str(exc))
except udm_errors.base as exc:
UDM_Error(exc).reraise()
[docs]
class Nothing(Resource):
"""404 error page"""
[docs]
def prepare(self, *args, **kwargs):
super().prepare(*args, **kwargs)
raise NotFound()
[docs]
class Favicon(ResourceBase, tornado.web.StaticFileHandler):
"""Get a 16*16px PNG representation of a UDM module"""
[docs]
@classmethod
def get_absolute_path(cls, root, object_type=''):
value = object_type.replace('/', '-')
if value == 'favicon':
return root
if not value.replace('-', '').replace('_', '').isalpha():
raise NotFound(object_type)
return os.path.join(root, f'udm-{value}.png')
[docs]
class Relations(RelationsBase, Resource):
"""List all used Link relations"""
[docs]
class OpenAPI(OpenAPIBase, Resource):
"""JSON-Endpoint to get the openapi.json schema definition"""
[docs]
class Modules(Resource):
mapping = {
'users': 'users/user',
'contacts': 'users/contact',
'computers': 'computers/computer',
'groups': 'groups/group',
'networks': 'networks/network',
'dhcp': 'dhcp/dhcp',
'dns': 'dns/dns',
'shares': 'shares/share',
'printers': 'shares/print',
'mail': 'mail/mail',
'nagios': 'nagios/nagios',
'policies': 'policies/policy',
'self': 'users/self',
'portal': 'portals/all',
'saml': 'saml/serviceprovider',
'appcenter': 'appcenter/app',
'kerberos': 'kerberos/kdcentry',
'settings': 'settings/settings',
'navigation': 'object',
'container': 'container',
}
[docs]
def get(self):
result = {}
self.add_link(result, 'self', self.urljoin(''), title=_('All modules'))
for main_type, name in sorted(self.mapping.items(), key=lambda x: "\x00" if x[0] == 'navigation' else x[0]):
title = _('All %s types') % (name,)
if '/' in name:
title = UDM_Module(name, ldap_connection=self.ldap_connection, ldap_position=self.ldap_position).object_name_plural
self.add_link(result, 'udm:object-modules', self.urljoin(quote(main_type)) + '/', name='all' if main_type == 'navigation' else main_type, title=title)
for name in sorted(udm_modules.modules):
_module = UDM_Module(name, ldap_connection=self.ldap_connection, ldap_position=self.ldap_position)
self.add_link(result, 'udm:object-types', self.urljoin(quote(_module.name)) + '/', name=_module.name, title=_module.title, dont_set_http_header=True)
self.add_link(result, 'udm:object/get-by-dn', self.urljoin('object') + '/{dn}', templated=True)
self.add_link(result, 'udm:object/get-by-uuid', self.urljoin('object') + '/{uuid}', templated=True)
self.add_link(result, 'udm:license', self.urljoin('license') + '/', name='license', title=_('UCS license'))
self.add_link(result, 'udm:ldap-base', self.urljoin('ldap/base') + '/', title=_('LDAP base'))
self.add_link(result, 'udm:relations', self.urljoin('relation') + '/', name='relation', title=_('All link relations'))
self.add_caching(public=True)
self.content_negotiation(result)
[docs]
def bread_crumbs_navigation(self):
return ['self']
[docs]
class Directory(Resource):
"""LDAP directory tree"""
get_template = 'directory.html'
[docs]
@sanitize
async def get(
self,
container: str | None = Query(DNSanitizer(required=False, allow_none=True)),
object_type: str | None = Query(StringSanitizer(required=False, allow_none=True)),
):
result = {}
self.add_link(result, 'self', self.urljoin(''), title=_('LDAP directory'))
self.add_link(result, 'udm:tree', self.urljoin('tree'))
self.add_link(result, 'icon', self.abspath('container/dc/favicon.ico'))
search_layout_base = [{
'label': _('Search'), 'description': '', 'layout': [['type', '']],
}]
self.add_layout(result, search_layout_base, 'search')
form = self.add_form(result, self.urljoin(''), 'GET', rel='search', id='search', name='search', layout='search')
self.add_form_element(form, 'type', '', label=_('Type'), **{'data-dynamic': self.urljoin('children-types')})
self.add_form_element(form, '', _('Search'), type='submit', **{'data-size': 'OneThird'})
# TODO: move into another class?
# TODO: if a type is selected use that search
if container:
for module, obj in list_objects(container, object_type=object_type, ldap_connection=self.ldap_connection, ldap_position=self.ldap_position):
if obj is None:
continue
if object_type != 'containers' and module.childs:
continue
if object_type == 'containers' and not module.childs:
continue
# entry = {
# 'dn': obj.dn,
# 'objectType': module.name,
# 'labelObjectType': module.subtitle,
# 'name': udm_objects.description(obj),
# 'path': ldap_dn2path(obj.dn, include_rdn=False),
# #'$flags$': [x.decode('UTF-8') for x in obj.oldattr.get('univentionObjectFlag', [])],
# #'$childs$': module.childs,
# #'$operations$': module.operations,
# }
entry = Object.get_representation(module, obj, [], self.ldap_connection)
entry['uri'] = self.abspath(obj.module, quote_dn(obj.dn))
self.add_link(entry, 'self', entry['uri'], name=entry['dn'], title=entry['id'], dont_set_http_header=True)
self.add_resource(result, 'udm:object', entry)
self.add_caching(public=True, must_revalidate=True)
self.content_negotiation(result)
[docs]
def get_html(self, response):
r = response.copy()
root = super().get_html(r)
grid = ET.Element('div', **{'class': 'grid'})
ET.SubElement(grid, 'div', **{'class': 'grid-header'})
root.append(grid)
table = ET.SubElement(grid, 'table')
table_header = ET.SubElement(table, 'tr')
th1 = ET.SubElement(table_header, 'th', **{'class': 'check'}) # TODO: all-checkbox
ET.SubElement(table_header, 'th').text = _('Name') # TODO: sort-direction switch
ET.SubElement(table_header, 'th').text = _('Type')
ET.SubElement(table_header, 'th').text = _('Path')
ET.SubElement(th1, 'input', type='checkbox', name='dn')
for thing in self.get_resources(response, 'udm:object'):
row = ET.SubElement(table, 'tr')
x = thing.copy()
td1 = ET.SubElement(row, 'td', **{'class': 'check'})
td2 = ET.SubElement(row, 'td')
td4 = ET.SubElement(row, 'td')
td3 = ET.SubElement(row, 'td')
ET.SubElement(td1, 'input', type='checkbox', name='dn', value=x['dn'])
ET.SubElement(td2, 'img', src=self.urljoin('..', thing['objectType'], 'favicon.ico'), **{'class': 'grid-icon'})
a = ET.SubElement(td2, "a", href=x.pop('uri'), rel="udm:object item")
a.text = x['id']
td3.text = ldap_dn2path(thing['position'])
td4.text = UDM_Module(thing['objectType'], ldap_connection=self.ldap_connection, ldap_position=self.ldap_position).subtitle
if self.debug_mode_enabled:
pre = ET.SubElement(row, "script", dn=x['dn'], type='application/json')
pre.text = json.dumps(x, indent=4)
return root
[docs]
class ObjectTypes(Resource):
"""Get the object types of a specific flavor"""
[docs]
@sanitize
async def get(
self,
module_type,
superordinate: str | None = Query(DNSanitizer(required=False, allow_none=True)),
):
object_type = Modules.mapping.get(module_type)
if not object_type:
raise NotFound(object_type)
title = _('All object types')
module = None
if '/' in object_type:
# FIXME: what was/is the superordinate for?
module = UDM_Module(object_type, ldap_connection=self.ldap_connection, ldap_position=self.ldap_position)
if superordinate:
module = get_module(object_type, superordinate, self.ldap_connection) or module # FIXME: the object_type param is wrong?!
title = module.object_name_plural
result = {}
self.add_link(result, 'up', self.urljoin('../'), title=_('All modules'))
self.add_link(result, 'self', self.urljoin(''), name=module_type, title=title)
if module_type == 'navigation':
self.add_link(result, 'udm:tree', self.abspath('container/dc/tree'))
elif module and module.has_tree:
self.add_link(result, 'udm:tree', self.urljoin('../', object_type, 'tree'))
if module and (module.help_link or module.help_text):
self.add_link(result, 'help', module.help_link or '', title=module.help_text or module.help_link)
if module_type == 'navigation':
modules = udm_modules.modules.keys()
elif module_type == 'container':
modules = container_modules()
else:
modules = [x['id'] for x in module.child_modules]
for name in sorted(modules):
_module = UDM_Module(name, ldap_connection=self.ldap_connection, ldap_position=self.ldap_position)
self.add_link(result, 'udm:object-types', self.urljoin('../%s' % quote(_module.name)) + '/', name=_module.name, title=_module.title)
continue
# TODO: get rid of entries. all of it can be put into the link!?
result.setdefault('entries', []).append({
'id': _module.name,
'label': _module.title,
'object_name': _module.object_name,
'object_name_plural': _module.object_name_plural,
# 'help_link': _module.help_link,
# 'help_text': _module.help_text,
'columns': _module.columns, # FIXME: move to Objects?
# 'has_tree': _module.has_tree,
})
self.add_caching(public=True, must_revalidate=True)
self.content_negotiation(result)
[docs]
class SubObjectTypes(Resource):
"""A list of possible sub-object-types which can be created underneath of the specified container or superordinate."""
get_template = 'children_types.html'
[docs]
def template_vars(self):
data = super().template_vars()
data['selected'] = self.get_query_argument('selected', '')
data['required'] = self.get_query_argument('required', '')
data['name'] = self.get_query_argument('name', '')
return data
[docs]
def get(self, object_type=None, position=None):
"""
Returns the list of object types matching the given flavor or container.
requests.options = {}
'superordinate' -- if available only types for the given superordinate are returned (not for the navigation)
'container' -- if available only types suitable for the given container are returned (only for the navigation)
"""
if not position:
# no container is specified, return all existing object types
return self.module_definition(udm_modules.modules.keys())
position = unquote_dn(position)
# create a list of modules that can be created
# ... all container types except container/dc
allowed_modules = {m for m in udm_modules.containers if udm_modules.name(m) != 'container/dc'}
# the container may be a superordinate or have one as its parent
# (or grandparent, ....)
superordinate = udm_modules.find_superordinate(position, None, self.ldap_connection)
if superordinate:
# there is a superordinate... add its subtypes to the list of allowed modules
allowed_modules.update(udm_modules.subordinates(superordinate))
else:
# add all types that do not have a superordinate
allowed_modules.update(mod for mod in udm_modules.modules.values() if not udm_modules.superordinates(mod))
# make sure that the object type can be created
allowed_modules = [mod for mod in allowed_modules if udm_modules.supports(mod, 'add')]
return self.module_definition(allowed_modules)
[docs]
def module_definition(self, modules):
mods = [UDM_Module(name, ldap_connection=self.ldap_connection, ldap_position=self.ldap_position) for name in modules]
result = {'entries': [{'id': '', 'label': _('All types')}, {'id': 'containers', 'label': _('All containers')}, *sorted([{'id': mod.name, 'label': mod.title} for mod in mods], key=lambda x: x['label'].lower())]}
# TODO: just add embedded resource?
# 'object_name': mod.object_name,
# 'object_name_plural': mod.object_name_plural,
# 'help_link': mod.help_link,
# 'help_text': mod.help_text,
# 'columns': mod.columns,
# 'has_tree': mod.has_tree,
# self.add_link(result, 'udm:object-types', self.abspath(mod.name) + '/', name=mod.name, title=mod.title)
self.add_link(result, 'self', self.urljoin(''), title=_('Types'))
self.add_caching(public=True, must_revalidate=True)
self.content_negotiation(result)
[docs]
class LdapBase(Resource):
"""Redirect to the ldap base object"""
[docs]
def get(self):
result = {}
url = self.abspath('container/dc', quote_dn(ucr['ldap/base']))
self.add_link(result, 'self', url, title=_('LDAP base'))
self.set_header('Location', url)
self.set_status(301)
self.add_caching(public=True, must_revalidate=True)
self.content_negotiation(result)
[docs]
class LdapAttributeUnmap(Resource):
"""
Transform Base64 encoded LDAP attributes to a UDM object that fits.
Object does not have to exist.
"""
[docs]
@sanitize
async def post(
self,
representation: dict = JSONPayload(
dn=DNSanitizer(enforce_correct_ldap_base=False, required=True),
attributes=DictSanitizer({}, required=True, default_sanitizer=ListSanitizer(Base64EncodingSanitizer())),
),
):
dn = representation['dn']
attributes = representation['attributes']
modules = udm_modules.identify(dn, attributes)
obj = None
error = ''
for module in modules:
try:
obj = module.object(None, self.ldap_connection, None, dn, None, attributes)
break
except udm_errors.wrongObjectType as exc:
# e.g. univentionObjectType and objectClass does not match
error = str(exc)
log.warning('Could not initialize object', dn=dn, error=error, type=module.module)
if not obj:
self.raise_sanitization_error('dn', _('The object could not be identified: %s') % (error or _('No module found.'),))
objmodule = UDM_Module(obj.module, ldap_connection=self.ldap_connection, ldap_position=self.ldap_position)
result = Object.get_representation(objmodule, obj, ['*'], self.ldap_connection, opened=True)
self.content_negotiation(result)
[docs]
class ObjectLink(Resource):
"""If the object-type is not known but only the DN, this resource redirects to the correct object."""
[docs]
def get(self, dn):
dn = unquote_dn(dn)
attrs = self.ldap_connection.authz_connection.get(dn) # FIXME: information disclosure!
modules = udm_modules.objectType(None, self.ldap_connection, dn, attrs) or []
if not modules:
raise NotFound(None, dn)
for module in modules:
module = UDM_Module(module, ldap_connection=self.ldap_connection, ldap_position=self.ldap_position)
if module.module:
break
else:
raise NotFound(None, dn)
result = {}
url = self.abspath(module.name, quote_dn(dn))
self.add_link(result, 'self', url, title=_('Get UDM object by DN or UUID'))
self.set_header('Location', url)
self.set_status(301)
self.add_caching(public=True, must_revalidate=True)
self.content_negotiation(result)
[docs]
class ObjectByUiid(ObjectLink):
"""Redirect to the object specified by its LDAP Entry-UUID"""
[docs]
def get(self, uuid):
try:
dn = self.ldap_connection.authz_connection.searchDn(filter_format('entryUUID=%s', [uuid]))[0] # FIXME: information disclosure!
except IndexError:
raise NotFound()
return super().get(dn)
[docs]
class ContainerQueryBase(Resource):
"""Base class for queries to container structures, needed to build a tree representation"""
get_template = 'tree.html'
[docs]
def template_vars(self):
data = super().template_vars()
data['level'] = int(self.get_query_argument('level', 0))
return data
async def _container_query(self, object_type, container, modules, scope):
"""Get a list of containers or child objects of the specified container."""
if not container:
container = ucr['ldap/base']
defaults = {}
# if object_type != 'directory':
# defaults['$operations$'] = ['search'] # disallow edit
if object_type in ('dns/dns', 'dhcp/dhcp'):
defaults.update({
'label': UDM_Module(object_type, ldap_connection=self.ldap_connection, ldap_position=self.ldap_position).title,
'icon': 'udm-%s' % (object_type.replace('/', '-'),),
})
self.add_link({}, 'next', self.urljoin('?container=%s&level=1' % (quote(container))))
return [dict({
'id': container,
'html_id': self.sanitize_html_id(container),
'label': ldap_dn2path(container),
'icon': 'udm-container-dc',
'path': ldap_dn2path(container),
'objectType': 'container/dc',
# '$operations$': UDM_Module('container/dc', ldap_connection=self.ldap_connection, ldap_position=self.ldap_position).operations,
# '$flags$': [],
'has-childs': True,
# '$isSuperordinate$': False,
'root': True,
}, **defaults)]
result = []
for xmodule in modules:
xmodule = UDM_Module(xmodule, ldap_connection=self.ldap_connection, ldap_position=self.ldap_position)
superordinate = univention.admin.objects.get_superordinate(xmodule.module, None, self.ldap_connection, container) # TODO: should also better be in a thread
try:
ucr['directory/manager/web/sizelimit'] = ucr.get('ldap/sizelimit', '400000')
items = await self.pool_submit(xmodule.search, container, scope=scope, superordinate=superordinate)
for item in items:
module = UDM_Module(item.module, ldap_connection=self.ldap_connection, ldap_position=self.ldap_position)
result.append({
'id': item.dn,
'html_id': self.sanitize_html_id(item.dn),
'label': module.obj_description(item),
'icon': 'udm-%s' % (module.name.replace('/', '-')),
'path': ldap_dn2path(item.dn),
'objectType': module.name,
# '$operations$': module.operations,
# '$flags$': [x.decode('UTF-8') for x in item.oldattr.get('univentionObjectFlag', [])],
'has-childs': module.childs,
'root': False,
# '$isSuperordinate$': udm_modules.isSuperordinate(module.module),
})
except UDM_Error as exc:
raise HTTPError(400, None, str(exc))
return sorted(result, key=lambda x: x['label'].lower())
[docs]
class Tree(ContainerQueryBase):
"""Get a LDAP directory tree representation of a UDM module supporting tree-views (e.g. directory/DNS/DHCP) (GET udm/(dns/dns|dhcp/dhcp|directory)/tree/)"""
[docs]
@sanitize
async def get(
self,
object_type,
container: str = Query(DNSanitizer(default=None)),
level: str = Query(IntegerSanitizer(default=0)),
):
# TODO: add appropriate 404 errors
ldap_base = ucr['ldap/base']
modules = container_modules()
scope = 'one'
if not container:
# get the tree root == the ldap base
scope = 'base'
elif object_type not in ('navigation', 'directory') and container and ldap_base.lower() == container.lower():
# this is the tree root of DNS / DHCP, show all zones / services
scope = 'sub'
modules = [object_type]
result = {}
containers = await self._container_query(object_type, container, modules, scope)
for _container in containers:
self.add_link(_container, 'item', href=self.urljoin('./tree?container=%s&level=%s' % (quote(_container['id']), quote(str(level + 1)))), title=_container['label']) # should be "self" and with no header added
# self.add_link(result, 'item', href=self.urljoin('./tree?container=%s' % (quote(_container['id']),)), title=_container['label'], path=_container['path'])
self.add_resource(result, 'udm:tree', _container)
self.add_link(result, 'self', self.urljoin(''), title='Tree')
if container:
parent_dn = self.ldap_connection.parentDn(container)
if parent_dn:
self.add_link(result, 'up', self.urljoin('tree?container=%s' % (quote(parent_dn),)), title='Parent container')
if object_type not in ('navigation', 'directory'):
module = self.get_module(object_type)
self.add_link(result, 'icon', self.urljoin('../../favicon.ico'), type='image/x-icon')
self.add_link(result, 'udm:object-modules', self.urljoin('../../'), title=_('All modules'))
self.add_link(result, 'udm:object-module', self.urljoin('../'), title=self.get_parent_object_type(module).object_name_plural)
self.add_link(result, 'type', self.urljoin('.'), title=module.object_name)
else:
self.add_link(result, 'icon', self.urljoin('../favicon.ico'), type='image/x-icon')
self.add_link(result, 'udm:object-modules', self.urljoin('../'), title=_('All modules'))
self.add_caching(public=False, must_revalidate=True)
self.content_negotiation(result)
[docs]
class MoveDestinations(ContainerQueryBase):
"""Get the locations where a specified UDM module can be moved to"""
[docs]
@sanitize
async def get(
self,
object_type,
container: str = Query(DNSanitizer(default=None)),
):
# TODO: add appropriate 404 errors
scope = 'one'
modules = container_modules()
if not container:
scope = 'base'
result = {}
containers = await self._container_query(object_type or 'navigation', container, modules, scope)
self.add_caching(public=False, must_revalidate=True)
for _container in containers:
self.add_link(_container, 'item', href=self.urljoin('?container=%s' % (quote(_container['id']),)), title=_container['label'])
self.add_resource(result, 'udm:tree', _container)
self.add_link(result, 'self', self.urljoin(''), title='Move destinations')
if container:
parent_dn = self.ldap_connection.parentDn(container)
if parent_dn:
self.add_link(result, 'up', self.urljoin('?container=%s' % (quote(parent_dn),)), title='Parent container')
if object_type and object_type != 'directory':
module = self.get_module(object_type)
self.add_link(result, 'icon', self.urljoin('favicon.ico'), type='image/x-icon')
self.add_link(result, 'udm:object-modules', self.urljoin('../../'), title=_('All modules'))
self.add_link(result, 'udm:object-module', self.urljoin('../'), title=self.get_parent_object_type(module).object_name_plural)
self.add_link(result, 'type', self.urljoin('./'), title=module.object_name)
self.content_negotiation(result)
[docs]
class Properties(Resource):
"""Get the properties of a UDM module or UDM object (GET udm/users/user/properties)"""
[docs]
@sanitize
async def get(
self,
object_type,
dn=None,
searchable: bool = Query(BoolSanitizer(required=False)),
):
result = {}
if dn:
dn = unquote_dn(dn)
module = self.get_module(object_type)
module.load(force_reload=True) # reload for instant extended attributes
self.add_link(result, 'self', self.urljoin(''), title=_('Properties for %s') % (module.object_name,))
if dn:
self.add_link(result, 'icon', self.urljoin('../favicon.ico'), type='image/x-icon')
self.add_link(result, 'udm:object-modules', self.urljoin('../../../'), title=_('All modules'))
self.add_link(result, 'udm:object-module', self.urljoin('../../'), title=self.get_parent_object_type(module).object_name_plural)
self.add_link(result, 'type', self.urljoin('../'), title=module.object_name)
self.add_link(result, 'up', self.urljoin('..', quote_dn(dn)), title=dn)
else:
self.add_link(result, 'icon', self.urljoin('./favicon.ico'), type='image/x-icon')
self.add_link(result, 'udm:object-modules', self.urljoin('../../'), title=_('All modules'))
self.add_link(result, 'udm:object-module', self.urljoin('../'), title=self.get_parent_object_type(module).object_name_plural)
# self.add_link(result, 'type', self.urljoin('.'), title=module.object_name)
self.add_link(result, 'up', self.urljoin('.'), title=module.object_name)
properties = self.get_properties(module, dn)
if searchable:
properties = {name: prop for name, prop in properties.items() if prop.get('searchable', False)}
result['properties'] = properties
for propname, prop in properties.items():
if prop.get('dynamicValues') or prop.get('staticValues') or prop.get('type') == 'umc/modules/udm/MultiObjectSelect':
self.add_link(result, 'udm:property-choices', self.urljoin('properties', propname, 'choices'), name=propname, title=_('Get choices for property %s') % (propname,))
self.add_caching(public=True, must_revalidate=True)
self.content_negotiation(result)
[docs]
@classmethod
def get_properties(cls, module, dn=None):
properties = module.get_properties(dn)
for policy in module.policies:
properties.append({
'id': 'policies[%s]' % (policy['objectType'],),
'label': policy['label'],
'description': policy['description'],
})
for prop in properties[:]:
if prop['id'] == '$options$':
for option in prop['widgets']:
option['id'] = 'options[%s]' % (option['id'],)
properties.append(option)
for prop in properties:
prop.setdefault('label', '')
prop.setdefault('description', '')
prop.setdefault('readonly', False)
prop.setdefault('readonly_when_synced', False)
prop.setdefault('disabled', False)
prop.setdefault('required', False)
prop.setdefault('syntax', '')
prop.setdefault('identifies', False)
prop.setdefault('searchable', False)
prop.setdefault('multivalue', False)
prop.setdefault('show_in_lists', True)
return {prop['id']: prop for prop in properties if not prop['id'].startswith('$')}
[docs]
class Layout(Resource):
"""Get the layout representation of a UDM module"""
[docs]
def get(self, object_type, dn=None):
result = {}
if dn:
dn = unquote_dn(dn)
module = self.get_module(object_type)
module.load(force_reload=True) # reload for instant extended attributes
self.add_link(result, 'self', self.urljoin(''), title=_('Layout for %s') % (module.object_name,))
if dn:
self.add_link(result, 'icon', self.urljoin('../favicon.ico'), type='image/x-icon')
self.add_link(result, 'udm:object-modules', self.urljoin('../../../'), title=_('All modules'))
self.add_link(result, 'udm:object-module', self.urljoin('../../'), title=self.get_parent_object_type(module).object_name_plural)
self.add_link(result, 'type', self.urljoin('../'), title=module.object_name)
self.add_link(result, 'up', self.urljoin('..', quote_dn(dn)), title=dn)
else:
self.add_link(result, 'icon', self.urljoin('./favicon.ico'), type='image/x-icon')
self.add_link(result, 'udm:object-modules', self.urljoin('../../'), title=_('All modules'))
self.add_link(result, 'udm:object-module', self.urljoin('../'), title=self.get_parent_object_type(module).object_name_plural)
# self.add_link(result, 'type', self.urljoin('.'), title=module.object_name)
self.add_link(result, 'up', self.urljoin('.'), title=module.object_name)
result['layout'] = self.get_layout(module)
self.add_caching(public=True, must_revalidate=True)
self.content_negotiation(result)
[docs]
@classmethod
def get_layout(cls, module, dn=None):
layout = module.get_layout(dn)
# TODO: insert module.help_text into first layout element
layout.insert(0, {'layout': [], 'advanced': False, 'description': _('Meta information'), 'label': _('Meta information'), 'is_app_tab': False})
apps = cls.get_apps_layout(layout)
if apps:
apps['layout'].append('options')
else:
layout.append({'layout': ['options'], 'advanced': False, 'description': _('Here you can activate the user for one of the installed apps. The user can then log on to the app and use it.'), 'label': _('Apps & Options'), 'is_app_tab': False})
advanced = {'layout': [], 'advanced': False, 'description': _('Advanced settings'), 'label': _('Advanced settings')}
for x in layout[:]:
if x['advanced']:
advanced['layout'].append(x)
layout.remove(x)
if advanced['layout']:
layout.append(advanced)
layout.append({
'layout': ['policies[%s]' % (policy['objectType'],) for policy in module.policies],
'advanced': False,
'description': _('Properties inherited from policies'),
'label': _('Policies'),
'is_app_tab': False,
'help': _('List of all object properties that are inherited by policies. The values cannot be edited directly. By clicking on "Create new policy", a new tab with a new policy will be opened. If an attribute is already set, the corresponding policy can be edited in a new tab by clicking on the "edit" link.'),
})
return layout
[docs]
@classmethod
def get_apps_layout(cls, layout):
for x in layout:
if x.get('label') == 'Apps':
return x
[docs]
@classmethod
def get_reference_layout(cls, layout):
for x in layout:
if x.get('label', '').lower().startswith('referen'):
return x
[docs]
class ReportingBase:
"""Base class for UDM object report operations"""
[docs]
def initialize(self):
self.reports_cfg = udr.Config()
[docs]
class Report(ReportingBase, Resource):
"""Create a object report e.g. in CSV of PDF format (GET udm/users/user/report/$report_type?dn=...&dn=...)"""
# i18n: translation for univention-directory-reports
_('PDF Document')
_('CSV Report')
[docs]
async def get(self, object_type, report_type):
dns = self.get_query_arguments('dn')
await self.create_report(object_type, report_type, dns)
[docs]
@sanitize
async def post(
self,
object_type,
report_type,
dn: list[str] = Query(ListSanitizer(DNSanitizer())),
):
await self.create_report(object_type, report_type, dn)
[docs]
async def create_report(self, object_type, report_type, dns):
ldap_connection = self.ldap_connection
if not await self.pool_submit(ldap_connection.authz.is_report_create_allowed, ldap_connection, object_type, report_type, raise_exception=False):
raise HTTPError(403, None, report_type)
try:
assert report_type in self.reports_cfg.get_report_names(object_type)
except (KeyError, AssertionError):
raise NotFound(report_type)
report = udr.Report(ldap_connection)
try:
report_file = await self.pool_submit(report.create, object_type, report_type, dns or [])
except udr.ReportError as exc:
raise HTTPError(400, None, str(exc))
with open(report_file, 'rb') as fd: # noqa: ASYNC221, ASYNC230
self.set_header('Content-Type', 'text/csv' if report_file.endswith('.csv') else 'application/pdf')
self.set_header('Content-Disposition', 'attachment; filename="%s"' % (os.path.basename(report_file).replace('\\', '\\\\').replace('"', '\\"')))
self.finish(fd.read())
os.remove(report_file)
[docs]
class NextFreeIpAddress(Resource):
"""Get the next free IP of the specified network (GET udm/networks/network/$DN/next-free-ip-address)"""
[docs]
def get(self, object_type, dn): # TODO: threaded?! (might have caused something in the past in system setup?!)
"""
Returns the next IP configuration based on the given network object
'increaseCounter' -- if given and set to True, network object counter for IP addresses is increased
"""
dn = unquote_dn(dn)
obj = self.get_object(object_type, dn)
try:
obj.refreshNextIp()
except udm_errors.nextFreeIp:
raise NoIpLeft(dn)
result = {
'ip': obj['nextIp'],
'dnsEntryZoneForward': obj['dnsEntryZoneForward'],
'dhcpEntryZone': obj['dhcpEntryZone'],
'dnsEntryZoneReverse': obj['dnsEntryZoneReverse'],
}
self.add_caching(public=False, must_revalidate=True)
self.content_negotiation(result)
if self.get_query_argument('increaseCounter', False):
# increase the next free IP address
obj.stepIp()
obj.modify()
[docs]
class Objects(ConditionalResource, FormBase, ReportingBase, _OpenAPIBase, Resource):
"""Search for objects"""
get_template = 'search_form.html'
[docs]
@sanitize
async def get(
self,
object_type: str,
position: str = Query(DNSanitizer(required=False, default=None, allow_none=True), description="Position which is used as search base."),
ldap_filter: str = Query(
LDAPFilterSanitizer(required=False, default="", allow_none=True),
alias='filter',
description="A LDAP filter which may contain `UDM` property names instead of `LDAP` attribute names.",
examples={
"any-object": {
"value": "(objectClass=*)",
},
"admin-user": {
"value": "(|(username=Administrator)(username=Admin*))",
},
},
),
query: dict = Query(
DictSanitizer({}, default_sanitizer=LDAPSearchSanitizer(required=False, default='*', add_asterisks=False, use_asterisks=True), key_sanitizer=ObjectPropertySanitizer()),
description="The values to search for (propertyname and search filter value). Alternatively with `filter` a raw LDAP filter can be given.",
style="deepObject",
examples={
'nothing': {
'value': None,
},
'property': {
'value': {'': '*'},
},
},
),
property: str = Query(
ObjectPropertySanitizer(required=False, default=None),
description="The property to search for if not specified via `query`",
),
scope: str = Query(ChoicesSanitizer(choices=['sub', 'one', 'base', 'base+one'], default='sub'), description="The LDAP search scope (sub, base, one)."),
hidden: bool = Query(BoolSanitizer(default=True), description="Include hidden/system objects in the response.", example=True),
opened: bool = Query(BoolSanitizer(default=True), description="Wheather to open the object in case certain properties are requested.", example=True),
properties: list[str] = Query(
ListSanitizer(StringSanitizer(), required=False, default=['*'], allow_none=True, min_elements=0),
style="form",
explode=True,
description="The properties which should be returned, if not given all properties are returned.",
examples={
'no restrictions': {'value': None},
'only small subset': {'value': ['username', 'firstname', 'lastname']},
},
),
superordinate: str | None = Query(
DNSanitizer(required=False, default=None, allow_none=True),
description="The superordinate DN of the objects to find. `position` is sufficient.",
# example=f"cn=superordinate,{ldap_base}"
),
dir: str = Query(
ChoicesSanitizer(choices=['ASC', 'DESC'], default='ASC'),
deprecated=True,
description="**Broken/Experimental**: The Sort direction (ASC or DESC).",
),
by: str = Query(
StringSanitizer(required=False),
deprecated=True,
description="**Broken/Experimental**: Sort the search result by the specified property.",
# example="username",
),
page: int = Query(
IntegerSanitizer(required=False, default=1, minimum=1),
deprecated=True,
description="**Broken/Experimental**: The search page, starting at one.",
example=1,
),
limit: int = Query(
IntegerSanitizer(required=False, default=None, allow_none=True, minimum=0),
deprecated=True,
description="**Broken/Experimental**: How many results should be shown per page.",
examples={
"no limit": {"value": "", "summary": "get all entries"},
"limit to 50": {"value": 50, "summary": "limit to 50 entries"},
},
),
):
"""Search for {module.object_name_plural} objects"""
module = self.get_module(object_type)
result = self._options(object_type)
search = bool(self.request.query)
properties = properties[:]
direction = dir
property_ = property
reverse = direction == 'DESC'
items_per_page = limit
if not ldap_filter:
filters = filter(None, [(module._object_property_filter(attribute or property_ or None, value, hidden)) for attribute, value in query.items()])
if filters:
ldap_filter = str(univention.admin.filter.conjunction('&', [univention.admin.filter.parse(fil) for fil in filters]))
# TODO: replace the superordinate concept with container
superordinate = self.superordinate_dn_to_object(module, superordinate)
if superordinate:
position = position or superordinate.dn
objects = []
if search: # TODO: check if searching is allowed
try:
objects, last_page = await self.search(module, position, ldap_filter, superordinate, scope, hidden, items_per_page, page, by, reverse, opened)
except ObjectDoesNotExist as exc:
self.raise_sanitization_error('position', str(exc), type='query')
except SuperordinateDoesNotExist as exc:
self.raise_sanitization_error('superordinate', str(exc), type='query')
if opened and properties == ['dn']: # backwards compatibility with older clients
opened = False
props = [_prop for _prop in properties if _prop not in ('dn')] if not opened else properties
if not opened:
properties = ['dn']
for obj in objects or []:
if obj is None:
continue
objmodule = UDM_Module(obj.module, ldap_connection=self.ldap_connection, ldap_position=self.ldap_position)
entry = Object.get_representation(objmodule, obj, properties, self.ldap_connection, opened=opened)
uri = entry['uri'] = self.abspath(obj.module, quote_dn(obj.dn))
if not opened and props and props != ['*']:
uri = uri + f"?{urlencode({'properties': props}, True)}"
self.add_link(entry, 'self', uri, name=entry['dn'], title=entry['id'], dont_set_http_header=True)
self.add_resource(result, 'udm:object', entry)
if items_per_page:
self.add_link(result, 'first', self.urljoin('', page='1'), title=_('First page'))
if page > 1:
self.add_link(result, 'prev', self.urljoin('', page=str(page - 1)), title=_('Previous page'))
if not last_page:
self.add_link(result, 'next', self.urljoin('', page=str(page + 1)), title=_('Next page'))
else:
self.add_link(result, 'last', self.urljoin('', page=str(last_page)), title=_('Last page'))
if search:
for i, report_type in enumerate(sorted(self.reports_cfg.get_report_names(object_type)), 1):
self.add_link(result, 'udm:report', self.urljoin('report', quote(report_type)) + '{?dn}', name=report_type, title=_('Create %s report') % _(report_type), method='POST', templated=True)
self.add_button(result, self.urljoin('report', quote(report_type)), 'POST', rel='udm:report', name=report_type, id='report%d' % (i,), label=_('Create %s report') % _(report_type), **{'hx-boost': 'false', 'hx-vals': 'js:{dn: getAllSelectedDNs()}', 'hx-disable': 'true'})
self.add_button(result, self.urljoin('multi-edit'), 'POST', name='multi-edit', id='multi-edit', rel='edit-form', label=_('Modify (multi edit)'), title=_('Modify %s (multi edit)') % (module.object_name_plural,))
self.add_button(result, self.urljoin('remove'), 'POST', name='remove', id='remove', rel='udm:object/remove', label=_('Delete'), title=_('Delete %s') % (module.object_name_plural,))
self.add_button(result, self.urljoin('move'), 'POST', name='move', id='move', rel='udm:object/move', label=_('Move to'), title=_('Move %s') % (module.object_name_plural,))
else:
for i, report_type in enumerate(sorted(self.reports_cfg.get_report_names(object_type)), 1):
self.add_link(result, 'udm:report', self.urljoin('report', quote(report_type)) + '{?dn}', name=report_type, title=_('Create %s report') % _(report_type), method='POST', templated=True)
search_layout = []
search_layout_base = [
{'label': _('Search for %s') % (module.object_name_plural,), 'description': f'{module.object_name_plural} ({module.description})', 'layout': [
['property', 'query*', ''],
{'layout': search_layout, 'description': '', 'label': _('Advanced options'), 'opened': False},
]},
]
self.add_layout(result, search_layout_base, 'search')
form = self.add_form(result, self.urljoin(''), 'GET', rel='search', id='search', name='search', layout='search')
self.add_form_element(form, 'position', position or '', label=_('Search in'), **{'data-dynamic': self.urljoin('default-containers')})
search_layout.append(['position', 'hidden'])
if superordinate_names(module):
self.add_form_element(form, 'superordinate', superordinate.dn if superordinate else '', label=_('Superordinate'))
search_layout.append(['superordinate'])
searchable_properties = [{'value': '', 'label': _('Defaults')}] + [{'value': prop['id'], 'label': prop['label']} for prop in module.properties(None) if prop.get('searchable')]
self.add_form_element(form, 'property', property_ or '', element='select', options=searchable_properties, label=_('Property'), **{'data-size': 'One'})
self.add_form_element(form, 'query*', query.get('', '*'), label=_('Search for'), placeholder=_('Search value (e.g. *)'), **{'data-size': 'TwoThird'})
self.add_form_element(form, 'scope', scope, element='select', options=[{'value': 'sub'}, {'value': 'one'}, {'value': 'base'}, {'value': 'base+one'}], label=_('Search scope'))
self.add_form_element(form, 'hidden', '1', type='checkbox', checked=bool(hidden), label=_('Include hidden objects'))
# self.add_form_element(form, 'fields', list(fields))
if module.supports_pagination:
self.add_form_element(form, 'limit', str(items_per_page or '0'), type='number', label=_('Limit'), **{'data-size': 'OneThird'})
self.add_form_element(form, 'page', str(page or '1'), type='number', label=_('Selected page'), **{'data-size': 'OneThird'})
self.add_form_element(form, 'by', by or '', element='select', options=searchable_properties, label=_('Sort by'), **{'data-size': 'OneThird'})
self.add_form_element(form, 'dir', direction if direction in ('ASC', 'DESC') else 'ASC', element='select', options=[{'value': 'ASC', 'label': _('Ascending')}, {'value': 'DESC', 'label': _('Descending')}], label=_('Direction'), **{'data-size': 'OneThird'})
search_layout.append(['page', 'limit', 'by', 'dir'])
self.add_form_element(form, '', _('Search'), type='submit', **{'data-size': 'OneThird'})
if search:
result['results'] = len(self.get_resources(result, 'udm:object'))
else:
self.add_link(result, 'udm:layout', self.urljoin('layout'), title=_('Module layout'))
self.add_link(result, 'udm:properties', self.urljoin('properties'), title=_('Module properties'))
for policy_module in module.policies:
policy_module = policy_module['objectType']
self.add_link(result, 'udm:policy-result', self.urljoin(f'{policy_module}/{{?policy,position}}'), name=policy_module, title=_('Evaluate referenced %s policies') % (policy_module,), templated=True)
self.add_caching(public=False, no_cache=True, no_store=True, max_age=1, must_revalidate=True)
self.content_negotiation(result)
[docs]
async def search(self, module, container, ldap_filter, superordinate, scope, hidden, items_per_page, page, by, reverse, opened):
ctrls = {}
serverctrls = []
hashed = (self.request.user_dn, module.name, container or None, ldap_filter or None, superordinate or None, scope or None, hidden or None, items_per_page or None, by or None, reverse or None)
session = shared_memory.search_sessions.get(hashed, {})
last_cookie = session.get('last_cookie', '')
current_page = session.get('page', 0)
page_ctrl = SimplePagedResultsControl(True, size=items_per_page, cookie=last_cookie) # TODO: replace with VirtualListViewRequest
if module.supports_pagination:
if items_per_page:
serverctrls.append(page_ctrl)
if by in ('uid', 'uidNumber', 'cn'):
rule = ':caseIgnoreOrderingMatch' if by not in ('uidNumber',) else ''
serverctrls.append(SSSRequestControl(ordering_rules=['%s%s%s' % ('-' if reverse else '', by, rule)]))
objects = []
# TODO: we have to store the results of the previous pages (or make them cacheable)
# FIXME: we have to store the session across all processes
ucr['directory/manager/web/sizelimit'] = ucr.get('ldap/sizelimit', '400000')
last_page = page
for _i in range(current_page, page or 1):
# TODO: if we want to improve performance one day for `opened == False` pass `simple=True`
objects = await self.pool_submit(module.search, container, superordinate=superordinate, filter=ldap_filter, scope=scope, hidden=hidden, serverctrls=serverctrls, response=ctrls)
for control in ctrls.get('ctrls', []):
if control.controlType == SimplePagedResultsControl.controlType:
page_ctrl.cookie = control.cookie
if not page_ctrl.cookie:
shared_memory.search_sessions.pop(hashed, None)
break
else:
shared_memory.search_sessions[hashed] = {'last_cookie': page_ctrl.cookie, 'page': page}
last_page = 0
# TODO: move into module.search(opened=True) and then into object.lookup()! because that does error handling.
if opened and objects:
for obj in objects:
obj.open()
return (objects, last_page)
[docs]
def get_html(self, response):
if self.request.method not in ('GET', 'HEAD'):
return super().get_html(response)
r = response.copy()
r.pop('entries', None)
num_of_entries = r.pop('results', 0)
root = super().get_html(r)
grid = ET.Element('div', **{'class': 'grid'})
childs = [child for child in root if child.attrib.get('id') != 'search']
root = next(child for child in root if child.attrib.get('id') == 'search')
grid_header = ET.SubElement(grid, 'div', **{'class': 'grid-header'})
grid_header.extend(childs)
root.append(grid)
has_four_rows = self.request.decoded_query_arguments.get('property')
if not isinstance(has_four_rows, str):
has_four_rows = None
table = ET.SubElement(grid, 'table')
table_header = ET.SubElement(table, 'tr')
th1 = ET.SubElement(table_header, 'th', **{'class': 'check'}) # TODO: all-checkbox
ET.SubElement(table_header, 'th').text = 'Name' # TODO: sort-direction switch
if has_four_rows:
th4 = ET.SubElement(table_header, 'th')
th4.text = has_four_rows
ET.SubElement(table_header, 'th').text = 'Path'
ET.SubElement(grid_header, 'span').text = f'{{num}} users of {num_of_entries} selected.'
# There is a bug in chrome, so we cannot have form='report1 report2'. so, only 1 report is possible :-/
ET.SubElement(th1, 'input', type='checkbox', name='dn', form=' '.join([report['id'] for report in self.get_resources(response, 'udm:form') if report['rel'] == 'udm:report'][-1:]))
for thing in self.get_resources(response, 'udm:object'):
row = ET.SubElement(table, 'tr')
x = thing.copy()
td1 = ET.SubElement(row, 'td', **{'class': 'check'})
td2 = ET.SubElement(row, 'td')
if has_four_rows:
td4 = ET.SubElement(row, 'td')
th4.text = UDM_Module(thing['objectType'], ldap_connection=self.ldap_connection, ldap_position=self.ldap_position).get_property(has_four_rows).short_description
td3 = ET.SubElement(row, 'td')
# There is a bug in chrome, so we cannot have form='report1 report2'. so, only 1 report is possible :-/
ET.SubElement(td1, 'input', type='checkbox', name='dn', value=x['dn']) # form=' '.join([report['id'] for report in self.get_resources(response, 'udm:form') if report['rel'] == 'udm:report'][-1:]))
ET.SubElement(td2, 'img', src=self.urljoin('favicon.ico'), **{'class': 'grid-icon'})
a = ET.SubElement(td2, "a", href=x.pop('uri') + '/edit', rel="udm:object item")
a.text = x['id']
td3.text = ldap_dn2path(thing['position'] or '')
if has_four_rows:
td4.text = thing['properties'].get(has_four_rows) # TODO: syntax.to_string()
if self.debug_mode_enabled:
pre = ET.SubElement(row, "script", dn=x['dn'], type='application/json')
pre.text = json.dumps(x, indent=4)
return root
[docs]
@sanitize
async def post(
self,
object_type,
representation: dict = JSONPayload(
position=DNSanitizer(required=False, allow_none=True),
superordinate=DNSanitizer(required=False, allow_none=True),
options=DictSanitizer({}, default_sanitizer=BooleanSanitizer(), required=False),
policies=DictSanitizer({}, default_sanitizer=ListSanitizer(DNSanitizer()), required=False),
properties=DictSanitizer({}, required=True),
),
patch_document: list = PatchRepresentation(),
):
"""Create a new {module.object_name} object"""
obj = Object(self.application, self.request)
obj.ldap_connection, obj.ldap_position = self.ldap_connection, self.ldap_position
serverctrls = [PostReadControl(True, ['entryUUID', 'modifyTimestamp', 'entryCSN'])]
response = {}
result = {}
new_obj = await obj.create(object_type, None, representation or patch_document, result, serverctrls=serverctrls, response=response)
self.set_header('Location', self.urljoin(quote_dn(new_obj.dn)))
self.set_entity_tags(new_obj, check_conditionals=False)
self.set_status(201)
self.add_caching(public=False, no_cache=True, must_revalidate=True)
uuid = _get_post_read_entry_uuid(response)
result.update({
'dn': new_obj.dn,
'uuid': uuid,
})
self.content_negotiation(result)
def _options(self, object_type):
result = {}
module = self.get_module(object_type)
parent = self.get_parent_object_type(module)
methods = ['GET', 'OPTIONS']
self.add_link(result, 'udm:object-modules', self.urljoin('../../'), title=_('All modules'))
self.add_link(result, 'up', self.urljoin('../'), title=parent.object_name_plural)
self.add_link(result, 'self', self.urljoin(''), name=module.name, title=module.object_name_plural)
self.add_link(result, 'describedby', self.urljoin(''), title=_('%s module') % (module.name,), method='OPTIONS')
if 'search' in module.operations:
searchfields = ['position', 'query*', 'filter', 'scope', 'hidden', 'properties*', 'opened']
if superordinate_names(module):
searchfields.append('superordinate')
if module.supports_pagination:
searchfields.extend(['limit', 'page', 'by', 'dir'])
self.add_link(result, 'search', self.urljoin('') + '{?%s}' % ','.join(searchfields), templated=True, title=_('Search for %s') % (module.object_name_plural,))
if 'add' in module.operations:
methods.append('POST')
self.add_link(result, 'create-form', self.urljoin('add'), title=_('Create a %s') % (module.object_name,))
self.add_link(result, 'create-form', self.urljoin('add') + '{?position,superordinate%s}' % (',template' if module.template else ''), templated=True, title=_('Create a %s') % (module.object_name,))
if module.help_link or module.help_text:
self.add_link(result, 'help', module.help_link or '', title=module.help_text or module.help_link)
self.add_link(result, 'icon', self.urljoin('favicon.ico'), type='image/x-icon')
if module.has_tree:
self.add_link(result, 'udm:tree', self.urljoin('tree'), title=_('Object type tree'))
# self.add_link(result, '', self.urljoin(''))
self.set_header('Allow', ', '.join(methods))
return result
[docs]
def options_html(self, response):
# root = self.get_html(response)
root = ET.Element('script', type='application/json')
root.text = json.dumps(response, indent=4)
return root
[docs]
class ObjectsMove(Resource):
"""Move multiple objects at the same time"""
[docs]
@sanitize
async def post(
self,
object_type,
position: str = Body(DNSanitizer(required=True)),
dn: list[str] = Body(ListSanitizer(DNSanitizer(required=True), min_elements=1)),
):
# FIXME: this can only move objects of the same object_type but should move everything
dns = dn # TODO: validate: moveable, etc.
status_id = str(uuid.uuid4())
status = shared_memory.dict()
status.update({
'id': status_id,
'finished': False,
'errors': False,
'moved': shared_memory.list(),
})
try:
shared_memory.queue[self.request.user_dn]
except KeyError:
shared_memory.queue[self.request.user_dn] = shared_memory.dict()
shared_memory.queue[self.request.user_dn][status_id] = status
self.set_status(202)
self.set_header('Location', self.abspath('progress', status['id']))
self.finish()
try:
for i, dn in enumerate(dns, 1):
module = get_module(object_type, dn, self.ldap_write_connection)
dn = await self.pool_submit(self.handle_udm_errors, module.move, dn, position)
status['moved'].append(dn)
status['description'] = _('Moved %d of %d objects. Last object was: %s.') % (i, len(dns), dn)
status['max'] = len(dns)
status['value'] = i
except Exception:
status['errors'] = True
status['traceback'] = traceback.format_exc() # FIXME: error handling
raise
else:
status['uri'] = self.urljoin(quote_dn(dn))
finally:
status['finished'] = True
[docs]
class Object(ConditionalResource, FormBase, _OpenAPIBase, Resource):
"""Get, modify, create, remove, rename or move an UDM object"""
[docs]
@sanitize
async def get(
self,
object_type: str,
dn: str,
properties: list[str] = Query(
ListSanitizer(StringSanitizer(), required=False, default=['*'], allow_none=True, min_elements=0),
style="form",
explode=True,
description="The properties which should be returned, if not given all properties are returned.",
examples={
'no restrictions': {'value': None},
'only small subset': {'value': ['username', 'firstname', 'lastname']},
},
),
copy: bool = Query(BoolSanitizer(default=False), description="**Experimental**: Might be used in the future for copying.", example=False), # TODO: move into own resource: ./copy
):
"""
Get a representation of the {module.object_name} object with all its properties, policies, options, metadata and references.
Includes also instructions how to modify, remove or move the object.
"""
dn = unquote_dn(dn)
properties = properties[:]
if object_type == 'users/self' and not self.ldap_connection.compare_dn(dn, self.request.user_dn):
raise HTTPError(403)
try:
module, obj = await self.pool_submit(self.get_module_object, object_type, dn)
except NotFound:
# FIXME: return HTTP 410 Gone for removed objects
# if self.ldap_connection.authz_connection.searchDn(filter_format('(&(reqDN=%s)(reqType=d))', [dn]), base='cn=translog'):
# raise Gone(object_type, dn)
raise
if object_type not in ('users/self', 'users/passwd') and not univention.admin.modules.recognize(object_type, obj.dn, obj.oldattr):
raise NotFound(object_type, dn)
self.set_entity_tags(obj)
props = {}
props.update(self._options(object_type, obj.dn))
props['uri'] = self.abspath(obj.module, quote_dn(obj.dn))
props.update(self.get_representation(module, obj, properties, self.ldap_connection, copy))
for reference in module.get_references(obj):
# TODO: add a reference for the "position" object?!
if reference['module'] != 'udm':
continue # can not happen currently
for dn in set(_map_normalized_dn(filter(None, [reference['id']]))):
rel = {'__policies': 'udm:object/policy/reference'}.get(reference['property'], 'udm:object/property/reference/%s' % (reference['property'],))
self.add_link(props, rel, self.abspath(reference['objectType'], quote_dn(dn)), name=dn, title=reference['label'], dont_set_http_header=True)
if module.name == 'networks/network':
self.add_link(props, 'udm:next-free-ip', self.urljoin(quote_dn(obj.dn), 'next-free-ip-address'), title=_('Next free IP address'))
if obj.has_property('jpegPhoto'):
self.add_link(props, 'udm:user-photo', self.urljoin(quote_dn(obj.dn), 'properties/jpegPhoto.jpg'), type='image/jpeg', title=_('User photo'))
if module.name == 'recyclebin/removedobject':
self.add_link(props, 'udm:restore', self.urljoin(quote_dn(obj.dn), 'restore'), title=_('Restore object'))
if module.name == 'users/user':
self.add_link(props, 'udm:service-specific-password', self.urljoin(quote_dn(obj.dn), 'service-specific-password'), title=_('Generate a new service specific password'))
self.add_link(props, 'udm:layout', self.urljoin(quote_dn(obj.dn), 'layout'), title=_('Module layout'))
self.add_link(props, 'udm:properties', self.urljoin(quote_dn(obj.dn), 'properties'), title=_('Module properties'))
for policy_module in props.get('policies', {}).keys():
self.add_link(props, 'udm:policy-result', self.urljoin(quote_dn(obj.dn), f'{policy_module}/{{?policy}}'), name=policy_module, title=_('Evaluate referenced %s policies') % (policy_module,), templated=True)
self.add_caching(public=False, must_revalidate=True)
self.content_negotiation(props)
def _options(self, object_type, dn):
dn = unquote_dn(dn)
module = self.get_module(object_type)
props = {}
parent_module = self.get_parent_object_type(module)
self.add_link(props, 'udm:object-modules', self.urljoin('../../'), title=_('All modules'))
self.add_link(props, 'udm:object-module', self.urljoin('../'), name=parent_module.name, title=parent_module.object_name_plural)
# self.add_link(props, 'udm:object-types', self.urljoin('../'))
self.add_link(props, 'type', self.urljoin('x/../'), name=module.name, title=module.object_name)
self.add_link(props, 'up', self.urljoin('x/../'), name=module.name, title=module.object_name)
self.add_link(props, 'self', self.urljoin(''), title=dn)
self.add_link(props, 'describedby', self.urljoin(''), title=_('%s module') % (module.name,), method='OPTIONS')
self.add_link(props, 'icon', self.urljoin('favicon.ico'), type='image/x-icon')
self.add_link(props, 'udm:object/remove', self.urljoin(''), method='DELETE')
self.add_link(props, 'udm:object/edit', self.urljoin(''), method='PUT')
# self.add_link(props, '', self.urljoin('report/PDF Document?dn=%s' % (quote(obj.dn),))) # rel=alternate media=print?
# for mod in module.child_modules:
# mod = self.get_module(mod['id'])
# if mod and set(superordinate_names(mod)) & {module.name, }:
# self.add_link(props, 'udm:children-types', self.urljoin('../../%s/?superordinate=%s' % (quote(mod.name), quote(obj.dn))), name=mod.name, title=mod.object_name_plural)
methods = ['GET', 'OPTIONS']
if module.childs:
self.add_link(props, 'udm:children-types', self.urljoin(quote_dn(dn), 'children-types'), name=module.name, title=_('Sub object types of %s') % (module.object_name,))
can_modify = set(module.operations) & {'edit', 'move', 'subtree_move'}
can_remove = 'remove' in module.operations
if can_modify or can_remove:
if can_modify:
methods.extend(['PUT', 'PATCH'])
if can_remove:
methods.append('DELETE')
self.add_button(props, action='', method='DELETE', title=_('Remove this %s') % (module.object_name,), **{'hx-confirm': _("Are you sure you want to delete this %s?") % (module.object_name,)})
self.add_link(props, 'edit-form', self.urljoin(quote_dn(dn), 'edit'), title=_('Modify, move or remove this %s') % (module.object_name,))
self.set_header('Allow', ', '.join(methods))
if 'PATCH' in methods:
self.set_header('Accept-Patch', 'application/json-patch+json, application/json')
return props
[docs]
@classmethod
def get_representation(cls, module, obj, properties, ldap_connection, copy=False, add=False, opened=True):
def _remove_uncopyable_properties(obj):
if not copy:
return
for name, p in obj.descriptions.items():
if not p.copyable:
obj.info.pop(name, None)
# TODO: check if we really want to set the default values
_remove_uncopyable_properties(obj)
obj.set_defaults = True
obj.set_default_values()
_remove_uncopyable_properties(obj)
values = {}
if properties:
if opened and not obj._open and ('*' in properties or any(prop not in obj.info for prop in properties)):
# TODO: i think we need error handling here, because between receiving the object and opening it, it or referenced objects might be removed.
# best would be if lookup() would support opening because that already does error handling.
obj.open()
for key in obj.descriptions:
if key in properties and obj.has_property(key) and obj.descriptions[key].lazy_loading_fn:
obj.descriptions[key].lazy_load(obj)
# be very cautious here! we need to strip out only the properties where read-access is denied.
# the later could will set the default values for all unset (due to read restrictions or unset-ness) properties
visible_properties = set(obj.descriptions)
if not add and obj.dn and obj.authz.enabled:
all_set_properties = set(obj.info)
obj.authz.filter_object_properties(obj)
visible_properties -= (all_set_properties - set(obj.info))
if '*' not in properties:
values = {key: value for (key, value) in obj.info.items() if (key in properties) and obj.descriptions[key].show_in_lists}
else:
values = {key: obj[key] for key in obj.descriptions if key in visible_properties and (add or obj.has_property(key)) and obj.descriptions[key].show_in_lists}
for passwd in module.password_properties:
if passwd in values:
values[passwd] = None
values = dict(decode_properties(module, obj, values))
if add:
# we need to remove dynamic default values as they reference other currently not set variables
# (e.g. shares/share sets sambaName='' or users/user sets unixhome=/home/)
for name, p in obj.descriptions.items():
regex = re.compile(r'<(?P<key>[^>]+)>(?P<ext>\[[\d:]+\])?') # from univention.admin.pattern_replace()
if name not in obj.info or name not in values:
continue
if isinstance(p.base_default, str) and regex.search(p.base_default):
values[name] = None
props = {}
props['dn'] = obj.dn
props['objectType'] = module.name
props['id'] = module.obj_description(obj)
if not props['id']:
props['id'] = '+'.join(explode_rdn(obj.dn, True))
# props['path'] = ldap_dn2path(obj.dn, include_rdn=False)
props['position'] = ldap_connection.parentDn(obj.dn) if obj.dn else obj.position.getDn()
props['properties'] = values
props['options'] = {opt['id']: opt['value'] for opt in module.get_options(udm_object=obj)}
props['policies'] = {}
if opened and ('*' in properties or add):
for policy in module.policies:
props['policies'].setdefault(policy['objectType'], [])
for policy in obj.policies:
pol_mod = get_module(None, policy, ldap_connection)
if pol_mod and pol_mod.name:
props['policies'].setdefault(pol_mod.name, []).append(policy)
if superordinate_names(module):
props['superordinate'] = obj.superordinate and obj.superordinate.dn
if obj.entry_uuid:
props['uuid'] = obj.entry_uuid
# TODO: objectFlag is available for every module. remove the extended attribute and always map it.
# alternative: add some other meta information to this object, e.g. is_hidden_object: True, is_synced_from_active_directory: True, ...
if opened and ('*' in properties or 'objectFlag' in properties):
props['properties'].setdefault('objectFlag', [x.decode('utf-8', 'replace') for x in obj.oldattr.get('univentionObjectFlag', [])])
if copy or add:
props.pop('dn', None)
props.pop('id', None)
if not opened:
props.pop('policies', None)
props.pop('options', None)
return props
[docs]
@sanitize
async def put(
self,
object_type,
dn,
representation: dict = JSONPayload(
position=DNSanitizer(required=True),
superordinate=DNSanitizer(required=False, allow_none=True),
options=DictSanitizer({}, default_sanitizer=BooleanSanitizer()),
policies=DictSanitizer({}, default_sanitizer=ListSanitizer(DNSanitizer())),
properties=DictSanitizer({}, required=True),
),
patch_document: list = PatchRepresentation(),
):
"""Modify or move an {module.object_name} object"""
dn = unquote_dn(dn)
try:
module, obj = await self.pool_submit(self.get_module_object, object_type, dn, ldap_connection=self.ldap_write_connection)
except NotFound:
module, obj = None, None
serverctrls = [PostReadControl(True, ['entryUUID', 'modifyTimestamp', 'entryCSN'])]
response = {}
if not obj:
module = self.get_module(object_type)
result = {}
obj = await self.create(object_type, dn, representation or patch_document, result, serverctrls=serverctrls, response=response)
self.set_header('Location', self.urljoin(quote_dn(obj.dn)))
self.set_status(201)
self.add_caching(public=False, must_revalidate=True)
uuid = _get_post_read_entry_uuid(response)
result.update({
'dn': obj.dn,
'uuid': uuid,
})
self.content_negotiation(result)
return
self.set_entity_tags(obj)
position = representation.get('position')
if position and not self.ldap_write_connection.compare_dn(self.ldap_write_connection.parentDn(dn), position):
await self.move(module, dn, position)
return
else:
result = {}
obj = await self.modify(module, obj, representation or patch_document, result, serverctrls=serverctrls, response=response)
self.set_header('Location', self.urljoin(quote_dn(obj.dn)))
self.set_entity_tags(obj, check_conditionals=False)
self.add_caching(public=False, must_revalidate=True, no_cache=True, no_store=True)
if result:
self.content_negotiation(result)
else:
self.set_status(204)
raise Finish()
[docs]
@sanitize
async def patch(
self,
object_type,
dn,
representation: dict = JSONPayload(
position=DNSanitizer(required=False, default=''),
superordinate=DNSanitizer(required=False, allow_none=True),
options=DictSanitizer({}, default_sanitizer=BooleanSanitizer(), required=False),
policies=DictSanitizer({}, default_sanitizer=ListSanitizer(DNSanitizer()), required=False),
properties=DictSanitizer({}),
),
patch_document: list = PatchRepresentation(),
):
"""Modify an {module.object_name} object (moving is currently not possible)"""
dn = unquote_dn(dn)
module, obj = await self.pool_submit(self.get_module_object, object_type, dn, self.ldap_write_connection)
self.set_entity_tags(obj)
if not patch_document:
entry = Object.get_representation(module, obj, ['*'], self.ldap_write_connection, False)
if representation['options'] is None:
representation['options'] = entry['options']
if representation['policies'] is None:
representation['policies'] = entry['policies']
if representation['properties'] is None:
representation['properties'] = {}
if representation['position'] is None:
representation['position'] = entry['position']
if representation['superordinate'] is None:
representation['superordinate'] = entry.get('superordinate')
else:
representation = patch_document
serverctrls = [PostReadControl(True, ['entryUUID', 'modifyTimestamp', 'entryCSN'])]
response = {}
result = {}
obj = await self.modify(module, obj, representation, result, serverctrls=serverctrls, response=response)
self.set_entity_tags(obj, check_conditionals=False)
self.add_caching(public=False, must_revalidate=True, no_cache=True, no_store=True)
self.set_header('Location', self.urljoin(quote_dn(obj.dn)))
if result:
self.content_negotiation(result)
else:
self.set_status(204)
raise Finish()
[docs]
async def create(self, object_type, dn=None, representation=None, result=None, **kwargs):
module = self.get_module(object_type, ldap_connection=self.ldap_write_connection)
if isinstance(representation, list):
def get_patch_replacements(field):
for path, pname, op, value in representation:
if op != 'replace':
continue
if path == field:
return value
container = get_patch_replacements('position')
superordinate = get_patch_replacements('superordinate')
else:
container = representation['position']
superordinate = representation['superordinate']
if dn:
container = self.ldap_write_connection.parentDn(dn)
# TODO: validate that properties are equal to rdn
ldap_position = univention.admin.uldap.position(module.ldap_base)
if container:
ldap_position.setDn(container)
elif superordinate:
ldap_position.setDn(superordinate)
else:
ldap_position.setDn(module.get_default_container())
superordinate = self.superordinate_dn_to_object(module, superordinate)
obj = module.module.object(None, self.ldap_write_connection, ldap_position, superordinate=superordinate)
obj.open()
self.set_properties(module, obj, representation, result)
if dn and not self.ldap_write_connection.compare_dn(dn, obj._ldap_dn()):
self.raise_sanitization_error('dn', _('Trying to create an object with wrong RDN.'))
dn = await self.pool_submit(self.handle_udm_errors, obj.create, **kwargs)
return obj
[docs]
async def modify(self, module, obj, representation, result, **kwargs):
assert obj._open
self.set_properties(module, obj, representation, result)
await self.pool_submit(self.handle_udm_errors, obj.modify, **kwargs)
return obj
[docs]
def set_properties(self, module, obj, representation, result):
if isinstance(representation, list):
self.set_patch_properties(module, obj, representation, result)
return
options = representation['options'] or {} # TODO: AppAttributes.data_for_module(self.name).items() ?
options_enable = {opt for opt, enabled in options.items() if enabled}
options_disable = {opt for opt, enabled in options.items() if enabled is False} # ignore None!
obj.options = list(set(obj.options) - options_disable | options_enable)
if representation['policies']:
obj.policies = functools.reduce(operator.add, representation['policies'].values())
try:
properties = PropertiesSanitizer(_copy_value=False).sanitize(representation['properties'], module=module, obj=obj)
except MultiValidationError as exc:
multi_error = exc
properties = representation['properties']
for prop_name in multi_error.validation_errors:
properties.pop(prop_name)
else:
multi_error = MultiValidationError()
# FIXME: for the automatic IP address assignment, we need to make sure that
# the network is set before the IP address (see Bug #24077, comment 6)
# The following code is a workaround to make sure that this is the
# case, however, this should be fixed correctly.
# This workaround has been documented as Bug #25163.
def _tmp_cmp(i):
if i[0] == 'mac': # must be set before network, dhcpEntryZone
return ("\x00", i[1])
if i[0] == 'network': # must be set before ip, dhcpEntryZone, dnsEntryZoneForward, dnsEntryZoneReverse
return ("\x01", i[1])
if i[0] in ('ip', 'mac'): # must be set before dnsEntryZoneReverse, dnsEntryZoneForward
return ("\x02", i[1])
return i
password_properties = module.password_properties
for property_name, value in sorted(properties.items(), key=_tmp_cmp):
self.set_property(obj, property_name, value, result, multi_error, password_properties)
self.raise_sanitization_multi_error(multi_error)
[docs]
def set_property(self, obj, property_name, value, result, multi_error, password_properties):
if property_name in password_properties:
log.debug('Setting password property', property=property_name)
else:
log.debug('Setting property', property=property_name, value=value)
try:
try:
obj[property_name] = value
except KeyError:
if property_name != 'objectFlag':
raise
except udm_errors.ipOverridesNetwork as exc:
self.add_resource(result, 'udm:warning', {'message': '%s' % exc.message})
return
except udm_errors.valueMayNotChange:
if obj[property_name] == value: # UDM does not check equality before raising the exception
return
raise udm_errors.valueMayNotChange() # the original exception is ugly!
except udm_errors.valueRequired:
if value is None:
# examples where this happens:
# "password" of users/user: because password is required but on modify() None is send, which must not alter the current password
# "unixhome" of users/user: is required, set to None in the request, the default value is set afterwards in create(). Bug #50053
if property_name in password_properties:
log.debug('Ignore unsetting password property', property=property_name)
else:
current_value = obj.info.pop(property_name, None)
log.debug('Unsetting property', property=property_name, value=current_value)
return
raise
except (udm_errors.valueInvalidSyntax, udm_errors.valueError, udm_errors.valueMayNotChange, udm_errors.valueRequired, udm_errors.noProperty) as exc:
exc.message = ''
try:
self.raise_sanitization_error(property_name, _('The property %(name)s has an invalid value: %(details)s') % {'name': property_name, 'details': str(exc)})
except ValidationError as exc:
multi_error.add_error(exc, property_name)
[docs]
def set_patch_properties(self, module, obj, patch_document, result):
multi_error = MultiValidationError()
for path, property_name, op, value in patch_document:
if path == 'properties':
self.set_patch_property(module, obj, op, property_name, value, result, multi_error)
elif path == 'superordinate':
if op in ('add', 'replace') or (op == 'remove' and not value):
obj.superordinate = self.superordinate_dn_to_object(module, value)
elif op in ('remove',):
if self.ldap_connection.compare_dn(obj.superordinate.dn, value):
obj.superordinate = None
elif path == 'options':
if op == 'replace':
obj.options = []
if op in ('add', 'replace') and value:
obj.options.append(value)
if op == 'remove':
if value and value in obj.options: # TODO: be strict when value not in obj.options?
obj.options.remove(value)
elif not value:
obj.options = []
elif path == 'policies':
if op == 'replace':
obj.policies = []
if op in ('add', 'replace') and value:
obj.policy_reference(value)
if op == 'remove':
if value:
obj.policy_dereference(value)
elif not value:
obj.policies = []
if multi_error.has_errors(): # TODO: use raise_sanitization_multi_error
class FalseSanitizer(Sanitizer):
def sanitize(self):
raise multi_error
self.sanitize_arguments(FalseSanitizer(), _result_func=lambda x: {'body': x}, _fieldname='patch')
[docs]
def set_patch_property(self, module, obj, op, property_name, value, result, multi_error):
try:
prop = module.module.property_descriptions[property_name]
except KeyError:
if property_name == 'objectFlag':
return
self.add_resource(result, 'udm:warning', {'message': 'No attribute with name %r in this module, value not set.' % (property_name,)})
return
value = prop.syntax.parse_command_line(value)
current_values = obj[property_name] if prop.multivalue else [obj[property_name]]
current_values = list(current_values or [])
if current_values == ['']:
current_values = []
if op == 'replace':
current_values = []
if op in ('add', 'replace'):
if value in current_values:
self.add_resource(result, 'udm:warning', {'message': 'cannot append %s to %s, value exists' % (value, property_name)})
return
if prop.multivalue:
value = [*current_values, value]
elif op == 'add':
self.add_resource(result, 'udm:warning', {'message': 'appending to a single value property (%s) is not supported.' % (property_name,)})
return
elif op == 'remove' and value is None:
pass
elif op == 'remove':
try:
normalized_val = prop.syntax.parse(value)
except (univention.admin.uexceptions.valueInvalidSyntax, univention.admin.uexceptions.valueError):
normalized_val = None
if value in current_values:
current_values.remove(value)
elif normalized_val is not None and normalized_val in current_values:
current_values.remove(normalized_val)
else:
self.add_resource(result, 'udm:warning', {'message': 'cannot remove %s from %s, value does not exist' % (value, property_name)})
return
value = current_values if prop.multivalue else None
password_properties = module.password_properties
self.set_property(obj, property_name, value, result, multi_error, password_properties)
[docs]
async def move(self, module, dn, position):
if module.childs:
await self._move_with_childs(module, dn, position)
return
dn = await self.pool_submit(self.handle_udm_errors, module.move, dn, position)
uri = self.urljoin(quote_dn(dn))
self.add_header('Location', uri)
result = {
'id': '',
'errors': False,
'finished': True,
'uri': uri,
}
self.add_link(result, 'self', uri)
self.set_status(201)
self.add_caching(public=False, no_store=True, no_cache=True, must_revalidate=True)
self.content_negotiation(result)
async def _move_with_childs(self, module, dn, position):
"""Move objects which might have child objects like containers/cn"""
status_id = str(uuid.uuid4())
status = shared_memory.dict()
status.update({
'id': status_id,
'finished': False,
'errors': False,
})
# shared_memory.queue[self.request.user_dn] = queue.copy()
try:
shared_memory.queue[self.request.user_dn]
except KeyError:
shared_memory.queue[self.request.user_dn] = shared_memory.dict()
shared_memory.queue[self.request.user_dn][status_id] = status
self.set_status(202)
self.set_header('Location', self.abspath('progress', status['id']))
self.add_caching(public=False, must_revalidate=True)
self.content_negotiation(dict(status))
try:
dn = await self.pool_submit(self.handle_udm_errors, module.move, dn, position)
except Exception:
status['errors'] = True
status['traceback'] = traceback.format_exc() # FIXME: error handling
raise
else:
status['uri'] = self.urljoin(quote_dn(dn))
finally:
status['finished'] = True
[docs]
@sanitize
async def delete(
self,
object_type,
dn,
cleanup: bool = Query(BoolSanitizer(default=True), description="Whether to perform a cleanup (e.g. of temporary objects, locks, etc).", example=True),
recursive: bool = Query(BoolSanitizer(default=True), description="Whether to remove referring objects (e.g. DNS or DHCP references).", example=True),
):
"""Remove a {module.object_name_plural} object"""
dn = unquote_dn(dn)
_module, obj = await self.pool_submit(self.get_module_object, object_type, dn, self.ldap_write_connection)
assert obj._open
self.set_entity_tags(obj, remove_after_check=True)
def remove():
log.info('Removing LDAP object', dn=dn)
obj.remove(remove_childs=recursive)
if cleanup: # and udm_objects.wantsCleanup(obj)
udm_objects.performCleanup(obj)
await self.pool_submit(self.handle_udm_errors, remove)
self.add_caching(public=False, must_revalidate=True)
self.set_status(204)
raise Finish()
[docs]
class ObjectRestore(Resource):
"""Restore an object from the recyclebin"""
[docs]
async def post(self, object_type, dn):
"""Restore a {module.object_name_plural} object from the recyclebin"""
dn = unquote_dn(dn)
module = get_module(object_type, dn, self.ldap_write_connection)
if module is None:
raise NotFound(object_type, dn)
obj = await self.pool_submit(module.get, dn)
if not obj:
raise NotFound(object_type, dn)
original_object_type = obj['originalObjectType']
def restore():
log.info('Restoring LDAP object', dn=dn, original_type=original_object_type)
return obj.restore()
restored_dn = await self.pool_submit(self.handle_udm_errors, restore)
# Build URI with the correct object type (not recyclebin/removedobject)
# ../../../ goes up from /restore, /$dn, /$object_type
uri = self.urljoin('../../../', original_object_type, quote_dn(restored_dn))
self.add_header('Location', uri)
result = {
'dn': restored_dn,
}
self.add_link(result, 'self', uri)
self.set_status(201)
self.add_caching(public=False, no_store=True, no_cache=True, must_revalidate=True)
self.content_negotiation(result)
[docs]
class UserPhoto(ConditionalResource, Resource):
"""Get a (cacheable) user profile picture in JPEG format"""
[docs]
async def get(self, object_type, dn):
dn = unquote_dn(dn)
module = get_module(object_type, dn, self.ldap_connection)
if module is None:
raise NotFound(object_type, dn)
obj = await self.pool_submit(module.get, dn)
if not obj:
raise NotFound(object_type, dn)
if not obj.has_property('jpegPhoto'):
raise NotFound(object_type, dn)
data = base64.b64decode(obj.info.get('jpegPhoto', '').encode('ASCII'))
modified = self.modified_from_timestamp(self.ldap_connection.authz_connection.getAttr(obj.dn, 'modifyTimestamp')[0].decode('utf-8'))
if modified:
self.add_header('Last-Modified', last_modified(modified))
self.set_header('Content-Type', 'image/jpeg')
self.add_caching(public=False, max_age=2592000, must_revalidate=True)
self.finish(data)
[docs]
async def post(self, object_type, dn):
dn = unquote_dn(dn)
module = get_module(object_type, dn, self.ldap_write_connection)
if module is None:
raise NotFound(object_type, dn)
obj = await self.pool_submit(module.get, dn)
if not obj:
raise NotFound(object_type, dn)
if not obj.has_property('jpegPhoto'):
raise NotFound(object_type, dn)
photo = self.request.files['jpegPhoto'][0]['body']
if len(photo) > 262144:
raise HTTPError(413, 'too large: maximum: 262144 bytes')
obj['jpegPhoto'] = base64.b64encode(photo).decode('ASCII')
await self.pool_submit(obj.modify)
self.set_status(204)
raise Finish()
[docs]
class ObjectAdd(FormBase, _OpenAPIBase, Resource):
"""GET a form containing information about all properties, methods, URLs to create a specific object"""
[docs]
@sanitize
async def get(
self,
object_type,
position: str = Query(DNSanitizer(required=False, allow_none=True), description="Position which is used as search base."),
superordinate: str = Query(DNSanitizer(required=False, allow_none=True), description="The superordinate DN of the object to create. `position` is sufficient."), # example=f"cn=superordinate,{ldap_base}"
template: str = Query(DNSanitizer(required=False, allow_none=True), description="**Experimental**: A |UDM| template object.", deprecated=True),
):
"""Get a template for creating an {module.object_name} object (contains all properties and their default values)"""
module = self.get_module(object_type) # ldap_connection=self.ldap_write_connection ?
if 'add' not in module.operations:
raise NotFound(object_type)
module.load(force_reload=True) # reload for instant extended attributes
result = {}
self.add_link(result, 'self', self.urljoin(''), title=_('Add %s') % (module.object_name,))
if not module.template:
template = None
try:
result.update(self.get_create_form(module, template=template, position=position, superordinate=superordinate))
except udm_errors.insufficientInformation as exc:
self.raise_sanitization_error('superordinate', str(exc))
if module.template:
template = UDM_Module(module.template, ldap_connection=self.ldap_connection, ldap_position=self.ldap_position) # ldap_connection=self.ldap_write_connection ?
templates = template.search(ucr.get('ldap/base'))
if templates:
form = self.add_form(result, action='', method='GET', id='template', layout='template') # FIXME: preserve query string
template_layout = [{'label': _('Template'), 'description': 'A template defines rules for default object properties.', 'layout': ['template', '']}]
self.add_layout(result, template_layout, 'template')
self.add_form_element(form, 'template', '', element='select', options=[{'value': _obj.dn, 'label': template.obj_description(_obj)} for _obj in templates])
self.add_form_element(form, '', _('Fill template values'), type='submit')
self.add_caching(public=True, must_revalidate=True)
self.content_negotiation(result)
[docs]
def get_html(self, response):
root = super().get_html(response)
if self.request.method in ('GET', 'HEAD'):
for form in root:
if form.get('id') == 'add':
for elem in form.findall('.//section'):
if elem.find('./h1'):
self.add_link(response, 'udm:tab-switch', href=f'#{elem.get("id")}', title=elem.find('./h1').text)
return root
[docs]
class ObjectCopy(ObjectAdd):
"""Copy an object"""
[docs]
@sanitize
async def get(
self,
object_type,
dn: str = Query(DNSanitizer(required=True)),
):
module = self.get_module(object_type) # ldap_connection=self.ldap_write_connection ?
if 'copy' not in module.operations:
raise NotFound(object_type)
result = {}
self.add_link(result, 'self', self.urljoin(''), title=_('Copy %s') % (module.object_name,))
result.update(self.get_create_form(module, dn=dn, copy=True))
self.add_caching(public=True, must_revalidate=True)
self.content_negotiation(result)
[docs]
class ObjectEdit(FormBase, Resource):
"""GET a form containing ways to modify, remove, move a specific object"""
[docs]
async def get(self, object_type, dn):
dn = unquote_dn(dn)
module = get_module(object_type, dn, self.ldap_connection)
if module is None:
raise NotFound(object_type, dn)
if not set(module.operations) & {'remove', 'move', 'subtree_move', 'edit', 'restore'}:
# modification of this object type is not possible
raise NotFound(object_type, dn)
result = {}
module.load(force_reload=True) # reload for instant extended attributes
obj = await self.pool_submit(module.get, dn)
if not obj:
raise NotFound(object_type, dn)
if object_type not in ('users/self', 'users/passwd') and not univention.admin.modules.recognize(object_type, obj.dn, obj.oldattr):
raise NotFound(object_type, dn)
self.add_link(result, 'icon', self.urljoin('../favicon.ico'), type='image/x-icon')
self.add_link(result, 'udm:object-modules', self.urljoin('../../../'), title=_('All modules'))
self.add_link(result, 'udm:object-module', self.urljoin('../../'), title=self.get_parent_object_type(module).object_name_plural)
self.add_link(result, 'type', self.urljoin('../'), title=module.object_name)
self.add_link(result, 'up', self.urljoin('..', quote_dn(obj.dn)), title=obj.dn)
self.add_link(result, 'self', self.urljoin(''), title=_('Modify'))
if 'restore' in module.operations:
form = self.add_form(result, id='restore', name='restore', action=self.urljoin('.').rstrip('/'), method='POST', layout='restore', **{'hx-confirm': _('Please confirm the restoration of the selected %s') % (module.object_name,)})
restore_layout = [{'label': _('Restore'), 'description': _("Restore the object from the recyclebin"), 'layout': ['']}]
self.add_layout(result, restore_layout, 'restore')
self.add_form_element(form, '', _('Restore'), type='submit')
if 'remove' in module.operations:
# TODO: add list of referring objects
form = self.add_form(result, id='remove', name='remove', action=self.urljoin('.').rstrip('/'), method='DELETE', layout='remove', **{'hx-confirm': _('Please confirm the removal of the selected %s') % (module.object_name,)})
remove_layout = [{'label': _('Remove'), 'description': _("Remove the object"), 'layout': [['cleanup', 'recursive'], '']}]
self.add_layout(result, remove_layout, 'remove')
self.add_form_element(form, 'cleanup', '1', type='checkbox', checked=True, label=_('Perform a cleanup'), title=_('e.g. temporary objects, locks, etc.'))
self.add_form_element(form, 'recursive', '1', type='checkbox', checked=True, label=_('Remove referring objects'), title=_('e.g. DNS or DHCP references of a computer'))
self.add_form_element(form, '', _('Remove'), type='submit')
if set(module.operations) & {'move', 'subtree_move'}:
form = self.add_form(result, id='move', name='move', action=self.urljoin('.').rstrip('/'), method='PUT', layout='move', **{'hx-confirm': _('Please confirm the move of the selected %s') % (module.object_name,)})
move_layout = [{'label': _('Move'), 'description': _("Move the object"), 'layout': ['position', '']}]
self.add_layout(result, move_layout, 'move')
self.add_form_element(form, 'position', self.ldap_connection.parentDn(obj.dn)) # TODO: replace with <select> or udm:tree
self.add_form_element(form, '', _('Move'), type='submit')
if 'edit' in module.operations:
representation = Object.get_representation(module, obj, ['*'], self.ldap_connection)
result.update(representation)
for policy in module.policies:
ptype = policy['objectType']
form = self.add_form(result, action=self.urljoin(ptype) + '/', method='GET', name=ptype, rel='udm:policy-result')
pol = (representation['policies'].get(ptype, ['']) or [''])[0]
self.add_form_element(form, 'policy', pol, label=policy['label'], title=policy['description'], placeholder=_('Policy DN'))
self.add_form_element(form, '', _('Policy result'), type='submit')
assert obj._open
layout = Layout.get_layout(module, dn if object_type != 'users/self' else None)
layout[0]['layout'].extend([
['', 'dn', 'position'],
['jpegPhoto-preview'],
# [{'$form-ref': ['remove', 'move']}]
])
# layout.insert(1, {'layout': [{'$form-ref': ['remove', 'move']}], 'advanced': False, 'description': _('information'), 'label': _('information'), 'is_app_tab': False})
properties = Properties.get_properties(module, dn)
self.add_resource(result, 'udm:properties', {'properties': properties})
self.add_link(result, 'udm:properties', href=self.urljoin('properties'), title=_('Properties for %s') % (module.object_name,))
is_ad_synced_object = ucr.is_true('ad/member') and 'synced' in obj.oldattr.get('univentionObjectFlag', [])
is_ad_synced_object = 'synced' in obj.oldattr.get('univentionObjectFlag', [])
if is_ad_synced_object:
layout[0]['layout'].insert(0, '$active_directory_warning$')
properties['$active_directory_warning$'] = {'id': '$active_directory_warning$', 'label': _('The %s "%s" is part of the Active Directory domain.') % (module.object_name, obj[module.identifies])}
self.add_form_element(form, '$active_directory_warning$', '1', type='checkbox', checked=True, label=_('The %s "%s" is part of the Active Directory domain.') % (module.object_name, obj[module.identifies]))
for prop in properties.values():
if prop.get('readonly_when_synced'):
prop['disabled'] = True
enctype = 'application/x-www-form-urlencoded'
if obj.has_property('jpegPhoto'):
enctype = 'multipart/form-data'
form = self.add_form(result, action=self.urljoin('properties/jpegPhoto.jpg'), method='POST', enctype='multipart/form-data')
self.add_form_element(form, 'jpegPhoto', '', type='file', accept='image/jpg image/jpeg image/png')
self.add_form_element(form, '', _('Upload user photo'), type='submit')
self.add_link(result, 'udm:user-photo', self.urljoin('properties/jpegPhoto.jpg'), type='image/jpeg', title=_('User photo'))
form = self.add_form(result, id='edit-form', name='edit-form', action=self.urljoin('.').rstrip('/'), method='PUT', enctype=enctype, layout='edit-form', **{'hx-ext': 'json-enc'})
self.add_layout(result, layout, 'edit-form')
self.add_form_element(form, 'dn', obj.dn, readonly='readonly', disabled='disabled', label=_('LDAP Distinguished Name (DN)'))
self.add_form_element(form, 'position', self.ldap_connection.parentDn(obj.dn), type='hidden', required='required')
values = {}
for key, prop in properties.items():
if obj.module == 'users/user' and key == 'password':
prop['required'] = False
if prop['readonly_when_synced'] and is_ad_synced_object:
prop['disabled'] = True
try:
values[key] = obj[key] # don't use .get() !
except KeyError:
continue
values = dict(decode_properties(module, obj, values))
self.add_property_form_elements(module, form, properties, values)
if 'jpegPhoto' in properties:
properties['jpegPhoto-preview'] = {'id': 'jpegPhoto-preview', 'label': ' '}
self.add_form_element(form, 'jpegPhoto-preview', '', type='image', label=' ', src=self.urljoin('properties/jpegPhoto.jpg'), alt=_('No photo set'))
for policy in module.policies:
ptype = policy['objectType']
pol = (representation['policies'].get(ptype, ['']) or [''])[0]
self.add_form_element(form, 'policies[%s]' % (ptype), pol, label=policy['label'], placeholder=_('Policy DN'))
references = Layout.get_reference_layout(layout)
if references:
for reference in module.get_policy_references(obj.dn):
if reference['module'] != 'udm':
continue
self.add_form_element(form, 'references', '', href=self.abspath(reference['objectType'], quote_dn(reference['id'])), label=reference['label'], element='a')
references['layout'].append('references')
self.add_form_element(form, '', _('Modify %s') % (module.object_name,), type='submit', readonly='readonly')
self.add_caching(public=False, must_revalidate=True)
self.content_negotiation(result)
[docs]
def get_html(self, response):
root = super().get_html(response)
if self.request.method in ('GET', 'HEAD'):
for form in root:
if form.get('id') == 'edit-form':
for elem in form.findall('.//section'):
self.add_link(response, 'udm:tab-switch', href=f'#{elem.get("id")}', title=elem.find('./fieldset/legend').text, dont_set_http_header=True)
return root
[docs]
class ObjectMultiEdit(ObjectEdit):
"""Modify multiple objects at the same time"""
[docs]
class DefaultContainers(Resource):
"""
Get list of default containers.
GET udm/users/user/default-containers
"""
get_template = 'default_containers.html'
[docs]
def template_vars(self):
data = super().template_vars()
data['selected'] = self.get_query_argument('selected', '')
data['required'] = self.get_query_argument('required', '')
data['name'] = self.get_query_argument('name', '')
return data
# @sanitize
[docs]
async def get(
self,
object_type,
# include_empty: bool = Query(BoolSanitizer(required=False)),
):
result = {}
module = self.get_module(object_type)
self.add_link(result, 'self', self.urljoin(''), title=_('Default containers for %s') % (module.object_name,))
self.add_link(result, 'icon', self.urljoin('./favicon.ico'), type='image/x-icon')
self.add_link(result, 'udm:object-modules', self.urljoin('../../'), title=_('All modules'))
self.add_link(result, 'udm:object-module', self.urljoin('../'), title=self.get_parent_object_type(module).object_name_plural)
# self.add_link(result, 'type', self.urljoin('.'), title=module.object_name)
self.add_link(result, 'up', self.urljoin('.'), title=module.object_name)
result['default-containers'] = [('', _('All containers')), *sorted(((x, ldap_dn2path(x)) for x in module.get_default_containers()), key=lambda x: x[1].lower())]
self.add_caching(public=True, must_revalidate=True)
self.content_negotiation(result)
[docs]
class PropertyChoices(Resource):
"""
Get possible values/choices for the specified property.
GET udm/users/user/$DN/properties/$name/choices
"""
get_template = 'property_choices.html'
[docs]
def template_vars(self):
data = super().template_vars()
data['selected'] = self.get_query_argument('selected', '')
data['required'] = self.get_query_argument('required', '')
data['name'] = self.get_query_argument('name', '')
return data
[docs]
@sanitize
async def get(
self,
object_type,
dn,
property_,
dn_: str = Query(DNSanitizer(required=False), alias='dn'),
property: str = Query(ObjectPropertySanitizer(required=False)),
value: str = Query(SearchSanitizer(required=False)),
hidden: bool = Query(BooleanSanitizer(required=False, default=True)),
dependencies: str = Query(DictSanitizer({}, required=False)),
):
dn = unquote_dn(dn)
module = self.get_module(object_type)
try:
prop = module.module.property_descriptions[property_]
except KeyError:
raise NotFound(object_type, dn)
type_ = udm_types.TypeHint.detect(prop, property_)
options = {key: val for key, val in {
'dn': dn_,
'property': property,
'value': value,
'hidden': hidden,
'dependencies': dependencies,
}.items() if val is not None}
choices = await self.pool_submit(type_.get_choices, self.ldap_connection, options)
result = {'choices': choices}
self.add_link(result, 'self', self.urljoin(''), title=_('Property choices for %s property %s') % (module.object_name, prop.short_description))
if dn:
self.add_link(result, 'icon', self.urljoin('../../../favicon.ico'), type='image/x-icon')
self.add_link(result, 'udm:object-modules', self.urljoin('../../../../../'), title=_('All modules'))
self.add_link(result, 'udm:object-module', self.urljoin('../../../../'), title=self.get_parent_object_type(module).object_name_plural)
self.add_link(result, 'type', self.urljoin('../../../'), title=module.object_name)
self.add_link(result, 'up', self.urljoin('../../..', quote_dn(dn)), title=dn)
else:
self.add_link(result, 'icon', self.urljoin('./../../favicon.ico'), type='image/x-icon')
self.add_link(result, 'udm:object-modules', self.urljoin('../../../../'), title=_('All modules'))
self.add_link(result, 'udm:object-module', self.urljoin('../../../'), title=self.get_parent_object_type(module).object_name_plural)
# self.add_link(result, 'type', self.urljoin('.'), title=module.object_name)
self.add_link(result, 'up', self.urljoin('../../'), title=module.object_name)
self.add_caching(public=False, must_revalidate=True)
self.content_negotiation(result)
[docs]
class PolicyResultBase(Resource):
"""Get the possible policies of the policy-type for user objects located at the container"""
@run_on_executor(executor='pool')
def _get(self, object_type, policy_type, dn, is_container=False):
"""
Returns a virtual policy object containing the values that
the given object or container inherits
"""
policy_dn = self.request.decoded_query_arguments['policy']
if is_container:
# editing a new (i.e. non existing) object -> use the parent container
obj = self.get_object_by_dn(dn)
else:
# editing an exiting UDM object -> use the object itself
obj = self.get_object(object_type, dn)
if policy_dn:
policy_obj = self.get_object(policy_type, policy_dn)
else:
policy_obj = self.get_module(policy_type).get(None)
policy_obj.clone(obj)
# There are 2x2x2 (=8) cases that may occur (c.f., Bug #31916):
# (1)
# [edit] editing existing UDM object
# -> the existing UDM object itself is loaded
# [new] virtually edit non-existing UDM object (when a new object is being created)
# -> the parent container UDM object is loaded
# (2)
# [w/pol] UDM object has assigend policies in LDAP directory
# [w/o_pol] UDM object has no policies assigend in LDAP directory
# (3)
# [inherit] user request to (virtually) change the policy to 'inherited'
# [set_pol] user request to (virtually) assign a particular policy
faked_policy_reference = None
if not is_container and not policy_dn:
# case: [edit; w/pol; inherit]
# -> current policy is (virtually) overwritten with 'None'
faked_policy_reference = [None]
elif is_container and policy_dn:
# cases:
# * [new; w/pol; inherit]
# * [new; w/pol; set_pol]
# -> old + temporary policy are both (virtually) set at the parent container
faked_policy_reference = [*obj.policies, policy_dn]
else:
# cases:
# * [new; w/o_pol; inherit]
# * [new; w/o_pol; set_pol]
# * [edit; w/pol; set_pol]
# * [edit; w/o_pol; inherit]
# * [edit; w/o_pol; set_pol]
faked_policy_reference = [policy_dn]
policy_obj.policy_result(faked_policy_reference)
infos = copy.copy(policy_obj.polinfo_more)
for key, _value in infos.items():
if key in policy_obj.polinfo:
if isinstance(infos[key], tuple | list): # noqa: PLR1733
continue
_value['value'] = policy_obj.polinfo[key]
if policy_dn:
self.add_link(infos, 'udm:policy-edit', self.abspath(policy_obj.module, policy_dn), title=_('Click to edit the inherited properties of the policy'))
return infos
[docs]
class PolicyResult(PolicyResultBase):
"""
Get the possible policies of the policy-type for user objects located at the container.
GET udm/users/user/$userdn/policies/$policy_type/?policy=$dn (for a existing object)
"""
[docs]
@sanitize
async def get(
self,
object_type,
dn,
policy_type,
policy: str = Query(DNSanitizer(required=False, default=None)),
):
dn = unquote_dn(dn)
infos = await self._get(object_type, policy_type, dn, is_container=False)
self.add_caching(public=False, no_cache=True, must_revalidate=True, no_store=True)
self.content_negotiation(infos)
[docs]
class PolicyResultContainer(PolicyResultBase):
"""
Get the possible policies of the policy-type for user objects located at the container.
GET udm/users/user/policies/$policy_type/?policy=$dn&position=$dn (for a container, where a object should be created in)
"""
[docs]
@sanitize
async def get(
self,
object_type,
policy_type,
policy: str = Query(DNSanitizer(required=False, default=None)),
position: str = Query(DNSanitizer(required=True)),
):
infos = await self._get(object_type, policy_type, position, is_container=True)
self.add_caching(public=False, no_cache=True, must_revalidate=True, no_store=True)
self.content_negotiation(infos)
[docs]
class Operations(Resource):
"""GET /udm/progress/$progress-id (get the progress of a started operation like move, report, maybe add/put?, ...)"""
[docs]
def check_acceptable(self):
if self.request.headers.get('Accept') == '*/*':
# python-udm-rest-api-client <= 1.2.2 doesn't provide "Accept: application/json" header
return 'json'
return super().check_acceptable()
[docs]
def get(self, progress):
progressbars = shared_memory.queue.get(self.request.user_dn, {})
if progress not in progressbars:
raise NotFound()
result = dict(progressbars[progress])
if result.get('uri'):
self.set_status(303)
self.add_header('Location', result['uri'])
self.add_link(result, 'self', result['uri'])
shared_memory.queue.get(self.request.user_dn, {}).pop(progress, {})
else:
self.set_status(301)
self.add_header('Location', self.urljoin(''))
self.add_header('Retry-After', '1')
self.add_caching(public=False, no_store=True, no_cache=True, must_revalidate=True)
self.content_negotiation(result)
[docs]
def get_html(self, response):
root = super().get_html(response)
if isinstance(response, dict) and 'value' in response and 'max' in response:
h1 = ET.Element('h1')
h1.text = response.get('description', '')
root.append(h1)
root.append(ET.Element('progress', value=str(response['value']), max=str(response['max'])))
return root
[docs]
class LicenseRequest(Resource):
"""Request a new license from the Univention license server"""
[docs]
@sanitize
async def get(
self,
email: str = Query(EmailSanitizer(required=True)),
):
data = {
'email': email,
'licence': dump_license(),
}
if not data['licence']:
raise HTTPError(500, _('Cannot parse License from LDAP'))
# TODO: we should also send a link (self.request.full_url()) to the license server, so that the email can link to a url which automatically inserts the license:
# self.request.urljoin('import', license=quote(base64.b64encode(zlib.compress(b''.join(_[17:] for _ in open('license.ldif', 'rb').readlines() if _.startswith(b'univentionLicense')), 6)[2:-4])))
data = urlencode(data)
url = 'https://license.univention.de/keyid/conversion/submit'
http_client = tornado.httpclient.AsyncHTTPClient()
try:
await http_client.fetch(url, method='POST', body=data, user_agent='UMC/AppCenter', headers={'Content-Type': 'application/x-www-form-urlencoded'})
except tornado.httpclient.HTTPError as exc:
error = str(exc)
if exc.response.code >= 500:
error = _('This seems to be a problem with the license server. Please try again later.')
match = re.search(b'<span id="details">(?P<details>.*?)</span>', exc.response.body, flags=re.DOTALL)
if match:
error = match.group(1).decode('UTF-8', 'replace').replace('\n', '')
# FIXME: use original error handling
raise HTTPError(400, _('Could not request a license from Univention: %s') % (error,))
# creating a new ucr variable to prevent duplicated registration (Bug #35711)
handler_set(['ucs/web/license/requested=true'])
self.add_caching(public=False, no_store=True, no_cache=True, must_revalidate=True)
self.content_negotiation({'message': _('A new license has been requested and sent to your email address.')})
[docs]
class LicenseCheck(Resource):
"""Check if the license has exceeded or is still valid"""
[docs]
def get(self):
message = _('The license is valid.')
try:
check_license(self.ldap_connection)
except LicenseError as exc:
message = str(exc)
self.add_caching(public=False, max_age=120, must_revalidate=True)
self.content_negotiation({'message': message})
[docs]
class License(Resource):
"""Get information about the license"""
[docs]
def get(self):
license_data = {}
self.add_link(license_data, 'udm:license-check', self.urljoin('check'), title=_('Check license status'))
self.add_link(license_data, 'udm:license-request', self.urljoin('request'))
self.add_link(license_data, 'udm:license-import', self.urljoin(''))
form = self.add_form(license_data, self.urljoin('request'), 'GET', rel='udm:license-request')
self.add_form_element(form, 'email', '', type='email', label=_('E-Mail address'))
self.add_form_element(form, '', _('Request new license'), type='submit')
form = self.add_form(license_data, self.urljoin('import'), 'POST', rel='udm:license-import', enctype='multipart/form-data')
self.add_form_element(form, 'license', '', type='file', label=_('License file (ldif format)'))
self.add_form_element(form, '', _('Import license'), type='submit')
try:
import univention.admin.license as udm_license
except ImportError:
license_data['licenseVersion'] = 'gpl'
else:
license_data['licenseVersion'] = udm_license._license.version
if udm_license._license.version == '1':
for item in ('licenses', 'real'):
license_data[item] = {}
for lic_type in ('CLIENT', 'ACCOUNT', 'DESKTOP', 'GROUPWARE'):
count = getattr(udm_license._license, item)[udm_license._license.version][getattr(udm_license.License, lic_type)]
if isinstance(count, str):
try:
count = int(count)
except ValueError:
count = None
license_data[item][lic_type.lower()] = count
if 'UGS' in udm_license._license.types:
udm_license._license.types = filter(lambda x: x != 'UGS', udm_license._license.types)
elif udm_license._license.version == '2':
for item in ('licenses', 'real'):
license_data[item] = {}
for lic_type in ('SERVERS', 'USERS', 'MANAGEDCLIENTS', 'CORPORATECLIENTS'):
count = getattr(udm_license._license, item)[udm_license._license.version][getattr(udm_license.License, lic_type)]
if isinstance(count, str):
try:
count = int(count)
except ValueError:
count = None
license_data[item][lic_type.lower()] = count
license_data['keyID'] = udm_license._license.licenseKeyID
license_data['support'] = udm_license._license.licenseSupport
license_data['premiumSupport'] = udm_license._license.licensePremiumSupport
license_data['licenseTypes'] = udm_license._license.types
license_data['oemProductTypes'] = udm_license._license.oemProductTypes
license_data['endDate'] = udm_license._license.endDate
license_data['baseDN'] = udm_license._license.licenseBase
free_license = ''
if license_data['baseDN'] == 'Free for personal use edition':
free_license = 'ffpu'
if license_data['baseDN'] == 'UCS Core Edition':
free_license = 'core'
if free_license:
license_data['baseDN'] = ucr.get('ldap/base', '')
license_data['freeLicense'] = free_license
license_data['sysAccountsFound'] = udm_license._license.sysAccountsFound
self.add_caching(public=False, max_age=120, must_revalidate=True)
self.content_negotiation(license_data)
[docs]
class LicenseImport(Resource):
"""Import a new license"""
[docs]
@sanitize
async def get(
self,
license: str = Query(StringSanitizer(required=True)),
):
text = '''dn: cn=admin,cn=license,cn=univention,%(ldap/base)s
cn: admin
objectClass: top
objectClass: univentionLicense
objectClass: univentionObject
univentionObjectType: settings/license
''' % ucr
for line in zlib.decompress(base64.b64decode(license.encode('ASCII')), -15).decode('UTF-8').splitlines():
text += f'univentionLicense{line.strip()}\n'
self.import_license(io.BytesIO(text.encode('UTF-8')))
[docs]
def post(self):
return self.import_license(io.BytesIO(self.request.files['license'][0]['body']))
[docs]
def import_license(self, fd):
try:
# check license and write it to LDAP
importer = LicenseImporter(fd)
importer.check(ucr.get('ldap/base', ''))
importer.write(self.ldap_write_connection.authz_connection)
except ldap.LDAPError as exc:
# LDAPError e.g. LDIF contained non existing attributes
raise HTTPError(400, _('Importing the license failed: LDAP error: %s.') % exc.args[0].get('info'))
except (ValueError, AttributeError) as exc:
# AttributeError: missing univentionLicenseBaseDN
# ValueError raised by ldif.LDIFParser when e.g. dn is duplicated
raise HTTPError(400, _('Importing the license failed: %s.') % (exc,))
except LicenseError as exc:
raise HTTPError(400, str(exc))
self.content_negotiation({'message': _('The license was imported successfully.')})
[docs]
class ServiceSpecificPassword(Resource):
"""Let a new autogenerated service specific password be created"""
[docs]
@sanitize
async def post(
self,
object_type,
dn,
service: str = Body(StringSanitizer(required=True)),
):
module = get_module(object_type, dn, self.ldap_write_connection)
if module is None:
raise NotFound(object_type, dn)
cfg = password_config(service)
new_password = generate_password(**cfg)
obj = await self.pool_submit(module.get, dn)
obj['serviceSpecificPassword'] = {'service': service, 'password': new_password}
await self.pool_submit(self.handle_udm_errors, obj.modify)
# (handled) udm_errors.valueError raised if Service is not supported
result = {'service': service, 'password': new_password}
self.content_negotiation(result)
[docs]
class Application(tornado.web.Application):
"""The main tornado application"""
def __init__(self, **settings):
# module_type = '([a-z]+)'
module_type = '(%s)' % '|'.join(re.escape(mod) for mod in Modules.mapping)
object_type = '([A-Za-z0-9_-]+/[A-Za-z0-9_-]+)'
policies_object_type = '(policies/[A-Za-z0-9_-]+)'
dn = '((?:[^/]+%s.+%s)?(?:%s|%s))' % (self.multi_regex('='), self.multi_regex(','), self.multi_regex(ucr['ldap/base']), self.multi_regex('cn=internal'))
if ucr.is_true('directory/manager/rest/delegative-administration/enabled'):
udm_auth.Authorization.enable(lambda: get_admin_ldap_write_connection()[0])
# FIXME: with that dn regex, it is not possible to have urls like (/udm/$dn/foo/$dn/) because ldap-base at the end matches the last dn
# Note: the ldap base is part of the url to support "/" as part of the DN. otherwise we can use: '([^/]+(?:=|%3d|%3D)[^/]+)'
# Note: we cannot use .replace('/', '%2F') for the dn part as url-normalization could replace this and apache doesn't pass URLs with %2F to the ProxyPass without http://httpd.apache.org/docs/current/mod/core.html#allowencodedslashes
property_ = '([^/]+)'
super().__init__([
("/(?:udm/)?(favicon).ico", Favicon, {"path": "/var/www/favicon.ico"}),
("/udm/(?:index.html)?", Modules),
("/udm/openapi.json", OpenAPI),
("/udm/relation/(.*)", Relations),
("/udm/license/", License),
("/udm/license/import", LicenseImport),
("/udm/license/check", LicenseCheck),
("/udm/license/request", LicenseRequest),
("/udm/ldap/base/", LdapBase),
("/udm/directory/unmap-ldap-attributes", LdapAttributeUnmap),
(f"/udm/object/{dn}", ObjectLink),
("/udm/object/([a-z0-9]{8}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{12})", ObjectByUiid),
("/udm/directory/", Directory),
(f"/udm/{module_type}/", ObjectTypes),
("/udm/(directory)/tree", Tree),
(f"/udm/(?:{object_type}|directory)/move-destinations", MoveDestinations),
("/udm/directory/children-types", SubObjectTypes),
(f"/udm/{object_type}/", Objects),
(f"/udm/{object_type}/openapi.json", OpenAPI),
(f"/udm/{object_type}/add", ObjectAdd),
(f"/udm/{object_type}/copy", ObjectCopy),
(f"/udm/{object_type}/move", ObjectsMove),
(f"/udm/{object_type}/multi-edit", ObjectMultiEdit),
(f"/udm/{object_type}/tree", Tree),
(f"/udm/{object_type}/properties", Properties),
(f"/udm/{object_type}/default-containers", DefaultContainers),
(f"/udm/{object_type}/()properties/{property_}/choices", PropertyChoices),
(f"/udm/{object_type}/layout", Layout),
(f"/udm/{object_type}/favicon.ico", Favicon, {"path": "/usr/share/univention-management-console-frontend/js/dijit/themes/umc/icons/16x16/"}),
(f"/udm/{object_type}/{dn}", Object),
(f"/udm/{object_type}/{dn}/restore", ObjectRestore),
(f"/udm/{object_type}/{dn}/edit", ObjectEdit),
(f"/udm/{object_type}/{dn}/children-types", SubObjectTypes),
(f"/udm/{object_type}/report/([^/]+)", Report),
(f"/udm/{object_type}/{dn}/{policies_object_type}/", PolicyResult),
(f"/udm/{object_type}/{policies_object_type}/", PolicyResultContainer),
(f"/udm/{object_type}/{dn}/layout", Layout),
(f"/udm/{object_type}/{dn}/properties", Properties),
(f"/udm/{object_type}/{dn}/properties/{property_}/choices", PropertyChoices),
(f"/udm/{object_type}/{dn}/properties/jpegPhoto.jpg", UserPhoto),
(f"/udm/(networks/network)/{dn}/next-free-ip-address", NextFreeIpAddress),
(f"/udm/(users/user)/{dn}/service-specific-password", ServiceSpecificPassword),
("/udm/progress/([a-z0-9]{8}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{12})", Operations),
(r"/udm/((?:css|js|img|schema|swaggerui)/.*)", tornado.web.StaticFileHandler, {"path": "/var/www/univention/udm", "default_filename": "index.html"}),
# TODO: decorator for dn argument, which makes sure no invalid dn syntax is used
], default_handler_class=Nothing, **settings)
[docs]
def multi_regex(self, chars):
# Bug in tornado: requests go against the raw url; https://github.com/tornadoweb/tornado/issues/2548, therefore we must match =, %3d, %3D
return ''.join(f'(?:{re.escape(c)}|{re.escape(quote(c).lower())}|{re.escape(quote(c).upper())})' if c in '=,' else re.escape(c) for c in chars)
[docs]
def reload(self):
ucr.load()
udm_auth.Authorization.clear_caches()