#!/usr/bin/python3
# -*- coding: utf-8 -*-
#
# UCS@school python lib
#
# Copyright 2007-2021 Univention GmbH
#
# http://www.univention.de/
#
# All rights reserved.
#
# The source code of this program is made available
# under the terms of the GNU Affero General Public License version 3
# (GNU AGPL V3) as published by the Free Software Foundation.
#
# Binary versions of this program provided by Univention to you as
# well as other copyrighted, protected or trademarked materials like
# Logos, graphics, fonts, specific documentations and configurations,
# cryptographic keys etc. are subject to a license agreement between
# you and Univention and not subject to the GNU AGPL V3.
#
# In the case you use this program under the terms of the GNU AGPL V3,
# the program is provided in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public
# License with the Debian GNU/Linux or Univention distribution in file
# /usr/share/common-licenses/AGPL-3; if not, see
# <http://www.gnu.org/licenses/>.
import re
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple
from ldap.filter import escape_filter_chars, filter_format
import univention.admin.modules as udm_modules
from univention.admin.filter import conjunction, parse
from univention.admin.uexceptions import noObject
from univention.lib.i18n import Translation
from univention.management.console.config import ucr
from univention.management.console.log import MODULE
from univention.management.console.modules import Base, UMC_Error
from univention.management.console.modules.decorators import sanitize
from univention.management.console.modules.sanitizers import StringSanitizer
from .school_umc_ldap_connection import LDAP_Connection, set_bind_function
if TYPE_CHECKING:
from univention.admin.handlers import simpleLdap as UdmObject
from univention.admin.uldap import access as LoType
# Load the UDM modules once at import time; the code further down looks them
# up through udm_modules.get() and udm_modules.lookup().
udm_modules.update()
# NOTE(review): this name is not referenced anywhere else in the part of the
# file shown here -- presumably a leftover; confirm before removing.
__bind_callback = None
# Translation helper bound to "python-ucs-school"; used for the user-visible
# error messages raised below.
_ = Translation("python-ucs-school").translate
class SchoolSanitizer(StringSanitizer):
    """String sanitizer for the ``school`` request option.

    Behaves like :class:`StringSanitizer`, except that an empty result for a
    required option is reported as a UMC error (status 503) whose result
    carries ``{"no_school_found": True}``.
    """

    def _sanitize(self, value, name, further_args):
        """Return the sanitized school value.

        :raises UMC_Error: if the sanitized value is empty although the
            sanitizer was created with ``required=True``.
        """
        sanitized = super(SchoolSanitizer, self)._sanitize(value, name, further_args)
        # Guard clause: anything non-empty, or an optional empty value, passes.
        if sanitized or not self.required:
            return sanitized
        message = _(
            "The request did not specify any school. You have to create a school before "
            'continuing. Use the "Schools" UMC module to create one.'
        )
        raise UMC_Error(message, status=503, result={"no_school_found": True})
