Source code for univention.appcenter.packages

#!/usr/bin/python3
# -*- coding: utf-8 -*-
#
# Univention App Center
#  Package functions
#
# Copyright 2016-2022 Univention GmbH
#
# https://www.univention.de/
#
# All rights reserved.
#
# The source code of this program is made available
# under the terms of the GNU Affero General Public License version 3
# (GNU AGPL V3) as published by the Free Software Foundation.
#
# Binary versions of this program provided by Univention to you as
# well as other copyrighted, protected or trademarked materials like
# Logos, graphics, fonts, specific documentations and configurations,
# cryptographic keys etc. are subject to a license agreement between
# you and Univention and not subject to the GNU AGPL V3.
#
# In the case you use this program under the terms of the GNU AGPL V3,
# the program is provided in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public
# License with the Debian GNU/Linux or Univention distribution in file
# /usr/share/common-licenses/AGPL-3; if not, see
# <https://www.gnu.org/licenses/>.
#

import os
import fcntl
import time
import re
from logging import Handler
from contextlib import contextmanager

from six import string_types

from univention.lib.package_manager import PackageManager, LockError  # LockError is actually imported from other files!

from univention.appcenter.log import get_base_logger, LogCatcher
from univention.appcenter.utils import call_process


package_logger = get_base_logger().getChild('packages')


LOCK_FILE = '/var/run/univention-appcenter.lock'


class _PackageManagerLogHandler(Handler):

	def emit(self, record):
		if record.name.startswith('packagemanager.dpkg'):
			if isinstance(record.msg, string_types):
				record.msg = record.msg.rstrip() + '\r'
			if record.name.startswith('packagemanager.dpkg.percentage'):
				record.levelname = 'DEBUG'
				record.levelno = 10


[docs]def get_package_manager(): if get_package_manager._package_manager is None: package_manager = PackageManager(lock=False) package_manager.set_finished() # currently not working. accepting new tasks package_manager.logger.parent = get_base_logger() log_filter = _PackageManagerLogHandler() package_manager.logger.addHandler(log_filter) get_package_manager._package_manager = package_manager return get_package_manager._package_manager
get_package_manager._package_manager = None
[docs]def reload_package_manager(): if get_package_manager._package_manager is not None: get_package_manager().reopen_cache()
[docs]def packages_are_installed(pkgs, strict=True): package_manager = get_package_manager() if strict: return all(package_manager.is_installed(pkg) for pkg in pkgs) else: # app.is_installed(package_manager, strict=True) uses # apt_pkg.CURSTATE. Not desired when called during # installation of umc-module-appcenter together with # several other (app relevant) packages; for example # in postinst or joinscript (on Primary Node). # see Bug #33535 and Bug #31261 for pkg_name in pkgs: try: pkg = package_manager.get_package(pkg_name, raise_key_error=True) except KeyError: return False else: if not pkg.is_installed: return False return True
[docs]@contextmanager def package_lock(): try: fd = open(LOCK_FILE, 'w') fcntl.lockf(fd, fcntl.LOCK_EX | fcntl.LOCK_NB) except EnvironmentError: raise LockError('Could not acquire lock!') else: package_logger.debug('Holding LOCK') try: yield finally: package_logger.debug('Releasing LOCK') try: os.unlink(LOCK_FILE) except EnvironmentError: pass fd.close()
[docs]def wait_for_dpkg_lock(timeout=120): lock_files = ['/var/lib/dpkg/lock', '/var/lib/apt/lists/lock'] lock_file_string = ' or '.join(lock_files) package_logger.debug('Trying to get a lock for %s...' % lock_file_string) first = True while first or timeout > 0: returncode = call_process(['fuser'] + lock_files).returncode if returncode == 0: if first: package_logger.info('Could not lock %s. Is another process using it? Waiting up to %s seconds' % (lock_file_string, timeout)) first = False # there seems to be a timing issue with the fuser approach # in which the second (the apt) process releases its lock before # re-grabbing it once again # we hope to minimize this error by having a relatively high sleep duration sleep_duration = 3 time.sleep(sleep_duration) timeout -= sleep_duration else: if not first: package_logger.info('Finally got the lock. Continuing...') return True package_logger.info('Unable to get a lock. Giving up...') return False
def _apt_args(dry_run=False): apt_args = ['-o', 'DPkg::Options::=--force-confold', '-o', 'DPkg::Options::=--force-overwrite', '-o', 'DPkg::Options::=--force-overwrite-dir', '--trivial-only=no', '--assume-yes', '--auto-remove'] return apt_args def _apt_get(action, pkgs): env = os.environ.copy() env['DEBIAN_FRONTEND'] = 'noninteractive' apt_args = _apt_args() ret = call_process(['/usr/bin/apt-get'] + apt_args + [action] + pkgs, logger=package_logger, env=env).returncode == 0 reload_package_manager() return ret def _apt_get_dry_run(action, pkgs): apt_args = _apt_args() logger = LogCatcher(package_logger) success = call_process(['/usr/bin/apt-get'] + apt_args + [action, '-s'] + pkgs, logger=logger).returncode == 0 install, remove, broken = [], [], [] install_regex = re.compile(r'^(Inst) ([^ ]*?) \((.*?) ') upgrade_remove_regex = re.compile(r'^(Remv|Inst) ([^ ]*?) \[(.*?)\]') for line in logger.stdout(): for regex in [install_regex, upgrade_remove_regex]: match = regex.match(line) if match: operation, pkg_name, version = match.groups() if operation == 'Inst': install.append(pkg_name) elif operation == 'Remv': remove.append(pkg_name) break if not success: for pkg in pkgs: if action == 'install' and pkg not in install: broken.append(pkg) if action == 'remove' and pkg not in remove: broken.append(pkg) return dict(zip(['install', 'remove', 'broken'], [install, remove, broken]))
[docs]def install_packages_dry_run(pkgs): return _apt_get_dry_run('install', pkgs)
[docs]def dist_upgrade_dry_run(): return _apt_get_dry_run('dist-upgrade', [])
[docs]def install_packages(pkgs): return _apt_get('install', pkgs)
[docs]def remove_packages_dry_run(pkgs): return _apt_get_dry_run('remove', pkgs)
[docs]def remove_packages(pkgs): return _apt_get('remove', pkgs)
[docs]def dist_upgrade(): return _apt_get('dist-upgrade', [])
[docs]def update_packages(): call_process(['/usr/bin/apt-get', 'update'], logger=package_logger) reload_package_manager()
[docs]def mark_packages_as_manually_installed(pkgs): call_process(['/usr/bin/apt-mark', 'manual'] + pkgs, logger=package_logger) reload_package_manager()