Source code for univention.appcenter.app_cache

#!/usr/bin/python3
# -*- coding: utf-8 -*-
#
# Univention App Center
#  module for storing Apps in a cache
#
# Copyright 2017-2022 Univention GmbH
#
# https://www.univention.de/
#
# All rights reserved.
#
# The source code of this program is made available
# under the terms of the GNU Affero General Public License version 3
# (GNU AGPL V3) as published by the Free Software Foundation.
#
# Binary versions of this program provided by Univention to you as
# well as other copyrighted, protected or trademarked materials like
# Logos, graphics, fonts, specific documentations and configurations,
# cryptographic keys etc. are subject to a license agreement between
# you and Univention and not subject to the GNU AGPL V3.
#
# In the case you use this program under the terms of the GNU AGPL V3,
# the program is provided in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public
# License with the Debian GNU/Linux or Univention distribution in file
# /usr/share/common-licenses/AGPL-3; if not, see
# <https://www.gnu.org/licenses/>.
#


import sys
import os
import os.path
from contextlib import contextmanager
from time import sleep
from glob import glob
from json import dump, load
from typing import Dict, Iterable, List, Optional, Tuple  # noqa: F401

from six.moves.configparser import NoSectionError
from six.moves.urllib_parse import urlsplit

from univention.appcenter.app import App, LooseVersion
from univention.appcenter.log import get_base_logger
from univention.appcenter.utils import mkdir, get_locale
from univention.appcenter.ini_parser import read_ini_file, IniSectionListAttribute, IniSectionAttribute, IniSectionObject
from univention.appcenter.ucr import ucr_load, ucr_get, ucr_is_true


CACHE_DIR = '/var/cache/univention-appcenter'

cache_logger = get_base_logger().getChild('cache')


def _cmp_mtimes(mtime1, mtime2):
	# type: (Optional[float], Optional[float]) -> int
	mtime1 = float('{:.3f}'.format(mtime1)) if mtime1 is not None else 0.0
	mtime2 = float('{:.3f}'.format(mtime2)) if mtime2 is not None else 0.0
	return 0 if mtime1 == mtime2 else (-1 if mtime1 < mtime2 else 1)


class _AppCache(object):
	def get_every_single_app(self):
		# type: () -> Iterable[App]
		raise NotImplementedError()

	def get_all_apps_with_id(self, app_id):
		# type: (str) -> List[App]
		ret = []
		for app in self.get_every_single_app():
			if app.id == app_id:
				ret.append(app)
		return ret

	def get_all_locally_installed_apps(self):
		# type: () -> List[App]
		ret = []
		for app in self.get_every_single_app():
			if app.is_installed():
				ret.append(app)
		return ret

	def find(self, app_id, app_version=None, latest=False):
		# type: (str, Optional[str], bool) -> Optional[App]
		apps = self.get_all_apps_with_id(app_id)
		if app_version:
			for app in apps:
				if app.version == app_version:
					return app
			return None
		elif not latest:
			for app in apps:
				if app.is_installed():
					return app
		if apps:
			latest_app = sorted(apps)[-1]
			for app in apps:
				if app == latest_app:
					return app

	def find_candidate(self, app, prevent_docker=None):
		if prevent_docker is None:
			prevent_docker = ucr_is_true('appcenter/prudence/docker/%s' % app.id)
		if app.docker:
			prevent_docker = False
		app_version = LooseVersion(app.version)
		apps = list(reversed(self.get_all_apps_with_id(app.id)))
		not_permitted_app = None
		for _app in apps:
			if prevent_docker and _app.docker and not (_app.docker_migration_works or _app.docker_migration_link):
				continue
			if _app <= app:
				continue
			if _app.required_app_version_upgrade:
				if LooseVersion(_app.required_app_version_upgrade) > app_version:
					continue
			if not _app.install_permissions_exist():
				# do not consider app without permission...
				# ... until it is the only one (and then fail eventually...)
				if not_permitted_app is None:
					not_permitted_app = _app
				continue
			return _app
		if not_permitted_app:
			return not_permitted_app

	def get_all_apps(self):
		# type: () -> List[App]
		apps = {}  # type: Dict[str, Tuple[App, bool]]
		for app in self.get_every_single_app():
			if app.id in apps:
				old_app, old_is_installed = apps[app.id]
				if not old_is_installed:
					if old_app < app:
						apps[app.id] = (app, app.is_installed())
					elif app.is_installed():
						apps[app.id] = (app, True)
			else:
				apps[app.id] = (app, app.is_installed())
		return sorted(app for (app, is_installed) in apps.values())

	def find_by_component_id(self, component_id):
		# type: (str) -> Optional[App]
		for app in self.get_every_single_app():
			if app.component_id == component_id:
				return app


