Source code for univention.appcenter.actions.update

#!/usr/bin/python3
#
# Univention App Center
#  univention-app module for updating the list of available apps
#
# SPDX-FileCopyrightText: 2015-2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only

from __future__ import annotations

import os
import os.path
import zlib
from argparse import SUPPRESS, ArgumentParser, Namespace
from glob import glob
from gzip import open as gzip_open
from typing import TYPE_CHECKING
from urllib.error import HTTPError
from urllib.request import Request

from univention.appcenter.actions import UniventionAppAction, possible_network_error
from univention.appcenter.app import LOCAL_ARCHIVE_DIR
from univention.appcenter.app_cache import AppCache, AppCenterCache, Apps
from univention.appcenter.exceptions import NetworkError, UpdateSignatureVerificationFailed, UpdateUnpackArchiveFailed
from univention.appcenter.log import catch_stdout
from univention.appcenter.ucr import ucr_get_int, ucr_is_false, ucr_is_true, ucr_save
from univention.appcenter.utils import gpg_verify, mkdir, urlopen
from univention.config_registry import handler_commit


if TYPE_CHECKING:
    from collections.abc import Iterable, Iterator, Mapping


ETAGS_NAME = '.etags'
ZSYNC_BINARY_PATH = 'zsync'
TIMEOUT_EXIT_CODE = 124