class SchoolBaseModule(Base):
    """Base class for UCS@school UMC modules that need LDAP access.

    :py:meth:`init` registers :py:meth:`bind_user_connection` through
    ``set_bind_function()`` so that LDAP connections can be bound. In addition
    the class provides the request handlers ``schools``, ``classes``,
    ``workgroups``, ``groups`` and ``rooms`` and the search helpers
    ``_groups``, ``_users`` and ``_users_ldap``.

    In order to integrate this class into a module, use the following
    paradigm::

        class Instance(SchoolBaseModule):
            def __init__(self):
                # initiate list of internal variables
                SchoolBaseModule.__init__(self)
                # ... custom code

            def init(self):
                SchoolBaseModule.init(self)
                # ... custom code
    """

    def init(self):
        """Register :py:meth:`bind_user_connection` as the LDAP bind function."""
        set_bind_function(self.bind_user_connection)

    def bind_user_connection(self, lo):  # type: (LoType) -> None
        """Bind the LDAP connection `lo`.

        If ``self.user_dn`` is set, binding is delegated to the parent class.
        Otherwise the connection is bound with the DN from the UCR variable
        ``ldap/hostdn`` and the password read from ``/etc/machine.secret``.
        If that file cannot be read, ``bind()`` is called with ``None`` for
        both DN and password.
        """
        if not self.user_dn:  # ... backwards compatibility
            # the DN is None if we have a local user (e.g., root)
            # FIXME: the statement above is not completely true, user_dn is None also if the
            # UMC server could not detect it (for whatever reason) therefore this workaround is
            # a security hole which allows to execute ldap operations as machine account
            try:  # to get machine account password
                MODULE.warn("Using machine account for local user: %s" % self.username)
                with open("/etc/machine.secret", "r") as fd:
                    password = fd.read().strip()
                user_dn = ucr.get("ldap/hostdn")
            except IOError as exc:
                password = None
                user_dn = None
                MODULE.warn("Cannot read /etc/machine.secret: %s" % (exc,))
            lo.lo.bind(user_dn, password)
            return
        return super(SchoolBaseModule, self).bind_user_connection(lo)

    @LDAP_Connection()
    def schools(self, request, ldap_user_read=None):
        """Finish the request with the schools found for the bind DN.

        The result is a list of ``{"id": <school name>, "label": <display name>}``.

        :raises UMC_Error: (status 503) if no school was found.
        """
        # imported lazily, like the other ucsschool.lib.models imports in this class
        from ucsschool.lib.models.school import School

        schools = School.from_binddn(ldap_user_read)
        if not schools:
            raise UMC_Error(
                _(
                    "Could not find any school. You have to create a school before continuing. Use the "
                    '"Schools" UMC module to create one.'
                ),
                status=503,
                result={"no_school_found": True},
            )
        self.finished(
            request.id, [{"id": school.name, "label": school.display_name} for school in schools]
        )

    def _groups(self, ldap_connection, school, ldap_base, pattern=None, scope="sub"):
        # type: (LoType, str, str, Optional[str], Optional[str]) -> List[Dict[str, str]]
        """Return the groups found below `ldap_base`.

        :param ldap_connection: LDAP connection used for the lookup
        :param school: school name; a leading ``<school>-`` prefix is removed
            (case-insensitively) from the group names to build the labels
        :param ldap_base: search base
        :param pattern: optional search pattern; when empty no filter is applied
        :param scope: LDAP search scope handed to the UDM lookup
        :return: list of ``{"id": <group DN>, "label": <name without school prefix>}``
        """
        # restrict the lookup to groups matching the given pattern (if any)
        ldapFilter = LDAP_Filter.forGroups(pattern) if pattern else None
        groupresult = udm_modules.lookup(
            "groups/group", None, ldap_connection, scope=scope, base=ldap_base, filter=ldapFilter
        )
        name_pattern = re.compile(r"^%s-" % (re.escape(school)), flags=re.I)
        return [{"id": grp.dn, "label": name_pattern.sub("", grp["name"])} for grp in groupresult]

    @sanitize(school=SchoolSanitizer(required=True), pattern=StringSanitizer(default=""))
    @LDAP_Connection()
    def classes(self, request, ldap_user_read=None):
        """Returns a list of all classes of the given school"""
        school = request.options["school"]
        from ucsschool.lib.models.group import SchoolClass

        self.finished(
            request.id,
            self._groups(
                ldap_user_read, school, SchoolClass.get_container(school), request.options["pattern"]
            ),
        )

    @sanitize(school=SchoolSanitizer(required=True), pattern=StringSanitizer(default=""))
    @LDAP_Connection()
    def workgroups(self, request, ldap_user_read=None):
        """Returns a list of all working groups of the given school"""
        school = request.options["school"]
        from ucsschool.lib.models.group import WorkGroup

        # scope "one": only groups directly inside the workgroup container
        self.finished(
            request.id,
            self._groups(
                ldap_user_read,
                school,
                WorkGroup.get_container(school),
                request.options["pattern"],
                "one",
            ),
        )

    @sanitize(school=SchoolSanitizer(required=True), pattern=StringSanitizer(default=""))
    @LDAP_Connection()
    def groups(self, request, ldap_user_read=None):
        """Returns a list of all groups (classes and workgroups) of the given school"""
        # use as base the path for 'workgroups', as it incorporates workgroups and classes
        # when searching with scope 'sub'
        school = request.options["school"]
        from ucsschool.lib.models.group import WorkGroup

        self.finished(
            request.id,
            self._groups(
                ldap_user_read, school, WorkGroup.get_container(school), request.options["pattern"]
            ),
        )

    @sanitize(school=SchoolSanitizer(required=True), pattern=StringSanitizer(default=""))
    @LDAP_Connection()
    def rooms(self, request, ldap_user_read=None):
        """Returns a list of all computer rooms of the given school"""
        school = request.options["school"]
        from ucsschool.lib.models.group import ComputerRoom

        self.finished(
            request.id,
            self._groups(
                ldap_user_read, school, ComputerRoom.get_container(school), request.options["pattern"]
            ),
        )

    @staticmethod
    def _model_classes_for_user_type(user_type):
        # type: (Optional[str]) -> List[type]
        """Map a `user_type` string to the ucsschool model classes to search for.

        An empty `user_type` selects the generic ``User`` model. The comparison
        is case-insensitive.

        :raises TypeError: if `user_type` is not one of the known names.
        """
        # imported lazily, as the original per-method imports did
        import ucsschool.lib.models

        if not user_type:
            return [ucsschool.lib.models.User]
        normalized = user_type.lower()
        if normalized in ("teachers", "teacher"):
            return [ucsschool.lib.models.Teacher, ucsschool.lib.models.TeachersAndStaff]
        if normalized in ("student", "students", "pupil", "pupils"):
            return [ucsschool.lib.models.Student]
        if normalized in ("staff",):
            return [ucsschool.lib.models.Staff, ucsschool.lib.models.TeachersAndStaff]
        raise TypeError("user_type %r unknown." % (user_type,))

    @staticmethod
    def _ldap_search_filter(user_module, subfilters):
        # type: (Any, List[str]) -> str
        """AND-combine `subfilters` and return the users/user lookup filter as text."""
        return u"{}".format(
            user_module.lookup_filter(
                conjunction("&", [parse(subfilter) for subfilter in subfilters])
            )
        )

    def _users(self, ldap_connection, school, group=None, user_type=None, pattern=""):
        # type: (LoType, str, Optional[str], Optional[str], Optional[str]) -> List[UdmObject]
        """Returns a list of all users given 'pattern', 'school' (search base) and 'group'

        The returned items are UDM objects.

        :param group: DN of a group whose members are searched; ``None`` or
            the string ``"None"`` searches all users of the school
        :param user_type: ``None``/empty for all users, or one of the names
            accepted by :py:meth:`_model_classes_for_user_type` (the same set
            of names :py:meth:`_users_ldap` accepts, including ``"staff"``)
        :raises TypeError: if `user_type` is unknown
        """
        classes = self._model_classes_for_user_type(user_type)

        if group not in (None, "None"):
            # open the group
            groupModule = udm_modules.get("groups/group")
            groupObj = groupModule.object(None, ldap_connection, None, group)
            groupObj.open()

            # lazy loading of exception classes to prevent import loop
            from ucsschool.lib.models.base import UnknownModel, WrongModel

            # The following code block prevents a massive performance loss if the group
            # contains far less users than all available users. The else-block opens
            # all available users ==> high LDAP load! (Bug #42167)

            # The filter does not depend on the member, so it is built only once.
            subfilters = [LDAP_Filter.forSchool(school)]
            if pattern:
                subfilters.append(LDAP_Filter.forUsers(pattern))
            search_filter = u"{}".format(
                conjunction("&", [parse(subfilter) for subfilter in subfilters])
            )

            users = []
            for userdn in set(groupObj["users"]):
                for cls in classes:
                    try:
                        udm_obj = cls.get_only_udm_obj(ldap_connection, search_filter, base=userdn)
                    except noObject:
                        MODULE.error(
                            "Possible group inconsistency detected: %r contains member %r but member "
                            "was not found in LDAP" % (group, userdn)
                        )
                        continue
                    if udm_obj is None:
                        continue
                    # make sure that the found UDM object is of requested user type
                    try:
                        cls.from_udm_obj(udm_obj, school, ldap_connection)
                    except (UnknownModel, WrongModel):
                        continue
                    users.append(udm_obj)
        else:
            # be aware that this search opens all user objects of specified type and may take
            # some time!
            user_filter = LDAP_Filter.forUsers(pattern)
            users = []
            for cls in classes:
                found = cls.get_all(ldap_connection, school, user_filter)
                users.extend(user.get_udm_object(ldap_connection) for user in found)
        return users

    def _users_ldap(self, ldap_connection, school, group=None, user_type=None, pattern="", attr=None):
        # type: (LoType, str, Optional[str], Optional[str], Optional[str], Optional[str]) -> List[Tuple[str, Dict[str, Any]]]  # noqa: E501
        """
        Returns a list of LDAP query result tuples (dn, attr) of all users
        given `pattern`, `school` (search base) and `group`.

        One LDAP search is issued per model class selected by `user_type`;
        each search uses that class' own ``type_filter`` only.

        :param group: DN of a group whose ``uniqueMember`` entries are
            searched; ``None`` or the string ``"None"`` searches without a
            group restriction
        :param attr: attributes to request from LDAP (``None`` is passed on as
            an empty list)
        :raises TypeError: if `user_type` is unknown
        :raises noObject: if a search based at a member DN of `group` raises
            ``noObject``
        """
        classes = self._model_classes_for_user_type(user_type)
        attr = attr or []
        users = []
        user_module = udm_modules.get("users/user")

        if group not in (None, "None"):
            # The following code block prevents a massive performance loss if the group
            # contains far less users than all available users. The else-block opens
            # all available users ==> high LDAP load! (Bug #42167)
            member_dns = {
                member.decode("utf-8")
                for member in ldap_connection.get(group).get("uniqueMember", [])
            }
            common_filters = [LDAP_Filter.forSchool(school)]
            if pattern:
                common_filters.append(LDAP_Filter.forUsers(pattern))
            # One independent filter per class. The type filters must not be
            # accumulated: AND-ing the filters of several classes would only
            # match objects satisfying all of them at once.
            search_filters = [
                self._ldap_search_filter(user_module, common_filters + [cls.type_filter])
                for cls in classes
            ]
            for userdn in member_dns:
                for search_filter in search_filters:
                    try:
                        ldap_objs = ldap_connection.search(search_filter, base=userdn, attr=attr)
                    except noObject:
                        raise noObject(
                            "User with DN: {} was not found in the group {}."
                            " Please make sure it is a valid UCS@school user and is member of all "
                            "necessary groups. For more information visit https://help.univention.com"
                            "/t/how-an-ucs-school-user-should-look-like/15630".format(userdn, group)
                        )
                    if len(ldap_objs) == 1:
                        users.append(ldap_objs[0])
                    # else:
                    # either: 'Possible group inconsistency detected: %r contains member %r but
                    #         member was not found in LDAP' % (group, userdn))
                    # or: DN does not belong to teacher/student (WrongModel)
                    # in both cases: ignore user
        else:
            school_filter = LDAP_Filter.forSchool(school)
            user_filter = LDAP_Filter.forUsers(pattern)
            for cls in classes:
                filter_s = self._ldap_search_filter(
                    user_module, [school_filter, user_filter, cls.type_filter]
                )
                users.extend(ldap_connection.search(filter=filter_s, attr=attr))
        return users
