#!/usr/bin/python3
#
# Univention App Center
# module for storing Apps in a cache
#
# SPDX-FileCopyrightText: 2017-2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only
from __future__ import annotations
import os
import os.path
import sys
from configparser import NoSectionError
from contextlib import contextmanager
from glob import glob
from json import dump, load
from time import sleep
from typing import TYPE_CHECKING
from urllib.parse import urlsplit
from univention.appcenter.app import App, LooseVersion
from univention.appcenter.ini_parser import (
IniSectionAttribute, IniSectionListAttribute, IniSectionObject, read_ini_file,
)
from univention.appcenter.log import get_base_logger
from univention.appcenter.ucr import ucr_get, ucr_is_true, ucr_load
from univention.appcenter.utils import get_locale, mkdir
if TYPE_CHECKING:
from collections.abc import Iterable
CACHE_DIR = '/var/cache/univention-appcenter'
cache_logger = get_base_logger().getChild('cache')
def _cmp_mtimes(mtime1: float | None, mtime2: float | None) -> int:
mtime1 = float(f'{mtime1:.3f}') if mtime1 is not None else 0.0
mtime2 = float(f'{mtime2:.3f}') if mtime2 is not None else 0.0
return 0 if mtime1 == mtime2 else (-1 if mtime1 < mtime2 else 1)
class _AppCache:
def get_every_single_app(self) -> Iterable[App]:
raise NotImplementedError()
def get_all_apps_with_id(self, app_id: str) -> list[App]:
ret = []
for app in self.get_every_single_app():
if app.id == app_id:
ret.append(app)
return ret
def get_all_locally_installed_apps(self) -> list[App]:
ret = []
for app in self.get_every_single_app():
if app.is_installed():
ret.append(app)
return ret
def find(self, app_id: str, app_version: str | None = None, latest: bool = False) -> App | None:
apps = self.get_all_apps_with_id(app_id)
if app_version:
for app in apps:
if app.version == app_version:
return app
return None
elif not latest:
for app in apps:
if app.is_installed():
return app
if apps:
latest_app = sorted(apps)[-1]
for app in apps:
if app == latest_app:
return app
def find_candidate(self, app, prevent_docker=None):
if prevent_docker is None:
prevent_docker = ucr_is_true('appcenter/prudence/docker/%s' % app.id)
if app.docker:
prevent_docker = False
app_version = LooseVersion(app.version)
apps = list(reversed(self.get_all_apps_with_id(app.id)))
not_permitted_app = None
for _app in apps:
if prevent_docker and _app.docker and not (_app.docker_migration_works or _app.docker_migration_link):
continue
if _app <= app:
continue
if _app.required_app_version_upgrade and LooseVersion(_app.required_app_version_upgrade) > app_version:
continue
if not _app.install_permissions_exist():
# do not consider app without permission...
# ... until it is the only one (and then fail eventually...)
if not_permitted_app is None:
not_permitted_app = _app
continue
return _app
if not_permitted_app:
return not_permitted_app
def get_all_apps(self) -> list[App]:
apps: dict[str, tuple[App, bool]] = {}
for app in self.get_every_single_app():
if app.id in apps:
old_app, old_is_installed = apps[app.id]
if not old_is_installed:
if old_app < app:
apps[app.id] = (app, app.is_installed())
elif app.is_installed():
apps[app.id] = (app, True)
else:
apps[app.id] = (app, app.is_installed())
return sorted(app for (app, is_installed) in apps.values())
def find_by_component_id(self, component_id: str) -> App | None:
for app in self.get_every_single_app():
if app.component_id == component_id:
return app
[docs]
class AppCache(_AppCache):
_app_cache_cache = {}
def __init__(self, app_class=None, ucs_version=None, server=None, locale=None, cache_dir=None):
self._app_class = app_class
self._ucs_version = ucs_version
if server and not server.startswith('http'):
server = 'https://%s' % server
self._server = server
self._locale = locale
self._cache_dir = cache_dir
self._cache_file = None
self._cache = []
self._cache_modified_mtime = None
self._lock = False
[docs]
def copy(self, app_class=None, ucs_version=None, server=None, locale=None, cache_dir=None):
if app_class is None:
app_class = self._app_class
if ucs_version is None:
ucs_version = self._ucs_version
if server is None:
server = self._server
if locale is None:
locale = self._locale
if cache_dir is None:
cache_dir = self._cache_dir
return self.build(app_class=app_class, ucs_version=ucs_version, server=server, locale=locale, cache_dir=cache_dir)
[docs]
def get_server(self):
if self._server is None:
self._server = default_server()
return self._server
[docs]
def get_server_netloc(self):
return urlsplit(self.get_server()).netloc
[docs]
def get_ucs_version(self):
if self._ucs_version is None:
self._ucs_version = default_ucs_version()
return self._ucs_version
[docs]
def get_locale(self):
if self._locale is None:
self._locale = default_locale()
return self._locale
[docs]
def get_cache_dir(self):
if self._cache_dir is None:
server = self.get_server_netloc()
self._cache_dir = os.path.join(CACHE_DIR, server, self.get_ucs_version())
mkdir(self._cache_dir)
return self._cache_dir
[docs]
def get_cache_file(self):
if self._cache_file is None:
cache_dir = self.get_cache_dir()
locale = self.get_locale()
self._cache_file = os.path.join(cache_dir, '.apps.%s.json' % locale)
return self._cache_file
[docs]
@classmethod
def build(cls, app_class=None, ucs_version=None, server=None, locale=None, cache_dir=None):
obj = cls(app_class, ucs_version, server, locale, cache_dir)
key = cls, obj.get_app_class(), obj.get_ucs_version(), obj.get_server(), obj.get_locale(), obj.get_cache_file()
if key not in cls._app_cache_cache:
cls._app_cache_cache[key] = obj
return cls._app_cache_cache[key]
[docs]
def get_appcenter_cache_obj(self):
return AppCenterCache.build(server=self.get_server(), ucs_versions=[self.get_ucs_version()], locale=self.get_locale())
def _save_cache(self):
cache_file = self.get_cache_file()
if cache_file:
try:
tmp_file = cache_file + ".tmp"
with open(tmp_file, 'w') as fd:
dump([app.attrs_dict() for app in self._cache], fd, indent=2)
os.rename(tmp_file, cache_file)
cache_modified = self._cache_modified()
except (OSError, TypeError):
return False
else:
self._cache_modified_mtime = cache_modified
return True
def _load_cache(self):
cache_file = self.get_cache_file()
try:
cache_modified = self._cache_modified()
archive_modified = self._archive_modified()
if _cmp_mtimes(cache_modified, archive_modified) == -1:
cache_logger.debug('Cannot load cache: mtimes of cache files do not match: %r < %r', cache_modified, archive_modified)
return None
for master_file in self._relevant_master_files():
master_file_modified = os.stat(master_file).st_mtime
if _cmp_mtimes(cache_modified, master_file_modified) == -1:
cache_logger.debug('Cannot load cache: %s is newer than cache', master_file)
return None
with open(cache_file) as fd:
cache = load(fd)
self._cache_modified_mtime = cache_modified
except (OSError, ValueError, TypeError):
cache_logger.debug('Cannot load cache: getting mtimes failed')
return None
else:
try:
cache_attributes = set(cache[0].keys())
except (TypeError, AttributeError, IndexError, KeyError):
cache_logger.debug('Cannot load cache: Getting cached attributes failed')
return None
else:
code_attributes = {attr.name for attr in self.get_app_class()._attrs}
if cache_attributes != code_attributes:
cache_logger.debug('Cannot load cache: Attributes in cache file differ from attribute in code')
return None
return [self._build_app_from_attrs(attrs) for attrs in cache]
def _archive_modified(self):
try:
return os.stat(os.path.join(self.get_cache_dir(), '.all.tar')).st_mtime
except (OSError, AttributeError) as exc:
cache_logger.debug('Unable to get mtime for archive: %s', exc)
return None
def _cache_modified(self):
try:
return os.stat(self.get_cache_file()).st_mtime
except (OSError, AttributeError) as exc:
cache_logger.debug('Unable to get mtime for cache: %s', exc)
return None
def _relevant_master_files(self):
ret = set()
classes_visited = set()
def add_class(klass):
if klass in classes_visited:
return
classes_visited.add(klass)
try:
module = sys.modules[klass.__module__]
ret.add(module.__file__)
except (AttributeError, KeyError):
pass
if hasattr(klass, '__bases__'):
for base in klass.__bases__:
add_class(base)
# metaclass
add_class(type(klass))
add_class(self.get_app_class())
return ret
def _relevant_ini_files(self):
return glob(os.path.join(self.get_cache_dir(), '*.ini'))
def _build_app_from_attrs(self, attrs):
app = self.get_app_class()(attrs, self)
return app
def _build_app_from_ini(self, ini):
app = self.get_app_class().from_ini(ini, locale=self.get_locale(), cache=self)
if app:
for attr in app._attrs:
attr.post_creation(app)
return app
[docs]
def clear_cache(self):
ucr_load()
self._cache[:] = []
self._cache_modified_mtime = None
self._invalidate_cache_files()
def _invalidate_cache_files(self):
cache_dir = self.get_cache_dir()
for cache_file in glob(os.path.join(cache_dir, '.*apps*.json')):
try:
os.unlink(cache_file)
except OSError:
pass
@contextmanager
def _locked(self):
timeout = 60
wait = 0.1
while self._lock:
if timeout < 0:
raise RuntimeError('Could not get lock in %s seconds' % timeout)
sleep(wait)
timeout -= wait
self._lock = True
try:
yield
finally:
self._lock = False
[docs]
def get_every_single_app(self):
with self._locked():
cache_file = self.get_cache_file()
if cache_file:
archive_modified = self._archive_modified()
if _cmp_mtimes(archive_modified, self._cache_modified_mtime) == 1:
cache_logger.debug('Cache outdated. Need to rebuild')
self._cache[:] = []
if not self._cache:
cached_apps = self._load_cache()
if cached_apps is not None:
self._cache = cached_apps
cache_logger.debug('Loaded %d apps from cache', len(self._cache))
else:
for ini in self._relevant_ini_files():
app = self._build_app_from_ini(ini)
if app is not None:
self._cache.append(app)
self._cache.sort()
if self._save_cache():
cache_logger.debug('Saved %d apps into cache', len(self._cache))
else:
cache_logger.warning('Unable to cache apps')
return self._cache
[docs]
def get_app_class(self):
if self._app_class is None:
self._app_class = App
return self._app_class
def __repr__(self):
return 'AppCache(app_class=%r, ucs_version=%r, server=%r, locale=%r, cache_dir=%r)' % (self.get_app_class(), self.get_ucs_version(), self.get_server(), self.get_locale(), self.get_cache_dir())
[docs]
class AppCenterCache(_AppCache):
_appcenter_cache_cache = {}
def __init__(self, cache_class=None, server=None, ucs_versions=None, locale=None, cache_dir=None):
self._cache_class = cache_class
self._server = server
self._ucs_versions = ucs_versions
self._locale = locale
self._cache_dir = cache_dir
self._license_type_cache = None
self._ratings_cache = None
self._app_categories_cache = None
[docs]
@classmethod
def build(cls, cache_class=None, server=None, ucs_versions=None, locale=None, cache_dir=None):
obj = cls(cache_class, server, ucs_versions, locale, cache_dir)
key = cls, obj.get_app_cache_class(), obj.get_server(), tuple(obj.get_ucs_versions()), obj.get_locale(), obj.get_cache_dir()
if key not in cls._appcenter_cache_cache:
cls._appcenter_cache_cache[key] = obj
return cls._appcenter_cache_cache[key]
def _get_current_ucs_version(self) -> str:
try:
still_running = False
next_version = None
status_file = '/var/lib/univention-updater/univention-updater.status'
if os.path.exists(status_file):
with open(status_file) as status:
for line in status:
line = line.strip()
key, value = line.split('=', 1)
if key == 'status':
still_running = value == 'RUNNING'
elif key == 'next_version':
next_version = value.split('-')[0]
if still_running and next_version:
cache_logger.debug('Using UCS %s. Apparently an updater is running', next_version)
return next_version
except (OSError, ValueError) as exc:
cache_logger.warning('Could not parse univention-updater.status: %s', exc)
return ucr_get('version/version')
[docs]
def get_app_cache_class(self):
if self._cache_class is None:
self._cache_class = AppCache
return self._cache_class
[docs]
def get_server(self) -> str:
if self._server is None:
self._server = default_server()
return self._server
[docs]
def get_server_netloc(self) -> str:
return urlsplit(self.get_server()).netloc
[docs]
def get_ucs_versions(self) -> list[str]:
if self._ucs_versions is None:
ucs_version = self._get_current_ucs_version()
cache_file = self.get_cache_file('.ucs.ini')
self._ucs_versions = _get_ucs_versions_for(ucs_version, cache_file) or [ucs_version]
# TODO: always appending "5.0" is a workaround for now. We need a way to fetch data from .ucs.ini even
# in situations where the system cannot connect to the internet. A pre-installed version of .ucs.ini
# that we can fall back to in "offline mode" is needed.. (same way we do it with the all.tar.gz file)
if "5.0" not in self._ucs_versions:
self._ucs_versions.append("5.0")
return self._ucs_versions
[docs]
def get_locale(self) -> str:
if self._locale is None:
self._locale = default_locale()
return self._locale
[docs]
def get_cache_dir(self) -> str:
if self._cache_dir is None:
server = self.get_server_netloc()
self._cache_dir = os.path.join(CACHE_DIR, server)
mkdir(self._cache_dir)
return self._cache_dir
[docs]
def get_cache_file(self, fname: str) -> str:
return os.path.join(self.get_cache_dir(), fname)
[docs]
def get_app_caches(self):
ret = []
for ucs_version in self.get_ucs_versions():
ret.append(self._build_app_cache(ucs_version))
return ret
def _build_app_cache(self, ucs_version):
cache_dir = self.get_cache_file(ucs_version)
return self.get_app_cache_class().build(ucs_version=ucs_version, server=self.get_server(), locale=self.get_locale(), cache_dir=cache_dir)
[docs]
def get_license_description(self, license_name: str) -> str | None:
if self._license_type_cache is None:
cache_file = self.get_cache_file('.license_types.ini')
self._license_type_cache = LicenseType.all_from_file(cache_file)
for license in self._license_type_cache:
if license.name == license_name:
return license.description
[docs]
def get_ratings(self):
if self._ratings_cache is None:
cache_file = self.get_cache_file('.rating.ini')
self._ratings_cache = Rating.all_from_file(cache_file)
return self._ratings_cache
[docs]
def get_app_categories(self):
if self._app_categories_cache is None:
cache_file = self.get_cache_file('.app-categories.ini')
parser = read_ini_file(cache_file)
locale = self.get_locale()
if not parser.has_section(locale):
locale = 'en'
try:
categories = dict(parser.items(locale))
except NoSectionError:
categories = {}
self._app_categories_cache = categories
return self._app_categories_cache
[docs]
def get_every_single_app(self) -> list[App]:
ret = []
for app_cache in self.get_app_caches():
ret.extend(app_cache.get_every_single_app())
return ret
[docs]
def clear_cache(self):
ucr_load()
self._license_type_cache = None
self._ratings_cache = None
self._app_categories_cache = None
for app_cache in self.get_app_caches():
app_cache.clear_cache()
def __repr__(self):
return 'AppCenterCache(app_cache_class=%r, server=%r, ucs_versions=%r, locale=%r, cache_dir=%r)' % (self.get_app_cache_class(), self.get_server(), self.get_ucs_versions(), self.get_locale(), self.get_cache_dir())
[docs]
class Apps(_AppCache):
def __init__(self, cache_class=None, locale=None, ucs_version=None):
self._cache_class = cache_class
self._locale = locale
self._ucs_version = ucs_version
[docs]
def get_appcenter_cache_class(self):
if self._cache_class is None:
self._cache_class = AppCenterCache
return self._cache_class
[docs]
def get_locale(self) -> str:
if self._locale is None:
self._locale = default_locale()
return self._locale
[docs]
def get_appcenter_caches(self) -> list[AppCenterCache]:
server = default_server()
# get dedicated ucs_versions for self._ucs_version - if set
netloc = urlsplit(server).netloc
cache_file = os.path.join(CACHE_DIR, netloc, '.ucs.ini')
ucs_versions = _get_ucs_versions_for(self._ucs_version, cache_file)
cache = self._build_appcenter_cache(server, ucs_versions)
return [cache]
def _build_appcenter_cache(self, server, ucs_versions):
return self.get_appcenter_cache_class().build(server=server, ucs_versions=ucs_versions, locale=self.get_locale())
[docs]
def get_every_single_app(self):
ret = []
for app_cache in self.get_appcenter_caches():
for app in app_cache.get_every_single_app():
if self.include_app(app):
ret.append(app)
return ret
[docs]
def include_app(self, app):
return app.supports_ucs_version()
[docs]
def clear_cache(self) -> None:
for app_cache in self.get_appcenter_caches():
app_cache.clear_cache()
[docs]
@classmethod
def find_by_string(cls, app_string):
app_id, app_version, ucs_version, server = cls.split_app_string(app_string)
server = server or default_server()
ucs_versions = [ucs_version] if ucs_version else None
cache = AppCenterCache.build(server=server, ucs_versions=ucs_versions)
return cache.find(app_id, app_version=app_version)
[docs]
@classmethod
def split_app_string(cls, app_string):
try:
app_id, app_version = app_string.split('=', 1)
except ValueError:
app_id, app_version = app_string, None
try:
ucs_version, app_id = app_id.split('/', 1)
except ValueError:
ucs_version, app_id = None, app_id # noqa: PLW0127
if ucs_version:
try:
ucs_version, server = ucs_version.split('@', 1)
except ValueError:
ucs_version, server = ucs_version, None # noqa: PLW0127
else:
server = None
ucs_version = ucs_version or None
return app_id, app_version, ucs_version, server
[docs]
class AllApps(Apps):
[docs]
def include_app(self, app):
return True
[docs]
class AppCenterVersion(IniSectionObject):
supported_ucs_versions = IniSectionListAttribute(required=True)
[docs]
class LicenseType(IniSectionObject):
description = IniSectionAttribute(localisable=True)
[docs]
class Rating(IniSectionObject):
label = IniSectionAttribute(localisable=True)
description = IniSectionAttribute(localisable=True)
[docs]
def default_locale() -> str:
return get_locale() or 'en'
[docs]
def default_server() -> str:
server = ucr_get('repository/app_center/server', 'https://appcenter.software-univention.de')
if not server.startswith('http'):
server = 'https://%s' % server
return server
[docs]
def default_ucs_version() -> str:
cache = AppCenterCache.build(server=default_server())
return cache.get_ucs_versions()[0]
def _get_ucs_versions_for(ucs_version, ucs_ini_file):
if ucs_version is None:
return None
versions = AppCenterVersion.all_from_file(ucs_ini_file)
for version in versions:
if version.name == ucs_version:
return version.supported_ucs_versions