#
# Univention UCS@school
#
# Copyright 2007-2025 Univention GmbH
#
# http://www.univention.de/
#
# All rights reserved.
#
# The source code of this program is made available
# under the terms of the GNU Affero General Public License version 3
# (GNU AGPL V3) as published by the Free Software Foundation.
#
# Binary versions of this program provided by Univention to you as
# well as other copyrighted, protected or trademarked materials like
# Logos, graphics, fonts, specific documentations and configurations,
# cryptographic keys etc. are subject to a license agreement between
# you and Univention and not subject to the GNU AGPL V3.
#
# In the case you use this program under the terms of the GNU AGPL V3,
# the program is provided in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public
# License with the Debian GNU/Linux or Univention distribution in file
# /usr/share/common-licenses/AGPL-3; if not, see
# <http://www.gnu.org/licenses/>.
"""\
A UCS@school command line interface to fetch
user and group data from LUSD and import the fetched data into
the UCS@school domain.
"""
import configparser
import json
import logging
import os
import subprocess
import sys
import time
from argparse import ArgumentParser, Namespace
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List
import jwt
import requests
from ucsschool.lib.models.school import School
from ucsschool.lib.models.utils import get_file_handler, get_stream_handler
from univention.admin.uexceptions import noObject
from univention.admin.uldap import getMachineConnection
# Default location of this tool's configuration file; used as the default of
# the --configuration-filepath command line option.
CONFIG_PATH: Path = Path("/etc/ucs-school-import-lusd/config.ini")
# Lock file that run() creates for the duration of an import so that a second
# invocation refuses to start.
LOCK_FILE = Path("/var/lib/ucs-school-import-lusd/lock")
# Log file of this tool; it is also passed as --logfile to the
# ucs-school-user-import subprocess when debug logging is enabled.
LOG_FILE = Path("/var/log/univention/ucs-school-import-lusd.log")
# Role identifiers: they select the LUSD request, the import configuration and
# are passed as --user_role to the user import.
ROLE_STUDENT = "student"
ROLE_TEACHER = "teacher"
# Log level names accepted on the command line and in the configuration file.
LOG_LEVELS = ("DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL")
# Module-level logger; handlers are attached by ImportLUSD.setup_logging().
logger = logging.getLogger(__name__)
class ConfigurationError(ValueError):
    """Raised when a configuration value is missing or not acceptable."""
@dataclass(frozen=True)
class Configuration:
    """Immutable collection of all settings used by one run of the LUSD import."""

    # Base URL of the LUSD API; the request path is appended to it.
    lusd_api_url: str
    # Value sent as the "iss" claim of the JWT used for authentication.
    lusd_api_oauth_iss: str
    # School authority that every configured school id has to belong to.
    school_authority: str
    # If true, no data is downloaded and previously stored data is imported.
    skip_fetch: bool
    # If true, students / teachers are neither downloaded nor imported.
    skip_students: bool
    skip_teachers: bool
    # If true, the user import is started with --dry-run.
    dry_run: bool
    # Configuration files handed to the user import for the respective role.
    student_import_config_path: Path
    teacher_import_config_path: Path
    # One of LOG_LEVELS.
    log_level: str = "ERROR"
    # Maps a school name to the list of LUSD school ids belonging to it.
    school_id_map: Dict[str, List[str]] = field(default_factory=dict)
    # Private key used to sign the JWT.
    authentication_key_file_path: Path = Path("/var/lib/ucs-school-import-lusd/auth_key")
    # Directory below which the downloaded data is stored per school.
    lusd_data_save_path: Path = Path("/var/lib/ucs-school-import-lusd/data/")
    # Executable of the user import that is started per school and role.
    ucs_school_import_cli: Path = Path("/usr/share/ucs-school-import/scripts/ucs-school-user-import")

    def validate(self) -> None:
        """Validate the values stored in the configuration.

        The checks run in a fixed order and the first failing check raises.

        :raises ConfigurationError: if no school is configured, or the LUSD API
            URL, the log level or the school authority is not acceptable.
        :raises FileNotFoundError: if one of the import configuration files or
            the private key file does not exist.
        """
        if len(self.school_id_map) == 0:
            raise ConfigurationError("No schools configured.")

        required_files = (
            ("Student import configuration", self.student_import_config_path),
            ("Teacher import configuration", self.teacher_import_config_path),
            ("Private key file", self.authentication_key_file_path),
        )
        for description, required_path in required_files:
            if not required_path.exists():
                raise FileNotFoundError(f"{description} not found in path {required_path}.")

        if not self.lusd_api_url:
            raise ConfigurationError(f"No valid LUSD API URL given: {self.lusd_api_url}.")
        if self.log_level not in LOG_LEVELS:
            raise ConfigurationError(
                f"Not a valid log level: {self.log_level}, choose from: {LOG_LEVELS}"
            )
        if not self.school_authority:
            raise ConfigurationError(f"No valid school authority configured: {self.school_authority}.")
