Source code for univention.testing.ucsschool.internetrule

"""
**Class InternetRule**\n
All the operations related to internet rules

.. module:: internetrule
    :platform: Unix

.. moduleauthor:: Ammar Najjar <najjar@univention.de>
"""
from __future__ import print_function

import random

import univention.testing.strings as uts
import univention.testing.ucr as ucr_test
import univention.testing.ucsschool.ucs_test_school as utu
from univention.testing.ucsschool.ucs_test_school import UCSTestSchool
from univention.testing.umc import Client

from .randomdomain import RandomDomain


class InternetRule(object):
    """
    Test helper wrapping the UMC ``internetrules/*`` commands for one rule.

    Every property that is not supplied is filled with a random value, so
    ``InternetRule()`` yields a ready-to-define random rule.

    :param connection: UMC connection to use; a test connection is opened if omitted
    :type connection: UMC connection object
    :param ucr: UCR instance to use; a ``UCSTestConfigRegistry`` is created if omitted
    :type ucr: UCR object
    :param name: name of the internet rule to be created later
    :type name: str
    :param typ: type of the internet rule to be created later
    :type typ: str='whitelist' or 'blacklist'
    :param domains: domains of the internet rule to be created later
    :type domains: [str]
    :param wlan: if the internet rule supports wlan (any non-bool value is randomised)
    :type wlan: bool
    :param priority: priority of the internet rule [0,10]
    :type priority: int
    """

    def __init__(
        self, connection=None, ucr=None, name=None, typ=None, domains=None, wlan=None, priority=None
    ):
        self.name = name or uts.random_string()
        self.typ = typ or random.choice(["whitelist", "blacklist"])
        if domains:
            # Caller-supplied domains are stored (and later sent) in the given order.
            self.domains = domains
        else:
            dom = RandomDomain()
            domains = dom.getDomainList(random.randint(1, 10))
            self.domains = sorted(domains)
        # Only a real bool counts as "given"; None (or anything else) is randomised.
        self.wlan = wlan if isinstance(wlan, bool) else random.choice([True, False])
        # "is not None" so that an explicit priority of 0 is honoured.
        self.priority = priority if priority is not None else random.randint(0, 10)
        self.ucr = ucr or ucr_test.UCSTestConfigRegistry()
        self.client = connection or Client.get_test_connection()

    def __enter__(self):
        return self

    def __exit__(self, type, value, trace_back):
        # Undo any UCR changes recorded by the test registry.
        self.ucr.revert_to_original_registry()

    # define the rule via UMCP
    def define(self):
        """Define the internet rule via UMCP (``internetrules/add``).

        :raises AssertionError: if the UMC response does not report success
        """
        param = [
            {
                "object": {
                    "name": self.name,
                    "type": self.typ,
                    "domains": self.domains,
                    "wlan": self.wlan,
                    "priority": self.priority,
                }
            }
        ]
        print("defining rule %s with UMCP:%s" % (self.name, "internetrules/add"))
        print("param = %r" % (param,))
        reqResult = self.client.umc_command("internetrules/add", param).result
        assert reqResult[0]["success"], "Unable to define rule (%r)" % (param,)

    def get(self, should_exist):
        """
        Fetch the internet rule via UMCP (``internetrules/get``).

        :param should_exist: True if the rule is expected to be found
        :type should_exist: bool
        :raises AssertionError: if the truthiness of the result differs from ``should_exist``
        """
        print("Calling %s for %s" % ("internetrules/get", self.name))
        reqResult = self.client.umc_command("internetrules/get", [self.name]).result
        assert bool(reqResult) == should_exist, "Unexpected fetching result for internet rule (%r)" % (
            self.name,
        )

    def put(self, new_name=None, new_type=None, new_domains=None, new_wlan=None, new_priority=None):
        """
        Modify the internet rule via UMCP (``internetrules/put``).

        Arguments that are omitted keep the current value, so calling this
        without arguments re-submits the rule unchanged. On success the
        instance attributes are updated to the submitted values.

        :param new_name:
        :type new_name: str
        :param new_type:
        :type new_type: str
        :param new_domains:
        :type new_domains: [str]
        :param new_wlan:
        :type new_wlan: bool
        :param new_priority:
        :type new_priority: int [0,10]
        :raises AssertionError: if the UMC response does not report success
        """
        new_name = new_name or self.name
        new_type = new_type or self.typ
        new_domains = new_domains or self.domains
        # False and 0 are legitimate new values, so only None means "keep current".
        new_wlan = new_wlan if new_wlan is not None else self.wlan
        new_priority = new_priority if new_priority is not None else self.priority

        param = [
            {
                "object": {
                    "name": new_name,
                    "type": new_type,
                    "domains": new_domains,
                    "wlan": new_wlan,
                    "priority": new_priority,
                },
                # The rule is addressed by its current (old) name.
                "options": {"name": self.name},
            }
        ]
        print("Modifying rule %s with UMCP:%s" % (self.name, "internetrules/put"))
        print("param = %r" % (param,))
        reqResult = self.client.umc_command("internetrules/put", param).result
        assert reqResult[0]["success"], "Unable to modify rule (%r)" % (param,)

        self.name = new_name
        self.typ = new_type
        self.domains = new_domains
        self.wlan = new_wlan
        self.priority = new_priority

    def remove(self):
        """Remove the internet rule via UMCP (``internetrules/remove``).

        :raises AssertionError: if the UMC response does not report success
        """
        print("Calling %s for %s" % ("internetrules/remove", self.name))
        options = [{"object": self.name}]
        reqResult = self.client.umc_command("internetrules/remove", options).result
        assert reqResult[0]["success"], "Unable to remove rule (%r)" % (self.name,)

    # Fetch the values from ucr and check if they match
    # the expected values for the rule
    def checkUcr(self, should_match):
        """
        Check the UCR variables of the internet rule.

        Reloads UCR, collects every variable whose key contains the rule
        name and, if ``should_match`` is true, compares filter type,
        priority, wlan flag and domains against this object.

        :param should_match: True if UCR is expected to contain the rule
        :type should_match: bool
        :raises AssertionError: if presence or values differ from the expectation
        """
        print("Checking UCR for %s" % (self.name,))
        self.ucr.load()
        # Extract related items from ucr, keyed by the last path segment.
        # NOTE(review): this is a substring match on the whole key, so a rule
        # name contained in another key would also be picked up - confirm
        # that names are unique enough in the calling tests.
        exItems = {key.split("/")[-1]: value for (key, value) in self.ucr.items() if self.name in key}
        assert bool(exItems) == should_match, "Unexpected registery items (should_match=%r items=%r)" % (
            should_match,
            exItems,
        )
        if should_match:
            wlan = str(self.wlan).lower()
            # Rule types are compared against these suffixed UCR spellings;
            # any other type is compared verbatim.
            typ = {"whitelist": "whitelist-block", "blacklist": "blacklist-pass"}.get(self.typ, self.typ)
            curtype = exItems["filtertype"]
            curWlan = exItems["wlan"]
            curPriority = int(exItems["priority"])
            # Domain entries are the items whose last key segment is a number.
            # isdigit() is used because it exists on both str and unicode.
            curDomains = sorted(value for (key, value) in exItems.items() if key.isdigit())
            currentState = (curtype, curPriority, curWlan, curDomains)
            # self.domains is only guaranteed sorted when randomly generated,
            # so sort it here to compare order-independently.
            assert currentState == (
                typ,
                self.priority,
                wlan,
                sorted(self.domains),
            ), "Values in UCR are not updated for rule (%r)" % (self.name,)

    # Assign internet rules to workgroups/classes
    # return a tuple (groupName, ruleName)
    def assign(self, school, groupName, groupType, default=False):
        """
        Assign the internet rule to a group via UMCP (``internetrules/groups/assign``).

        :param school: name of the ou
        :type school: str
        :param groupName: name of the group or class
        :type groupName: str
        :param groupType: 'workgroup' or 'class'
        :type groupType: str
        :param default: if True, the ``$default$`` rule is assigned instead of this rule
        :type default: bool
        :returns: tuple ``(groupName, self.name)``; the rule name is returned even when
            ``default`` is True
        :raises AssertionError: if the UMC result is falsy
        """
        self.ucr.load()
        # One helper instance is enough for both DN lookups.
        schoolenv = utu.UCSTestSchool()
        # Any other groupType leaves the DN empty, as before.
        groupdn = ""
        if groupType == "workgroup":
            groupdn = schoolenv.get_workinggroup_dn(school, groupName)
        elif groupType == "class":
            school_basedn = schoolenv.get_ou_base_dn(school)
            groupdn = "cn=%s-%s,cn=klassen,cn=schueler,cn=groups,%s" % (school, groupName, school_basedn)

        name = "$default$" if default else self.name
        param = [{"group": groupdn, "rule": name}]
        print("Assigning rule %s to %s: %s" % (self.name, groupType, groupName))
        print("param = %r" % (param,))
        result = self.client.umc_command("internetrules/groups/assign", param).result
        assert result, "Unable to assign internet rule to workgroup (%r)" % (param,)
        return (groupName, self.name)

    # returns a list of all the existing internet rules via UMCP
    def allRules(self):
        """
        Get all defined rules via UMCP (``internetrules/query`` with an empty pattern).

        :returns: [str] sorted list of rule names
        """
        print("Calling %s = get all defined rules" % ("internetrules/query"))
        rules = self.client.umc_command("internetrules/query", {"pattern": ""}).result
        return sorted(x["name"] for x in rules)
