Source code for univention.management.console.session_dict
#!/usr/bin/python3
#
# Univention Management Console
# UMC server
#
# SPDX-FileCopyrightText: 2024-2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only
from collections.abc import MutableMapping
from sqlalchemy import exc
from univention.management.console.session_db import DBDisabledException, DBSession, get_session
from univention.management.console.sse import logout_notifiers
from .log import CORE
[docs]
class SessionDict(MutableMapping):
sessions = {}
def __delitem__(self, session_id) -> None:
self.delete(session_id)
def __setitem__(self, session_id, umc_session) -> None:
try:
with get_session() as db_session:
session = DBSession.get(db_session, session_id)
if session:
DBSession.update(db_session, session_id, umc_session)
else:
DBSession.create(db_session, session_id, umc_session)
except exc.DBAPIError as err:
CORE.error('Adding the session into the database failed\n%s', err)
except DBDisabledException:
pass
self.sessions[session_id] = umc_session
def __getitem__(self, session_id):
local_session = self.sessions[session_id]
try:
with get_session() as db_session:
session = DBSession.get(db_session, session_id)
if not session:
del self.sessions[session_id]
raise KeyError(session_id)
except exc.DBAPIError as err:
CORE.error('Getting the session from the database failed\n%s', err)
except DBDisabledException:
pass
return local_session
def __len__(self) -> int:
return len(self.sessions)
def __iter__(self):
return self.sessions.__iter__()
[docs]
def get_oidc_sessions(self, logout_token_claims):
try:
with get_session() as db_session:
sessions = DBSession.get_by_oidc(db_session, logout_token_claims)
return sessions
except exc.DBAPIError as err:
CORE.error('Getting OIDC sessions from the database failed\n%s', err)
except DBDisabledException:
pass
return self.__get_oidc_sessions_fallback(logout_token_claims)
def __get_oidc_sessions_fallback(self, logout_token_claims):
sessions = [user for user in self.sessions.values() if user and user.oidc and user.oidc.id_token and logout_token_claims['iss'] == user.oidc.claims['iss']]
for user in sessions:
if user.oidc.claims['sid'] == logout_token_claims.get('sid'):
yield user
return
for user in sessions:
if user.oidc.claims['sub'] == logout_token_claims.get('sub'):
yield user
[docs]
def delete(self, session_id, reload=True):
CORE.debug('session deletion in session dict')
logout_notifier = logout_notifiers.get(session_id)
if logout_notifier is not None:
CORE.debug('We have locally found a logout notifier')
if reload:
logout_notifier.set()
try:
with get_session() as db_session:
if logout_notifier is None:
CORE.debug('we have not locally found a logout notifier.')
DBSession.delete(db_session, session_id, logout_notifier is None)
except exc.DBAPIError as err:
CORE.debug('Deleting the session from the database failed\n%s', err)
except DBDisabledException:
pass
del self.sessions[session_id]