class LDAP_Filter:
    """Builders for the LDAP filter strings used by the search helpers."""

    # pattern used to split a search pattern into words
    regWhiteSpaces = re.compile(r"\s+")

    @staticmethod
    def forSchool(school):  # type: (str) -> str
        """Return a filter matching objects whose ``ucsschoolSchool`` equals `school`."""
        return filter_format("(ucsschoolSchool=%s)", [school])

    @staticmethod
    def forUsers(pattern):  # type: (str) -> str
        """Return a substring filter over ``lastname``, ``username`` and ``firstname``."""
        return LDAP_Filter.forAll(pattern, ["lastname", "username", "firstname"])

    @staticmethod
    def forGroups(pattern, school=None):  # type: (str, Optional[str]) -> str
        """Return a substring filter over ``name`` and ``description``."""
        # school parameter is deprecated
        return LDAP_Filter.forAll(pattern, ["name", "description"])

    @staticmethod
    def forComputers(pattern):  # type: (str) -> str
        """Return a filter with substring matches on ``name``/``description``
        and full matches on ``mac``/``ip``."""
        return LDAP_Filter.forAll(pattern, ["name", "description"], ["mac", "ip"])

    @staticmethod
    def forAll(pattern, subMatch=None, fullMatch=None, prefixes=None):
        # type: (str, Optional[List[str]], Optional[List[str]], Optional[Dict[str, Any]]) -> str
        """Build a filter that requires every word of `pattern` to match.

        `pattern` is split at whitespace. Each word yields one ``(|...)``
        group containing an exact match for every attribute in `fullMatch` and
        a substring match for every attribute in `subMatch`; all groups are
        combined with ``&``. An empty word (empty pattern, or leading/trailing
        whitespace) yields the presence match ``attr=*`` for the `subMatch`
        attributes and no `fullMatch` expressions.

        Example: ``forAll("foo", ["name"])`` gives ``(&(|(name=*foo*)))``.

        :param pattern: search words; ``None`` is treated like ``""``
        :param subMatch: attributes matched by substring (``None`` = none)
        :param fullMatch: attributes matched exactly (``None`` = none)
        :param prefixes: optional per-attribute string inserted in front of
            the substring search value (``None`` = none)
        :return: the LDAP filter string
        """
        # None defaults instead of shared mutable default objects; this also
        # makes an explicit None (allowed by the type comment) work.
        if subMatch is None:
            subMatch = []
        if fullMatch is None:
            fullMatch = []
        if prefixes is None:
            prefixes = {}

        expressions = []
        for iword in LDAP_Filter.regWhiteSpaces.split(pattern or ""):
            # evaluate the subexpression (search word over different attributes)
            subexpr = []

            # all expressions for a full string match
            iword = escape_filter_chars(iword)
            if iword:
                subexpr += ["(%s=%s)" % (jattr, iword) for jattr in fullMatch]

            # all expressions for a substring match
            if not iword:
                iword = "*"
            # NOTE(review): the word has already been passed through
            # escape_filter_chars(), which presumably escapes a literal "*",
            # so this condition looks always true here -- confirm whether
            # user-supplied wildcards were ever meant to be honoured.
            elif iword.find("*") < 0:
                iword = "*%s*" % iword
            subexpr += ["(%s=%s%s)" % (jattr, prefixes.get(jattr, ""), iword) for jattr in subMatch]

            # append to list of all search expressions
            expressions.append("(|%s)" % "".join(subexpr))

        if not expressions:
            return ""
        return "(&%s)" % "".join(expressions)
