Source code for univention.admindiary.backend

#!/usr/bin/python3
# SPDX-FileCopyrightText: 2019-2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only

from __future__ import annotations

from contextlib import contextmanager
from functools import partial
from typing import TYPE_CHECKING, Any

import sqlalchemy
from sqlalchemy import Boolean, Column, DateTime, ForeignKey, Integer, Sequence, String, Table, Text, and_, func, or_
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import joinedload, relationship, scoped_session, sessionmaker
from sqlalchemy.pool import NullPool

from univention.admindiary import get_logger
from univention.config_registry import ConfigRegistry


if TYPE_CHECKING:
    from collections.abc import Iterator

    from univention.admindiary import DiaryEntry


# Rebind the imported get_logger so that every call made in this module
# passes 'backend' as its first positional argument.
get_logger = partial(get_logger, 'backend')


def get_query_limit() -> int:
    """
    Return the maximum number of entries a diary query may yield.

    The value is read from the UCR variable ``admin/diary/query/limit``.
    If the variable is unset, is not an integer or is negative, the
    built-in fallback of 1000 is returned instead.
    """
    fallback = 1000
    registry = ConfigRegistry()
    registry.load()
    raw_value = registry.get('admin/diary/query/limit', '')
    try:
        parsed = int(raw_value)
    except ValueError:
        # Unset (empty string) or otherwise non-numeric setting.
        return fallback
    return fallback if parsed < 0 else parsed
def get_engine() -> sqlalchemy.Engine:
    """
    Create the SQLAlchemy engine for the ``admindiary`` database.

    The database password is read from ``/etc/admin-diary.secret``; DBMS and
    host come from the UCR variables ``admin/diary/dbms`` and
    ``admin/diary/dbhost``. If no host is configured, the first word of
    ``admin/diary/backend`` is used (``localhost`` if that is unset, too).
    A host equal to this system's hostname or FQDN is replaced by
    ``localhost``.

    :returns: an engine using :class:`NullPool`, i.e. without connection pooling.
    :raises NoDBConnection: if SQLAlchemy raises ``NoSuchModuleError`` for the configured DBMS.
    """
    ucr = ConfigRegistry()
    ucr.load()
    # Use a context manager so the file handle is closed deterministically.
    with open('/etc/admin-diary.secret') as secret_file:
        password = secret_file.read().strip()
    dbms = ucr.get('admin/diary/dbms')
    dbhost = ucr.get('admin/diary/dbhost')
    if not dbhost:
        admin_diary_backend = ucr.get('admin/diary/backend') or 'localhost'
        dbhost = admin_diary_backend.split()[0]
    if dbhost == ucr.get('hostname') or dbhost == '%(hostname)s.%(domainname)s' % ucr:
        dbhost = 'localhost'
    db_url = '%s://admindiary:%s@%s/admindiary' % (dbms, password, dbhost)
    if dbms == 'mysql':
        db_url = db_url + '?charset=utf8mb4'
    try:
        return sqlalchemy.create_engine(db_url, poolclass=NullPool)
    except sqlalchemy.exc.NoSuchModuleError as exc:
        # Keep the original error as __cause__ so the traceback still shows
        # which dialect/driver could not be loaded.
        raise NoDBConnection() from exc
