Source code for univention.testing.ucsschool.importnetworks

# -*- coding: utf-8 -*-

from __future__ import print_function

import ipaddress
import os
import random
import subprocess
import tempfile

import univention.config_registry
import univention.testing.ucsschool.ucs_test_school as utu
from univention.testing import utils
from univention.testing.ucsschool.computer import random_ip
from univention.testing.ucsschool.importou import get_school_base

configRegistry = univention.config_registry.ConfigRegistry()
configRegistry.load()


[docs] def get_reverse_net(network, netmask): p = subprocess.Popen( [ "/usr/bin/univention-ipcalc6", "--ip", network, "--netmask", netmask, "--output", "reverse", "--calcdns", ], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, ) (stdout, stderr) = p.communicate() output = stdout.decode("UTF-8").strip().split(".") output.reverse() return ".".join(output)
[docs] class Network: def __init__(self, school, prefixlen): assert prefixlen > 7 assert prefixlen < 25 self._net = ipaddress.IPv4Interface("%s/%s" % (random_ip(), prefixlen)) self.network = str(self._net.network) self.iprange = "%s-%s" % ( self._net.network.network_address + 1, self._net.network.network_address + 10, ) self.defaultrouter = self._net.network.network_address + 1 self.nameserver = self._net.network.network_address + 2 self.netbiosserver = self._net.network.network_address + 8 self.router_mode = False self.school = school self.name = "%s-%s" % (self.school, self._net.network.network_address) self.school_base = get_school_base(self.school) self.dn = "cn=%s,cn=networks,%s" % (self.name, self.school_base) self.dhcp_zone = "cn=%s,cn=dhcp,%s" % (self.school, self.school_base) self.dns_forward_zone = "zoneName=%s,cn=dns,%s" % ( configRegistry.get("domainname"), configRegistry.get("ldap/base"), ) reverse_subnet = get_reverse_net( str(self._net.network.network_address), str(self._net.network.netmask) ) self.dns_reverse_zone = "zoneName=%s.in-addr.arpa,cn=dns,%s" % ( reverse_subnet, configRegistry.get("ldap/base"), ) def __str__(self): delimiter = "\t" line = self.school line += delimiter line += self.network line += delimiter if self.iprange: line += self.iprange line += delimiter line += str(self.defaultrouter) line += delimiter line += str(self.nameserver) line += delimiter line += str(self.netbiosserver) return line
[docs] def expected_attributes(self): attr = { "cn": [self.name], "univentionNetmask": [str(self._net.network.prefixlen)], "univentionNetwork": [str(self._net.network.network_address)], "univentionDnsForwardZone": [self.dns_forward_zone], "univentionDnsReverseZone": [self.dns_reverse_zone], } if self.iprange: attr["univentionIpRange"] = [self.iprange.replace("-", " ")] return attr
[docs] def verify(self): print("verify network: %s" % self.network) utils.verify_ldap_object(self.dn, expected_attr=self.expected_attributes(), should_exist=True) utils.verify_ldap_object(self.dns_forward_zone, should_exist=True) utils.verify_ldap_object(self.dns_reverse_zone, should_exist=True) utils.verify_ldap_object(self.dhcp_zone, should_exist=True) lo = univention.uldap.getMachineConnection() search_filter = ( "(&(cn=%s)(objectClass=univentionDhcpSubnet))" % self._net.network.network_address ) subnet_dn = lo.searchDn( base=self.dhcp_zone, filter=search_filter, unique=True, required=True, )[0] if self.defaultrouter: defaultrouter_policy_dn = "cn=%s,cn=routing,cn=dhcp,cn=policies,%s" % ( self.name, self.school_base, ) utils.verify_ldap_object( defaultrouter_policy_dn, expected_attr={"univentionDhcpRouters": [str(self.defaultrouter)]}, should_exist=True, ) utils.verify_ldap_object( subnet_dn, expected_attr={"univentionPolicyReference": [defaultrouter_policy_dn]}, strict=False, should_exist=True, ) if self.nameserver and not self.router_mode: nameserver_policy_dn = "cn=%s,cn=dns,cn=dhcp,cn=policies,%s" % (self.name, self.school_base) utils.verify_ldap_object( nameserver_policy_dn, expected_attr={ "univentionDhcpDomainName": [configRegistry.get("domainname")], "univentionDhcpDomainNameServers": [str(self.nameserver)], }, should_exist=True, ) utils.verify_ldap_object( subnet_dn, expected_attr={"univentionPolicyReference": [nameserver_policy_dn]}, strict=False, should_exist=True, ) if self.netbiosserver and not self.router_mode: netbios_policy_dn = "cn=%s,cn=netbios,cn=dhcp,cn=policies,%s" % (self.name, self.school_base) utils.verify_ldap_object( netbios_policy_dn, expected_attr={ "univentionDhcpNetbiosNodeType": ["8"], "univentionDhcpNetbiosNameServers": [str(self.netbiosserver)], }, should_exist=True, ) utils.verify_ldap_object( subnet_dn, expected_attr={"univentionPolicyReference": [netbios_policy_dn]}, strict=False, should_exist=True, )
[docs] def set_mode_to_router(self): self.router_mode = True
[docs] class ImportFile: def __init__(self, use_cli_api, use_python_api): self.router_mode = False self.use_cli_api = use_cli_api self.use_python_api = use_python_api self.import_fd, self.import_file = tempfile.mkstemp() os.close(self.import_fd)
[docs] def write_import(self, data): self.import_fd = os.open(self.import_file, os.O_RDWR | os.O_CREAT) os.write(self.import_fd, data.encode("utf-8")) os.close(self.import_fd)
[docs] def run_import(self, data): try: self.write_import(data) if self.use_cli_api: self._run_import_via_cli() elif self.use_python_api: self._run_import_via_python_api() print("SCHOOL DATA : %s" % data) finally: os.remove(self.import_file)
def _run_import_via_cli(self): if self.router_mode: cmd_block = ["/usr/share/ucs-school-import/scripts/import_router", self.import_file] else: cmd_block = ["/usr/share/ucs-school-import/scripts/import_networks", self.import_file] print("cmd_block: %r" % cmd_block) subprocess.check_call(cmd_block) def _run_import_via_python_api(self): raise NotImplementedError()
[docs] def set_mode_to_router(self): self.router_mode = True
[docs] class NetworkImport: def __init__(self, ou_name, nr_networks=5): assert nr_networks > 3 self.school = ou_name self.networks = [ Network(self.school, prefixlen=random.randint(8, 24)) for _i in range(nr_networks) ] self.networks[1].iprange = None def __str__(self): lines = [str(network) for network in self.networks] return "\n".join(lines)
[docs] def verify(self): for network in self.networks: network.verify()
[docs] def set_mode_to_router(self): for network in self.networks: network.set_mode_to_router()
[docs] def modify(self): self.networks[0].defaultrouter += 2 # self.networks[1].defaultrouter = '' self.networks[2].nameserver += 3 self.networks[3].defaultrouter += 3 self.networks[3].netbiosserver += 3
[docs] def create_and_verify_networks(use_cli_api=True, use_python_api=False, nr_networks=5): assert use_cli_api != use_python_api with utu.UCSTestSchool() as schoolenv: ou_name, ou_dn = schoolenv.create_ou(name_edudc=schoolenv.ucr.get("hostname")) print("********** Generate school data") network_import = NetworkImport(ou_name, nr_networks=nr_networks) print(network_import) import_file = ImportFile(use_cli_api, use_python_api) print("********** Create networks") import_file.run_import(str(network_import)) network_import.verify() print("********** Create routers") network_import.set_mode_to_router() import_file.set_mode_to_router() network_import.modify() import_file.run_import(str(network_import)) network_import.verify()
[docs] def import_networks_basics(use_cli_api=True, use_python_api=False): create_and_verify_networks(use_cli_api, use_python_api, 10)