#!/usr/bin/python3
# SPDX-FileCopyrightText: 2012-2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only
"""
Univention System Setup
Python setup script base
"""
import locale
import logging
import os
import sys
import traceback
from collections.abc import Iterable, Iterator
from contextlib import contextmanager
from datetime import datetime
from types import TracebackType
import apt
from univention.config_registry import ConfigRegistry
from univention.config_registry.frontend import ucr_update
from univention.lib.i18n import Translation
from univention.lib.package_manager import PackageManager, _PackageManagerLoggerHandler
from univention.management.console.modules.setup.util import PATH_PROFILE, PATH_SETUP_SCRIPTS
[docs]
def setup_i18n() -> Translation:
locale.setlocale(locale.LC_ALL, "")
translation = Translation('univention-system-setup-scripts')
translation.set_language()
return translation.translate
_ = setup_i18n()
[docs]
class Profile(dict):
[docs]
def load(self, filename: str = PATH_PROFILE) -> None:
with open(filename) as profile:
for line in profile:
line = line.strip()
if not line:
continue
if line.startswith('#'):
continue
key, value = line.split('=', 1)
for delim in ("'", '"'):
if value.startswith(delim) and value.endswith(delim):
value = value[1:-1]
break
self[key] = value
self._filename = filename
[docs]
def hide(self, key: str) -> None:
filename = self._filename
with open(filename) as profile:
all_lines = profile.readlines()
with open(filename, 'w') as profile:
for line in all_lines:
if line.startswith('%s=' % key):
line = '#%s="********"\n' % key
profile.write(line)
[docs]
def is_true(self, key: str) -> bool:
value = self.get(key)
if value:
value = value.lower()
ucr = ConfigRegistry()
return ucr.is_true(value=value)
[docs]
def get_list(self, key: str, split_by=' ') -> list[str]:
"""
Retrieve the value of var_name from the profile file.
Return the string as a list split by split_by.
"""
value = self.get(key)
return value.split(split_by) if value else []
[docs]
class TransactionalUcr:
def __init__(self) -> None:
self.ucr = ConfigRegistry()
self.ucr.load()
self.changes: dict[str, str | None] = {}
[docs]
def set(self, key: str, value: str) -> None:
"""
Set the value of key of UCR.
Does not save immediately.
commit() is called at the end of inner_run(). If you need to commit
changes immediately, you can call commit() at any time.
"""
orig_val = self.ucr.get(key)
if orig_val == value:
# in case it was overwritten previously
self.changes.pop(key, None)
else:
self.changes[key] = value
[docs]
def commit(self) -> None:
"""
Saves UCR variables previously set by set_ucr_var(). Also commits
changes (if done any). Is called automatically *if inner_run() did not
raise an exception*. You can call it manually if you need to
do it (e.g. in down()).
"""
if self.changes:
ucr_update(self.ucr, self.changes)
# reset (in case it is called multiple) times in a script
self.changes.clear()
[docs]
def get(self, key: str, search_in_changes=True) -> str | None:
"""
Retrieve the value of key from ucr.
If search_in_changes, it first looks in (not yet committed) values.
"""
if search_in_changes:
try:
return self.changes[key]
except KeyError:
pass
return self.ucr.get(key)
def __enter__(self) -> "TransactionalUcr":
return self
def __exit__(self, exc_type: type[BaseException], exc_value: BaseException, traceback: TracebackType) -> None: # noqa: PYI036
if exc_type is None:
self.commit()
[docs]
class SetupScript:
"""
Baseclass for all Python-based Setup-Scripts.
Script lifecycle::
__init__() -> up()
run() -> (inner_run() -> commit()) -> down()
`up()`, (`inner_run()` -> `commit()`), and `down()` and encapsulated by
try-blocks, so the script should under no cirucumstances break.
You should define `name` and `script_name` class (or instance) variables
where `name` is localised and will show up at top of the progress and
`script_name` is for logging and internal infos found at
univention.management.console.modules.setup.util.ProgressParser.FRACTIONS.
You should define your own inner_run-method, and, if needed,
override (initially dummy) `up()` and `down()`.
You should execute a script like so::
script = MySetupScript()
script.run()
Or maybe even better like so, as it calls `sys.exit`::
if __name__ == '__main__':
script = MySetupScript()
main(script) # helper function defined in here
You may control the progress parser with these methods:
* self.header(msg) # automatically called by run()
* self.message(msg)
* self.error(msg)
* self.join_error(msg)
* self.steps(steps)
* self.step(step)
"""
name = ''
def __init__(self, *args, **kwargs) -> None:
"""
Initialise Script. Will call self.up() with same *args
and **kwargs as __init__() (which itself will leave them
untouched)
So don't override this method, instead write your own up().
The default up()-method does nothing.
self.up() is called in a try-except-block. If an exception
was raised by up(), it will be saved and raised as soon as
run() is called. You should make sure that this does not
happen.
"""
self.ucr = TransactionalUcr()
self._step = 1
# remove script path from name
self.script_name = os.path.abspath(sys.argv[0])
self.script_name = self.script_name.removeprefix(PATH_SETUP_SCRIPTS)
self.profile = self.parse_profile()
try:
self.up(*args, **kwargs)
except Exception as exc:
# save caught exception. raise later (in run())
self._broken: Exception | None = exc
else:
self._broken = None
[docs]
@staticmethod
def parse_profile() -> Profile:
profile = Profile()
profile.load()
return profile
[docs]
def message(self, msg: object) -> None:
"""Write a harmless __MSG__: for the parser"""
self.inform_progress_parser('msg', msg)
[docs]
def error(self, msg: object) -> None:
"""
Write a non-critical __ERR__: for the parser
The parser will save the error and inform the frontend
that something went wrong
"""
self.inform_progress_parser('err', msg)
[docs]
def join_error(self, msg: object) -> None:
"""
Write a critical __JOINERR__: for the parser.
The parser will save it and inform the frontend that something
went terribly wrong leaving the system in an unjoined state
"""
self.inform_progress_parser('joinerr', msg)
[docs]
def steps(self, steps: int) -> None:
"""
Total number of __STEPS__: to come throughout the whole
script. Progress within the script should be done with
step() which is relative to steps()
"""
self.inform_progress_parser('steps', steps)
[docs]
def step(self, step: int | None = None) -> None:
"""
Inform parser that the next __STEP__: in this script
was done. You can provide an exact number or None
in which case an internal counter will be incremented
"""
if step is not None:
self._step = step
self.inform_progress_parser('step', self._step)
self._step += 1
[docs]
def log(self, *msgs: object) -> None:
"""Log messages in a log file"""
for msg in msgs:
print(msg, end=' ')
print('')
[docs]
def run(self) -> bool:
"""
Run the SetupScript.
Don't override this method, instead define your own
:py:meth:`inner_run()`.
Call :py:meth:`.header()`
If `up()` failed raise its exception.
Run inner_run() in a try-except-block
Return False if an exception occurred
Otherwise return `True`/`False` depending on
return code of inner_run itself.
*In any case*, run `self.down()` in a try-except-block
afterwards. If this should fail, return `False`.
"""
if self.name:
self.header(self.name)
try:
if self._broken is not None:
raise self._broken
else:
success = self.inner_run()
# is called only if inner_run
# really returned and did not
# raise an exception
self.ucr.commit()
except Exception:
exc = traceback.format_exc()
self.error(exc)
success = False
self.log(exc)
finally:
try:
self.down()
except Exception:
success = False
return success is not False
[docs]
def inner_run(self) -> bool | None:
"""
Main function, called by run().
Override this method in your SetupScriptClass.
You may return True or False which will be propagated
to run() itself. If you don't return False, True will be
used implicitly.
"""
raise NotImplementedError('Define your own inner_run() method, please.')
[docs]
def up(self, *args, **kwargs) -> None:
"""
Override this method if needed.
It is called during __init__ with the very same parameters
as __init__ was called.
"""
[docs]
def down(self) -> None:
"""
Override this method if needed.
It is called at the end of run() even when an error in up()
or inner_run() occurred.
"""
class _PackageManagerLoggerHandlerWithoutProcess(_PackageManagerLoggerHandler):
def emit(self, record: logging.LogRecord) -> None:
if record.name == 'packagemanager.dpkg.process':
return
super().emit(record)
[docs]
class AptScript(SetupScript):
"""
More or less just a wrapper around
univention.lib.package_manager.PackageManager
with SetupScript capabilities.
"""
brutal_apt_options = True
[docs]
def up(self, *args, **kwargs) -> None:
self.package_manager = PackageManager(always_noninteractive=False)
handler = _PackageManagerLoggerHandlerWithoutProcess(self.message, self.step, self.error)
self.package_manager.logger.addHandler(handler)
self.roles_package_map = {
'domaincontroller_master': 'univention-server-master',
'domaincontroller_backup': 'univention-server-backup',
'domaincontroller_slave': 'univention-server-slave',
'memberserver': 'univention-server-member',
}
self.current_server_role = self.ucr.get('server/role')
self.wanted_server_role = self.profile.get('server/role')
[docs]
def set_always_install(self, *packages) -> None:
self.package_manager.always_install(packages)
[docs]
@contextmanager
def noninteractive(self) -> Iterator[None]:
if self.brutal_apt_options:
with self.package_manager.brutal_noninteractive():
yield
else:
with self.package_manager.noninteractive():
yield
[docs]
def update(self) -> bool:
with self.noninteractive():
return self.package_manager.update()
[docs]
def get_package(self, pkg_name: str) -> apt.package.Package | None:
return self.package_manager.get_package(pkg_name)
[docs]
def finish_task(self, *log_msgs: object) -> None:
"""
Task is finished. Increment counter and inform
the progress parser. Reopen the cache (maybe unneeded
but does not slow us down too much).
"""
# don't log msgs for now
self.package_manager.add_hundred_percent()
self.reopen_cache()
[docs]
def reopen_cache(self) -> None:
self.package_manager.reopen_cache()
[docs]
def mark_auto(self, auto: bool, *pkgs: str) -> None:
self.package_manager.mark_auto(auto, *pkgs)
[docs]
def commit(
self,
install: Iterable[str] = [],
remove: Iterable[str] = [],
msg_if_failed: str = '',
) -> bool:
with self.noninteractive():
return self.package_manager.commit(install, remove, msg_if_failed=msg_if_failed)
[docs]
def install(self, *pkg_names: str) -> bool:
with self.noninteractive():
return self.package_manager.install(*pkg_names)
[docs]
def uninstall(self, *pkg_names: str) -> bool:
with self.noninteractive():
return self.package_manager.uninstall(*pkg_names)
[docs]
def get_package_for_role(self, role_name: str) -> apt.package.Package | None:
"""
Searches for the meta-package that belongs
to the given role_name
"""
try:
# get "real" package for server/role
pkg_name = self.roles_package_map[role_name]
return self.package_manager.cache[pkg_name]
except KeyError:
self.error(_('Failed to get package for Role %s') % role_name)
return None
[docs]
def autoremove(self) -> bool:
with self.noninteractive():
return self.package_manager.autoremove()
[docs]
def down(self) -> None:
self.package_manager.unlock()
[docs]
def main(setup_script: SetupScript, exit: bool = True) -> int:
'''
Helper function to run the setup_script and evaluate its
return code as a "shell-compatible" one. You may sys.exit immediately
'''
success = setup_script.run()
ret_code = 1 - int(success)
if exit:
sys.exit(ret_code)
else:
return ret_code