#!/usr/bin/python3
# -*- coding: utf-8 -*-
#
# Univention UCS@school
#
# Copyright 2017-2025 Univention GmbH
#
# https://www.univention.de/
#
# All rights reserved.
#
# The source code of this program is made available
# under the terms of the GNU Affero General Public License version 3
# (GNU AGPL V3) as published by the Free Software Foundation.
#
# Binary versions of this program provided by Univention to you as
# well as other copyrighted, protected or trademarked materials like
# Logos, graphics, fonts, specific documentations and configurations,
# cryptographic keys etc. are subject to a license agreement between
# you and Univention and not subject to the GNU AGPL V3.
#
# In the case you use this program under the terms of the GNU AGPL V3,
# the program is provided in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public
# License with the Debian GNU/Linux or Univention distribution in file
# /usr/share/common-licenses/AGPL-3; if not, see
# <http://www.gnu.org/licenses/>.
"""Database / Resource models"""
from __future__ import unicode_literals
import codecs
import logging
from django.conf import settings
from django.core.exceptions import ObjectDoesNotExist
from django.db import models
from django_celery_results.models import TaskResult
from ldap.filter import filter_format
import univention.admin.localization
from ucsschool.importer.utils.ldap_connection import get_unprivileged_connection
from .constants import JOB_CHOICES, JOB_NEW
# Identifiers of the user roles known to this module.
USER_STAFF = "staff"
USER_STUDENT = "student"
USER_TEACHER = "teacher"
USER_TEACHER_AND_STAFF = "teacher_and_staff"

# All role identifiers, in the order they are offered as choices.
USER_ROLES = (
    USER_STAFF,
    USER_STUDENT,
    USER_TEACHER,
    USER_TEACHER_AND_STAFF,
)

# List of ``(value, label)`` pairs: the value is the lower-cased role name with
# spaces replaced by underscores, the label is the role name itself.
USER_ROLES_CHOICES = [(role.lower().replace(" ", "_"), role) for role in USER_ROLES]
# Translation object for "ucs-school-import-http-api" (presumably the name of
# the translation domain -- confirm against univention.admin.localization).
translation = univention.admin.localization.translation("ucs-school-import-http-api")
# Short alias for the translate function, used to mark and translate strings.
_ = translation.translate
# Maps each role identifier to the result of translating its name. The values
# are computed once, when this module is imported.
USER_ROLE_TRANS = {
USER_STAFF: _("staff"),
USER_STUDENT: _("student"),
USER_TEACHER: _("teacher"),
USER_TEACHER_AND_STAFF: _("teacher_and_staff"),
}
[docs]
class Role(models.Model):
    """
    Database representation of a user role (one entry per item of ``USER_ROLES``).

    ``name`` is the primary key, ``displayName`` stores the translated label.
    """

    name = models.CharField(max_length=255, primary_key=True)
    displayName = models.CharField(max_length=255, blank=True)

    class Meta:
        ordering = ("name",)

    def __str__(self):
        return self.name

    __unicode__ = __str__

    @classmethod
    def update_from_ldap(cls):
        """
        Synchronize the stored Role objects with the static ``USER_ROLES``.

        No LDAP query is done at the moment, the values are static. This
        might change in the future.

        :return: None
        """
        known_names = []
        for role_name in USER_ROLES:
            # The label stored in USER_ROLE_TRANS is passed through `_` again.
            label = _(USER_ROLE_TRANS[role_name])
            role_obj, is_new = cls.objects.get_or_create(
                name=role_name,
                defaults={"displayName": label},
            )
            known_names.append(role_name)
            if is_new or role_obj.displayName == label:
                # freshly created or already up to date: nothing to write
                continue
            role_obj.displayName = label
            role_obj.save()
        # remove roles that are not (or no longer) part of USER_ROLES
        cls.objects.exclude(name__in=known_names).delete()
