#!/usr/bin/python3
#
# Univention Management Console
# module: software monitor
#
# SPDX-FileCopyrightText: 2011-2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only
import pgdb
import univention.config_registry
import univention.pkgdb as updb
from univention.management.console import Translation
from univention.management.console.log import MODULE
from univention.management.console.modules import Base, UMC_Error
from univention.management.console.modules.decorators import log, sanitize, simple_response
from univention.management.console.modules.sanitizers import ChoicesSanitizer
_ = Translation('univention-management-console-module-pkgdb').translate
RECORD_LIMIT = 100000 # never return more than this many records
CRITERIA = {
'systems': [
'all_properties', 'sysname', 'sysrole', 'sysversion', 'sysversion_greater', 'sysversion_lower',
],
'packages': [
'pkgname', 'currentstate', # head fields
'selectedstate', 'inststate', # state fields
],
}
CRITERIA_OPERATOR = {
'sysname': '~',
'pkgname': '~',
'sysversion': '=',
'vername': '=',
'sysrole': '=',
'selectedstate': '=',
'inststate': '=',
'currentstate': '=',
'sysversion_greater': '>',
'sysversion_lower': '<',
'all_properties': '~',
}
MAPPED_TABLES = {
'all_properties': ['sysname', 'sysrole', 'sysversion'],
'sysversion_greater': ['sysversion'],
'sysversion_lower': ['sysversion'],
}
MAPPED_PATTERNS_TO_KEYS = {
'incomplete': ['unpacked', 'halfinstalled', 'halfconfigured', 'triggers-awaited', 'triggers-pending'],
'notinstalled': ['notinstalled', 'uninstalled'],
}
# Search string proposals:
#
# - array: turns into a ComboBox
# - string: turns into a TextBox
#
# NOTE: the 'system_roles' and 'ucs_version' lists are fetched
# dynamically from the pkgdb object.
PROPOSALS = {
'selectedstate': [
'install', 'hold', 'deinstall', 'purge', 'unknown',
],
'inststate': [
'ok', 'reinst_req', 'hold', 'hold_reinst_req',
],
'currentstate': [
'installed', 'notinstalled', 'incomplete', 'configfiles',
# 'uninstalled', 'unpacked', 'halfconfigured', 'halfinstalled',
],
}
# Describes our query types:
#
# 'columns' the result set. Equally used as field list for the query as well
# as the column list for the results grid.
# 'db_fields' (optional) define this if the list of database fields is different from the
# list of columns to display (as it is the case for the 'packages' query)
# 'function' the function of the pkgdb object to use.
# 'args' (optional) if the corresponding function of the pkgdb object needs more args,
# you can specify them here. Args are passed as 'named args' in the Python sense.
#
QUERIES = {
# 'select sysname,sysversion,sysrole,to_char(scandate,\'YYYY-MM-DD HH24:MI:SS\'),ldaphostdn from systems where '+query+' order by sysname
'systems': {
'columns': ['sysname', 'sysversion', 'sysrole', 'inventory_date'],
'function': updb.sql_get_systems_by_query,
},
# 'select sysname,pkgname,vername,to_char(packages_on_systems.scandate,\'YYYY-MM-DD HH24:MI:SS\'),inststatus,selectedstate,inststate,currentstate from packages_on_systems join systems using(sysname) where '+query+' order by sysname,pkgname,vername
'packages': {
'columns': ['sysname', 'pkgname', 'vername', 'selectedstate', 'inststate', 'currentstate'],
'db_fields': ['sysname', 'pkgname', 'vername', 'inventory_date', 'inststatus', 'selectedstate', 'inststate', 'currentstate'],
'function': updb.sql_get_packages_in_systems_by_query,
# They allow querying for the UCS version, and then they don't display it? Who would use that at all?
# But nevertheless, if 'sysversion' is an allowed search key we have to switch this 'Join' flag on,
# or we get always empty result sets.
# (No, this join is not the performance bottleneck, believe me.)
'args': {
'join_systems': True,
'limit': RECORD_LIMIT,
# 'orderby' ..avoids doing a sort that only consumes time and memory, and afterwards
# the data is sorted again by the grid or its store
'orderby': '',
},
},
}
# These are the values that are stored into the database using
# 'coded' values. Key is the code as used in the database, value
# is the value being shown to the outside, and used as 'id' property
# in the ComboBox data arrays.
#
CODED_VALUES = {
'selectedstate': {'0': 'unknown', '1': 'install', '2': 'hold', '3': 'deinstall', '4': 'purge'},
'currentstate': {'0': 'notinstalled', '1': 'unpacked', '2': 'halfconfigured', '3': 'uninstalled', '4': 'halfinstalled', '5': 'configfiles', '6': 'installed', '7': 'triggers-awaited', '8': 'triggers-pending'},
'inststate': {'0': 'ok', '1': 'reinst_req', '2': 'hold', '3': 'hold_reinst_req'},
}
# We introduce a 'reverse index' of the CODED_VALUES here.
DECODED_VALUES = {f: {CODED_VALUES[f][key]: key for key in CODED_VALUES[f]} for f in CODED_VALUES}
# This helps translating labels. We don't make separate functions
# or such things as we don't have any name clashes.
LABELS = {
# ------------- search fields (keys) ---------
# 'incomplete_packages': _("Find packages installed incompletely"),
'inststate': _("Installation state"),
'inventory_date': _("Inventory date"),
# 'compare_with_version': _("Compared to version"),
'pkgname': _("Package name"),
# 'vername': _("Package version"),
'currentstate': _("Package state"),
'selectedstate': _("Selection state"),
'sysname': _("Hostname"),
'sysrole': _("System role"),
'sysversion': _("UCS Version"),
'sysversion_greater': _("UCS Version is greater than"),
'sysversion_lower': _("UCS Version is lower than"),
'all_properties': _("All properties"),
# ----------- server roles --------------------
'domaincontroller_master': _("Primary Directory Node"),
'domaincontroller_backup': _("Backup Directory Node"),
'domaincontroller_slave': _("Replica Directory Node"),
'memberserver': _("Managed Node"),
# ------------------ selection states --------------
'install': _("Install"),
'hold': _("Hold"),
'deinstall': _("Uninstall"),
'purge': _("Purge"),
'unknown': _("Not installed"),
# ----------------- installation states ------------
'ok': _("OK"),
'reinst_req': _("Reinstall required"),
# 'hold' already defined
'hold_reinst_req': _("Hold + Reinstall required"),
# -------------------- package states --------------
'notinstalled': _("Not installed"),
'unpacked': _("Unpacked"),
'halfconfigured': _("Half-configured"),
'uninstalled': _("Uninstalled"),
'halfinstalled': _("Half-installed"),
'configfiles': _("Config files only"),
'installed': _("Installed"),
'incomplete': _("Incomplete"),
'triggers-pending': _("Triggers pending"),
'triggers-awaited': _("Triggers awaited"),
}
PAGES = ('systems', 'packages')
def _server_not_running_msg():
return _('Maybe the PostgreSQL server is not running.\nIt can be started with the UMC module "System services".')
[docs]
class Instance(Base):
[docs]
def init(self):
self.ucr = univention.config_registry.ConfigRegistry()
self.ucr.load()
self.connect()
self._update_system_roles_and_versions()
[docs]
def connection(func):
def _connect(self, *args, **kwargs):
if self.dbConnection is None:
self.connect()
else:
self.test_connection()
return func(self, *args, **kwargs)
return _connect
[docs]
def connect(self):
# Create a connection to the pkgdb
try:
self.dbConnection = updb.open_database_connection(self.ucr, pkgdbu=True)
except pgdb.InternalError as ex:
MODULE.error('Could not establish connection to the PostgreSQL server: %s', ex)
raise UMC_Error(_('Could not establish connection to the database.\n\n%s') % (_server_not_running_msg(),))
else:
self.cursor = self.dbConnection.cursor()
[docs]
def test_connection(self):
# test if connection is still active
try:
self.cursor.execute('SELECT TRUE')
except pgdb.OperationalError as ex:
MODULE.error('Connection to the PostgreSQL server lost: %s', ex)
self.dbConnection = None
try:
self.connect()
except UMC_Error:
raise UMC_Error(_('Connection to the database lost.\n\n%s') % (_server_not_running_msg(),))
[docs]
@simple_response
def reinit(self):
"""Method invoked when opening the module in the frontend to cache and update some values"""
self._update_system_roles_and_versions()
def _update_system_roles_and_versions(self):
"""refetchs the variable lists (system roles and system versions)"""
PROPOSALS['sysrole'] = self._get_system_roles()
PROPOSALS['sysversion'] = self._get_system_versions()
PROPOSALS['sysversion_lower'] = PROPOSALS['sysversion']
PROPOSALS['sysversion_greater'] = PROPOSALS['sysversion']
@connection
def _get_system_roles(self):
return [role[0] for role in updb.sql_getall_systemroles(self.cursor)]
@connection
def _get_system_versions(self):
return [version[0] for version in updb.sql_getall_systemversions(self.cursor)]
[docs]
@sanitize(
page=ChoicesSanitizer(choices=PAGES, required=True),
key=ChoicesSanitizer(choices=CRITERIA_OPERATOR.keys()),
)
@connection
@simple_response
def query(self, page, key, pattern=''):
"""
Query to fill the grid. The structure of the corresponding grid
has already been fetched by the 'pkgdb/columns' command.
"""
desc = QUERIES[page]
operator = CRITERIA_OPERATOR[key]
function = desc['function']
kwargs = desc.get('args', {})
kwargs['query'] = _make_query(key, operator, pattern)
result = function(self.cursor, **kwargs)
names = desc.get('db_fields', desc['columns'])
return [_convert_to_grid(record, names) for record in result]
[docs]
@sanitize(page=ChoicesSanitizer(choices=PAGES, required=True))
@connection
@simple_response
@log
def keys(self, page):
"""returns the set of search criteria suitable for the given page."""
return _combobox_data(CRITERIA[page])
[docs]
@sanitize(page=ChoicesSanitizer(choices=PAGES, required=True))
@connection
@simple_response
@log
def proposals(self, page, key=''):
"""
returns proposals for the query pattern that can be
presented in the frontend. This can be a single pattern
(the corresponding field will turn into a text entry)
or an array (the field will turn into a ComboBox,
with optionally translated labels)
"""
if key in PROPOSALS:
return _combobox_data(PROPOSALS[key])
# fallback for everything not explicitly listed here.
return ''
[docs]
@sanitize(page=ChoicesSanitizer(choices=PAGES, required=True))
@connection
@simple_response
@log
def columns(self, page, key=''):
"""
returns the structure of the results grid for a given
page+key combination. Note that design properties (width etc)
are added at the JS page (KeyTranslator.js)
"""
return QUERIES[page]['columns']
def _combobox_data(data):
"""returns a (id, label) dict with translated values"""
return [{"id": identifier, "label": _id_to_label(identifier)} for identifier in data]
def _make_query(key, operator, pattern):
"""
consumes a tuple of 'key','operator','pattern' and converts it
into a valid Postgres WHERE clause. Features here:
- translates keyed values into their database representation
- tweaks this DOS-glob-style pattern notation into a correct
regular expression
"""
def __make_query(key, operator, pattern):
if not key:
return None
# Translate keyed values. That function returns the input
# value unchanged if there's no reason to translate anything.
pattern = _coded_value(key, pattern)
pattern = pgdb.escape_string(pattern)
key = key.replace('\\', '\\\\').replace('"', r'\"')
if '~' in operator:
# 1. dot is not a wildcard here but rather a literal dot
pattern = pattern.replace('.', r'\.')
# 2. a * indicates to not do a substring search
if '*' in pattern:
pattern = pattern.replace('*', '.*')
pattern = '^%s$' % (pattern)
# 3. empty pattern means search for everything
if pattern == '':
pattern = '.*'
return "\"%s\" %s '%s'" % (key, operator, pattern)
if pattern in MAPPED_PATTERNS_TO_KEYS:
patterns = MAPPED_PATTERNS_TO_KEYS[pattern]
return ' OR '.join(__make_query(key, operator, pattern) for pattern in patterns)
else:
keys = MAPPED_TABLES.get(key, [key])
return ' OR '.join(__make_query(key, operator, pattern) for key in keys)
def _decoded_value(field, key):
"""
accepts a field name and the database value of this field
and translates this into the codeword that represents this value.
"""
if field in CODED_VALUES:
return CODED_VALUES[field].get(str(key), key)
# unchanged if no match
return key
def _coded_value(field, value):
"""
this is the inverse of the above function: it accepts a field name
and a value and translates it back into the 'keyed' value to be
used when talking to the database.
"""
if field in DECODED_VALUES:
return DECODED_VALUES[field].get(str(value), value)
# unchanged if no match
return value
def _convert_to_grid(data, names):
"""
The queries here return arrays of values. But our grid
only accepts dicts where the values are prefixed by
the field names. This function converts one record.
"""
# find smaller length
length = min(len(data), len(names))
# This expression does the main work:
# (1) assigns the field name to a value
# (2) converts database representation into keyed values (_decoded_value)
# (3) translates keyed values for display (_id_to_label)
return {names[i]: _id_to_label(_decoded_value(names[i], data[i])) for i in range(length)}
def _id_to_label(identifier):
"""
translates any id into the corresponding label.
if no translation found -> returns id unchanged.
"""
return LABELS.get(identifier, identifier)