Source code for univention.management.console.session

#!/usr/bin/python3
#
# Univention Management Console
#  session handling
#
# SPDX-FileCopyrightText: 2022-2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only

import asyncio
import errno
import functools
import weakref

import ldap
import tornado.gen
from ldap.filter import filter_format

import univention.admin.uexceptions as udm_errors
from univention.management.console.session_dict import SessionDict

from .acl import ACLs, LDAP_ACLs
from .auth import AuthHandler
from .category import Manager as CategoryManager
from .config import MODULE_DEBUG_LEVEL, ucr
from .error import ServiceUnavailable
from .ldap import get_machine_connection, reset_cache as reset_ldap_connection_cache
from .log import CORE
from .message import Request
from .module import Manager as ModuleManager


try:
    from time import monotonic
except ImportError:
    from monotonic import monotonic

moduleManager = ModuleManager()
categoryManager = CategoryManager()
_session_timeout = ucr.get_int('umc/http/session/timeout', 300)


[docs] class User: """Information about the authenticated user""" __slots__ = ('_locale', 'auth_type', 'authenticated', 'federated_account', 'ip', 'object_id', 'password', 'roles', 'session_end_time', 'user_dn', 'username') def __init__(self): self.ip = None self.authenticated = False self.username = None self.password = None self.auth_type = None self.user_dn = None self.object_id = None self.session_end_time = None self._locale = None # don't use! self.roles = None self.federated_account = False def __repr__(self): return '<User(authenticated=%r name=%r ip=%r dn=%r auth_type=%r session_end_time=%r password="keep dreaming :-P")>' % (self.authenticated, self.username, self.ip, self.user_dn, self.auth_type, self.session_end_time)
[docs] class Session: """A interface to session data""" __slots__ = ('_', '__weakref__', '_active_requests', '_timeout_id', 'acls', 'oidc', 'processes', 'saml', 'session_id', 'user') __auth = AuthHandler() sessions = SessionDict()
[docs] @classmethod def get_or_create(cls, session_id): session = cls.sessions.get(session_id) if not session: session = cls(session_id) return session
[docs] @classmethod def put(cls, session_id, session): session.session_id = session_id session.reset_timeout() cls.sessions[session_id] = session
[docs] @classmethod def expire(cls, session_id, reload=True): """Removes a session when the connection to the UMC server has died or the session is expired""" try: cls.sessions.delete(session_id, reload) CORE.info('Cleaning up session %r', session_id) except KeyError: CORE.info('Session %r not found', session_id)
def __init__(self, session_id): self.session_id = session_id self.user = User() self.saml = None self.oidc = None self.acls = IACLs(self) self.processes = Processes(self) self._timeout_id = None self._active_requests = set() self._ = None
[docs] def renew(self): CORE.info('Renewing session') self.acls = IACLs(self) self.processes = Processes(self)
[docs] async def authenticate(self, args): from .server import pool pam = self.__auth.get_handler(args['locale']) future = pool.submit(self.__auth.authenticate, pam, args) result = await asyncio.wrap_future(future) pam.end() if ( self.user.authenticated and self.user.username and result.credentials and result.credentials.get('username') and self.user.username.casefold() != result.credentials['username'].casefold() ): # re-authentication with a different username self.renew() authenticated = bool(result) if authenticated: self.set_credentials(**result.credentials) else: self.user = User() self.renew() return result
[docs] async def change_password(self, args): from .server import pool pam = self.__auth.get_handler(args['locale']) username = args['username'] password = args['password'] new_password = args['new_password'] future = pool.submit(pam.change_password, username, password, new_password) await asyncio.wrap_future(future) pam.end() self.set_credentials(username, new_password, None)
[docs] def set_credentials(self, username, password, auth_type, object_id=None, roles=None, federated_account=False): self.user.authenticated = True self.user.username = username self.user.object_id = object_id self.user.auth_type = auth_type self.user.federated_account = federated_account if roles is not None: self.user.roles = roles if self.user.auth_type is None: # important! there might be a password already. in case of SAML we must not set/overwrite the password. self.user.password = password self._search_user_dn() # TODO: send reload signal to all other processes? CORE.info('Reloading resources: UCR, modules, categories') ucr.load() moduleManager.load() categoryManager.load() self.acls._reload_acls_and_permitted_commands()
def _search_user_dn(self): lo = get_machine_connection(write=False)[0] if self.user.federated_account: username = self.user.object_id s_filter = filter_format('(&(univentionObjectIdentifier=%s)(objectClass=univentionFederatedAccount))', (self.user.object_id,)) else: username = self.user.username s_filter = filter_format('(&(uid=%s)(objectClass=person))', (username,)) if lo and username: # get the LDAP DN of the authorized user try: ldap_dn = lo.searchDn(s_filter) except (ldap.LDAPError, udm_errors.base): reset_ldap_connection_cache(lo) ldap_dn = None CORE.exception('Could not get uid for %r', self.user.username) if ldap_dn: self.user.user_dn = ldap_dn[0] CORE.info('The LDAP DN for user %s is %s', self.user.username, self.user.user_dn) if not self.user.user_dn and self.user.username not in ('root', '__systemsetup__', None): CORE.error('The LDAP DN for user %s could not be found (lo=%r)', self.user.username, lo)
[docs] def get_user_ldap_connection(self, **kwargs): base = Request('') base.auth_type = self.get_umc_auth_type() base.username = self.user.username base.user_dn = self.user.user_dn base.password = self.get_umc_password() return base.get_user_ldap_connection(**kwargs)
[docs] def is_saml_user(self): # self.saml indicates that it was originally a # SAML user. but it may have upgraded and got a # real password. the saml user object is still there, # though return self.user.password is None and self.saml
[docs] def is_oidc_user(self): # self.oidc indicates that it was originally a # oidc user. but it may have upgraded and got a # real password. the oidc user object is still there, # though return self.user.password is None and self.oidc
[docs] def get_umc_password(self): if self.is_oidc_user(): return self.oidc.access_token if self.is_saml_user(): return self.saml.message else: return self.user.password
[docs] def get_umc_auth_type(self): if self.is_oidc_user(): return "OIDC" elif self.is_saml_user(): return "SAML" else: return None
[docs] def logout(self, reload=True): CORE.info('User %r logged out', self.user.username) self.on_logout() self.expire(self.session_id, reload=reload)
def _session_timeout_timer(self): if self._active_requests: CORE.info('There are still open requests. Deferring session timeout.') self.user.session_end_time = monotonic() + 1 ioloop = tornado.ioloop.IOLoop.current() self._timeout_id = ioloop.call_later(1, self._session_timeout_timer) return CORE.info('session %r timed out', self.session_id) self.expire(self.session_id, reload=False) self.on_logout() return False
[docs] def reset_timeout(self): self.disconnect_timer() self.user.session_end_time = monotonic() + _session_timeout # this will trigger the update of the session end time in the database if self.sessions.get(self.session_id, None): self.sessions[self.session_id] = self ioloop = tornado.ioloop.IOLoop.current() when = int(self.session_end_time - monotonic()) CORE.debug('reset_timeout(): new session expiration in %s seconds', when) self._timeout_id = ioloop.call_later(when, self._session_timeout_timer)
[docs] def disconnect_timer(self): if self._timeout_id is not None: ioloop = tornado.ioloop.IOLoop.current() ioloop.remove_timeout(self._timeout_id)
[docs] def timed_out(self, now=None, session_end_time=None): now = now or monotonic() session_end_time = session_end_time or self.session_end_time return session_end_time < now
@property def session_end_time(self): if self.is_oidc_user() and self.oidc.session_end_time: return self.oidc.session_end_time if self.is_saml_user() and self.saml.session_end_time: return self.saml.session_end_time return self.user.session_end_time
[docs] def on_logout(self): self.disconnect_timer() if self.saml: self.saml.on_logout()
def __repr__(self): return '<Session id=%r %s processes=%r>' % (self.session_id, self.user, self.processes)
[docs] class IACLs: """Interface for UMC-ACL information""" @property def acls(self): if self.__acls is None: self._reload_acls_and_permitted_commands() return self.__acls def __init__(self, session): self.session = weakref.ref(session) self.__acls = None self.__permitted_commands = None def _reload_acls_and_permitted_commands(self): self.__acls = self._get_acls() if isinstance(self.acls, LDAP_ACLs): lo, _po = get_machine_connection(write=False) try: self.acls.reload(lo) except (ldap.LDAPError, udm_errors.ldapError): reset_ldap_connection_cache(lo) raise else: self.acls.reload() self.__permitted_commands = None self.get_permitted_commands(moduleManager) def _get_acls(self): sess = self.session() if not sess.user.authenticated: # We need to set empty ACL's for unauthenticated requests return ACLs() else: if sess.user.federated_account: # TODO: return UMC_ACLS_FROM GUARDIAN # return ROLES_ACLS(...) pass return LDAP_ACLs(sess.user.username, sess.user.user_dn, ucr['ldap/base'])
[docs] def is_command_allowed(self, command, options, flavor): if not isinstance(options, dict): options = {} return moduleManager.is_command_allowed(self.acls, command, None, options, flavor)
[docs] def get_permitted_commands(self, moduleManager): if self.__permitted_commands is None: # fixes performance leak? self.__permitted_commands = moduleManager.permitted_commands(ucr['hostname'], self.acls) return self.__permitted_commands
[docs] def is_module_singleton(self, module_name): return moduleManager.is_singleton(module_name)
[docs] def get_module_proxy_address(self, module_name): return moduleManager.proxy_address(module_name)
[docs] def get_module_providing(self, moduleManager, command): permitted_commands = self.get_permitted_commands(moduleManager) module_name = moduleManager.module_providing(permitted_commands, command) try: # check if the module exists in the module manager moduleManager[module_name] except KeyError: # the module has been removed from moduleManager (probably through a reload) CORE.warning('Module %r (command=%r) does not exists anymore', module_name, command) moduleManager.load() self._reload_acls_and_permitted_commands() module_name = None return module_name
[docs] def get_method_name(self, moduleManager, module_name, command): module = self.get_permitted_commands(moduleManager)[module_name] methods = (cmd.method for cmd in module.commands if cmd.name == command) for method in methods: return method
def __repr__(self): return '<ACLs from-ldap=%r>' % (isinstance(self.__acls, LDAP_ACLs),)
[docs] class Processes: """Interface for module processes""" singletons = {} def __init__(self, session): self.session = weakref.ref(session) self._acls = session.acls self.__processes = {} @property def acls(self): # workaround for circular reference memory leak sess = self.session() if sess is not None: if self._acls is not sess.acls: self._acls = sess.acls return self._acls
[docs] def processes(self, module_name): return self.singletons if self.acls.is_module_singleton(module_name) else self.__processes
[docs] def get_process(self, module_name, accepted_language, no_daemonize_module_processes=False): from .resources import ModuleProcess, ModuleProxy proxy_address = self.acls.get_module_proxy_address(module_name) if proxy_address: return ModuleProxy(proxy_address) processes = self.processes(module_name) if module_name not in processes: CORE.info('Starting new module process %s', module_name) try: mod_proc = ModuleProcess(module_name, debug=MODULE_DEBUG_LEVEL, locale=accepted_language, no_daemonize_module_processes=no_daemonize_module_processes) except OSError as exc: _ = self.session()._ message = _('Could not open the module. %s Please try again later.') % { errno.ENOMEM: _('There is not enough memory available on the server.'), errno.EMFILE: _('There are too many opened files on the server.'), errno.ENFILE: _('There are too many opened files on the server.'), errno.ENOSPC: _('There is not enough free space on the server.'), errno.ENOENT: _('The executable was not found.'), }.get(exc.errno, _('An unknown operating system error occurred (%s).') % (exc,)) raise ServiceUnavailable(message) processes[module_name] = mod_proc mod_proc.set_exit_callback(functools.partial(self.process_exited, module_name)) return processes[module_name]
[docs] def stop_process(self, module_name): proc = self.processes(module_name).pop(module_name, None) if proc: proc.stop()
[docs] def process_exited(self, module_name, exit_code): proc = self.processes(module_name).pop(module_name, None) if proc: proc._died(exit_code)
[docs] def has_active_module_processes(self): return self.__processes
def __repr__(self): return '<Processes for=%s>' % (', '.join(self.__processes),)