#
# AccessRule and Context are currently neither used nor tested. We fetch the
# required information in the FilterBackend and Permission classes (views.py)
# directly from LDAP. This is here to show how a permission representation
# (compatible with the above Role class) could be modeled in Django.
#
# To use this, install the code, and create and activate the required
# migrations with:
# python3 -m ucsschool.http_api.manage makemigrations
# python3 -m ucsschool.http_api.manage migrate
#
# from django.contrib.auth import get_user_model
# from django.contrib.contenttypes.fields import GenericForeignKey
# from django.contrib.contenttypes.models import ContentType
# CONTEXT_TYPE_SCHOOL = 'school'
# class Context(models.Model):
# type = models.CharField(max_length=255, primary_key=True)
#
# content_type = models.ForeignKey(ContentType, on_delete=models.CASCADE)
# object_id = models.CharField(max_length=255)
# content_object = GenericForeignKey('content_type', 'object_id')
#
# def __unicode__(self):
# return 'Context(type={!r}, content_object={})'.format(self.type, self.content_object)
#
# @classmethod
# def update_school_contexts(cls):
# School.update_from_ldap()
# existing_school_objs = School.objects.all()
# ct_school = ContentType.objects.get(app_label='import_api', model='school')
# for context in list(cls.objects.filter(type=CONTEXT_TYPE_SCHOOL, content_type=ct_school)):
# if context.content_object not in existing_school_objs:
# context.delete()
# existing_school_objs_in_contexts = cls.objects.filter(type=CONTEXT_TYPE_SCHOOL).values(
# 'content_object'
# )
# for school_obj in set(existing_school_objs) - set(existing_school_objs_in_contexts):
# cls.objects.create(type=CONTEXT_TYPE_SCHOOL, content_object=school_obj)
# class AccessRule(models.Model):
# access = models.BooleanField(default=True)
# contexts = models.ManyToManyField(Context)
# roles = models.ManyToManyField(Role)
# users = models.ManyToManyField(settings.AUTH_USER_MODEL)
#
# import_group_name = models.CharField(max_length=255)
#
# def __unicode__(self):
# return 'AccessRule(context={!r}, roles={}, access={!r} import_group={!r})'.format(
# self.context,
# [r.name for r in self.roles.all()],
# self.access,
# self.import_group_name
# )
#
# @classmethod
# def update_from_ldap(cls):
# """
# Update AccessRule objects from LDAP (reading ucsschoolImportGroup
# groups).
#
# :return: None
# """
# names = []
# lo, po = get_unprivileged_connection()
# for dn, import_group in lo.search('(objectClass=ucsschoolImportGroup)'):
# name = import_group['cn'][0].decode("UTF-8")
# try:
# obj, _created = cls.objects.get_or_create(import_group_name=name)
# except cls.DoesNotExist:
# obj = cls.objects.create(import_group_name=name)
# users = cls._get_users(import_group['memberUid'])
# roles = cls._get_roles(import_group['ucsschoolImportRole'])
# contexts = cls._get_school_contexts(import_group['ucsschoolImportSchool'])
# if obj.users != users or obj.roles != roles or obj.contexts != contexts:
# obj.users = users
# obj.roles = roles
# obj.contexts = contexts
# obj.save()
# names.append(name)
# cls.objects.exclude(import_group_name__in=names).delete()
#
# @classmethod
# def _get_users(cls, users):
# UserModel = get_user_model()
# username_list_filter = '{}__in'.format(UserModel.USERNAME_FIELD)
#
# existing_users = UserModel._default_manager.filter(**{username_list_filter: users})
# existing_user_names = existing_users.values_list('username', flat=True)
# for username in set(users) - set(existing_user_names):
# UserModel._default_manager.create_user(username)
# return UserModel._default_manager.filter(**{username_list_filter: users})
#
# @classmethod
# def _get_roles(cls, roles):
# Role.update_from_ldap()
# existing_roles = Role.objects.filter(name__in=roles)
# missing_roles = set(roles) - set(r.name for r in existing_roles)
# if missing_roles:
# raise RuntimeError('Cannot get unknown role(s): {!r}.'.format(missing_roles))
# return existing_roles
#
# @classmethod
# def _get_school_contexts(cls, schools):
# Context.update_school_contexts() # will also update Schools
# existing_schools = School.objects.filter(name__in=schools)
# missing_schools = set(schools) - set(s.name for s in existing_schools)
# if missing_schools:
# raise RuntimeError('Cannot get context for unknown school(s): {!r}.'.format(missing_schools))
# ct_school = ContentType.objects.get(app_label='import_api', model='school')
# return Context.objects.filter(
# type=CONTEXT_TYPE_SCHOOL, content_type=ct_school, object_id__in=schools
# )
[docs]
class School(models.Model):
    """
    Database representation of a school, mirrored from the
    ``ucsschoolOrganizationalUnit`` objects found in LDAP.

    ``name`` is the primary key (the ``ou`` attribute), ``displayName`` the
    LDAP ``displayName`` or, if that is missing, the ``ou`` value.
    """

    name = models.CharField(max_length=255, primary_key=True)
    displayName = models.CharField(max_length=255, blank=True)

    class Meta:
        ordering = ("name",)

    def __str__(self):
        return self.name

    __unicode__ = __str__

    @staticmethod
    def _get_ous_from_ldap(ou=None):
        """
        Search LDAP for school OUs using the unprivileged connection.

        :param str ou: name of a single OU to look for, all OUs are returned if empty
        :return: result of ``lo.search()``
        """
        lo, _po = get_unprivileged_connection()
        if not ou:
            return lo.search(filter="objectClass=ucsschoolOrganizationalUnit")
        # filter_format() escapes the OU name before it is put into the filter
        ldap_filter = filter_format("(&(objectClass=ucsschoolOrganizationalUnit)(ou=%s))", [ou])
        return lo.search(filter=ldap_filter)

    @classmethod
    def update_from_ldap(cls, ou_str=None):
        """
        Update one or all School objects from OUs in LDAP.

        :param str ou_str: name of School object to update, all will be updated if None
        :return: None
        :raises RuntimeError: if ``ou_str`` is given but no such OU was found
        """
        search_result = cls._get_ous_from_ldap(ou_str)
        if ou_str and not search_result:
            raise RuntimeError("Unknown school {!r}.".format(ou_str))

        found_names = []
        for _dn, attrs in search_result:
            raw_ou = attrs["ou"][0]
            school_name = raw_ou.decode("UTF-8")
            # fall back to the OU name if LDAP has no displayName
            label = attrs.get("displayName", [raw_ou])[0].decode("UTF-8")
            school, is_new = cls.objects.get_or_create(
                name=school_name,
                defaults={"displayName": label},
            )
            found_names.append(school_name)
            if is_new or school.displayName == label:
                continue
            school.displayName = label
            school.save()

        if ou_str:
            # a single school was refreshed: leave all other objects alone
            return
        # full refresh: delete OUs not in LDAP (anymore)
        cls.objects.exclude(name__in=found_names).delete()