def windowed_query(q, column, windowsize, single_entity=True):
    """
    Iterate over the results of a Query in chunks of at most `windowsize` rows.

    The query is ordered by `column` (a unique column, usually the primary
    key) and the column is appended to the selected entities. Each chunk is
    fetched with a plain LIMIT; every chunk after the first additionally
    carries a WHERE clause restricting it to rows whose `column` value is
    greater than the last value already seen. No window functions are
    needed, so this works on basically any database backend.

    The potential downside is that the database has to sort the full set of
    remaining rows for every chunk, which may be inefficient even when the
    sort column is indexed. For ordinary use cases on a primary key column
    the simplicity of the approach usually outweighs that cost.

    With `single_entity` set, only the first element of each row is yielded;
    otherwise the row is yielded without the appended `column` value.
    """
    # TODO: single_entity is implemented in SqlAlchemy 1.3.11 and above.
    # After updating SqlAlchemy, single_entity should be removed from the
    # function arguments and resolved locally instead:
    # single_entity = q.is_single_entity
    ordered = q.add_column(column).order_by(column)
    boundary = None
    while True:
        window = ordered if boundary is None else ordered.filter(column > boundary)
        rows = window.limit(windowsize).all()
        if not rows:
            return
        # The appended column is always the last element of a row.
        boundary = rows[-1][-1]
        if single_entity:
            yield from (row[0] for row in rows)
        else:
            yield from (row[0:-1] for row in rows)
@contextmanager
def get_session(auto_commit: bool = True) -> Iterator[sqlalchemy.Session]:
    """
    Context manager yielding a scoped session bound to a fresh engine.

    :param auto_commit: commit the session when the ``with`` body finished
        without raising. Nothing is committed if the body raised.
    :returns: the scoped session; it is removed and its engine disposed on
        exit, regardless of whether the body or the commit raised.
    """
    engine = get_engine()
    try:
        session = scoped_session(sessionmaker(autocommit=False, autoflush=False, bind=engine))
        try:
            yield session
            # Only reached when the with-body did not raise, so a failed
            # unit of work is never committed.
            if auto_commit:
                session.commit()
        finally:
            session.remove()
    finally:
        # Dispose the engine we created directly instead of looking it up
        # through the (already removed) scoped session.
        engine.dispose()
# Declarative base class; the ORM model classes and the association table
# in this module all register on Base.metadata.
Base = declarative_base()
class Meta(Base):
    """ORM model of the ``meta`` table: an id plus a mandatory integer ``schema`` value."""

    __tablename__ = 'meta'

    id = Column(Integer, Sequence('meta_id_seq'), primary_key=True)
    # NOTE(review): presumably the database schema version - confirm.
    schema = Column(Integer, nullable=False)
# Association table for the many-to-many relationship between entries and
# tags; the pair (entry_id, tag_id) forms the composite primary key.
entry_tags = Table(
    'entry_tags',
    Base.metadata,
    Column('entry_id', ForeignKey('entries.id'), primary_key=True),
    Column('tag_id', ForeignKey('tags.id'), primary_key=True),
)
class Event(Base):
    """ORM model of the ``events`` table: one row per unique event name."""

    __tablename__ = 'events'

    id = Column(Integer, Sequence('event_id_seq'), primary_key=True)
    name = Column(String(190), nullable=False, unique=True, index=True)
class EventMessage(Base):
    """
    ORM model of the ``event_messages`` table.

    Holds the message text of an event for one locale; the pair
    (event_id, locale) is the primary key. Rows are deleted together with
    their event (``ON DELETE CASCADE``).
    """

    __tablename__ = 'event_messages'

    event_id = Column(None, ForeignKey('events.id', ondelete='CASCADE'), primary_key=True)
    locale = Column(String(190), nullable=False, primary_key=True)
    message = Column(Text, nullable=False)
    locked = Column(Boolean)
class Entry(Base):
    """
    ORM model of the ``entries`` table: a single diary entry.

    An entry optionally references an event (``event_id``) and another entry
    (``main_id``), and is related to its args, its tags (through the
    ``entry_tags`` association table) and to entries joined via
    ``context_id`` (``comments``).
    """

    __tablename__ = 'entries'

    id = Column(Integer, Sequence('entry_id_seq'), primary_key=True)
    username = Column(String(190), nullable=False, index=True)
    hostname = Column(String(190), nullable=False, index=True)
    message = Column(Text)
    timestamp = Column(DateTime(timezone=True), index=True)
    context_id = Column(String(190), index=True)
    event_id = Column(None, ForeignKey('events.id', ondelete='RESTRICT'), nullable=True)
    main_id = Column(None, ForeignKey('entries.id', ondelete='CASCADE'), nullable=True)

    event = relationship('Event')
    args = relationship('Arg', back_populates='entry')
    tags = relationship('Tag', secondary=entry_tags, back_populates='entries')
    # Self-referential relationship joined on context_id.
    comments = relationship(
        'Entry',
        primaryjoin=context_id == context_id,  # noqa: PLR0124
        foreign_keys=context_id,
        remote_side=context_id,
    )
