Source code for univention.appcenter.database

#!/usr/bin/python3
# -*- coding: utf-8 -*-
#
# Univention App Center
#  Database integration
#
# Copyright 2016-2022 Univention GmbH
#
# https://www.univention.de/
#
# All rights reserved.
#
# The source code of this program is made available
# under the terms of the GNU Affero General Public License version 3
# (GNU AGPL V3) as published by the Free Software Foundation.
#
# Binary versions of this program provided by Univention to you as
# well as other copyrighted, protected or trademarked materials like
# Logos, graphics, fonts, specific documentations and configurations,
# cryptographic keys etc. are subject to a license agreement between
# you and Univention and not subject to the GNU AGPL V3.
#
# In the case you use this program under the terms of the GNU AGPL V3,
# the program is provided in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public
# License with the Debian GNU/Linux or Univention distribution in file
# /usr/share/common-licenses/AGPL-3; if not, see
# <https://www.gnu.org/licenses/>.
#

import os
import six

import MySQLdb as mysql
from ipaddress import IPv4Network, IPv4Address, AddressValueError

from univention.appcenter.utils import generate_password, call_process, call_process_as, container_mode
from univention.appcenter.packages import packages_are_installed, install_packages, update_packages, mark_packages_as_manually_installed, wait_for_dpkg_lock
from univention.appcenter.log import get_base_logger, LogCatcher
from univention.appcenter.ucr import ucr_get

# Logger shared by everything in this module: the 'database' child of the
# App Center base logger.
database_logger = get_base_logger().getChild('database')


class DatabaseError(Exception):
    """Common base class of the database related errors in this module."""

    def exception_value(self):
        """Return the text describing this error.

        In this base class that is just the string form of the exception.
        """
        message = str(self)
        return message
class DatabaseCreationFailed(DatabaseError):
    """Raised when a step of providing an app's database fails.

    :param msg: short description; this is what ``str(exc)`` returns.
    :param details: optional extra text; only :meth:`exception_value`
        includes it.
    """

    def __init__(self, msg, details=None):
        # Initialise the base exception too, so that ``args`` carries the
        # message. Without this ``args`` is empty, ``repr()`` shows no message
        # and unpickling would call the constructor without ``msg``.
        super(DatabaseCreationFailed, self).__init__(msg)
        self.msg = msg
        self.details = details

    def __str__(self):
        return self.msg

    def exception_value(self):
        """Return the message, followed by ``': <details>'`` when details are set."""
        if self.details:
            return '%s: %s' % (self, self.details)
        return str(self)
class DatabaseConnectionFailed(DatabaseError):
    """Raised when no connection to the database server could be opened."""
class DatabaseInfoError(DatabaseError):
    """Raised when database related settings are unusable or unsupported."""
