#!/usr/bin/python3
#
# Univention App Center
# univention-app modules
#
# SPDX-FileCopyrightText: 2015-2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only
#
from __future__ import annotations
import http.client
import logging
import os.path
import ssl
import sys
import urllib.error
from argparse import Action, ArgumentParser, Namespace
from functools import wraps
from glob import glob
from typing import TYPE_CHECKING, Any
from univention.appcenter.app_cache import Apps
from univention.appcenter.exceptions import Abort, NetworkError
from univention.appcenter.log import get_base_logger
from univention.appcenter.utils import call_process, send_information, underscore, verbose_http_error
if TYPE_CHECKING:
from collections.abc import Iterator, Mapping, Sequence
from univention.appcenter.app import App
_ACTIONS = {}
JOINSCRIPT_DIR = '/usr/lib/univention-install'
[docs]
def possible_network_error(func):
@wraps(func)
def _func(*args, **kwargs):
try:
return func(*args, **kwargs)
except (urllib.error.HTTPError, urllib.error.URLError, ssl.CertificateError, http.client.BadStatusLine) as exc:
raise NetworkError(verbose_http_error(exc))
return _func
[docs]
class StoreAppAction(Action):
cache_class = Apps
def __call__(self, parser, namespace, value, option_string=None):
apps = []
if self.nargs is None:
value = [value]
for val in value:
if self.cache_class:
app = self.cache_class.find_by_string(val)
else:
app = Apps.split_app_string(val)
if app is None:
parser.error('Unable to find app %s. Maybe "%s update" to get the latest list of applications?' % (val, sys.argv[0]))
apps.append(app)
if self.nargs is None:
apps = apps[0]
setattr(namespace, self.dest, apps)
[docs]
class UniventionAppAction(metaclass=UniventionAppActionMeta):
parent_logger = get_base_logger().getChild('actions')
def __init__(self) -> None:
self._progress_percentage = 0
[docs]
@classmethod
def get_action_name(cls) -> str:
return underscore(cls.__name__).replace('_', '-')
@classmethod
def _log(cls, logger, level, msg, *args, **kwargs):
logger = cls.logger.getChild(logger) if logger is not None else cls.logger
logger.log(level, msg, *args, **kwargs)
[docs]
@classmethod
def debug(cls, msg, logger=None):
cls._log(logger, logging.DEBUG, str(msg))
[docs]
@classmethod
def log(cls, msg, logger=None):
cls._log(logger, logging.INFO, str(msg))
[docs]
@classmethod
def warn(cls, msg, logger=None):
cls._log(logger, logging.WARNING, str(msg))
[docs]
@classmethod
def fatal(cls, msg, logger=None):
cls._log(logger, logging.FATAL, str(msg))
[docs]
@classmethod
def log_exception(cls, exc, logger=None):
cls._log(logger, logging.ERROR, exc, exc_info=True)
[docs]
def setup_parser(self, parser: ArgumentParser) -> None:
pass
@property
def percentage(self) -> int:
return self._progress_percentage
@percentage.setter
def percentage(self, percentage: int) -> None:
self._progress_percentage = percentage
self.progress.debug(str(percentage))
def _build_namespace(self, _namespace: Namespace | None = None, **kwargs: Any) -> Namespace:
parser = ArgumentParser()
self.setup_parser(parser)
namespace = Namespace()
args = {}
for action in parser._actions:
default = parser._defaults.get(action.dest)
if action.default is not None:
default = action.default
if hasattr(_namespace, action.dest):
default = getattr(_namespace, action.dest)
args[action.dest] = default
args.update(kwargs)
for key, value in args.items():
setattr(namespace, key, value)
return namespace
[docs]
@classmethod
def call_safe(cls, **kwargs: Any) -> Any:
try:
return cls.call(**kwargs)
except Abort:
return None
[docs]
@classmethod
def call(cls, **kwargs: Any) -> Any:
obj = cls()
namespace = obj._build_namespace(**kwargs)
return obj.call_with_namespace(namespace)
[docs]
def call_with_namespace(self, namespace: Namespace) -> Any:
self.debug('Calling %s' % self.get_action_name())
self.percentage = 0
try:
result = self.main(namespace)
except Abort as exc:
msg = str(exc)
if msg:
self.fatal(msg)
self.percentage = 100
raise
except Exception as exc:
self.log_exception(exc)
raise
else:
self.percentage = 100
return result
def _get_joinscript_path(self, app: App, unjoin: bool = False) -> str:
number = 50
suffix = ''
ext = 'inst'
if unjoin:
number = 51
ext = 'uinst'
suffix = '-uninstall'
return os.path.join(JOINSCRIPT_DIR, '%d%s%s.%s' % (number, app.id, suffix, ext))
def _call_cache_script(self, _app, _ext, *args, **kwargs):
fname = _app.get_cache_file(_ext)
# change to UCS umask + u+x: -rwxr--r--
if os.path.exists(fname):
os.chmod(fname, 0o744)
return self._call_script(fname, *args, **kwargs)
def _call_script(self, _script: str, *args: Any, **kwargs: Any) -> bool | None:
if not os.path.exists(_script):
self.debug('%s does not exist' % _script)
return None
subprocess_args = [_script, *list(args)]
for key, value in kwargs.items():
if value is None or value is False:
continue
key = '--%s' % key.replace('_', '-')
subprocess_args.append(key)
if value is not True:
subprocess_args.append(value)
process = self._subprocess(subprocess_args)
self.debug('%s returned with %s' % (_script, process.returncode))
return process.returncode == 0
def _subprocess(self, args: Sequence[str], logger: logging.Logger | None = None, env: Mapping[str, str] | None = None, cwd: str | None = None) -> Any:
if logger is None:
logger = self.logger
elif isinstance(logger, str):
logger = self.logger.getChild(logger)
return call_process(args, logger, env, cwd)
@possible_network_error
def _send_information(self, app, status, value=None):
action = self.get_action_name()
send_information(action, app, status, value)
[docs]
def get_action(action_name: str) -> UniventionAppAction | None:
_import()
return _ACTIONS.get(action_name)
[docs]
def all_actions() -> Iterator[tuple[str, UniventionAppAction]]:
_import()
for action_name in sorted(_ACTIONS):
yield action_name, _ACTIONS[action_name]
def _import() -> None:
if _ACTIONS:
return
path = os.path.dirname(__file__)
for pymodule in glob(os.path.join(path, '*.py')):
pymodule_name = os.path.basename(pymodule)[:-3] # without .py
__import__('univention.appcenter.actions.%s' % pymodule_name)