Source code for univention.management.console.modules.internetrules

#!/usr/bin/python3
# -*- coding: utf-8 -*-
#
# Univention Management Console module:
#   Defines and manages internet rules
#
# Copyright 2012-2025 Univention GmbH
#
# http://www.univention.de/
#
# All rights reserved.
#
# The source code of this program is made available
# under the terms of the GNU Affero General Public License version 3
# (GNU AGPL V3) as published by the Free Software Foundation.
#
# Binary versions of this program provided by Univention to you as
# well as other copyrighted, protected or trademarked materials like
# Logos, graphics, fonts, specific documentations and configurations,
# cryptographic keys etc. are subject to a license agreement between
# you and Univention and not subject to the GNU AGPL V3.
#
# In the case you use this program under the terms of the GNU AGPL V3,
# the program is provided in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public
# License with the Debian GNU/Linux or Univention distribution in file
# /usr/share/common-licenses/AGPL-3; if not, see
# <http://www.gnu.org/licenses/>.

import fnmatch
import re

import six
from six.moves.urllib_parse import urlparse

import ucsschool.lib.internetrules as rules
import univention.admin.modules as udm_modules
import univention.admin.objects as udm_objects
import univention.config_registry
from ucsschool.lib.models.group import Group
from ucsschool.lib.school_umc_base import LDAP_Filter, SchoolBaseModule, SchoolSanitizer
from ucsschool.lib.school_umc_ldap_connection import LDAP_Connection
from univention.lib.i18n import Translation
from univention.management.console.log import MODULE
from univention.management.console.modules import UMC_Error
from univention.management.console.modules.decorators import sanitize
from univention.management.console.modules.sanitizers import (
    BooleanSanitizer,
    ChoicesSanitizer,
    DictSanitizer,
    IntegerSanitizer,
    LDAPSearchSanitizer,
    ListSanitizer,
    StringSanitizer,
)

_ = Translation("ucs-school-umc-internetrules").translate

_filterTypes = {"whitelist": rules.WHITELIST, "blacklist": rules.BLACKLIST}
_filterTypesInv = {_i[1]: _i[0] for _i in _filterTypes.items()}


