#!/usr/bin/python3
# -*- coding: utf-8 -*-
#
# Univention App Center
# univention-app module for uninstalling an app
#
# Copyright 2015-2022 Univention GmbH
#
# https://www.univention.de/
#
# All rights reserved.
#
# The source code of this program is made available
# under the terms of the GNU Affero General Public License version 3
# (GNU AGPL V3) as published by the Free Software Foundation.
#
# Binary versions of this program provided by Univention to you as
# well as other copyrighted, protected or trademarked materials like
# Logos, graphics, fonts, specific documentations and configurations,
# cryptographic keys etc. are subject to a license agreement between
# you and Univention and not subject to the GNU AGPL V3.
#
# In the case you use this program under the terms of the GNU AGPL V3,
# the program is provided in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public
# License with the Debian GNU/Linux or Univention distribution in file
# /usr/share/common-licenses/AGPL-3; if not, see
# <https://www.gnu.org/licenses/>.
#
import re
import os
import platform
from six import with_metaclass
from univention.appcenter.app import LooseVersion
from univention.appcenter.meta import UniventionMetaClass
from univention.appcenter.ucr import ucr_get, ucr_load, ucr_is_true
from univention.appcenter.app_cache import Apps
from univention.appcenter.utils import get_current_ram_available, get_free_disk_space, underscore, container_mode, app_ports, _
from univention.appcenter.actions import get_action
from univention.appcenter.packages import packages_are_installed, get_package_manager
_REQUIREMENTS = {}
[docs]class Requirement(with_metaclass(RequirementMetaClass)):
def __init__(self, apps, action):
self.apps = apps
self.action = action
[docs] def test(self):
if self.action == 'install':
return self._test_install()
elif self.action == 'upgrade':
return self._test_upgrade()
elif self.action == 'remove':
return self._test_remove()
return {}
def _test_install(self):
return {}
def _test_upgrade(self):
return {}
def _test_remove(self):
return {}
[docs] @classmethod
def get_name(cls):
return underscore(cls.__name__)
[docs] def other_apps(self, app):
return [_app for _app in self.apps if app != _app]
[docs]class SingleRequirement(Requirement):
def _test_install(self):
ret = {}
for app in self.apps:
result = self.test_install(app)
if result is not None and result is not True:
ret[app.id] = result
return ret
[docs] def test_install(self, app):
pass
def _test_upgrade(self):
ret = {}
for app in self.apps:
result = self.test_upgrade(app)
if result is not None and result is not True:
ret[app.id] = result
return ret
[docs] def test_upgrade(self, app):
pass
def _test_remove(self):
ret = {}
for app in self.apps:
result = self.test_remove(app)
if result is not None and result is not True:
ret[app.id] = result
return ret
[docs] def test_remove(self, app):
pass
[docs]class MultiRequirement(Requirement):
def _test_install(self):
ret = {}
result = self.test_install(self.apps)
if result is not None and result is not True:
ret['__all__'] = result
return ret
[docs] def test_install(self, apps):
pass
def _test_upgrade(self):
ret = {}
result = self.test_upgrade(self.apps)
if result is not None and result is not True:
ret['__all__'] = result
return ret
[docs] def test_upgrade(self, apps):
pass
def _test_remove(self):
ret = {}
result = self.test_remove(self.apps)
if result is not None and result is not True:
ret['__all__'] = result
return ret
[docs] def test_remove(self, apps):
pass
[docs]class HardRequirement(object):
[docs] def is_error(self):
return True
[docs]class SoftRequirement(object):
[docs] def is_error(self):
return False
[docs]class MustHaveCorrectServerRole(SingleRequirement, HardRequirement):
'''The application cannot be installed on the current server
role (%(current_role)s). In order to install the application,
one of the following roles is necessary: %(allowed_roles)r'''
[docs] def test_install(self, app):
server_role = ucr_get('server/role')
if not app._allowed_on_local_server():
return {
'current_role': server_role,
'allowed_roles': ', '.join(app.server_role),
}
test_upgrade = test_install
[docs]class MustHaveFittingAppVersion(SingleRequirement, HardRequirement):
'''To upgrade, at least version %(required_version)s needs to
be installed.'''
[docs] def test_upgrade(self, app):
if app.required_app_version_upgrade:
required_version = LooseVersion(app.required_app_version_upgrade)
installed_app = Apps().find(app.id)
installed_version = LooseVersion(installed_app.version)
if required_version > installed_version:
return {'required_version': app.required_app_version_upgrade}
[docs]class MustHaveFittingKernelVersion(MultiRequirement, HardRequirement):
'''The Kernel version has to be upgraded and the system rebootet.'''
[docs] def test_install(self, apps):
if any(app.docker for app in apps):
kernel = LooseVersion(os.uname()[2])
if kernel < LooseVersion('4.9'):
return False
test_upgrade = test_install
[docs]class MustHaveCandidate(SingleRequirement, HardRequirement):
'''The application is either not installed or no newer version is available'''
[docs] def test_upgrade(self, app):
_app = Apps().find(app.id)
if not _app.is_installed() or _app >= app:
return False
[docs]class MustHaveFittingUcsVersion(SingleRequirement, HardRequirement):
'''The application requires UCS version %(required_version)s.'''
[docs] def test_install(self, app):
required_ucs_version = None
for supported_version in app.supported_ucs_versions:
if supported_version.startswith('%s-' % ucr_get('version/version')):
required_ucs_version = supported_version
break
else:
if app.get_ucs_version() == ucr_get('version/version'):
if app.required_ucs_version:
required_ucs_version = app.required_ucs_version
else:
return True
if required_ucs_version is None:
return {'required_version': app.get_ucs_version()}
major, minor = ucr_get('version/version').split('.', 1)
patchlevel = ucr_get('version/patchlevel')
errata = ucr_get('version/erratalevel')
version_bits = re.match(r'^(\d+)\.(\d+)-(\d+)(?: errata(\d+))?$', required_ucs_version).groups()
comparisons = zip(version_bits, [major, minor, patchlevel, errata])
for required, present in comparisons:
if int(required or 0) > int(present):
return {'required_version': required_ucs_version}
if int(required or 0) < int(present):
return True
test_upgrade = test_install
[docs]class MustHaveInstallPermissions(SingleRequirement, HardRequirement):
'''You need to buy the App to install this version.'''
[docs] def test_install(self, app):
if not app.install_permissions_exist():
return {'shop_url': app.shop_url, 'version': app.version}
test_upgrade = test_install
[docs]class MustHaveNoConflictsApps(SingleRequirement, HardRequirement):
'''The application conflicts with the following applications:
%r'''
[docs] def test_install(self, app):
conflictedapps = set()
apps_cache = Apps()
# check ConflictedApps
for _app in apps_cache.get_all_apps():
if not _app._allowed_on_local_server():
# cannot be installed, continue
continue
if _app.id in app.conflicted_apps or app.id in _app.conflicted_apps:
if _app.is_installed():
conflictedapps.add(_app.id)
elif _app in self.other_apps(app):
conflictedapps.add(_app.id)
# check port conflicts
ports = []
for i in app.ports_exclusive:
ports.append(i)
for i in app.ports_redirection:
ports.append(i.split(':', 1)[0])
for app_id, container_port, host_port in app_ports():
if app_id != app.id and str(host_port) in ports:
conflictedapps.add(app_id)
for _app in self.other_apps(app):
other_ports = set()
for i in _app.ports_exclusive:
other_ports.add(i)
for i in _app.ports_redirection:
other_ports.add(i.split(':', 1)[0])
if other_ports.intersection(ports):
conflictedapps.add(_app.id)
if conflictedapps:
conflictedapps = [apps_cache.find(app_id) for app_id in conflictedapps]
return [{'id': _app.id, 'name': _app.name} for _app in conflictedapps if _app]
test_upgrade = test_install
[docs]class MustHaveNoConflictsPackages(SingleRequirement, HardRequirement):
'''The application conflicts with the following packages: %r'''
[docs] def test_install(self, app):
conflict_packages = []
for pkgname in app.conflicted_system_packages:
if packages_are_installed([pkgname], strict=True):
conflict_packages.append(pkgname)
if conflict_packages:
return conflict_packages
test_upgrade = test_install
[docs]class MustHaveNoUnmetDependencies(SingleRequirement, HardRequirement):
'''The application requires the following applications: %r'''
[docs] def test_install(self, app):
unmet_apps = []
apps_cache = Apps()
# RequiredApps
for _app in apps_cache.get_all_apps():
if _app.id in app.required_apps:
if not _app.is_installed():
unmet_apps.append({'id': _app.id, 'name': _app.name, 'in_domain': False})
# RequiredAppsInDomain
domain = get_action('domain')
apps = [apps_cache.find(app_id) for app_id in app.required_apps_in_domain]
apps_info = domain.to_dict(apps)
for _app in apps_info:
if not _app:
continue
if not _app['is_installed_anywhere']:
local_allowed = _app['id'] not in app.conflicted_apps
unmet_apps.append({'id': _app['id'], 'name': _app['name'], 'in_domain': True, 'local_allowed': local_allowed})
unmet_apps = [unmet_app for unmet_app in unmet_apps if unmet_app['id'] not in (_app.id for _app in self.other_apps(app))]
if unmet_apps:
return unmet_apps
test_upgrade = test_install
[docs]class MustHaveSupportedArchitecture(SingleRequirement, HardRequirement):
'''This application only supports %(supported)s as
architecture. %(msg)s'''
[docs] def test_install(self, app):
supported_architectures = app.supported_architectures
platform_bits = platform.architecture()[0]
aliases = {'i386': '32bit', 'amd64': '64bit'}
if supported_architectures:
for architecture in supported_architectures:
if aliases[architecture] == platform_bits:
break
else:
# For now only two architectures are supported:
# 32bit and 64bit - and this will probably not change
# too soon.
# So instead of returning lists and whatnot
# just return a nice message
# Needs to be adapted when supporting different archs
supported = supported_architectures[0]
if supported == 'i386':
needs = 32
has = 64
else:
needs = 64
has = 32
msg = _('The application needs a %(needs)s-bit operating system. This server is running a %(has)s-bit operating system.') % {'needs': needs, 'has': has}
return {'supported': supported, 'msg': msg}
test_upgrade = test_install
[docs]class MustHaveValidLicense(MultiRequirement, HardRequirement):
'''For the installation, a UCS license key
with a key identification (Key ID) is required'''
[docs] def test_install(self, apps):
if any(app.notify_vendor for app in apps):
license = ucr_get('uuid/license')
if license is None:
ucr_load()
license = ucr_get('uuid/license')
return license is not None
test_upgrade = test_install
[docs]class MustNotBeDependedOn(SingleRequirement, HardRequirement):
'''The application is required for the following applications
to work: %r'''
[docs] def test_remove(self, app):
depending_apps = []
apps_cache = Apps()
# RequiredApps
for _app in apps_cache.get_all_apps():
if app.id in _app.required_apps and _app.is_installed():
depending_apps.append({'id': _app.id, 'name': _app.name})
# RequiredAppsInDomain
apps = [_app for _app in apps_cache.get_all_apps() if app.id in _app.required_apps_in_domain]
if apps:
domain = get_action('domain')
self_info = domain.to_dict([app])[0]
hostname = ucr_get('hostname')
if not any(inst['version'] for host, inst in self_info['installations'].items() if host != hostname):
# this is the only installation
apps_info = domain.to_dict(apps)
for _app in apps_info:
if _app['is_installed_anywhere']:
depending_apps.append({'id': _app['id'], 'name': _app['name']})
depending_apps = [depending_app for depending_app in depending_apps if depending_app['id'] not in (_app.id for _app in self.other_apps(app))]
if depending_apps:
return depending_apps
[docs]class MustNotBeDockerIfDockerIsDisabled(SingleRequirement, HardRequirement):
'''The application uses a container technology while the App Center
is configured to not not support it'''
[docs] def test_install(self, app):
return not app.docker or ucr_is_true('appcenter/docker', True)
test_upgrade = test_install
[docs]class MustNotBeDockerInDocker(SingleRequirement, HardRequirement):
'''The application uses a container technology while the system
itself runs in a container. Using the application is not
supported on this host'''
[docs] def test_install(self, app):
return not app.docker or not container_mode()
test_upgrade = test_install
[docs]class MustNotBeEndOfLife(SingleRequirement, HardRequirement):
'''This application was discontinued and may not be installed
anymore'''
[docs] def test_install(self, app):
return not app.end_of_life
[docs]class MustNotBeInstalled(SingleRequirement, HardRequirement):
'''This application is already installed'''
[docs] def test_install(self, app):
return not app.is_installed()
[docs]class MustNotBeVoteForApp(SingleRequirement, HardRequirement):
'''The application is not yet installable. Vote for this app
now and bring your favorite faster to the Univention App
Center'''
[docs] def test_install(self, app):
return not app.vote_for_app
test_upgrade = test_install
[docs]class MustNotHaveConcurrentOperation(SingleRequirement, HardRequirement):
'''Another package operation is in progress'''
[docs] def test_install(self, app):
if app.docker:
return True
else:
return get_package_manager().progress_state._finished # TODO: package_manager.is_finished()
test_upgrade = test_install
test_remove = test_install
[docs]class ShallHaveEnoughFreeDiskSpace(MultiRequirement, SoftRequirement):
'''The system needs %(minimum)d MB of free disk space but only
%(current)d MB are available.'''
[docs] def test_install(self, apps):
required_free_disk_space = 0
for app in apps:
required_free_disk_space += (app.min_free_disk_space or 0)
if required_free_disk_space <= 0:
return True
current_free_disk_space = get_free_disk_space()
if current_free_disk_space and current_free_disk_space < required_free_disk_space:
return {'minimum': required_free_disk_space, 'current': current_free_disk_space}
[docs]class ShallHaveEnoughRam(MultiRequirement, SoftRequirement):
'''The system need at least %(minimum)d MB of free RAM but only
%(current)d MB are available.'''
[docs] def test_install(self, apps):
current_ram = get_current_ram_available()
required_ram = 0
for app in apps:
required_ram += app.min_physical_ram
if current_ram < required_ram:
return {'minimum': required_ram, 'current': current_ram}
[docs] def test_upgrade(self, apps):
current_ram = get_current_ram_available()
required_ram = 0
for app in apps:
required_ram += app.min_physical_ram
installed_app = Apps().find(app.id)
# is already installed, just a minor version upgrade
# RAM "used" by this installed app should count
# as free. best approach: subtract it
required_ram -= installed_app.min_physical_ram
if current_ram < required_ram:
return {'minimum': required_ram, 'current': current_ram}
[docs]class ShallNotBeDockerIfDiscouraged(SingleRequirement, HardRequirement):
'''The application has not been approved to migrate all
existing data. Maybe there is a migration guide:
%(migration_link)s'''
[docs] def test_install(self, app):
problem = app._docker_prudence_is_true() and not app.docker_migration_works
if problem:
return {'migration_link': app.docker_migration_link}
test_upgrade = test_install
[docs]class ShallOnlyBeInstalledInAdEnvWithPasswordService(SingleRequirement, SoftRequirement):
'''The application requires the password service to be set up
on the Active Directory domain controller server.'''
[docs] def test_install(self, app):
return not app._has_active_ad_member_issue('password')
test_upgrade = test_install
[docs]def check(apps, action):
errors = {}
warnings = {}
for name, klass in _REQUIREMENTS.items():
requirement = klass(apps, action)
result = requirement.test()
if result:
if requirement.is_error():
errors[name] = result
else:
warnings[name] = result
return errors, warnings
[docs]def get_requirement(name):
return _REQUIREMENTS[name]