Source code for ucsschool.http_api.import_api.tasks

#!/usr/bin/python3
# -*- coding: utf-8 -*-
#
# Univention UCS@school
#
# Copyright 2017-2025 Univention GmbH
#
# https://www.univention.de/
#
# All rights reserved.
#
# The source code of this program is made available
# under the terms of the GNU Affero General Public License version 3
# (GNU AGPL V3) as published by the Free Software Foundation.
#
# Binary versions of this program provided by Univention to you as
# well as other copyrighted, protected or trademarked materials like
# Logos, graphics, fonts, specific documentations and configurations,
# cryptographic keys etc. are subject to a license agreement between
# you and Univention and not subject to the GNU AGPL V3.
#
# In the case you use this program under the terms of the GNU AGPL V3,
# the program is provided in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public
# License with the Debian GNU/Linux or Univention distribution in file
# /usr/share/common-licenses/AGPL-3; if not, see
# <http://www.gnu.org/licenses/>.

"""Celery tasks"""

#
# To monitor the celery queue run in an ncurses terminal UI:
#    celery --app=ucsschool.http_api.app.celery:app control enable_events
#    celery --app=ucsschool.http_api.app.celery:app events
#

from __future__ import unicode_literals

import itertools
import logging
import pathlib
import shutil
import time
import traceback

from celery import shared_task
from celery.utils.log import get_task_logger
from django.conf import settings
from django.core.exceptions import ObjectDoesNotExist
from django.db.models import Q
from django_celery_results.models import TaskResult

from ucsschool.importer.exceptions import (
    InitialisationError,
    UcsSchoolImportError,
    UcsSchoolImportFatalError,
)
from univention.config_registry import ucr

from .constants import JOB_ABORTED, JOB_FINISHED, JOB_SCHEDULED, JOB_STARTED
from .http_api_import_frontend import HttpApiImportFrontend
from .models import Logfile, PasswordsFile, SummaryFile, UserImportJob

# Base directory under which every import job's working directory lives.
# Resolved once at import time so later containment checks in
# cleanup_import_jobs() compare fully resolved absolute paths.
IMPORT_JOB_BASEDIR = pathlib.Path(settings.UCSSCHOOL_IMPORT["import_jobs_basedir"]).resolve()

logger = get_task_logger(__name__)
# Use the public Logger API instead of assigning to the `level` attribute
# directly (setLevel() also invalidates logging's internal level cache).
logger.setLevel(logging.DEBUG)
logging.root.setLevel(
    logging.INFO
)  # someone sets this to DEBUG, and then we catch all of Djangos SQL queries!


def run_import_job(task, importjob_id):
    """
    Execute the :py:class:`UserImportJob` with primary key `importjob_id`.

    :param task: bound Celery task instance driving this job
    :param int importjob_id: primary key of the UserImportJob to run
    :return: tuple (success, summary): whether the import finished without a
        fatal error, and the textual import summary plus collected errors
    :rtype: tuple(bool, str)
    :raises django.core.exceptions.ObjectDoesNotExist: if no UserImportJob
        with that primary key exists
    :raises InitialisationError: if the job never reaches JOB_SCHEDULED state
    """
    try:
        importjob = UserImportJob.objects.get(pk=importjob_id)
    except ObjectDoesNotExist as exc:
        logger.exception(str(exc))
        raise
    # Possible race condition: we write JOB_STARTED into the DB before the
    # client (UserImportJobSerializer) writes JOB_SCHEDULED into the DB, so
    # wait (up to `schedule_timeout` seconds) for the client to catch up.
    schedule_timeout = 10  # seconds
    remaining = schedule_timeout
    while importjob.status != JOB_SCHEDULED:
        time.sleep(1)
        importjob.refresh_from_db()
        remaining -= 1
        if remaining <= 0:
            # Fixed: the message previously claimed "60s" although the loop
            # only ever waited `schedule_timeout` (10) seconds.
            raise InitialisationError(
                "{} did not reach JOB_SCHEDULED state in {}s.".format(importjob, schedule_timeout)
            )
    runner = HttpApiImportFrontend(importjob, task, logger)
    importjob.log_file = Logfile.objects.create(path=runner.logfile_path)
    importjob.password_file = PasswordsFile.objects.create(path=runner.password_file)
    importjob.summary_file = SummaryFile.objects.create(path=runner.summary_file)
    importjob.status = JOB_STARTED
    runner.update_job_state(description="Initializing: 0%.")
    try:
        # update_job_state() should have created the Celery TaskResult row.
        task_result = TaskResult.objects.get(task_id=importjob.task_id)
        importjob.result = task_result
    except ObjectDoesNotExist:
        logger.error(
            "Cannot find TaskMeta object after running update_job_state() for import job {!r}.".format(
                importjob
            )
        )
    importjob.save(update_fields=("log_file", "password_file", "result", "status", "summary_file"))
    logger.info("-- Preparing import job... --")
    success = False
    try:
        runner.prepare_import()
    except Exception as exc:
        # log.exception unless it's a InitialisationError with a 'log_traceback' attribute set to False
        if getattr(exc, "log_traceback", True):
            log = logger.exception
        else:
            log = logger.error
        log("An error occurred while preparing the import job: {}".format(exc))
    else:
        # from here on we can log with the import logger
        runner.logger.info("-- Starting import job... --")
        try:
            runner.do_import()
            success = True
        except UcsSchoolImportError as exc:
            runner.errors.append(exc)
        except Exception as exc:
            # Unknown failure: wrap it so it shows up in the error summary.
            runner.errors.append(
                UcsSchoolImportFatalError("An unknown error terminated the import job: {}".format(exc))
            )
            runner.logger.error(
                "An unknown error terminated the import job: {}\n{}".format(
                    exc, "".join(traceback.format_exc())
                )
            )
        runner.logger.info("-- Finished import. --")
    # Re-fetch the job to avoid clobbering fields changed elsewhere meanwhile.
    importjob = UserImportJob.objects.get(pk=importjob_id)
    importjob.status = JOB_FINISHED if success else JOB_ABORTED
    importjob.save(update_fields=("status",))
    return success, "{}\n{}".format(
        runner.user_import_summary_str, "\n".join(str(err) for err in runner.errors)
    )
