Source code for univention.testing.ucsschool.computerroom

# -*- coding: utf-8 -*-
from __future__ import print_function

import copy
import datetime
import itertools
import os
import re
import shlex
import subprocess
import tempfile
import time
from functools import wraps

import univention.lib.atjobs as ula
import univention.testing.strings as uts
import univention.testing.ucr as ucr_test
import univention.testing.ucsschool.ucs_test_school as utu
from ucsschool.lib.models.utils import exec_cmd
from ucsschool.lib.roles import (
    create_ucsschool_role_string,
    role_ip_computer,
    role_mac_computer,
    role_win_computer,
)
from univention.lib.umc import ConnectionError
from univention.testing import utils
from univention.testing.decorators import SetTimeout
from univention.testing.ucsschool.computer import random_ip, random_mac
from univention.testing.ucsschool.internetrule import InternetRule
from univention.testing.ucsschool.simplecurl import SimpleCurl
from univention.testing.ucsschool.workgroup import Workgroup
from univention.testing.umc import Client


class CmdCheckFail(Exception):
    """Signal that an external command did not yield the expected result."""
def retry_cmd(func):
    """
    Decorator: re-run *func* while it raises :class:`CmdCheckFail`.

    Used to avoid spurious errors due to slow replication. The call is
    delegated to ``utils.retry_on_error`` with 8 retries and a delay of 4.

    :param func: callable to wrap
    :returns: wrapper that forwards all arguments to *func* and returns
        whatever ``utils.retry_on_error`` returns
    """

    @wraps(func)
    def decorated(*args, **kwargs):
        # `(CmdCheckFail,)` is a real one-element tuple; the former `(CmdCheckFail)`
        # was just the class in parentheses.
        return utils.retry_on_error(
            lambda: func(*args, **kwargs), exceptions=(CmdCheckFail,), retry_count=8, delay=4
        )

    return decorated
