Source code for univention.pkgdb

#!/usr/bin/python3
# SPDX-FileCopyrightText: 2004-2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only

"""
Univention Package Database
Python module for the package database
"""

from __future__ import annotations

import argparse
import csv
import os
import os.path
import sys
import time

import apt_pkg
import dns.exception
import dns.resolver
import pgdb

import univention.config_registry


assert pgdb.paramstyle == 'pyformat'

# TODO: use FQDN or DN as system identifier instead of hostname?
# TODO: add <limit> parameter to sql_get* functions
# TODO: add <order> parameter to sql_get* functions to change/disable sort
# TODO: pkgdbu should not be able to create roles, instead do it as postgres from listener script as root


[docs] def parse_options(): """ parse options and return <options> with <options.debug> <options.action> <options.system> <options.db_server> set """ parser = argparse.ArgumentParser(add_help=False, description='Scan all packages in the local system and send this data to the database pkgdb.') actions = parser.add_argument_group('Actions', 'Select an action, default is --scan') actions.add_argument( "--scan", help='Scan this systems packages and sent them to the package database', action='append_const', dest='action', const='scan', default=[]) actions.add_argument( "--remove-system", metavar='SYSTEM', help='Removes SYSTEM from the package database', dest='removesystem') actions.add_argument( "--test-superuser", help='Test for ability to add or delete database users', action='append_const', dest='action', const='test-superuser') actions.add_argument( '--dump-all', help='Dump entire content of the database', action='append_const', dest='action', const='dump-all') actions.add_argument( '--dump-systems', help='Dump systems-table of the database', action='append_const', dest='action', const='dump-systems') actions.add_argument( '--dump-packages', help='Dump packages-table (query) of the database', action='append_const', dest='action', const='dump-packages') actions.add_argument( '--dump-systems-packages', help='Dump systems-packages-table of the database', action='append_const', dest='action', const='dump-systems-packages') actions.add_argument( '--fill-testdb', help="Scan all packages of the local system and add them to the database using system name 'testsystemX', using 0001 to 1500 for X. For testing purposes only.", action='append_const', dest='action', const='fill-testdb') actions.add_argument( '--version', help='Print version information and exit', action='append_const', dest='action', const='version') actions.add_argument( "--add-system", metavar='SYSTEM', help='Add a SYSTEM as db-user-account. Normally this will be used by univention-listener', dest='addsystem') actions.add_argument( "--del-system", metavar='SYSTEM', help='Delete a SYSTEM as db-user-account. Normally this will be used by univention-listener', dest='delsystem') parser.add_argument("-?", "-h", "--help", action='help', help='show this help message and exit') parser.add_argument('--debug', help='Print more output', action='count', default=0) parser.add_argument('--db-server', metavar='SERVER', help='The database server') options = parser.parse_args() if options.addsystem is not None: options.action.append('add-system') options.system = options.addsystem del options.addsystem if options.delsystem is not None: options.action.append('del-system') options.system = options.delsystem del options.delsystem if options.removesystem is not None: options.action.append('remove-system') options.system = options.removesystem del options.removesystem if len(options.action) > 1: parser.error('Only one action at a time supported!') if not options.action: options.action.append('scan') options.action = options.action[0] return options
[docs] def log(message): """Log-Funktion""" try: with open("/var/log/univention/pkgdb.log", "a") as logfile: # TODO: persistent handle? logfile.write(time.strftime('%G-%m-%d %H:%M:%S') + ' ' + message + '\n') except OSError: # no log, no real problem pass
[docs] def build_sysversion(config_registry): sysversion = '%s-%s' % (config_registry['version/version'], config_registry['version/patchlevel']) if config_registry.get('version/security-patchlevel'): sysversion = "%s-%s" % (sysversion, config_registry['version/security-patchlevel']) if config_registry.get('version/erratalevel'): sysversion = "%s errata%s" % (sysversion, config_registry['version/erratalevel']) return sysversion
[docs] def sql_check_privileges(cursor): """DB-Privs testen (leerer Zugriff)""" log('check privileges ') try: cursor.execute('SELECT COUNT(*) FROM systems WHERE 1=0') except pgdb.Error: log('not OK') return 0 log('OK') return 1
[docs] def get_dbservername(domainname: str) -> str | None: """Datenbankserver ermitteln""" name = f"_pkgdb._tcp.{domainname}." log(f'get dbservername for {name!r}') resolver = dns.resolver.Resolver() try: answer = resolver.query(name, "SRV") for item in answer.response.answer[0].items: return item.target.to_text() except dns.exception.DNSException: log(f'Cannot find {name!r}') print(f'Cannot find {name}') return None
[docs] def sql_test_superuser(cursor): "Prüfe auf Superuser" log('test for pkgdbu') if not sql_check_privileges(cursor): log('pkgdbu not OK') return 1 log('pkgdbu OK') return 0
[docs] def sql_grant_system(connection, cursor, sysname): """Datenbankbenutzer hinzufügen""" log('add (grant) user ' + sysname + ' to database') # manual "quoted identifier" (no pgdb support) sqlcmd = 'CREATE USER "%s" IN GROUP pkgdbg' % (sysname.replace('"', '""'), ) print('SQL: %s\n' % (sqlcmd, )) try: cursor.execute(sqlcmd) connection.commit() log('create user OK') except pgdb.Error: connection.rollback() log('not OK. Try to alter ' + sysname) # manual "quoted identifier" (no pgdb support) sqlcmd = 'ALTER GROUP pkgdbg ADD USER "%s"' % (sysname.replace('"', '""'), ) print('SQL: %s\n' % (sqlcmd, )) try: cursor.execute(sqlcmd) connection.commit() log('alter user OK') except pgdb.Error: connection.rollback() log('not OK. ignore it') return 0
[docs] def sql_revoke_system(connection, cursor, sysname): """Datenbankbenutzer entfernen""" log('del (revoke) user ' + sysname + ' from database') # manual "quoted identifier" (no pgdb support) sql_command = 'DROP USER IF EXISTS "%s"' % (sysname.replace('"', '""'), ) cursor.execute(sql_command) connection.commit() return 0
[docs] def sql_put_sys_in_systems(cursor, sysname, sysversion, sysrole, ldaphostdn, architecture): """insert a system name into the system-table (or update its data)""" parameters = { 'sysname': sysname, 'sysversion': sysversion, 'sysrole': sysrole, 'ldaphostdn': ldaphostdn, 'architecture': architecture, } cursor.execute('SELECT true FROM systems WHERE sysname = %(sysname)s', parameters) if cursor.rowcount == 0: sql_command = ''' INSERT INTO systems (sysname, sysversion, sysrole, ldaphostdn, architecture, scandate) VALUES(%(sysname)s, %(sysversion)s, %(sysrole)s, %(ldaphostdn)s, %(architecture)s, CURRENT_TIMESTAMP) ''' else: sql_command = ''' UPDATE systems SET sysversion = %(sysversion)s, sysrole = %(sysrole)s, ldaphostdn = %(ldaphostdn)s, architecture = %(architecture)s, scandate = CURRENT_TIMESTAMP WHERE sysname = %(sysname)s ''' try: cursor.execute(sql_command, parameters) except pgdb.Error as error: log('DB-Error in sql_put_sys_on_systems: %r %r %r' % (error, sql_command, parameters)) raise
[docs] def sql_put_sys_in_systems_no_architecture(cursor, sysname, sysversion, sysrole, ldaphostdn): """insert a system name into the old system-table (or update its data)""" parameters = { 'sysname': sysname, 'sysversion': sysversion, 'sysrole': sysrole, 'ldaphostdn': ldaphostdn, } cursor.execute('SELECT true FROM systems WHERE sysname = %(sysname)s', parameters) if cursor.rowcount == 0: sql_command = ''' INSERT INTO systems (sysname, sysversion, sysrole, ldaphostdn, scandate) VALUES(%(sysname)s, %(sysversion)s, %(sysrole)s, %(ldaphostdn)s, CURRENT_TIMESTAMP) ''' else: sql_command = ''' UPDATE systems SET sysversion = %(sysversion)s, sysrole = %(sysrole)s, ldaphostdn = %(ldaphostdn)s, scandate = CURRENT_TIMESTAMP WHERE sysname = %(sysname)s ''' try: cursor.execute(sql_command, parameters) except pgdb.Error as error: log('DB-Error in sql_put_sys_on_systems: %r %r %r' % (error, sql_command, parameters)) raise
[docs] def sql_select(cursor, sqlcmd): """SQL Selects""" log('SQL: ' + sqlcmd) # TODO: why? try: cursor.execute(sqlcmd) result = cursor.fetchall() return result except pgdb.Error: log('Cannot read from the database:' + sqlcmd) return []
[docs] def sql_getall_systems(cursor): sqlcmd = "SELECT sysname, sysversion, sysrole, to_char(scandate,'YYYY-MM-DD HH24:MI:SS'), ldaphostdn FROM systems ORDER BY sysname" return sql_select(cursor, sqlcmd)
[docs] def sql_getall_systemroles(cursor): query = "SELECT DISTINCT sysrole FROM systems ORDER BY sysrole" return sql_select(cursor, query)
[docs] def sql_getall_systemversions(cursor): sqlcmd = "SELECT DISTINCT sysversion FROM systems ORDER BY sysversion" return sql_select(cursor, sqlcmd)
[docs] def sql_getall_packages_in_systems(cursor): sqlcmd = "SELECT sysname, pkgname, vername, to_char(scandate,'YYYY-MM-DD HH24:MI:SS'), inststatus, selectedstate, inststate, currentstate FROM packages_on_systems ORDER BY sysname, pkgname, vername" return sql_select(cursor, sqlcmd)
[docs] def sql_get_systems_by_query(cursor, query): if not query: return [] sqlcmd = "SELECT sysname, sysversion, sysrole, to_char(scandate,'YYYY-MM-DD HH24:MI:SS'), ldaphostdn FROM systems WHERE " + query + " ORDER BY sysname" # FIXME: # noqa: S608 return sql_select(cursor, sqlcmd)
[docs] def sql_get_packages_in_systems_by_query(cursor, query, join_systems, limit=None, orderby='sysname, pkgname, vername'): if not query: return [] if join_systems: sqlcmd = "SELECT sysname, pkgname, vername, to_char(packages_on_systems.scandate, 'YYYY-MM-DD HH24:MI:SS'), inststatus, selectedstate, inststate, currentstate FROM packages_on_systems JOIN systems USING(sysname) WHERE " + query # FIXME: # noqa: S608 else: sqlcmd = "SELECT sysname, pkgname, vername, to_char(packages_on_systems.scandate, 'YYYY-MM-DD HH24:MI:SS'), inststatus, selectedstate, inststate, currentstate FROM packages_on_systems WHERE " + query # FIXME: # noqa: S608, if orderby: sqlcmd += " ORDER BY %s" % (orderby) if limit is not None: sqlcmd += " LIMIT %d" % (limit) return sql_select(cursor, sqlcmd)
[docs] def dump_systems(cursor): """writes CSV with all systems and their system-specific information to stdout""" cursor.execute("SET datestyle = 'ISO'") query = ''' SELECT sysname, sysversion, sysrole, scandate, ldaphostdn FROM systems ORDER BY sysname ''' cursor.execute(query) writer = csv.writer(sys.stdout, delimiter=' ') writer.writerow(('hostname', 'UCS version', 'server role', 'last scan', 'LDAP host DN')) for row in cursor: writer.writerow(row) return 0
[docs] def dump_packages(cursor): # TODO: What use is this functionality? query = "SELECT DISTINCT ON (pkgname, vername) pkgname, vername, inststatus FROM packages_on_systems ORDER BY pkgname, vername, inststatus" cursor.execute(query) writer = csv.writer(sys.stdout, delimiter=' ') writer.writerow(('package', 'version', 'installed')) for row in cursor: writer.writerow(row) return 0
[docs] def dump_systems_packages(cursor): cursor.execute("SET datestyle = 'ISO'") query = ''' SELECT sysname, pkgname, vername, scandate, inststatus, selectedstate, inststate, currentstate FROM packages_on_systems ORDER BY sysname, pkgname, vername ''' cursor.execute(query) writer = csv.writer(sys.stdout, delimiter=' ') writer.writerow(('system', 'package', 'version', 'last scan', 'installed', 'selected state', 'installation state', 'current state')) for row in cursor: writer.writerow(row) return 0
[docs] def action_remove_system(connection, cursor, sysname): """removes system <sysname> from the database""" connection.rollback() delete_packages = ''' DELETE FROM packages_on_systems WHERE sysname = %(sysname)s ''' delete_system = ''' DELETE FROM systems WHERE sysname = %(sysname)s ''' cursor.execute(delete_packages, {'sysname': sysname}) cursor.execute(delete_system, {'sysname': sysname}) connection.commit()
[docs] def scan_and_store_packages(cursor, sysname, fake_null=False, architecture=None): """ updates the system <sysname> with the current package state if <fake_null> is True put '' instead of None in the vername field """ delete_packages = ''' DELETE FROM packages_on_systems WHERE sysname = %(sysname)s ''' insert_statement = ''' INSERT INTO packages_on_systems (scandate, sysname, currentstate, inststate, inststatus, pkgname, selectedstate, vername) VALUES ''' insert_value = '''( CURRENT_TIMESTAMP, %(sysname)s, %(currentstate)s, %(inststate)s, %(inststatus)s, %(pkgname)s, %(selectedstate)s, %(vername)s) ''' if scan_and_store_packages.cache is None: apt_pkg.init() scan_and_store_packages.cache = apt_pkg.Cache() cursor.execute(delete_packages, {'sysname': sysname}) insert_values = [] for package in scan_and_store_packages.cache.packages: if not package.has_versions: continue if architecture is not None and architecture != package.architecture: continue parameters = { 'sysname': sysname, 'currentstate': package.current_state, 'inststate': package.inst_state, 'inststatus': 'n', 'pkgname': package.name, 'selectedstate': package.selected_state, 'vername': None, } if fake_null: parameters['vername'] = '' if package.current_ver: parameters['inststatus'] = 'i' parameters['vername'] = package.current_ver.ver_str insert_values.append(cursor._quoteparams(insert_value, parameters)) if insert_values: insert_statement += ','.join(insert_values) cursor.execute(insert_statement)
scan_and_store_packages.cache = None
[docs] def action_fill_testdb(connection, cursor, config_registry): """Fülle Testdatenbank""" connection.rollback() sysversion = build_sysversion(config_registry) sysrole = config_registry['server/role'] ldaphostdn = config_registry['ldap/hostdn'] apt_pkg.init() architecture = apt_pkg.config.find("APT::Architecture") log('start fill of testdb ') for sysname in ['testsystem%04d' % (i, ) for i in range(1, 1500)]: try: sql_put_sys_in_systems(cursor, sysname, sysversion, sysrole, ldaphostdn, architecture) fake_null = False except pgdb.DatabaseError: # assume we are connected to a univention-pkgdb < 6.0.7-1 (old schema) connection.rollback() # retry for old schema sql_put_sys_in_systems_no_architecture(cursor, sysname, sysversion, sysrole, ldaphostdn) fake_null = True # old schema has NOT NULL, thus we have to use '' instead of None scan_and_store_packages(cursor, sysname, fake_null, architecture) connection.commit() log('end of fill testdb') return 0
[docs] def action_scan(connection, cursor, config_registry): """put systems <sysname> in the database and updates it with the current package state""" connection.rollback() sysname = config_registry['hostname'] sysversion = build_sysversion(config_registry) sysrole = config_registry['server/role'] ldaphostdn = config_registry['ldap/hostdn'] apt_pkg.init() architecture = apt_pkg.config.find("APT::Architecture") log('Starting scan of system %r' % (sysname, )) try: sql_put_sys_in_systems(cursor, sysname, sysversion, sysrole, ldaphostdn, architecture) fake_null = False except pgdb.DatabaseError: # assume we are connected to a univention-pkgdb < 6.0.7-1 (old schema) connection.rollback() # retry for old schema sql_put_sys_in_systems_no_architecture(cursor, sysname, sysversion, sysrole, ldaphostdn) fake_null = True # old schema has NOT NULL, thus we have to use '' instead of None scan_and_store_packages(cursor, sysname, fake_null, architecture) connection.commit() log('end of scan for system %r' % (sysname, )) return 0
PRIVILEGED_OPERATIONS = frozenset(('add-system', 'del-system', 'fill-testdb', 'test-superuser'))
[docs] def open_database_connection(config_registry, pkgdbu=False, db_server=None): connection_info = { # see <http://www.postgresql.org/docs/8.4/static/libpq-connect.html> 'dbname': 'pkgdb', } if config_registry.is_true('pkgdb/requiressl'): connection_info['sslmode'] = 'require' SECRET = '/etc/postgresql/pkgdb.secret' if pkgdbu and os.path.isfile(SECRET) and os.access(SECRET, os.R_OK): # 'host' not specified -> localhost over Unix-domain socket (connection type "local") connection_info['user'] = 'pkgdbu' password_file = SECRET else: if db_server is None: db_server = get_dbservername(config_registry['domainname']) if db_server is None: return None connection_info['host'] = db_server connection_info['user'] = config_registry.get('pkgdb/user', '%s$' % (config_registry['hostname'], )) password_file = config_registry.get('pkgdb/pwdfile', '/etc/machine.secret') with open(password_file) as fd: connection_info['password'] = fd.read().rstrip('\n') connectstring = ' '.join([ "%s='%s'" % (key, value.replace('\\', '\\\\').replace("'", "\\'")) for (key, value) in connection_info.items() ]) connection = pgdb.connect(database=connectstring) return connection
[docs] def main(): """main function for univention-pkgdb-scan""" options = parse_options() if options.action == 'version': print('%s %s' % (os.path.basename(sys.argv[0]), '@%@package_version@%@')) return 0 config_registry = univention.config_registry.ConfigRegistry() config_registry.load() # Datenbankzugriffsmethode ermitteln if options.action in PRIVILEGED_OPERATIONS: connection = open_database_connection(config_registry, pkgdbu=True) else: connection = open_database_connection(config_registry, pkgdbu=False) if connection is None: print('No DB-Server-Name found.') return 1 cursor = connection.cursor() if options.action == 'test-superuser': return sql_test_superuser(cursor) elif options.action == 'dump-systems': return dump_systems(cursor) elif options.action == 'dump-packages': return dump_packages(cursor) elif options.action == 'dump-systems-packages': return dump_systems_packages(cursor) elif options.action == 'dump-all': return dump_systems(cursor) or \ dump_packages(cursor) or \ dump_systems_packages(cursor) elif not sql_check_privileges(cursor): print('PKGDB: no privileges to access the database') print('You must first add this system with --add-system on the db-server (or join the system)') print('This should be done automatically by the cronjob univention-pkgdb-check') return 1 elif options.action == 'add-system': # Systembenutzer zur Datenbank hinzufügen return sql_grant_system(connection, cursor, options.system) elif options.action == 'del-system': # Systembenutzer aus Datenbank entfernen return sql_revoke_system(connection, cursor, options.system) elif options.action == 'fill-testdb': return action_fill_testdb(connection, cursor, config_registry) elif options.action == 'remove-system': return action_remove_system(connection, cursor, options.system) elif not config_registry.is_true('pkgdb/scan'): log('univention-config-registry pkgdb/scan is not true') print('The Univention Configuration Registry variable pkgdb/scan is not true.') return 0 elif options.action == 'scan': return action_scan(connection, cursor, config_registry)