[docs] class Update(UniventionAppAction): """Updates the list of all available applications by asking the App Center server""" help = 'Updates the list of apps'
[docs] def setup_parser(self, parser: ArgumentParser) -> None: parser.add_argument('--ucs-version', help=SUPPRESS) parser.add_argument('--appcenter-server', help=SUPPRESS) parser.add_argument('--cache-dir', help=SUPPRESS) parser.add_argument('--just-get-cache', action='store_true', default=False, help=SUPPRESS)
[docs] def main(self, args: Namespace) -> None: something_changed = False for app_cache in self._app_caches(args): # first of all, set up local cache mkdir(app_cache.get_cache_dir()) if self._extract_local_archive(app_cache): something_changed = True for appcenter_cache in self._appcenter_caches(args): # download meta files like index.json mkdir(appcenter_cache.get_cache_dir()) if self._download_supra_files(appcenter_cache): something_changed = True for app_cache in self._app_caches(args): # try it one more time (ucs.ini may have changed) mkdir(app_cache.get_cache_dir()) if self._extract_local_archive(app_cache): something_changed = True # download apps based on meta files if self._download_apps(app_cache): something_changed = True if something_changed and not args.just_get_cache: apps_cache = Apps() for app in apps_cache.get_all_locally_installed_apps(): newest_app = apps_cache.find_candidate(app) or app if app < newest_app: ucr_save({app.ucr_upgrade_key: 'yes'}) self._update_local_files()
def _appcenter_caches(self, args: Namespace) -> list[AppCenterCache]: if args.appcenter_server: return [AppCenterCache(server=args.appcenter_server)] ret = [] servers = set() for appcenter_cache in Apps().get_appcenter_caches(): server = appcenter_cache.get_server() if server not in servers: servers.add(server) ret.append(appcenter_cache) return ret def _app_caches(self, args: Namespace) -> Iterator[AppCenterCache]: for appcenter_cache in self._appcenter_caches(args): for app_cache in appcenter_cache.get_app_caches(): if args.ucs_version: yield app_cache.copy(ucs_version=args.ucs_version, cache_dir=args.cache_dir) break else: yield app_cache.copy(cache_dir=args.cache_dir) def _get_etags(self, etags_file: str) -> dict[str, str]: ret = {} try: with open(etags_file) as f: for line in f: try: fname, etag = line.split('\t') except ValueError: pass else: ret[fname] = etag.rstrip('\n') except OSError: pass return ret def _save_etags(self, cache: AppCenterCache, etags: Mapping[str, str]) -> None: etags_file = os.path.join(cache.get_cache_dir(), ETAGS_NAME) with open(etags_file, 'w') as f: f.writelines('%s\t%s\n' % (fname, etag) for fname, etag in etags.items()) def _download_supra_files(self, appcenter_cache: AppCenterCache) -> bool: return self._download_files(appcenter_cache, ['app-categories.ini', 'rating.ini', 'license_types.ini', 'ucs.ini', 'suggestions.json']) def _download_files(self, cache: AppCenterCache, filenames: Iterable[str]) -> bool: updated = False server = cache.get_server() cache_dir = cache.get_cache_dir() etags_file = os.path.join(cache_dir, ETAGS_NAME) present_etags = self._get_etags(etags_file) ucs_version = None if hasattr(cache, 'get_ucs_version'): ucs_version = cache.get_ucs_version() for filename in filenames: etag = present_etags.get(filename) new_etag = self._download_file(server, filename, cache_dir, etag, ucs_version) if new_etag: present_etags[filename] = new_etag updated = True self._save_etags(cache, present_etags) return updated def _verify_file(self, cache_dir: str) -> None: if ucr_is_false('appcenter/index/verify'): return fname = os.path.join(cache_dir, '.tmp.tar') sname = os.path.join(cache_dir, '.all.tar.gpg') # there have been Signature Verification failures we want to investigate # one theory is, that the signature file and the file to verify do not match, # e.g the signature has been updated but not the file # we put the timestamp diff in the exception to gain some information file_timestamp = os.path.getmtime(fname) signature_timestamp = os.path.getmtime(sname) time_diff = file_timestamp - signature_timestamp (rc, gpg_error) = gpg_verify(fname, sname) if not rc: return if gpg_error: self.fatal(gpg_error) os.unlink(fname) os.unlink(sname) raise UpdateSignatureVerificationFailed(fname, gpg_error, time_diff) def _download_apps_zsync(self, app_cache: AppCache): """ Download app metadata archive using zsync for efficient delta synchronization. This method attempts to download the app metadata using zsync, which only transfers the differences between the local and remote versions. If zsync fails or times out, it automatically falls back to direct HTTPS download. :param app_cache: The app cache instance containing server and version info :raises: NetworkError if both zsync and direct download fail """ appcenter_host = app_cache.get_server() if appcenter_host.startswith('https'): appcenter_host = 'http://%s' % appcenter_host[8:] cache_dir = app_cache.get_cache_dir() tmp_file = os.path.join(cache_dir, '.tmp.tar') all_tar_file = os.path.join(cache_dir, '.all.tar') all_tar_url = '%s/meta-inf/%s/all.tar.zsync' % (appcenter_host, app_cache.get_ucs_version()) self.log('Downloading "%s"...' % all_tar_url) zsync_args = [ZSYNC_BINARY_PATH, all_tar_url, '-q', '-o', tmp_file, '-i', all_tar_file] zsync_timeout = ucr_get_int('appcenter/update/zsync-timeout') if zsync_timeout: timeout_args = ['timeout', '--kill-after', '2', '-v', str(zsync_timeout)] timeout_args.extend(zsync_args) zsync_args = timeout_args result = self._subprocess(zsync_args, cwd=cache_dir) if result.returncode == TIMEOUT_EXIT_CODE: self.warn('Downloading the App archive via zsync timed out. Falling back to downloading it directly.') self._download_apps_directly(app_cache) elif result.returncode: # fallback: download all.tar.gz without zsync. some proxys have difficulties with it, including: # * Range requests are not supported # * HTTP requests are altered self.warn('Downloading the App archive via zsync failed. Falling back to download it directly.') self.warn('For better performance, try to make zsync work for "%s". The error may be caused by a proxy altering HTTP requests' % all_tar_url) self._download_apps_directly(app_cache) def _download_apps_directly(self, app_cache: AppCache): """ Download app metadata archive directly via HTTPS without using zsync. This method downloads the complete all.tar.gz file via HTTPS. It is used as a fallback when zsync fails or when explicitly enabled via UCR variable appcenter/update/skip-zsync. :param app_cache: The app cache instance containing server and version info :raises: NetworkError if the download fails """ cache_dir = app_cache.get_cache_dir() try: self._download_files(app_cache, ['all.tar.gz']) except NetworkError as exc: # if we cannot download all.tar.gz, the .etags file is not valid anymore # https://forge.univention.org/bugzilla/show_bug.cgi?id=58469 self.fatal('Failed to download all.tar.gz: %s' % exc) if os.path.exists(os.path.join(cache_dir, ETAGS_NAME)): self.warn('Removing the old etags file %s' % os.path.join(cache_dir, ETAGS_NAME)) # remove the old etags file, so that the next update will try to download it again os.unlink(os.path.join(cache_dir, ETAGS_NAME)) raise # files are always downloaded with their filename prepended by '.' tgz_file = os.path.join(cache_dir, '.all.tar.gz') self._uncompress_archive(app_cache, tgz_file) def _download_apps(self, app_cache: AppCache) -> bool: filenames = [] if ucr_is_false('appcenter/index/verify') else ['all.tar.gpg'] if filenames and not self._download_files(app_cache, filenames): return False if ucr_is_true('appcenter/update/skip-zsync'): self._download_apps_directly(app_cache) else: self._download_apps_zsync(app_cache) self._verify_file(app_cache.get_cache_dir()) self._extract_archive(app_cache) return True @possible_network_error def _download_file(self, base_url: str, filename: str, cache_dir: str, etag: str | None, ucs_version: str | None = None) -> str | None: url = os.path.join(base_url, 'meta-inf', ucs_version or '', filename) target = os.path.join(cache_dir, '.%s' % filename) if not os.path.exists(target): etag = None self.log('Downloading "%s"...' % url) headers = {} if etag: headers['If-None-Match'] = etag request = Request(url, headers=headers) # noqa: S310 try: response = urlopen(request) except HTTPError as exc: if exc.getcode() == 304: self.debug(' ... Not Modified') return None raise etag = response.headers.get('etag') content = response.read() with open(target, 'wb') as f: f.write(content) return etag def _update_local_files(self) -> None: self.debug('Updating app files...') # some variables could change UCR templates # e.g. Name, Description self._update_conffiles() def _get_conffiles(self) -> list[str]: return [] def _update_conffiles(self) -> None: conffiles = self._get_conffiles() if conffiles: with catch_stdout(self.logger): handler_commit(conffiles) def _get_local_archive(self, app_cache: AppCenterCache) -> str | None: fname = os.path.join(LOCAL_ARCHIVE_DIR, app_cache.get_server_netloc(), app_cache.get_ucs_version(), 'all.tar.gz') if os.path.exists(fname): return fname def _extract_local_archive(self, app_cache: AppCenterCache) -> bool: local_archive = self._get_local_archive(app_cache) if not local_archive: # Not my local_archive return False if any(not fname.startswith('.') for fname in os.listdir(app_cache.get_cache_dir())): # we already have a cache. our archive is just outdated... return False self.log('Filling the App Center file cache from our local archive %s!' % local_archive) if not self._uncompress_archive(app_cache, local_archive): return False return self._extract_archive(app_cache) def _uncompress_archive(self, app_cache: AppCenterCache, local_archive: str) -> bool: """`gunzip` in Python""" try: with gzip_open(local_archive, 'rb') as zipped_file: archive_content = zipped_file.read() with open(os.path.join(app_cache.get_cache_dir(), '.tmp.tar'), 'wb') as extracted_file: extracted_file.write(archive_content) except (OSError, zlib.error) as exc: self.warn('Error while reading %s: %s' % (local_archive, exc)) return False else: return True def _extract_archive(self, app_cache: AppCenterCache) -> None: """`tar xf` in 'Python'""" cache_dir = app_cache.get_cache_dir() self.debug('Extracting archive in %s' % cache_dir) self._purge_old_cache(cache_dir) tmp_file = os.path.join(cache_dir, '.tmp.tar') self.debug('Unpacking %s...' % tmp_file) if self._subprocess(['tar', '-C', cache_dir, '-x', '-f', tmp_file]).returncode: raise UpdateUnpackArchiveFailed(tmp_file) # make sure cache dir is available for everybody os.chmod(cache_dir, 0o755) # `touch tmp_file` to get a new cache in case it was created in between extraction os.utime(tmp_file, None) # Rename temporary to final file name all_tar_file = os.path.join(cache_dir, '.all.tar') os.rename(tmp_file, all_tar_file) def _purge_old_cache(self, cache_dir: str) -> None: self.debug('Removing old files...') for fname in glob(os.path.join(cache_dir, '*')): try: os.unlink(fname) except OSError as exc: self.warn('Cannot delete %s: %s' % (fname, exc))