class Room(object):
    """
    Test helper wrapping the UMC ``computerroom/*`` commands for one room.

    :param school: name of the school (OU) the room belongs to
    :param name: room name; a random name is used if omitted
    :param dn: DN of the room group; derived from school and name if omitted
    :param description: room description; random if omitted
    :param host_members: list of member hosts (defaults to an empty list)
    :param teacher_computers: list of teacher computers (defaults to an empty list)
    """

    def __init__(
        self, school, name=None, dn=None, description=None, host_members=None, teacher_computers=None
    ):
        self.school = school
        self.name = name if name else uts.random_name()
        self.dn = (
            dn
            if dn
            else "cn=%s-%s,cn=raeume,cn=groups,%s"
            % (school, self.name, utu.UCSTestSchool().get_ou_base_dn(school))
        )
        self.description = description if description else uts.random_name()
        self.host_members = host_members or []
        self.teacher_computers = teacher_computers or []

    # ------------------------------------------------------------------
    # private helpers shared by the test_*_settings methods
    # ------------------------------------------------------------------
    @staticmethod
    def _period_from_now(seconds):
        """Return the local time *seconds* from now formatted as ``HH:MM``."""
        return (datetime.datetime.now() + datetime.timedelta(0, seconds)).strftime("%H:%M")

    @staticmethod
    def _settings_combinations():
        """Return an iterator over all (internetRule, printMode, shareMode) combinations."""
        rules = ["none", "Kein Internet", "Unbeschränkt", "custom"]
        printmodes = ["default", "none"]
        sharemodes = ["all", "home"]
        return itertools.product(rules, printmodes, sharemodes)

    @staticmethod
    def _build_settings(rule, printMode, shareMode, period, white_page):
        """Build the settings dict passed to ``computerroom/settings/set``."""
        return {
            "customRule": white_page,
            "printMode": printMode,
            "internetRule": rule,
            "shareMode": shareMode,
            "period": period,
        }

    # ------------------------------------------------------------------
    # room user / acquisition
    # ------------------------------------------------------------------
    def get_room_user(self, client):
        """Return the ``user`` entry of this room from ``computerroom/rooms``, or None."""
        print("Executing command: computerroom/rooms in school:", self.school)
        reqResult = client.umc_command("computerroom/rooms", {"school": self.school}).result
        result = [x.get("user") for x in reqResult if x["label"] == self.name]
        return result[0] if result else None

    def check_room_user(self, client, expected_user):
        """Fail unless the room is in use by *expected_user* (None meaning: nobody)."""
        print("Checking computer room(%s) users.........." % self.name)
        current_user = self.get_room_user(client)
        print("Room %s is in use by user %r" % (self.name, current_user))
        user_id = current_user
        if current_user:
            # the user id is expected in parentheses; if it is not, compare the raw
            # value instead of crashing with an AttributeError on a failed match
            match = re.search(r"\((\w+)\)", current_user)
            if match:
                user_id = match.group(1)
        if expected_user != user_id:
            utils.fail("Room in use by user %r, expected: %r" % (user_id, expected_user))

    def aquire_room(self, client):
        """Run ``computerroom/room/acquire`` for this room and return the result."""
        print("Executing command: computerroom/room/acquire")
        return client.umc_command("computerroom/room/acquire", {"room": self.dn}).result

    def checK_room_aquire(self, client, expected_answer):
        """Acquire the room and fail unless the returned message equals *expected_answer*."""
        print("Checking room aquire... (%s)" % self.name)
        answer = self.aquire_room(client)["message"]
        if answer == expected_answer:
            print("Room %s is %s" % (self.name, answer))
        else:
            utils.fail("Unexpected room aquire result: %s" % (answer,))

    # ------------------------------------------------------------------
    # computers
    # ------------------------------------------------------------------
    def get_room_computers(self, client):
        """Return the names reported by ``computerroom/query``."""
        print("Executing command: computerroom/query... (%s)" % self.name)
        reqResult = client.umc_command("computerroom/query", {"reload": False}).result
        return [x["name"] for x in reqResult]

    def check_room_computers(self, client, expected_computer_list):
        """
        Assert that the computers in the room match *expected_computer_list*.

        Both lists are sorted and compared pairwise.

        :raises AssertionError: on a length mismatch or a pairwise mismatch
        """
        print("Checking room computers........... (%s)" % self.name)
        current_computers = self.get_room_computers(client)
        print("Current computers in room %s are %r" % (self.name, current_computers))
        mismatch = "Computers found %r do not match the expected: %r" % (
            current_computers,
            expected_computer_list,
        )
        found = sorted(current_computers)
        expected = sorted(expected_computer_list)
        # Without this check a surplus computer raised IndexError and a missing
        # one went unnoticed.
        assert len(found) == len(expected), mismatch
        for computer, expected_computer in zip(found, expected):
            # NOTE(review): containment (not equality) is kept from the original;
            # presumably expected entries may be longer than the plain name - confirm.
            assert computer in expected_computer, mismatch

    # ------------------------------------------------------------------
    # settings
    # ------------------------------------------------------------------
    def set_room_settings(self, client, new_settings):
        """Send *new_settings* via ``computerroom/settings/set`` and return the result."""
        print("Executing command: computerroom/settings/set")
        print("new_settings = %r" % (new_settings,))
        return client.umc_command("computerroom/settings/set", new_settings).result

    def get_room_settings(self, client):
        """Return the result of ``computerroom/settings/get``."""
        print("Executing command: computerroom/settings/get")
        return client.umc_command("computerroom/settings/get").result

    def check_room_settings(self, client, expected_settings):
        """
        Assert that the current room settings equal *expected_settings*.

        ``period`` and ``customRule`` are taken from the current settings and
        therefore never cause a mismatch.

        :raises AssertionError: if the settings differ
        :raises ConnectionError: re-raised after logging
        """
        print("Checking computerroom (%s) settings ..........." % self.name)
        try:
            current_settings = self.get_room_settings(client)
            d = dict(expected_settings)  # copy dictionary
            d["period"] = current_settings["period"]
            d["customRule"] = current_settings["customRule"]  # TODO Bug 35258 remove
            assert current_settings == d, "Current settings (%r) do not match expected ones (%r)" % (
                current_settings,
                d,
            )
        except ConnectionError as exc:
            if "[Errno 4] " in str(exc):
                print("failed to check room (%s) settings, exception [Errno4]" % self.name)
            print("Exception: '%s' '%s' '%r'" % (str(exc), type(exc), exc))
            raise

    def get_internetRules(self, client):
        """Return the result of ``computerroom/internetrules``."""
        print("Executing command: computerroom/internetrules")
        return client.umc_command("computerroom/internetrules").result

    def check_internetRules(self, client):
        """
        Check if the fetched internetrules match the already defined ones
        in define internet module.

        :param client: umc connection
        :type client: Client(uce.get('hostname'))
        """
        rule = InternetRule()
        current_rules = rule.allRules()
        internetRules = self.get_internetRules(client)
        assert sorted(current_rules) == sorted(
            internetRules
        ), "Fetched internetrules %r, do not match the existing ones %r" % (internetRules, current_rules)

    def check_atjobs(self, period, expected_existence):
        """
        Assert that an at job scheduled for *period* exists or does not exist.

        :param period: time of day formatted as ``HH:MM``
        :param expected_existence: True if such a job must exist, False if it must not
        """
        jobs = ula.list()
        exist = any(period == item.execTime.time().strftime("%H:%M") for item in jobs)
        print("Atjob result at(%r) existance: %r" % (period, exist))
        print(
            "\n".join(
                "Job %s: %s owner=%s\n%s" % (i, item, item.owner, item.command)
                for i, item in enumerate(jobs)
            )
        )
        assert (
            exist == expected_existence
        ), "Atjob result at(%r) is unexpected (should_exist=%r exists=%r)" % (
            period,
            expected_existence,
            exist,
        )

    def check_displayTime(self, client, period):
        """Assert that the displayed period (minus its last 3 characters) equals *period*."""
        displayed_period = self.get_room_settings(client)["period"][0:-3]
        print("Time displayed (%r) - Atjobs (%r)" % (displayed_period, period))
        assert (
            period == displayed_period
        ), "Time displayed (%r) is different from time at Atjobs (%r)" % (displayed_period, period)

    def test_time_settings(self, client):
        """Set settings expiring in 2 minutes and verify at job creation and the reset."""
        self.aquire_room(client)
        settings = self.get_room_settings(client)
        period = self._period_from_now(120)
        new_settings = {
            "customRule": "",
            "printMode": "none",
            "internetRule": "none",
            "shareMode": "home",
            "period": period,
        }

        ula_length = len(ula.list())
        time_out = 30  # seconds
        self.set_room_settings(client, new_settings)
        # wait (at most time_out seconds) until a new at job shows up
        for i in range(time_out, 0, -1):
            print(i)
            if len(ula.list()) > ula_length:
                break
            time.sleep(1)

        # Checking Atjobs list
        self.check_atjobs(period, True)

        # TODO FAILS because of Bug #35195
        # self.check_displayTime(client, period)

        print("*** Waiting 2 mins for settings to expire.............")
        time.sleep(2 * 60 + 2)
        current_settings = self.get_room_settings(client)

        # Time field is not considered in the comparision
        current_settings["period"] = settings["period"]
        assert (
            current_settings == settings
        ), "Current settings (%r) are not reset back after the time out, expected (%r)" % (
            current_settings,
            settings,
        )

        # Checking Atjobs list
        self.check_atjobs(period, False)

    # ------------------------------------------------------------------
    # share access
    # ------------------------------------------------------------------
    def check_home_read(self, user, ip_address, passwd="univention", expected_result=0):
        """Check read access to the share named like *user*."""
        check_share_read(user, ip_address, share=user, passwd=passwd, expected_result=expected_result)

    def check_home_write(self, user, ip_address, passwd="univention", expected_result=0):
        """Check write access to the share named like *user*."""
        check_share_write(user, ip_address, share=user, passwd=passwd, expected_result=expected_result)

    def check_marktplatz_read(self, user, ip_address, passwd="univention", expected_result=0):
        """Check read access to the ``Marktplatz`` share."""
        check_share_read(
            user, ip_address, share="Marktplatz", passwd=passwd, expected_result=expected_result
        )

    def check_marktplatz_write(self, user, ip_address, passwd="univention", expected_result=0):
        """Check write access to the ``Marktplatz`` share."""
        check_share_write(
            user, ip_address, share="Marktplatz", passwd=passwd, expected_result=expected_result
        )

    def check_share_access(self, user, ip_address, expected_home_result, expected_marktplatz_result):
        """Check read and write access to the home and the Marktplatz share."""
        self.check_home_read(user, ip_address, expected_result=expected_home_result)
        self.check_home_write(user, ip_address, expected_result=expected_home_result)
        self.check_marktplatz_read(user, ip_address, expected_result=expected_marktplatz_result)
        self.check_marktplatz_write(user, ip_address, expected_result=expected_marktplatz_result)

    def check_share_behavior(self, user, ip_address, shareMode):
        """Check share access as expected for *shareMode* (``"all"`` or ``"home"``)."""
        if shareMode == "all":
            self.check_share_access(user, ip_address, 0, 0)
        elif shareMode == "home":
            self.check_share_access(user, ip_address, 0, 1)
        else:
            utils.fail("shareMode invalid value = (%s)" % shareMode)

    def test_share_access_settings(self, user, ip_address, client):
        """Apply every settings combination and check the resulting share access."""
        self.aquire_room(client)
        print(self.get_room_settings(client))

        white_page = "univention.de"
        # Testing loop over all combinations of (rule, printmode, sharemode)
        for i, (rule, printMode, shareMode) in enumerate(self._settings_combinations()):
            period = self._period_from_now(600 + 300 * i)
            print()
            print(
                "*** %d -(internetRule, printMode, shareMode) = (%r, %r, %r) ----------"
                % (i, rule, printMode, shareMode)
            )
            new_settings = self._build_settings(rule, printMode, shareMode, period, white_page)
            self.aquire_room(client)
            self.set_room_settings(client, new_settings)
            # check if displayed values match
            self.check_room_settings(client, new_settings)
            self.check_share_behavior(user, ip_address, shareMode)

    # ------------------------------------------------------------------
    # printing
    # ------------------------------------------------------------------
    @retry_cmd
    def check_smb_print(self, ip, printer, user, expected_result):
        """
        Print a temporary file via ``smbclient`` and compare the command result.

        :raises CmdCheckFail: if the result differs from *expected_result*
        :raises AssertionError: if ``lpstat`` does not report a running scheduler
        """
        print("Checking print mode", "." * 40)
        # ensure cups is running, otherwise jobs are not rejected
        ucr = ucr_test.UCSTestConfigRegistry()
        ucr.load()
        cups_status = subprocess.check_output(
            ["lpstat", "-h", ucr.get("cups/server", ""), "-r"], env={"LC_ALL": "C"}
        ).decode("UTF-8")
        assert cups_status == "scheduler is running\n", 'CUPS status reported: "{}"'.format(cups_status)
        cmd_print = ["smbclient", "//%(ip)s/%(printer)s", "-U", "%(user)s", "-c", "print %(filename)s"]
        # context manager: the temporary file is removed even if the command raises
        with tempfile.NamedTemporaryFile(dir="/tmp") as f:
            result = run_commands(
                [cmd_print],
                {
                    "ip": ip,
                    "printer": printer,
                    "user": "{}%{}".format(user, "univention"),
                    "filename": f.name,
                },
            )[0]
        if result != expected_result:
            print("FAIL .... smbclient print result (%r), expected (%r)" % (result, expected_result))
            raise CmdCheckFail("smbclient print result (%r), expected (%r)" % (result, expected_result))

    def check_print_behavior(self, user, ip_address, printer, printMode):
        """Check printing on *printer* and ``PDFDrucker`` as expected for *printMode*."""
        if printMode == "none":
            self.check_smb_print(ip_address, printer, user, 1)
            self.check_smb_print(ip_address, "PDFDrucker", user, 1)
        elif printMode == "default":
            self.check_smb_print(ip_address, printer, user, 0)
            self.check_smb_print(ip_address, "PDFDrucker", user, 0)
        else:
            utils.fail("printMode invalid value = (%s)" % printMode)

    def test_printMode_settings(self, school, user, ip_address, client, ucr):
        """Apply every settings combination and check the resulting print behavior."""
        # NOTE: the passed `ucr` has always been replaced by a freshly loaded
        # registry here; kept that way for backward compatibility.
        ucr = ucr_test.UCSTestConfigRegistry()
        ucr.load()

        self.aquire_room(client)
        printer = uts.random_string()
        add_printer(printer, school, ucr.get("hostname"), ucr.get("domainname"), ucr.get("ldap/base"))
        try:
            white_page = "univention.de"
            # Testing loop over all combinations of (rule, printmode, sharemode)
            for i, (rule, printMode, shareMode) in enumerate(self._settings_combinations()):
                period = self._period_from_now(600 + 300 * i)
                print()
                print(
                    "*** %d -(internetRule, printMode, shareMode) = (%r, %r, %r) ----------"
                    % (i, rule, printMode, shareMode)
                )
                new_settings = self._build_settings(rule, printMode, shareMode, period, white_page)
                self.aquire_room(client)
                self.set_room_settings(client, new_settings)
                # check if displayed values match
                self.check_room_settings(client, new_settings)
                self.check_print_behavior(user, ip_address, printer, printMode)
        finally:
            remove_printer(printer, school, ucr.get("ldap/base"))

    # ------------------------------------------------------------------
    # internet rules
    # ------------------------------------------------------------------
    def checK_internetrules(self, ucr, user, proxy, custom_domain, global_domains, expected_rule):
        """
        Assert that *expected_rule* is the internet rule in effect for *user*.

        Pages are fetched through *proxy* and compared with the ban page.

        :raises AssertionError: if the observed behavior does not fit *expected_rule*
        """
        # Getting the redirection page when blocked
        banPage = get_banpage(ucr)
        localCurl = SimpleCurl(proxy=proxy, username=user)
        rule_in_control = None
        try:
            if expected_rule == "Kein Internet" and localCurl.getPage("univention.de") == banPage:
                rule_in_control = expected_rule
            if expected_rule == "Unbeschränkt" and localCurl.getPage("gmx.de") != banPage:
                rule_in_control = expected_rule
            if expected_rule == "custom" and localCurl.getPage(custom_domain) != banPage:
                rule_in_control = expected_rule
            if expected_rule == "none":
                if all(localCurl.getPage(dom) != banPage for dom in global_domains):
                    rule_in_control = expected_rule
        finally:
            # release the curl handle even if fetching a page raised
            localCurl.close()
        print("RULE IN CONTROL = ", rule_in_control)
        assert (
            rule_in_control == expected_rule
        ), "rule in control (%s) does not match the expected one (%s)" % (rule_in_control, expected_rule)

    def test_internetrules_settings(self, school, user, user_dn, ip_address, ucr, client):
        """Apply every settings combination and check the resulting internet rule."""
        # Create new workgroup and assign new internet rule to it
        group = Workgroup(school, members=[user_dn])
        group.create()
        try:
            global_domains = ["univention.de", "example.com"]
            internet_rule = InternetRule(typ="whitelist", domains=global_domains)
            internet_rule.define()
            internet_rule.assign(school, group.name, "workgroup")

            self.check_internetRules(client)
            self.aquire_room(client)

            white_page = "univention.de"
            # Testing loop over all combinations of (rule, printmode, sharemode)
            for i, (rule, printMode, shareMode) in enumerate(self._settings_combinations()):
                period = self._period_from_now(600 + 300 * i)
                print()
                print(
                    "*** %d -(internetRule, printMode, shareMode) = (%r, %r, %r) ----------"
                    % (i, rule, printMode, shareMode)
                )
                new_settings = self._build_settings(rule, printMode, shareMode, period, white_page)
                self.aquire_room(client)
                self.set_room_settings(client, new_settings)
                # check if displayed values match
                self.check_room_settings(client, new_settings)
                self.checK_internetrules(ucr, user, ip_address, white_page, global_domains, rule)
        finally:
            group.remove()

    # ------------------------------------------------------------------
    # combined checks
    # ------------------------------------------------------------------
    def test_settings(self, school, user, user_dn, ip_address, ucr, client):
        """Apply every settings combination and check at jobs, internet, shares and printing."""
        printer = uts.random_string()
        # Create new workgroup and assign new internet rule to it
        group = Workgroup(school, client, members=[user_dn])
        group.create()
        try:
            global_domains = ["univention.de", "example.com"]
            internet_rule = InternetRule(typ="whitelist", domains=global_domains)
            internet_rule.define()
            internet_rule.assign(school, group.name, "workgroup")

            self.check_internetRules(client)

            # Add new hardware printer
            add_printer(
                printer, school, ucr.get("hostname"), ucr.get("domainname"), ucr.get("ldap/base")
            )

            white_page = "univention.de"
            utils.wait_for_replication()

            # Testing loop over all combinations of (rule, printmode, sharemode)
            for i, (rule, printMode, shareMode) in enumerate(self._settings_combinations()):
                period = self._period_from_now(600 + 300 * i)
                print()
                print(
                    "*** %d -(internetRule, printMode, shareMode, period) = (%r, %r, %r, %r) "
                    "----------" % (i, rule, printMode, shareMode, period)
                )
                new_settings = self._build_settings(rule, printMode, shareMode, period, white_page)
                self.aquire_room(client)
                old_settings = self.get_room_settings(client)
                self.set_room_settings(client, new_settings)

                utils.wait_for_replication_and_postrun()
                # give the CUPS and Samba server a little bit more time
                time.sleep(15)

                # check if displayed values match
                self.check_room_settings(client, new_settings)
                partial_old_settings = {
                    "period": old_settings["period"],
                    "printMode": old_settings["printMode"],
                    "shareMode": old_settings["shareMode"],
                    "internetRule": old_settings["internetRule"],
                }
                self.check_behavior(
                    partial_old_settings,
                    new_settings,
                    user,
                    ip_address,
                    printer,
                    white_page,
                    global_domains,
                    ucr,
                )
        finally:
            try:
                group.remove()
            finally:
                # still try to remove the printer if removing the group raised
                remove_printer(printer, school, ucr.get("ldap/base"))

    def check_behavior(
        self,
        partial_old_settings,
        new_settings,
        user,
        ip_address,
        printer,
        white_page,
        global_domains,
        ucr,
    ):
        """
        Check at jobs, internet rule, share access and printing for *new_settings*.

        An at job for the new period is only expected if the settings (ignoring
        the period) differ from *partial_old_settings*.
        """
        # extract the new_settings
        period = new_settings["period"]
        internetRule = new_settings["internetRule"]
        printMode = new_settings["printMode"]
        shareMode = new_settings["shareMode"]

        # check atjobs
        partial_new_settings = {
            "printMode": printMode,
            "shareMode": shareMode,
            "internetRule": internetRule,
        }
        print()
        print("----------DEBUG-----------")
        print("old_period = %r" % (partial_old_settings.get("period"),))
        print("new_period = %r" % (period,))

        # work on a copy so the caller's dict keeps its "period" entry
        partial_old_settings = partial_old_settings.copy()
        partial_old_settings.pop("period", None)

        # if there is no change in settings, no atjob is added
        print("old=", partial_old_settings)
        print("new=", partial_new_settings)
        self.check_atjobs(period, partial_old_settings != partial_new_settings)

        # check internetrules
        self.checK_internetrules(ucr, user, ip_address, white_page, global_domains, internetRule)

        # check share access
        self.check_share_behavior(user, ip_address, shareMode)

        # check print mode
        self.check_print_behavior(user, ip_address, printer, printMode)
