Source code for univention.admin.rest.server

#!/usr/bin/python3
#
# Univention Management Console
#  Univention Directory Manager Module
#
# SPDX-FileCopyrightText: 2019-2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only


import argparse
import contextvars
import json
import logging
import os
import signal
import subprocess
import sys
import uuid

import atexit
import pycurl
import tornado.httpclient
import tornado.httpserver
import tornado.httputil
import tornado.ioloop
import tornado.process
import tornado.web
from setproctitle import getproctitle, setproctitle
from tornado.netutil import bind_sockets, bind_unix_socket

import univention.lib.i18n
import univention.logging
from univention.admin.rest.shared_memory import shared_memory
from univention.admin.rest.utils import RE_UUID, init_request_context_logging
from univention.config_registry import ucr


# Private multiprocessing hook. Gateway.main() unregisters it from atexit
# before forking the gateway processes. Falls back to None when it cannot be
# imported, in which case main() skips the unregistration.
try:
    from multiprocessing.util import _exit_function
except ImportError:
    _exit_function = None

# Process title as found at import time; used as prefix for the titles set via setproctitle().
proctitle = getproctitle()
# Per-request context; Gateway.prepare() stores {"request_id": ...} in it.
request_context = contextvars.ContextVar("request_context")
# Structured logger wrapping the 'ADMIN' logger.
log = univention.logging.Structured(logging.getLogger('ADMIN'))