[docs]
class TextArtifact(models.Model):
    """
    A text file that belongs to an import job.

    ``path`` is the (unique) location of the file, ``text`` stores its
    content. Subclasses override :py:meth:`get_userimportjob` to return the
    job they are attached to.
    """

    path = models.CharField(max_length=255, unique=True)
    text = models.TextField(blank=True)

    class Meta:
        ordering = ("-pk",)

    def __str__(self):
        try:
            pk = "#{}".format(self.get_userimportjob().pk)
        except (AttributeError, ObjectDoesNotExist):
            # no import job can be resolved for this artifact
            pk = "n/a"
        return "{} #{} of importjob {}".format(self.__class__.__name__, self.pk, pk)

    __unicode__ = __str__

    def get_text(self):
        """
        Return the text of this artifact.

        If ``self.text`` is empty, the file at ``self.path`` is read as UTF-8
        and its content is stored in ``self.text`` (the object is not saved).

        :return: the text, or an empty string if the file could not be read
            or decoded (the error is logged)
        :rtype: str
        """
        if not self.text:
            try:
                # newline="" keeps line endings untranslated, exactly like the
                # deprecated codecs.open() that was used here before.
                with open(self.path, encoding="utf-8", newline="") as fp:
                    self.text = fp.read()
            except (IOError, UnicodeDecodeError) as exc:
                # Best effort: an unreadable or undecodable file must not
                # make the caller fail.
                logger = logging.getLogger(__name__)
                logger.error("Could not read %r: %s", self.path, exc)
                return ""
        return self.text

    def get_userimportjob(self):
        """
        Return the import job this artifact belongs to.

        :return: value of ``self.userimportjob``
        """
        return self.userimportjob
