#!/usr/bin/python3
# -*- coding: utf-8 -*-
# Copyright 2009-2022 Univention GmbH
#
# https://www.univention.de/
#
# All rights reserved.
#
# The source code of this program is made available
# under the terms of the GNU Affero General Public License version 3
# (GNU AGPL V3) as published by the Free Software Foundation.
#
# Binary versions of this program provided by Univention to you as
# well as other copyrighted, protected or trademarked materials like
# Logos, graphics, fonts, specific documentations and configurations,
# cryptographic keys etc. are subject to a license agreement between
# you and Univention and not subject to the GNU AGPL V3.
#
# In the case you use this program under the terms of the GNU AGPL V3,
# the program is provided in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public
# License with the Debian GNU/Linux or Univention distribution in file
# /usr/share/common-licenses/AGPL-3; if not, see
# <https://www.gnu.org/licenses/>.
"""
Univention Updater helper functions for managing a local repository.
"""
from __future__ import absolute_import
from __future__ import print_function
import os
import shutil
import subprocess
import sys
import gzip
try:
    from typing import IO, List, Optional, Tuple  # noqa: F401
except ImportError:
    pass
from univention.config_registry import ConfigRegistry
from univention.lib.ucs import UCS_Version  # noqa: F401
configRegistry = ConfigRegistry()
configRegistry.load()
# constants
ARCHITECTURES = {'amd64', 'all'}
[docs]class TeeFile(object):
    """
    Writes the given string to several files at once. Could by used
    with the print statement
    """
    def __init__(self, fds=[]):
        # type: (List[IO[str]]) -> None
        """
        Register multiple file descriptors, to which the data is written.
        :param fds: A list of opened files.
        :type fds: list(File)
        """
        self._fds = fds or [sys.stdout]
[docs]    def write(self, data):
        # type: (str) -> None
        """
        Write string to all registered files.
        :param str data: The string to write.
        """
        for fd in self._fds:
            fd.write(data)
            fd.flush()  
[docs]def gzip_file(filename):
    # type: (str) -> int
    """
    Compress file.
    :param str filename: The file name of the file to compress.
    :returns: the process exit code.
    :rtype: int
    """
    return subprocess.call(('gzip', '--keep', '--force', '--no-name', '-9', filename)) 
[docs]def copy_package_files(source_dir, dest_dir):
    # type: (str, str) -> None
    """
    Copy all Debian binary package files and signed updater scripts from `source_dir` to `dest_dir`.
    :param str source_dir: Source directory.
    :param str dest_dir: Destination directory.
    """
    for filename in os.listdir(source_dir):
        src = os.path.join(source_dir, filename)
        if not os.path.isfile(src):
            continue
        if filename.endswith('.deb') or filename.endswith('.udeb'):
            try:
                arch = filename.rsplit('_', 1)[-1].split('.', 1)[0]  # partman-btrfs_10.3.201403242318_all.udeb
            except (TypeError, ValueError):
                print("Warning: Could not determine architecture of package '%s'" % filename, file=sys.stderr)
                continue
            src_size = os.stat(src)[6]
            dest = os.path.join(dest_dir, arch, filename)
            # package already exists with correct size
            if os.path.isfile(dest) and os.stat(dest)[6] == src_size:
                continue
        elif filename in ('preup.sh', 'preup.sh.gpg', 'postup.sh', 'postup.sh.gpg'):
            dest = os.path.join(dest_dir, 'all', filename)
        else:
            continue
        try:
            shutil.copy2(src, dest)
        except shutil.Error as ex:
            print("Copying '%s' failed: %s" % (src, ex), file=sys.stderr) 
[docs]def gen_indexes(base, version):  # type: (str, UCS_Version) -> None
    """
    Re-generate Debian :file:`Packages` files from file:`dists/` file.
    :param str base: Base directory, which contains the per architecture sub directories.
    """
    A = 'Architecture: '
    F = 'Filename: '
    print('  generating index ...', end=' ')
    for arch in ARCHITECTURES:
        if arch == 'all':
            continue
        src = os.path.join(
            base,
            'dists',
            'ucs%d%d%d' % version.mmp,
            'main',
            'binary-%s' % (arch,),
            'Packages.gz',
        )
        if not os.path.exists(src):
            continue
        lines = []
        names = [os.path.join(base, name, 'Packages') for name in ('all', arch)]
        with gzip.open(src, 'rb') as f_src, open(names[0], 'w') as f_all, open(names[1], 'w') as f_arch:
            for raw in f_src:
                line = raw.decode("UTF-8")
                if line.startswith(A):
                    arch = line[len(A):].strip()
                elif line.startswith(F):
                    line = '%s%s/%s' % (F, version, line[len(F):].lstrip('/'))
                lines.append(line)
                if line == '\n':
                    f = f_all if arch == 'all' else f_arch
                    f.write(''.join(lines))
                    del lines[:]
        for name in names:
            gzip_file(name)
    print('done') 
[docs]def get_repo_basedir(packages_dir):
    # type: (str) -> str
    """
    Check if a file path is a UCS package repository.
    :param str package_dir: A directory path.
    :returns: The canonicalized path without the architecture sub directory.
    :rtype: str
    """
    path = os.path.normpath(packages_dir)
    if os.path.isfile(os.path.join(path, 'Packages')):
        head, tail = os.path.split(path)
        if tail in ARCHITECTURES:
            return head
    elif set(os.listdir(path)) & ARCHITECTURES:
        return path
    print('Error: %s does not seem to be a repository.' % packages_dir, file=sys.stderr)
    sys.exit(1) 
[docs]def assert_local_repository(out=sys.stderr):
    # type: (IO[str]) -> None
    """
    Exit with error if the local repository is not enabled.
    :param file out: Override error output. Defaults to :py:obj:`sys.stderr`.
    """
    if not configRegistry.is_true('local/repository', False):
        print('Error: The local repository is not activated. Use "univention-repository-create" to create it or set the Univention Configuration Registry variable "local/repository" to "yes" to re-enable it.', file=out)
        sys.exit(1)