# -*- coding: utf-8 -*-
"""
.. module:: acl
:platform: Unix
.. moduleauthor:: Ammar Najjar <najjar@univention.de>
"""
from __future__ import print_function
import copy
import subprocess
import univention.testing.strings as uts
import univention.testing.ucr as ucr_test
import univention.testing.ucsschool.ucs_test_school as utu
import univention.testing.udm as udm_test
from univention.uldap import getMachineConnection
[docs]
def run_commands(cmdlist, argdict):
    r"""
    Run all commands in `cmdlist`, one after another, and collect their output.

    Every argument of every command is %-formatted with `argdict` before the
    command is started. The command templates passed in are never modified.
    Each command line is printed (prefixed with ``***``) before it is run.

    Example (return value only; the ``***`` trace lines are omitted here)::

        run_commands([['/bin/echo', '%(msg)s'], ['/bin/echo', 'World']], {'msg': 'Hello'})
        [('Hello\n', ''), ('World\n', '')]

    :param cmdlist: commands to start; each command is a sequence of format strings
    :type cmdlist: list[list[str]]
    :param dict argdict: values for the format strings used in `cmdlist`
    :return: one ``(stdout, stderr)`` pair per command, in the order of `cmdlist`
    :rtype: list[tuple[str, str]]
    :raises UnicodeDecodeError: if a command emits output that is not valid UTF-8
    """
    results = []
    for template in cmdlist:
        # Build a fresh argument list instead of copying and patching the
        # template in place; this also accepts tuples as command templates.
        cmd = [arg % argdict for arg in template]
        print("*** %r" % (cmd,))
        process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        out, err = process.communicate()
        results.append((out.decode("UTF-8"), err.decode("UTF-8")))
    return results
[docs]
class CreateContextManager(object):
    """
    Context manager that creates a UDM object in a container for the duration of a ``with`` block.

    Entering the context creates the object through `UCSTestUDM.create_object()` and
    yields its DN; leaving it calls `UCSTestUDM.cleanup()` (presumably removing the
    created object again -- confirm against `univention.testing.udm`).

    Subclasses select the object type via `udm_module` and may provide default
    creation arguments via `create_kwargs`.
    """

    # UDM module name passed to `create_object()`; overridden by subclasses.
    udm_module = ""
    # Class-level default arguments for `create_object()`. Never mutated:
    # every instance works on its own copy (see `__init__`).
    create_kwargs = {}

    def __init__(self, container_dn, **kwargs):
        """
        Prepare the creation of an object in a specific container.

        :param str container_dn: container dn to create the object in
        :param kwargs: arguments to pass to `udm.create_object()` additionally to cls.create_kwargs
        """
        self.container_dn = container_dn
        # Merge into a per-instance copy. Updating the class-level dict in place
        # would leak these arguments into every other instance of the class.
        self.create_kwargs = dict(self.create_kwargs, **kwargs)
        self.udm = None
        self.dn = ""

    def __enter__(self):
        """
        Create the object and return its DN.

        :return: value returned by `create_object()` (the DN of the new object)
        """
        self.udm = udm_test.UCSTestUDM()
        try:
            self.dn = self.udm.create_object(
                self.udm_module, position=self.container_dn, **self.create_kwargs
            )
        except Exception:
            # __exit__ is not invoked when __enter__ raises, so clean up here.
            self.udm.cleanup()
            raise
        return self.dn

    def __exit__(self, exc_type, exc_value, etraceback):
        """Clean up the UDM helper; exceptions from the ``with`` body are not suppressed."""
        if exc_type:
            print("*** Cleanup after exception: %s %s" % (exc_type, exc_value))
        self.udm.cleanup()
[docs]
class CreateGroupInContainer(CreateContextManager):
    """Context manager that creates a ``groups/group`` object in the given container."""

    # The random name is generated once, when this class body is executed;
    # it is the default for every instance unless `name` is passed explicitly.
    create_kwargs = dict(name=uts.random_name())
    udm_module = "groups/group"
[docs]
class CreateDCSlaveInContainer(CreateContextManager):
    """Context manager that creates a ``computers/domaincontroller_slave`` object in the given container."""

    # The random name is generated once, when this class body is executed;
    # it is the default for every instance unless `name` is passed explicitly.
    create_kwargs = dict(name=uts.random_name())
    udm_module = "computers/domaincontroller_slave"
