#!/usr/bin/python3
# -*- coding: utf-8 -*-
#
# Copyright 2019-2022 Univention GmbH
#
# https://www.univention.de/
#
# All rights reserved.
#
# The source code of this program is made available
# under the terms of the GNU Affero General Public License version 3
# (GNU AGPL V3) as published by the Free Software Foundation.
#
# Binary versions of this program provided by Univention to you as
# well as other copyrighted, protected or trademarked materials like
# Logos, graphics, fonts, specific documentations and configurations,
# cryptographic keys etc. are subject to a license agreement between
# you and Univention and not subject to the GNU AGPL V3.
#
# In the case you use this program under the terms of the GNU AGPL V3,
# the program is provided in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public
# License with the Debian GNU/Linux or Univention distribution in file
# /usr/share/common-licenses/AGPL-3; if not, see
# <https://www.gnu.org/licenses/>.
from contextlib import contextmanager
from datetime import datetime # noqa: F401
from functools import partial
from typing import Any, Callable, Dict, Iterable, Iterator, List, Optional, Set, Tuple # noqa: F401
import sqlalchemy
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Boolean, ForeignKey, Sequence, Text, DateTime, func, Table
from univention.config_registry import ConfigRegistry
from univention.admindiary import get_logger, DiaryEntry # noqa: F401
get_logger = partial(get_logger, 'backend')
[docs]def get_query_limit():
# type: () -> int
ucr = ConfigRegistry()
ucr.load()
limit = ucr.get('admin/diary/query/limit', '')
default_limit = 1000
try:
limit = int(limit)
except ValueError:
limit = default_limit
else:
if limit < 0:
limit = default_limit
return limit
[docs]def get_engine():
# type: () -> sqlalchemy.Engine
ucr = ConfigRegistry()
ucr.load()
password = open('/etc/admin-diary.secret').read().strip()
dbms = ucr.get('admin/diary/dbms')
dbhost = ucr.get('admin/diary/dbhost')
if not dbhost:
admin_diary_backend = ucr.get('admin/diary/backend') or 'localhost'
dbhost = admin_diary_backend.split()[0]
if dbhost == ucr.get('hostname') or dbhost == '%s.%s' % (ucr.get('hostname'), ucr.get('domainname')):
dbhost = 'localhost'
db_url = '%s://admindiary:%s@%s/admindiary' % (dbms, password, dbhost)
if dbms == 'mysql':
db_url = db_url + '?charset=utf8mb4'
return sqlalchemy.create_engine(db_url)
[docs]def make_session_class():
# type: () -> Callable[[], sqlalchemy.Session]
if make_session_class._session is None:
engine = get_engine()
make_session_class._session = sessionmaker(bind=engine)
return make_session_class._session
make_session_class._session = None
[docs]@contextmanager
def get_session():
# type: () -> Iterator[sqlalchemy.Session]
session = make_session_class()()
yield session
session.commit()
session.close()
Base = declarative_base()
entry_tags = Table(
'entry_tags', Base.metadata,
Column('entry_id', ForeignKey('entries.id'), primary_key=True),
Column('tag_id', ForeignKey('tags.id'), primary_key=True)
)
[docs]class Event(Base):
__tablename__ = 'events'
id = Column(Integer, Sequence('event_id_seq'), primary_key=True)
name = Column(String(190), nullable=False, unique=True, index=True)
[docs]class EventMessage(Base):
__tablename__ = 'event_messages'
event_id = Column(None, ForeignKey('events.id', ondelete='CASCADE'), primary_key=True)
locale = Column(String(190), nullable=False, primary_key=True)
message = Column(Text, nullable=False)
locked = Column(Boolean)
[docs]class Entry(Base):
__tablename__ = 'entries'
id = Column(Integer, Sequence('entry_id_seq'), primary_key=True)
username = Column(String(190), nullable=False, index=True)
hostname = Column(String(190), nullable=False, index=True)
message = Column(Text)
timestamp = Column(DateTime(timezone=True), index=True)
context_id = Column(String(190), index=True)
event_id = Column(None, ForeignKey('events.id', ondelete='RESTRICT'), nullable=True)
main_id = Column(None, ForeignKey('entries.id', ondelete='CASCADE'), nullable=True)
event = relationship('Event')
args = relationship('Arg', back_populates='entry')
tags = relationship('Tag', secondary=entry_tags, back_populates='entries')
[docs]class Tag(Base):
__tablename__ = 'tags'
id = Column(Integer, Sequence('tag_id_seq'), primary_key=True)
name = Column(String(190), nullable=False, unique=True, index=True)
entries = relationship('Entry', secondary=entry_tags, back_populates='tags')
[docs]class Arg(Base):
__tablename__ = 'args'
id = Column(Integer, Sequence('arg_id_seq'), primary_key=True)
entry_id = Column(None, ForeignKey('entries.id', ondelete='CASCADE'), index=True)
key = Column(String(190), nullable=False, index=True)
value = Column(String(190), nullable=False, index=True)
entry = relationship('Entry')
[docs]class Client(object):
def __init__(self, version, session):
# type: (int, sqlalchemy.Session) -> None
self.version = version
self._session = session
self._translation_cache = {} # type: Dict[Tuple[str, str], str]
[docs] def translate(self, event_name, locale):
# type: (str, str) -> str
key = (event_name, locale)
if key not in self._translation_cache:
event_message = self._session.query(EventMessage).filter(EventMessage.event_id == Event.id, EventMessage.locale == locale, Event.name == event_name).one_or_none()
if event_message:
translation = event_message.message
else:
translation = None
self._translation_cache[key] = translation
else:
translation = self._translation_cache[key]
return translation
[docs] def options(self):
# type: () -> dict
ret = {}
ret['tags'] = sorted([tag.name for tag in self._session.query(Tag).all()])
ret['usernames'] = sorted([username[0] for username in self._session.query(Entry.username).distinct()])
ret['hostnames'] = sorted([hostname[0] for hostname in self._session.query(Entry.hostname).distinct()])
ret['events'] = sorted([event.name for event in self._session.query(Event).all()])
return ret
[docs] def add_tag(self, name):
# type: (str) -> Tag
obj = self._session.query(Tag).filter(Tag.name == name).one_or_none()
if obj is None:
obj = Tag(name=name)
self._session.add(obj)
self._session.flush()
return obj
[docs] def add_event(self, name):
# type: (str) -> Event
obj = self._session.query(Event).filter(Event.name == name).one_or_none()
if obj is None:
obj = Event(name=name)
self._session.add(obj)
self._session.flush()
return obj
[docs] def add_event_message(self, event_id, locale, message, force):
# type: (int, str, str, bool) -> bool
event_message_query = self._session.query(EventMessage).filter(EventMessage.locale == locale, EventMessage.event_id == event_id)
event_message = event_message_query.one_or_none()
if event_message is None:
event_message = EventMessage(event_id=event_id, locale=locale, message=message, locked=force)
self._session.add(event_message)
self._session.flush()
return True
else:
if force:
event_message_query.update({'locked': True, 'message': message})
self._session.flush()
return True
return False
[docs] def add(self, diary_entry):
# type: (DiaryEntry) -> None
if diary_entry.event_name == 'COMMENT':
entry_message = diary_entry.message.get('en')
event_id = None
else:
get_logger().debug('Searching for Event %s' % diary_entry.event_name)
entry_message = None
event = self.add_event(diary_entry.event_name)
event_id = event.id
get_logger().debug('Found Event ID %s' % event.id)
if diary_entry.message:
for locale, message in diary_entry.message.items():
get_logger().debug('Trying to insert message for %s' % locale)
if self.add_event_message(event.id, locale, message, False):
get_logger().debug('Found no existing one. Inserted %r' % message)
else:
get_logger().debug('No further message given, though')
entry = Entry(username=diary_entry.username, hostname=diary_entry.hostname, timestamp=diary_entry.timestamp, context_id=diary_entry.context_id, event_id=event_id, message=entry_message)
self._session.add(entry)
main_id = self._session.query(func.min(Entry.id)).filter(Entry.context_id == entry.context_id).scalar()
if main_id:
entry.main_id = main_id
for tag in diary_entry.tags:
tag = self.add_tag(tag)
entry.tags.append(tag)
for key, value in diary_entry.args.items():
entry.args.append(Arg(key=key, value=value))
get_logger().info('Successfully added %s (%s)' % (diary_entry.context_id, diary_entry.event_name))
def _one_query(self, ids, result):
# type: (Optional[Set[int]], Iterable[Any]) -> Set[int]
if ids is not None and not ids:
return set()
new_ids = set()
for entry in result:
new_ids.add(entry.id)
if ids is None:
return new_ids
else:
return ids.intersection(new_ids)
[docs] def query(self, time_from=None, time_until=None, tag=None, event=None, username=None, hostname=None, message=None, locale='en'):
# type: (Optional[datetime], Optional[datetime], Optional[str], Optional[str], Optional[str], Optional[str], Optional[str], str) -> List[Dict[str, Any]]
ids = None
if time_from:
ids = self._one_query(ids, self._session.query(Entry).filter(Entry.timestamp >= time_from))
if time_until:
ids = self._one_query(ids, self._session.query(Entry).filter(Entry.timestamp < time_until))
if tag:
ids = self._one_query(ids, self._session.query(Entry).filter(Entry.tags.any(Tag.name == tag)))
if event:
ids = self._one_query(ids, self._session.query(Entry).filter(Entry.event.has(name=event)))
if username:
ids = self._one_query(ids, self._session.query(Entry).filter(Entry.username == username))
if hostname:
ids = self._one_query(ids, self._session.query(Entry).filter(Entry.hostname == hostname))
if message:
pattern_ids = set()
for pat in message.split():
pattern_ids.update(self._one_query(None, self._session.query(Entry).filter(Entry.message.ilike('%{}%'.format(pat)))))
pattern_ids.update(self._one_query(None, self._session.query(Entry).join(EventMessage, Entry.event_id == EventMessage.event_id).filter(Entry.event_id == EventMessage.event_id, EventMessage.locale == locale, EventMessage.message.ilike('%{}%'.format(pat)))))
pattern_ids.update(self._one_query(None, self._session.query(Entry).filter(Entry.args.any(Arg.value == pat))))
if ids is None:
ids = pattern_ids
else:
ids.intersection_update(pattern_ids)
if ids is None:
entries = self._session.query(Entry).filter(Entry.event_id != None) # noqa: E711
else:
entries = self._session.query(Entry).filter(Entry.id.in_(ids), Entry.event_id != None) # noqa: E711
limit = get_query_limit()
if limit:
entries = entries.limit(limit)
res = []
for entry in entries:
event = entry.event
if event:
event_name = event.name
else:
event_name = 'COMMENT'
args = dict((arg.key, arg.value) for arg in entry.args)
comments = self._session.query(Entry).filter(Entry.context_id == entry.context_id, Entry.message != None).count() # noqa: E711
res.append({
'id': entry.id,
'date': entry.timestamp.strftime('%Y-%m-%d %H:%M:%S'),
'event_name': event_name,
'hostname': entry.hostname,
'username': entry.username,
'context_id': entry.context_id,
'message': entry.message,
'args': args,
'comments': comments > 0,
})
return res
[docs] def get(self, context_id):
# type: (int) -> List[Dict[str, Any]]
res = []
for entry in self._session.query(Entry).filter(Entry.context_id == context_id).order_by('id'):
args = dict((arg.key, arg.value) for arg in entry.args)
tags = [tag.name for tag in entry.tags]
event = entry.event
if event:
event_name = event.name
else:
event_name = 'COMMENT'
obj = {
'id': entry.id,
'username': entry.username,
'hostname': entry.hostname,
'message': entry.message,
'args': args,
'date': entry.timestamp.strftime('%Y-%m-%d %H:%M:%S'),
'tags': tags,
'context_id': entry.context_id,
'event_name': event_name,
}
res.append(obj)
return res
[docs]@contextmanager
def get_client(version):
# type: (int) -> Iterator[Client]
if version != 1:
raise UnsupportedVersion(version)
with get_session() as session:
client = Client(version=version, session=session)
yield client
[docs]class UnsupportedVersion(Exception):
def __str__(self):
return 'Version %s of the Admin Diary Backend is not supported' % (self.args[0])