def cleanup_import_jobs():
    """
    Delete old stopped import jobs if ucsschool/import/http_api/import_jobs_to_keep is set.

    Keeps the newest ``import_jobs_to_keep`` finished/aborted jobs — counted
    separately for dry-run and real import jobs — deleting the database rows
    of older jobs and their working directories below ``IMPORT_JOB_BASEDIR``.
    """
    # Fixed: ucr.get_int() returns None when the UCR variable is unset, which
    # previously made the `<= 0` comparison raise a TypeError instead of
    # treating "not set" as "keep everything".
    max_import_jobs = ucr.get_int("ucsschool/import/http_api/import_jobs_to_keep") or 0
    if max_import_jobs <= 0:
        return
    stopped = Q(status=JOB_ABORTED) | Q(status=JOB_FINISHED)
    all_stopped_non_dryrun_jobs = UserImportJob.objects.filter(stopped & Q(dryrun=False)).order_by(
        "-date_created"
    )
    all_stopped_dryrun_jobs = UserImportJob.objects.filter(stopped & Q(dryrun=True)).order_by(
        "-date_created"
    )
    if (
        all_stopped_non_dryrun_jobs.count() <= max_import_jobs
        and all_stopped_dryrun_jobs.count() <= max_import_jobs
    ):
        return
    logger.info("Cleaning up old import jobs.")
    # Querysets are ordered newest-first, so slicing keeps the newest jobs.
    jobs_to_delete = all_stopped_non_dryrun_jobs[max_import_jobs:]
    dry_run_jobs_to_delete = all_stopped_dryrun_jobs[max_import_jobs:]
    logger.info(
        f"Removing {jobs_to_delete.count()} import and {dry_run_jobs_to_delete.count()} dry run jobs."
    )
    for job in itertools.chain(jobs_to_delete, dry_run_jobs_to_delete):
        path = pathlib.Path(job.basedir).resolve()
        try:
            # Safety: only remove directories strictly below the configured
            # base directory — never the base dir itself or anything outside.
            if (
                path.exists()
                and path.is_dir()
                and path.is_relative_to(IMPORT_JOB_BASEDIR)
                and path != IMPORT_JOB_BASEDIR
            ):
                logger.info(f"Removing folder {path} of {job=!s}")
                shutil.rmtree(path)
        except Exception as exc:
            # Best effort: a failed directory removal must not keep the stale
            # database row alive.
            logger.error(f"Error during removal of folder {path} of {job=}:\n{exc}")
        job.delete()
    logger.info("Finished cleanup_import_jobs.")
@shared_task(bind=True)
def import_users(self, importjob_id):
    """Celery task: run the user import job, then prune old stopped jobs."""
    logger.info("Starting UserImportJob %d (%r).", importjob_id, self)
    success, summary_str = run_import_job(self, importjob_id)
    logger.info("Finished UserImportJob %d.", importjob_id)
    try:
        cleanup_import_jobs()
    except Exception as exc:
        # Cleanup failures must never mask the result of the import itself.
        logger.error("Error during cleanup_import_jobs(): %s", exc)
    outcome = "successfully" if success else "with error"
    description = "UserImportJob #{} ended {}.\n\n{}".format(importjob_id, outcome, summary_str)
    return HttpApiImportFrontend.make_job_state(description=description, percentage=100)


@shared_task(bind=True)
def dry_run(self, importjob_id):
    """Celery task: run the user import job in dry-run mode."""
    logger.info("Starting dry run %d (%r).", importjob_id, self)
    ok, summary = run_import_job(self, importjob_id)
    logger.info("Finished dry run %d.", importjob_id)
    outcome = "successfully" if ok else "with error"
    return HttpApiImportFrontend.make_job_state(
        description="UserImportJob #{} (dry run) ended {}.\n\n{}".format(importjob_id, outcome, summary),
        percentage=100,
    )