def get_banpage(ucr):
    """
    Return the redirection page that is delivered when a request is blocked.

    The page is fetched from the UCR value ``proxy/filter/redirecttarget``
    using the UCR value ``hostname`` as proxy.

    :param ucr: object providing ``get(key)`` for UCR variables
    :returns: whatever ``SimpleCurl.getPage`` returns for the redirect target
    """
    adminCurl = SimpleCurl(proxy=ucr.get("hostname"))
    try:
        redirUri = ucr.get("proxy/filter/redirecttarget")
        return adminCurl.getPage(redirUri)
    finally:
        # close the handle even if fetching the page raised
        adminCurl.close()
def clean_folder(path):
    """Remove every file found below *path*; directories are left in place."""
    print("Cleaning folder %r ....." % path)
    doomed = (
        os.path.join(directory, entry)
        for directory, _subdirs, entries in os.walk(path)
        for entry in entries
    )
    for file_path in doomed:
        os.remove(file_path)
def run_commands(cmdlist, argdict):
    """
    Run each command of *cmdlist* after %-formatting its arguments with *argdict*.

    The commands are executed one after another via ``subprocess.call``; the
    given command lists are not modified.

    run_commands([['/bin/echo', '%(msg)s'], ['/bin/echo', 'World']], {'msg': 'Hello'})

    :param cmdlist: list of commands, each a list of format strings
    :param argdict: mapping used to fill the format strings
    :returns: list with the return value of ``subprocess.call`` per command
    """
    exit_codes = []
    for template in cmdlist:
        expanded = copy.deepcopy(template)
        expanded[:] = [part % argdict for part in expanded]
        print("*** %r" % expanded)
        exit_codes.append(subprocess.call(expanded))
    return exit_codes