[docs] class Instance(SchoolBaseModule):
[docs] @sanitize( pattern=LDAPSearchSanitizer(required=False, default="", use_asterisks=True, add_asterisks=True) ) def query(self, request): """ Searches for internet filter rules requests.options = {} 'pattern' -- pattern to match within the rule name or the list of domains """ MODULE.info("internetrules.query: options: %s" % (request.options,)) pattern = request.options.get("pattern", "").lower() def _matchDomain(domains): # helper function to match pattern within the list of domains matches = [idom for idom in domains if pattern in idom.lower()] return 0 < len(matches) # filter out all rules that match the given pattern result = [ { "name": irule.name, "type": _filterTypesInv[irule.type], "domains": len(irule.domains), "priority": irule.priority, "wlan": irule.wlan, } for irule in rules.list() if pattern in irule.name.lower() or _matchDomain(irule.domains) or fnmatch.fnmatchcase(irule.name, pattern) ] MODULE.info("internetrules.query: results: %s" % (result,)) self.finished(request.id, result)
[docs] @sanitize(StringSanitizer()) def get(self, request): """ Returns the specified rules requests.options = [ <ruleName>, ... ] """ MODULE.info("internetrules.get: options: %s" % (request.options,)) result = [] # fetch all rules with the given names (we need to make sure that "name" is UTF8) names = set(request.options) result = [ { "name": irule.name, "type": _filterTypesInv[irule.type], "domains": irule.domains, "priority": irule.priority, "wlan": irule.wlan, } for irule in rules.list() if irule.name in names ] MODULE.info("internetrules.get: results: %s" % (result,)) self.finished(request.id, result)
[docs] @sanitize(DictSanitizer({"object": StringSanitizer()}, required=True)) def remove(self, request): """ Removes the specified rules requests.options = [ { "object": <ruleName> }, ... ] """ MODULE.info("internetrules.remove: options: %s" % (request.options,)) result = [] # fetch all rules with the given names for ientry in request.options: iname = ientry["object"] success = False if iname: success = rules.remove(iname) result.append({"name": iname, "success": success}) MODULE.info("internetrules.remove: results: %s" % (result,)) self.finished(request.id, result)
@staticmethod def _parseRule(iprops, forceAllProperties=False): # validate types for ikey, itype in ( ("name", six.string_types), ("type", six.string_types), ("priority", (int, six.string_types)), ("wlan", bool), ("domains", list), ): if ikey not in iprops: if forceAllProperties: # raise exception as the key is not present raise ValueError(_('The key "%s" has not been specified: %s') % (ikey, iprops)) continue if not isinstance(iprops[ikey], itype): typeStr = "" if isinstance(itype, tuple): typeStr = ", ".join(i.__name__ for i in itype) else: typeStr = itype.__name__ raise ValueError(_('The key "%s" needs to be of type: %s') % (ikey, typeStr)) # validate name if "name" in iprops and not univention.config_registry.validate_key(iprops["name"]): raise ValueError( _( 'Invalid rule name "%s". The name needs to be a string, the following special ' "characters are not allowed: %s" ) % ( iprops.get("name"), '!, ", §, $, %, &, (, ), [, ], {, }, =, ?, `, +, #, \', ",", ;, <, >, \\', ) ) # validate type if "type" in iprops and iprops["type"] not in _filterTypes: raise ValueError(_("Filter type is unknown: %s") % iprops["type"]) # validate domains if "domains" in iprops: parsedDomains = [] for idomain in iprops["domains"]: def _validValueChar(): # helper function to check for invalid characters return not set(idomain) & set(univention.config_registry.backend.INVALID_VALUE_CHARS) if not isinstance(idomain, six.string_types) or not _validValueChar(): raise ValueError(_("Invalid domain ")) # parse domain domain = idomain if "://" not in domain: # make sure that we have a scheme defined for parsing MODULE.info("Adding a leading scheme for parsing of domain: %s" % idomain) domain = "http://%s" % domain domain = urlparse(domain).hostname MODULE.info("Parsed domain: %s -> %s" % (idomain, domain)) if not domain: raise ValueError( _( 'The specified domain "%s" is not valid. Please specify a valid domain ' 'name, such as "wikipedia.org", "facebook.com"' ) % idomain ) # add domain to list of parsed domains parsedDomains.append(domain) # save parsed domains in the dict iprops["domains"] = parsedDomains return iprops
[docs] @sanitize( DictSanitizer( { "object": DictSanitizer( { "name": StringSanitizer(required=True), "type": ChoicesSanitizer(list(_filterTypes.keys()), required=True), "wlan": BooleanSanitizer(required=True), "priority": IntegerSanitizer(required=True), "domains": ListSanitizer(StringSanitizer(required=True), required=True), }, required=True, ) }, required=True, ) ) def add(self, request): """ Add the specified new rules: requests.options = [ { 'object': { 'name': <str>, 'type': 'whitelist' | 'blacklist', 'priority': <int> | <str>, 'wlan': <bool>, 'domains': [<str>, ...], } }, ... ] """ # try to create all specified projects result = [] for ientry in request.options: iprops = ientry["object"] try: # make sure that the rule does not already exist irule = rules.load(iprops["name"]) if irule: raise ValueError( _("A rule with the same name does already exist: %s") % iprops["name"] ) # parse the properties parsedProps = self._parseRule(iprops, True) # create a new rule from the user input newRule = rules.Rule( name=parsedProps["name"], type=_filterTypes[parsedProps["type"]], priority=parsedProps["priority"], wlan=parsedProps["wlan"], domains=parsedProps["domains"], ) # try to save filter rule newRule.save() MODULE.info("Created new rule: %s" % newRule) # everything ok result.append({"name": iprops["name"], "success": True}) except (ValueError, KeyError) as e: # data not valid... create error info MODULE.info( 'data for internet filter rule "%s" is not valid: %s' % (iprops.get("name"), e) ) result.append({"name": iprops.get("name"), "success": False, "details": str(e)}) # return the results self.finished(request.id, result)
[docs] @sanitize( DictSanitizer( { "object": DictSanitizer( { "name": StringSanitizer(required=True), "type": ChoicesSanitizer(list(_filterTypes.keys()), required=True), "wlan": BooleanSanitizer(required=True), "priority": IntegerSanitizer(required=True), "domains": ListSanitizer(StringSanitizer(required=True), required=True), }, required=True, ), "options": DictSanitizer({"name": StringSanitizer()}, required=True), }, required=True, ) ) def put(self, request): """ Modify an existing rule: requests.options = [ { 'object': { 'name': <str>, # optional 'type': 'whitelist' | 'blacklist', # optional 'priority': <int>, # optional 'wlan': <bool>, # optional 'domains': [<str>, ...], # optional }, 'options': { 'name': <str> # the original name of the object } }, ... ] """ # try to create all specified projects result = [] for ientry in request.options: try: # get properties and options from entry iprops = ientry["object"] iname = None ioptions = ientry.get("options") if ioptions: iname = ioptions.get("name") if not iname: raise ValueError(_('No "name" attribute has been specified in the options.')) # make sure that the rule already exists irule = rules.load(iname) if not irule: raise ValueError( _("The rule does not exist and cannot be modified: %s") % iprops.get("name", "") ) # parse the properties self._parseRule(iprops) if iprops.get("name", iname) != iname: # name has been changed -> remove old rule and create a new one rules.remove(iname) irule.name = iprops["name"] if "type" in iprops: # set rule type, move all domains from the previous type oldDomains = irule.domains irule.domains = [] irule.type = _filterTypes[iprops["type"]] irule.domains = oldDomains if "priority" in iprops: # set priority irule.priority = iprops["priority"] if "wlan" in iprops: # set wlan irule.wlan = iprops["wlan"] if "domains" in iprops: # set domains irule.domains = iprops["domains"] # try to save filter rule irule.save() MODULE.info("Saved rule: %s" % irule) # everything ok result.append({"name": iname, "success": True}) except ValueError as e: # data not valid... create error info MODULE.info( 'data for internet filter rule "%s" is not valid: %s' % (iprops.get("name"), e) ) result.append({"name": iprops.get("name"), "success": False, "details": str(e)}) # return the results self.finished(request.id, result)
[docs] @sanitize( school=SchoolSanitizer(required=True), pattern=LDAPSearchSanitizer(required=False, default="", use_asterisks=True, add_asterisks=False), ) @LDAP_Connection() def groups_query(self, request, ldap_user_read=None, ldap_position=None): """List all groups (classes, workgroups) and their assigned internet rule""" school = request.options["school"] pattern = LDAP_Filter.forAll( request.options.get("pattern", ""), ["name", "description"], _escape_filter_chars=False, school_prefix=school, ) pattern_with_school_suffix = LDAP_Filter.forAll( request.options.get("pattern", ""), ["name", "description"], _escape_filter_chars=False, school_suffix=school, ) pattern_with_space_school_suffix = LDAP_Filter.forAll( request.options.get("pattern", ""), ["name", "description"], _escape_filter_chars=False, school_suffix=school, seperator=" ", # For the case: Domain Users <OU> and possibly others? ) # Bug #55034: In case values in ucsschool/ldap/default/groupprefix/* # are changed and do not end with a space or "-". pattern_with_school_suffix_without_separator = LDAP_Filter.forAll( request.options.get("pattern", ""), ["name", "description"], _escape_filter_chars=False, school_suffix=school, seperator="", ) groups = [ x for x in Group.get_all( ldap_user_read, school, "(|{}{}{}{})".format( pattern, pattern_with_school_suffix, pattern_with_space_school_suffix, pattern_with_school_suffix_without_separator, ), ) if not x.self_is_computerroom() ] internet_rules = rules.getGroupRuleName([i.name for i in groups]) name = re.compile("-%s$" % (re.escape(school)), flags=re.IGNORECASE) result = [ { "name": i.get_relative_name() if hasattr(i, "get_relative_name") else name.sub("", i.name), "$dn$": i.dn, "rule": internet_rules.get(i.name, "default") or "$default$", } for i in groups ] result.sort(key=lambda x: x["name"]) self.finished(request.id, result)
[docs] @sanitize( DictSanitizer({"group": StringSanitizer(required=True), "rule": StringSanitizer(required=True)}) ) @LDAP_Connection() def groups_assign(self, request, ldap_user_read=None, ldap_position=None): """ Assigns default rules to groups: request.options = [ { 'group': <groupDN>, 'rule': <ruleName> }, ... ] """ MODULE.info("internetrules.groups_assign: options: %s" % str(request.options)) # try to load all group rules newRules = {} rmRules = [] for ientry in request.options: # make sure the group exists igrp = udm_objects.get( udm_modules.get("groups/group"), None, ldap_user_read, ldap_position, ientry["group"] ) if not igrp: raise UMC_Error("unknown group object") igrp.open() # check the rule name irule = ientry["rule"] if irule == "$default$": # remove the rule rmRules.append(igrp["name"]) else: try: # make sure the rule name is valid self._parseRule({"name": irule}) except ValueError as exc: raise UMC_Error(str(exc)) # add new rule newRules[igrp["name"]] = irule # assign default filter rules to groups rules.setGroupRuleName(newRules) rules.unsetGroupRuleName(rmRules) MODULE.info("internetrules.groups_assign: finished") self.finished(request.id, True)