class Check(object):
    """
    Checks for internet rules assigned to groups/classes.

    :param school: name of the ou
    :type school: str
    :param groupRuleCouples: couples of groups and the rules assigned to them;
        a rule of ``None`` stands for the default rule
    :type groupRuleCouples: [tuple(str,str)]
    :param connection: UMC connection to use; a test connection is opened if omitted
    :type connection: UMC connection object
    :param ucr: UCR instance to use; a ``UCSTestConfigRegistry`` is created if omitted
    :type ucr: UCR object
    """

    def __init__(self, school, groupRuleCouples, connection=None, ucr=None):
        self.school = school
        self.groupRuleCouples = groupRuleCouples
        self.ucr = ucr or ucr_test.UCSTestConfigRegistry()
        if connection:
            self.client = connection
        else:
            # UCR is loaded before the test connection is opened.
            self.ucr.load()
            self.client = Client.get_test_connection()

    def __enter__(self):
        return self

    def __exit__(self, type, value, trace_back):
        # Undo any UCR changes recorded by the test registry.
        self.ucr.revert_to_original_registry()

    def checkRules(self):
        """Check via UMCP that every group has the expected rule assigned.

        Queries ``internetrules/groups/query`` with the group name as pattern
        and compares the ``rule`` of the first entry returned. An expected
        rule of ``None`` is compared as ``$default$``.

        :raises AssertionError: if the query returns nothing or the rule differs
        """
        for groupName, ruleName in self.groupRuleCouples:
            print("Checking %s rules" % (groupName,))
            param = {"school": self.school, "pattern": groupName}
            if ruleName is None:
                ruleName = "$default$"
            groups = self.client.umc_command("internetrules/groups/query", param).result
            # Fail with context instead of a bare IndexError on an empty result.
            assert groups, "No group found for (%r) via internetrules/groups/query (%r)" % (
                groupName,
                param,
            )
            result = groups[0]["rule"]
            assert result == ruleName, "Assigned rule (%r) to workgroup (%r) doesn't match" % (
                ruleName,
                groupName,
            )

    def checkUcr(self):
        """Check the UCR variables for the group/class rule assignments.

        For each couple, ``proxy/filter/groupdefault/<school>-<group>`` must
        equal the expected rule name. ``None`` is compared as-is, i.e. it
        matches a variable for which ``ucr.get`` returns ``None``.

        :raises AssertionError: if a variable differs from the expected rule
        """
        self.ucr.load()
        for groupName, ruleName in self.groupRuleCouples:
            print("Checking %s UCR variables" % (groupName,))
            groupid = "proxy/filter/groupdefault/%s-%s" % (self.school, groupName)
            assert self.ucr.get(groupid) == ruleName, "Ucr variable (%r) is not correctly set" % (
                groupid,
            )