[docs]
class Acl(object):
    """
    Helper to test LDAP ACLs with ``slapacl``.

    Contains the basic functionality to test the ACLs of the common UCS@school
    containers. The attribute lists used here may change over time.
    """

    # Password related attributes shared by the student, user and server checks.
    _PASSWORD_ATTRS = [
        "krb5KeyVersionNumber",
        "krb5KDCFlags",
        "krb5Key",
        "krb5PasswordEnd",
        "sambaAcctFlags",
        "sambaPwdLastSet",
        "sambaLMPassword",
        "sambaNTPassword",
        "shadowLastChange",
        "shadowMax",
        "userPassword",
        "pwhistory",
        "sambaPwdCanChange",
        "sambaPwdMustChange",
        "sambaPasswordHistory",
    ]

    def __init__(self, school, auth_dn, access_allowance):
        """
        :param str school: school name
        :param str auth_dn: dn of the authentication actor
        :param str access_allowance: the expected access result - `ALLOWED` or `DENIED`
        """
        self.school = school
        self.auth_dn = auth_dn
        self.access_allowance = access_allowance
        registry = ucr_test.UCSTestConfigRegistry()
        self.ucr = registry
        registry.load()

    def _school_base_dn(self):
        """Return the OU base dn of `self.school` as reported by `UCSTestSchool`."""
        return utu.UCSTestSchool().get_ou_base_dn(self.school)

    def assert_acl(self, target_dn, access, attrs, access_allowance=None):
        """
        Test an ACL rule by running ``slapacl`` once per attribute.

        The verdict is taken from the first line of the command's stderr that
        contains ``ALLOWED`` or ``DENIED``. If no such line exists, a message is
        printed and the attribute is skipped.

        :param str target_dn: Target dn to test access to
        :param str access: type of access - `read`, `write` or `none`
        :param attrs: names of the attributes to test acl against
        :type attrs: list[str]
        :param str access_allowance: expected result for this call; falls back to
            `self.access_allowance` when empty or not given
        :raises AssertionError: if the command wrote nothing to stderr or the
            verdict does not contain the expected result
        """
        expected = access_allowance or self.access_allowance
        print(
            "\n * Targetdn = %s\n * Authdn = %s\n * Access = %s\n * Access allowance = %s\n"
            % (target_dn, self.auth_dn, access, expected)
        )
        # Command template; the %(...)s placeholders are filled in by run_commands().
        cmd = [
            "slapacl",
            "-f",
            "/etc/ldap/slapd.conf",
            "-D",
            "%(self.auth_dn)s",
            "-b",
            "%(target_dn)s",
            "%(attr)s/%(access)s",
            "-d",
            "0",
        ]
        for attr in attrs:
            argdict = {
                "self.auth_dn": self.auth_dn,
                "target_dn": target_dn,
                "access": access,
                "attr": attr,
            }
            _out, err = run_commands([cmd], argdict)[0]
            assert err, "command %r was not executed successfully" % cmd
            verdicts = [line for line in err.split("\n") if "ALLOWED" in line or "DENIED" in line]
            if not verdicts:
                print("Failed to parse slapacl output:", attr, err)
                continue
            result = verdicts[0]
            assert expected in result, "Access (%s) by (%s) to (%s) not expected %r" % (
                access,
                self.auth_dn,
                target_dn,
                result,
            )

    def assert_base_dn(self, access):
        """General access rule: everybody may read the LDAP base."""
        ldap_base = self.ucr.get("ldap/base")
        self.assert_acl(
            ldap_base,
            access,
            [
                "entry",
                "children",
                "dc",
                "univentionObjectType",
                "krb5RealmName",
                "nisDomain",
                "associatedDomain",
                "univentionPolicyReference",
                "msGPOLink",
            ],
        )

    def assert_student(self, stu_dn, access):
        """Rule under test: teachers and OU admins may change student passwords."""
        password_attrs = self._PASSWORD_ATTRS + ["sambaBadPasswordCount"]
        self.assert_acl(stu_dn, access, password_attrs)

    def assert_room(self, room_dn, access):
        """Rule under test: teachers and OU admins may create and edit room groups."""
        container_dn = "cn=raeume,cn=groups,%s" % self._school_base_dn()
        self.assert_acl(container_dn, access, ["children", "entry"])
        # access to dn.regex="^cn=([^,]+),cn=raeume,cn=groups,ou=([^,]+),dc=najjar,dc=am$$"
        # filter="(&(!(|(uidNumber=*)(objectClass=SambaSamAccount)))(objectClass=univentionGroup))"
        room_attrs = [
            "entry",
            "children",
            "sambaGroupType",
            "cn",
            "objectClass",
            "univentionObjectType",
            "gidNumber",
            "sambaSID",
            "univentionGroupType",
        ]
        self.assert_acl(room_dn, access, room_attrs)
        # A computer object placed in the room container must stay inaccessible.
        with CreateDCSlaveInContainer(container_dn) as target_dn:
            self.assert_acl(target_dn, access, room_attrs, access_allowance="DENIED")

    def _assert_group_container(self, container_cn, access):
        """
        Shared check for the ``cn=<container_cn>,cn=groups`` container of the school.

        Checks the container itself, expects `DENIED` for a domain controller
        object created in it and checks the attributes of a group created in it.
        """
        group_container = "cn=%s,cn=groups,%s" % (container_cn, self._school_base_dn())
        container_attrs = ["children", "entry"]
        self.assert_acl(group_container, access, container_attrs)
        with CreateDCSlaveInContainer(group_container) as target_dn:
            self.assert_acl(target_dn, access, container_attrs, access_allowance="DENIED")
        with CreateGroupInContainer(group_container) as group_dn:
            # access to dn.regex="^cn=([^,]+),(cn=lehrer,|cn=schueler,|)cn=groups,ou=([^,]+),
            #                     dc=najjar,dc=am$$"
            # filter="(&
            #     (!(|(uidNumber=*)(objectClass=SambaSamAccount)))(objectClass=univentionGroup)
            # )"
            group_attrs = [
                "sambaGroupType",
                "cn",
                "description",
                "objectClass",
                "memberUid",
                "univentionObjectType",
                "gidNumber",
                "sambaSID",
                "uniqueMember",
                "univentionGroupType",
            ]
            self.assert_acl(group_dn, access, group_attrs)

    def assert_teacher_group(self, access):
        """
        Rule under test: teachers, staff and members of the local administrators
        may create and modify workgroups.
        """
        self._assert_group_container("lehrer", access)

    def assert_student_group(self, access):
        """Same checks as `assert_teacher_group`, applied to the ``cn=schueler`` group container."""
        self._assert_group_container("schueler", access)

    def assert_share_object_access(self, share_dn, access, access_allowance="ALLOWED"):
        """
        Assert that for the given share object the given <access>, "read" or "write", is (not) given.

        Please note that the attribute list may not be complete.
        """
        self.assert_acl(
            share_dn,
            access,
            [
                "univentionShareNFSSync",
                "univentionShareSambaForceDirectoryMode",
                "cn",
                "objectClass",
                "univentionShareSambaDosFilemode",
                "univentionShareSambaForceSecurityMode",
                "univentionShareSambaLocking",
                "univentionShareSambaForceDirectorySecurityMode",
                "univentionShareSambaMSDFS",
                "univentionShareSambaCreateMode",
                "univentionShareSambaWriteable",
                "univentionShareSambaInheritPermissions",
                "univentionShareSambaBrowseable",
                "univentionShareSambaHideUnreadable",
                "univentionShareSambaInheritAcls",
                "univentionShareSambaPublic",
                "univentionShareSambaSecurityMode",
                "univentionShareDirectoryMode",
                "univentionShareSambaBlockingLocks",
                "univentionSharePath",
                "univentionShareWriteable",
                "univentionShareSambaDirectorySecurityMode",
                "univentionShareSambaLevel2Oplocks",
                "univentionShareSambaNtAclSupport",
                "univentionShareSambaCscPolicy",
                "univentionShareSambaForceCreateMode",
                "univentionObjectType",
                "univentionShareSambaOplocks",
                "univentionShareSambaDirectoryMode",
                "univentionShareSambaForceGroup",
                "univentionShareSambaFakeOplocks",
                "univentionShareGid",
                "univentionShareNFSRootSquash",
                "univentionShareUid",
                "univentionShareSambaStrictLocking",
                "univentionShareSambaName",
                "univentionShareNFSSubTree",
                "univentionShareSambaInheritOwner",
                "univentionShareHost",
            ],
            access_allowance,
        )

    def assert_shares(self, shares_dn, access):
        """
        Rule under test: teachers and members of the local administrators may
        create shares, but may not modify class shares.
        """
        container_attrs = ["children", "entry"]
        self.assert_acl(shares_dn, access, container_attrs)
        # Note: this check always uses "write", independent of `access`.
        with CreateDCSlaveInContainer(shares_dn) as target_dn:
            self.assert_acl(target_dn, "write", container_attrs, access_allowance="DENIED")

    def assert_temps(self, access):
        """
        Rule under test: members of the local administrators must be able to
        write some temporary objects. As regular expressions on group
        memberships are not possible, this is allowed for all teachers.
        """
        base_dn = self.ucr.get("ldap/base")
        container_attrs = ["children", "entry"]
        # The "sid" container is only checked itself, without creating an object in it.
        self.assert_acl("cn=%s,cn=temporary,cn=univention,%s" % ("sid", base_dn), access, container_attrs)
        for container_cn in ("gid", "mac", "groupName"):
            temp_dn = "cn=%s,cn=temporary,cn=univention,%s" % (container_cn, base_dn)
            self.assert_acl(temp_dn, access, container_attrs)
            with CreateDCSlaveInContainer(temp_dn) as target_dn:
                self.assert_acl(target_dn, access, container_attrs, access_allowance="DENIED")

    def assert_gid_temps(self, access):
        """Check the ``cn=gidNumber`` temporary container and a domain controller object created in it."""
        temp_dn = "cn=gidNumber,cn=temporary,cn=univention,%s" % self.ucr.get("ldap/base")
        gid_attrs = ["children", "entry", "univentionLastUsedValue"]
        self.assert_acl(temp_dn, access, gid_attrs)
        with CreateDCSlaveInContainer(temp_dn) as target_dn:
            self.assert_acl(target_dn, access, gid_attrs, access_allowance="DENIED")

    def assert_ou(self, access):
        """
        Rule under test: Replica Directory Nodes may read and write entries of
        their OU (password changes etc.); teachers and Managed Nodes may read
        them; objects of the same OU get the standard ACLs; servers/users of
        other OUs may do nothing.
        """
        ou_attrs = [
            "entry",
            "children",
            "ou",
            "displayName",
            "univentionObjectType",
            "ucsschoolHomeShareFileServer",
            "ucsschoolClassShareFileServer",
            "univentionPolicyReference",
            "objectClass",
        ]
        self.assert_acl(self._school_base_dn(), access, ou_attrs)

    def assert_global_containers(self, access):
        """
        Rule under test: students, teachers, staff and admins may read the
        global containers univention, policies, groups and dns (they are shown
        for students/computers).
        """
        base_dn = self.ucr.get("ldap/base")
        container_attrs = [
            "entry",
            "children",
            "objectClass",
            "univentionObjectType",
            "description",
            "cn",
        ]
        for container_cn in ("univention", "dns", "policies", "groups"):
            self.assert_acl("cn=%s,%s" % (container_cn, base_dn), access, container_attrs)

    def assert_computers(self, computer_dn, access):
        """
        Rule under test: members of the local administrators may change MAC
        addresses in the computer and DHCP object.
        """
        self.assert_acl(computer_dn, access, ["macAddress"])

    def assert_user(self, user_dn, access):
        """Rule under test: members of the local administrators may change passwords below cn=users."""
        password_attrs = self._PASSWORD_ATTRS + ["sambaBadPasswordCount"]
        self.assert_acl(user_dn, access, password_attrs)

    def assert_dhcp(self, client, access, modify_only_attrs=False):
        """
        Check access to DHCP host objects.

        By default, all attributes are checked. If modify_only_attrs is True, only attributes that are
        required to modify the DHCP host object are checked.
        """
        client_dhcp_dn = "cn=%s,cn=%s,cn=dhcp,%s" % (client, self.school, self._school_base_dn())
        dhcp_attrs = ["entry", "children", "dhcpOption"]
        if not modify_only_attrs:
            dhcp_attrs.extend(["objectClass", "univentionObjectType", "cn"])
        self.assert_acl(client_dhcp_dn, access, dhcp_attrs)

    def assert_member_server(self, access):
        """
        Rule under test: members of the local administrators may join the
        Replica Directory Node and Managed Node (requires a password change).
        """
        base_dn = self.ucr.get("ldap/base")
        singlemaster = self.ucr.is_true("ucsschool/singlemaster")
        lo = getMachineConnection()
        if not singlemaster:
            servers = lo.search(
                filter="(|"
                "(univentionObjectType=computers/domaincontroller_slave)"
                "(univentionObjectType=computers/memberserver)"
                ")",
                base=self._school_base_dn(),
            )
            if servers:
                # Only the first server found below the school OU is checked.
                self.assert_acl(servers[0][0], access, list(self._PASSWORD_ATTRS))
        dns_entries = lo.search(base="cn=dns,%s" % base_dn, scope="base+one", attr=["uid"])
        # Check the SOA record of the first entry whose dn mentions "zoneName".
        for target_dn, _entry in dns_entries:
            if "zoneName" in target_dn:
                self.assert_acl(target_dn, access, ["sOARecord"])
                break