class Display:
    """Helpers that render a user as a human readable label."""

    @staticmethod
    def user(udm_object):  # type: (UdmObject) -> str
        """Return ``<lastname>[, <firstname>] (<username>)`` for a UDM user object.

        The first name part is left out when the object has no ``firstname``
        key or its value is empty.
        """
        label = udm_object["lastname"]
        has_firstname = "firstname" in udm_object and udm_object["firstname"]
        if has_firstname:
            label = label + ", %(firstname)s" % udm_object
        username_part = " (%(username)s)" % udm_object
        return label + username_part

    @staticmethod
    def user_ldap(ldap_object):  # type: (Dict[str, Any]) -> str
        """Return ``<sn>[, <givenName>] (<uid>)`` for a raw LDAP attribute dict.

        Attribute values are lists of UTF-8 encoded bytes; only the first
        value of each attribute is used. A missing ``sn`` is rendered as an
        empty string, a missing or empty ``givenName`` is left out.
        """
        empty = [b""]
        label = ldap_object.get("sn", empty)[0].decode("utf-8")
        given_name = ldap_object.get("givenName", empty)[0]
        if given_name:
            label = "%s, %s" % (label, given_name.decode("utf-8"))
        uid = ldap_object["uid"][0].decode("utf-8")
        return "%s (%s)" % (label, uid)