def normalize_schools(school_id_map: Dict[str, str]) -> Dict[str, List[str]]:
    """Return the school mapping keyed by the school names as stored in LDAP.

    :param school_id_map: maps a school name (in any case) to a comma-separated
        string of LUSD school ids.
    :return: mapping of the school name, as reported by the ``School`` object
        loaded from LDAP, to the list of its school ids. Whitespace around
        the individual ids is removed.

    Logs an error and exits the process with status 1 if a school cannot be
    found in LDAP.
    """
    # to avoid duplicates, use the same case as the ldap ou
    lo, _ = getMachineConnection()
    normalized_school_id_map: Dict[str, List[str]] = {}
    for school, school_ids in school_id_map.items():
        try:
            ldap_school_name = School.from_dn(School(school).dn, None, lo).name
        except noObject:
            logger.error(
                f"Could not find school in ldap: {school}, delete it from the mapping or create it"
            )
            sys.exit(1)
        # "1234, 5678" must yield "5678", not " 5678": the ids are sent to the
        # API verbatim. Empty entries are kept so that they are still rejected
        # later instead of being silently dropped.
        normalized_school_id_map[ldap_school_name] = [
            school_id.strip() for school_id in school_ids.split(",")
        ]
    return normalized_school_id_map
class ImportLUSD:
    """Download user data from the LUSD API and import it per school.

    Creating an instance reads and validates the configuration; every
    configuration problem is logged and terminates the process with exit
    status 1. ``run_import`` then downloads the data (unless skipped) and
    starts the user import for each configured school.
    """

    # Log level used by ``setup_logging`` as long as no usable configuration exists.
    _fallback_log_level: str = "ERROR"

    def __init__(self, args: Namespace) -> None:
        """Build the configuration from ``args``, the configuration file and the environment.

        :param args: parsed command line arguments as returned by ``get_args``.

        Exits the process with status 1 if the configuration file is missing
        or incomplete or if ``Configuration.validate`` fails. Unless fetching
        is skipped, the configured school ids are verified against the LUSD
        API as well.
        """
        # Errors may have to be logged before ``self.configuration`` exists, so
        # remember the level requested on the command line (if any) for that case.
        if getattr(args, "log_level", None):
            self._fallback_log_level = args.log_level

        if not args.configuration_filepath.exists():
            self.setup_logging()
            # Report the path that was actually checked, not the default one.
            logger.error(f"Config path {args.configuration_filepath} does not exist.")
            sys.exit(1)
        file_config = configparser.ConfigParser()
        file_config.read(args.configuration_filepath)
        try:
            settings = file_config["Settings"]
            # Command line switches win over the values of the configuration file.
            self.configuration = Configuration(
                student_import_config_path=Path(settings["student_import_config_path"]),
                teacher_import_config_path=Path(settings["teacher_import_config_path"]),
                dry_run=args.dry_run,
                skip_fetch=args.skip_fetch,
                skip_students=args.skip_students or settings.getboolean("skip_students", False),
                skip_teachers=args.skip_teachers or settings.getboolean("skip_teachers", False),
                log_level=args.log_level if args.log_level else settings["log_level"],
                school_id_map=normalize_schools(dict(file_config["SchoolMappings"])),
                lusd_api_url=os.environ.get("LUSD_URL", "https://ucs.hessen.de"),
                lusd_api_oauth_iss=os.environ.get("LUSD_ISS", "1ebdb8ea000457f8095a"),
                school_authority=settings.get("school_authority", None),
            )
        except KeyError as exc:
            self.setup_logging()
            logger.error(f"Incomplete configuration: {exc}.")
            sys.exit(1)
        self.setup_logging()
        logger.debug(f"Command line arguments: {vars(args)}")
        for section in file_config.sections():
            logger.debug(f"File config [{section}]: {dict(file_config[section].items())}")
        logger.debug(f"Configuration: {self.configuration}")
        try:
            self.configuration.validate()
        except (ConfigurationError, FileNotFoundError) as exc:
            logger.error(exc)
            sys.exit(1)
        if not self.configuration.skip_fetch:
            self.validate_dienststellennummern()

    def run_import(self) -> None:
        """Download the LUSD data (unless skipped) and run the import for every school."""
        if self.configuration.skip_fetch:
            logger.info("Skipping LUSD data download")
        else:
            self.fetch_and_store_lusd_data()
        for school_name in self.configuration.school_id_map:
            self.run_sisopi_import(school_name)

    def fetch_and_store_lusd_data(self) -> None:
        """Fetch the data of all configured schools and store it as JSON files.

        One file per school and role is written to the path returned by
        ``get_lusd_data_save_path``; roles that are configured to be skipped
        are not downloaded.
        """
        for school_name, school_ids in self.configuration.school_id_map.items():
            for role in (ROLE_STUDENT, ROLE_TEACHER):
                if self.skip_role(role):
                    logger.info(f"Skip download of LUSD data for role: {role}, in school: {school_name}")
                    continue
                logger.info(f"Starting download of LUSD data for role: {role}, in school: {school_name}")
                data_path = self.get_lusd_data_save_path(school_name, role)
                data = self.fetch_school_lusd_data(school_ids, role, data_path)
                # The directory was created above; make sure it still exists
                # after the (possibly long running) download.
                data_path.parent.mkdir(parents=True, exist_ok=True)
                with open(data_path, "w") as f:
                    json.dump(data, f, indent=2)
                logger.info(f"Finished download of LUSD data for role: {role}, in school: {school_name}")
                logger.debug(f"Data saved to {data_path}")

    def skip_role(self, role: str) -> bool:
        """Return whether ``role`` is configured to be skipped.

        Roles other than student and teacher are never skipped.
        """
        if role == ROLE_STUDENT:
            return self.configuration.skip_students
        if role == ROLE_TEACHER:
            return self.configuration.skip_teachers
        return False

    def get_bearer_token(self) -> str:
        """Create the signed JWT that authenticates a request against the LUSD API.

        The token is signed with the configured private key using RS512 and
        expires 30 seconds after it was issued. Logs an error and exits the
        process with status 1 if signing raises ``ValueError``.
        """
        private_key_file = self.configuration.authentication_key_file_path
        issued_at_time = int(time.time())
        jwt_payload = {
            "iss": self.configuration.lusd_api_oauth_iss,
            "aud": "LUSD externer Datenaustausch",
            "iat": issued_at_time,
            "exp": issued_at_time + 30,
        }
        try:
            jwt_token = jwt.encode(jwt_payload, private_key_file.read_text(), algorithm="RS512")
        except ValueError:
            logger.error(f"The authentication key {private_key_file} is not valid. Not a pem file?")
            sys.exit(1)
        return jwt_token

    def fetch_school_lusd_data(self, school_ids: List[str], role: str, file_path: Path) -> Any:
        """Request the LUSD data of ``role`` for the given school ids and return it.

        :param school_ids: LUSD school ids to request data for.
        :param role: ``ROLE_STUDENT`` or ``ROLE_TEACHER``.
        :param file_path: not used by this method; kept for compatibility with
            existing callers. Storing the data is the caller's job.
        :return: the decoded JSON body of the API response.
        :raises ValueError: if ``role`` is neither student nor teacher.
        """
        if role == ROLE_STUDENT:
            action_id = "Administrationsdaten Lernende lesen"
        elif role == ROLE_TEACHER:
            action_id = "Administrationsdaten Personal lesen"
        else:
            raise ValueError(f"Unknown role: {role!r}")
        response = self._lusd_request(action_id, {"schulDienststellennummern": school_ids})
        return response.json()

    def get_lusd_data_save_path(self, school_name: str, role: str) -> Path:
        """Return the JSON file path for ``school_name`` and ``role``.

        The school's data directory is created (with a warning) if it does
        not exist yet.
        """
        data_dir_path = self.configuration.lusd_data_save_path.joinpath(school_name)
        if not data_dir_path.exists():
            logger.warning(f"Path does not exist, creating {data_dir_path}")
            data_dir_path.mkdir(parents=True)
        return data_dir_path.joinpath(f"{role}.json")

    def run_sisopi_import(self, school_name: str) -> None:
        """Run a single SiSoPi import for school `school_name`

        The user import is started once per role that is not skipped, reading
        the file returned by ``get_lusd_data_save_path``. A failing import
        raises ``subprocess.CalledProcessError``.
        """
        for role in (ROLE_STUDENT, ROLE_TEACHER):
            if self.skip_role(role):
                logger.info(f"Skip import for role {role}, in school {school_name}")
                continue
            if role == ROLE_STUDENT:
                import_config = self.configuration.student_import_config_path
            else:
                import_config = self.configuration.teacher_import_config_path
            input_file_path = self.get_lusd_data_save_path(school_name, role)
            debug_enabled = logger.isEnabledFor(logging.DEBUG)
            cmd = [
                str(self.configuration.ucs_school_import_cli),
                "--conffile",
                str(import_config),
                "--infile",
                str(input_file_path),
                "--user_role",
                role,
                "--school",
                school_name,
            ]
            if self.configuration.dry_run:
                cmd.append("--dry-run")
            if debug_enabled:
                # In debug mode the subprocess logs verbosely into our log file.
                cmd.extend(["--logfile", str(LOG_FILE), "--verbose"])
            logger.info(f"Starting import for role {role}, in school {school_name}")
            logger.debug(f"Running ucs-school-user-import subprocess with command {cmd}.")
            for handler in logger.handlers:
                # Flush here, so the subprocess can write safely in the same file
                handler.flush()
            subprocess.check_call(  # nosec
                cmd,
                # stderr of the subprocess is only shown when debugging
                stderr=None if debug_enabled else subprocess.DEVNULL,
            )
            logger.info(f"Finished import for role {role}, in school {school_name}")

    def _lusd_request(self, action_id: str, parameter: Dict[str, Any]) -> "requests.Response":
        """Send one request to the LUSD API and return the response.

        :param action_id: value of the "bezeichnung" field of the request.
        :param parameter: value of the "parameter" field of the request.
        :return: the response; its status is either successful or 404.

        Any other unsuccessful status is logged and exits the process with
        status 1.
        """
        token = self.get_bearer_token()
        request_url = f"{self.configuration.lusd_api_url}/anfrage?format=JSON"
        request_data = [
            {
                "bezeichnung": action_id,
                "version": 1,
                "parameter": parameter,
            }
        ]
        # NOTE(review): no timeout is passed, so a stalled connection blocks
        # the import (and keeps the lock file) until it is interrupted.
        response = requests.post(
            request_url, json=request_data, headers={"Authorization": f"Bearer {token}"}
        )
        # The api returns 404 for an empty search result. E.g. search for schulDienststellennummern
        if not response.ok and response.status_code != 404:
            logger.error(f"Could not retrieve LUSD data: {response.text}")
            if response.status_code == 401:
                logger.error(
                    "Unauthorized: Check the date on this machine and the configured private key"
                )
            sys.exit(1)
        return response

    def validate_dienststellennummern(self) -> None:
        """Verify that every configured school id belongs to the configured school authority.

        For each id the school's organisational data is requested from the
        API. The process exits with status 1 if an id is unknown (404), the
        answer cannot be parsed, or the reported "schultraeger" differs
        (case-insensitively) from the configured school authority.
        """
        action_id = "schulische Organisationsdaten lesen"
        configured_authority = self.configuration.school_authority
        for school_ids in self.configuration.school_id_map.values():
            for school_id in school_ids:
                response = self._lusd_request(action_id, {"schulDienststellennummern": school_id})
                if response.status_code == 404:
                    logger.error(
                        f"Could not find any 'schultraeger' for the 'schulDienststellennummer': "
                        f"{school_id}. Please check for mistakes or remove it."
                    )
                    sys.exit(1)
                try:
                    school_authority = response.json()[0]["antwort"]["schulen"][0]["schultraeger"]
                except (requests.JSONDecodeError, IndexError, KeyError, TypeError):
                    # TypeError: the answer is valid JSON but not nested as expected.
                    logger.error(f"Could not parse 'schultraeger' from LUSD data:\n{response.text}")
                    sys.exit(1)
                if configured_authority.lower() != school_authority.lower():
                    logger.error(
                        f"The 'dienststellennummer' {school_id} is not part "
                        f"of your configured school authority {configured_authority}. "
                        f"This is not allowed. Please check for mistakes or remove it."
                    )
                    sys.exit(1)

    def setup_logging(self) -> None:
        """Attach the stream and file handlers and set the level of the module logger.

        The configured log level is used when a configuration is available.
        Before that, or if the configured level is not one of ``LOG_LEVELS``,
        the level given on the command line (default "ERROR") is used, so
        that configuration errors can always be reported.
        """
        configuration = getattr(self, "configuration", None)
        log_level = self._fallback_log_level if configuration is None else configuration.log_level
        if log_level not in LOG_LEVELS:
            # Logger.setLevel() raises ValueError for unknown level names; keep
            # logging usable so that Configuration.validate() can report the value.
            log_level = self._fallback_log_level
        logger.addHandler(get_stream_handler(log_level))
        logger.addHandler(get_file_handler(log_level, LOG_FILE))
        logger.setLevel(log_level)