class Tag(Base):
    """ORM model of the ``tags`` table: a unique tag name and the entries carrying it."""

    __tablename__ = 'tags'

    id = Column(Integer, Sequence('tag_id_seq'), primary_key=True)
    name = Column(String(190), nullable=False, unique=True, index=True)

    # Reverse side of Entry.tags, linked through the entry_tags table.
    entries = relationship('Entry', secondary=entry_tags, back_populates='tags')
class Arg(Base):
    """
    ORM model of the ``args`` table: one key/value pair attached to an entry.

    Rows are deleted together with their entry (``ON DELETE CASCADE``).
    """

    __tablename__ = 'args'

    id = Column(Integer, Sequence('arg_id_seq'), primary_key=True)
    entry_id = Column(None, ForeignKey('entries.id', ondelete='CASCADE'), index=True)
    key = Column(String(190), nullable=False, index=True)
    value = Column(String(190), nullable=False, index=True)

    # Entry.args names this attribute via back_populates='entry'; name the
    # reverse side here as well so both directions stay in sync, matching
    # the Tag.entries / Entry.tags pair.
    entry = relationship('Entry', back_populates='args')
class Client:
    """
    Read and write access to the Admin Diary database through a session.

    :param version: backend version; only stored on the instance here.
    :param session: SQLAlchemy session used by all methods except
        :meth:`query`, which opens a session of its own.
    """

    def __init__(self, version: int, session: sqlalchemy.Session) -> None:
        self.version = version
        self._session = session
        # Results of translate(), keyed by (event_name, locale). Misses are
        # cached as None, too.
        self._translation_cache: dict[tuple[str, str], str | None] = {}

    def translate(self, event_name: str, locale: str) -> str | None:
        """
        Return the message of event `event_name` in `locale`.

        :returns: the message text, or None if no message is stored for
            that event/locale. Results (including None) are cached per
            instance.
        """
        key = (event_name, locale)
        try:
            return self._translation_cache[key]
        except KeyError:
            pass
        event_message = self._session.query(EventMessage).filter(
            EventMessage.event_id == Event.id,
            EventMessage.locale == locale,
            Event.name == event_name,
        ).one_or_none()
        translation = event_message.message if event_message is not None else None
        self._translation_cache[key] = translation
        return translation

    def options(self) -> dict:
        """Return the sorted tag names, usernames, hostnames and event names known to the database."""
        session = self._session
        return {
            'tags': sorted(tag.name for tag in session.query(Tag).all()),
            'usernames': sorted(row[0] for row in session.query(Entry.username).distinct()),
            'hostnames': sorted(row[0] for row in session.query(Entry.hostname).distinct()),
            'events': sorted(event.name for event in session.query(Event).all()),
        }

    def _get_or_create_by_name(self, model, name: str):
        """Return the `model` row with the given name, adding and flushing a new one if none exists."""
        obj = self._session.query(model).filter(model.name == name).one_or_none()
        if obj is None:
            obj = model(name=name)
            self._session.add(obj)
            # Flush so that the new row gets its id assigned.
            self._session.flush()
        return obj

    def add_tag(self, name: str) -> Tag:
        """Return the tag called `name`, creating it if necessary."""
        return self._get_or_create_by_name(Tag, name)

    def add_event(self, name: str) -> Event:
        """Return the event called `name`, creating it if necessary."""
        return self._get_or_create_by_name(Event, name)

    def add_event_message(self, event_id: int, locale: str, message: str, force: bool) -> bool:
        """
        Store `message` as the text of event `event_id` in `locale`.

        A missing message is inserted (with ``locked`` set to `force`). An
        existing message is only overwritten (and locked) if `force` is set.

        :returns: True if a message was inserted or overwritten, False if an
            existing message was left untouched.
        """
        event_message_query = self._session.query(EventMessage).filter(
            EventMessage.locale == locale,
            EventMessage.event_id == event_id,
        )
        if event_message_query.one_or_none() is None:
            self._session.add(EventMessage(event_id=event_id, locale=locale, message=message, locked=force))
            self._session.flush()
            return True
        if force:
            event_message_query.update({'locked': True, 'message': message})
            self._session.flush()
            return True
        return False

    def add(self, diary_entry: DiaryEntry) -> None:
        """
        Add `diary_entry` to the session, together with its tags and args.

        For the event name ``COMMENT`` the English message is stored on the
        entry itself and no event is referenced. For any other event name the
        event is looked up or created, and the given messages are stored as
        event messages for locales that have none yet.
        """
        if diary_entry.event_name == 'COMMENT':
            entry_message = diary_entry.message.get('en')
            event_id = None
        else:
            get_logger().debug('Searching for Event %s' % diary_entry.event_name)
            entry_message = None
            event = self.add_event(diary_entry.event_name)
            event_id = event.id
            get_logger().debug('Found Event ID %s' % event.id)
            if diary_entry.message:
                for locale, message in diary_entry.message.items():
                    get_logger().debug('Trying to insert message for %s' % locale)
                    if self.add_event_message(event.id, locale, message, False):
                        get_logger().debug('Found no existing one. Inserted %r' % message)
            else:
                get_logger().debug('No further message given, though')
        entry = Entry(
            username=diary_entry.username,
            hostname=diary_entry.hostname,
            timestamp=diary_entry.timestamp,
            context_id=diary_entry.context_id,
            event_id=event_id,
            message=entry_message,
        )
        self._session.add(entry)
        # Point main_id at the lowest entry id found for the same context.
        main_id = self._session.query(func.min(Entry.id)).filter(Entry.context_id == entry.context_id).scalar()
        if main_id:
            entry.main_id = main_id
        for tag_name in diary_entry.tags:
            entry.tags.append(self.add_tag(tag_name))
        for key, value in diary_entry.args.items():
            entry.args.append(Arg(key=key, value=value))
        get_logger().info('Successfully added %s (%s)' % (diary_entry.context_id, diary_entry.event_name))

    def query(self, time_from=None, time_until=None, tag=None, event=None, username=None, hostname=None, message=None, locale='en'):
        """
        Search diary entries; every filter that is given must match.

        :param time_from: only entries with a timestamp >= this value.
        :param time_until: only entries with a timestamp < this value.
        :param tag: only entries carrying a tag of this name.
        :param event: only entries of the event with this name.
        :param username: only entries of this user.
        :param hostname: only entries of this host.
        :param message: whitespace separated words; an entry matches if any
            word occurs (case-insensitively) in the entry's message or in the
            event message for `locale`, or equals one of the entry's arg
            values.
        :param locale: locale of the event messages searched for `message`.
        :returns: a list of dicts, at most :func:`get_query_limit` long,
            ordered by entry id.

        The search runs in a session of its own which is never committed.
        """
        limit = get_query_limit()
        with get_session(False) as session:
            return self.__query(session, limit, time_from, time_until, tag, event, username, hostname, message, locale)

    def __query(self, session, limit, time_from, time_until, tag, event, username, hostname, message, locale):
        """Build and run the query for :meth:`query` on `session`, returning at most `limit` result dicts."""
        query = (
            session.query(Entry)
            .outerjoin(Event, Event.id == Entry.event_id)
            .outerjoin(Arg, Arg.entry_id == Entry.id)
            .options(joinedload(Entry.event))
            .options(joinedload(Entry.args))
            .options(joinedload(Entry.comments))
            .order_by(Entry.id)
        )
        if time_from:
            query = query.filter(Entry.timestamp >= time_from)
        if time_until:
            query = query.filter(Entry.timestamp < time_until)
        if tag:
            query = query.filter(Entry.tags.any(Tag.name == tag))
        if event:
            query = query.filter(Entry.event.has(name=event))
        if username:
            query = query.filter(Entry.username == username)
        if hostname:
            query = query.filter(Entry.hostname == hostname)
        # A whitespace-only message contains no words; skip the join and the
        # filter then instead of building an empty or_().
        patterns = message.split() if message else []
        if patterns:
            # in case message is given, we search in entries, event_messages and args tables
            query = query.outerjoin(EventMessage, EventMessage.event_id == Entry.event_id)
            filters = [
                or_(
                    Entry.message.ilike(f'%{pat}%'),
                    and_(
                        EventMessage.locale == locale,
                        EventMessage.message.ilike(f'%{pat}%'),
                    ),
                    Entry.args.any(Arg.value == pat),
                )
                for pat in patterns
            ]
            # find all entries matching any of the given words
            query = query.filter(or_(*filters))
        res = []
        for entry in windowed_query(query, Entry.id, limit):
            # windowed_query keeps fetching chunks; stop once limit is reached.
            if len(res) >= limit:
                break
            entry_event = entry.event
            has_comments = any(
                other.message and other.context_id == entry.context_id
                for other in entry.comments
            )
            res.append({
                'id': entry.id,
                'date': entry.timestamp.strftime('%Y-%m-%d %H:%M:%S'),
                'event_name': entry_event.name if entry_event else 'COMMENT',
                'hostname': entry.hostname,
                'username': entry.username,
                'context_id': entry.context_id,
                'message': entry.message,
                'args': {arg.key: arg.value for arg in entry.args},
                'comments': has_comments,
            })
        return res

    def get(self, context_id: str) -> list[dict[str, Any]]:
        """
        Return all entries sharing `context_id`, ordered by entry id.

        :returns: one dict per entry, including its args and tag names;
            entries without an event are reported with the event name
            ``COMMENT``.
        """
        query = (
            self._session.query(Entry)
            .outerjoin(Event, Event.id == Entry.event_id)
            .outerjoin(Arg, Arg.entry_id == Entry.id)
            .options(joinedload(Entry.event))
            .options(joinedload(Entry.args))
            .filter(Entry.context_id == context_id)
            .order_by(Entry.id)
        )
        res = []
        for entry in query.all():
            entry_event = entry.event
            res.append({
                'id': entry.id,
                'username': entry.username,
                'hostname': entry.hostname,
                'message': entry.message,
                'args': {arg.key: arg.value for arg in entry.args},
                'date': entry.timestamp.strftime('%Y-%m-%d %H:%M:%S'),
                'tags': [tag.name for tag in entry.tags],
                'context_id': entry.context_id,
                'event_name': entry_event.name if entry_event else 'COMMENT',
            })
        return res
@contextmanager
def get_client(version: int) -> Iterator[Client]:
    """
    Context manager yielding a :class:`Client` for the requested backend version.

    The client works on a session obtained from :func:`get_session` with its
    default arguments; the session lives as long as the ``with`` block.

    :raises UnsupportedVersion: if `version` is anything other than 1.
    """
    if version != 1:
        raise UnsupportedVersion(version)
    with get_session() as db_session:
        yield Client(version=version, session=db_session)
class UnsupportedVersion(Exception):
    """Error for an unsupported backend version; the version is the first constructor argument."""

    def __str__(self):
        version = self.args[0]
        return 'Version %s of the Admin Diary Backend is not supported' % version
class NoDBConnection(Exception):
    """Error signalling that no database connection can be set up; its text links to a help article."""

    def __str__(self):
        return (
            "Database connection cannot be established!\n"
            "Please refer to https://help.univention.com/t/admin-diary-how-to-seperate-frontend-and-backend/11331"
        )