def add_printer(name, school, hostname, domainname, ldap_base):
    """
    Create a ``shares/printer`` object for *school* by calling ``udm``.

    The exit code list of the command is printed. Afterwards the function waits
    for replication and postrun and sleeps for another 15 seconds.

    :param name: printer name
    :param school: school (OU) name used in the printer position
    :param hostname: host part of the spool host
    :param domainname: domain part of the spool host
    :param ldap_base: LDAP base DN
    """
    substitutions = {
        "name": name,
        "school": school,
        "hostname": hostname,
        "domainname": domainname,
        "ldap_base": ldap_base,
    }
    udm_create = ["udm", "shares/printer", "create"]
    udm_create += ["--position", "cn=printers,ou=%(school)s,%(ldap_base)s"]
    for prop in (
        "name=%(name)s",
        "spoolHost=%(hostname)s.%(domainname)s",
        "uri=file:// /tmp/%(name)s.printer",
        "model=None",
    ):
        udm_create += ["--set", prop]
    udm_create += ["--binddn", "uid=Administrator,cn=users,%(ldap_base)s"]
    udm_create += ["--bindpwd", "univention"]

    exit_codes = run_commands([udm_create], substitutions)
    print(exit_codes)
    utils.wait_for_replication_and_postrun()
    # give the CUPS and Samba server a little bit more time
    time.sleep(15)