def get_args() -> Namespace:
    """Parse the command line arguments of this tool.

    :return: namespace with the attributes ``configuration_filepath``
        (``Path``, default ``CONFIG_PATH``), ``log_level`` (one of
        ``LOG_LEVELS`` or ``None`` if not given) and the boolean switches
        ``skip_fetch``, ``skip_students``, ``skip_teachers`` and ``dry_run``
        (all ``False`` unless given).
    """
    parser = ArgumentParser(str(Path(sys.argv[0]).absolute()), description=__doc__)
    parser.add_argument(
        "--configuration-filepath",
        "-c",
        dest="configuration_filepath",
        default=CONFIG_PATH,
        help="The path to the configuration file of this CLI tool.",
        type=Path,
    )
    parser.add_argument(
        "--log-level",
        dest="log_level",
        choices=LOG_LEVELS,
        type=str,
        help="The log level.",
    )
    parser.add_argument(
        "--skip-fetch",
        dest="skip_fetch",
        default=False,
        action="store_true",
        help="Skip the fetching of LUSD DATA and import the previous data set again.",
    )
    parser.add_argument(
        "--skip-students",
        dest="skip_students",
        default=False,
        action="store_true",
        help="Skip fetching and importing students.",
    )
    parser.add_argument(
        "--skip-teacher",
        dest="skip_teachers",
        default=False,
        action="store_true",
        help="Skip fetching and importing teachers.",
    )
    parser.add_argument(
        "--dry-run",
        dest="dry_run",
        default=False,
        action="store_true",
        help="Run the import in dry run mode",
    )
    return parser.parse_args()
def run() -> None:
    """Entry point of the command line tool.

    Parses the arguments, prepares the importer and runs the import while
    holding the lock file. If the lock file already exists, an error is
    logged and the process exits with status 1. The lock file is removed
    again when the import ends, whether it succeeded or raised.
    """
    importer = ImportLUSD(get_args())

    try:
        LOCK_FILE.parent.mkdir(exist_ok=True)
        # exist_ok=False makes touch() fail if another run holds the lock.
        LOCK_FILE.touch(exist_ok=False)
    except FileExistsError:
        logger.error(f"The LUSD importer is already running: Lock file {LOCK_FILE} exists.")
        sys.exit(1)

    try:
        importer.run_import()
    finally:
        # is_file() is False for a missing path, so no separate exists() check is needed.
        if LOCK_FILE.is_file():
            LOCK_FILE.unlink()


if __name__ == "__main__":
    run()