[docs]
class Logfile(TextArtifact):
    """Proxy model of :py:class:`TextArtifact` for the log file of an import job."""

    class Meta:
        proxy = True

    def get_userimportjob(self):
        """Return the job found through the ``userimportjob_log_file`` relation."""
        return self.userimportjob_log_file
[docs]
class PasswordsFile(TextArtifact):
    """Proxy model of :py:class:`TextArtifact` for the passwords file of an import job."""

    class Meta:
        proxy = True

    def get_userimportjob(self):
        """Return the job found through the ``userimportjob_password_file`` relation."""
        return self.userimportjob_password_file
[docs]
class SummaryFile(TextArtifact):
    """Proxy model of :py:class:`TextArtifact` for the summary file of an import job."""

    class Meta:
        proxy = True

    def get_userimportjob(self):
        """Return the job found through the ``userimportjob_summary_file`` relation."""
        return self.userimportjob_summary_file
[docs]
class UserImportJob(models.Model):
    """
    A user import job.

    Stores the parameters of the job (school, role, source UID, dry-run flag,
    uploaded input file), its status and references to its result, log,
    passwords and summary artifacts.
    """

    # Jobs are dry runs unless explicitly requested otherwise.
    dryrun = models.BooleanField(default=True)
    principal = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE)
    school = models.ForeignKey(School, blank=True, on_delete=models.CASCADE)
    source_uid = models.CharField(max_length=255, blank=True)
    # One of JOB_CHOICES, a new job starts as JOB_NEW.
    status = models.CharField(max_length=10, default=JOB_NEW, choices=JOB_CHOICES)
    user_role = models.CharField(max_length=20, choices=USER_ROLES_CHOICES, blank=True)
    # TODO: user_role = models.ForeignKey(Role, blank=True, on_delete=models.DO_NOTHING)
    task_id = models.CharField(max_length=40, blank=True)
    result = models.OneToOneField(TaskResult, on_delete=models.SET_NULL, null=True, blank=True)

    # The artifacts are optional. Deleting an artifact sets the reference to NULL.
    log_file = models.OneToOneField(
        Logfile,
        related_name="userimportjob_log_file",
        null=True,
        blank=True,
        on_delete=models.SET_NULL,
    )
    password_file = models.OneToOneField(
        PasswordsFile,
        related_name="userimportjob_password_file",
        null=True,
        blank=True,
        on_delete=models.SET_NULL,
    )
    summary_file = models.OneToOneField(
        SummaryFile,
        related_name="userimportjob_summary_file",
        null=True,
        blank=True,
        on_delete=models.SET_NULL,
    )

    basedir = models.CharField(max_length=255)
    date_created = models.DateTimeField(auto_now_add=True)
    input_file = models.FileField(upload_to="uploads/%Y-%m-%d/")

    class Meta:
        ordering = ("pk",)

    def __str__(self):
        template = "UserImportJob {} | {} ({})"
        return template.format(self.pk, self.school, self.status)

    __unicode__ = __str__