#!/usr/bin/python3
#
# Univention Management Console
# Univention Directory Manager Module
#
# SPDX-FileCopyrightText: 2019-2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only
import argparse
import contextvars
import json
import logging
import os
import signal
import subprocess
import sys
import uuid
import atexit
import pycurl
import tornado.httpclient
import tornado.httpserver
import tornado.httputil
import tornado.ioloop
import tornado.process
import tornado.web
from setproctitle import getproctitle, setproctitle
from tornado.netutil import bind_sockets, bind_unix_socket
import univention.lib.i18n
import univention.logging
from univention.admin.rest.shared_memory import shared_memory
from univention.admin.rest.utils import RE_UUID, init_request_context_logging
from univention.config_registry import ucr
try:
from multiprocessing.util import _exit_function
except ImportError:
_exit_function = None
# Process title as seen at import time; Gateway uses it as the prefix of the
# titles it sets via setproctitle().
proctitle = getproctitle()
# Per-request context; Gateway.prepare() stores {"request_id": ...} in it and
# main() hands it to init_request_context_logging().
request_context = contextvars.ContextVar("request_context")
# Structured wrapper around the 'ADMIN' logger, used for all gateway messages.
log = univention.logging.Structured(logging.getLogger('ADMIN'))
# Gateway: request handler that proxies requests to per-locale worker processes.
class Gateway(tornado.web.RequestHandler):
    """
    A server which acts as proxy to multiple processes in different languages

    Every request is forwarded via a UNIX socket to the subprocess which was
    started for the locale negotiated from the ``Accept-Language`` header.

    TODO: Implement authentication via PAM
    TODO: Implement ACL handling (restriction on certain paths for certain users/groups)
    TODO: Implement a SAML service provider
    TODO: Implement management of modules
    """

    # ``None`` in the main process, the tornado child number in forked children
    child_id = None
    # locale (e.g. ``de_DE``) -> subprocess.Popen of the started language process
    PROCESSES = {}
    # locale or short language (e.g. ``de_DE`` / ``de``) -> UNIX socket path
    SOCKETS = {}

    def prepare(self):
        """
        Make sure the request has a ``X-Request-Id`` and publish it for logging.

        A random UUID4 is used when the client did not send the header. The
        stored id has every match of ``RE_UUID`` removed and is limited to
        36 characters.
        """
        request_id = self.request.headers.setdefault('X-Request-Id', str(uuid.uuid4()))
        self.request.x_request_id = RE_UUID.sub('', request_id)[:36]
        request_context.set({"request_id": self.request.x_request_id})

    @tornado.gen.coroutine
    def get(self):
        """
        Forward the current request to the language specific process.

        Responds with 406 if no socket is available for any candidate
        language and with 503 (plus ``Retry-After``) if the socket could not
        be reached. All other upstream responses, including HTTP errors, are
        passed through to the client.

        :raises tornado.web.HTTPError: 406 if no language socket was found.
        """
        try:
            accepted_language, language_socket = self.select_language()
        except univention.lib.i18n.I18N_Error:
            accepted_language, language_socket = None, None
        if language_socket is None:  # pragma: no cover
            raise tornado.web.HTTPError(406)

        request = tornado.httpclient.HTTPRequest(
            self.request.full_url(),
            method=self.request.method,
            body=self.request.body or None,
            headers=self.request.headers,
            allow_nonstandard_methods=True,
            follow_redirects=False,
            connect_timeout=20.0,  # TODO: raise value?
            request_timeout=int(ucr.get('directory/manager/rest/response-timeout', '310')) + 1,
            # route the request through the UNIX socket of the selected language
            prepare_curl_callback=lambda curl: curl.setopt(pycurl.UNIX_SOCKET_PATH, language_socket),
        )
        client = tornado.httpclient.AsyncHTTPClient()
        try:
            response = yield client.fetch(request, raise_error=True)
        except tornado.curl_httpclient.CurlError as exc:
            log.warning('Reaching service failed', error=exc)
            # happens during starting the service and subprocesses when the UNIX sockets aren't available yet
            self.set_status(503)
            self.add_header('Retry-After', '3')  # Tell clients, we are ready in 3 seconds
            self.add_header('Content-Type', 'application/json')
            self.write(json.dumps('The service could not be reached. Please retry in a few seconds or contact an Administrator to restart the service.'))
            self.finish()
            return
        except tornado.httpclient.HTTPError as exc:
            # error responses of the upstream process are relayed like successful ones
            response = exc.response

        self.set_status(response.code, response.reason)
        # start from an empty header set so that only the headers added below are sent
        self._headers = tornado.httputil.HTTPHeaders()
        self.add_header('Content-Language', accepted_language)
        for header, v in response.headers.get_all():
            if header not in ('Content-Length', 'Transfer-Encoding', 'Content-Encoding', 'Connection', 'X-Http-Reason'):
                self.add_header(header, v)
        if response.body:
            self.set_header('Content-Length', len(response.body))
            self.write(response.body)
        self.finish()

    # every HTTP method is proxied the same way
    post = put = delete = patch = options = get

    def select_language(self):
        """
        Negotiate the response language from the ``Accept-Language`` header.

        The requested languages are ordered by their quality value; ``de_DE``
        and ``en_US`` are added with a very low quality unless the client
        mentioned them itself. Entries with ``q=0`` (or an invalid quality)
        are not acceptable and therefore dropped. Entries which cannot be
        parsed as locale (e.g. ``*``) are skipped.

        :returns: tuple of the language tag (e.g. ``de-DE``) and the UNIX
            socket path of the process serving it, or ``('C', None)`` if no
            process is available.
        """
        languages = self.request.headers.get("Accept-Language", "en-US").split(",")
        locales = []
        defaults = {'en_US': 0.01, 'de_DE': 0.02}
        for language in languages:
            parts = language.strip().split(";")
            # normalize to the key format of ``defaults`` / ``SOCKETS`` before
            # comparing, otherwise e.g. "de-DE;q=0" would not disable the default
            name = parts[0].strip().replace('-', '_')
            if not name:
                continue
            if len(parts) > 1 and parts[1].strip().startswith("q="):
                try:
                    quality = float(parts[1].strip()[2:])
                except (ValueError, TypeError):
                    quality = 0.0
            else:
                quality = 1.0
            defaults.pop(name, None)
            if quality > 0:
                locales.append((name, quality))

        ranked = sorted(locales + list(defaults.items()), key=lambda x: x[1], reverse=True)
        for candidate in [*(name for name, _quality in ranked), "en_US", "de_DE"]:
            try:
                locale = '%s_%s' % self.get_locale(candidate)
            except univention.lib.i18n.I18N_Error:
                # one unparsable language range must not prevent the fallbacks
                continue
            if locale in self.SOCKETS:
                return locale.replace('_', '-'), self.SOCKETS[locale]
        return 'C', None

    @classmethod
    def main(cls):
        """
        Entry point: parse the command line, set up logging, bind the
        listening sockets, start the language subprocesses and serve requests
        either in this process or in forked children.
        """
        parser = argparse.ArgumentParser(prog=f'{sys.executable} -m univention.admin.rest.server')
        parser.add_argument('-d', '--debug', type=int, default=ucr.get_int('directory/manager/rest/debug/level', 2))
        parser.add_argument('-p', '--port', help='Bind to a TCP port (%(default)s)', type=int, default=ucr.get_int('directory/manager/rest/server/port'))
        parser.add_argument('-i', '--interface', help='Bind to specified interface address (%(default)s)', default=ucr['directory/manager/rest/server/address'])
        parser.add_argument('-s', '--unix-socket', help='Bind to specified UNIX socket')
        parser.add_argument('-c', '--processes', type=int, default=ucr.get_int('directory/manager/rest/processes'), help='How many processes should be forked')
        args = parser.parse_args()

        setproctitle(proctitle + ' # gateway main')
        univention.logging.basicConfig(filename='stdout', univention_debug_level=args.debug, use_structured_logging=ucr.is_true('directory/manager/rest/debug/structured-logging'))
        univention.logging.extendLogger('tornado', univention_debug_category='NETWORK')
        logger = logging.getLogger('tornado')
        logger.set_ud_level(ucr.get_int('directory/manager/rest/tornado-debug/level', 3))
        init_request_context_logging(request_context)

        tornado.httpclient.AsyncHTTPClient.configure('tornado.curl_httpclient.CurlAsyncHTTPClient')
        tornado.locale.load_gettext_translations('/usr/share/locale', 'univention-directory-manager-rest')

        os.umask(0o077)  # FIXME: should probably be changed, this is what UMC sets

        # bind sockets
        socks = []
        if args.port:
            socks.extend(bind_sockets(args.port, args.interface, reuse_port=True))
        if args.unix_socket:
            socks.append(bind_unix_socket(args.unix_socket))

        # start sharing memory (before fork, before first usage, after import)
        shared_memory.start()

        # start sub processes for each required locale
        try:
            cls.start_processes(args.processes, args.port, args.debug)
        except Exception:
            # do not leave already started processes behind
            cls.signal_handler_stop(signal.SIGTERM, None)
            raise

        cls.register_signal_handlers()

        # start multiprocessing
        if args.processes != 1:
            if _exit_function is not None:
                atexit.unregister(_exit_function)
            # remembered on the class so that start_child() can use them after the fork
            cls.socks = socks
            try:
                child_id = tornado.process.fork_processes(args.processes, 0)
            except RuntimeError as exc:
                log.info('Stopped process', error=exc)
                cls.signal_handler_stop(signal.SIGTERM, None)
            else:
                cls.start_child(child_id)
        else:
            cls.start_server(socks)

    @classmethod
    def start_child(cls, child_id):
        """
        Run the HTTP server in a forked child process.

        :param child_id: the number of the child as returned by
            :func:`tornado.process.fork_processes`.
        """
        setproctitle(proctitle + f' # gateway proxy {child_id}')
        cls.child_id = child_id
        log.info('Started child', child=cls.child_id)
        # make the PID known to the main process, which signals it on stop
        shared_memory.children[cls.child_id] = os.getpid()
        cls.start_server(cls.socks)

    @classmethod
    def start_server(cls, socks):
        """
        Serve all requests with this handler on the given sockets and run the
        IO loop until it is stopped.

        :param socks: the already bound listening sockets.
        """
        app = tornado.web.Application(
            [(r'.*', cls)],
            serve_traceback=ucr.is_true('directory/manager/rest/show-tracebacks', True),
            log_function=cls.log_function,
        )
        server = tornado.httpserver.HTTPServer(app)
        server.add_sockets(socks)

        try:
            tornado.ioloop.IOLoop.current().start()
        except Exception:
            cls.signal_handler_stop(signal.SIGTERM, None)
            raise

    @classmethod
    def log_function(cls, handler):
        """
        Access log callback of the application: log failed requests only.

        Status codes of 400-499 are logged as warning, everything from 500 on
        as error.

        :param handler: the request handler which finished the request.
        """
        status = handler.get_status()
        if status < 400:
            return  # ignore successful requests here, they are logged in the other process

        access_log = logging.getLogger('tornado.access')
        log_method = access_log.warning if status < 500 else access_log.error
        request_time = 1000.0 * handler.request.request_time()
        log_method(
            "[GATEWAY] %d %s %.2fms",
            status,
            handler._request_summary(),
            request_time,
        )

    @classmethod
    def get_locale(cls, language):
        """
        Split a locale string into language and territory.

        For ``de`` and ``en`` a missing territory is replaced by ``DE``
        respectively ``US``; for other languages it stays ``None``.

        :param language: the locale string, e.g. ``de_DE`` or ``en``.
        :returns: tuple of language and territory.
        """
        locale = univention.lib.i18n.Locale(language)
        territory = locale.territory or {'de': 'DE', 'en': 'US'}.get(locale.language)
        return locale.language, territory

    @classmethod
    def get_socket_for_locale(cls, language):
        """
        Get the path of the UNIX socket used by the process for a locale.

        :param language: the locale string, e.g. ``de_DE``.
        :returns: the socket path.
        :raises AttributeError: if no territory can be determined for the
            locale (see :meth:`get_locale`).
        """
        language, territory = cls.get_locale(language)
        return f'/var/run/univention-directory-manager-rest-{language}-{territory.lower()}.socket'

    @classmethod
    def start_processes(cls, num_processes=1, start_port=9979, debug_level=2):
        """
        Start one subprocess for every language of the UCR variable ``locale``
        and remember their sockets in :attr:`SOCKETS`.

        Only the first locale of a language (e.g. ``de``) starts a process;
        further locales of the same language are mapped to its socket.

        :param num_processes: passed to the subprocess as ``-c``.
        :param start_port: currently unused.
        :param debug_level: passed to the subprocess as ``-d``.
        """
        languages = [
            language.split(':', 1)[0]
            for language in ucr.get('locale', 'de_DE.UTF-8:UTF-8 en_US.UTF-8:UTF-8').split()
        ]
        for language in languages:
            # the subprocess receives the locale including the codeset ...
            cmd = [sys.executable, '-m', 'univention.admin.rest', '-l', language, '-c', str(num_processes), '-d', str(debug_level)]
            # ... while sockets are registered without it
            language = language.split('.', 1)[0]
            sock = cls.get_socket_for_locale(language)
            cmd.extend(['-s', sock])

            short_lang = language.split('_', 1)[0]
            cls.SOCKETS[language] = cls.SOCKETS.get(short_lang, sock)
            if short_lang in cls.SOCKETS:
                # a process for this language is already running
                continue
            cls.SOCKETS[short_lang] = sock
            cls.PROCESSES[language] = subprocess.Popen(cmd, stdout=sys.stdout, stderr=sys.stderr)

    @classmethod
    def register_signal_handlers(cls):
        """Stop on SIGTERM and SIGINT, reload on SIGHUP."""
        signal.signal(signal.SIGTERM, cls.signal_handler_stop)
        signal.signal(signal.SIGINT, cls.signal_handler_stop)
        signal.signal(signal.SIGHUP, cls.signal_handler_reload)

    @classmethod
    def signal_handler_reload(cls, sig, frame):
        """
        Signal handler: forward the signal to the language subprocesses (main
        process only) and close the stream handlers of the ``tornado`` logger.

        :param sig: the received signal number.
        :param frame: the interrupted stack frame (unused).
        """
        log.debug('Reloading service.')
        if cls.child_id is None:
            for process in cls.PROCESSES.values():
                cls.safe_kill(process.pid, sig)

        tornado_logger = logging.getLogger("tornado")
        for handler in tornado_logger.handlers:
            if isinstance(handler, logging.StreamHandler):
                # close the handler but keep using the same stream object afterwards
                stream = handler.stream
                handler.close()
                handler.stream = stream

    @classmethod
    def signal_handler_stop(cls, sig, frame):
        """
        Signal handler: stop the service.

        The main process forwards the signal to all forked children and
        language subprocesses and shuts down the shared memory. Every process
        schedules the stop of its IO loop.

        :param sig: the signal number which is forwarded.
        :param frame: the interrupted stack frame (unused).
        """
        if cls.child_id is None:
            try:
                children_pids = list(shared_memory.children.values())
            except Exception:  # multiprocessing failure
                children_pids = []
            log.info('stopping children', children=children_pids)
            for pid in children_pids:
                cls.safe_kill(pid, sig)

            log.info('stopping subprocesses', subprocesses=list(cls.PROCESSES.keys()))
            for process in cls.PROCESSES.values():
                cls.safe_kill(process.pid, sig)

            shared_memory.shutdown()
        else:
            log.info('shutting down')

        io_loop = tornado.ioloop.IOLoop.current()

        def shutdown():
            io_loop.stop()

        # we are inside a signal handler: let the IO loop do the actual stop
        io_loop.add_callback_from_signal(shutdown)

    @classmethod
    def safe_kill(cls, pid, signo):
        """
        Send a signal to a process and log instead of raise if that fails.

        :param pid: the process ID to signal.
        :param signo: the signal number to send.
        """
        try:
            os.kill(pid, signo)
        except OSError as exc:
            log.error('Could not kill children', signo=signo, pid=pid, error=exc)
        else:
            # collect the exit status without blocking in case the process already terminated
            os.waitpid(pid, os.WNOHANG)