"""
.. module:: School
:platform: Unix
.. moduleauthor:: Ammar Najjar <najjar@univention.de>
"""
from __future__ import print_function
import univention.testing.strings as uts
import univention.testing.ucr as ucr_test
import univention.uldap
from ucsschool.lib.roles import create_ucsschool_role_string, role_school_admin_group
from univention.testing import utils
from univention.testing.ucsschool.computer import random_ip
from univention.testing.ucsschool.importou import (
TYPE_DC_ADMINISTRATIVE,
TYPE_DC_EDUCATIONAL,
DCMembership,
DCNotFound,
DhcpdLDAPBase,
get_ou_base,
get_school_ou_from_dn,
verify_dc,
)
from univention.testing.ucsschool.ucs_test_school import UCSTestSchool
from univention.testing.umc import Client
# This code does not create correct school replication nodes. Do not use this for new tests!
[docs]
def create_dc_slave(udm, school=None):
with ucr_test.UCSTestConfigRegistry() as ucr:
account = utils.UCSTestDomainAdminCredentials()
admin = account.binddn
passwd = account.bindpw
ldap_base = ucr.get("ldap/base")
ip = random_ip()
name = uts.random_name()
def dns_forward():
return "zoneName=%s,cn=dns,%s" % (ucr.get("domainname"), ucr.get("ldap/base"))
def dns_reverse(ip):
return "zoneName=%s.in-addr.arpa,cn=dns,%s" % (
".".join(reversed(ip.split(".")[:3])),
ucr.get("ldap/base"),
)
print("Creating Replica Directory Node (%s)" % name)
position = "cn=dc,cn=computers,%s" % ldap_base
if not ucr.is_true("ucsschool/singlemaster", False):
position = "cn=dc,cn=server,cn=computers,ou=%s,%s" % (school, ldap_base)
dn = udm.create_object(
"computers/domaincontroller_slave",
binddn=admin,
bindpwd=passwd,
position=position,
ip=ip,
name=name,
options=["nagios=False"],
)
if dn:
return name, dn
else:
utils.fail("Could not create a Replica Directory Node via udm")
[docs]
class School(object):
"""
Contains the needed functuality for schools in the UMC module schoolwizards/schools.
By default they are randomly formed.\n
:param connection:
:type connection: UMC connection object
:param ucr:
:type ucr: UCR object
:param name: name of the school to be created later
:type name: str
:param display_name: display_name of the school to be created later
:type display_name: str
"""
# Initialization (Random by default)
def __init__(self, display_name=None, name=None, dc_name=None, ucr=None, connection=None):
self.class_share_file_server = None
self.home_share_file_server = None
self.name = name if name else uts.random_string()
self.display_name = display_name if display_name else uts.random_string()
self.ucr = ucr if ucr else ucr_test.UCSTestConfigRegistry()
self.ucr.load()
singlemaster = self.ucr.is_true("ucsschool/singlemaster")
if singlemaster:
self.dc_name = None
else:
self.dc_name = dc_name if dc_name else uts.random_string()
if connection:
self.client = connection
else:
self.client = Client.get_test_connection()
def __enter__(self):
return self
def __exit__(self, type, value, trace_back):
self.ucr.revert_to_original_registry()
[docs]
def create(self):
"""Creates object school"""
flavor = "schoolwizards/schools"
param = [
{
"object": {
"name": self.name,
"dc_name": self.dc_name,
"display_name": self.display_name,
},
"options": None,
}
]
print("Creating school %s" % (self.name,))
print("param = %s" % (param,))
reqResult = self.client.umc_command("schoolwizards/schools/add", param, flavor).result
assert reqResult[0] is True, "Unable to create school (%r)" % (reqResult,)
utils.wait_for_replication()
[docs]
def get(self):
"""get the list of existing schools in the school"""
flavor = "schoolwizards/schools"
param = [{"object": {"$dn$": self.dn()}}]
reqResult = self.client.umc_command("schoolwizards/schools/get", param, flavor).result
return reqResult
[docs]
def check_get(self, attrs):
current_attrs = self.get()[0]
expected = dict(current_attrs)
expected.update(attrs)
assert current_attrs == expected, "Attributes do not match,\nfound (%r)\nexpected (%r)" % (
current_attrs,
expected,
)
[docs]
def query(self):
"""get the list of existing schools in the school"""
flavor = "schoolwizards/schools"
param = {"school": "undefined", "filter": ""}
reqResult = self.client.umc_command("schoolwizards/schools/query", param, flavor).result
return reqResult
[docs]
def check_query(self, names):
q = self.query()
k = [x["name"] for x in q]
assert set(names).issubset(
set(k)
), "schools from query do not contain the existing schools, found (%r), expected (%r)" % (
k,
names,
)
[docs]
def dn(self):
return UCSTestSchool().get_ou_base_dn(self.name)
[docs]
def remove(self):
"""Remove school"""
print("Removing school: %s" % self.name)
flavor = "schoolwizards/schools"
param = [{"object": {"$dn$": self.dn()}, "options": None}]
reqResult = self.client.umc_command("schoolwizards/schools/remove", param, flavor).result
assert reqResult[0], "Unable to remove school (%s)" % self.name
utils.wait_for_replication()
[docs]
def edit(self, new_attributes):
"""Edit object school"""
flavor = "schoolwizards/schools"
if self.dc_name:
host = self.dc_name
if new_attributes.get("home_share_file_server"):
host = new_attributes["home_share_file_server"]
home_share = "cn=%s,cn=dc,cn=server,cn=computers,%s" % (
host,
UCSTestSchool().get_ou_base_dn(self.name),
)
host = self.dc_name
if new_attributes.get("class_share_file_server"):
host = new_attributes["class_share_file_server"]
class_share = "cn=%s,cn=dc,cn=server,cn=computers,%s" % (
host,
UCSTestSchool().get_ou_base_dn(self.name),
)
else:
host = self.ucr.get("hostname")
if new_attributes.get("home_share_file_server"):
host = new_attributes["home_share_file_server"]
home_share = "cn=%s,cn=dc,cn=computers,%s" % (host, self.ucr.get("ldap/base"))
host = self.ucr.get("hostname")
if new_attributes.get("class_share_file_server"):
host = new_attributes["class_share_file_server"]
class_share = "cn=%s,cn=dc,cn=computers,%s" % (host, self.ucr.get("ldap/base"))
param = [
{
"object": {
"$dn$": self.dn(),
"name": self.name,
"home_share_file_server": home_share,
"class_share_file_server": class_share,
"dc_name": self.dc_name,
"display_name": new_attributes["display_name"],
},
"options": None,
}
]
print("Editing school %s" % (self.name,))
print("param = %s" % (param,))
reqResult = self.client.umc_command("schoolwizards/schools/put", param, flavor).result
assert reqResult[0], "Unable to edit school (%s) with the parameters (%r)" % (self.name, param)
self.home_share_file_server = home_share
self.class_share_file_server = class_share
self.display_name = new_attributes["display_name"]
utils.wait_for_replication()
[docs]
def verify_ldap(self, should_exist):
homeshare = ""
classshare = ""
if self.home_share_file_server:
homeshare = self.home_share_file_server[3:].split(",")[0]
if self.class_share_file_server:
classshare = self.class_share_file_server[3:].split(",")[0]
self.verify_ou(self.name, self.dc_name, self.ucr, homeshare, classshare, None, should_exist)
[docs]
def verify_ou(
self, ou, dc, ucr, homesharefileserver, classsharefileserver, dc_administrative, must_exist
):
print("*** Verifying OU (%s) ... " % ou)
ucr.load()
dc_name = ucr.get("hostname")
old_dhcpd_ldap_base = ucr.get("dhcpd/ldap/base")
lo = univention.uldap.getMachineConnection()
base_dn = ucr.get("ldap/base")
cn_pupils = ucr.get("ucsschool/ldap/default/container/pupils", "schueler")
cn_teachers = ucr.get("ucsschool/ldap/default/container/teachers", "lehrer")
cn_teachers_staff = ucr.get(
"ucsschool/ldap/default/container/teachers-and-staff", "lehrer und mitarbeiter"
)
cn_admins = ucr.get("ucsschool/ldap/default/container/admins", "admins")
cn_staff = ucr.get("ucsschool/ldap/default/container/staff", "mitarbeiter")
singlemaster = ucr.is_true("ucsschool/singlemaster")
noneducational_create_objects = ucr.is_true("ucsschool/ldap/noneducational/create/objects")
district_enable = ucr.is_true("ucsschool/ldap/district/enable")
# default_dcs = ucr.get('ucsschool/ldap/default/dcs')
dhcp_dns_clearou = ucr.is_true("ucsschool/import/generate/policy/dhcp/dns/clearou")
ou_base = get_ou_base(ou, district_enable)
# does dc exist?
if singlemaster:
dc_dn = ucr.get("ldap/hostdn")
dc_name = ucr.get("hostname")
elif dc:
dc_dn = "cn=%s,cn=dc,cn=server,cn=computers,%s" % (dc, ou_base)
dc_name = dc
else:
dc_dn = "cn=dc%s,cn=dc,cn=server,cn=computers,%s" % (ou, ou_base)
dc_name = "dc%s" % ou
homesharefileserver_dn = dc_dn
if homesharefileserver:
result = lo.searchDn(
filter="(&(objectClass=univentionDomainController)(cn=%s))" % homesharefileserver,
base=base_dn,
)
if result:
homesharefileserver_dn = result[0]
classsharefileserver_dn = dc_dn
if classsharefileserver:
result = lo.searchDn(
filter="(&(objectClass=univentionDomainController)(cn=%s))" % classsharefileserver,
base=base_dn,
)
if result:
classsharefileserver_dn = result[0]
utils.verify_ldap_object(
ou_base,
expected_attr={
"ou": [ou],
"ucsschoolClassShareFileServer": [classsharefileserver_dn],
"ucsschoolHomeShareFileServer": [homesharefileserver_dn],
},
should_exist=must_exist,
)
utils.verify_ldap_object(
"cn=printers,%s" % ou_base, expected_attr={"cn": ["printers"]}, should_exist=must_exist
)
utils.verify_ldap_object(
"cn=users,%s" % ou_base, expected_attr={"cn": ["users"]}, should_exist=must_exist
)
utils.verify_ldap_object(
"cn=%s,cn=users,%s" % (cn_pupils, ou_base),
expected_attr={"cn": [cn_pupils]},
should_exist=must_exist,
)
utils.verify_ldap_object(
"cn=%s,cn=users,%s" % (cn_teachers, ou_base),
expected_attr={"cn": [cn_teachers]},
should_exist=must_exist,
)
utils.verify_ldap_object(
"cn=%s,cn=users,%s" % (cn_admins, ou_base),
expected_attr={"cn": [cn_admins]},
should_exist=must_exist,
)
utils.verify_ldap_object(
"cn=computers,%s" % ou_base, expected_attr={"cn": ["computers"]}, should_exist=must_exist
)
utils.verify_ldap_object(
"cn=server,cn=computers,%s" % ou_base,
expected_attr={"cn": ["server"]},
should_exist=must_exist,
)
utils.verify_ldap_object(
"cn=dc,cn=server,cn=computers,%s" % ou_base,
expected_attr={"cn": ["dc"]},
should_exist=must_exist,
)
utils.verify_ldap_object(
"cn=networks,%s" % ou_base, expected_attr={"cn": ["networks"]}, should_exist=must_exist
)
utils.verify_ldap_object(
"cn=groups,%s" % ou_base, expected_attr={"cn": ["groups"]}, should_exist=must_exist
)
utils.verify_ldap_object(
"cn=%s,cn=groups,%s" % (cn_pupils, ou_base),
expected_attr={"cn": [cn_pupils]},
should_exist=must_exist,
)
utils.verify_ldap_object(
"cn=%s,cn=groups,%s" % (cn_teachers, ou_base),
expected_attr={"cn": [cn_teachers]},
should_exist=must_exist,
)
utils.verify_ldap_object(
"cn=klassen,cn=%s,cn=groups,%s" % (cn_pupils, ou_base),
expected_attr={"cn": ["klassen"]},
should_exist=must_exist,
)
utils.verify_ldap_object(
"cn=raeume,cn=groups,%s" % ou_base, expected_attr={"cn": ["raeume"]}, should_exist=must_exist
)
utils.verify_ldap_object(
"cn=dhcp,%s" % ou_base, expected_attr={"cn": ["dhcp"]}, should_exist=must_exist
)
utils.verify_ldap_object(
"cn=policies,%s" % ou_base, expected_attr={"cn": ["policies"]}, should_exist=must_exist
)
utils.verify_ldap_object(
"cn=shares,%s" % ou_base, expected_attr={"cn": ["shares"]}, should_exist=must_exist
)
utils.verify_ldap_object(
"cn=klassen,cn=shares,%s" % ou_base,
expected_attr={"cn": ["klassen"]},
should_exist=must_exist,
)
utils.verify_ldap_object(
"cn=dc,cn=server,cn=computers,%s" % ou_base,
expected_attr={"cn": ["dc"]},
should_exist=must_exist,
)
if noneducational_create_objects:
utils.verify_ldap_object("cn=%s,cn=users,%s" % (cn_staff, ou_base), should_exist=must_exist)
utils.verify_ldap_object(
"cn=%s,cn=users,%s" % (cn_teachers_staff, ou_base), should_exist=must_exist
)
utils.verify_ldap_object("cn=%s,cn=groups,%s" % (cn_staff, ou_base), should_exist=must_exist)
else:
utils.verify_ldap_object("cn=%s,cn=users,%s" % (cn_staff, ou_base), should_exist=False)
utils.verify_ldap_object(
"cn=%s,cn=users,%s" % (cn_teachers_staff, ou_base), should_exist=False
)
utils.verify_ldap_object("cn=%s,cn=groups,%s" % (cn_staff, ou_base), should_exist=False)
if noneducational_create_objects:
utils.verify_ldap_object(
"cn=DC-Verwaltungsnetz,cn=ucsschool,cn=groups,%s" % base_dn, should_exist=True
)
utils.verify_ldap_object(
"cn=Member-Verwaltungsnetz,cn=ucsschool,cn=groups,%s" % base_dn, should_exist=True
)
utils.verify_ldap_object(
"cn=OU%s-DC-Verwaltungsnetz,cn=ucsschool,cn=groups,%s" % (ou, base_dn),
should_exist=must_exist,
)
utils.verify_ldap_object(
"cn=OU%s-Member-Verwaltungsnetz,cn=ucsschool,cn=groups,%s" % (ou, base_dn),
should_exist=must_exist,
)
# This will fail because we don't cleanup these groups in cleanup_ou
# else:
# utils.verify_ldap_object("cn=DC-Verwaltungsnetz,cn=ucsschool,cn=groups,%s" % base_dn,
# should_exist=False)
# utils.verify_ldap_object("cn=Member-Verwaltungsnetz,cn=ucsschool,cn=groups,%s" % base_dn,
# should_exist=False)
# utils.verify_ldap_object('cn=OU%s-DC-Verwaltungsnetz,cn=ucsschool,cn=groups,%s' % (ou,
# base_dn), should_exist=False)
# utils.verify_ldap_object('cn=OU%s-Member-Verwaltungsnetz,cn=ucsschool,cn=groups,%s' % (ou,
# base_dn), should_exist=False)
if not singlemaster:
verify_dc(ou, dc_name, TYPE_DC_EDUCATIONAL, base_dn, must_exist)
if dc_administrative:
verify_dc(ou, dc_administrative, TYPE_DC_ADMINISTRATIVE, base_dn, must_exist)
grp_prefix_pupils = ucr.get("ucsschool/ldap/default/groupprefix/pupils", "schueler-")
grp_prefix_teachers = ucr.get("ucsschool/ldap/default/groupprefix/teachers", "lehrer-")
grp_prefix_admins = ucr.get("ucsschool/ldap/default/groupprefix/admins", "admins-")
grp_prefix_staff = ucr.get("ucsschool/ldap/default/groupprefix/staff", "mitarbeiter-")
grp_policy_pupils = ucr.get(
"ucsschool/ldap/default/policy/umc/pupils",
"cn=ucsschool-umc-pupils-default,cn=UMC,cn=policies,%s" % base_dn,
)
grp_policy_teachers = ucr.get(
"ucsschool/ldap/default/policy/umc/teachers",
"cn=ucsschool-umc-teachers-default,cn=UMC,cn=policies,%s" % base_dn,
)
grp_policy_admins = ucr.get(
"ucsschool/ldap/default/policy/umc/admins",
"cn=ucsschool-umc-admins-default,cn=UMC,cn=policies,%s" % base_dn,
)
grp_policy_staff = ucr.get(
"ucsschool/ldap/default/policy/umc/staff",
"cn=ucsschool-umc-staff-default,cn=UMC,cn=policies,%s" % base_dn,
)
utils.verify_ldap_object(
"cn=%s%s,cn=ouadmins,cn=groups,%s" % (grp_prefix_admins, ou, base_dn),
expected_attr={
"univentionPolicyReference": [grp_policy_admins],
"ucsschoolRole": [create_ucsschool_role_string(role_school_admin_group, ou)],
},
should_exist=must_exist,
)
utils.verify_ldap_object(
"cn=%s%s,cn=groups,%s" % (grp_prefix_pupils, ou, ou_base),
expected_attr={"univentionPolicyReference": [grp_policy_pupils]},
should_exist=must_exist,
)
utils.verify_ldap_object(
"cn=%s%s,cn=groups,%s" % (grp_prefix_teachers, ou, ou_base),
expected_attr={"univentionPolicyReference": [grp_policy_teachers]},
should_exist=must_exist,
)
if noneducational_create_objects:
utils.verify_ldap_object(
"cn=%s%s,cn=groups,%s" % (grp_prefix_staff, ou, ou_base),
expected_attr={"univentionPolicyReference": [grp_policy_staff]},
should_exist=must_exist,
)
dcmaster_module = univention.admin.modules.get("computers/domaincontroller_master")
dcbackup_module = univention.admin.modules.get("computers/domaincontroller_backup")
dcslave_module = univention.admin.modules.get("computers/domaincontroller_slave")
masterobjs = univention.admin.modules.lookup(
dcmaster_module,
None,
lo,
scope="sub",
superordinate=None,
base=base_dn,
filter=univention.admin.filter.expression("cn", dc_name),
)
backupobjs = univention.admin.modules.lookup(
dcbackup_module,
None,
lo,
scope="sub",
superordinate=None,
base=base_dn,
filter=univention.admin.filter.expression("cn", dc_name),
)
slaveobjs = univention.admin.modules.lookup(
dcslave_module,
None,
lo,
scope="sub",
superordinate=None,
base=base_dn,
filter=univention.admin.filter.expression("cn", dc_name),
)
# check group membership
# Replica Directory Node should be member
# Primary Directory Node and Backup Directory Node should not be member
dcgroups = [
"cn=OU%s-DC-Edukativnetz,cn=ucsschool,cn=groups,%s" % (ou, base_dn),
"cn=DC-Edukativnetz,cn=ucsschool,cn=groups,%s" % (base_dn),
]
if must_exist:
if masterobjs:
is_master_or_backup = True
dcobject = masterobjs[0]
elif backupobjs:
is_master_or_backup = True
dcobject = backupobjs[0]
elif slaveobjs:
is_master_or_backup = False
dcobject = slaveobjs[0]
else:
raise DCNotFound()
dcobject.open()
groups = []
membership = False
for group in dcobject.get("groups"):
groups.append(group.lower())
for dcgroup in dcgroups:
if dcgroup.lower() in groups:
membership = True
if is_master_or_backup and membership:
raise DCMembership()
elif not is_master_or_backup and not membership:
raise DCMembership()
ucr.load()
if not singlemaster:
# in multiserver setups all dhcp settings have to be checked
dhcp_dn = "cn=dhcp,%s" % (ou_base)
else:
# in singleserver setup only the first OU sets dhcpd/ldap/base and all following OUs
# should leave the UCR variable untouched.
dhcpd_ldap_base = ucr.get("dhcpd/ldap/base")
if not dhcpd_ldap_base or "ou=" not in dhcpd_ldap_base:
raise DhcpdLDAPBase('dhcpd/ldap/base=%r contains no "ou="' % (dhcpd_ldap_base,))
# use the UCR value and check if the DHCP service exists
dhcp_dn = dhcpd_ldap_base
if not old_dhcpd_ldap_base:
# seems to be the first OU, so check the variable settings
if ucr.get("dhcpd/ldap/base") != "cn=dhcp,%s" % (ou_base,):
print("ERROR: dhcpd/ldap/base =", ucr.get("dhcpd/ldap/base"))
print("ERROR: expected base =", dhcp_dn)
raise DhcpdLDAPBase()
# dhcp
print("LDAP base of dhcpd = %r" % dhcp_dn)
dhcp_service_dn = "cn=%s,%s" % (get_school_ou_from_dn(dhcp_dn, ucr), dhcp_dn)
dhcp_server_dn = "cn=%s,%s" % (dc_name, dhcp_service_dn)
if must_exist:
utils.verify_ldap_object(
dhcp_service_dn,
expected_attr={
"dhcpOption": ['wpad "http://%s.%s/proxy.pac"' % (dc_name, ucr.get("domainname"))]
},
should_exist=True,
)
utils.verify_ldap_object(dhcp_server_dn, should_exist=True)
dhcp_dns_clearou_dn = "cn=dhcp-dns-clear,cn=policies,%s" % ou_base
if dhcp_dns_clearou:
utils.verify_ldap_object(
dhcp_dns_clearou_dn,
expected_attr={"emptyAttributes": ["univentionDhcpDomainNameServers"]},
should_exist=must_exist,
)
try:
utils.verify_ldap_object(
ou_base,
expected_attr={"univentionPolicyReference": [dhcp_dns_clearou_dn]},
should_exist=must_exist,
retry_count=0,
)
except utils.LDAPObjectUnexpectedValue:
# ignore other policies
pass
else:
utils.verify_ldap_object(dhcp_dns_clearou_dn, should_exist=False)