[docs]class AppCache(_AppCache): _app_cache_cache = {} def __init__(self, app_class=None, ucs_version=None, server=None, locale=None, cache_dir=None): self._app_class = app_class self._ucs_version = ucs_version if server and not server.startswith('http'): server = 'https://%s' % server self._server = server self._locale = locale self._cache_dir = cache_dir self._cache_file = None self._cache = [] self._cache_modified_mtime = None self._lock = False
[docs] def copy(self, app_class=None, ucs_version=None, server=None, locale=None, cache_dir=None): if app_class is None: app_class = self._app_class if ucs_version is None: ucs_version = self._ucs_version if server is None: server = self._server if locale is None: locale = self._locale if cache_dir is None: cache_dir = self._cache_dir return self.build(app_class=app_class, ucs_version=ucs_version, server=server, locale=locale, cache_dir=cache_dir)
[docs] def get_server(self): if self._server is None: self._server = default_server() return self._server
[docs] def get_server_netloc(self): return urlsplit(self.get_server()).netloc
[docs] def get_ucs_version(self): if self._ucs_version is None: self._ucs_version = default_ucs_version() return self._ucs_version
[docs] def get_locale(self): if self._locale is None: self._locale = default_locale() return self._locale
[docs] def get_cache_dir(self): if self._cache_dir is None: server = self.get_server_netloc() self._cache_dir = os.path.join(CACHE_DIR, server, self.get_ucs_version()) mkdir(self._cache_dir) return self._cache_dir
[docs] def get_cache_file(self): if self._cache_file is None: cache_dir = self.get_cache_dir() locale = self.get_locale() self._cache_file = os.path.join(cache_dir, '.apps.%s.json' % locale) return self._cache_file
[docs] @classmethod def build(cls, app_class=None, ucs_version=None, server=None, locale=None, cache_dir=None): obj = cls(app_class, ucs_version, server, locale, cache_dir) key = cls, obj.get_app_class(), obj.get_ucs_version(), obj.get_server(), obj.get_locale(), obj.get_cache_file() if key not in cls._app_cache_cache: cls._app_cache_cache[key] = obj return cls._app_cache_cache[key]
[docs] def get_appcenter_cache_obj(self): return AppCenterCache.build(server=self.get_server(), ucs_versions=[self.get_ucs_version()], locale=self.get_locale())
def _save_cache(self): cache_file = self.get_cache_file() if cache_file: try: tmp_file = cache_file + ".tmp" with open(tmp_file, 'w') as fd: dump([app.attrs_dict() for app in self._cache], fd, indent=2) os.rename(tmp_file, cache_file) cache_modified = self._cache_modified() except (EnvironmentError, TypeError): return False else: self._cache_modified_mtime = cache_modified return True def _load_cache(self): cache_file = self.get_cache_file() try: cache_modified = self._cache_modified() archive_modified = self._archive_modified() if _cmp_mtimes(cache_modified, archive_modified) == -1: cache_logger.debug('Cannot load cache: mtimes of cache files do not match: %r < %r' % (cache_modified, archive_modified)) return None for master_file in self._relevant_master_files(): master_file_modified = os.stat(master_file).st_mtime if _cmp_mtimes(cache_modified, master_file_modified) == -1: cache_logger.debug('Cannot load cache: %s is newer than cache' % master_file) return None with open(cache_file, 'r') as fd: cache = load(fd) self._cache_modified_mtime = cache_modified except (EnvironmentError, ValueError, TypeError): cache_logger.debug('Cannot load cache: getting mtimes failed') return None else: try: cache_attributes = set(cache[0].keys()) except (TypeError, AttributeError, IndexError, KeyError): cache_logger.debug('Cannot load cache: Getting cached attributes failed') return None else: code_attributes = set(attr.name for attr in self.get_app_class()._attrs) if cache_attributes != code_attributes: cache_logger.debug('Cannot load cache: Attributes in cache file differ from attribute in code') return None return [self._build_app_from_attrs(attrs) for attrs in cache] def _archive_modified(self): try: return os.stat(os.path.join(self.get_cache_dir(), '.all.tar')).st_mtime except (EnvironmentError, AttributeError) as exc: cache_logger.debug('Unable to get mtime for archive: %s' % exc) return None def _cache_modified(self): try: return os.stat(self.get_cache_file()).st_mtime except (EnvironmentError, AttributeError) as exc: cache_logger.debug('Unable to get mtime for cache: %s' % exc) return None def _relevant_master_files(self): ret = set() classes_visited = set() def add_class(klass): if klass in classes_visited: return classes_visited.add(klass) try: module = sys.modules[klass.__module__] ret.add(module.__file__) except (AttributeError, KeyError): pass if hasattr(klass, '__bases__'): for base in klass.__bases__: add_class(base) # metaclass add_class(type(klass)) add_class(self.get_app_class()) return ret def _relevant_ini_files(self): return glob(os.path.join(self.get_cache_dir(), '*.ini')) def _build_app_from_attrs(self, attrs): app = self.get_app_class()(attrs, self) return app def _build_app_from_ini(self, ini): app = self.get_app_class().from_ini(ini, locale=self.get_locale(), cache=self) if app: for attr in app._attrs: attr.post_creation(app) return app
[docs] def clear_cache(self): ucr_load() self._cache[:] = [] self._cache_modified_mtime = None self._invalidate_cache_files()
def _invalidate_cache_files(self): cache_dir = self.get_cache_dir() for cache_file in glob(os.path.join(cache_dir, '.*apps*.json')): try: os.unlink(cache_file) except EnvironmentError: pass @contextmanager def _locked(self): timeout = 60 wait = 0.1 while self._lock: if timeout < 0: raise RuntimeError('Could not get lock in %s seconds' % timeout) sleep(wait) timeout -= wait self._lock = True try: yield finally: self._lock = False
[docs] def get_every_single_app(self): with self._locked(): cache_file = self.get_cache_file() if cache_file: archive_modified = self._archive_modified() if _cmp_mtimes(archive_modified, self._cache_modified_mtime) == 1: cache_logger.debug('Cache outdated. Need to rebuild') self._cache[:] = [] if not self._cache: cached_apps = self._load_cache() if cached_apps is not None: self._cache = cached_apps cache_logger.debug('Loaded %d apps from cache' % len(self._cache)) else: for ini in self._relevant_ini_files(): app = self._build_app_from_ini(ini) if app is not None: self._cache.append(app) self._cache.sort() if self._save_cache(): cache_logger.debug('Saved %d apps into cache' % len(self._cache)) else: cache_logger.warn('Unable to cache apps') return self._cache
[docs] def get_app_class(self): if self._app_class is None: self._app_class = App return self._app_class
def __repr__(self): return 'AppCache(app_class=%r, ucs_version=%r, server=%r, locale=%r, cache_dir=%r)' % (self.get_app_class(), self.get_ucs_version(), self.get_server(), self.get_locale(), self.get_cache_dir())
[docs]class AppCenterCache(_AppCache): _appcenter_cache_cache = {} def __init__(self, cache_class=None, server=None, ucs_versions=None, locale=None, cache_dir=None): self._cache_class = cache_class self._server = server self._ucs_versions = ucs_versions self._locale = locale self._cache_dir = cache_dir self._license_type_cache = None self._ratings_cache = None self._app_categories_cache = None
[docs] @classmethod def build(cls, cache_class=None, server=None, ucs_versions=None, locale=None, cache_dir=None): obj = cls(cache_class, server, ucs_versions, locale, cache_dir) key = cls, obj.get_app_cache_class(), obj.get_server(), tuple(obj.get_ucs_versions()), obj.get_locale(), obj.get_cache_dir() if key not in cls._appcenter_cache_cache: cls._appcenter_cache_cache[key] = obj return cls._appcenter_cache_cache[key]
def _get_current_ucs_version(self): # type: () -> str try: still_running = False next_version = None status_file = '/var/lib/univention-updater/univention-updater.status' if os.path.exists(status_file): with open(status_file, 'r') as status: for line in status: line = line.strip() key, value = line.split('=', 1) if key == 'status': still_running = value == 'RUNNING' elif key == 'next_version': next_version = value.split('-')[0] if still_running and next_version: cache_logger.debug('Using UCS %s. Apparently an updater is running' % next_version) return next_version except (EnvironmentError, ValueError) as exc: cache_logger.warn('Could not parse univention-updater.status: %s' % exc) return ucr_get('version/version')
[docs] def get_app_cache_class(self): if self._cache_class is None: self._cache_class = AppCache return self._cache_class
[docs] def get_server(self): # type: () -> str if self._server is None: self._server = default_server() return self._server
[docs] def get_server_netloc(self): # type: () -> str return urlsplit(self.get_server()).netloc
[docs] def get_ucs_versions(self): # type: () -> List[str] if self._ucs_versions is None: cache_file = self.get_cache_file('.ucs.ini') ucs_version = self._get_current_ucs_version() versions = AppCenterVersion.all_from_file(cache_file) for version in versions: if version.name == ucs_version: self._ucs_versions = version.supported_ucs_versions break else: self._ucs_versions = [ucs_version] return self._ucs_versions
[docs] def get_locale(self): # type: () -> str if self._locale is None: self._locale = default_locale() return self._locale
[docs] def get_cache_dir(self): # type: () -> str if self._cache_dir is None: server = self.get_server_netloc() self._cache_dir = os.path.join(CACHE_DIR, server) mkdir(self._cache_dir) return self._cache_dir
[docs] def get_cache_file(self, fname): # type: (str) -> str return os.path.join(self.get_cache_dir(), fname)
[docs] def get_app_caches(self): ret = [] for ucs_version in self.get_ucs_versions(): ret.append(self._build_app_cache(ucs_version)) return ret
def _build_app_cache(self, ucs_version): cache_dir = self.get_cache_file(ucs_version) return self.get_app_cache_class().build(ucs_version=ucs_version, server=self.get_server(), locale=self.get_locale(), cache_dir=cache_dir)
[docs] def get_license_description(self, license_name): # type: (str) -> Optional[str] if self._license_type_cache is None: cache_file = self.get_cache_file('.license_types.ini') self._license_type_cache = LicenseType.all_from_file(cache_file) for license in self._license_type_cache: if license.name == license_name: return license.description
[docs] def get_ratings(self): if self._ratings_cache is None: cache_file = self.get_cache_file('.rating.ini') self._ratings_cache = Rating.all_from_file(cache_file) return self._ratings_cache
[docs] def get_app_categories(self): if self._app_categories_cache is None: cache_file = self.get_cache_file('.app-categories.ini') parser = read_ini_file(cache_file) locale = self.get_locale() if not parser.has_section(locale): locale = 'en' try: categories = dict(parser.items(locale)) except NoSectionError: categories = {} self._app_categories_cache = categories return self._app_categories_cache
[docs] def get_every_single_app(self): # type: () -> List[App] ret = [] for app_cache in self.get_app_caches(): ret.extend(app_cache.get_every_single_app()) return ret
[docs] def clear_cache(self): ucr_load() self._license_type_cache = None self._ratings_cache = None self._app_categories_cache = None for app_cache in self.get_app_caches(): app_cache.clear_cache()
def __repr__(self): return 'AppCenterCache(app_cache_class=%r, server=%r, ucs_versions=%r, locale=%r, cache_dir=%r)' % (self.get_app_cache_class(), self.get_server(), self.get_ucs_versions(), self.get_locale(), self.get_cache_dir())
[docs]class Apps(_AppCache): def __init__(self, cache_class=None, locale=None): self._cache_class = cache_class self._locale = locale
[docs] def get_appcenter_cache_class(self): if self._cache_class is None: self._cache_class = AppCenterCache return self._cache_class
[docs] def get_locale(self): # type: () -> str if self._locale is None: self._locale = default_locale() return self._locale
[docs] def get_appcenter_caches(self): # type: () -> List[AppCenterCache] server = default_server() cache = self._build_appcenter_cache(server, None) return [cache]
def _build_appcenter_cache(self, server, ucs_versions): return self.get_appcenter_cache_class().build(server=server, ucs_versions=ucs_versions, locale=self.get_locale())
[docs] def get_every_single_app(self): ret = [] for app_cache in self.get_appcenter_caches(): for app in app_cache.get_every_single_app(): if self.include_app(app): ret.append(app) return ret
[docs] def include_app(self, app): return app.supports_ucs_version()
[docs] def clear_cache(self): # type: () -> None for app_cache in self.get_appcenter_caches(): app_cache.clear_cache()
[docs] @classmethod def find_by_string(cls, app_string): app_id, app_version, ucs_version, server = cls.split_app_string(app_string) server = server or default_server() ucs_versions = [ucs_version] if ucs_version else None cache = AppCenterCache.build(server=server, ucs_versions=ucs_versions) return cache.find(app_id, app_version=app_version)
[docs] @classmethod def split_app_string(cls, app_string): try: app_id, app_version = app_string.split('=', 1) except ValueError: app_id, app_version = app_string, None try: ucs_version, app_id = app_id.split('/', 1) except ValueError: ucs_version, app_id = None, app_id if ucs_version: try: ucs_version, server = ucs_version.split('@', 1) except ValueError: ucs_version, server = ucs_version, None else: server = None ucs_version = ucs_version or None return app_id, app_version, ucs_version, server
[docs]class AllApps(Apps):
[docs] def include_app(self, app): return True
[docs]class AppCenterVersion(IniSectionObject): supported_ucs_versions = IniSectionListAttribute(required=True)
[docs]class LicenseType(IniSectionObject): description = IniSectionAttribute(localisable=True)
[docs]class Rating(IniSectionObject): label = IniSectionAttribute(localisable=True) description = IniSectionAttribute(localisable=True)
[docs]def default_locale(): # type: () -> str return get_locale() or 'en'
[docs]def default_server(): # type: () -> str server = ucr_get('repository/app_center/server', 'https://appcenter.software-univention.de') if not server.startswith('http'): server = 'https://%s' % server return server
[docs]def default_ucs_version(): # type: () -> str cache = AppCenterCache.build(server=default_server()) return cache.get_ucs_versions()[0]