def remove_printer(name, school, ldap_base):
    """
    Remove the ``shares/printer`` object *name* of *school* by calling ``udm``.

    The exit code list of the command is printed.
    """
    printer_dn = "cn=%(name)s,cn=printers,ou=%(school)s,%(ldap_base)s"
    udm_remove = ["udm", "shares/printer", "remove", "--dn", printer_dn]
    substitutions = {"name": name, "school": school, "ldap_base": ldap_base}
    exit_codes = run_commands([udm_remove], substitutions)
    print(exit_codes)
def set_windows_pc_password(dn, password):
    """
    Set the password of the ``computers/windows`` object *dn* by calling ``udm``.

    :returns: list with the exit code of the ``udm`` call
    """
    udm_modify = ["udm", "computers/windows", "modify"]
    udm_modify.extend(["--dn", "%(dn)s"])
    udm_modify.extend(["--set", "password=%(password)s"])
    return run_commands([udm_modify], {"dn": dn, "password": password})
class UmcComputer(object):
    """
    Test helper managing one school computer via the UMC ``schoolwizards/computers`` commands.

    On construction a UMC client is connected to the host named by the UCR
    variable ``ldap/master`` and authenticated with the domain admin test
    credentials.

    :param school: name of the school (OU)
    :param typ: computer type, e.g. ``"windows"``, ``"macos"`` or ``"ipmanagedclient"``
    :param name: computer name; random if omitted
    :param ip_address: IP address; random if omitted
    :param subnet_mask: subnet mask; ``"255.255.255.0"`` if omitted
    :param mac_address: MAC address (stored lower-cased); random if omitted
    :param inventory_number: inventory number; empty string if omitted
    """

    def __init__(
        self,
        school,
        typ,
        name=None,
        ip_address=None,
        subnet_mask=None,
        mac_address=None,
        inventory_number=None,
    ):
        self.school = school
        self.typ = typ
        self.name = name if name else uts.random_name()
        self.ip_address = ip_address if ip_address else random_ip()
        self.subnet_mask = subnet_mask if subnet_mask else "255.255.255.0"
        self.mac_address = mac_address.lower() if mac_address else random_mac()
        self.inventory_number = inventory_number if inventory_number else ""
        self.ucr = ucr_test.UCSTestConfigRegistry()
        self.ucr.load()
        host = self.ucr.get("ldap/master")
        self.client = Client(host)
        account = utils.UCSTestDomainAdminCredentials()
        admin = account.username
        passwd = account.bindpw
        self.client.authenticate(admin, passwd)

    def create(self, should_succeed=True, ignore_warning=True):
        """
        Creates object Computer

        :param should_succeed: whether the creation is expected to succeed
        :param ignore_warning: passed through as ``ignore_warning`` in the request
        :raises AssertionError: if the outcome does not match *should_succeed*
        """
        flavor = "schoolwizards/computers"
        param = [
            {
                "object": {
                    "school": self.school,
                    "type": self.typ,
                    "name": self.name,
                    "ip_address": [self.ip_address],
                    "mac_address": [self.mac_address.lower()],
                    "subnet_mask": self.subnet_mask,
                    "inventory_number": self.inventory_number,
                    "ignore_warning": ignore_warning,
                },
                "options": None,
            }
        ]
        print("Creating Computer %s" % (self.name,))
        print("param = %s" % (param,))
        reqResult = self.client.umc_command("schoolwizards/computers/add", param, flavor).result
        result = reqResult[0]["result"]
        if should_succeed and result is True:
            utils.wait_for_replication()
        elif not should_succeed and isinstance(result, dict) and result.get("error"):
            # the isinstance() guard avoids an AttributeError on `True.get` when a
            # creation that should have failed succeeded
            print(
                "Expected creation failed for computer (%r)\nReturn Message: %r"
                % (self.name, result["error"])
            )
        else:
            raise AssertionError(
                "Unable to create computer (%r)\nRequest Result: %r" % (param, reqResult)
            )

    def remove(self):
        """Remove computer"""
        flavor = "schoolwizards/computers"
        param = [{"object": {"$dn$": self.dn(), "school": self.school}, "options": None}]
        reqResult = self.client.umc_command("schoolwizards/computers/remove", param, flavor).result
        assert reqResult[0] is True, "Unable to remove computer (%s): %r" % (self.name, reqResult)
        utils.wait_for_replication()
        utils.wait_for_s4connector_replication()

    def dn(self):
        """Return the DN of the computer below the OU base DN of its school."""
        return "cn=%s,cn=computers,%s" % (self.name, utu.UCSTestSchool().get_ou_base_dn(self.school))

    def get(self):
        """
        Get Computer

        :returns: first entry of the UMC result without its ``zone`` key
        :raises AssertionError: if that entry is empty
        """
        flavor = "schoolwizards/computers"
        param = [{"object": {"$dn$": self.dn(), "school": self.school}}]
        reqResult = self.client.umc_command("schoolwizards/computers/get", param, flavor).result
        computer = reqResult[0]
        if computer:
            # zone is not used in the computer room anymore; tolerate its absence
            computer.pop("zone", None)
        assert computer, "Unable to get computer (%s): %r" % (self.name, reqResult)
        return computer

    def check_get(self):
        """
        Assert that :meth:`get` returns exactly the data this object expects.

        :raises AssertionError: listing the differing keys on mismatch
        """
        typ2roles = {
            "windows": [create_ucsschool_role_string(role_win_computer, self.school)],
            "macos": [create_ucsschool_role_string(role_mac_computer, self.school)],
            "ipmanagedclient": [create_ucsschool_role_string(role_ip_computer, self.school)],
        }
        info = {
            "$dn$": self.dn(),
            "school": self.school,
            "type": self.typ,
            "name": self.name,
            "ip_address": [self.ip_address],
            "mac_address": [self.mac_address.lower()],
            "subnet_mask": self.subnet_mask,
            "inventory_number": self.inventory_number,
            "type_name": self.type_name(),
            "objectType": "computers/%s" % self.typ,
            "ucsschool_roles": typ2roles[self.typ],
        }
        get_result = self.get()
        if get_result != info:
            # consider keys of both sides so that an unexpected or a missing key
            # is reported instead of raising KeyError
            diff = {
                key
                for key in set(get_result) | set(info)
                if key not in get_result or key not in info or get_result[key] != info[key]
            }
            raise AssertionError(
                "Failed get request for computer %s.\nReturned result: %r.\nExpected result: %r,\n"
                "Difference = %r" % (self.name, get_result, info, diff)
            )

    def type_name(self):
        """Return the display name of the computer type, or None for an unknown type."""
        return {
            "windows": "Windows-System",
            "macos": "Mac OS X",
            "ipmanagedclient": "Gerät mit IP-Adresse",
        }.get(self.typ)

    def edit(
        self, name=None, ip_address=None, subnet_mask=None, mac_address=None, inventory_number=None
    ):
        """
        Edit object computer

        Arguments that are omitted keep their current value, both in the request
        and on this object.

        :param name: NOTE(review): accepted but not used - the request always
            sends the current name; confirm whether renaming is meant to be supported
        :raises AssertionError: if the UMC command does not return True
        """
        flavor = "schoolwizards/computers"
        new_ip_address = ip_address or self.ip_address
        new_mac_address = (mac_address or self.mac_address).lower()
        new_subnet_mask = subnet_mask or self.subnet_mask
        new_inventory_number = inventory_number or self.inventory_number
        param = [
            {
                "object": {
                    "$dn$": self.dn(),
                    "name": self.name,
                    "school": self.school,
                    "type": self.typ,
                    "ip_address": [new_ip_address],
                    "mac_address": [new_mac_address],
                    "subnet_mask": new_subnet_mask,
                    "inventory_number": new_inventory_number,
                },
                "options": None,
            }
        ]
        print("Editing computer %s" % (self.name,))
        print("param = %s" % (param,))
        reqResult = self.client.umc_command("schoolwizards/computers/put", param, flavor).result
        assert reqResult[0] is True, "Unable to edit computer (%s) with the parameters (%r): %r" % (
            self.name,
            param,
            reqResult,
        )
        # Remember exactly what was sent; previously omitted arguments reset the
        # attributes to None although the server kept the old values.
        self.ip_address = new_ip_address
        self.mac_address = new_mac_address
        self.subnet_mask = new_subnet_mask
        self.inventory_number = new_inventory_number
        utils.wait_for_replication()

    def query(self):
        """get the list of existing computer in the school"""
        flavor = "schoolwizards/computers"
        param = {"school": self.school, "filter": "", "type": "all"}
        reqResult = self.client.umc_command("schoolwizards/computers/query", param, flavor).result
        return reqResult

    def check_query(self, computer_names):
        """Assert that all *computer_names* are contained in the result of :meth:`query`."""
        reqResult = self.query()
        names_in_result = {x["name"] for x in reqResult}
        assert set(computer_names).issubset(
            names_in_result
        ), "computers from query do not contain the existing computers, found (%r), expected (%r)" % (
            names_in_result,
            computer_names,
        )

    def verify_ldap(self, should_exist):
        """Verify via ``utils.verify_ldap_object`` whether the computer exists in LDAP."""
        print("verifying computer %s" % self.name)
        utils.verify_ldap_object(self.dn(), should_exist=should_exist)
