#!/usr/bin/python3
#
# Univention App Center
# Database integration
#
# SPDX-FileCopyrightText: 2016-2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only
#
import os
from ipaddress import AddressValueError, IPv4Address, IPv4Network
import MySQLdb as mysql
from univention.appcenter.log import LogCatcher, get_base_logger
from univention.appcenter.packages import (
install_packages, mark_packages_as_manually_installed, packages_are_installed, update_packages, wait_for_dpkg_lock,
)
from univention.appcenter.ucr import ucr_get
from univention.appcenter.utils import call_process, call_process_as, container_mode, generate_password
database_logger = get_base_logger().getChild('database')
[docs]
class DatabaseError(Exception):
[docs]
def exception_value(self):
return str(self)
[docs]
class DatabaseCreationFailed(DatabaseError):
def __init__(self, msg, details=None):
self.msg = msg
self.details = details
def __str__(self):
return self.msg
[docs]
def exception_value(self):
if self.details:
return '%s: %s' % (self, self.details)
else:
return str(self)
[docs]
class DatabaseConnectionFailed(DatabaseError):
pass
[docs]
class DatabaseInfoError(DatabaseError):
pass
[docs]
class DatabaseConnector:
def __init__(self, app):
self.app = app
def _get_default_db_name(self):
return self.app.id
[docs]
def get_db_port(self):
return None
[docs]
def get_db_host(self):
bip = ucr_get('docker/daemon/default/opts/bip', '172.17.42.1/16')
try:
IPv4Network('%s' % (bip,), False)
except AddressValueError:
raise DatabaseInfoError('Could not find DB host for %r' % bip)
else:
ip_address = IPv4Address('%s' % (bip.split('/', 1)[0],))
return str(ip_address)
[docs]
def get_db_name(self):
return self.app.database_name or self._get_default_db_name()
[docs]
def get_db_user(self):
return self.app.database_user or self._get_default_db_name()
[docs]
def get_db_password(self):
db_password_file = self.get_db_password_file()
if not db_password_file:
return None
try:
with open(db_password_file) as f:
return f.read().rstrip('\n')
except OSError:
return None
[docs]
def get_db_password_file(self):
if self.app.database_password_file:
return self.app.database_password_file
[docs]
def get_autostart_variable(self):
return None
def _get_software_packages(self):
return []
[docs]
def install(self):
packages = self._get_software_packages()
if packages:
if packages_are_installed(packages, strict=False):
mark_packages_as_manually_installed(packages)
else:
database_logger.info('Installing/upgrading %s', ', '.join(packages))
if wait_for_dpkg_lock():
update_packages()
if not install_packages(packages):
raise DatabaseCreationFailed('Could not install software packages')
else:
raise DatabaseCreationFailed('Could not install software packages due to missing lock')
[docs]
@classmethod
def get_connector(cls, app):
value = app.database
if value:
if app.docker and container_mode():
database_logger.warning('No database integration within container')
return None
if value.lower() == 'postgresql':
database_logger.debug('%s uses PostgreSQL', app)
return PostgreSQL(app)
elif value.lower() == 'mysql':
database_logger.debug('%s uses MySQL', app)
return MySQL(app)
else:
raise DatabaseInfoError('%s wants %r as database. This is unsupported!' % (app, value))
return None
def _get_service_name(self):
return self.__class__.__name__.lower()
[docs]
def start(self, attempts=2):
service_name = self._get_service_name()
if service_name and call_process(['service', service_name, 'start'], database_logger).returncode:
if attempts > 1:
# try again. sometimes, under heavy load, mysql seems to fail to
# start although it is just slow
database_logger.info('Starting %s failed. Retrying...', service_name)
return self.start(attempts=attempts - 1)
catcher = LogCatcher(database_logger)
call_process(['service', service_name, 'status'], catcher)
details = '\n'.join(catcher.stdstream())
raise DatabaseCreationFailed('Could not start %s' % service_name, details=details)
def _write_password(self, password):
db_password_file = self.get_db_password_file()
try:
with open(db_password_file, 'w') as f:
os.chmod(f.name, 0o600)
f.write(password)
except OSError as exc:
raise DatabaseCreationFailed(str(exc))
else:
database_logger.info('Password for %s database in %s', self.app.id, db_password_file)
def _read_password(self):
try:
with open(self.get_db_password_file()) as f:
return f.read().rstrip('\n')
except (OSError, TypeError):
return None
[docs]
def db_exists(self):
database_logger.info("DB Exists default implementation called...")
return False
[docs]
def db_user_exists(self):
return False
[docs]
def create_db_and_user(self, password):
raise NotImplementedError()
[docs]
def setup(self):
self.install()
self.start()
[docs]
def create_database(self):
self.setup()
password = self._read_password()
exists = False
if password:
database_logger.debug('Password already exists')
if self.db_user_exists() and self.db_exists():
database_logger.debug('Database and User already exist')
exists = True
if not exists:
database_logger.info('Creating database for %s', self.app)
password = password or generate_password()
self.create_db_and_user(password)
self._write_password(password)
else:
database_logger.info('%s already has its database', self.app)
[docs]
class PostgreSQL(DatabaseConnector):
def _escape(self, value):
return mysql.string_literal(value).decode('utf-8')
def _get_software_packages(self):
return ['univention-postgresql']
[docs]
def get_db_port(self):
return 5432
[docs]
def get_db_password_file(self):
if self.app.database_password_file:
return self.app.database_password_file
return '/etc/postgres-%s.secret' % self.app.id
[docs]
def get_autostart_variable(self):
return 'postgres8/autostart'
[docs]
def execute(self, query):
logger = LogCatcher()
process = call_process_as('postgres', ['/usr/bin/psql', '-tc', query], logger=logger)
if process.returncode:
for level, msg in logger.logs:
if level == 'OUT':
database_logger.info(msg)
elif level == 'ERR':
database_logger.warning(msg)
raise DatabaseError('Returncode %s for query' % process.returncode)
return list(logger.stdout())
[docs]
def db_exists(self):
database_logger.info('Checking if database %s exists (postgresql implementation)', self.get_db_name())
stdout = self.execute('SELECT COUNT(*) FROM pg_database WHERE datname = %s' % self._escape(self.get_db_name())) # noqa: S608
if stdout and stdout[0].strip() == '1':
database_logger.info('Database %s already exists', self.get_db_name())
return True
else:
database_logger.info('Database %s does not exist', self.get_db_name())
return False
[docs]
def db_user_exists(self):
stdout = self.execute('SELECT COUNT(*) FROM pg_user WHERE usename = %s' % self._escape(self.get_db_user())) # noqa: S608
if stdout and stdout[0].strip() == '1':
return True
[docs]
def create_db_and_user(self, password):
call_process_as('postgres', ['/usr/bin/createuser', '-DRS', '--login', self.get_db_user()], logger=database_logger)
call_process_as('postgres', ['/usr/bin/createdb', '-O', self.get_db_user(), '-T', 'template0', '-E', 'UTF8', self.get_db_name()], logger=database_logger)
self.execute('ALTER ROLE "%s" WITH ENCRYPTED PASSWORD %s' % (self.get_db_user(), self._escape(password)))
[docs]
class MySQL(DatabaseConnector):
def __init__(self, app):
super().__init__(app)
self._connection = None
self._cursor = None
def _get_software_packages(self):
return ['univention-mariadb']
[docs]
def get_db_port(self):
try:
return int(ucr_get('mysql/config/mysqld/port'))
except (TypeError, ValueError):
return 3306
[docs]
def get_db_password_file(self):
if self.app.database_password_file:
return self.app.database_password_file
return '/etc/mysql-%s.secret' % self.app.id
[docs]
def get_autostart_variable(self):
return 'mysql/autostart'
[docs]
def get_root_connection(self):
if self._connection is None:
with open('/etc/mysql.secret') as f:
passwd = f.read().rstrip('\n')
try:
self._connection = mysql.connect(host='localhost', user='root', passwd=passwd)
except mysql.OperationalError:
raise DatabaseConnectionFailed('Could not connect to the MySQL server. Please verify that MySQL is running. The password for MySQL\'s root user should be in /etc/mysql.secret. You can test the connection via\n mysql --password="$(cat /etc/mysql.secret)"')
return self._connection
[docs]
def get_cursor(self):
if self._cursor is None:
self._cursor = self.get_root_connection().cursor()
return self._cursor
[docs]
def execute(self, query, *args):
try:
cursor = self.get_cursor()
cursor.execute(query, args)
except mysql.Error as exc:
raise DatabaseError(str(exc))
else:
return cursor
[docs]
def db_exists(self):
database_logger.info('Checking if database %s exists (mysql implementation)', self.get_db_name())
cursor = self.execute("SELECT EXISTS (SELECT schema_name FROM information_schema.schemata WHERE schema_name = %s)", self.get_db_name())
return cursor.fetchone()[0]
[docs]
def db_user_exists(self):
cursor = self.execute("SELECT EXISTS (SELECT DISTINCT user FROM mysql.user WHERE user = %s)", self.get_db_user())
return cursor.fetchone()[0]
[docs]
def escape(self, value):
print(repr(value))
print(repr(str(value)))
return self.get_root_connection().escape(str(value))
[docs]
def create_db_and_user(self, password):
self.execute('CREATE DATABASE IF NOT EXISTS `%s`' % self.get_db_name())
self.execute("GRANT ALL ON `%s`.* TO %%s@'%%%%' IDENTIFIED BY %%s" % self.get_db_name(), self.get_db_user(), password)
def __del__(self):
if self._connection:
self._connection.close()