class Gateway(tornado.web.RequestHandler):
    """
    A server which acts as proxy to multiple processes in different languages

    Every incoming request is forwarded via a UNIX socket to the backend
    process serving the locale negotiated from the ``Accept-Language`` header.

    TODO: Implement authentication via PAM
    TODO: Implement ACL handling (restriction on certain paths for certain users/groups)
    TODO: Implement a SAML service provider
    TODO: Implement management of modules
    """

    # ID of the forked gateway process; stays None in the main process
    child_id = None
    # locale (e.g. "de_DE") -> subprocess.Popen of the backend started for it
    PROCESSES = {}
    # locale ("de_DE") or short language ("de") -> path of the backend's UNIX socket
    SOCKETS = {}

    def set_default_headers(self):
        """Set the ``Server`` response header."""
        self.set_header('Server', 'Univention/1.0')  # TODO:

    def prepare(self):
        """
        Assign a request ID to the request.

        A client supplied ``X-Request-Id`` header is kept, otherwise a UUID4 is
        generated and stored in the request headers (so it is forwarded to the
        backend). The value is stored as ``request.x_request_id`` and in the
        module level ``request_context``.
        """
        request_id = self.request.headers.setdefault('X-Request-Id', str(uuid.uuid4()))
        # remove everything RE_UUID matches and cut to the length of a UUID string
        self.request.x_request_id = RE_UUID.sub('', request_id)[:36]
        request_context.set({"request_id": self.request.x_request_id})

    @tornado.gen.coroutine
    def get(self):
        """
        Forward the request to the backend of the negotiated language and relay its response.

        Also bound to ``post``, ``put``, ``delete``, ``patch`` and ``options``.

        :raises tornado.web.HTTPError: 406 if no backend socket could be selected.

        Responds with 503 and ``Retry-After`` if the backend could not be reached.
        """
        try:
            accepted_language, language_socket = self.select_language()
        except univention.lib.i18n.I18N_Error:
            accepted_language, language_socket = None, None
        if language_socket is None:  # pragma: no cover
            raise tornado.web.HTTPError(406)

        request = tornado.httpclient.HTTPRequest(
            self.request.full_url(),
            method=self.request.method,
            body=self.request.body or None,
            headers=self.request.headers,
            allow_nonstandard_methods=True,
            follow_redirects=False,
            connect_timeout=20.0,  # TODO: raise value?
            request_timeout=int(ucr.get('directory/manager/rest/response-timeout', '310')) + 1,
            # let curl connect to the backend's UNIX socket
            prepare_curl_callback=lambda curl: curl.setopt(pycurl.UNIX_SOCKET_PATH, language_socket),
        )
        client = tornado.httpclient.AsyncHTTPClient()
        try:
            response = yield client.fetch(request, raise_error=True)
        except tornado.curl_httpclient.CurlError as exc:
            log.warning('Reaching service failed', error=exc)
            # happens during starting the service and subprocesses when the UNIX sockets aren't available yet
            self.set_status(503)
            self.add_header('Retry-After', '3')  # Tell clients, we are ready in 3 seconds
            self.add_header('Content-Type', 'application/json')
            self.write(json.dumps('The service could not be reached. Please retry in a few seconds or contact an Administrator to restart the service.'))
            self.finish()
            return
        except tornado.httpclient.HTTPError as exc:
            # error responses of the backend are relayed like successful ones
            response = exc.response

        self.set_status(response.code, response.reason)
        # start with empty headers: only Content-Language and the backend's headers are sent
        self._headers = tornado.httputil.HTTPHeaders()
        self.add_header('Content-Language', accepted_language)

        for header, v in response.headers.get_all():
            # these headers are not copied from the backend response
            if header not in ('Content-Length', 'Transfer-Encoding', 'Content-Encoding', 'Connection', 'X-Http-Reason'):
                self.add_header(header, v)

        if response.body:
            self.set_header('Content-Length', len(response.body))
            self.write(response.body)
        self.finish()

    post = put = delete = patch = options = get

    def select_language(self):
        """
        Select the backend for the ``Accept-Language`` header of the request.

        The requested languages are ordered by their quality value; entries with
        an unparsable or non-positive quality are ignored. ``de_DE`` and
        ``en_US`` are added with a very low quality unless the client listed
        them itself, and are always tried last as a fallback.

        :returns: tuple of language tag (e.g. ``"de-DE"``) and UNIX socket path
            of the first language which has a socket in :attr:`SOCKETS`, or
            ``('C', None)`` if there is none.
        :raises univention.lib.i18n.I18N_Error: may propagate from
            :meth:`get_locale`; :meth:`get` catches it.
        """
        languages = self.request.headers.get("Accept-Language", "en-US").split(",")
        locales = []
        defaults = {'en_US': 0.01, 'de_DE': 0.02}
        for language in languages:
            parts = language.strip().split(";")
            if len(parts) > 1 and parts[1].strip().startswith("q="):
                try:
                    quality = float(parts[1].strip()[2:])
                except (ValueError, TypeError):
                    quality = 0.0
            else:
                quality = 1.0
            # the header uses "de-DE" while the defaults are keyed "de_DE":
            # normalize, so that a language the client listed (or rejected
            # with q=0) is really removed from the implicit defaults
            defaults.pop(parts[0].replace('-', '_'), None)
            if quality > 0:
                locales.append((parts[0], quality))

        ranked = sorted(locales + list(defaults.items()), key=lambda x: x[1], reverse=True)
        locales = [lang[0].replace('-', '_') for lang in ranked]
        for locale in [*locales, "en_US", "de_DE"]:
            locale = '%s_%s' % self.get_locale(locale)
            if locale in self.SOCKETS:
                return locale.replace('_', '-'), self.SOCKETS[locale]
        return 'C', None

    @classmethod
    def main(cls):
        """
        Command line entry point of the gateway.

        Parses the arguments, sets up logging, binds the listening sockets,
        starts the shared memory and one backend per locale, registers the
        signal handlers and finally serves requests, either in this process
        (``--processes 1``) or in forked gateway processes.
        """
        parser = argparse.ArgumentParser(prog=f'{sys.executable} -m univention.admin.rest.server')
        parser.add_argument('-d', '--debug', type=int, default=ucr.get_int('directory/manager/rest/debug/level', 2))
        parser.add_argument('-p', '--port', help='Bind to a TCP port (%(default)s)', type=int, default=ucr.get_int('directory/manager/rest/server/port'))
        parser.add_argument('-i', '--interface', help='Bind to specified interface address (%(default)s)', default=ucr['directory/manager/rest/server/address'])
        parser.add_argument('-s', '--unix-socket', help='Bind to specified UNIX socket')
        parser.add_argument('-c', '--processes', type=int, default=ucr.get_int('directory/manager/rest/processes'), help='How many processes should be forked')

        args = parser.parse_args()

        setproctitle(proctitle + ' # gateway main')
        univention.logging.basicConfig(filename='stdout', univention_debug_level=args.debug, use_structured_logging=ucr.is_true('directory/manager/rest/debug/structured-logging'))
        univention.logging.extendLogger('tornado', univention_debug_category='NETWORK')
        logger = logging.getLogger('tornado')
        logger.set_ud_level(ucr.get_int('directory/manager/rest/tornado-debug/level', 3))
        init_request_context_logging(request_context)

        tornado.httpclient.AsyncHTTPClient.configure('tornado.curl_httpclient.CurlAsyncHTTPClient')
        tornado.locale.load_gettext_translations('/usr/share/locale', 'univention-directory-manager-rest')

        os.umask(0o077)  # FIXME: should probably be changed, this is what UMC sets

        # bind sockets
        socks = []
        if args.port:
            socks.extend(bind_sockets(args.port, args.interface, reuse_port=True))

        if args.unix_socket:
            socks.append(bind_unix_socket(args.unix_socket))

        # start sharing memory (before fork, before first usage, after import)
        shared_memory.start()

        # start sub processes for each required locale
        try:
            cls.start_processes(args.processes, args.port, args.debug)
        except Exception:
            # stop whatever has been started already before giving up
            cls.signal_handler_stop(signal.SIGTERM, None)
            raise

        cls.register_signal_handlers()

        # start multiprocessing
        if args.processes != 1:
            if _exit_function is not None:
                atexit.unregister(_exit_function)
            cls.socks = socks
            try:
                child_id = tornado.process.fork_processes(args.processes, 0)
            except RuntimeError as exc:
                log.info('Stopped process', error=exc)
                cls.signal_handler_stop(signal.SIGTERM, None)
            else:
                cls.start_child(child_id)
        else:
            cls.start_server(socks)

    @classmethod
    def start_child(cls, child_id):
        """
        Run the HTTP server in a forked gateway process.

        Remembers the ``child_id``, publishes the PID of this process in
        ``shared_memory.children`` and serves on the sockets stored in
        ``cls.socks`` by :meth:`main`.

        :param child_id: the ID returned by ``tornado.process.fork_processes()``.
        """
        setproctitle(proctitle + f' # gateway proxy {child_id}')
        cls.child_id = child_id
        log.info('Started child', child=cls.child_id)
        shared_memory.children[cls.child_id] = os.getpid()
        cls.start_server(cls.socks)

    @classmethod
    def start_server(cls, socks):
        """
        Serve all requests with this handler on the given sockets until the IOLoop stops.

        :param socks: list of already bound sockets.
        """
        app = tornado.web.Application(
            [(r'.*', cls)],
            serve_traceback=ucr.is_true('directory/manager/rest/show-tracebacks', True),
            log_function=cls.log_function,
        )

        server = tornado.httpserver.HTTPServer(app)
        server.add_sockets(socks)

        try:
            tornado.ioloop.IOLoop.current().start()
        except Exception:
            cls.signal_handler_stop(signal.SIGTERM, None)
            raise

    @classmethod
    def log_function(cls, handler):
        """
        Log failed requests to the ``tornado.access`` logger.

        Responses with a status of 4xx are logged as warning, everything from
        500 on as error. Responses below 400 are not logged here.

        :param handler: the request handler which finished the request.
        """
        status = handler.get_status()
        if status < 400:
            return  # ignore successful requests here, they are logged in the other process

        access_log = logging.getLogger('tornado.access')
        log_method = access_log.warning if status < 500 else access_log.error
        request_time = 1000.0 * handler.request.request_time()
        log_method(
            "[GATEWAY] %d %s %.2fms",
            status,
            handler._request_summary(),
            request_time,
        )

    @classmethod
    def get_locale(cls, language):
        """
        Split a locale string into language and territory.

        :param language: locale string understood by ``univention.lib.i18n.Locale``.
        :returns: tuple ``(language, territory)``. A missing territory defaults
            to ``DE`` for ``de`` and ``US`` for ``en``, otherwise it is ``None``.
        """
        locale = univention.lib.i18n.Locale(language)
        territory = locale.territory or {'de': 'DE', 'en': 'US'}.get(locale.language)
        return locale.language, territory

    @classmethod
    def get_socket_for_locale(cls, language):
        """
        Return the path of the UNIX socket for the backend of the given locale.

        NOTE: fails with an ``AttributeError`` if :meth:`get_locale` finds no
        territory for the locale.

        :param language: locale string, e.g. ``de_DE``.
        """
        language, territory = cls.get_locale(language)
        return f'/var/run/univention-directory-manager-rest-{language}-{territory.lower()}.socket'

    @classmethod
    def start_processes(cls, num_processes=1, start_port=9979, debug_level=2):
        """
        Start one backend per language configured in the UCR variable ``locale``.

        Only the first locale of each language gets its own process; further
        locales of the same language are mapped onto the socket of that
        process. Fills :attr:`SOCKETS` and :attr:`PROCESSES`.

        :param num_processes: passed to the backend as ``-c``.
        :param start_port: unused.
        :param debug_level: passed to the backend as ``-d``.
        """
        languages = [
            language.split(':', 1)[0]
            for language in ucr.get('locale', 'de_DE.UTF-8:UTF-8 en_US.UTF-8:UTF-8').split()
        ]
        for language in languages:
            cmd = [sys.executable, '-m', 'univention.admin.rest', '-l', language, '-c', str(num_processes), '-d', str(debug_level)]
            language = language.split('.', 1)[0]  # strip the codeset: de_DE.UTF-8 -> de_DE
            sock = cls.get_socket_for_locale(language)
            cmd.extend(['-s', sock])
            short_lang = language.split('_', 1)[0]
            # reuse the socket of an already started process for the same language
            cls.SOCKETS[language] = cls.SOCKETS.get(short_lang, sock)
            if short_lang in cls.SOCKETS:
                continue
            cls.SOCKETS[short_lang] = sock
            cls.PROCESSES[language] = subprocess.Popen(cmd, stdout=sys.stdout, stderr=sys.stderr)

    @classmethod
    def register_signal_handlers(cls):
        """Stop on SIGTERM and SIGINT, reload on SIGHUP."""
        signal.signal(signal.SIGTERM, cls.signal_handler_stop)
        signal.signal(signal.SIGINT, cls.signal_handler_stop)
        signal.signal(signal.SIGHUP, cls.signal_handler_reload)

    @classmethod
    def signal_handler_reload(cls, sig, frame):
        """
        Signal handler for reloading the service.

        The main process passes the signal on to the backend processes. The
        stream handlers of the ``tornado`` logger are closed while their stream
        is kept.
        """
        log.debug('Reloading service.')
        if cls.child_id is None:
            for process in cls.PROCESSES.values():
                cls.safe_kill(process.pid, sig)

        tornado_logger = logging.getLogger("tornado")
        for handler in tornado_logger.handlers:
            if isinstance(handler, logging.StreamHandler):
                stream = handler.stream
                handler.close()
                handler.stream = stream

    @classmethod
    def signal_handler_stop(cls, sig, frame):
        """
        Signal handler for stopping the service.

        The main process passes the signal on to the forked gateway processes
        and the backend processes and shuts down the shared memory. Every
        process then stops its IOLoop.
        """
        if cls.child_id is None:
            try:
                children_pids = list(shared_memory.children.values())
            except Exception:  # multiprocessing failure
                children_pids = []
            log.info('stopping children', children=children_pids)
            for pid in children_pids:
                cls.safe_kill(pid, sig)

            log.info('stopping subprocesses', subprocesses=list(cls.PROCESSES.keys()))
            for process in cls.PROCESSES.values():
                cls.safe_kill(process.pid, sig)

            shared_memory.shutdown()
        else:
            log.info('shutting down')

        io_loop = tornado.ioloop.IOLoop.current()

        def shutdown():
            io_loop.stop()

        # we are inside a signal handler: let the loop stop itself on its next iteration
        io_loop.add_callback_from_signal(shutdown)

    @classmethod
    def safe_kill(cls, pid, signo):
        """
        Send a signal to a process without ever raising an exception.

        A failure to deliver the signal is logged as error. After a delivered
        signal the exit status of the process is collected without blocking.

        :param pid: the process ID.
        :param signo: the signal number.
        """
        try:
            os.kill(pid, signo)
        except OSError as exc:
            log.error('Could not kill children', signo=signo, pid=pid, error=exc)
        else:
            try:
                os.waitpid(pid, os.WNOHANG)
            except ChildProcessError as exc:
                # not (or no longer) a child of this process: nothing to collect.
                # This is called from signal handlers, which must not raise.
                log.debug('Could not wait for child', pid=pid, error=exc)