def create_homedirs(member_dn_list, open_ldap_co):
    """
    Make sure the home directory of every given user exists.

    For each DN the first ``homeDirectory`` value is read via
    ``open_ldap_co.getAttr``, decoded as UTF-8 and created if it does not exist.

    :param member_dn_list: iterable of user DNs
    :param open_ldap_co: object providing ``getAttr(dn, attribute)``
    :raises AssertionError: if a DN has no ``homeDirectory`` value
    """
    nothing = object()
    for dn in member_dn_list:
        first_value = next(iter(open_ldap_co.getAttr(dn, "homeDirectory")), nothing)
        if first_value is nothing:
            raise AssertionError("No homeDirectory attribute found for %r" % (dn,))
        home_dir = first_value.decode("UTF-8")
        if os.path.exists(home_dir):
            continue
        print("# Creating %r for %r" % (home_dir, dn))
        os.makedirs(home_dir)
@SetTimeout
def check_create_share_folder(
    share, username, dir_name, samba_workstation=""
):  # type: (str, str, str, str) -> None
    """
    test if a user can create folders inside a given share, i.e. they have edit rights.

    Runs ``smbclient ... -c 'mkdir <dir_name>'`` through a shell.

    :param share: share to connect to
    :param username: user to connect as (password ``univention``)
    :param dir_name: name of the directory to create
    :param samba_workstation: if set, passed as ``--netbiosname``
    :raises AssertionError: if the output contains ``NT_STATUS_ACCESS_DENIED``
    """
    # Quote the complete smbclient command so that dir_name is escaped like the
    # other arguments; for names without quotes this yields the same command line.
    cmd = "smbclient -U {}%univention {} -c {} ".format(
        shlex.quote(username), shlex.quote(share), shlex.quote("mkdir {}".format(dir_name))
    )
    if samba_workstation:
        cmd += " --netbiosname={}".format(shlex.quote(samba_workstation))
    rv, stdout, stderr = exec_cmd(cmd, log=True, raise_exc=True, shell=True)  # noqa: S604
    assert (
        "NT_STATUS_ACCESS_DENIED" not in stdout
    ), "Failed to create folder, got NT_STATUS_ACCESS_DENIED: {}".format(stdout)