class DatabaseConnector(object):
    """Base class for providing the database an app asks for.

    The defaults here install no packages and report that neither the
    database nor its user exists; :meth:`create_db_and_user` has to be
    implemented by subclasses.

    :param app: the app object. The attributes read by this class are ``id``,
        ``database``, ``database_name``, ``database_user``,
        ``database_password_file`` and ``docker``.
    """

    def __init__(self, app):
        self.app = app

    def _get_default_db_name(self):
        """Return the app id; it is the fallback for database and user name."""
        return self.app.id

    def get_db_port(self):
        """Return the port of the database server; ``None`` in this base class."""
        return None

    def get_db_host(self):
        """Return the address part of UCR ``docker/daemon/default/opts/bip``.

        :returns: the IPv4 address as string (``172.17.42.1`` if the variable
            is unset).
        :raises DatabaseInfoError: if the value is not a valid IPv4
            ``address/prefix`` (bad address *or* bad prefix).
        """
        # Imported here because the module level import only brings in
        # AddressValueError; an invalid prefix raises NetmaskValueError.
        from ipaddress import NetmaskValueError

        bip = ucr_get('docker/daemon/default/opts/bip', '172.17.42.1/16')
        try:
            # strict=False: the value is a host address plus prefix, so host
            # bits are expected to be set.
            IPv4Network(u'%s' % (bip,), strict=False)
        except (AddressValueError, NetmaskValueError):
            raise DatabaseInfoError('Could not find DB host for %r' % bip)
        ip_address = IPv4Address(u'%s' % (bip.split('/', 1)[0],))
        return str(ip_address)

    def get_db_name(self):
        """Return the app's ``database_name`` or, if empty, the app id."""
        return self.app.database_name or self._get_default_db_name()

    def get_db_user(self):
        """Return the app's ``database_user`` or, if empty, the app id."""
        return self.app.database_user or self._get_default_db_name()

    def get_db_password(self):
        """Return the content of the password file without trailing newlines.

        :returns: the password, or ``None`` if no file is configured or it
            cannot be read.
        """
        db_password_file = self.get_db_password_file()
        if not db_password_file:
            return None
        try:
            with open(db_password_file) as f:
                return f.read().rstrip('\n')
        except EnvironmentError:
            return None

    def get_db_password_file(self):
        """Return the app's ``database_password_file``, or ``None`` if unset."""
        if self.app.database_password_file:
            return self.app.database_password_file
        return None

    def get_autostart_variable(self):
        """Return the autostart variable name; ``None`` in this base class."""
        return None

    def _get_software_packages(self):
        """Return the list of packages :meth:`install` takes care of."""
        return []

    def install(self):
        """Make sure the packages from :meth:`_get_software_packages` are installed.

        Already installed packages are only marked as manually installed.

        :raises DatabaseCreationFailed: if the dpkg lock is not obtained or
            the installation fails.
        """
        packages = self._get_software_packages()
        if not packages:
            return
        if packages_are_installed(packages, strict=False):
            mark_packages_as_manually_installed(packages)
            return
        database_logger.info('Installing/upgrading %s' % ', '.join(packages))
        if not wait_for_dpkg_lock():
            raise DatabaseCreationFailed('Could not install software packages due to missing lock')
        update_packages()
        if not install_packages(packages):
            raise DatabaseCreationFailed('Could not install software packages')

    @classmethod
    def get_connector(cls, app):
        """Return the connector matching ``app.database``.

        :returns: a :class:`PostgreSQL` or :class:`MySQL` instance, or ``None``
            if the app wants no database or is a docker app while
            ``container_mode()`` is true.
        :raises DatabaseInfoError: for any other database name.
        """
        value = app.database
        if not value:
            return None
        if app.docker and container_mode():
            # Logger.warn is a deprecated alias of Logger.warning
            database_logger.warning('No database integration within container')
            return None
        kind = value.lower()
        if kind == 'postgresql':
            database_logger.debug('%s uses PostgreSQL' % app)
            return PostgreSQL(app)
        if kind == 'mysql':
            database_logger.debug('%s uses MySQL' % app)
            return MySQL(app)
        raise DatabaseInfoError('%s wants %r as database. This is unsupported!' % (app, value))

    def _get_service_name(self):
        """Return the service to start: the lower-cased class name."""
        return self.__class__.__name__.lower()

    def start(self, attempts=2):
        """Start the database service via ``service <name> start``.

        :param attempts: how many times starting is tried before giving up.
        :raises DatabaseCreationFailed: if the last attempt fails; the output
            of ``service <name> status`` is attached as details.
        """
        service_name = self._get_service_name()
        if not service_name:
            return
        if not call_process(['service', service_name, 'start'], database_logger).returncode:
            return
        if attempts > 1:
            # try again. sometimes, under heavy load, mysql seems to fail to
            # start although it is just slow
            database_logger.info('Starting %s failed. Retrying...' % service_name)
            return self.start(attempts=attempts - 1)
        catcher = LogCatcher(database_logger)
        call_process(['service', service_name, 'status'], catcher)
        details = '\n'.join(catcher.stdstream())
        raise DatabaseCreationFailed('Could not start %s' % service_name, details=details)

    def _write_password(self, password):
        """Store *password* in the password file, readable by its owner only.

        :raises DatabaseCreationFailed: if the file cannot be written.
        """
        db_password_file = self.get_db_password_file()
        try:
            # Create the file with mode 0600 right away instead of creating it
            # with default permissions and restricting it afterwards.
            fd = os.open(db_password_file, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
            with os.fdopen(fd, 'w') as f:
                # The mode passed to os.open() only applies to new files;
                # tighten a pre-existing file as well before writing.
                os.fchmod(f.fileno(), 0o600)
                f.write(password)
        except EnvironmentError as exc:
            raise DatabaseCreationFailed(str(exc))
        else:
            database_logger.info('Password for %s database in %s' % (self.app.id, db_password_file))

    def _read_password(self):
        """Return the stored password, or ``None`` if there is none to read."""
        try:
            with open(self.get_db_password_file(), 'r') as f:
                return f.read().rstrip('\n')
        except (EnvironmentError, TypeError):
            # TypeError: no password file configured (path is None)
            return None

    def db_exists(self):
        """Return whether the database exists; always ``False`` in this base class."""
        database_logger.info("DB Exists default implementation called...")
        return False

    def db_user_exists(self):
        """Return whether the database user exists; always ``False`` in this base class."""
        return False

    def create_db_and_user(self, password):
        """Create database and user; must be implemented by subclasses."""
        raise NotImplementedError()

    def setup(self):
        """Install the database software and start the service."""
        self.install()
        self.start()

    def create_database(self):
        """Provide the app's database unless it is already in place.

        The database counts as present only if a stored password, the user and
        the database all exist. Otherwise database and user are created with
        the stored password, or a newly generated one, which is then written
        to the password file.
        """
        self.setup()
        password = self._read_password()
        exists = False
        if password:
            database_logger.debug('Password already exists')
            if self.db_user_exists() and self.db_exists():
                database_logger.debug('Database and User already exist')
                exists = True
        if exists:
            database_logger.info('%s already has its database' % self.app)
            return
        database_logger.info('Creating database for %s' % self.app)
        password = password or generate_password()
        self.create_db_and_user(password)
        self._write_password(password)
class PostgreSQL(DatabaseConnector):
    """Connector that provides the app's database in PostgreSQL.

    Queries are run through ``/usr/bin/psql`` as the ``postgres`` user.
    """

    def _escape(self, value):
        """Return *value* as a PostgreSQL string literal, quotes included.

        An escape string constant (``E'...'``) with doubled backslashes and
        doubled single quotes is produced. This form is read the same way
        whether or not ``standard_conforming_strings`` is enabled, unlike
        MySQL-style backslash escaping in a plain ``'...'`` literal.
        """
        if isinstance(value, bytes) and not isinstance(value, str):
            value = value.decode('utf-8')
        elif not isinstance(value, six.string_types):
            value = six.text_type(value)
        return "E'%s'" % value.replace('\\', '\\\\').replace("'", "''")

    def _get_software_packages(self):
        """Return the packages providing the database server."""
        return ['univention-postgresql']

    def get_db_port(self):
        """Return the port of the database server (5432)."""
        return 5432

    def get_db_password_file(self):
        """Return the app's password file, defaulting to ``/etc/postgres-<app id>.secret``."""
        if self.app.database_password_file:
            return self.app.database_password_file
        return '/etc/postgres-%s.secret' % self.app.id

    def get_autostart_variable(self):
        """Return the autostart variable name, ``postgres8/autostart``."""
        return 'postgres8/autostart'

    def execute(self, query):
        """Run *query* with ``psql -tc`` as user ``postgres``.

        :returns: the list of stdout lines collected by the log catcher.
        :raises DatabaseError: if psql exits non-zero; its captured output is
            logged first.
        """
        catcher = LogCatcher()
        process = call_process_as('postgres', ['/usr/bin/psql', '-tc', query], logger=catcher)
        if not process.returncode:
            return list(catcher.stdout())
        for level, msg in catcher.logs:
            if level == 'OUT':
                database_logger.info(msg)
            elif level == 'ERR':
                # Logger.warn is a deprecated alias of Logger.warning
                database_logger.warning(msg)
        raise DatabaseError('Returncode %s for query' % process.returncode)

    def db_exists(self):
        """Return ``True`` if ``pg_database`` lists the app's database."""
        db_name = self.get_db_name()
        database_logger.info('Checking if database %s exists (postgresql implementation)' % db_name)
        stdout = self.execute('SELECT COUNT(*) FROM pg_database WHERE datname = %s' % self._escape(db_name))
        if stdout and stdout[0].strip() == '1':
            database_logger.info('Database %s already exists' % db_name)
            return True
        database_logger.info('Database %s does not exist' % db_name)
        return False

    def db_user_exists(self):
        """Return ``True`` if ``pg_user`` lists the app's database user, else ``False``."""
        stdout = self.execute('SELECT COUNT(*) FROM pg_user WHERE usename = %s' % self._escape(self.get_db_user()))
        # Always answer with a bool (this used to fall through to None).
        return bool(stdout and stdout[0].strip() == '1')

    def create_db_and_user(self, password):
        """Create the role and the database owned by it, then set the role's password.

        The return codes of ``createuser`` and ``createdb`` are not checked;
        only the final ``ALTER ROLE`` raises on failure.

        :raises DatabaseError: if setting the password fails.
        """
        db_user = self.get_db_user()
        call_process_as('postgres', ['/usr/bin/createuser', '-DRS', '--login', db_user], logger=database_logger)
        call_process_as('postgres', ['/usr/bin/createdb', '-O', db_user, '-T', 'template0', '-E', 'UTF8', self.get_db_name()], logger=database_logger)
        # Inside a quoted identifier an embedded double quote is written twice.
        quoted_role = '"%s"' % db_user.replace('"', '""')
        self.execute('ALTER ROLE %s WITH ENCRYPTED PASSWORD %s' % (quoted_role, self._escape(password)))
class MySQL(DatabaseConnector):
    """Connector that provides the app's database in MySQL/MariaDB.

    Statements are run as ``root`` over one lazily opened connection and
    cursor, which are reused for the lifetime of the instance.
    """

    def __init__(self, app):
        super(MySQL, self).__init__(app)
        self._connection = None
        self._cursor = None

    def _get_software_packages(self):
        """Return the packages providing the database server."""
        return ['univention-mariadb']

    def get_db_port(self):
        """Return UCR ``mysql/config/mysqld/port`` as int, or 3306 if unset or invalid."""
        try:
            return int(ucr_get('mysql/config/mysqld/port'))
        except (TypeError, ValueError):
            return 3306

    def get_db_password_file(self):
        """Return the app's password file, defaulting to ``/etc/mysql-<app id>.secret``."""
        if self.app.database_password_file:
            return self.app.database_password_file
        return '/etc/mysql-%s.secret' % self.app.id

    def get_autostart_variable(self):
        """Return the autostart variable name, ``mysql/autostart``."""
        return 'mysql/autostart'

    def get_root_connection(self):
        """Return the connection as ``root``, opening it on first use.

        The root password is read from ``/etc/mysql.secret``.

        :raises DatabaseConnectionFailed: if the server refuses the connection.
        """
        if self._connection is not None:
            return self._connection
        with open('/etc/mysql.secret') as f:
            passwd = f.read().rstrip('\n')
        try:
            self._connection = mysql.connect(host='localhost', user='root', passwd=passwd)
        except mysql.OperationalError:
            raise DatabaseConnectionFailed('Could not connect to the MySQL server. Please verify that MySQL is running. The password for MySQL\'s root user should be in /etc/mysql.secret. You can test the connection via\n mysql --password="$(cat /etc/mysql.secret)"')
        return self._connection

    def get_cursor(self):
        """Return the cursor on the root connection, creating it on first use."""
        if self._cursor is None:
            self._cursor = self.get_root_connection().cursor()
        return self._cursor

    def execute(self, query, *args):
        """Run *query* with *args* as parameters and return the cursor.

        :raises DatabaseError: for any ``MySQLdb.Error``.
        """
        try:
            cursor = self.get_cursor()
            cursor.execute(query, args)
        except mysql.Error as exc:
            raise DatabaseError(str(exc))
        return cursor

    def db_exists(self):
        """Return ``True`` if ``information_schema.schemata`` lists the app's database."""
        db_name = self.get_db_name()
        database_logger.info('Checking if database %s exists (mysql implementation)' % db_name)
        cursor = self.execute("SELECT EXISTS (SELECT schema_name FROM information_schema.schemata WHERE schema_name = %s)", db_name)
        # EXISTS yields 0/1; hand out a real bool like the other connectors
        return bool(cursor.fetchone()[0])

    def db_user_exists(self):
        """Return ``True`` if ``mysql.user`` contains the app's database user."""
        cursor = self.execute("SELECT EXISTS (SELECT DISTINCT user FROM mysql.user WHERE user = %s)", self.get_db_user())
        return bool(cursor.fetchone()[0])

    def escape(self, value):
        """Return the text form of *value* escaped by the root connection.

        The value is no longer echoed to stdout: the former debug ``print``
        calls would have exposed whatever was being escaped.
        """
        return self.get_root_connection().escape(six.text_type(value))

    def create_db_and_user(self, password):
        """Create the database and grant the user all privileges on it.

        The grant is for the user at host ``%`` and sets *password*.

        :raises DatabaseError: if one of the statements fails.
        """
        # Inside a backtick-quoted identifier a backtick is written twice.
        db_name = self.get_db_name().replace('`', '``')
        self.execute('CREATE DATABASE IF NOT EXISTS `%s`' % db_name)
        # '%%%%' collapses to '%%' here and to a literal '%' (any host) when
        # the cursor substitutes the parameters.
        self.execute("GRANT ALL ON `%s`.* TO %%s@'%%%%' IDENTIFIED BY %%s" % db_name, self.get_db_user(), password)

    def __del__(self):
        # __init__ may have failed before the attribute was assigned
        connection = getattr(self, '_connection', None)
        if connection:
            connection.close()