def check_change_permissions(
    filename, user_name, allowed, samba_workstation=""
):  # type: (str, str, bool, str) -> None
    """
    test if user can change the permissions a given file in a share folder.

    Runs ``smbcacls ... --add`` through a shell and inspects its output.

    :param filename: inserted verbatim after ``smbcacls``
    :param user_name: value for ``--user``
    :param allowed: whether the change is expected to be permitted
    :param samba_workstation: if set, passed as ``--netbiosname``
    """
    new_acl = "ACL:Everyone:ALLOWED/OI|CI|I/FULL"
    parts = ["echo 'univention' | smbcacls {} --user={} --add '{}'".format(filename, user_name, new_acl)]
    if samba_workstation:
        parts.append(" --netbiosname='{}'".format(samba_workstation))
    rv, stdout, stderr = exec_cmd("".join(parts), log=True, raise_exc=False, shell=True)  # noqa: S604

    denied = "NT_STATUS_ACCESS_DENIED" in stdout
    if allowed and denied:
        utils.fail(
            "Expected user to be able to change the permissions, got NT_STATUS_ACCESS_DENIED: {}".format(
                stdout
            )
        )
    elif not allowed and not denied:
        utils.fail(
            "Expected NT_STATUS_ACCESS_DENIED, user could change the permissions: {}".format(stdout)
        )
@retry_cmd
def check_share_read(user, ip_address, share, passwd="univention", filename="/*", expected_result=0):
    """
    List *filename* on ``//ip_address/share`` with ``smbclient`` and compare the result.

    A mismatch raises :class:`CmdCheckFail`, which is the exception ``retry_cmd``
    retries on. The former ``assert`` raised AssertionError, so the check was
    never retried.

    :param user: user to connect as
    :param ip_address: address of the server
    :param share: share name
    :param passwd: password of *user*
    :param filename: argument for the smbclient ``dir`` command
    :param expected_result: expected result of the smbclient call
    :raises CmdCheckFail: if the result differs from *expected_result*
    """
    print(".... Check {} read ....".format(share))
    cmd_read_share = ["smbclient", "//%(ip)s/%(share)s", "-U", "%(user)s", "-c", "dir %(filename)s"]
    read = run_commands(
        [cmd_read_share],
        {"ip": ip_address, "share": share, "user": "{}%{}".format(user, passwd), "filename": filename},
    )
    if read[0] != expected_result:
        raise CmdCheckFail(
            "Read share (%r) result (%r), expected (%r)" % (share, read[0], expected_result)
        )
@retry_cmd
def check_share_write(
    user, ip_address, share, passwd="univention", remote_filename=None, expected_result=0
):
    """
    Upload a temporary file to ``//ip_address/share`` with ``smbclient`` and compare the result.

    A mismatch raises :class:`CmdCheckFail`, which is the exception ``retry_cmd``
    retries on. The former ``assert`` raised AssertionError, so the check was
    never retried.

    :param user: user to connect as
    :param ip_address: address of the server
    :param share: share name
    :param passwd: password of *user*
    :param remote_filename: target name; defaults to the temporary file's base name
    :param expected_result: expected result of the smbclient call
    :raises CmdCheckFail: if the result differs from *expected_result*
    """
    print(".... Check {} write ....".format(share))
    cmd_write_share = [
        "smbclient",
        "//%(ip)s/%(share)s",
        "-U",
        "%(user)s",
        "-c",
        "put %(filename)s %(remote_filename)s",
    ]
    # context manager: the temporary file is removed even if the command raises
    with tempfile.NamedTemporaryFile(dir="/tmp") as f:
        if not remote_filename:
            remote_filename = os.path.basename(f.name)
        write = run_commands(
            [cmd_write_share],
            {
                "ip": ip_address,
                "share": share,
                "user": "{}%{}".format(user, passwd),
                "filename": f.name,
                "remote_filename": remote_filename,
            },
        )
    if write[0] != expected_result:
        raise CmdCheckFail(
            "Write share (%r) result (%r), expected (%r)" % (share, write[0], expected_result)
        )